注:本文是转载,但不是100%的转载,可能稍微有些出入,原文地址点击这里

基本概念


VPP支持多线程模式,其中区分为主线程和工作线程。主线程可以调用所有类型的node,工作线程只能调用。
VLIB_NODE_TYPE_INTERNALVLIB_NODE_TYPE_INPUT类型的node。
VPP多线程之间同步采用的是自旋锁,如果滥用vlib_worker_thread_barrier_syncvlib_worker_thread_barrier_release会导致性能下降。博主认为如果使用RCU锁会有更好的性能。

核心函数


start_workers

建立多线程框架。

static clib_error_t *start_workers (vlib_main_t * vm)
{
    /*
     * VLIB_REGISTER_THREAD会增加vlib_thread_main_t->registrations中数量。
     * cpu_config可以配置占用核心位图或者指定核心个数,这将增加
     * vlib_thread_registration_t->count计数。
     * 这两个值将在vlib_thread_init用于计算vlib_thread_main_t->n_vlib_mains
     */
    u32 n_vlib_mains = tm->n_vlib_mains;
    //调用该函数的是在主线程中
    u8 *main_heap = clib_mem_get_per_cpu_heap ();
    mheap_t *main_heap_header = mheap_header (main_heap);

    ...

    //不开启工作线程时,n_vlib_mains为1,否则,每个工作线程都要单独的一份vlib_mains
    if (n_vlib_mains > 1)
    {
        //确保vlib_mains中能保存tm->n_vlib_mains个数据结构,可以产生内存重新分配
        vec_validate (vlib_mains, tm->n_vlib_mains - 1);
        _vec_len (vlib_mains) = 0;
        //主线程的配置信息
        vec_add1 (vlib_mains, vm);

        //博主发现根本没用到这个队列。无视之。
        vec_validate (vlib_frame_queues, tm->n_vlib_mains - 1);
        _vec_len (vlib_frame_queues) = 0;
        fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
        vec_add1 (vlib_frame_queues, fq);

        //如果wait_at_barrier为1,则工作线程主循环会自旋等待。
        vlib_worker_threads->wait_at_barrier =
            clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
        /*
         * 记录有多少工作线程进入自旋等待状态,只有所有工作线程都自旋等待了,
         * vlib_worker_thread_barrier_sync才能返回。
         * 该机制滥用的话,性能会下降很快,希望以后能用rcu锁替代自旋锁
         */
        vlib_worker_threads->workers_at_barrier =
            clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);

        ...

        //tm->registrations就是调用VLIB_REGISTER_THREAD个数
        for (i = 0; i < vec_len (tm->registrations); i++)
        {
            ...

            //配置工作线程个数为0的话,自然不用管了
            if (tr->count == 0)
              continue;

            //工作线程个数要么指定core bitmap,要么指定个数自动分配核心
            for (k = 0; k < tr->count; k++)
            {
                vec_add2 (vlib_worker_threads, w, 1);
                //博主没有找到mheap_size赋值的地方,应该都是0,一脸懵逼
                if (tr->mheap_size)
                  w->thread_mheap =
                      mheap_alloc (0 /* use VM */ , tr->mheap_size);
                else
                  w->thread_mheap = main_heap;

                ...

                //VLIB_REGISTER_THREAD处指定该值。"stats"工作线程这里为1,"workers"工作线程为0
                if (tr->no_data_structure_clone)
                  continue;

                ...

                //工作线程都是从主线程复制配置信息
                vm_clone = clib_mem_alloc (sizeof (*vm_clone));
                clib_memcpy (vm_clone, vlib_mains[0], sizeof (*vm_clone));

                vm_clone->cpu_index = worker_thread_index;
                vm_clone->heap_base = w->thread_mheap;
                vm_clone->mbuf_alloc_list = 0;
                memset (&vm_clone->random_buffer, 0,
                            sizeof (vm_clone->random_buffer));

                nm = &vlib_mains[0]->node_main;
                nm_clone = &vm_clone->node_main;
                /* fork next frames array, preserving node runtime indices */
                nm_clone->next_frames = vec_dup (nm->next_frames);
                for (j = 0; j < vec_len (nm_clone->next_frames); j++)
                {
                    vlib_next_frame_t *nf = &nm_clone->next_frames[j];
                    u32 save_node_runtime_index;
                    u32 save_flags;

                    save_node_runtime_index = nf->node_runtime_index;
                    save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
                    vlib_next_frame_init (nf);
                    nf->node_runtime_index = save_node_runtime_index;
                    nf->flags = save_flags;
                }

                /* fork the frame dispatch queue */
                nm_clone->pending_frames = 0;
                vec_validate (nm_clone->pending_frames, 10);  /* $$$$$?????? */
                _vec_len (nm_clone->pending_frames) = 0;

                /* fork nodes */
                nm_clone->nodes = 0;
                for (j = 0; j < vec_len (nm->nodes); j++)
                {
                    vlib_node_t *n;
                    n = clib_mem_alloc_no_fail (sizeof (*n));
                    clib_memcpy (n, nm->nodes[j], sizeof (*n));
                    /* none of the copied nodes have enqueue rights given out */
                    n->owner_node_index = VLIB_INVALID_NODE_INDEX;
                    memset (&n->stats_total, 0, sizeof (n->stats_total));
                    memset (&n->stats_last_clear, 0,
                                sizeof (n->stats_last_clear));
                    vec_add1 (nm_clone->nodes, n);
                }
                nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
                    vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);

                nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
                    vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
                vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
                    rt->cpu_index = vm_clone->cpu_index;

                nm_clone->processes = vec_dup (nm->processes);

                /* zap the (per worker) frame freelists, etc */
                nm_clone->frame_sizes = 0;
                nm_clone->frame_size_hash = 0;

                /*
                 * vpp默认mheap_size为0,那么将使用main heap,那么多线程下会有锁开销。
                 * 懵逼,应该是不同核心不同heap才对。博主觉得这是个bug
                 */
                clib_mem_set_heap (oldheap);
                vec_add1 (vlib_mains, vm_clone);

                vm_clone->error_main.counters =
                    vec_dup (vlib_mains[0]->error_main.counters);
                vm_clone->error_main.counters_last_clear =
                    vec_dup (vlib_mains[0]->error_main.counters_last_clear);

                /* Fork the vlib_buffer_main_t free lists, etc. */
                bm_clone = vec_dup (vm_clone->buffer_main);
                vm_clone->buffer_main = bm_clone;

                orig_freelist_pool = bm_clone->buffer_free_list_pool;
                bm_clone->buffer_free_list_pool = 0;

                /* *INDENT-OFF* */
                pool_foreach (fl_orig, orig_freelist_pool,
                            ({
                             pool_get_aligned (bm_clone->buffer_free_list_pool,
                                         fl_clone, CLIB_CACHE_LINE_BYTES);
                             ASSERT (fl_orig - orig_freelist_pool
                                         == fl_clone - bm_clone->buffer_free_list_pool);

                             fl_clone[0] = fl_orig[0];
                             fl_clone->aligned_buffers = 0;
                             fl_clone->unaligned_buffers = 0;
                             fl_clone->n_alloc = 0;
                             }));
                /* *INDENT-ON* */

                worker_thread_index++;
            }
        }
    }
    else
    {
        //"stats"工作线程依旧开启,"workers"工作线程没启用
        for (i = 0; i < vec_len (tm->registrations); i++)
        {
            tr = tm->registrations[i];

            for (j = 0; j < tr->count; j++)
            {
                vec_add2 (vlib_worker_threads, w, 1);
                if (tr->mheap_size)
                  w->thread_mheap =
                      mheap_alloc (0 /* use VM */ , tr->mheap_size);
                else
                  w->thread_mheap = main_heap;
                w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
                w->thread_function = tr->function;
                w->thread_function_arg = w;
                w->instance_id = j;
                w->elog_track.name =
                    (char *) format (0, "%s %d", tr->name, j + 1);
                w->registration = tr;
                vec_add1 (w->elog_track.name, 0);
                elog_track_register (&vm->elog_main, &w->elog_track);
            }
        }
    }

    worker_thread_index = 1;

    for (i = 0; i < vec_len (tm->registrations); i++)
    {
        int j;

        tr = tm->registrations[i];
        //支持pthread或者dpdk线程启动。线程入口函数调用vlib_worker_thread_bootstrap_fn
        if (tr->use_pthreads || tm->use_pthreads)
        {
            for (j = 0; j < tr->count; j++)
            {
                w = vlib_worker_threads + worker_thread_index++;
                if (vlib_launch_thread (vlib_worker_thread_bootstrap_fn, w, 0) <
                            0)
                  clib_warning ("Couldn't start '%s' pthread ", tr->name);
            }
        }
        else
        {
            uword c;
            /* *INDENT-OFF* */
            clib_bitmap_foreach (c, tr->coremask, ({
                            w = vlib_worker_threads + worker_thread_index++;
                            if (vlib_launch_thread (vlib_worker_thread_bootstrap_fn, w, c) < 0)
                            clib_warning ("Couldn't start DPDK lcore %d", c);

                            }));
            /* *INDENT-ON* */
        }
    }

    //同步
    vlib_worker_thread_barrier_sync (vm);
    vlib_worker_thread_barrier_release (vm);
    return 0;
}
//工作线程业务逻辑,使用单独分配的栈
void vlib_worker_thread_fn (void *arg)
{
    vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
    vlib_thread_main_t *tm = vlib_get_thread_main ();
    vlib_main_t *vm = vlib_get_main ();

    ASSERT (vm->cpu_index == os_get_cpu_number ());

    //如果使用的是pthread,则里面有一个同步点
    vlib_worker_thread_init (w);
    clib_time_init (&vm->clib_time);
    clib_mem_set_heap (w->thread_mheap);

    /* Wait until the dpdk init sequence is complete */
    //主线程的dpdk_process中释放worker_thread_release
    while (tm->worker_thread_release == 0)
      vlib_worker_thread_barrier_check ();

    //业务主循环
    vlib_worker_thread_internal (vm);
}
static_always_inline void vlib_worker_thread_internal (vlib_main_t * vm)
{
    vlib_node_main_t *nm = &vm->node_main;
    u64 cpu_time_now = clib_cpu_time_now ();

    while (1)
    {
        /*
        * 每次循环都检查有没有同步请求,如果主线程vlib_worker_thread_barrier_sync调用,
        * 则所有工作线程都自旋等待时,主线程的vlib_worker_thread_barrier_sync返回,开始同步逻辑,
        * 此时所有工作线程都在自旋等待中,主线程可以操作共有数据。
        * vlib_worker_thread_barrier_release调用后,工作线程退出自旋等待,继续主业务逻辑循环
        */
        vlib_worker_thread_barrier_check ();

        //博主没有找到enqeue操作,那么这个函数等于没用。
        vlib_frame_queue_dequeue_internal (vm);

        //工作线程只处理VLIB_NODE_TYPE_INPUT和VLIB_NODE_TYPE_INTERNAL两种类型
        vlib_node_runtime_t *n;
        vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
        {
            //cli会配置网卡与特定核心绑定,因此这里只会收取分配给本核心的网卡的数据包
            cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
                        VLIB_NODE_STATE_POLLING, /* frame */ 0,
                        cpu_time_now);
        }

        if (_vec_len (nm->pending_frames))
        {
            int i;
            cpu_time_now = clib_cpu_time_now ();
            for (i = 0; i < _vec_len (nm->pending_frames); i++)
            {
                vlib_pending_frame_t *p;

                p = nm->pending_frames + i;

                cpu_time_now = dispatch_pending_node (vm, p, cpu_time_now);
            }
            _vec_len (nm->pending_frames) = 0;
        }
        vlib_increment_main_loop_counter (vm);

        /* Record time stamp in case there are no enabled nodes and above
           calls do not update time stamp. */
        cpu_time_now = clib_cpu_time_now ();
    }
}
最后修改:2021 年 08 月 18 日
如果觉得我的文章对你有用,请随意赞赏