注:本文是转载,但不是100%的转载,可能稍微有些出入,原文地址点击这里

VPP使用者几乎都会使用dpdk node作为收包驱动,本文将分析其源码。

基本概念


vlib_buffer_t

dpdk收到的数据包用rte_mbuf结构描述。vpp为了兼容其它收包node(netmap,pcap等)改为使用vlib_buffer_t来描述数据包。

请输入图片描述

vlib_buffer_t紧跟在rte_mbuf后面,headroom空间中。
vlib_buffer_from_rte_mbufrte_mbuf_from_vlib_buffer完成相互转化。
值得注意的是,该结构使用了三个CLIB_CACHE_LINE_ALIGN_MARK宏,把结构分成了3段,确保每段都是CLIB_CACHE_LINE_BYTES对齐。作为一个被频繁使用的结构,这样设计可以更好的更具需要进行内存预取。

vlib_get_next_frame,vlib_put_next_frame

几乎每个node中必定出现的一对好基友。vlib_get_next_frame获取传递给下一个node的数据包将驻留的内存结构。vlib_put_next_frame把传递给下一个node的数据包写入特定位置。这样下一个node将正式可以被调度框架调度,并处理传给他的数据包.

EFDearly-fast-discard

没有找到相关资料,估计也是一个性能优化特性,之后再补个人对作者意图的猜测。

核心函数


dpdk_device_input

