OVS-DPDK 流表查询详解
阅读原文时间:2023年07月08日阅读:3

一图胜千言:

flow和miniflow

在介绍之前先说一些概念:里面有两个结构很重要,一个是flow一个是miniflow这里介绍一下他们的数据结构和构造函数。

flow的特点是8字节对齐的,存储报文相关字段和其他原数据,用于匹配流表,数据包含四个层次:

  1. metadata: 入端口号,寄存器等信息
  2. l2: 源目的mac,vlan和mpls等信息
  3. l3: ipv4/ipv6源目的ip,ttl等信息
  4. l4: 源目的端口号,icmp code和type等信息。

flow的坏处就是占用了很大的字节,并且有很多字段都是0,在2.8版本中flow的大小是672字节。

miniflow是flow的压缩版,因为flow占用字节很大,比如可以支持ARP,IP等报文,填充了arp字段,icmp报文就是空的了,浪费了很多信息。过程中用到hash作为key,也是根据miniflow计算hash值,不是用的flow。

struct miniflow {
struct flowmap map;
};
struct flowmap {
map_t bits[FLOWMAP_UNITS];
};

miniflow其包含两部分内容:

  1. struct flowmap map;是bit数组,使用其中的bit表示flow中哪个8字节存在有效数据,flow中占多少个8字节,那么就需要map中多个个bit,并且按照64bit向上取整。
  2. 第二部分是有效数据,有效数据动态分配,根据struct flowmap map;中1bit数个数进行分配,大小为bit数*8字节,该部分直接跟在map后面。该部分存储在netdev_flow_key结构中的buf数组。

miniflow数据结构:

//flow是8字节对齐的,除8得到flow中包含8字节的个数
#define FLOW_U64S (sizeof(struct flow) / sizeof(uint64_t))

//map大小为8字节,MAP_T_BITS 为64位
typedef unsigned long long map_t;
#define MAP_T_BITS (sizeof(map_t) * CHAR_BIT)

//每位表示一个u64,FLOWMAP_UNITS 表示最少需要几个64位
#define FLOWMAP_UNITS DIV_ROUND_UP(FLOW_U64S, MAP_T_BITS)

struct flowmap {
map_t bits[FLOWMAP_UNITS];
};

struct miniflow {
struct flowmap map;
/* Followed by:
* uint64_t values[n];
* where 'n' is miniflow_n_values(miniflow). */
};

struct netdev_flow_key {
uint32_t hash;
uint32_t len;
struct miniflow mf; // bits
uint64_t buf[FLOW_MAX_PACKET_U64S]; // 就是上边所说的value
};

// 有些字段是互斥的
#define FLOW_MAX_PACKET_U64S (FLOW_U64S \
/* Unused in datapath */ - FLOW_U64_SIZE(regs) \
- FLOW_U64_SIZE(metadata) \
/* L2.5/3 */ - FLOW_U64_SIZE(nw_src) /* incl. nw_dst */ \
- FLOW_U64_SIZE(mpls_lse) \
/* L4 */ - FLOW_U64_SIZE(tp_src) \
)

miniflow优点:

void
miniflow_extract(struct dp_packet *packet, struct miniflow *dst)
{

// 初始化赋值有两个关键,一个是这个values: return (uint64_t *)(mf + 1);
// 就是上边说的
uint64_t *values = miniflow_values(dst);
struct mf_ctx mf = { FLOWMAP_EMPTY_INITIALIZER, values,
values + FLOW_U64S };

if (md->skb_priority || md->pkt_mark) {
miniflow_push_uint32(mf, skb_priority, md->skb_priority);
miniflow_push_uint32(mf, pkt_mark, md->pkt_mark);
}
miniflow_push_be16(mf, dl_type, dl_type);
miniflow_pad_to_64(mf, dl_type);

// 去取网络层信息,从这里可以看出,ovs暂时只支持IP,IPV6,ARP,RARP报文  
if (OVS\_LIKELY(dl\_type == htons(ETH\_TYPE\_IP))){...}  
else if  
...

// 提取传输层,从这里可以看出,ovs暂时支持传输层协议有TCP,UDP,SCTP,ICMP,ICMPV6  
if (OVS\_LIKELY(nw\_proto == IPPROTO\_TCP)){...}  
else if  
...

miniflow_push_uint32()

在上面将value保存到miniflow时,用到了几个辅助函数,比如下面的miniflow_push_uint32用来将一个32位的值保存到miniflow中FIELD对应的位置。其首先调用offsetof获取field在flow中的偏移字节数,因为flow是8字节对齐的,所以一个四字节的成员变量要么位于8字节的起始位置,要么位于8字节的中间位置,即对8取模值肯定为0或者4,再调用miniflow_push_uint32_保存到对应的位置,并设置map中对应的bit为1。

#define miniflow_push_uint32(MF, FIELD, VALUE) \
miniflow_push_uint32_(MF, offsetof(struct flow, FIELD), VALUE)

#define miniflow_push_uint32_(MF, OFS, VALUE) \
{ \
MINIFLOW_ASSERT(MF.data < MF.end); \
\
//成员变量位于起始位置,需要调用miniflow_set_map设置对应的bit为1
if ((OFS) % 8 == 0) { \
miniflow_set_map(MF, OFS / 8); \
*(uint32_t *)MF.data = VALUE; \
} else if ((OFS) % 8 == 4) { \
//成员变量不在起始位置,要判断此变量所在的bit为1
miniflow_assert_in_map(MF, OFS / 8); \
*((uint32_t *)MF.data + 1) = VALUE; \
MF.data++; \
} \
}

/* Initializes 'dst' as a copy of 'src'. */
void
miniflow_expand(const struct miniflow *src, struct flow *dst)
{
memset(dst, 0, sizeof *dst);
flow_union_with_miniflow(dst, src);
}

/* Perform a bitwise OR of miniflow 'src' flow data with the equivalent
* fields in 'dst', storing the result in 'dst'. */
static inline void
flow_union_with_miniflow(struct flow *dst, const struct miniflow *src)
{
flow_union_with_miniflow_subset(dst, src, src->map);
}

static inline void
flow_union_with_miniflow_subset(struct flow *dst, const struct miniflow *src,
struct flowmap subset)
{
uint64_t *dst_u64 = (uint64_t *) dst;
const uint64_t *p = miniflow_get_values(src);
map_t map;
//遍历所有的map
FLOWMAP_FOR_EACH_MAP (map, subset) {
size_t idx;
//遍历map中所有的非0bit
MAP_FOR_EACH_INDEX(idx, map) {
dst_u64[idx] |= *p++;
}
dst_u64 += MAP_T_BITS;
}
}

流表查询过程

该部分入口在lib/dpif-netdev.c,就是最开始的那个图。

查询的缓存分为两层:一个是DFC,一个是dpcls,相当于microflow和megaflow,DFC由两部分组成,DFC(datapath flow cache):EMC(Exact match cache)+SMC(Signature match cache),另一部分就是dpcls(datapath classifer)。

SMC默认关闭:bool smc_enable = smap_get_bool(other_config, "smc-enable", false);

函数执行流程(不包含SMC的):

static void
dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
struct dp_packet_batch *packets,
bool md_is_valid, odp_port_t port_no)
{
#if !defined(__CHECKER__) && !defined(_WIN32)
const size_t PKT_ARRAY_SIZE = dp_packet_batch_size(packets);
#else
/* Sparse or MSVC doesn't like variable length array. */
enum { PKT_ARRAY_SIZE = NETDEV_MAX_BURST };
#endif
OVS_ALIGNED_VAR(CACHE_LINE_SIZE)
struct netdev_flow_key keys[PKT_ARRAY_SIZE];
struct netdev_flow_key *missed_keys[PKT_ARRAY_SIZE];
struct packet_batch_per_flow batches[PKT_ARRAY_SIZE];
size_t n_batches;
struct dp_packet_flow_map flow_map[PKT_ARRAY_SIZE];
uint8_t index_map[PKT_ARRAY_SIZE];
size_t n_flows, i;

odp\_port\_t in\_port;

n\_batches = 0;  
// 1. dfc\_processing之后会把miss的放到packets里  
//    找到的可能已经batched了,或者放到flow\_map里了  
//    flow\_map里是未bathed的,可能直接是\*flow或者是NULL,是NULL再去下一层cache查  
dfc\_processing(pmd, packets, keys, missed\_keys, batches, &n\_batches,  
               flow\_map, &n\_flows, index\_map, md\_is\_valid, port\_no);

// 2. 如果有miss的,再去找fast-path,也就是查dpcls  
if (!dp\_packet\_batch\_is\_empty(packets)) {  
    in\_port = packets->packets\[0\]->md.in\_port.odp\_port;  
    fast\_path\_processing(pmd, packets, missed\_keys,  
                         flow\_map, index\_map, in\_port);  
}

/\* Batch rest of packets which are in flow map. \*/  
for (i = 0; i < n\_flows; i++) {  
    struct dp\_packet\_flow\_map \*map = &flow\_map\[i\];

    if (OVS\_UNLIKELY(!map->flow)) {  
        continue;  
    }  
    dp\_netdev\_queue\_batches(map->packet, map->flow, map->tcp\_flags,  
                            batches, &n\_batches);  
}

for (i = 0; i < n\_batches; i++) {  
    batches\[i\].flow->batch = NULL;  
}

// 执行每个packet的action  
for (i = 0; i < n\_batches; i++) {  
    packet\_batch\_per\_flow\_execute(&batches\[i\], pmd);  
}  

}

