DPDK-l2fwd详解
阅读原文时间:2021年04月21日阅读:1

概要

版本:DPDK-1.8.0

原作者:张云尧       出处:http://aidaiz.com/dpdk_l2fwd/

本例中实现了相邻端口之间的相互转发。
比如一共4个端口可用,那么端口1收到数据后会转发给端口2,端口2收到数据后会转发给端口1,端口3和端口4也会相互转发。

编译

设置环境变量

  1




  2




  export RTE\_SDK=/(RTE\_SDK) 
  #DPDK的路径




  export RTE\_TARGET=x86\_64-native-linuxapp-gcc 
  #DPDK的编译目标

进入示例目录

  1




  cd /(RTE\_SDK)/example/l2wfd

编译

  1




  make

运行

  1




  ./build/l2wfd \[EAL options\] -- -p PORTMASK \[-q NQ -T t\]
  • EAL options

    • DPDK EAL的默认参数,必须参数为-c COREMASK -n NUM。
    • COREMASK:一个十六进制位掩码表示分配的逻辑内核数量。
    • NUM:一个十进制整数表示内存通道数量。
  • -p PORTMASK
    PORTMASK:一个十六进制位掩码表示分配的端口数量。

  • -q NQ
    NQ:表示分配给每个逻辑内核的收发队列数量。

  • -T t
    t: 表示打印统计数据到屏幕上的时间间隔,默认为10秒。

    1

    ./build/l2fwd -c f -n 4 -- -q 4 -p ffff

表示,分配给4个逻辑内核,每个内核分别有4个收发队列,而一共分配了16个端口。

详解

初始化EAL(Environment Abstraciton Layer)

  1




  2




  3




  ret = rte\_eal\_init(argc, argv);




  if (ret < 
  0)




      rte\_exit(EXIT\_FAILURE, 
  "Invalid EAL arguments\\n");

参数传递

  1




  2




  3




  ret = l2fwd\_parse\_args(argc, argv);




  if (ret < 
  0)




      rte\_exit(EXIT\_FAILURE, 
  "Invalid L2FWD arguments\\n");

EAL参数传递已经在rte_eal_init()函数中完成了,这里主要传递“–”后面的参数。
传递参数之后,得到三个变量。

  • l2fwd_enabled_port_mask:可用端口位掩码
  • l2fwd_rx_queue_per_lcore:每个逻辑内核的收取队列数量
  • timer_period:打印统计数据的时间间隔

创建内存池

  1




  2




  3




  4




  5




  6




  7




  8




  9




  l2fwd\_pktmbuf\_pool =




      rte\_mempool\_create(
  "mbuf\_pool", NB\_MBUF,




                 MBUF\_SIZE, 
  32,





  sizeof(struct rte\_pktmbuf\_pool\_private),




                 rte\_pktmbuf\_pool\_init, 
  NULL,




                 rte\_pktmbuf\_init, 
  NULL,




                 rte\_socket\_id(), 
  0);




  if (l2fwd\_pktmbuf\_pool == 
  NULL)




      rte\_exit(EXIT\_FAILURE, 
  "Cannot init mbuf pool\\n");
  • “mbuf_pool”:内存池的名称
  • NB_MBUF:内存池中存储mbuf的数量
  • MBUF_SIZE: mbuf的大小
  • 32:内存池缓存的大小

端口处理

  1




  2




  3




  4




  5




  6




  7




  //rte\_eth\_dev\_count()函数返回端口总数




  nb\_ports = rte\_eth\_dev\_count();




  if (nb\_ports == 
  0)




      rte\_exit(EXIT\_FAILURE, 
  "No Ethernet ports - bye\\n");






  if (nb\_ports > RTE\_MAX\_ETHPORTS)




      nb\_ports = RTE\_MAX\_ETHPORTS;




  1




  2




  3




  4




  5




  for (portid = 
  0; portid < nb\_ports; portid++) {





  //跳过未分配或者不可用端口





  if ((l2fwd\_enabled\_port\_mask & (
  1 << portid)) == 
  0)





  continue;




  }

可用端口位掩码表示,左数第n位如果为1,表示端口n可用,如果左数第n位如果为0,表示端口n不可用。
要得到第x位为1还是0,我们的方法是将1左移x位,得到一个只在x位为1,其他位都为0的数,再与位掩码相与。结果为1,那么第x位为1,结果位0,那么第x位为0.