//use_efd指定是否启用early-fast-discard特性。
static inline u32 dpdk_device_input (dpdk_main_t * dm,
           dpdk_device_t * xd,
           vlib_node_runtime_t * node,
           u32 cpu_index, u16 queue_id, int use_efd)
{
    u32 n_buffers;
    /*
     * 缺省情况下一跳是ethernet node,之后可以最终改变下一条,
     * 博主认为这里使用next_index = node->cached_next_index会更合理
     */
    u32 next_index = DPDK_RX_NEXT_ETHERNET_INPUT;

    ...

    //网卡必须up了
    if (xd->admin_up == 0)
        return 0;

    //指定网卡的指定队列上收一批包,返回收到的包个数,
    //数据包地址存在xd->rx_vectors[queue_id]中。
    n_buffers = dpdk_rx_burst (dm, xd, queue_id);

    if (n_buffers == 0)
    {
        //看来EFD是默认不使用EFD。
        if (PREDICT_FALSE (use_efd && dm->efd.enabled))
        {
            xd->efd_agent.last_poll_time = 0;
            xd->efd_agent.last_burst_sz = 0;
        }
        return 0;
    }

    /*
     * vlib_buffer有多个pool,可以用在自己构建的数据包,
     * 这里的fl仅仅用来作为模板初始化ret_mbuf后面的vlib_buffer结构。
     */
    fl = vlib_buffer_get_free_list (vm, VLIB_BUFFER_DEFAULT_FREE_LIST_INDEX);

    if (PREDICT_FALSE (use_efd && dm->efd.enabled))
    {
        //一次收包到了VLIB_FRAME_SIZE个,如果这种情况次数很多,
        //说明发生了收包端拥塞,可以在后面直接丢弃数据包。
        if (PREDICT_FALSE (xd->efd_agent.consec_full_frames_cnt >=
                dm->efd.consec_full_frames_hi_thresh))
        {
            u32 device_queue_sz = rte_eth_rx_queue_count (xd->device_index,
                    queue_id);
            if (device_queue_sz >= dm->efd.queue_hi_thresh)
            {
                /* dpdk device queue has reached the critical threshold */
                xd->efd_agent.congestion_cnt++;

                /* apply EFD to packets from the burst */
                efd_discard_burst = 1;
            }
        }
    }

    mb_index = 0;

    while (n_buffers > 0)
    {
        //to_next指向的内存用来保存数据包地址,n_left_to_next保存数据包地址的内存最大剩余空间
        vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);

        while (n_buffers > 0 && n_left_to_next > 0)
        {
            if (PREDICT_TRUE (n_buffers > 2))
            {
                /*
                 * 把之后需要使用的数据包内存预取一下。
                 * 为什么预取2个包以后的而不是下一个包?疑惑。
                 * 内存预取所花费的cpu周期,博主有时间在定量分析下。
                 */
                struct rte_mbuf *pfmb = xd->rx_vectors[queue_id][mb_index + 2];
                vlib_buffer_t *bp = vlib_buffer_from_rte_mbuf (pfmb);
                CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, STORE);
                CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
            }

            //拥塞发生了,丢包
            if (PREDICT_FALSE (efd_discard_burst))
            {
                vlib_thread_main_t *tm = vlib_get_thread_main ();

                if (PREDICT_TRUE (cntr_type = is_efd_discardable (tm, b0, mb)))
                {
                    rte_pktmbuf_free (mb);
                    xd->efd_agent.discard_cnt++;
                    increment_efd_drop_counter (vm, cntr_type, 1);
                    n_buffers--;
                    mb_index++;
                    continue;
                }
            }

            if (PREDICT_FALSE (mb->nb_segs > 1))
            {
                //又是内存预取
                struct rte_mbuf *pfmb = mb->next;
                vlib_buffer_t *bp = vlib_buffer_from_rte_mbuf (pfmb);
                CLIB_PREFETCH (pfmb, CLIB_CACHE_LINE_BYTES, LOAD);
                CLIB_PREFETCH (bp, CLIB_CACHE_LINE_BYTES, STORE);
                b_chain = b0;
            }

            //取用fl中的模板给b0初始化,这个函数在赋值时,
            //如果cpu支持128bit向量,则可以每次赋值拷贝16个字节(u8x16类型支持)。
            vlib_buffer_init_for_free_list (b0, fl);

            //vlib_buffer转换为偏移量,注意地址都是1 << CLIB_LOG2_CACHE_LINE_BYTES对齐
            bi0 = vlib_get_buffer_index (vm, b0);

            //数据包写入缓存,下一跳node将会使用
            to_next[0] = bi0;
            to_next++;
            n_left_to_next--;

            //决定下一跳node,默认情况下会根据3层协议来决定下一跳。
            //如果自己hook了,则条转给指定的node
            dpdk_rx_next_and_error_from_mb_flags_x1 (xd, mb, b0,
                    &next0, &error0);

            ...

            //博主认为这里是个bug,
            //应该是b0->current_data += mb->data_off - sizeof(struct vlib_buffer_t)
            b0->current_data += mb->data_off - RTE_PKTMBUF_HEADROOM;
            b0->current_length = mb->data_len - l3_offset0;

            ...

            while ((mb->nb_segs > 1) && (nb_seg < mb->nb_segs))
            {
                ...
            }

            vlib_validate_buffer_enqueue_x1 (vm, node, next_index,
                       to_next, n_left_to_next,
                       bi0, next0);
            n_buffers--;
            mb_index++;
        }
        //调度框架之后就可以调度下一跳node处理上文传入的数据包集合了。
        vlib_put_next_frame (vm, node, next_index, n_left_to_next);
    }

    ...

    //多线程模型下,可以使用多个dpdk线程收包。
    dpdk_worker_t *dw = vec_elt_at_index (dm->workers, cpu_index);
    dw->aggregate_rx_packets += mb_index;

    return mb_index;
}
//观察这个注册函数,可见它有5个下一跳。
VLIB_REGISTER_NODE (dpdk_input_node) = {
    .function = dpdk_input,
    .type = VLIB_NODE_TYPE_INPUT,
    .name = "dpdk-input",

    /* Will be enabled if/when hardware is detected. */
    .state = VLIB_NODE_STATE_DISABLED,

    .format_buffer = format_ethernet_header_with_length,
    .format_trace = format_dpdk_rx_dma_trace,

    .n_errors = DPDK_N_ERROR,
    .error_strings = dpdk_error_strings,

    .n_next_nodes = DPDK_RX_N_NEXT,
    .next_nodes = {
        [DPDK_RX_NEXT_DROP] = "error-drop",
        [DPDK_RX_NEXT_ETHERNET_INPUT] = "ethernet-input",
        [DPDK_RX_NEXT_IP4_INPUT] = "ip4-input-no-checksum",
        [DPDK_RX_NEXT_IP6_INPUT] = "ip6-input",
        [DPDK_RX_NEXT_MPLS_INPUT] = "mpls-gre-input",
    },
};
最后修改:2021 年 08 月 20 日
如果觉得我的文章对你有用,请随意赞赏