dpdk网卡收包分析
阅读原文时间:2023年07月11日阅读:1

一个网络报文从网卡接收到被应用处理,中间主要需要经历两个阶段:

阶段一:网卡通过其DMA硬件将收到的报文写入到收包队列中(入队)
阶段二:应用从收包队列中读取报文(出队)
由于目前正在做使用 vpp/dpdk 优化 WAF 引擎的工作,所以就看看 ixgbe 网卡在 dpdk 框架下是怎么工作的。
下面分别介绍一下 收包队列结构 初始化(使能) 收包流程

收发包的配置最主要的工作就是配置网卡的收发队列,设置DMA拷贝数据包的地址等。使用数据包时,只要去对应队列取出指定地址的数据即可;主体配置函数见 rte_eth_dev_configure;当收发队列配置完成后,就调用设备的配置函数 (*dev->dev_ops->dev_configure)(dev) 进行最后的配置。对于 ixgbe 设备,进入 ixgbe_dev_configure() 来分析其过程:主要是调用 ixgbe_check_mq_mode() 来检查队列的模式,然后设置允许接收批量和向量的模式。

2.数据包的获取和发送,主要是从队列中获取到数据包或者把数据包放到队列中。
收包队列的构造主要是通过网卡队列设置函数 rte_eth_rx_queue_setup设置相关参数;最后,调用到队列的setup函数做最后的初始化。ret = (*dev->dev_ops->rx_queue_setup)(dev, rx_queue_id, nb_rx_desc,
socket_id, rx_conf, mp);
对于ixgbe设备,rx_queue_setup就是函数ixgbe_dev_rx_queue_setup()

说一说主要的结构体:

/* Receive Descriptor - Advanced
 * pkt_addr: physical address of the packet data buffer; the NIC DMA
 * writes the received packet into the memory backing this address.
 * hdr_addr: packet header buffer address; the last bit of hdr_addr is
 * the DD (Descriptor Done) bit, and because this is a union the last
 * bit of status_error maps onto the same DD bit.
 * For each newly arrived packet the NIC checks whether the DD bit of
 * the current rx_ring buffer is 0. If it is 0 the buffer is available,
 * so the DMA copies the packet into it and then sets DD to 1. If it is
 * already 1, the NIC considers the rx_ring full, drops the packet
 * outright and records one imiss. (0->1) */
union ixgbe_adv_rx_desc {
struct {
__le64 pkt_addr; /* Packet buffer address */
__le64 hdr_addr; /* Header buffer address */
} read;
struct {
struct {
union {
__le32 data;
struct {
__le16 pkt_info; /* RSS, Pkt type */
__le16 hdr_info; /* Splithdr, hdrlen */
} hs_rss;
} lo_dword;
union {
__le32 rss; /* RSS Hash */
struct {
__le16 ip_id; /* IP id */
__le16 csum; /* Packet Checksum */
} csum_ip;
} hi_dword;
} lower;
struct {
__le32 status_error; /* ext status/error */
__le16 length; /* Packet length */
__le16 vlan; /* VLAN tag */
} upper;
} wb; /* writeback */
};
/**
 * Structure associated with each descriptor of the RX ring of a RX queue.
 * sw_ring is a ring built from a dynamically allocated array whose
 * elements are of type ixgbe_rx_entry; the ring size is configurable,
 * typically up to 4096 entries.
 * mbuf: pointer to the packet mbuf; an mbuf manages one packet and
 * carries both the packet metadata and the packet data.
 */
struct ixgbe_rx_entry {
struct rte_mbuf *mbuf; /**< mbuf associated with RX descriptor. */
};
/**
 * Structure associated with each RX queue.
 */