设置每个端口的目的端口

这里设置数据包进入端口后,转发给相邻的端口。
每两个端口为一对,相互转发。

  1




  2




  3




  4




  5




  6




  7




  8




  9




  10




  11




  12




  13




  14




  15




  16




  17




  18




  19




  for (portid = 
  0; portid < nb\_ports; portid++) {





  if ((l2fwd\_enabled\_port\_mask & (
  1 << portid)) == 
  0)





  continue;







  if (nb\_ports\_in\_mask % 
  2) {




          l2fwd\_dst\_ports\[portid\] = last\_port;




          l2fwd\_dst\_ports\[last\_port\] = portid;




      }





  else




          last\_port = portid;






      nb\_ports\_in\_mask++;






      rte\_eth\_dev\_info\_get(portid, &dev\_info);




  }




  if (nb\_ports\_in\_mask % 
  2) {





  printf(
  "Notice: odd number of ports in portmask.\\n");




      l2fwd\_dst\_ports\[last\_port\] = last\_port;




  }

初始化端口的配置信息

为每个端口分配到相应的逻辑内核
每个端口只对应一个逻辑内核
每个逻辑内核对应l2fwd_rx_queue_per_lcore个端口

  1




  2




  3




  4




  5




  6




  7




  8




  9




  10




  11




  12




  13




  14




  15




  16




  17




  18




  19




  20




  21




  for (portid = 
  0; portid < nb\_ports; portid++) {





  if ((l2fwd\_enabled\_port\_mask & (
  1 << portid)) == 
  0)





  continue;







  //得到一个收取队列未分配满且可用的逻辑内核





  while (rte\_lcore\_is\_enabled(rx\_lcore\_id) == 
  0 ||




             lcore\_queue\_conf\[rx\_lcore\_id\].n\_rx\_port ==




             l2fwd\_rx\_queue\_per\_lcore) {




          rx\_lcore\_id++;





  if (rx\_lcore\_id >= RTE\_MAX\_LCORE)




              rte\_exit(EXIT\_FAILURE, 
  "Not enough cores\\n");




      }







  if (qconf != &lcore\_queue\_conf\[rx\_lcore\_id\])





  /\* Assigned a new logical core in the loop above. \*/




          qconf = &lcore\_queue\_conf\[rx\_lcore\_id\];






      qconf->rx\_port\_list\[qconf->n\_rx\_port\] = portid;




      qconf->n\_rx\_port++;





  printf(
  "Lcore %u: RX port %u\\n", rx\_lcore\_id, (
  unsigned) portid);




  }