static inline size_t
dfc_processing(struct dp_netdev_pmd_thread *pmd,
struct dp_packet_batch *packets_,
struct netdev_flow_key *keys,
struct netdev_flow_key **missed_keys,
struct packet_batch_per_flow batches[], size_t *n_batches,
struct dp_packet_flow_map *flow_map,
size_t *n_flows, uint8_t *index_map,
bool md_is_valid, odp_port_t port_no)
{
struct netdev_flow_key *key = &keys[0];
size_t n_missed = 0, n_emc_hit = 0;
struct dfc_cache *cache = &pmd->flow_cache;
struct dp_packet *packet;
size_t cnt = dp_packet_batch_size(packets_);
// emc的插入概率,如果为0,表示不开启emc
uint32_t cur_min = pmd->ctx.emc_insert_min;
int i;
uint16_t tcp_flags;
bool smc_enable_db;
// 记录未batched的个数
size_t map_cnt = 0;
// 这个变量用于保序
bool batch_enable = true;
// 获取smc是否开启参数
atomic_read_relaxed(&pmd->dp->smc_enable_db, &smc_enable_db);
pmd_perf_update_counter(&pmd->perf_stats,
md_is_valid ? PMD_STAT_RECIRC : PMD_STAT_RECV,
cnt);

do\_dfc\_hook(pmd, packets\_, batches, n\_batches);  
cnt = dp\_packet\_batch\_size(packets\_);     

// 逐个对dp\_packet\_batch中的每一个packet进行处理  
DP\_PACKET\_BATCH\_REFILL\_FOR\_EACH (i, cnt, packet, packets\_) {  
    struct dp\_netdev\_flow \*flow;  
    // 若packet包长小于以太头的长度直接丢包  
    if (OVS\_UNLIKELY(dp\_packet\_size(packet) < ETH\_HEADER\_LEN)) {  
        dp\_packet\_delete(packet);  
        COVERAGE\_INC(datapath\_drop\_rx\_invalid\_packet);  
        continue;  
    }  
    // 对数据手工预取可减少读取延迟,从而提高性能  
    if (i != cnt - 1) {  
        struct dp\_packet \*\*packets = packets\_->packets;  
        /\* Prefetch next packet data and metadata. \*/  
        OVS\_PREFETCH(dp\_packet\_data(packets\[i+1\]));  
        pkt\_metadata\_prefetch\_init(&packets\[i+1\]->md);  
    }

    // 初始化metadata首先将pkt\_metadata中flow\_in\_port前的字节全部设为0  
    // 将in\_port.odp\_port设为port\_no, tunnel.ipv6\_dst设为in6addr\_any  
    if (!md\_is\_valid) {  
        pkt\_metadata\_init(&packet->md, port\_no);  
    }  
    // 报文转化为miniflow, 上文有讲  
    miniflow\_extract(packet, &key->mf);  
    key->len = 0; /\* Not computed yet. \*/  
    // 计算当前报文miniflow的hash值  
    key->hash =  
            (md\_is\_valid == false)  
            ? dpif\_netdev\_packet\_get\_rss\_hash\_orig\_pkt(packet, &key->mf)  
            : dpif\_netdev\_packet\_get\_rss\_hash(packet, &key->mf);

    // 根据key->hash,emc\_entry alive,miniflow 3个条件得到dp\_netdev\_flow  
    // cur\_min = 0,表示不可能插入,后面有讲什么时候才会插入EMC  
    flow = (cur\_min != 0) ? emc\_lookup(&cache->emc\_cache, key) : NULL;

    if (OVS\_LIKELY(flow)) {  
        tcp\_flags = miniflow\_get\_tcp\_flags(&key->mf);  
        n\_emc\_hit++; // 命中次数+1  
        // 为了保证报文的顺序,所有的packet对应的flow都用flow\_map存储  
        // flow\_map里面就是packet数量对应的(packet,flow,tcp\_flag)  
        // 最后会把这些在dp\_netdev\_input\_\_里重新把顺序合并一下  
        if (OVS\_LIKELY(batch\_enable)) {  
            // 把查到的flow加到batches里第n\_batches个batch里  
            dp\_netdev\_queue\_batches(packet, flow, tcp\_flags, batches,  
                                    n\_batches);  
        } else {

            packet\_enqueue\_to\_flow\_map(packet, flow, tcp\_flags,  
                                       flow\_map, map\_cnt++);  
        }  
    } else {  
        // 这些数据结构用于smc查询时的记录  
        // 没查到把packet放到packets\_里,从下标0再开始放  
        // 最后packets\_都是未查到的  
        dp\_packet\_batch\_refill(packets\_, packet, i);  
        index\_map\[n\_missed\] = map\_cnt;  
        flow\_map\[map\_cnt++\].flow = NULL;  
        missed\_keys\[n\_missed\] = key;  
        key = &keys\[++n\_missed\];  
        batch\_enable = false; // 之后的都是未batched的  
    }  
}  
\*n\_flows = map\_cnt;

pmd\_perf\_update\_counter(&pmd->perf\_stats, PMD\_STAT\_EXACT\_HIT, n\_emc\_hit);  
// 如果没有开启smc,直接返回了  
if (!smc\_enable\_db) {  
    return dp\_packet\_batch\_size(packets\_);  
}

smc\_lookup\_batch(pmd, keys, missed\_keys, packets\_,  
                 n\_missed, flow\_map, index\_map);

return dp\_packet\_batch\_size(packets\_);  

}

