在介绍之前先说一些概念:里面有两个结构很重要,一个是flow一个是miniflow这里介绍一下他们的数据结构和构造函数。
flow的特点是8字节对齐的,存储报文相关字段和其他原数据,用于匹配流表,数据包含四个层次:
flow的坏处就是占用了很大的字节,并且有很多字段都是0,在2.8版本中flow的大小是672字节。
miniflow是flow的压缩版,因为flow占用字节很大,比如可以支持ARP,IP等报文,填充了arp字段,icmp报文就是空的了,浪费了很多信息。过程中用到hash作为key,也是根据miniflow计算hash值,不是用的flow。
struct miniflow {
struct flowmap map;
};
struct flowmap {
map_t bits[FLOWMAP_UNITS];
};
miniflow其包含两部分内容:
miniflow数据结构:
//flow是8字节对齐的,除8得到flow中包含8字节的个数
#define FLOW_U64S (sizeof(struct flow) / sizeof(uint64_t))
//map大小为8字节,MAP_T_BITS 为64位
typedef unsigned long long map_t;
#define MAP_T_BITS (sizeof(map_t) * CHAR_BIT)
//每位表示一个u64,FLOWMAP_UNITS 表示最少需要几个64位
#define FLOWMAP_UNITS DIV_ROUND_UP(FLOW_U64S, MAP_T_BITS)
struct flowmap {
map_t bits[FLOWMAP_UNITS];
};
struct miniflow {
struct flowmap map;
/* Followed by:
* uint64_t values[n];
* where 'n' is miniflow_n_values(miniflow). */
};
struct netdev_flow_key {
uint32_t hash;
uint32_t len;
struct miniflow mf; // bits
uint64_t buf[FLOW_MAX_PACKET_U64S]; // 就是上边所说的value
};
// 有些字段是互斥的
#define FLOW_MAX_PACKET_U64S (FLOW_U64S \
/* Unused in datapath */ - FLOW_U64_SIZE(regs) \
- FLOW_U64_SIZE(metadata) \
/* L2.5/3 */ - FLOW_U64_SIZE(nw_src) /* incl. nw_dst */ \
- FLOW_U64_SIZE(mpls_lse) \
/* L4 */ - FLOW_U64_SIZE(tp_src) \
)
miniflow优点:
void
miniflow_extract(struct dp_packet *packet, struct miniflow *dst)
{
…
// 初始化赋值有两个关键,一个是这个values: return (uint64_t *)(mf + 1);
// 就是上边说的
uint64_t *values = miniflow_values(dst);
struct mf_ctx mf = { FLOWMAP_EMPTY_INITIALIZER, values,
values + FLOW_U64S };
…
if (md->skb_priority || md->pkt_mark) {
miniflow_push_uint32(mf, skb_priority, md->skb_priority);
miniflow_push_uint32(mf, pkt_mark, md->pkt_mark);
}
miniflow_push_be16(mf, dl_type, dl_type);
miniflow_pad_to_64(mf, dl_type);
…
// 去取网络层信息,从这里可以看出,ovs暂时只支持IP,IPV6,ARP,RARP报文
if (OVS\_LIKELY(dl\_type == htons(ETH\_TYPE\_IP))){...}
else if
...
// 提取传输层,从这里可以看出,ovs暂时支持传输层协议有TCP,UDP,SCTP,ICMP,ICMPV6
if (OVS\_LIKELY(nw\_proto == IPPROTO\_TCP)){...}
else if
...
miniflow_push_uint32()
在上面将value保存到miniflow时,用到了几个辅助函数,比如下面的miniflow_push_uint32用来将一个32位的值保存到miniflow中FIELD对应的位置。其首先调用offsetof获取field在flow中的偏移字节数,因为flow是8字节对齐的,所以一个四字节的成员变量要么位于8字节的起始位置,要么位于8字节的中间位置,即对8取模值肯定为0或者4,再调用miniflow_push_uint32_保存到对应的位置,并设置map中对应的bit为1。
#define miniflow_push_uint32(MF, FIELD, VALUE) \
miniflow_push_uint32_(MF, offsetof(struct flow, FIELD), VALUE)
#define miniflow_push_uint32_(MF, OFS, VALUE) \
{ \
MINIFLOW_ASSERT(MF.data < MF.end); \
\
//成员变量位于起始位置,需要调用miniflow_set_map设置对应的bit为1
if ((OFS) % 8 == 0) { \
miniflow_set_map(MF, OFS / 8); \
*(uint32_t *)MF.data = VALUE; \
} else if ((OFS) % 8 == 4) { \
//成员变量不在起始位置,要判断此变量所在的bit为1
miniflow_assert_in_map(MF, OFS / 8); \
*((uint32_t *)MF.data + 1) = VALUE; \
MF.data++; \
} \
}
/* Initializes 'dst' as a copy of 'src'. */
void
miniflow_expand(const struct miniflow *src, struct flow *dst)
{
memset(dst, 0, sizeof *dst);
flow_union_with_miniflow(dst, src);
}
/* Perform a bitwise OR of miniflow 'src' flow data with the equivalent
* fields in 'dst', storing the result in 'dst'. */
static inline void
flow_union_with_miniflow(struct flow *dst, const struct miniflow *src)
{
flow_union_with_miniflow_subset(dst, src, src->map);
}
static inline void
flow_union_with_miniflow_subset(struct flow *dst, const struct miniflow *src,
struct flowmap subset)
{
uint64_t *dst_u64 = (uint64_t *) dst;
const uint64_t *p = miniflow_get_values(src);
map_t map;
//遍历所有的map
FLOWMAP_FOR_EACH_MAP (map, subset) {
size_t idx;
//遍历map中所有的非0bit
MAP_FOR_EACH_INDEX(idx, map) {
dst_u64[idx] |= *p++;
}
dst_u64 += MAP_T_BITS;
}
}
该部分入口在lib/dpif-netdev.c,就是最开始的那个图。
查询的缓存分为两层:一个是DFC,一个是dpcls,相当于microflow和megaflow,DFC由两部分组成,DFC(datapath flow cache):EMC(Exact match cache)+SMC(Signature match cache),另一部分就是dpcls(datapath classifer)。
SMC默认关闭:bool smc_enable = smap_get_bool(other_config, "smc-enable", false);
函数执行流程(不包含SMC的):
static void
dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
struct dp_packet_batch *packets,
bool md_is_valid, odp_port_t port_no)
{
#if !defined(__CHECKER__) && !defined(_WIN32)
const size_t PKT_ARRAY_SIZE = dp_packet_batch_size(packets);
#else
/* Sparse or MSVC doesn't like variable length array. */
enum { PKT_ARRAY_SIZE = NETDEV_MAX_BURST };
#endif
OVS_ALIGNED_VAR(CACHE_LINE_SIZE)
struct netdev_flow_key keys[PKT_ARRAY_SIZE];
struct netdev_flow_key *missed_keys[PKT_ARRAY_SIZE];
struct packet_batch_per_flow batches[PKT_ARRAY_SIZE];
size_t n_batches;
struct dp_packet_flow_map flow_map[PKT_ARRAY_SIZE];
uint8_t index_map[PKT_ARRAY_SIZE];
size_t n_flows, i;
odp\_port\_t in\_port;
n\_batches = 0;
// 1. dfc\_processing之后会把miss的放到packets里
// 找到的可能已经batched了,或者放到flow\_map里了
// flow\_map里是未bathed的,可能直接是\*flow或者是NULL,是NULL再去下一层cache查
dfc\_processing(pmd, packets, keys, missed\_keys, batches, &n\_batches,
flow\_map, &n\_flows, index\_map, md\_is\_valid, port\_no);
// 2. 如果有miss的,再去找fast-path,也就是查dpcls
if (!dp\_packet\_batch\_is\_empty(packets)) {
in\_port = packets->packets\[0\]->md.in\_port.odp\_port;
fast\_path\_processing(pmd, packets, missed\_keys,
flow\_map, index\_map, in\_port);
}
/\* Batch rest of packets which are in flow map. \*/
for (i = 0; i < n\_flows; i++) {
struct dp\_packet\_flow\_map \*map = &flow\_map\[i\];
if (OVS\_UNLIKELY(!map->flow)) {
continue;
}
dp\_netdev\_queue\_batches(map->packet, map->flow, map->tcp\_flags,
batches, &n\_batches);
}
for (i = 0; i < n\_batches; i++) {
batches\[i\].flow->batch = NULL;
}
// 执行每个packet的action
for (i = 0; i < n\_batches; i++) {
packet\_batch\_per\_flow\_execute(&batches\[i\], pmd);
}
}
static inline size_t
dfc_processing(struct dp_netdev_pmd_thread *pmd,
struct dp_packet_batch *packets_,
struct netdev_flow_key *keys,
struct netdev_flow_key **missed_keys,
struct packet_batch_per_flow batches[], size_t *n_batches,
struct dp_packet_flow_map *flow_map,
size_t *n_flows, uint8_t *index_map,
bool md_is_valid, odp_port_t port_no)
{
struct netdev_flow_key *key = &keys[0];
size_t n_missed = 0, n_emc_hit = 0;
struct dfc_cache *cache = &pmd->flow_cache;
struct dp_packet *packet;
size_t cnt = dp_packet_batch_size(packets_);
// emc的插入概率,如果为0,表示不开启emc
uint32_t cur_min = pmd->ctx.emc_insert_min;
int i;
uint16_t tcp_flags;
bool smc_enable_db;
// 记录未batched的个数
size_t map_cnt = 0;
// 这个变量用于保序
bool batch_enable = true;
// 获取smc是否开启参数
atomic_read_relaxed(&pmd->dp->smc_enable_db, &smc_enable_db);
pmd_perf_update_counter(&pmd->perf_stats,
md_is_valid ? PMD_STAT_RECIRC : PMD_STAT_RECV,
cnt);
do\_dfc\_hook(pmd, packets\_, batches, n\_batches);
cnt = dp\_packet\_batch\_size(packets\_);
// 逐个对dp\_packet\_batch中的每一个packet进行处理
DP\_PACKET\_BATCH\_REFILL\_FOR\_EACH (i, cnt, packet, packets\_) {
struct dp\_netdev\_flow \*flow;
// 若packet包长小于以太头的长度直接丢包
if (OVS\_UNLIKELY(dp\_packet\_size(packet) < ETH\_HEADER\_LEN)) {
dp\_packet\_delete(packet);
COVERAGE\_INC(datapath\_drop\_rx\_invalid\_packet);
continue;
}
// 对数据手工预取可减少读取延迟,从而提高性能
if (i != cnt - 1) {
struct dp\_packet \*\*packets = packets\_->packets;
/\* Prefetch next packet data and metadata. \*/
OVS\_PREFETCH(dp\_packet\_data(packets\[i+1\]));
pkt\_metadata\_prefetch\_init(&packets\[i+1\]->md);
}
// 初始化metadata首先将pkt\_metadata中flow\_in\_port前的字节全部设为0
// 将in\_port.odp\_port设为port\_no, tunnel.ipv6\_dst设为in6addr\_any
if (!md\_is\_valid) {
pkt\_metadata\_init(&packet->md, port\_no);
}
// 报文转化为miniflow, 上文有讲
miniflow\_extract(packet, &key->mf);
key->len = 0; /\* Not computed yet. \*/
// 计算当前报文miniflow的hash值
key->hash =
(md\_is\_valid == false)
? dpif\_netdev\_packet\_get\_rss\_hash\_orig\_pkt(packet, &key->mf)
: dpif\_netdev\_packet\_get\_rss\_hash(packet, &key->mf);
// 根据key->hash,emc\_entry alive,miniflow 3个条件得到dp\_netdev\_flow
// cur\_min = 0,表示不可能插入,后面有讲什么时候才会插入EMC
flow = (cur\_min != 0) ? emc\_lookup(&cache->emc\_cache, key) : NULL;
if (OVS\_LIKELY(flow)) {
tcp\_flags = miniflow\_get\_tcp\_flags(&key->mf);
n\_emc\_hit++; // 命中次数+1
// 为了保证报文的顺序,所有的packet对应的flow都用flow\_map存储
// flow\_map里面就是packet数量对应的(packet,flow,tcp\_flag)
// 最后会把这些在dp\_netdev\_input\_\_里重新把顺序合并一下
if (OVS\_LIKELY(batch\_enable)) {
// 把查到的flow加到batches里第n\_batches个batch里
dp\_netdev\_queue\_batches(packet, flow, tcp\_flags, batches,
n\_batches);
} else {
packet\_enqueue\_to\_flow\_map(packet, flow, tcp\_flags,
flow\_map, map\_cnt++);
}
} else {
// 这些数据结构用于smc查询时的记录
// 没查到把packet放到packets\_里,从下标0再开始放
// 最后packets\_都是未查到的
dp\_packet\_batch\_refill(packets\_, packet, i);
index\_map\[n\_missed\] = map\_cnt;
flow\_map\[map\_cnt++\].flow = NULL;
missed\_keys\[n\_missed\] = key;
key = &keys\[++n\_missed\];
batch\_enable = false; // 之后的都是未batched的
}
}
\*n\_flows = map\_cnt;
pmd\_perf\_update\_counter(&pmd->perf\_stats, PMD\_STAT\_EXACT\_HIT, n\_emc\_hit);
// 如果没有开启smc,直接返回了
if (!smc\_enable\_db) {
return dp\_packet\_batch\_size(packets\_);
}
smc\_lookup\_batch(pmd, keys, missed\_keys, packets\_,
n\_missed, flow\_map, index\_map);
return dp\_packet\_batch\_size(packets\_);
}
static inline struct dp_netdev_flow *
emc_lookup(struct emc_cache *cache, const struct netdev_flow_key *key)
{
struct emc_entry *current_entry;
// 这里说一下,一个hash分配两个桶,长度为13位,cache桶的大小为1<<13
// struct emc\_cache {
// struct emc\_entry entries\[EM\_FLOW\_HASH\_ENTRIES\];
// int sweep\_idx; /\* For emc\_cache\_slow\_sweep(). \*/
// };
EMC\_FOR\_EACH\_POS\_WITH\_HASH (cache, current\_entry, key->hash) {
if (current_entry->key.hash == key->hash
&& emc_entry_alive(current_entry)
&& emc_flow_key_equal_mf(¤t_entry->key, &key->mf)) {
/* We found the entry with the 'key->mf' miniflow */
return current_entry->flow;
}
}
return NULL;
}
#define EM_FLOW_HASH_SHIFT 13
#define EM_FLOW_HASH_ENTRIES (1u << EM_FLOW_HASH_SHIFT)
#define EM_FLOW_HASH_MASK (EM_FLOW_HASH_ENTRIES - 1)
#define EM_FLOW_HASH_SEGS 2
#define EMC_FOR_EACH_POS_WITH_HASH(EMC, CURRENT_ENTRY, HASH) \
for (uint32_t i__ = 0, srch_hash__ = (HASH); \
(CURRENT_ENTRY) = &(EMC)->entries[srch_hash__ & EM_FLOW_HASH_MASK], \
i__ < EM_FLOW_HASH_SEGS; \
i__++, srch_hash__ >>= EM_FLOW_HASH_SHIFT)
// 比较miniflow是否相同
static inline bool
emc_flow_key_equal_mf(const struct netdev_flow_key *key,
const struct miniflow *mf)
{
return !memcmp(&key->mf, mf, key->len);
}
EMC查询函数执行:
static inline void
smc_lookup_batch(struct dp_netdev_pmd_thread *pmd,
struct netdev_flow_key *keys,
struct netdev_flow_key **missed_keys,
struct dp_packet_batch *packets_,
const int cnt,
struct dp_packet_flow_map *flow_map,
uint8_t *index_map)
{
int i;
struct dp_packet *packet;
size_t n_smc_hit = 0, n_missed = 0;
struct dfc_cache *cache = &pmd->flow_cache;
struct smc_cache *smc_cache = &cache->smc_cache;
const struct cmap_node *flow_node;
int recv_idx;
uint16_t tcp_flags;
/\* Prefetch buckets for all packets \*/
for (i = 0; i < cnt; i++) {
OVS\_PREFETCH(&smc\_cache->buckets\[keys\[i\].hash & SMC\_MASK\]);
}
DP\_PACKET\_BATCH\_REFILL\_FOR\_EACH (i, cnt, packet, packets\_) {
struct dp\_netdev\_flow \*flow = NULL;
// 找到hash相同的flow链表的头节点
flow\_node = smc\_entry\_get(pmd, keys\[i\].hash);
bool hit = false;
/\* Get the original order of this packet in received batch. \*/
recv\_idx = index\_map\[i\];
if (OVS\_LIKELY(flow\_node != NULL)) {
// 遍历一下看看哪一个是相同的,这个通过offsetof找到存放该cmap结构体的首地址
// dp\_netdev\_flow里面的首地址就是,
CMAP\_NODE\_FOR\_EACH (flow, node, flow\_node) {
/\* Since we dont have per-port megaflow to check the port
\* number, we need to verify that the input ports match. \*/
if (OVS\_LIKELY(dpcls\_rule\_matches\_key(&flow->cr, &keys\[i\]) &&
flow->flow.in\_port.odp\_port == packet->md.in\_port.odp\_port)) {
tcp\_flags = miniflow\_get\_tcp\_flags(&keys\[i\].mf);
keys\[i\].len =
netdev\_flow\_key\_size(miniflow\_n\_values(&keys\[i\].mf));
if (emc\_probabilistic\_insert(pmd, &keys\[i\], flow)) {
if (flow->status == OFFLOAD\_NONE) {
queue\_netdev\_flow\_put(pmd->dp->dp\_flow\_offload, \\
pmd->dp->class, \\
flow, NULL, DP\_NETDEV\_FLOW\_OFFLOAD\_OP\_ADD);
}
}
packet\_enqueue\_to\_flow\_map(packet, flow, tcp\_flags,
flow\_map, recv\_idx);
n\_smc\_hit++;
hit = true;
break;
}
}
if (hit) {
continue;
}
}
// SMC也miss了,和之前一样,把miss的放packets\_里,从0开始放
dp\_packet\_batch\_refill(packets\_, packet, i);
index\_map\[n\_missed\] = recv\_idx;
missed\_keys\[n\_missed++\] = &keys\[i\];
}
pmd\_perf\_update\_counter(&pmd->perf\_stats, PMD\_STAT\_SMC\_HIT, n\_smc\_hit);
}
查找hash相同的链表头:smc_entry_get()
static inline const struct cmap_node *
smc_entry_get(struct dp_netdev_pmd_thread *pmd, const uint32_t hash)
{
struct smc_cache *cache = &(pmd->flow_cache).smc_cache;
// smc_cache桶的大小是(1<<18),SMC_MASK=(1<<18)- 1
// 先通过后hash的后18位定位到桶
struct smc_bucket *bucket = &cache->buckets[hash & SMC_MASK];
// 一个桶有4个16位的sig,存key->hash前16位,正好是64位
// 遍历4个元素看那个匹配,获得匹配后的cmap的下标
uint16_t sig = hash >> 16;
uint16_t index = UINT16_MAX;
for (int i = 0; i < SMC\_ENTRY\_PER\_BUCKET; i++) {
if (bucket->sig\[i\] == sig) {
index = bucket->flow\_idx\[i\];
break;
}
}
// 通过index找到在dpcls里的桶位置
if (index != UINT16\_MAX) {
return cmap\_find\_by\_index(&pmd->flow\_table, index);
}
return NULL;
}
命中SMC后,插入回上一层cache(EMC)里:emc_probabilistic_insert()
插入EMC的条件:
默认插入流表的概率是1%,可以通过ovs-vsctl set Open_vSwitch . other_config:emc-insert-prob=10 设置概率,表示平均10条流表有1条插入,当为0时禁用EMC,当为1的时候,百分百插入。设置后会在代码里设置emc_insert_min字段为uint_max/10,插入的时候生成一个uint_random(),如果随机数小于emc_insert_min才会插入。
static inline bool
emc_probabilistic_insert(struct dp_netdev_pmd_thread *pmd,
const struct netdev_flow_key *key,
struct dp_netdev_flow *flow)
{
/* Insert an entry into the EMC based on probability value 'min'. By
* default the value is UINT32_MAX / 100 which yields an insertion
* probability of 1/100 ie. 1% */
uint32_t min = pmd->ctx.emc_insert_min;
if (min && random_uint32() <= min) {
emc_insert(&(pmd->flow_cache).emc_cache, key, flow);
return true;
}
return false;
}
emc_insert同样有我在内核查询里的问题,如果cache里没有该miniflow,会找一个hash值小的entry,覆盖这个entry,那如果有一个hash很大的flow被插入了,但是这个flow之后就没用过了,那岂不是这个entry就浪费了,不会被用到。
找到了合适的emc_entry。则将报文对应的netdev_dev_flow key信息存储到该表项中。而对于这个表项,原有的emc_entry.flow有可能还有指向一条旧的流表,需要将这条流表的引用计数减1,如果减1后达到0,则释放该流表空间。同时更新emc_entry.flow重新指向新的流表。到此为止,EMC表项更新完毕。
static inline void
emc_insert(struct emc_cache *cache, const struct netdev_flow_key *key,
struct dp_netdev_flow *flow)
{
struct emc_entry *to_be_replaced = NULL;
struct emc_entry *current_entry;
EMC\_FOR\_EACH\_POS\_WITH\_HASH(cache, current\_entry, key->hash) {
if (netdev\_flow\_key\_equal(¤t\_entry->key, key)) {
/\* We found the entry with the 'mf' miniflow \*/
emc\_change\_entry(current\_entry, flow, NULL);
return;
}
/\* Replacement policy: put the flow in an empty (not alive) entry, or
\* in the first entry where it can be \*/
if (!to\_be\_replaced
|| (emc\_entry\_alive(to\_be\_replaced)
&& !emc\_entry\_alive(current\_entry))
|| current\_entry->key.hash < to\_be\_replaced->key.hash) {
// 这个黄色判断就是我迷惑的地方
to\_be\_replaced = current\_entry;
}
}
/\* We didn't find the miniflow in the cache.
\* The 'to\_be\_replaced' entry is where the new flow will be stored \*/
emc\_change\_entry(to\_be\_replaced, flow, key);
}
在pmd_thread_main()里面:
if (lc++ > 1024) {
lc = 0;
coverage\_try\_clear();
// 这里的optimize是排序一下TSS
dp\_netdev\_pmd\_try\_optimize(pmd, poll\_list, poll\_cnt);
dp\_netdev\_pmd\_hook\_idle\_run(pmd);
#ifdef ENABLE_EMC
if (!ovsrcu_try_quiesce()) {
emc_cache_slow_sweep(pmd->dp, &((pmd->flow_cache).emc_cache));
}
#else
ovsrcu_try_quiesce();
#endif
for (i = 0; i < poll\_cnt; i++) {
uint64\_t current\_seq =
netdev\_get\_change\_seq(poll\_list\[i\].rxq->port->netdev);
if (poll\_list\[i\].change\_seq != current\_seq) {
poll\_list\[i\].change\_seq = current\_seq;
poll\_list\[i\].rxq\_enabled =
netdev\_rxq\_enabled(poll\_list\[i\].rxq->rx);
}
}
}
dpcls是megaflow的查询过程,使用TSS算法,是个很老的算法了,看源码之前,先讲一下ovs里面的TSS,之前内核已经讲过,但是没有讲OVS里做的优化,下边再说一次,然后建议再去看一下这个有很多图的博客OVS-DPDK Datapath Classifier,这样之后对整个dpcls流程就有所了解了。
OVS 在内核态使用了元组空间搜索算法(Tuple Space Search,简称 TSS)进行流表查找,元组空间搜索算法的核心思想是,把所有规则按照每个字段的前缀长度进行组合,并划分为不同的元组中,然后在这些元组集合中进行哈希查找。我们举例说明,假设现有 10 条规则以及 3 个匹配字段,每个匹配字段长度均为 4:
我们将每条规则各匹配字段的前缀长度提取出来,按照前缀长度进行组合,并根据前缀长度组合进行分组:
我们将每个前缀长度组合称为 元组,每个元组对应于哈希表的一个桶,同一前缀长度组合内的所有规则放置在同一个哈希桶内:
10 条规则被划分为 4 个元组,因此最多只需要四次查找,就可以找到对应的规则。
为什么OVS选择TSS,而不选择其他查找算法?论文给出了以下三点解释:
(1)在虚拟化数据中心环境下,流的添加删除比较频繁,TSS支持高效的、常数时间的表项更新; (2)TSS支持任意匹配域的组合; (3)TSS存储空间随着流的数量线性增长,空间复杂度为 O(N),N 为规则数目。
元组空间搜索算法的缺点是,由于基于哈希表实现,因此查找的时间复杂度不能确定。当所有规则各个字段的前缀长度组合数目过多时,查找性能会大大降低,最坏情况下需要查找所有规则。
查找的过程需要从前向后遍历所有元组,命中了就不用往后查了。OVS给每个元组加了一个命中次数,命中次数越多,元组这个链表越靠前,这样就可以减少了查表次数。
// 线程安全的
#define OVSRCU_TYPE(TYPE) struct { ATOMIC(TYPE) p; }
struct cmap {
OVSRCU_TYPE(struct cmap_impl *) impl;
};
/* The implementation of a concurrent hash map. */
struct cmap_impl {
// 补齐64字节
PADDED_MEMBERS_CACHELINE_MARKER(CACHE_LINE_SIZE, cacheline0,
unsigned int n; /* Number of in-use elements. */
unsigned int max_n; /* Max elements before enlarging. */
unsigned int min_n; /* Min elements before shrinking. */
uint32_t mask; /* Number of 'buckets', minus one. */
uint32_t basis; /* Basis for rehashing client's
hash values. */
);
PADDED_MEMBERS_CACHELINE_MARKER(CACHE_LINE_SIZE, cacheline1,
struct cmap_bucket buckets[1];
);
};
struct cmap_bucket {
/* Padding to make cmap_bucket exactly one cache line long. */
PADDED_MEMBERS(CACHE_LINE_SIZE,
// 锁机制,读和写都会+1,读的时候等到变成偶数再去读,保证安全
atomic_uint32_t counter;
// 桶中的每个槽用(hashs[i], nodes[i])元组来表示
uint32_t hashes[CMAP_K];
struct cmap_node nodes[CMAP_K];
);
};
struct cmap_node {
OVSRCU_TYPE(struct cmap_node *) next; /* Next node with same hash. */
};
/* 二级匹配表.每个报文接收端口对应一个 */
struct dpcls {
struct cmap_node node; /* 链表节点 */
odp_port_t in_port; /* 报文接收端口 */
struct cmap subtables_map; // 管理下边subtables的索引,用于遍历
struct pvector subtables; // 上文TSS算法所说的元组表
}
struct pvector {
// 指向具体子表信息
OVSRCU_TYPE(struct pvector_impl *) impl;
// 平时,temp都是为NULL.只有当pvector扩充时,temp才用来临时缓存数据.
// 待排好序后,再拷贝到impl中,temp再置NULL
struct pvector_impl *temp;
};
// 相当于vector
struct pvector_impl {
size_t size; /* Number of entries in the vector */
size_t allocated; /* Number allocted entries */
/* 初始化的时候只有4个元素.后续可能会扩充 */
struct pvector_entry vector[];
}
struct pvector_entry {
// pvector_impl中的vector是按照priority从小到大排序的
// pmd_thread_main里会把priority赋值为hit_cnt,然后排序
int priority;
/* 实际指向了struct dpcls_subtable结构 */
void *ptr;
}
// 子表信息
struct dpcls_subtable {
/* The fields are only used by writers. */
struct cmap_node cmap_node OVS_GUARDED; /* Within dpcls 'subtables_map'. */
struct cmap rules; // 该表的bucket内容
uint32\_t hit\_cnt; // 命中该子表的次数
// 下边是mask的miniflow前两个的bits里1的个数
uint8\_t mf\_bits\_set\_unit0;
uint8\_t mf\_bits\_set\_unit1;
// 根据mf\_bits\_set\_unit01选择最合适的查找算法
dpcls\_subtable\_lookup\_func lookup\_func;
/\* Caches the masks to match a packet to, reducing runtime calculations. \*/
uint64\_t \*mf\_masks; // 由下边的mask->mf->bits\[01\]得来的,
struct netdev\_flow\_key mask; // 该表的掩码信息
};
关于上边的mf_masks与mask,举个例子
mf_bits_set_unit0 = 4, mf_bits_set_unit1 = 0
netdev_flow_key.mf.bits[0] = 111010 (2进制)
mf_masks = [1, 111, 1111, 11111] (2进制)
三个图对应他们的关系,链表三用于遍历的,查找过程中并不会通过链表三方式搜索。查找的时候走的就是链表二的流程。
static bool
dpcls_lookup(struct dpcls *cls, const struct netdev_flow_key *keys[],
struct dpcls_rule **rules, const size_t cnt,
int *num_lookups_p)
{
#define MAP_BITS (sizeof(uint32_t) * CHAR_BIT)
BUILD_ASSERT_DECL(MAP_BITS >= NETDEV_MAX_BURST);
struct dpcls\_subtable \*subtable;
uint32\_t keys\_map = TYPE\_MAXIMUM(uint32\_t); /\* Set all bits. \*/
if (cnt != MAP\_BITS) {
/\*keys\_map中置1位数为包的总数,并且第i位对应第i个包\*/
keys\_map >>= MAP\_BITS - cnt; /\* Clear extra bits. \*/
}
memset(rules, 0, cnt \* sizeof \*rules);
int lookups\_match = 0, subtable\_pos = 1;
uint32\_t found\_map;
PVECTOR\_FOR\_EACH (subtable, &cls->subtables) {
// 查找函数,对应下边的lookup\_generic()
found\_map = subtable->lookup\_func(subtable, keys\_map, keys, rules);
uint32\_t pkts\_matched = count\_1bits(found\_map);
// 搜索的子表个数,加上的是当前这几个key找了多少个表
lookups\_match += pkts\_matched \* subtable\_pos;
keys\_map &= ~found\_map;
if (!keys\_map) {
if (num\_lookups\_p) {
\*num\_lookups\_p = lookups\_match;
}
// 全找到了
return true;
}
subtable\_pos++;
}
if (num\_lookups\_p) {
\*num\_lookups\_p = lookups\_match;
}
// 没有全找到
return false;
}
ovs-dpdk里面有avx512-gather.c,使用avx512优化了look_up,整体逻辑还是一样的,这里只说dpif-netdev-lookup-generic
入口在这里,往下走,传进去subtable有效字段有多大
static uint32_t
dpcls_subtable_lookup_generic(struct dpcls_subtable *subtable,
uint32_t keys_map,
const struct netdev_flow_key *keys[],
struct dpcls_rule **rules)
{
return lookup_generic_impl(subtable, keys_map, keys, rules,
subtable->mf_bits_set_unit0,
subtable->mf_bits_set_unit1);
}
static inline uint32_t ALWAYS_INLINE
lookup_generic_impl(struct dpcls_subtable *subtable, // 当前的subtable
uint32_t keys_map, // miss_bit_map
const struct netdev_flow_key *keys[], // miss_key
struct dpcls_rule **rules, // save hit_rule
const uint32_t bit_count_u0,
const uint32_t bit_count_u1)
{
// 有几个包
const uint32_t n_pkts = count_1bits(keys_map);
ovs_assert(NETDEV_MAX_BURST >= n_pkts);
uint32_t hashes[NETDEV_MAX_BURST];
// 根据mask字段的大小开空间
const uint32\_t bit\_count\_total = bit\_count\_u0 + bit\_count\_u1;
// 一个batch最大是NETDEV\_MAX\_BURST
const uint32\_t block\_count\_required = bit\_count\_total \* NETDEV\_MAX\_BURST;
uint64\_t \*mf\_masks = subtable->mf\_masks;
int i;
// 申请存储一个batch报文信息的数组,存放
uint64\_t \*blocks\_scratch = get\_blocks\_scratch(block\_count\_required);
// 获得每个key与当前表的mask“与运算”的结果
ULLONG\_FOR\_EACH\_1 (i, keys\_map) {
netdev\_flow\_key\_flatten(keys\[i\],
&subtable->mask, // 该表的掩码信息
mf\_masks, // 由subtable->mask处理后的mask
&blocks\_scratch\[i \* bit\_count\_total\],
bit\_count\_u0,
bit\_count\_u1);
}
// 算出来每一个key在该subtable里的hash值,该hash值由“mask字节数,key和mask与运算结果”得出
ULLONG\_FOR\_EACH\_1 (i, keys\_map) {
uint64\_t \*block\_ptr = &blocks\_scratch\[i \* bit\_count\_total\];
uint32\_t hash = hash\_add\_words64(0, block\_ptr, bit\_count\_total);
hashes\[i\] = hash\_finish(hash, bit\_count\_total \* 8);
}
uint32\_t found\_map;
const struct cmap\_node \*nodes\[NETDEV\_MAX\_BURST\];
// 找到每个key在该subtable里的cmap,并且返回每个key有没有被找到,第i位是1则找到
found\_map = cmap\_find\_batch(&subtable->rules, keys\_map, hashes, nodes);
ULLONG\_FOR\_EACH\_1 (i, found\_map) {
struct dpcls\_rule \*rule;
// 可能不同的rule有相同的hash,看那个是匹配的
CMAP\_NODE\_FOR\_EACH (rule, cmap\_node, nodes\[i\]) {
const uint32\_t cidx = i \* bit\_count\_total;
/\*rule->mask & keys\[i\]的值与rule->flow相比较\*/
uint32\_t match = netdev\_rule\_matches\_key(rule, bit\_count\_total,
&blocks\_scratch\[cidx\]);
if (OVS\_LIKELY(match)) {
rules\[i\] = rule;
subtable->hit\_cnt++;
goto next;
}
}
ULLONG\_SET0(found\_map, i); /\* Did not match. \*/
next:
; /\* Keep Sparse happy. \*/
}
return found\_map;
}
// 这个函数对应dpif-netdev.c里面的dpcls_flow_key_gen_masks()
static inline void
netdev_flow_key_flatten(const struct netdev_flow_key *key, // 要查找的miss_key
const struct netdev_flow_key *mask,
const uint64_t *mf_masks,
uint64_t *blocks_scratch,
const uint32_t u0_count,
const uint32_t u1_count)
{
/* Load mask from subtable, mask with packet mf, popcount to get idx. */
const uint64_t *pkt_blocks = miniflow_get_values(&key->mf);
const uint64_t *tbl_blocks = miniflow_get_values(&mask->mf); // 获取miss_key和mask的miniflow
/\* Packet miniflow bits to be masked by pre-calculated mf\_masks. \*/
const uint64\_t pkt\_bits\_u0 = key->mf.map.bits\[0\];
const uint32\_t pkt\_bits\_u0\_pop = count\_1bits(pkt\_bits\_u0);
const uint64\_t pkt\_bits\_u1 = key->mf.map.bits\[1\];
// 这个函数就是把miss_key与subtable的掩码进行&运算
// 会运算出该mask在意字段结果,放到blocks_scratch里
netdev_flow_key_flatten_unit(&pkt_blocks[0], // key-mf的数据段
&tbl_blocks[0], // mask->mf的数据段
&mf_masks[0], // mask->mf->bits得来mask
&blocks_scratch[0], // 存放的地址
pkt_bits_u0, // key->mf里的bits[0]
u0_count); // mask->mf->bits[0]里1的个数
netdev\_flow\_key\_flatten\_unit(&pkt\_blocks\[pkt\_bits\_u0\_pop\], // 上边bits\[0\]的已经算过了,从bits\[1\]开始算
&tbl\_blocks\[u0\_count\],
&mf\_masks\[u0\_count\],
&blocks\_scratch\[u0\_count\],
pkt\_bits\_u1,
u1\_count);
}
static inline void
netdev_flow_key_flatten_unit(const uint64_t *pkt_blocks, // key-mf的数据段
const uint64_t *tbl_blocks, // mask->mf里的数据段
const uint64_t *mf_masks, // mask->mf->bits得来mask
uint64_t *blocks_scratch, // 存放到这里
const uint64_t pkt_mf_bits, // key->mf里的bits[01]
const uint32_t count) // mask->mf->bits[0]里1的个数
{
// 说一下意思,这个我们流程就是用key和subtable的mask与运算,肯定只需要与运算mask里
// 不为0的字段,其他的mask不关心,然后这个操作就是为了得到key对应字段是key->mf的第几位,
// 比如mask的bits[0]=11111, key的bits[0] = 10100, mask里的第3个1在key里面是第1个
// 这一位与的结果就是tbl_blocks[2]&pkt_blocks[0], 也就是怎么找到key里的下标0
// 就看key当前位之前有几个1就行了。这里这样做的1010111,
// 蓝色1之前有count_1bits(1010111 & 0001111) = 3
// 对上边的mask举个例子 count = 4;
// mask->mf->bits[0] = 111010 (2进制)
// mf_masks = [1, 111, 1111, 11111] (2进制);
// pkt_mf_bits = 010100
// blocks_scratch = [0,0,0,0,pkt_blocks[1]&tbl_blocks[4],0]
uint32_t i;
for (i = 0; i < count; i++) {
// 拿i=2举例
uint64_t mf_mask = mf_masks[i]; // mf_mask = 001111
uint64_t idx_bits = mf_mask & pkt_mf_bits; // idx_bits = 000100
const uint32_t pkt_idx = count_1bits(idx_bits); // pkt_idx = 1
uint64\_t pkt\_has\_mf\_bit = (mf\_mask + 1) & pkt\_mf\_bits; // pkt\_has\_mf\_bit = 010000
// 是否求掩码:mask当前位对应的key的字段,如果key在当前位是0,下边算掩码就会变成0
uint64\_t no\_bit = ((!pkt\_has\_mf\_bit) > 0) - 1; // 2^64 - 1
// mask里第i个字段与运算key对应的字段
blocks\_scratch\[i\] = pkt\_blocks\[pkt\_idx\] & tbl\_blocks\[i\] & no\_bit; //
}
}
unsigned long
cmap_find_batch(const struct cmap *cmap, unsigned long map,
uint32_t hashes[], const struct cmap_node *nodes[])
{
const struct cmap_impl *impl = cmap_get_impl(cmap);
unsigned long result = map;
int i;
// 每一位就是一个包,一字节8个包
uint32_t h1s[sizeof map * CHAR_BIT];
const struct cmap_bucket *b1s[sizeof map * CHAR_BIT];
const struct cmap_bucket *b2s[sizeof map * CHAR_BIT];
uint32_t c1s[sizeof map * CHAR_BIT];
// 每个impl里桶的数量为impl->mask+1
// 为什么mask是桶的个数减1:因为下标从0开始,找下表的时候直接(hash & impl->mask)就行了
// 至于为什么开两个?因为buckets存放的方法也是一个值对应两个hash
// 第一次hash1 = rehash(impl->basis, hash), 找buckets\[hash1 & impl->mask\], 遍历里面CMAP\_K个元素
// 第二次hash2 = other\_hash(hash1), 找buckets\[hash2 & impl->mask\], 遍历里面CMAP\_K个元素
/\* Compute hashes and prefetch 1st buckets. \*/
ULLONG\_FOR\_EACH\_1(i, map) {
h1s\[i\] = rehash(impl, hashes\[i\]);
b1s\[i\] = &impl->buckets\[h1s\[i\] & impl->mask\];
OVS\_PREFETCH(b1s\[i\]);
}
/\* Lookups, Round 1. Only look up at the first bucket. \*/
ULLONG\_FOR\_EACH\_1(i, map) {
uint32\_t c1;
const struct cmap\_bucket \*b1 = b1s\[i\];
const struct cmap\_node \*node;
do {
c1 = read\_even\_counter(b1);
// 找一下这个cmap\_bucket里面有没有相同hash的
node = cmap\_find\_in\_bucket(b1, hashes\[i\]);
} while (OVS\_UNLIKELY(counter\_changed(b1, c1)));
if (!node) {
/\* Not found (yet); Prefetch the 2nd bucket. \*/
b2s\[i\] = &impl->buckets\[other\_hash(h1s\[i\]) & impl->mask\];
OVS\_PREFETCH(b2s\[i\]);
c1s\[i\] = c1; /\* We may need to check this after Round 2. \*/
continue;
}
/\* Found. \*/
ULLONG\_SET0(map, i); /\* Ignore this on round 2. \*/
OVS\_PREFETCH(node);
nodes\[i\] = node;
}
/\* Round 2. Look into the 2nd bucket, if needed. \*/
ULLONG\_FOR\_EACH\_1(i, map) {
uint32\_t c2;
const struct cmap\_bucket \*b2 = b2s\[i\];
const struct cmap\_node \*node;
do {
c2 = read\_even\_counter(b2);
node = cmap\_find\_in\_bucket(b2, hashes\[i\]);
} while (OVS\_UNLIKELY(counter\_changed(b2, c2)));
if (!node) {
// 可能被修改了,
if (OVS\_UNLIKELY(counter\_changed(b1s\[i\], c1s\[i\]))) {
node = cmap\_find\_\_(b1s\[i\], b2s\[i\], hashes\[i\]);
if (node) {
goto found;
}
}
/\* Not found. \*/
ULLONG\_SET0(result, i); /\* Fix the result. \*/
continue;
}
found:
OVS_PREFETCH(node);
nodes[i] = node;
}
return result;
}
static inline void
fast_path_processing(struct dp_netdev_pmd_thread *pmd,
struct dp_packet_batch *packets_,
struct netdev_flow_key **keys,
struct dp_packet_flow_map *flow_map,
uint8_t *index_map,
odp_port_t in_port)
{
const size_t cnt = dp_packet_batch_size(packets_);
#if !defined(__CHECKER__) && !defined(_WIN32)
const size_t PKT_ARRAY_SIZE = cnt;
#else
/* Sparse or MSVC doesn't like variable length array. */
enum { PKT_ARRAY_SIZE = NETDEV_MAX_BURST };
#endif
struct dp_packet *packet;
struct dpcls *cls;
struct dpcls_rule *rules[PKT_ARRAY_SIZE];
struct dp_netdev *dp = pmd->dp;
int upcall_ok_cnt = 0, upcall_fail_cnt = 0;
int lookup_cnt = 0, add_lookup_cnt;
bool any_miss;
for (size\_t i = 0; i < cnt; i++) {
/\* Key length is needed in all the cases, hash computed on demand. \*/
keys\[i\]->len = netdev\_flow\_key\_size(miniflow\_n\_values(&keys\[i\]->mf));
}
/\* Get the classifier for the in\_port \*/
// 找到端口对应的dpcls结构,每个port有自己的dpcls,因为每个port收到的报文会更相似
cls = dp\_netdev\_pmd\_lookup\_dpcls(pmd, in\_port);
if (OVS\_LIKELY(cls)) {
// 调用dpcls\_lookup进行匹配
any\_miss = !dpcls\_lookup(cls, (const struct netdev\_flow\_key \*\*)keys,
rules, cnt, &lookup\_cnt);
} else {
any\_miss = true;
memset(rules, 0, sizeof(rules));
}
// 如果有miss的,则需要进行openflow流表查询
if (OVS\_UNLIKELY(any\_miss) && !fat\_rwlock\_tryrdlock(&dp->upcall\_rwlock)) {
uint64\_t actions\_stub\[512 / 8\], slow\_stub\[512 / 8\];
struct ofpbuf actions, put\_actions;
ofpbuf\_use\_stub(&actions, actions\_stub, sizeof actions\_stub);
ofpbuf\_use\_stub(&put\_actions, slow\_stub, sizeof slow\_stub);
DP\_PACKET\_BATCH\_FOR\_EACH (i, packet, packets\_) {
struct dp\_netdev\_flow \*netdev\_flow;
if (OVS\_LIKELY(rules\[i\])) {
continue;
}
// 此时可能已经更新了,在进入upcall之前如果再查一次,如果能够查到,会比upcall消耗的少得多
netdev\_flow = dp\_netdev\_pmd\_lookup\_flow(pmd, keys\[i\],
&add\_lookup\_cnt);
if (netdev\_flow) {
lookup\_cnt += add\_lookup\_cnt;
rules\[i\] = &netdev\_flow->cr;
continue;
}
// 第一级和第二级流表查找失败后,就要查找第三级流表了,即openflow流表,这也称为upcall调用。
// 在普通ovs下是通过netlink实现的,在ovs+dpdk下,直接在pmd线程中调用upcall\_cb即可。
// 开始查找openflow流表。如果查找openflow流表成功并需要下发到dpcls时,需要判断是否超出最大流表限制
int error = handle\_packet\_upcall(pmd, packet, keys\[i\],
&actions, &put\_actions);
if (OVS\_UNLIKELY(error)) {
upcall\_fail\_cnt++;
} else {
upcall\_ok\_cnt++;
}
}
ofpbuf\_uninit(&actions);
ofpbuf\_uninit(&put\_actions);
fat\_rwlock\_unlock(&dp->upcall\_rwlock);
} else if (OVS\_UNLIKELY(any\_miss)) {
DP\_PACKET\_BATCH\_FOR\_EACH (i, packet, packets\_) {
if (OVS\_UNLIKELY(!rules\[i\])) {
dp\_packet\_delete(packet);
COVERAGE\_INC(datapath\_drop\_lock\_error);
upcall\_fail\_cnt++;
}
}
}
DP\_PACKET\_BATCH\_FOR\_EACH (i, packet, packets\_) {
struct dp\_netdev\_flow \*flow;
/\* Get the original order of this packet in received batch. \*/
int recv\_idx = index\_map\[i\];
uint16\_t tcp\_flags;
if (OVS\_UNLIKELY(!rules\[i\])) {
continue;
}
flow = dp\_netdev\_flow\_cast(rules\[i\]);
bool hook\_cached = false;
if (pmd->cached\_hook && \\
pmd->cached\_hook\_pmd && \\
pmd->cached\_hook->hook\_flow\_miss) {
hook\_cached = pmd->cached\_hook->hook\_flow\_miss(pmd->cached\_hook\_pmd, packet, flow);
}
if (!hook\_cached) {
bool smc\_enable\_db;
atomic\_read\_relaxed(&pmd->dp->smc\_enable\_db, &smc\_enable\_db);
// 查找到了packet,如果开启了smc,更新smc
if (smc\_enable\_db) {
uint32\_t hash = dp\_netdev\_flow\_hash(&flow->ufid);
smc\_insert(pmd, keys\[i\], hash);
}
// 查到了packet,看是否写会更新上一层cache(EMC)
if (emc\_probabilistic\_insert(pmd, keys\[i\], flow)) {
if (flow->status == OFFLOAD\_NONE) {
queue\_netdev\_flow\_put(pmd->dp->dp\_flow\_offload, \\
pmd->dp->class, \\
flow, NULL, DP\_NETDEV\_FLOW\_OFFLOAD\_OP\_ADD);
}
}
}
/\* Add these packets into the flow map in the same order
\* as received.
\*/
tcp\_flags = miniflow\_get\_tcp\_flags(&keys\[i\]->mf);
packet\_enqueue\_to\_flow\_map(packet, flow, tcp\_flags,
flow\_map, recv\_idx);
}
// 更新各个信息
pmd\_perf\_update\_counter(&pmd->perf\_stats, PMD\_STAT\_MASKED\_HIT,
cnt - upcall\_ok\_cnt - upcall\_fail\_cnt);
pmd\_perf\_update\_counter(&pmd->perf\_stats, PMD\_STAT\_MASKED\_LOOKUP,
lookup\_cnt);
pmd\_perf\_update\_counter(&pmd->perf\_stats, PMD\_STAT\_MISS,
upcall\_ok\_cnt);
pmd\_perf\_update\_counter(&pmd->perf\_stats, PMD\_STAT\_LOST,
upcall\_fail\_cnt);
}
static inline void
smc_insert(struct dp_netdev_pmd_thread *pmd,
const struct netdev_flow_key *key,
uint32_t hash)
{
struct smc_cache *smc_cache = &(pmd->flow_cache).smc_cache;
struct smc_bucket *bucket = &smc_cache->buckets[key->hash & SMC_MASK];
uint16_t index;
uint32_t cmap_index;
int i;
//布谷鸟算法
cmap\_index = cmap\_find\_index(&pmd->flow\_table, hash);
index = (cmap\_index >= UINT16\_MAX) ? UINT16\_MAX : (uint16\_t)cmap\_index;
/\* If the index is larger than SMC can handle (uint16\_t), we don't
\* insert \*/
if (index == UINT16\_MAX) {
//表明找到了
return;
}
/\* If an entry with same signature already exists, update the index \*/
uint16\_t sig = key->hash >> 16;
for (i = 0; i < SMC\_ENTRY\_PER\_BUCKET; i++) {
if (bucket->sig\[i\] == sig) {
bucket->flow\_idx\[i\] = index;
return;
}
}
/\* If there is an empty entry, occupy it. \*/
for (i = 0; i < SMC\_ENTRY\_PER\_BUCKET; i++) {
if (bucket->flow\_idx\[i\] == UINT16\_MAX) {
bucket->sig\[i\] = sig;
bucket->flow\_idx\[i\] = index;
return;
}
}
/\* Otherwise, pick a random entry. \*/
i = random\_uint32() % SMC\_ENTRY\_PER\_BUCKET;
bucket->sig\[i\] = sig;
bucket->flow\_idx\[i\] = index;
}
这里就不讲具体代码了,讲一下大概:到openflow查找后会更新dpcls,执行dp_netdev_flow_add() --> dpcls_insert() --> dpcls_find_subtable() --> cmap_insert()
dpcls_find_subtable():
找一下是否存在相同mask的subtable,存在返回这个subtable,不存在就创建一个subtable,创建的时候会调用dpcls_create_subtable,里面有个dpcls_subtable_get_best_impl会根据mask的miniflow的bits[0]和bits[1]选择的查找算法。
cmap_insert里hash算法用的就是布谷鸟hash,hash两次,插入的核心代码:
static bool
cmap_try_insert(struct cmap_impl *impl, struct cmap_node *node, uint32_t hash)
{
uint32_t h1 = rehash(impl, hash);
uint32_t h2 = other_hash(h1);
// hash两次找到两个桶
struct cmap_bucket *b1 = &impl->buckets[h1 & impl->mask];
struct cmap_bucket *b2 = &impl->buckets[h2 & impl->mask];
// 插入规则:
// 1.是否有相同hash的node,就插到对应链上
// 2.没有相同hash,就看有没有空的node
// 3.都不行就通过bfs,看能否让b1,b2空出来一个,把这个放进去
// 都不行就插入失败
return (OVS\_UNLIKELY(cmap\_insert\_dup(node, hash, b1) ||
cmap\_insert\_dup(node, hash, b2)) ||
OVS\_LIKELY(cmap\_insert\_bucket(node, hash, b1) ||
cmap\_insert\_bucket(node, hash, b2)) ||
cmap\_insert\_bfs(impl, node, hash, b1, b2));
}
OVS-DPDK Datapath Classifier :这个是理论上的流程,看完就知道这个算法流程了
ovs分类器 flow和miniflow :很重要的结构体miniflow
OVS-DPDK DataPath Classifier反向设计 :这个有很多详细的解释,但不怎么流畅
手机扫一扫
移动阅读更方便
你可能感兴趣的文章