初始化每个端口

  1




  2




  3




  4




  5




  6




  7




  8




  9




  10




  11




  12




  13




  14




  15




  16




  17




  18




  19




  20




  21




  22




  23




  24




  25




  26




  27




  28




  29




  30




  31




  32




  33




  34




  35




  36




  37




  38




  39




  40




  41




  42




  43




  44




  45




  46




  47




  48




  49




  50




  51




  52




  53




  54




  55




  56




  57




  58




  59




  for (portid = 
  0; portid < nb\_ports; portid++) {





  if ((l2fwd\_enabled\_port\_mask & (
  1 << portid)) == 
  0) {





  printf(
  "Skipping disabled port %u\\n", (
  unsigned) portid);




          nb\_ports\_available--;





  continue;




      }







  printf(
  "Initializing port %u... ", (
  unsigned) portid);




      fflush(
  stdout);





  //初始化端口,第二个参数和第三个参数表示分配收取队列和发送队列的数量




      ret = rte\_eth\_dev\_configure(portid, 
  1, 
  1, &port\_conf);





  if (ret < 
  0)




          rte\_exit(EXIT\_FAILURE, 
  "Cannot configure device: err=%d, port=%u\\n",




                ret, (
  unsigned) portid);







  //得到端口对应的mac地址,存入l2fwd\_ports\_eth\_addr\[\]数组




      rte\_eth\_macaddr\_get(portid,&l2fwd\_ports\_eth\_addr\[portid\]);






      fflush(
  stdout);





  //初始化一个收取队列,nb\_rxd指收取队列的大小,最大能够存储mbuf的数量




      ret = rte\_eth\_rx\_queue\_setup(portid, 
  0, nb\_rxd,




                       rte\_eth\_dev\_socket\_id(portid),





  NULL,




                       l2fwd\_pktmbuf\_pool);





  if (ret < 
  0)




          rte\_exit(EXIT\_FAILURE, 
  "rte\_eth\_rx\_queue\_setup:err=%d, port=%u\\n",




                ret, (
  unsigned) portid);






      fflush(
  stdout);





  //初始化一个发送队列,nb\_txd指发送队列的大小,最大能够存储mbuf的数量




      ret = rte\_eth\_tx\_queue\_setup(portid, 
  0, nb\_txd,




              rte\_eth\_dev\_socket\_id(portid),





  NULL);





  if (ret < 
  0)




          rte\_exit(EXIT\_FAILURE, 
  "rte\_eth\_tx\_queue\_setup:err=%d, port=%u\\n",




              ret, (
  unsigned) portid);







  //开始运行该端口




      ret = rte\_eth\_dev\_start(portid);





  if (ret < 
  0)




          rte\_exit(EXIT\_FAILURE, 
  "rte\_eth\_dev\_start:err=%d, port=%u\\n",




                ret, (
  unsigned) portid);







  printf(
  "done: \\n");






      rte\_eth\_promiscuous\_enable(portid);







  printf(
  "Port %u, MAC address: %02X:%02X:%02X:%02X:%02X:%02X\\n\\n",




              (
  unsigned) portid,




              l2fwd\_ports\_eth\_addr\[portid\].addr\_bytes\[
  0\],




              l2fwd\_ports\_eth\_addr\[portid\].addr\_bytes\[
  1\],




              l2fwd\_ports\_eth\_addr\[portid\].addr\_bytes\[
  2\],




              l2fwd\_ports\_eth\_addr\[portid\].addr\_bytes\[
  3\],




              l2fwd\_ports\_eth\_addr\[portid\].addr\_bytes\[
  4\],




              l2fwd\_ports\_eth\_addr\[portid\].addr\_bytes\[
  5\]);







  //初始化端口的统计数据





  memset(&port\_statistics, 
  0, 
  sizeof(port\_statistics));




  }

检查每个端口的连接状态

  1




  check\_all\_ports\_link\_status(nb\_ports, l2fwd\_enabled\_port\_mask);

在每个逻辑内核上启动线程,开始转发

  1




  2




  3




  4




  5




  rte\_eal\_mp\_remote\_launch(l2fwd\_launch\_one\_lcore, 
  NULL, CALL\_MASTER);




  RTE\_LCORE\_FOREACH\_SLAVE(lcore\_id) {





  if (rte\_eal\_wait\_lcore(lcore\_id) < 
  0)





  return 
  \-1;




  }

收包

  1




  2




  3




  4




  5




  6




  7




  8




  9




  10




  11




  12




  13




  14




  15




  16




  17




  for (i = 
  0; i < qconf->n\_rx\_port; i++) {






      portid = qconf->rx\_port\_list\[i\];





  //收包,一次最多收取MAX\_PKT\_BURST个数据包




      nb\_rx = rte\_eth\_rx\_burst((
  uint8\_t) portid, 
  0,




                   pkts\_burst, MAX\_PKT\_BURST);







  //更新统计数据




      port\_statistics\[portid\].rx += nb\_rx;







  for (j = 
  0; j < nb\_rx; j++) {




          m = pkts\_burst\[j\];




          rte\_prefetch0(rte\_pktmbuf\_mtod(m, 
  void \*));





  //转发




          l2fwd\_simple\_forward(m, portid);




      }




  }

转发
替换源MAC地址和目的MAC地址

  1




  2




  3




  4




  5




  6




  7




  8




  9




  10




  11




  12




  13




  14




  15




  16




  17




  18




  19




  20




  static void




  l2fwd\_simple\_forward
  (struct rte\_mbuf \*m, unsigned portid)




  {





  struct ether\_hdr \*eth;





  void \*tmp;





  unsigned dst\_port;






      dst\_port = l2fwd\_dst\_ports\[portid\];




      eth = rte\_pktmbuf\_mtod(m, struct ether\_hdr \*);







  //目的地址





  /\* 02:00:00:00:00:xx \*/




      tmp = &eth->d\_addr.addr\_bytes\[
  0\];




      \*((
  uint64\_t \*)tmp) = 
  0x000000000002 + ((
  uint64\_t)dst\_port << 
  40);







  //源地址




      ether\_addr\_copy(&l2fwd\_ports\_eth\_addr\[dst\_port\], &eth->s\_addr);






      l2fwd\_send\_packet(m, (
  uint8\_t) dst\_port);




  }