struct ixgbe_rx_queue {
	struct rte_mempool *mb_pool; /**< mbuf pool to populate RX ring. */
	/* rx_ring holds the physical (DMA) addresses of the packet buffers.
	 * The NIC hardware uses these physical addresses to copy received
	 * packets into place. */
	volatile union ixgbe_adv_rx_desc *rx_ring; /**< RX ring virtual address. */
	uint64_t rx_ring_phys_addr; /**< RX ring DMA address. */
	volatile uint32_t *rdt_reg_addr; /**< RDT register address. */
	volatile uint32_t *rdh_reg_addr; /**< RDH register address. */
	/* sw_ring holds the virtual addresses of the packet buffers; software
	 * uses the virtual address to read packet data. A buffer's physical
	 * address can be derived from its virtual address. */
	struct ixgbe_rx_entry *sw_ring; /**< address of RX software ring. */
	struct ixgbe_scattered_rx_entry *sw_sc_ring; /**< address of scattered Rx software ring. */
	struct rte_mbuf *pkt_first_seg; /**< First segment of current packet. */
	struct rte_mbuf *pkt_last_seg; /**< Last segment of current packet. */
	uint64_t mbuf_initializer; /**< value to init mbufs */
	uint16_t nb_rx_desc; /**< number of RX descriptors. */
	uint16_t rx_tail; /**< current value of RDT register. */
	uint16_t nb_rx_hold; /**< number of held free RX desc. */
	uint16_t rx_nb_avail; /**< nr of staged pkts ready to ret to app */
	uint16_t rx_next_avail; /**< idx of next staged pkt to ret to app */
	uint16_t rx_free_trigger; /**< triggers rx buffer allocation */
	uint8_t rx_using_sse;
	/**< indicates that vector RX is in use */
#ifdef RTE_LIBRTE_SECURITY
	uint8_t using_ipsec;
	/**< indicates that IPsec RX feature is in use */
#endif
#ifdef RTE_IXGBE_INC_VECTOR
	uint16_t rxrearm_nb;     /**< number of remaining to be re-armed */
	uint16_t rxrearm_start;  /**< the idx we start the re-arming from */
#endif
	uint16_t rx_free_thresh; /**< max free RX desc to hold. */
	uint16_t queue_id; /**< RX queue index. */
	uint16_t reg_idx;  /**< RX queue register index. */
	uint16_t pkt_type_mask;  /**< Packet type mask for different NICs. */
	uint16_t port_id;  /**< Device port identifier. */
	uint8_t crc_len;  /**< 0 if CRC stripped, 4 otherwise. */
	uint8_t drop_en;  /**< If not 0, set SRRCTL.Drop_En. */
	uint8_t rx_deferred_start; /**< not in global dev start. */
	/** flags to set in mbuf when a vlan is detected. */
	uint64_t vlan_flags;
	uint64_t offloads; /**< Rx offloads with DEV_RX_OFFLOAD_* */
	/** need to alloc dummy mbuf, for wraparound when scanning hw ring */
	struct rte_mbuf fake_mbuf;
	/** hold packets to return to application */
	struct rte_mbuf *rx_stage[RTE_PMD_IXGBE_RX_MAX_BURST*2];
};

收包队列的启动主要是通过调用rte_eth_dev_start

DPDK是零拷贝的,那么分配的mem_pool中的对象是怎么和队列以及驱动联系起来的呢?

设备的启动是从rte_eth_dev_start()中开始,会调用

diag = (*dev->dev_ops->dev_start)(dev);

找到设备启动的真正启动函数:ixgbe_dev_start

其中队列初始化流程函数为:

ixgbe_alloc_rx_queue_mbufs(struct ixgbe_rx_queue *rxq)
{
struct ixgbe_rx_entry *rxe = rxq->sw_ring;
uint64_t dma_addr;
unsigned int i;

/\* Initialize software ring entries  
队列所属内存池的ring中循环取出了nb\_rx\_desc个mbuf指针,  
填充rxq->sw\_ring。每个指针都指向内存池里的一个数据包空间  
然后就先填充了新分配的mbuf结构,最最重要的是填充计算了dma\_addr  
初始化queue ring,即rxd的信息,标明了驱动把数据包放在dma\_addr处。  
最后把分配的mbuf“放入”queue 的sw\_ring中,  
这样,驱动收过来的包,就直接放在了sw\_ring中。  
\*/  
for (i = 0; i < rxq->nb\_rx\_desc; i++) {  
    volatile union ixgbe\_adv\_rx\_desc \*rxd;  
    struct rte\_mbuf \*mbuf = rte\_mbuf\_raw\_alloc(rxq->mb\_pool);

    if (mbuf == NULL) {  
        PMD\_INIT\_LOG(ERR, "RX mbuf alloc failed queue\_id=%u",  
                 (unsigned) rxq->queue\_id);  
        return -ENOMEM;  
    }

    mbuf->data\_off = RTE\_PKTMBUF\_HEADROOM;  
    mbuf->port = rxq->port\_id;

    dma\_addr =  
        rte\_cpu\_to\_le\_64(rte\_mbuf\_data\_iova\_default(mbuf));  
    rxd = &rxq->rx\_ring\[i\];  
    rxd->read.hdr\_addr = 0;  
    rxd->read.pkt\_addr = dma\_addr;  
    rxe\[i\].mbuf = mbuf;  
}

return 0;  

}

数据包的获取

网卡收到报文后,先存于网卡本地的buffer-Rx(Rx FIFO)中,然后由DMA通过PCI总线将报文数据写入操作系统的内存中,即数据报文完成入队操作,那么数据包的获取就是指上层应用从队列中去取出这些数据包

业务层面获取数据包是从rte_eth_rx_burst()开始:

int16_t nb_rx = (*dev->rx_pkt_burst)(dev->data->rx_queues[queue_id], rx_pkts, nb_pkts);

