注:本文是转载,但不是100%的转载,可能稍微有些出入,原文地址点击这里
VPP使用者几乎都会使用dpdk node作为收包驱动,本文将分析其源码。
基本概念
vlib_buffer_t
dpdk收到的数据包用rte_mbuf结构描述。vpp为了兼容其它收包node(netmap,pcap等)改为使用vlib_buffer_t
来描述数据包。
vlib_buffer_t
紧跟在rte_mbuf
后面,headroom空间中。vlib_buffer_from_rte_mbuf
和rte_mbuf_from_vlib_buffer
完成相互转化。
值得注意的是,该结构使用了三个CLIB_CACHE_LINE_ALIGN_MARK
宏,把结构分成了3段,确保每段都是CLIB_CACHE_LINE_BYTES
对齐。作为一个被频繁使用的结构,这样设计可以更好的更具需要进行内存预取。
vlib_get_next_frame,vlib_put_next_frame
几乎每个node中必定出现的一对好基友。vlib_get_next_frame
获取传递给下一个node的数据包将驻留的内存结构。vlib_put_next_frame
把传递给下一个node的数据包写入特定位置。这样下一个node将正式可以被调度框架调度,并处理传给他的数据包.
EFDearly-fast-discard
没有找到相关资料,估计也是一个性能优化特性,之后再补个人对作者意图的猜测。
核心函数
dpdk_device_input
//use_efd指定是否启用early-fast-discard特性。
static inline u32 dpdk_device_input (dpdk_main_t * dm,
dpdk_device_t * xd,
vlib_node_runtime_t * node,
u32 cpu_index, u16 queue_id, int use_efd)
{
u32 n_buffers;
/*
* 缺省情况下一跳是ethernet node,之后可以最终改变下一条,
* 博主认为这里使用next_index = node->cached_next_index会更合理
*/
u32 next_index = DPDK_RX_NEXT_ETHERNET_INPUT;
...
//网卡必须up了
if (xd->admin_up == 0)
return 0;
//指定网卡的指定队列上收一批包,返回收到的包个数,
//数据包地址存在xd->rx_vectors[queue_id]中。
n_buffers = dpdk_rx_burst (dm, xd, queue_id);
if (n_buffers == 0)
{
//看来EFD是默认不使用EFD。
if (PREDICT_FALSE (use_efd && dm->efd.enabled))
{
xd->efd_agent.last_poll_time = 0;
xd->efd_agent.last_burst_sz = 0;
}
return 0;
}
/*
* vlib_buffer有多个pool,可以用在自己构建的数据包,
* 这里的fl仅仅用来作为模板初始化ret_mbuf后面的vlib_buffer结构。
*/
fl = vlib_buffer_get_free_list (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);
if (PREDICT_FALSE (use_efd && dm->efd.enabled))
{
//一次收包到了VLIB_FRAME_SIZE个,如果这种情况次数很多,
//说明发生了收包端拥塞,可以在后面直接丢弃数据包。
if (PREDICT_FALSE (xd->efd_agent.consec_full_frames_cnt >=
dm->efd.consec_full_frames_hi_thresh))
{
u32 device_queue_sz = rte_eth_rx_queue_count (xd->device_index,
queue_id);
if (device_queue_sz >= dm->efd.queue_hi_thresh)
{
/* dpdk device queue has reached the critical threshold */
xd->efd_agent.congestion_cnt++;
/* apply EFD to packets from the burst */
efd_discard_burst = 1;
}
}
}
mb_index = 0;
while (n_buffers > 0)
{
//to_next指向的内存用来保存数据包地址,n_left_to_next保存数据包地址的内存最大剩余空间
vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
while (n_buffers > 0 && n_left_to_next > 0)
{
if (PREDICT_TRUE (n_buffers > 2))
{
/*
* 把之后需要使用的数据包内存预取一下。
* 为什么预取2个包以后的而不是下一个包?疑惑。
* 内存预取所花费的cpu周期,博主有时间在定量分析下。
*/
struct rte_mbuf *pfmb = xd->rx_vectors[queue_id][mb_index + 2];
vlib_buffer_t *bp = vlib_buffer_from_rte_mbuf (pfmb);
CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, STORE);
CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
}
//拥塞发生了,丢包
if (PREDICT_FALSE (efd_discard_burst))
{
vlib_thread_main_t *tm = vlib_get_thread_main ();
if (PREDICT_TRUE (cntr_type = is_efd_discardable (tm, b0, mb)))
{
rte_pktmbuf_free (mb);
xd->efd_agent.discard_cnt++;
increment_efd_drop_counter (vm, cntr_type, 1);
n_buffers--;
mb_index++;
continue;
}
}
if (PREDICT_FALSE (mb->nb_segs > 1))
{
//又是内存预取
struct rte_mbuf *pfmb = mb->next;
vlib_buffer_t *bp = vlib_buffer_from_rte_mbuf (pfmb);
CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, LOAD);
CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
b_chain = b0;
}
//取用fl中的模板给b0初始化,这个函数在赋值时,
//如果cpu支持128bit向量,则可以每次赋值拷贝16个字节(u8x16类型支持)。
vlib_buffer_init_for_free_list (b0, fl);
//vlib_buffer转换为偏移量,注意地址都是1 << CLIB_LOG2_CACHE_LINE_BYTES对齐
bi0 = vlib_get_buffer_index (vm, b0);
//数据包写入缓存,下一跳node将会使用
to_next[0] = bi0;
to_next++;
n_left_to_next--;
//决定下一跳node,默认情况下会根据3层协议来决定下一跳。
//如果自己hook了,则条转给指定的node
dpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb, b0,
&next0, &error0);
...
//博主认为这里是个bug,
//应该是b0->current_data += mb->data_off - sizeof(struct vlib_buffer_t)
b0->current_data += mb->data_off - RTE_PKTMBUF_HEADROOM;
b0->current_length = mb->data_len - l3_offset0;
...
while ((mb->nb_segs > 1) && (nb_seg < mb->nb_segs))
{
...
}
vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
to_next, n_left_to_next,
bi0, next0);
n_buffers--;
mb_index++;
}
//调度框架之后就可以调度下一跳node处理上文传入的数据包集合了。
vlib_put_next_frame (vm, node, next_index, n_left_to_next);
}
...
//多线程模型下,可以使用多个dpdk线程收包。
dpdk_worker_t *dw = vec_elt_at_index (dm->workers, cpu_index);
dw->aggregate_rx_packets += mb_index;
return mb_index;
}
//观察这个注册函数,可见它有5个下一跳。
VLIB_REGISTER_NODE (dpdk_input_node) = {
.function = dpdk_input,
.type = VLIB_NODE_TYPE_INPUT,
.name = "dpdk-input",
/* Will be enabled if/when hardware is detected. */
.state = VLIB_NODE_STATE_DISABLED,
.format_buffer = format_ethernet_header_with_length,
.format_trace = format_dpdk_rx_dma_trace,
.n_errors = DPDK_N_ERROR,
.error_strings = dpdk_error_strings,
.n_next_nodes = DPDK_RX_N_NEXT,
.next_nodes = {
[DPDK_RX_NEXT_DROP] = "error-drop",
[DPDK_RX_NEXT_ETHERNET_INPUT] = "ethernet-input",
[DPDK_RX_NEXT_IP4_INPUT] = "ip4-input-no-checksum",
[DPDK_RX_NEXT_IP6_INPUT] = "ip6-input",
[DPDK_RX_NEXT_MPLS_INPUT] = "mpls-gre-input",
},
};