将数据包推送至发送队列,如果发送队列存够MAX_PKT_BURST,即每次最大收取包的数量,就会发包

  1




  2




  3




  4




  5




  6




  7




  8




  9




  10




  11




  12




  13




  14




  15




  16




  17




  18




  19




  20




  21




  22




  static int




  l2fwd\_send\_packet
  (struct rte\_mbuf \*m, uint8\_t port)




  {





  unsigned lcore\_id, len;





  struct lcore\_queue\_conf \*qconf;






      lcore\_id = rte\_lcore\_id();






      qconf = &lcore\_queue\_conf\[lcore\_id\];




      len = qconf->tx\_mbufs\[port\].len;




      qconf->tx\_mbufs\[port\].m\_table\[len\] = m;




      len++;







  //当发包队列存够MAX\_PKT\_BURST,发包





  if (unlikely(len == MAX\_PKT\_BURST)) {




          l2fwd\_send\_burst(qconf, MAX\_PKT\_BURST, port);




          len = 
  0;




      }






      qconf->tx\_mbufs\[port\].len = len;





  return 
  0;




  }

每隔一定时间也会发包

  1




  2




  3




  4




  5




  6




  7




  8




  9




  10




  11




  12




  13




  14




  15




  16




  17




  18




  19




  20




  21




  22




  23




  24




  25




  26




  27




  28




  29




  30




  31




  32




  33




  34




  //上次收包时间和这次收包时间差




  diff\_tsc = cur\_tsc - prev\_tsc;




  //如果时间差大于我们设定的阈值,这里是100us




  if (unlikely(diff\_tsc > drain\_tsc)) {







  for (portid = 
  0; portid < RTE\_MAX\_ETHPORTS; portid++) {            





  if (qconf->tx\_mbufs\[portid\].len == 
  0)





  continue;





  //发包




          l2fwd\_send\_burst(&lcore\_queue\_conf\[lcore\_id\],




                   qconf->tx\_mbufs\[portid\].len,




                   (
  uint8\_t) portid);




                  qconf->tx\_mbufs\[portid\].len = 
  0;




      }







  if (timer\_period > 
  0) {






          timer\_tsc += diff\_tsc;







  //如果累积时间超过我们设定的阈值,就打印出统计数据,默认是10s





  if (unlikely(timer\_tsc >= (
  uint64\_t) timer\_period)) {







  //打印数据在发生在主逻辑内核上





  if (lcore\_id == rte\_get\_master\_lcore()) {





  //打印统计数据




                  print\_stats();





  //累积时间置零




                  timer\_tsc = 
  0;




              }




          }




      }






      prev\_tsc = cur\_tsc;




  }

这两种情况都会产生发包,无论是发送队列存够阈值MAX_PKT_BURST,或者,时间差超过阈值brain_tsc,都会把发送队列上MAX_PKT_BURST个数据包推送出去,如果不足MAX_PKT_BURST,则把发送队列上全部数据包推送出去。

发包函数

  1




  2




  3




  4




  5




  6




  7




  8




  9




  10




  11




  12




  13




  14




  15




  16




  17




  18




  19




  20




  21




  22




  23




  24




  static int




  l2fwd\_send\_burst
  (struct lcore\_queue\_conf \*qconf, unsigned n, uint8\_t port)




  {





  struct rte\_mbuf \*\*m\_table;





  unsigned ret;





  unsigned queueid =
  0;






      m\_table = (struct rte\_mbuf \*\*)qconf->tx\_mbufs\[port\].m\_table;





  //发包




      ret = rte\_eth\_tx\_burst(port, (
  uint16\_t) queueid, m\_table, (
  uint16\_t) n);





  //更新统计数据




      port\_statistics\[port\].tx += ret;





  //丢包





  if (unlikely(ret < n)) {





  //更新统计数据




          port\_statistics\[port\].dropped += (n - ret);





  do {





  //把丢包部分free掉




              rte\_pktmbuf\_free(m\_table\[ret\]);




          } 
  while (++ret < n);




      }







  return 
  0;




  }

在函数rte_eth_tx_burst()中:

  • port:端口号。
  • queueid:端口中的发送队列号。本例中每个端口都只有一个发送队列,所以固定为0。
  • m_table:**rte_mbuf数据
  • n:发送包的数量

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章