1.1 emc查询:emc_lookup()

static inline struct dp_netdev_flow *
emc_lookup(struct emc_cache *cache, const struct netdev_flow_key *key)
{
struct emc_entry *current_entry;
// 这里说一下,一个hash分配两个桶,长度为13位,cache桶的大小为1<<13 // struct emc\_cache { // struct emc\_entry entries\[EM\_FLOW\_HASH\_ENTRIES\]; // int sweep\_idx; /\* For emc\_cache\_slow\_sweep(). \*/ // }; EMC\_FOR\_EACH\_POS\_WITH\_HASH (cache, current\_entry, key->hash) {
if (current_entry->key.hash == key->hash
&& emc_entry_alive(current_entry)
&& emc_flow_key_equal_mf(&current_entry->key, &key->mf)) {
/* We found the entry with the 'key->mf' miniflow */
return current_entry->flow;
}
}
return NULL;
}

#define EM_FLOW_HASH_SHIFT 13
#define EM_FLOW_HASH_ENTRIES (1u << EM_FLOW_HASH_SHIFT) #define EM_FLOW_HASH_MASK (EM_FLOW_HASH_ENTRIES - 1) #define EM_FLOW_HASH_SEGS 2 #define EMC_FOR_EACH_POS_WITH_HASH(EMC, CURRENT_ENTRY, HASH) \ for (uint32_t i__ = 0, srch_hash__ = (HASH); \ (CURRENT_ENTRY) = &(EMC)->entries[srch_hash__ & EM_FLOW_HASH_MASK], \
i__ < EM_FLOW_HASH_SEGS; \ i__++, srch_hash__ >>= EM_FLOW_HASH_SHIFT)

// 比较miniflow是否相同
static inline bool
emc_flow_key_equal_mf(const struct netdev_flow_key *key,
const struct miniflow *mf)
{
return !memcmp(&key->mf, mf, key->len);
}

EMC查询函数执行:

1.2 smc查询:smc_lookup_batch()

static inline void
smc_lookup_batch(struct dp_netdev_pmd_thread *pmd,
struct netdev_flow_key *keys,
struct netdev_flow_key **missed_keys,
struct dp_packet_batch *packets_,
const int cnt,
struct dp_packet_flow_map *flow_map,
uint8_t *index_map)
{
int i;
struct dp_packet *packet;
size_t n_smc_hit = 0, n_missed = 0;
struct dfc_cache *cache = &pmd->flow_cache;
struct smc_cache *smc_cache = &cache->smc_cache;
const struct cmap_node *flow_node;
int recv_idx;
uint16_t tcp_flags;

/\* Prefetch buckets for all packets \*/  
for (i = 0; i < cnt; i++) {  
    OVS\_PREFETCH(&smc\_cache->buckets\[keys\[i\].hash & SMC\_MASK\]);  
}

DP\_PACKET\_BATCH\_REFILL\_FOR\_EACH (i, cnt, packet, packets\_) {  
    struct dp\_netdev\_flow \*flow = NULL;  
    // 找到hash相同的flow链表的头节点  
    flow\_node = smc\_entry\_get(pmd, keys\[i\].hash);  
    bool hit = false;  
    /\* Get the original order of this packet in received batch. \*/  
    recv\_idx = index\_map\[i\];

    if (OVS\_LIKELY(flow\_node != NULL)) {  
        // 遍历一下看看哪一个是相同的,这个通过offsetof找到存放该cmap结构体的首地址  
        // dp\_netdev\_flow里面的首地址就是,  
        CMAP\_NODE\_FOR\_EACH (flow, node, flow\_node) {  
            /\* Since we dont have per-port megaflow to check the port  
             \* number, we need to  verify that the input ports match. \*/  
            if (OVS\_LIKELY(dpcls\_rule\_matches\_key(&flow->cr, &keys\[i\]) &&  
            flow->flow.in\_port.odp\_port == packet->md.in\_port.odp\_port)) {  
                tcp\_flags = miniflow\_get\_tcp\_flags(&keys\[i\].mf);  
                keys\[i\].len =  
                    netdev\_flow\_key\_size(miniflow\_n\_values(&keys\[i\].mf));  
                if (emc\_probabilistic\_insert(pmd, &keys\[i\], flow)) {  
                    if (flow->status == OFFLOAD\_NONE) {  
                        queue\_netdev\_flow\_put(pmd->dp->dp\_flow\_offload, \\  
                                pmd->dp->class, \\  
                                flow, NULL, DP\_NETDEV\_FLOW\_OFFLOAD\_OP\_ADD);  
                    }  
                }  
                packet\_enqueue\_to\_flow\_map(packet, flow, tcp\_flags,  
                                           flow\_map, recv\_idx);  
                n\_smc\_hit++;  
                hit = true;  
                break;  
            }  
        }  
        if (hit) {  
            continue;  
        }  
    }  
    // SMC也miss了,和之前一样,把miss的放packets\_里,从0开始放  
    dp\_packet\_batch\_refill(packets\_, packet, i);  
    index\_map\[n\_missed\] = recv\_idx;  
    missed\_keys\[n\_missed++\] = &keys\[i\];  
}

pmd\_perf\_update\_counter(&pmd->perf\_stats, PMD\_STAT\_SMC\_HIT, n\_smc\_hit);  

}

查找hash相同的链表头:smc_entry_get()

static inline const struct cmap_node *
smc_entry_get(struct dp_netdev_pmd_thread *pmd, const uint32_t hash)
{
struct smc_cache *cache = &(pmd->flow_cache).smc_cache;
// smc_cache桶的大小是(1<<18),SMC_MASK=(1<<18)- 1 // 先通过后hash的后18位定位到桶 struct smc_bucket *bucket = &cache->buckets[hash & SMC_MASK];
// 一个桶有4个16位的sig,存key->hash前16位,正好是64位
// 遍历4个元素看那个匹配,获得匹配后的cmap的下标
uint16_t sig = hash >> 16;
uint16_t index = UINT16_MAX;

for (int i = 0; i < SMC\_ENTRY\_PER\_BUCKET; i++) {  
    if (bucket->sig\[i\] == sig) {  
        index = bucket->flow\_idx\[i\];  
        break;  
    }  
}  
// 通过index找到在dpcls里的桶位置  
if (index != UINT16\_MAX) {  
    return cmap\_find\_by\_index(&pmd->flow\_table, index);  
}  
return NULL;  

}

