赵占旭的博客

[转]Cisco VPP DPDK Node分析

注:本文是转载,但不是100%的转载,可能稍微有些出入,原文地址点击这里

VPP使用者几乎都会使用dpdk node作为收包驱动,本文将分析其源码。

基本概念


vlib_buffer_t

dpdk收到的数据包用rte_mbuf结构描述。vpp为了兼容其它收包node(netmap,pcap等)改为使用vlib_buffer_t来描述数据包。

avatar

vlib_buffer_t紧跟在rte_mbuf后面,headroom空间中。
vlib_buffer_from_rte_mbufrte_mbuf_from_vlib_buffer完成相互转化。
值得注意的是,该结构使用了三个CLIB_CACHE_LINE_ALIGN_MARK宏,把结构分成了3段,确保每段都是CLIB_CACHE_LINE_BYTES对齐。作为一个被频繁使用的结构,这样设计可以更好的更具需要进行内存预取。

vlib_get_next_frame,vlib_put_next_frame

几乎每个node中必定出现的一对好基友。vlib_get_next_frame获取传递给下一个node的数据包将驻留的内存结构。vlib_put_next_frame把传递给下一个node的数据包写入特定位置。这样下一个node将正式可以被调度框架调度,并处理传给他的数据包.

EFDearly-fast-discard

没有找到相关资料,估计也是一个性能优化特性,之后再补个人对作者意图的猜测。

核心函数


dpdk_device_input

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
//use_efd指定是否启用early-fast-discard特性。
static inline u32 dpdk_device_input (dpdk_main_t * dm,
dpdk_device_t * xd,
vlib_node_runtime_t * node,
u32 cpu_index, u16 queue_id, int use_efd)
{
u32 n_buffers;
/*
* 缺省情况下一跳是ethernet node,之后可以最终改变下一条,
* 博主认为这里使用next_index = node->cached_next_index会更合理
*/
u32 next_index = DPDK_RX_NEXT_ETHERNET_INPUT;

...

//网卡必须up了
if (xd->admin_up == 0)
return 0;

//指定网卡的指定队列上收一批包,返回收到的包个数,
//数据包地址存在xd->rx_vectors[queue_id]中。
n_buffers = dpdk_rx_burst (dm, xd, queue_id);

if (n_buffers == 0)
{
//看来EFD是默认不使用EFD。
if (PREDICT_FALSE (use_efd && dm->efd.enabled))
{
xd->efd_agent.last_poll_time = 0;
xd->efd_agent.last_burst_sz = 0;
}
return 0;
}

/*
* vlib_buffer有多个pool,可以用在自己构建的数据包,
* 这里的fl仅仅用来作为模板初始化ret_mbuf后面的vlib_buffer结构。
*/
fl = vlib_buffer_get_free_list (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);

if (PREDICT_FALSE (use_efd && dm->efd.enabled))
{
//一次收包到了VLIB_FRAME_SIZE个,如果这种情况次数很多,
//说明发生了收包端拥塞,可以在后面直接丢弃数据包。
if (PREDICT_FALSE (xd->efd_agent.consec_full_frames_cnt >=
dm->efd.consec_full_frames_hi_thresh))
{
u32 device_queue_sz = rte_eth_rx_queue_count (xd->device_index,
queue_id);
if (device_queue_sz >= dm->efd.queue_hi_thresh)
{
/* dpdk device queue has reached the critical threshold */
xd->efd_agent.congestion_cnt++;

/* apply EFD to packets from the burst */
efd_discard_burst = 1;
}
}
}

mb_index = 0;

while (n_buffers > 0)
{
//to_next指向的内存用来保存数据包地址,n_left_to_next保存数据包地址的内存最大剩余空间
vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);

while (n_buffers > 0 && n_left_to_next > 0)
{
if (PREDICT_TRUE (n_buffers > 2))
{
/*
* 把之后需要使用的数据包内存预取一下。
* 为什么预取2个包以后的而不是下一个包?疑惑。
* 内存预取所花费的cpu周期,博主有时间在定量分析下。
*/
struct rte_mbuf *pfmb = xd->rx_vectors[queue_id][mb_index + 2];
vlib_buffer_t *bp = vlib_buffer_from_rte_mbuf (pfmb);
CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, STORE);
CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
}

//拥塞发生了,丢包
if (PREDICT_FALSE (efd_discard_burst))
{
vlib_thread_main_t *tm = vlib_get_thread_main ();

if (PREDICT_TRUE (cntr_type = is_efd_discardable (tm, b0, mb)))
{
rte_pktmbuf_free (mb);
xd->efd_agent.discard_cnt++;
increment_efd_drop_counter (vm, cntr_type, 1);
n_buffers--;
mb_index++;
continue;
}
}

if (PREDICT_FALSE (mb->nb_segs > 1))
{
//又是内存预取
struct rte_mbuf *pfmb = mb->next;
vlib_buffer_t *bp = vlib_buffer_from_rte_mbuf (pfmb);
CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, LOAD);
CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
b_chain = b0;
}