这里的dev->rx_pkt_burst在驱动初始化的时候已经注册过了,对于ixgbe设备,就是ixgbe_recv_pkts()函数

/*
 * Burst-receive up to nb_pkts packets from one RX queue.
 *
 * rx_queue: pointer to a struct ixgbe_rx_queue.
 * rx_pkts:  output array filled with pointers to received mbufs.
 * nb_pkts:  maximum number of packets to retrieve.
 * Returns the number of mbufs actually stored in rx_pkts.
 */
uint16_t
ixgbe_recv_pkts(void *rx_queue, struct rte_mbuf **rx_pkts,
		uint16_t nb_pkts)
{
	struct ixgbe_rx_queue *rxq;
	volatile union ixgbe_adv_rx_desc *rx_ring;
	volatile union ixgbe_adv_rx_desc *rxdp;
	struct ixgbe_rx_entry *sw_ring;
	struct ixgbe_rx_entry *rxe;
	struct rte_mbuf *rxm;
	struct rte_mbuf *nmb;
	union ixgbe_adv_rx_desc rxd;
	uint64_t dma_addr;
	uint32_t staterr;
	uint32_t pkt_info;
	uint16_t pkt_len;
	uint16_t rx_id;
	uint16_t nb_rx;
	uint16_t nb_hold;
	uint64_t pkt_flags;
	uint64_t vlan_flags;

	nb_rx = 0;
	nb_hold = 0;
	rxq = rx_queue;
	rx_id = rxq->rx_tail; /* start consuming at the queue's tail */
	rx_ring = rxq->rx_ring;
	sw_ring = rxq->sw_ring;
	vlan_flags = rxq->vlan_flags;
	while (nb_rx < nb_pkts) { /* loop until nb_pkts packets fetched */
		/*
		 * The order of operations here is important as the DD status
		 * bit must not be read after any other descriptor fields.
		 * rx_ring and rxdp are pointing to volatile data so the order
		 * of accesses cannot be reordered by the compiler. If they were
		 * not volatile, they could be reordered which could lead to
		 * using invalid descriptor fields when read from rxd.
		 */
		rxdp = &rx_ring[rx_id];
		staterr = rxdp->wb.upper.status_error;
		/* DD bit set means a packet is present in this slot;
		 * otherwise there is nothing more to receive, so stop. */
		if (!(staterr & rte_cpu_to_le_32(IXGBE_RXDADV_STAT_DD)))
			break;
		rxd = *rxdp;

		/*
		 * End of packet.
		 *
		 * If the IXGBE_RXDADV_STAT_EOP flag is not set, the RX packet
		 * is likely to be invalid and to be dropped by the various
		 * validation checks performed by the network stack.
		 *
		 * Allocate a new mbuf to replenish the RX ring descriptor.
		 * If the allocation fails:
		 *    - arrange for that RX descriptor to be the first one
		 *      being parsed the next time the receive function is
		 *      invoked [on the same queue].
		 *
		 *    - Stop parsing the RX ring and return immediately.
		 *
		 * This policy do not drop the packet received in the RX
		 * descriptor for which the allocation of a new mbuf failed.
		 * Thus, it allows that packet to be later retrieved if
		 * mbuf have been freed in the mean time.
		 * As a side effect, holding RX descriptors instead of
		 * systematically giving them back to the NIC may lead to
		 * RX ring exhaustion situations.
		 * However, the NIC can gracefully prevent such situations
		 * to happen by sending specific "back-pressure" flow control
		 * frames to its peer(s).
		 */
		PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_id=%u "
			   "ext_err_stat=0x%08x pkt_len=%u",
			   (unsigned) rxq->port_id, (unsigned) rxq->queue_id,
			   (unsigned) rx_id, (unsigned) staterr,
			   (unsigned) rte_le_to_cpu_16(rxd.wb.upper.length));
		/* Allocate a fresh mbuf (nmb) to swap into the ring. */
		nmb = rte_mbuf_raw_alloc(rxq->mb_pool);
		if (nmb == NULL) {
			PMD_RX_LOG(DEBUG, "RX mbuf alloc failed port_id=%u "
				   "queue_id=%u", (unsigned) rxq->port_id,
				   (unsigned) rxq->queue_id);
			rte_eth_devices[rxq->port_id].data->rx_mbuf_alloc_failed++;
			break;
		}

		nb_hold++;

		rxe = &sw_ring[rx_id];
		rx_id++;
		if (rx_id == rxq->nb_rx_desc)
			rx_id = 0;

		/* Prefetch next mbuf while processing current one. */
		rte_ixgbe_prefetch(sw_ring[rx_id].mbuf);

		/*
		 * When next RX descriptor is on a cache-line boundary,
		 * prefetch the next 4 RX descriptors and the next 8 pointers
		 * to mbufs.
		 */
		if ((rx_id & 0x3) == 0) {
			rte_ixgbe_prefetch(&rx_ring[rx_id]);
			rte_ixgbe_prefetch(&sw_ring[rx_id]);
		}
		/* Take the received mbuf out of sw_ring (into rxm). */
		rxm = rxe->mbuf;
		/* Put the fresh mbuf (nmb) into sw_ring in its place. */
		rxe->mbuf = nmb;
		/* Write the new mbuf's data physical address into the
		 * descriptor and zero hdr_addr (clearing the DD bit). */
		dma_addr =
			rte_cpu_to_le_64(rte_mbuf_data_iova_default(nmb));
		rxdp->read.hdr_addr = 0;
		rxdp->read.pkt_addr = dma_addr;

		/*
		 * Initialize the returned mbuf.
		 * 1) setup generic mbuf fields:
		 *    - number of segments,
		 *    - next segment,
		 *    - packet length,
		 *    - RX port identifier.
		 * 2) integrate hardware offload data, if any:
		 *    - RSS flag & hash,
		 *    - IP checksum flag,
		 *    - VLAN TCI, if any,
		 *    - error flags.
		 */
		pkt_len = (uint16_t) (rte_le_to_cpu_16(rxd.wb.upper.length) -
				      rxq->crc_len);
		/* Initialize the returned mbuf's metadata. */
		rxm->data_off = RTE_PKTMBUF_HEADROOM;
		rte_packet_prefetch((char *)rxm->buf_addr + rxm->data_off);
		rxm->nb_segs = 1;
		rxm->next = NULL;
		rxm->pkt_len = pkt_len;
		rxm->data_len = pkt_len;
		rxm->port = rxq->port_id;

		pkt_info = rte_le_to_cpu_32(rxd.wb.lower.lo_dword.data);
		/* Only valid if PKT_RX_VLAN set in pkt_flags */
		rxm->vlan_tci = rte_le_to_cpu_16(rxd.wb.upper.vlan);

		pkt_flags = rx_desc_status_to_pkt_flags(staterr, vlan_flags);
		pkt_flags = pkt_flags | rx_desc_error_to_pkt_flags(staterr);
		pkt_flags = pkt_flags |
			ixgbe_rxd_pkt_info_to_pkt_flags((uint16_t)pkt_info);
		rxm->ol_flags = pkt_flags;
		rxm->packet_type =
			ixgbe_rxd_pkt_info_to_pkt_type(pkt_info,
						       rxq->pkt_type_mask);

		if (likely(pkt_flags & PKT_RX_RSS_HASH))
			rxm->hash.rss = rte_le_to_cpu_32(
						rxd.wb.lower.hi_dword.rss);
		else if (pkt_flags & PKT_RX_FDIR) {
			rxm->hash.fdir.hash = rte_le_to_cpu_16(
					rxd.wb.lower.hi_dword.csum_ip.csum) &
					IXGBE_ATR_HASH_MASK;
			rxm->hash.fdir.id = rte_le_to_cpu_16(
					rxd.wb.lower.hi_dword.csum_ip.ip_id);
		}
		/*
		 * Store the mbuf address into the next entry of the array
		 * of returned packets.
		 */
		rx_pkts[nb_rx++] = rxm;
	}
	rxq->rx_tail = rx_id;

	/*
	 * If the number of free RX descriptors is greater than the RX free
	 * threshold of the queue, advance the Receive Descriptor Tail (RDT)
	 * register.
	 * Update the RDT with the value of the last processed RX descriptor
	 * minus 1, to guarantee that the RDT register is never equal to the
	 * RDH register, which creates a "full" ring situtation from the
	 * hardware point of view...
	 */
	nb_hold = (uint16_t) (nb_hold + rxq->nb_rx_hold);
	if (nb_hold > rxq->rx_free_thresh) {
		PMD_RX_LOG(DEBUG, "port_id=%u queue_id=%u rx_tail=%u "
			   "nb_hold=%u nb_rx=%u",
			   (unsigned) rxq->port_id, (unsigned) rxq->queue_id,
			   (unsigned) rx_id, (unsigned) nb_hold,
			   (unsigned) nb_rx);
		rx_id = (uint16_t) ((rx_id == 0) ?
				    (rxq->nb_rx_desc - 1) : (rx_id - 1));
		IXGBE_PCI_REG_WRITE(rxq->rdt_reg_addr, rx_id);
		nb_hold = 0;
	}
	rxq->nb_rx_hold = nb_hold;
	return nb_rx;
}

手机扫一扫

移动阅读更方便

阿里云服务器
腾讯云服务器
七牛云服务器

你可能感兴趣的文章