1.3 更新emc:emc_probabilistic_insert()

命中SMC后,插入回上一层cache(EMC)里:emc_probabilistic_insert()

插入EMC的条件:

  默认插入流表的概率是1%,可以通过ovs-vsctl set Open_vSwitch . other_config:emc-insert-prob=10 设置概率,表示平均10条流表有1条插入,当为0时禁用EMC,当为1的时候,百分百插入。设置后会在代码里设置emc_insert_min字段为uint_max/10,插入的时候生成一个uint_random(),如果随机数小于emc_insert_min才会插入。

static inline bool
emc_probabilistic_insert(struct dp_netdev_pmd_thread *pmd,
const struct netdev_flow_key *key,
struct dp_netdev_flow *flow)
{
/* Insert an entry into the EMC based on probability value 'min'. By
* default the value is UINT32_MAX / 100 which yields an insertion
* probability of 1/100 ie. 1% */
uint32_t min = pmd->ctx.emc_insert_min;
if (min && random_uint32() <= min) { emc_insert(&(pmd->flow_cache).emc_cache, key, flow);
return true;
}
return false;
}

emc_insert同样有我在内核查询里的问题,如果cache里没有该miniflow,会找一个hash值小的entry,覆盖这个entry,那如果有一个hash很大的flow被插入了,但是这个flow之后就没用过了,那岂不是这个entry就浪费了,不会被用到。

找到了合适的emc_entry。则将报文对应的netdev_dev_flow key信息存储到该表项中。而对于这个表项,原有的emc_entry.flow有可能还有指向一条旧的流表,需要将这条流表的引用计数减1,如果减1后达到0,则释放该流表空间。同时更新emc_entry.flow重新指向新的流表。到此为止,EMC表项更新完毕。

static inline void
emc_insert(struct emc_cache *cache, const struct netdev_flow_key *key,
struct dp_netdev_flow *flow)
{
struct emc_entry *to_be_replaced = NULL;
struct emc_entry *current_entry;

EMC\_FOR\_EACH\_POS\_WITH\_HASH(cache, current\_entry, key->hash) {  
    if (netdev\_flow\_key\_equal(&current\_entry->key, key)) {  
        /\* We found the entry with the 'mf' miniflow \*/  
        emc\_change\_entry(current\_entry, flow, NULL);  
        return;  
    }  
    /\* Replacement policy: put the flow in an empty (not alive) entry, or  
     \* in the first entry where it can be \*/

    if (!to\_be\_replaced  
        || (emc\_entry\_alive(to\_be\_replaced)  
            && !emc\_entry\_alive(current\_entry))  
        || current\_entry->key.hash < to\_be\_replaced->key.hash) {  
        // 这个黄色判断就是我迷惑的地方  
        to\_be\_replaced = current\_entry;  
    }  
}  
/\* We didn't find the miniflow in the cache.  
 \* The 'to\_be\_replaced' entry is where the new flow will be stored \*/  
emc\_change\_entry(to\_be\_replaced, flow, key);  

}

1.4 EMC的轮训更新

在pmd_thread_main()里面:

if (lc++ > 1024) {
lc = 0;

coverage\_try\_clear();

// 这里的optimize是排序一下TSS  
dp\_netdev\_pmd\_try\_optimize(pmd, poll\_list, poll\_cnt);  
dp\_netdev\_pmd\_hook\_idle\_run(pmd);  

#ifdef ENABLE_EMC
if (!ovsrcu_try_quiesce()) {
emc_cache_slow_sweep(pmd->dp, &((pmd->flow_cache).emc_cache));
}
#else
ovsrcu_try_quiesce();
#endif

for (i = 0; i < poll\_cnt; i++) {  
    uint64\_t current\_seq =  
             netdev\_get\_change\_seq(poll\_list\[i\].rxq->port->netdev);  
    if (poll\_list\[i\].change\_seq != current\_seq) {  
        poll\_list\[i\].change\_seq = current\_seq;  
        poll\_list\[i\].rxq\_enabled =  
                     netdev\_rxq\_enabled(poll\_list\[i\].rxq->rx);  
    }  
}  

}

dpcls是megaflow的查询过程,使用TSS算法,是个很老的算法了,看源码之前,先讲一下ovs里面的TSS,之前内核已经讲过,但是没有讲OVS里做的优化,下边再说一次,然后建议再去看一下这个有很多图的博客OVS-DPDK Datapath Classifier,这样之后对整个dpcls流程就有所了解了。

TSS算法原理

OVS 在内核态使用了元组空间搜索算法(Tuple Space Search,简称 TSS)进行流表查找,元组空间搜索算法的核心思想是,把所有规则按照每个字段的前缀长度进行组合,并划分为不同的元组中,然后在这些元组集合中进行哈希查找。我们举例说明,假设现有 10 条规则以及 3 个匹配字段,每个匹配字段长度均为 4:

我们将每条规则各匹配字段的前缀长度提取出来,按照前缀长度进行组合,并根据前缀长度组合进行分组:

我们将每个前缀长度组合称为 元组,每个元组对应于哈希表的一个桶,同一前缀长度组合内的所有规则放置在同一个哈希桶内:

10 条规则被划分为 4 个元组,因此最多只需要四次查找,就可以找到对应的规则。

算法优缺点

为什么OVS选择TSS,而不选择其他查找算法?论文给出了以下三点解释:

(1)在虚拟化数据中心环境下,流的添加删除比较频繁,TSS支持高效的、常数时间的表项更新; (2)TSS支持任意匹配域的组合; (3)TSS存储空间随着流的数量线性增长,空间复杂度为 O(N),N 为规则数目。

元组空间搜索算法的缺点是,由于基于哈希表实现,因此查找的时间复杂度不能确定。当所有规则各个字段的前缀长度组合数目过多时,查找性能会大大降低,最坏情况下需要查找所有规则。

OVS里做的排序优化

查找的过程需要从前向后遍历所有元组,命中了就不用往后查了。OVS给每个元组加了一个命中次数,命中次数越多,元组这个链表越靠前,这样就可以减少了查表次数。

2.1 dpcls相关数据结构

// 线程安全的
#define OVSRCU_TYPE(TYPE) struct { ATOMIC(TYPE) p; }

struct cmap {
OVSRCU_TYPE(struct cmap_impl *) impl;
};