//取用fl中的模板给b0初始化,这个函数在赋值时,
//如果cpu支持128bit向量,则可以每次赋值拷贝16个字节(u8x16类型支持)。
vlib_buffer_init_for_free_list (b0, fl);

//vlib_buffer转换为偏移量,注意地址都是1 << CLIB_LOG2_CACHE_LINE_BYTES对齐
bi0 = vlib_get_buffer_index (vm, b0);

//数据包写入缓存,下一跳node将会使用
to_next[0] = bi0;
to_next++;
n_left_to_next--;

//决定下一跳node,默认情况下会根据3层协议来决定下一跳。
//如果自己hook了,则条转给指定的node
dpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb, b0,
&next0, &error0);

...

//博主认为这里是个bug,
//应该是b0->current_data += mb->data_off - sizeof(struct vlib_buffer_t)
b0->current_data += mb->data_off - RTE_PKTMBUF_HEADROOM;
b0->current_length = mb->data_len - l3_offset0;

...

while ((mb->nb_segs > 1) && (nb_seg < mb->nb_segs))
{
...
}

vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
to_next, n_left_to_next,
bi0, next0);
n_buffers--;
mb_index++;
}
//调度框架之后就可以调度下一跳node处理上文传入的数据包集合了。
vlib_put_next_frame (vm, node, next_index, n_left_to_next);
}

...

//多线程模型下,可以使用多个dpdk线程收包。
dpdk_worker_t *dw = vec_elt_at_index (dm->workers, cpu_index);
dw->aggregate_rx_packets += mb_index;

return mb_index;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//观察这个注册函数,可见它有5个下一跳。
VLIB_REGISTER_NODE (dpdk_input_node) = {
.function = dpdk_input,
.type = VLIB_NODE_TYPE_INPUT,
.name = "dpdk-input",

/* Will be enabled if/when hardware is detected. */
.state = VLIB_NODE_STATE_DISABLED,

.format_buffer = format_ethernet_header_with_length,
.format_trace = format_dpdk_rx_dma_trace,

.n_errors = DPDK_N_ERROR,
.error_strings = dpdk_error_strings,

.n_next_nodes = DPDK_RX_N_NEXT,
.next_nodes = {
[DPDK_RX_NEXT_DROP] = "error-drop",
[DPDK_RX_NEXT_ETHERNET_INPUT] = "ethernet-input",
[DPDK_RX_NEXT_IP4_INPUT] = "ip4-input-no-checksum",
[DPDK_RX_NEXT_IP6_INPUT] = "ip6-input",
[DPDK_RX_NEXT_MPLS_INPUT] = "mpls-gre-input",
},
};

注意:所有文章非特别说明皆为原创。为保证信息与源同步,转载时请务必注明文章出处!谢谢合作 :-)

原始链接:http://zhaozhanxu.com/2016/11/07/VPP/2016-11-07-VPP-DPDK-Node/

许可协议: "署名-非商用-相同方式共享 4.0" 转载请保留原文链接及作者。