版本:DPDK-1.8.0
原作者:张云尧 出处:http://aidaiz.com/dpdk_l2fwd/
本例中实现了相邻端口之间的相互转发。
比如一共4个端口可用,那么端口1收到数据后会转发给端口2,端口2收到数据后会转发给端口1,端口3和端口4也会相互转发。
设置环境变量
1
2
export RTE\_SDK=/(RTE\_SDK)
#DPDK的路径
export RTE\_TARGET=x86\_64-native-linuxapp-gcc
#DPDK的编译目标
进入示例目录
1
cd /(RTE\_SDK)/examples/l2fwd
编译
1
make
1
./build/l2fwd \[EAL options\] -- -p PORTMASK \[-q NQ -T t\]
EAL options
-p PORTMASK
PORTMASK:一个十六进制位掩码,表示要启用(分配)哪些端口,第n位为1表示启用端口n。
-q NQ
NQ:表示分配给每个逻辑内核的收发队列数量。
-T t
t: 表示打印统计数据到屏幕上的时间间隔,默认为10秒。
1
./build/l2fwd -c f -n 4 -- -q 4 -p ffff
表示:使用4个逻辑内核(-c f),每个内核分别有4个收发队列(-q 4),一共分配了16个端口(-p ffff)。
1
2
3
ret = rte\_eal\_init(argc, argv);
if (ret <
0)
rte\_exit(EXIT\_FAILURE,
"Invalid EAL arguments\\n");
1
2
3
ret = l2fwd\_parse\_args(argc, argv);
if (ret <
0)
rte\_exit(EXIT\_FAILURE,
"Invalid L2FWD arguments\\n");
EAL参数传递已经在rte_eal_init()函数中完成了,这里主要传递“--”后面的参数。
传递参数之后,得到三个变量:l2fwd_enabled_port_mask(端口位掩码)、l2fwd_rx_queue_per_lcore(每个逻辑内核的队列数)和timer_period(打印统计数据的时间间隔)。
1
2
3
4
5
6
7
8
9
l2fwd\_pktmbuf\_pool =
rte\_mempool\_create(
"mbuf\_pool", NB\_MBUF,
MBUF\_SIZE,
32,
sizeof(struct rte\_pktmbuf\_pool\_private),
rte\_pktmbuf\_pool\_init,
NULL,
rte\_pktmbuf\_init,
NULL,
rte\_socket\_id(),
0);
if (l2fwd\_pktmbuf\_pool ==
NULL)
rte\_exit(EXIT\_FAILURE,
"Cannot init mbuf pool\\n");
1
2
3
4
5
6
7
//rte\_eth\_dev\_count()函数返回端口总数
nb\_ports = rte\_eth\_dev\_count();
if (nb\_ports ==
0)
rte\_exit(EXIT\_FAILURE,
"No Ethernet ports - bye\\n");
if (nb\_ports > RTE\_MAX\_ETHPORTS)
nb\_ports = RTE\_MAX\_ETHPORTS;
1
2
3
4
5
for (portid =
0; portid < nb\_ports; portid++) {
//跳过未分配或者不可用端口
if ((l2fwd\_enabled\_port\_mask & (
1 << portid)) ==
0)
continue;
}
可用端口用位掩码表示:从右数(最低位记为第0位)第n位如果为1,表示端口n可用;如果第n位为0,表示端口n不可用。
要得到第x位为1还是0,我们的方法是将1左移x位,得到一个只在第x位为1、其他位都为0的数,再与位掩码相与。结果非0,那么第x位为1;结果为0,那么第x位为0。
这里设置数据包进入端口后,转发给相邻的端口。
每两个端口为一对,相互转发。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
for (portid =
0; portid < nb\_ports; portid++) {
if ((l2fwd\_enabled\_port\_mask & (
1 << portid)) ==
0)
continue;
if (nb\_ports\_in\_mask %
2) {
l2fwd\_dst\_ports\[portid\] = last\_port;
l2fwd\_dst\_ports\[last\_port\] = portid;
}
else
last\_port = portid;
nb\_ports\_in\_mask++;
rte\_eth\_dev\_info\_get(portid, &dev\_info);
}
if (nb\_ports\_in\_mask %
2) {
printf(
"Notice: odd number of ports in portmask.\\n");
l2fwd\_dst\_ports\[last\_port\] = last\_port;
}
为每个端口分配到相应的逻辑内核
每个端口只对应一个逻辑内核
每个逻辑内核最多对应l2fwd_rx_queue_per_lcore个端口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
for (portid =
0; portid < nb\_ports; portid++) {
if ((l2fwd\_enabled\_port\_mask & (
1 << portid)) ==
0)
continue;
//得到一个收取队列未分配满且可用的逻辑内核
while (rte\_lcore\_is\_enabled(rx\_lcore\_id) ==
0 ||
lcore\_queue\_conf\[rx\_lcore\_id\].n\_rx\_port ==
l2fwd\_rx\_queue\_per\_lcore) {
rx\_lcore\_id++;
if (rx\_lcore\_id >= RTE\_MAX\_LCORE)
rte\_exit(EXIT\_FAILURE,
"Not enough cores\\n");
}
if (qconf != &lcore\_queue\_conf\[rx\_lcore\_id\])
/\* Assigned a new logical core in the loop above. \*/
qconf = &lcore\_queue\_conf\[rx\_lcore\_id\];
qconf->rx\_port\_list\[qconf->n\_rx\_port\] = portid;
qconf->n\_rx\_port++;
printf(
"Lcore %u: RX port %u\\n", rx\_lcore\_id, (
unsigned) portid);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
for (portid =
0; portid < nb\_ports; portid++) {
if ((l2fwd\_enabled\_port\_mask & (
1 << portid)) ==
0) {
printf(
"Skipping disabled port %u\\n", (
unsigned) portid);
nb\_ports\_available--;
continue;
}
printf(
"Initializing port %u... ", (
unsigned) portid);
fflush(
stdout);
//初始化端口,第二个参数和第三个参数表示分配收取队列和发送队列的数量
ret = rte\_eth\_dev\_configure(portid,
1,
1, &port\_conf);
if (ret <
0)
rte\_exit(EXIT\_FAILURE,
"Cannot configure device: err=%d, port=%u\\n",
ret, (
unsigned) portid);
//得到端口对应的mac地址,存入l2fwd\_ports\_eth\_addr\[\]数组
rte\_eth\_macaddr\_get(portid,&l2fwd\_ports\_eth\_addr\[portid\]);
fflush(
stdout);
//初始化一个收取队列,nb\_rxd指收取队列的大小,最大能够存储mbuf的数量
ret = rte\_eth\_rx\_queue\_setup(portid,
0, nb\_rxd,
rte\_eth\_dev\_socket\_id(portid),
NULL,
l2fwd\_pktmbuf\_pool);
if (ret <
0)
rte\_exit(EXIT\_FAILURE,
"rte\_eth\_rx\_queue\_setup:err=%d, port=%u\\n",
ret, (
unsigned) portid);
fflush(
stdout);
//初始化一个发送队列,nb\_txd指发送队列的大小,最大能够存储mbuf的数量
ret = rte\_eth\_tx\_queue\_setup(portid,
0, nb\_txd,
rte\_eth\_dev\_socket\_id(portid),
NULL);
if (ret <
0)
rte\_exit(EXIT\_FAILURE,
"rte\_eth\_tx\_queue\_setup:err=%d, port=%u\\n",
ret, (
unsigned) portid);
//开始运行该端口
ret = rte\_eth\_dev\_start(portid);
if (ret <
0)
rte\_exit(EXIT\_FAILURE,
"rte\_eth\_dev\_start:err=%d, port=%u\\n",
ret, (
unsigned) portid);
printf(
"done: \\n");
rte\_eth\_promiscuous\_enable(portid);
printf(
"Port %u, MAC address: %02X:%02X:%02X:%02X:%02X:%02X\\n\\n",
(
unsigned) portid,
l2fwd\_ports\_eth\_addr\[portid\].addr\_bytes\[
0\],
l2fwd\_ports\_eth\_addr\[portid\].addr\_bytes\[
1\],
l2fwd\_ports\_eth\_addr\[portid\].addr\_bytes\[
2\],
l2fwd\_ports\_eth\_addr\[portid\].addr\_bytes\[
3\],
l2fwd\_ports\_eth\_addr\[portid\].addr\_bytes\[
4\],
l2fwd\_ports\_eth\_addr\[portid\].addr\_bytes\[
5\]);
//初始化端口的统计数据
memset(&port\_statistics,
0,
sizeof(port\_statistics));
}
1
check\_all\_ports\_link\_status(nb\_ports, l2fwd\_enabled\_port\_mask);
1
2
3
4
5
rte\_eal\_mp\_remote\_launch(l2fwd\_launch\_one\_lcore,
NULL, CALL\_MASTER);
RTE\_LCORE\_FOREACH\_SLAVE(lcore\_id) {
if (rte\_eal\_wait\_lcore(lcore\_id) <
0)
return
\-1;
}
收包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
for (i =
0; i < qconf->n\_rx\_port; i++) {
portid = qconf->rx\_port\_list\[i\];
//收包,一次最多收取MAX\_PKT\_BURST个数据包
nb\_rx = rte\_eth\_rx\_burst((
uint8\_t) portid,
0,
pkts\_burst, MAX\_PKT\_BURST);
//更新统计数据
port\_statistics\[portid\].rx += nb\_rx;
for (j =
0; j < nb\_rx; j++) {
m = pkts\_burst\[j\];
rte\_prefetch0(rte\_pktmbuf\_mtod(m,
void \*));
//转发
l2fwd\_simple\_forward(m, portid);
}
}
转发
替换源MAC地址和目的MAC地址
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static void
l2fwd\_simple\_forward
(struct rte\_mbuf \*m, unsigned portid)
{
struct ether\_hdr \*eth;
void \*tmp;
unsigned dst\_port;
dst\_port = l2fwd\_dst\_ports\[portid\];
eth = rte\_pktmbuf\_mtod(m, struct ether\_hdr \*);
//目的地址
/\* 02:00:00:00:00:xx \*/
tmp = &eth->d\_addr.addr\_bytes\[
0\];
\*((
uint64\_t \*)tmp) =
0x000000000002 + ((
uint64\_t)dst\_port <<
40);
//源地址
ether\_addr\_copy(&l2fwd\_ports\_eth\_addr\[dst\_port\], &eth->s\_addr);
l2fwd\_send\_packet(m, (
uint8\_t) dst\_port);
}
将数据包推送至发送队列,如果发送队列存够MAX_PKT_BURST,即每次最大收取包的数量,就会发包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static int
l2fwd\_send\_packet
(struct rte\_mbuf \*m, uint8\_t port)
{
unsigned lcore\_id, len;
struct lcore\_queue\_conf \*qconf;
lcore\_id = rte\_lcore\_id();
qconf = &lcore\_queue\_conf\[lcore\_id\];
len = qconf->tx\_mbufs\[port\].len;
qconf->tx\_mbufs\[port\].m\_table\[len\] = m;
len++;
//当发包队列存够MAX\_PKT\_BURST,发包
if (unlikely(len == MAX\_PKT\_BURST)) {
l2fwd\_send\_burst(qconf, MAX\_PKT\_BURST, port);
len =
0;
}
qconf->tx\_mbufs\[port\].len = len;
return
0;
}
每隔一定时间也会发包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//当前时间与上次清空(drain)发送队列的时间之差
diff\_tsc = cur\_tsc - prev\_tsc;
//如果时间差大于我们设定的阈值,这里是100us
if (unlikely(diff\_tsc > drain\_tsc)) {
for (portid =
0; portid < RTE\_MAX\_ETHPORTS; portid++) {
if (qconf->tx\_mbufs\[portid\].len ==
0)
continue;
//发包
l2fwd\_send\_burst(&lcore\_queue\_conf\[lcore\_id\],
qconf->tx\_mbufs\[portid\].len,
(
uint8\_t) portid);
qconf->tx\_mbufs\[portid\].len =
0;
}
if (timer\_period >
0) {
timer\_tsc += diff\_tsc;
//如果累积时间超过我们设定的阈值,就打印出统计数据,默认是10s
if (unlikely(timer\_tsc >= (
uint64\_t) timer\_period)) {
//打印统计数据只发生在主逻辑内核上
if (lcore\_id == rte\_get\_master\_lcore()) {
//打印统计数据
print\_stats();
//累积时间置零
timer\_tsc =
0;
}
}
}
prev\_tsc = cur\_tsc;
}
这两种情况都会产生发包:无论是发送队列存够阈值MAX_PKT_BURST,还是时间差超过阈值drain_tsc,都会把发送队列上MAX_PKT_BURST个数据包推送出去;如果不足MAX_PKT_BURST,则把发送队列上全部数据包推送出去。
发包函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
static int
l2fwd\_send\_burst
(struct lcore\_queue\_conf \*qconf, unsigned n, uint8\_t port)
{
struct rte\_mbuf \*\*m\_table;
unsigned ret;
unsigned queueid =
0;
m\_table = (struct rte\_mbuf \*\*)qconf->tx\_mbufs\[port\].m\_table;
//发包
ret = rte\_eth\_tx\_burst(port, (
uint16\_t) queueid, m\_table, (
uint16\_t) n);
//更新统计数据
port\_statistics\[port\].tx += ret;
//丢包
if (unlikely(ret < n)) {
//更新统计数据
port\_statistics\[port\].dropped += (n - ret);
do {
//把丢包部分free掉
rte\_pktmbuf\_free(m\_table\[ret\]);
}
while (++ret < n);
}
return
0;
}
在函数rte_eth_tx_burst()中:
手机扫一扫
移动阅读更方便
你可能感兴趣的文章