/* The implementation of a concurrent hash map. */
struct cmap_impl {
// 补齐64字节
PADDED_MEMBERS_CACHELINE_MARKER(CACHE_LINE_SIZE, cacheline0,
unsigned int n; /* Number of in-use elements. */
unsigned int max_n; /* Max elements before enlarging. */
unsigned int min_n; /* Min elements before shrinking. */
uint32_t mask; /* Number of 'buckets', minus one. */
uint32_t basis; /* Basis for rehashing client's
hash values. */
);
PADDED_MEMBERS_CACHELINE_MARKER(CACHE_LINE_SIZE, cacheline1,
struct cmap_bucket buckets[1];
);
};

struct cmap_bucket {
/* Padding to make cmap_bucket exactly one cache line long. */
PADDED_MEMBERS(CACHE_LINE_SIZE,
// 锁机制,读和写都会+1,读的时候等到变成偶数再去读,保证安全
atomic_uint32_t counter;
// 桶中的每个槽用(hashs[i], nodes[i])元组来表示
uint32_t hashes[CMAP_K];
struct cmap_node nodes[CMAP_K];
);
};
struct cmap_node {
OVSRCU_TYPE(struct cmap_node *) next; /* Next node with same hash. */
};

/* 二级匹配表.每个报文接收端口对应一个 */
struct dpcls {
struct cmap_node node; /* 链表节点 */
odp_port_t in_port; /* 报文接收端口 */
struct cmap subtables_map; // 管理下边subtables的索引,用于遍历
struct pvector subtables; // 上文TSS算法所说的元组表
}
 
struct pvector {
// 指向具体子表信息
OVSRCU_TYPE(struct pvector_impl *) impl;
// 平时,temp都是为NULL.只有当pvector扩充时,temp才用来临时缓存数据.
// 待排好序后,再拷贝到impl中,temp再置NULL
struct pvector_impl *temp;
};

// 相当于vector
struct pvector_impl {
size_t size; /* Number of entries in the vector */
size_t allocated; /* Number allocted entries */
/* 初始化的时候只有4个元素.后续可能会扩充 */
struct pvector_entry vector[];
}
 
struct pvector_entry {
// pvector_impl中的vector是按照priority从小到大排序的
// pmd_thread_main里会把priority赋值为hit_cnt,然后排序
int priority;
/* 实际指向了struct dpcls_subtable结构 */
void *ptr;
}
 
// 子表信息
struct dpcls_subtable {
/* The fields are only used by writers. */
struct cmap_node cmap_node OVS_GUARDED; /* Within dpcls 'subtables_map'. */

struct cmap rules; // 该表的bucket内容  
uint32\_t hit\_cnt;  // 命中该子表的次数

// 下边是mask的miniflow前两个的bits里1的个数  
uint8\_t mf\_bits\_set\_unit0;  
uint8\_t mf\_bits\_set\_unit1;  
// 根据mf\_bits\_set\_unit01选择最合适的查找算法  
dpcls\_subtable\_lookup\_func lookup\_func;

/\* Caches the masks to match a packet to, reducing runtime calculations. \*/  
uint64\_t \*mf\_masks; // 由下边的mask->mf->bits\[01\]得来的,  
struct netdev\_flow\_key mask; // 该表的掩码信息  

};

关于上边的mf_masks与mask,举个例子
mf_bits_set_unit0 = 4, mf_bits_set_unit1 = 0
netdev_flow_key.mf.bits[0] = 111010 (2进制)
mf_masks = [1, 111, 1111, 11111] (2进制)

三个图对应他们的关系,链表三用于遍历的,查找过程中并不会通过链表三方式搜索。查找的时候走的就是链表二的流程。

2.2 dpcls查询入口:fast_path_processing->dpcls_lookup()

static bool
dpcls_lookup(struct dpcls *cls, const struct netdev_flow_key *keys[],
struct dpcls_rule **rules, const size_t cnt,
int *num_lookups_p)
{
#define MAP_BITS (sizeof(uint32_t) * CHAR_BIT)
BUILD_ASSERT_DECL(MAP_BITS >= NETDEV_MAX_BURST);

struct dpcls\_subtable \*subtable;  
uint32\_t keys\_map = TYPE\_MAXIMUM(uint32\_t); /\* Set all bits. \*/

if (cnt != MAP\_BITS) {  
    /\*keys\_map中置1位数为包的总数,并且第i位对应第i个包\*/  
    keys\_map >>= MAP\_BITS - cnt; /\* Clear extra bits. \*/  
}  
memset(rules, 0, cnt \* sizeof \*rules);

int lookups\_match = 0, subtable\_pos = 1;  
uint32\_t found\_map;

PVECTOR\_FOR\_EACH (subtable, &cls->subtables) {  
    // 查找函数,对应下边的lookup\_generic()  
    found\_map = subtable->lookup\_func(subtable, keys\_map, keys, rules);

    uint32\_t pkts\_matched = count\_1bits(found\_map);  
    // 搜索的子表个数,加上的是当前这几个key找了多少个表  
    lookups\_match += pkts\_matched \* subtable\_pos;

    keys\_map &= ~found\_map;  
    if (!keys\_map) {  
        if (num\_lookups\_p) {  
            \*num\_lookups\_p = lookups\_match;  
        }  
        // 全找到了  
        return true;  
    }  
    subtable\_pos++;  
}

if (num\_lookups\_p) {  
    \*num\_lookups\_p = lookups\_match;  
}  
// 没有全找到  
return false;  

}

lookup_generic()

ovs-dpdk里面有avx512-gather.c,使用avx512优化了look_up,整体逻辑还是一样的,这里只说dpif-netdev-lookup-generic

入口在这里,往下走,传进去subtable有效字段有多大
static uint32_t
dpcls_subtable_lookup_generic(struct dpcls_subtable *subtable,
uint32_t keys_map,
const struct netdev_flow_key *keys[],
struct dpcls_rule **rules)
{
return lookup_generic_impl(subtable, keys_map, keys, rules,
subtable->mf_bits_set_unit0,
subtable->mf_bits_set_unit1);
}

static inline uint32_t ALWAYS_INLINE
lookup_generic_impl(struct dpcls_subtable *subtable, // 当前的subtable
uint32_t keys_map, // miss_bit_map
const struct netdev_flow_key *keys[], // miss_key
struct dpcls_rule **rules, // save hit_rule
const uint32_t bit_count_u0,
const uint32_t bit_count_u1)
{
// 有几个包
const uint32_t n_pkts = count_1bits(keys_map);
ovs_assert(NETDEV_MAX_BURST >= n_pkts);
uint32_t hashes[NETDEV_MAX_BURST];

// 根据mask字段的大小开空间  
const uint32\_t bit\_count\_total = bit\_count\_u0 + bit\_count\_u1;  
// 一个batch最大是NETDEV\_MAX\_BURST  
const uint32\_t block\_count\_required = bit\_count\_total \* NETDEV\_MAX\_BURST;  
uint64\_t \*mf\_masks = subtable->mf\_masks;  
int i;

// 申请存储一个batch报文信息的数组,存放  
uint64\_t \*blocks\_scratch = get\_blocks\_scratch(block\_count\_required); 

// 获得每个key与当前表的mask“与运算”的结果  
ULLONG\_FOR\_EACH\_1 (i, keys\_map) {  
        netdev\_flow\_key\_flatten(keys\[i\],  
                                &subtable->mask, // 该表的掩码信息  
                                mf\_masks,  // 由subtable->mask处理后的mask  
                                &blocks\_scratch\[i \* bit\_count\_total\],  
                                bit\_count\_u0,  
                                bit\_count\_u1);  
}

// 算出来每一个key在该subtable里的hash值,该hash值由“mask字节数,key和mask与运算结果”得出  
ULLONG\_FOR\_EACH\_1 (i, keys\_map) {  
    uint64\_t \*block\_ptr = &blocks\_scratch\[i \* bit\_count\_total\];  
    uint32\_t hash = hash\_add\_words64(0, block\_ptr, bit\_count\_total);  
    hashes\[i\] = hash\_finish(hash, bit\_count\_total \* 8);  
}

uint32\_t found\_map;  
const struct cmap\_node \*nodes\[NETDEV\_MAX\_BURST\];

// 找到每个key在该subtable里的cmap,并且返回每个key有没有被找到,第i位是1则找到  
found\_map = cmap\_find\_batch(&subtable->rules, keys\_map, hashes, nodes);

ULLONG\_FOR\_EACH\_1 (i, found\_map) {  
    struct dpcls\_rule \*rule;  
    // 可能不同的rule有相同的hash,看那个是匹配的  
    CMAP\_NODE\_FOR\_EACH (rule, cmap\_node, nodes\[i\]) {  
        const uint32\_t cidx = i \* bit\_count\_total;  
        /\*rule->mask & keys\[i\]的值与rule->flow相比较\*/  
        uint32\_t match = netdev\_rule\_matches\_key(rule, bit\_count\_total,  
                                                 &blocks\_scratch\[cidx\]);  
        if (OVS\_LIKELY(match)) {  
            rules\[i\] = rule;  
            subtable->hit\_cnt++;  
            goto next;  
        }  
    }  
    ULLONG\_SET0(found\_map, i);  /\* Did not match. \*/  
next:  
    ; /\* Keep Sparse happy. \*/  
}  
return found\_map;  

}

掩码运算netdev_flow_key_flatten()

// 这个函数对应dpif-netdev.c里面的dpcls_flow_key_gen_masks()
static inline void
netdev_flow_key_flatten(const struct netdev_flow_key *key, // 要查找的miss_key
const struct netdev_flow_key *mask,
const uint64_t *mf_masks,
uint64_t *blocks_scratch,
const uint32_t u0_count,
const uint32_t u1_count)
{
/* Load mask from subtable, mask with packet mf, popcount to get idx. */
const uint64_t *pkt_blocks = miniflow_get_values(&key->mf);
const uint64_t *tbl_blocks = miniflow_get_values(&mask->mf); // 获取miss_key和mask的miniflow

/\* Packet miniflow bits to be masked by pre-calculated mf\_masks. \*/  
const uint64\_t pkt\_bits\_u0 = key->mf.map.bits\[0\];  
const uint32\_t pkt\_bits\_u0\_pop = count\_1bits(pkt\_bits\_u0);  
const uint64\_t pkt\_bits\_u1 = key->mf.map.bits\[1\];

// 这个函数就是把miss_key与subtable的掩码进行&运算
// 会运算出该mask在意字段结果,放到blocks_scratch里
netdev_flow_key_flatten_unit(&pkt_blocks[0], // key-mf的数据段
&tbl_blocks[0], // mask->mf的数据段
&mf_masks[0], // mask->mf->bits得来mask
&blocks_scratch[0], // 存放的地址
pkt_bits_u0, // key->mf里的bits[0]
u0_count); // mask->mf->bits[0]里1的个数

netdev\_flow\_key\_flatten\_unit(&pkt\_blocks\[pkt\_bits\_u0\_pop\], // 上边bits\[0\]的已经算过了,从bits\[1\]开始算  
                             &tbl\_blocks\[u0\_count\],  
                             &mf\_masks\[u0\_count\],  
                             &blocks\_scratch\[u0\_count\],  
                             pkt\_bits\_u1,  
                             u1\_count);  

}

static inline void
netdev_flow_key_flatten_unit(const uint64_t *pkt_blocks, // key-mf的数据段
const uint64_t *tbl_blocks, // mask->mf里的数据段
const uint64_t *mf_masks, // mask->mf->bits得来mask
uint64_t *blocks_scratch, // 存放到这里
const uint64_t pkt_mf_bits, // key->mf里的bits[01]
const uint32_t count) // mask->mf->bits[0]里1的个数
{

// 说一下意思,这个我们流程就是用key和subtable的mask与运算,肯定只需要与运算mask里
// 不为0的字段,其他的mask不关心,然后这个操作就是为了得到key对应字段是key->mf的第几位,
// 比如mask的bits[0]=11111, key的bits[0] = 10100, mask里的第3个1在key里面是第1个
// 这一位与的结果就是tbl_blocks[2]&pkt_blocks[0], 也就是怎么找到key里的下标0
// 就看key当前位之前有几个1就行了。这里这样做的1010111,
// 蓝色1之前有count_1bits(1010111 & 0001111) = 3

// 对上边的mask举个例子 count = 4;
// mask->mf->bits[0] = 111010 (2进制)
// mf_masks = [1, 111, 1111, 11111] (2进制);
// pkt_mf_bits = 010100
// blocks_scratch = [0,0,0,0,pkt_blocks[1]&tbl_blocks[4],0]
uint32_t i;
for (i = 0; i < count; i++) {
// 拿i=2举例
uint64_t mf_mask = mf_masks[i]; // mf_mask = 001111
uint64_t idx_bits = mf_mask & pkt_mf_bits; // idx_bits = 000100
const uint32_t pkt_idx = count_1bits(idx_bits); // pkt_idx = 1

    uint64\_t pkt\_has\_mf\_bit = (mf\_mask + 1) & pkt\_mf\_bits;  // pkt\_has\_mf\_bit = 010000  
    // 是否求掩码:mask当前位对应的key的字段,如果key在当前位是0,下边算掩码就会变成0  
    uint64\_t no\_bit = ((!pkt\_has\_mf\_bit) > 0) - 1; // 2^64 - 1

    // mask里第i个字段与运算key对应的字段  
    blocks\_scratch\[i\] = pkt\_blocks\[pkt\_idx\] & tbl\_blocks\[i\] & no\_bit; //  
}  

}

key对应的cmap:cmap_find_batch()

unsigned long
cmap_find_batch(const struct cmap *cmap, unsigned long map,
uint32_t hashes[], const struct cmap_node *nodes[])
{
const struct cmap_impl *impl = cmap_get_impl(cmap);
unsigned long result = map;
int i;
// 每一位就是一个包,一字节8个包
uint32_t h1s[sizeof map * CHAR_BIT];
const struct cmap_bucket *b1s[sizeof map * CHAR_BIT];
const struct cmap_bucket *b2s[sizeof map * CHAR_BIT];
uint32_t c1s[sizeof map * CHAR_BIT];

// 每个impl里桶的数量为impl->mask+1  
// 为什么mask是桶的个数减1:因为下标从0开始,找下表的时候直接(hash & impl->mask)就行了

// 至于为什么开两个?因为buckets存放的方法也是一个值对应两个hash  
// 第一次hash1 = rehash(impl->basis, hash), 找buckets\[hash1 & impl->mask\], 遍历里面CMAP\_K个元素  
// 第二次hash2 = other\_hash(hash1), 找buckets\[hash2 & impl->mask\], 遍历里面CMAP\_K个元素

/\* Compute hashes and prefetch 1st buckets. \*/  
ULLONG\_FOR\_EACH\_1(i, map) {  
    h1s\[i\] = rehash(impl, hashes\[i\]);  
    b1s\[i\] = &impl->buckets\[h1s\[i\] & impl->mask\];  
    OVS\_PREFETCH(b1s\[i\]);  
}  
/\* Lookups, Round 1. Only look up at the first bucket. \*/  
ULLONG\_FOR\_EACH\_1(i, map) {  
    uint32\_t c1;  
    const struct cmap\_bucket \*b1 = b1s\[i\];  
    const struct cmap\_node \*node;

    do {  
        c1 = read\_even\_counter(b1);  
        // 找一下这个cmap\_bucket里面有没有相同hash的  
        node = cmap\_find\_in\_bucket(b1, hashes\[i\]);  
    } while (OVS\_UNLIKELY(counter\_changed(b1, c1)));

    if (!node) {  
        /\* Not found (yet); Prefetch the 2nd bucket. \*/  
        b2s\[i\] = &impl->buckets\[other\_hash(h1s\[i\]) & impl->mask\];  
        OVS\_PREFETCH(b2s\[i\]);  
        c1s\[i\] = c1; /\* We may need to check this after Round 2. \*/  
        continue;  
    }  
    /\* Found. \*/  
    ULLONG\_SET0(map, i); /\* Ignore this on round 2. \*/  
    OVS\_PREFETCH(node);  
    nodes\[i\] = node;  
}  
/\* Round 2. Look into the 2nd bucket, if needed. \*/  
ULLONG\_FOR\_EACH\_1(i, map) {  
    uint32\_t c2;  
    const struct cmap\_bucket \*b2 = b2s\[i\];  
    const struct cmap\_node \*node;

    do {  
        c2 = read\_even\_counter(b2);  
        node = cmap\_find\_in\_bucket(b2, hashes\[i\]);  
    } while (OVS\_UNLIKELY(counter\_changed(b2, c2)));

    if (!node) {  
        // 可能被修改了,  
        if (OVS\_UNLIKELY(counter\_changed(b1s\[i\], c1s\[i\]))) {  
            node = cmap\_find\_\_(b1s\[i\], b2s\[i\], hashes\[i\]);  
            if (node) {  
                goto found;  
            }  
        }  
        /\* Not found. \*/  
        ULLONG\_SET0(result, i); /\* Fix the result. \*/  
        continue;  
    }  

found:
OVS_PREFETCH(node);
nodes[i] = node;
}
return result;
}

2.3 fast_path_processing()

static inline void
fast_path_processing(struct dp_netdev_pmd_thread *pmd,
struct dp_packet_batch *packets_,
struct netdev_flow_key **keys,
struct dp_packet_flow_map *flow_map,
uint8_t *index_map,
odp_port_t in_port)
{
const size_t cnt = dp_packet_batch_size(packets_);
#if !defined(__CHECKER__) && !defined(_WIN32)
const size_t PKT_ARRAY_SIZE = cnt;
#else
/* Sparse or MSVC doesn't like variable length array. */
enum { PKT_ARRAY_SIZE = NETDEV_MAX_BURST };
#endif
struct dp_packet *packet;
struct dpcls *cls;
struct dpcls_rule *rules[PKT_ARRAY_SIZE];
struct dp_netdev *dp = pmd->dp;
int upcall_ok_cnt = 0, upcall_fail_cnt = 0;
int lookup_cnt = 0, add_lookup_cnt;
bool any_miss;

for (size\_t i = 0; i < cnt; i++) {  
    /\* Key length is needed in all the cases, hash computed on demand. \*/  
    keys\[i\]->len = netdev\_flow\_key\_size(miniflow\_n\_values(&keys\[i\]->mf));  
}  
/\* Get the classifier for the in\_port \*/  
// 找到端口对应的dpcls结构,每个port有自己的dpcls,因为每个port收到的报文会更相似  
cls = dp\_netdev\_pmd\_lookup\_dpcls(pmd, in\_port);  
if (OVS\_LIKELY(cls)) {  
    // 调用dpcls\_lookup进行匹配  
    any\_miss = !dpcls\_lookup(cls, (const struct netdev\_flow\_key \*\*)keys,  
                            rules, cnt, &lookup\_cnt);  
} else {  
    any\_miss = true;  
    memset(rules, 0, sizeof(rules));  
}  
// 如果有miss的,则需要进行openflow流表查询  
if (OVS\_UNLIKELY(any\_miss) && !fat\_rwlock\_tryrdlock(&dp->upcall\_rwlock)) {  
    uint64\_t actions\_stub\[512 / 8\], slow\_stub\[512 / 8\];  
    struct ofpbuf actions, put\_actions;

    ofpbuf\_use\_stub(&actions, actions\_stub, sizeof actions\_stub);  
    ofpbuf\_use\_stub(&put\_actions, slow\_stub, sizeof slow\_stub);

    DP\_PACKET\_BATCH\_FOR\_EACH (i, packet, packets\_) {  
        struct dp\_netdev\_flow \*netdev\_flow;

        if (OVS\_LIKELY(rules\[i\])) {  
            continue;  
        }  
        // 此时可能已经更新了,在进入upcall之前如果再查一次,如果能够查到,会比upcall消耗的少得多  
        netdev\_flow = dp\_netdev\_pmd\_lookup\_flow(pmd, keys\[i\],  
                                                &add\_lookup\_cnt);  
        if (netdev\_flow) {  
            lookup\_cnt += add\_lookup\_cnt;  
            rules\[i\] = &netdev\_flow->cr;  
            continue;  
        }  
        // 第一级和第二级流表查找失败后,就要查找第三级流表了,即openflow流表,这也称为upcall调用。  
        // 在普通ovs下是通过netlink实现的,在ovs+dpdk下,直接在pmd线程中调用upcall\_cb即可。  
        // 开始查找openflow流表。如果查找openflow流表成功并需要下发到dpcls时,需要判断是否超出最大流表限制  
        int error = handle\_packet\_upcall(pmd, packet, keys\[i\],  
                                         &actions, &put\_actions);

        if (OVS\_UNLIKELY(error)) {  
            upcall\_fail\_cnt++;  
        } else {  
            upcall\_ok\_cnt++;  
        }  
    }

    ofpbuf\_uninit(&actions);  
    ofpbuf\_uninit(&put\_actions);  
    fat\_rwlock\_unlock(&dp->upcall\_rwlock);  
} else if (OVS\_UNLIKELY(any\_miss)) {  
    DP\_PACKET\_BATCH\_FOR\_EACH (i, packet, packets\_) {  
        if (OVS\_UNLIKELY(!rules\[i\])) {  
            dp\_packet\_delete(packet);  
            COVERAGE\_INC(datapath\_drop\_lock\_error);  
            upcall\_fail\_cnt++;  
        }  
    }  
}

DP\_PACKET\_BATCH\_FOR\_EACH (i, packet, packets\_) {  
    struct dp\_netdev\_flow \*flow;  
    /\* Get the original order of this packet in received batch. \*/  
    int recv\_idx = index\_map\[i\];  
    uint16\_t tcp\_flags;

    if (OVS\_UNLIKELY(!rules\[i\])) {  
        continue;  
    }

    flow = dp\_netdev\_flow\_cast(rules\[i\]);

    bool hook\_cached = false;  
    if (pmd->cached\_hook && \\  
            pmd->cached\_hook\_pmd && \\  
            pmd->cached\_hook->hook\_flow\_miss) {  
        hook\_cached = pmd->cached\_hook->hook\_flow\_miss(pmd->cached\_hook\_pmd, packet, flow);  
    }

    if (!hook\_cached) {  
        bool smc\_enable\_db;  
        atomic\_read\_relaxed(&pmd->dp->smc\_enable\_db, &smc\_enable\_db);  
        // 查找到了packet,如果开启了smc,更新smc  
        if (smc\_enable\_db) {  
            uint32\_t hash =  dp\_netdev\_flow\_hash(&flow->ufid);  
            smc\_insert(pmd, keys\[i\], hash);  
        }  
        // 查到了packet,看是否写会更新上一层cache(EMC)  
        if (emc\_probabilistic\_insert(pmd, keys\[i\], flow)) {  
            if (flow->status == OFFLOAD\_NONE) {  
                queue\_netdev\_flow\_put(pmd->dp->dp\_flow\_offload, \\  
                        pmd->dp->class, \\  
                        flow, NULL, DP\_NETDEV\_FLOW\_OFFLOAD\_OP\_ADD);  
            }  
        }  
    }  
    /\* Add these packets into the flow map in the same order  
     \* as received.  
     \*/  
    tcp\_flags = miniflow\_get\_tcp\_flags(&keys\[i\]->mf);  
    packet\_enqueue\_to\_flow\_map(packet, flow, tcp\_flags,  
                               flow\_map, recv\_idx);  
}  
// 更新各个信息  
pmd\_perf\_update\_counter(&pmd->perf\_stats, PMD\_STAT\_MASKED\_HIT,  
                        cnt - upcall\_ok\_cnt - upcall\_fail\_cnt);  
pmd\_perf\_update\_counter(&pmd->perf\_stats, PMD\_STAT\_MASKED\_LOOKUP,  
                        lookup\_cnt);  
pmd\_perf\_update\_counter(&pmd->perf\_stats, PMD\_STAT\_MISS,  
                        upcall\_ok\_cnt);  
pmd\_perf\_update\_counter(&pmd->perf\_stats, PMD\_STAT\_LOST,  
                        upcall\_fail\_cnt);  

}

2.4 smc更新smc_insert()

static inline void
smc_insert(struct dp_netdev_pmd_thread *pmd,
const struct netdev_flow_key *key,
uint32_t hash)
{
struct smc_cache *smc_cache = &(pmd->flow_cache).smc_cache;
struct smc_bucket *bucket = &smc_cache->buckets[key->hash & SMC_MASK];
uint16_t index;
uint32_t cmap_index;
int i;

//布谷鸟算法  
cmap\_index = cmap\_find\_index(&pmd->flow\_table, hash);  
index = (cmap\_index >= UINT16\_MAX) ? UINT16\_MAX : (uint16\_t)cmap\_index;

/\* If the index is larger than SMC can handle (uint16\_t), we don't  
 \* insert \*/  
if (index == UINT16\_MAX) {  
    //表明找到了  
    return;  
}

/\* If an entry with same signature already exists, update the index \*/  
uint16\_t sig = key->hash >> 16;  
for (i = 0; i < SMC\_ENTRY\_PER\_BUCKET; i++) {  
    if (bucket->sig\[i\] == sig) {  
        bucket->flow\_idx\[i\] = index;  
        return;  
    }  
}  
/\* If there is an empty entry, occupy it. \*/  
for (i = 0; i < SMC\_ENTRY\_PER\_BUCKET; i++) {  
    if (bucket->flow\_idx\[i\] == UINT16\_MAX) {  
        bucket->sig\[i\] = sig;  
        bucket->flow\_idx\[i\] = index;  
        return;  
    }  
}  
/\* Otherwise, pick a random entry. \*/  
i = random\_uint32() % SMC\_ENTRY\_PER\_BUCKET;  
bucket->sig\[i\] = sig;  
bucket->flow\_idx\[i\] = index;  

}

这里就不讲具体代码了,讲一下大概:到openflow查找后会更新dpcls,执行dp_netdev_flow_add() --> dpcls_insert() --> dpcls_find_subtable() --> cmap_insert()

dpcls_find_subtable():

  找一下是否存在相同mask的subtable,存在返回这个subtable,不存在就创建一个subtable,创建的时候会调用dpcls_create_subtable,里面有个dpcls_subtable_get_best_impl会根据mask的miniflow的bits[0]和bits[1]选择的查找算法。

cmap_insert里hash算法用的就是布谷鸟hash,hash两次,插入的核心代码:

static bool
cmap_try_insert(struct cmap_impl *impl, struct cmap_node *node, uint32_t hash)
{
uint32_t h1 = rehash(impl, hash);
uint32_t h2 = other_hash(h1);
// hash两次找到两个桶
struct cmap_bucket *b1 = &impl->buckets[h1 & impl->mask];
struct cmap_bucket *b2 = &impl->buckets[h2 & impl->mask];

// 插入规则:  
// 1.是否有相同hash的node,就插到对应链上  
// 2.没有相同hash,就看有没有空的node  
// 3.都不行就通过bfs,看能否让b1,b2空出来一个,把这个放进去  
// 都不行就插入失败  
return (OVS\_UNLIKELY(cmap\_insert\_dup(node, hash, b1) ||  
                     cmap\_insert\_dup(node, hash, b2)) ||  
        OVS\_LIKELY(cmap\_insert\_bucket(node, hash, b1) ||  
                   cmap\_insert\_bucket(node, hash, b2)) ||  
        cmap\_insert\_bfs(impl, node, hash, b1, b2));  

}

参考博客:

OVS-DPDK Datapath Classifier :这个是理论上的流程,看完就知道这个算法流程了

ovs分类器 flow和miniflow :很重要的结构体miniflow

OVS-DPDK DataPath Classifier反向设计 :这个有很多详细的解释,但不怎么流畅