Zhao Zhanxu's Blog

[Repost] Cisco VPP Multithreading Analysis

Note: this article is a repost, though not a 100% verbatim one; it may differ slightly from the original, which is linked at the end of this post.

Basic Concepts


VPP supports a multithreaded mode, distinguishing a main thread from worker threads. The main thread can run nodes of every type; worker threads can only run nodes of the VLIB_NODE_TYPE_INTERNAL and VLIB_NODE_TYPE_INPUT types.
Synchronization between VPP threads uses spinlocks. Overusing vlib_worker_thread_barrier_sync and vlib_worker_thread_barrier_release will degrade performance; the blogger believes that an RCU lock would perform better.
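
For reference, the worker-thread configuration discussed below normally comes from the cpu section of VPP's startup.conf. A minimal sketch (the core numbers are illustrative):

cpu {
  main-core 0
  corelist-workers 1-3
  # or, to let VPP assign cores automatically: workers 3
}

corelist-workers builds the per-registration core bitmap (tr->coremask in the code below), while workers only sets a thread count.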

Core Functions


start_workers

Sets up the multithreading framework.

static clib_error_t *start_workers (vlib_main_t * vm)
{
/*
 * VLIB_REGISTER_THREAD adds entries to vlib_thread_main_t->registrations.
 * The cpu config can specify either a bitmap of cores to occupy or a
 * thread count, which increases vlib_thread_registration_t->count.
 * Both values are used by vlib_thread_init to compute
 * vlib_thread_main_t->n_vlib_mains.
 */
u32 n_vlib_mains = tm->n_vlib_mains;
//this function is called from the main thread
u8 *main_heap = clib_mem_get_per_cpu_heap ();
mheap_t *main_heap_header = mheap_header (main_heap);

...

//with no worker threads enabled, n_vlib_mains is 1; otherwise every worker thread needs its own entry in vlib_mains
if (n_vlib_mains > 1)
{
//make sure vlib_mains can hold tm->n_vlib_mains entries; this may trigger a reallocation
vec_validate (vlib_mains, tm->n_vlib_mains - 1);
_vec_len (vlib_mains) = 0;
//the main thread's vlib_main_t
vec_add1 (vlib_mains, vm);

//the blogger found this queue is never used at all; ignore it
vec_validate (vlib_frame_queues, tm->n_vlib_mains - 1);
_vec_len (vlib_frame_queues) = 0;
fq = vlib_frame_queue_alloc (FRAME_QUEUE_NELTS);
vec_add1 (vlib_frame_queues, fq);

//when wait_at_barrier is 1, the worker threads' main loop spin-waits
vlib_worker_threads->wait_at_barrier =
clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);
/*
 * Tracks how many worker threads have entered the spin-wait state;
 * vlib_worker_thread_barrier_sync can return only once every worker
 * thread is spinning. If this mechanism is overused, performance drops
 * quickly; hopefully an RCU lock can replace the spinlock someday.
 */
vlib_worker_threads->workers_at_barrier =
clib_mem_alloc_aligned (sizeof (u32), CLIB_CACHE_LINE_BYTES);

...

//tm->registrations has one entry per VLIB_REGISTER_THREAD invocation
for (i = 0; i < vec_len (tm->registrations); i++)
{
...

//nothing to do for a registration whose configured thread count is 0
if (tr->count == 0)
continue;

//worker threads either specify a core bitmap, or give a count and have cores assigned automatically
for (k = 0; k < tr->count; k++)
{
vec_add2 (vlib_worker_threads, w, 1);
//the blogger could not find anywhere mheap_size is assigned; it appears to always be 0, which is baffling
if (tr->mheap_size)
w->thread_mheap =
mheap_alloc (0 /* use VM */ , tr->mheap_size);
else
w->thread_mheap = main_heap;

...

//set at the VLIB_REGISTER_THREAD site: 1 for the "stats" thread, 0 for "workers" threads
if (tr->no_data_structure_clone)
continue;

...

//worker threads all clone their state from the main thread
vm_clone = clib_mem_alloc (sizeof (*vm_clone));
clib_memcpy (vm_clone, vlib_mains[0], sizeof (*vm_clone));

vm_clone->cpu_index = worker_thread_index;
vm_clone->heap_base = w->thread_mheap;
vm_clone->mbuf_alloc_list = 0;
memset (&vm_clone->random_buffer, 0,
sizeof (vm_clone->random_buffer));

nm = &vlib_mains[0]->node_main;
nm_clone = &vm_clone->node_main;
/* fork next frames array, preserving node runtime indices */
nm_clone->next_frames = vec_dup (nm->next_frames);
for (j = 0; j < vec_len (nm_clone->next_frames); j++)
{
vlib_next_frame_t *nf = &nm_clone->next_frames[j];
u32 save_node_runtime_index;
u32 save_flags;

save_node_runtime_index = nf->node_runtime_index;
save_flags = nf->flags & VLIB_FRAME_NO_FREE_AFTER_DISPATCH;
vlib_next_frame_init (nf);
nf->node_runtime_index = save_node_runtime_index;
nf->flags = save_flags;
}

/* fork the frame dispatch queue */
nm_clone->pending_frames = 0;
vec_validate (nm_clone->pending_frames, 10); /* $$$$$?????? */
_vec_len (nm_clone->pending_frames) = 0;

/* fork nodes */
nm_clone->nodes = 0;
for (j = 0; j < vec_len (nm->nodes); j++)
{
vlib_node_t *n;
n = clib_mem_alloc_no_fail (sizeof (*n));
clib_memcpy (n, nm->nodes[j], sizeof (*n));
/* none of the copied nodes have enqueue rights given out */
n->owner_node_index = VLIB_INVALID_NODE_INDEX;
memset (&n->stats_total, 0, sizeof (n->stats_total));
memset (&n->stats_last_clear, 0,
sizeof (n->stats_last_clear));
vec_add1 (nm_clone->nodes, n);
}
nm_clone->nodes_by_type[VLIB_NODE_TYPE_INTERNAL] =
vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INTERNAL]);

nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT] =
vec_dup (nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]);
vec_foreach (rt, nm_clone->nodes_by_type[VLIB_NODE_TYPE_INPUT])
rt->cpu_index = vm_clone->cpu_index;

nm_clone->processes = vec_dup (nm->processes);

/* zap the (per worker) frame freelists, etc */
nm_clone->frame_sizes = 0;
nm_clone->frame_size_hash = 0;

/*
 * VPP's default mheap_size is 0, so the main heap is shared, which means
 * lock overhead under multithreading. Baffling; each core should really
 * get its own heap. The blogger considers this a bug.
 */
clib_mem_set_heap (oldheap);
vec_add1 (vlib_mains, vm_clone);

vm_clone->error_main.counters =
vec_dup (vlib_mains[0]->error_main.counters);
vm_clone->error_main.counters_last_clear =
vec_dup (vlib_mains[0]->error_main.counters_last_clear);

/* Fork the vlib_buffer_main_t free lists, etc. */
bm_clone = vec_dup (vm_clone->buffer_main);
vm_clone->buffer_main = bm_clone;

orig_freelist_pool = bm_clone->buffer_free_list_pool;
bm_clone->buffer_free_list_pool = 0;

/* *INDENT-OFF* */
pool_foreach (fl_orig, orig_freelist_pool,
({
pool_get_aligned (bm_clone->buffer_free_list_pool,
fl_clone, CLIB_CACHE_LINE_BYTES);
ASSERT (fl_orig - orig_freelist_pool
== fl_clone - bm_clone->buffer_free_list_pool);

fl_clone[0] = fl_orig[0];
fl_clone->aligned_buffers = 0;
fl_clone->unaligned_buffers = 0;
fl_clone->n_alloc = 0;
}));
/* *INDENT-ON* */

worker_thread_index++;
}
}
}
else
{
//"stats"工作线程依旧开启,"workers"工作线程没启用
for (i = 0; i < vec_len (tm->registrations); i++)
{
tr = tm->registrations[i];

for (j = 0; j < tr->count; j++)
{
vec_add2 (vlib_worker_threads, w, 1);
if (tr->mheap_size)
w->thread_mheap =
mheap_alloc (0 /* use VM */ , tr->mheap_size);
else
w->thread_mheap = main_heap;
w->thread_stack = vlib_thread_stacks[w - vlib_worker_threads];
w->thread_function = tr->function;
w->thread_function_arg = w;
w->instance_id = j;
w->elog_track.name =
(char *) format (0, "%s %d", tr->name, j + 1);
w->registration = tr;
vec_add1 (w->elog_track.name, 0);
elog_track_register (&vm->elog_main, &w->elog_track);
}
}
}

worker_thread_index = 1;

for (i = 0; i < vec_len (tm->registrations); i++)
{
int j;

tr = tm->registrations[i];
//threads can be launched as pthreads or as DPDK lcores; the thread entry point is vlib_worker_thread_bootstrap_fn
if (tr->use_pthreads || tm->use_pthreads)
{
for (j = 0; j < tr->count; j++)
{
w = vlib_worker_threads + worker_thread_index++;
if (vlib_launch_thread (vlib_worker_thread_bootstrap_fn, w, 0) <
0)
clib_warning ("Couldn't start '%s' pthread ", tr->name);
}
}
else
{
uword c;
/* *INDENT-OFF* */
clib_bitmap_foreach (c, tr->coremask, ({
w = vlib_worker_threads + worker_thread_index++;
if (vlib_launch_thread (vlib_worker_thread_bootstrap_fn, w, c) < 0)
clib_warning ("Couldn't start DPDK lcore %d", c);

}));
/* *INDENT-ON* */
}
}

//synchronize with the launched workers, then release them
vlib_worker_thread_barrier_sync (vm);
vlib_worker_thread_barrier_release (vm);
return 0;
}
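
For context, the registrations iterated above are created by the VLIB_REGISTER_THREAD constructor macro. Below is a sketch of what the "workers" and "stats" registrations look like, inferred from the fields used in the walk-through; the stats entry-point name stats_thread_fn is assumed, not verified:

VLIB_REGISTER_THREAD (worker_thread_reg, static) = {
  .name = "workers",
  .short_name = "wk",
  .function = vlib_worker_thread_fn,
};

VLIB_REGISTER_THREAD (stats_thread_reg, static) = {
  .name = "stats",
  .function = stats_thread_fn,   /* assumed name for the stats loop */
  .fixed_count = 1,
  .count = 1,
  .no_data_structure_clone = 1,  /* skip the vlib_main_t clone above */
  .use_pthreads = 1,             /* launched as a pthread, not a DPDK lcore */
};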
//worker thread business logic, running on its own separately allocated stack
void vlib_worker_thread_fn (void *arg)
{
vlib_worker_thread_t *w = (vlib_worker_thread_t *) arg;
vlib_thread_main_t *tm = vlib_get_thread_main ();
vlib_main_t *vm = vlib_get_main ();

ASSERT (vm->cpu_index == os_get_cpu_number ());

//when pthreads are used, there is a synchronization point inside
vlib_worker_thread_init (w);
clib_time_init (&vm->clib_time);
clib_mem_set_heap (w->thread_mheap);

/* Wait until the dpdk init sequence is complete */
//worker_thread_release is set by dpdk_process in the main thread
while (tm->worker_thread_release == 0)
vlib_worker_thread_barrier_check ();

//main dispatch loop
vlib_worker_thread_internal (vm);
}
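
The spin-wait mentioned above lives in vlib_worker_thread_barrier_check. A minimal sketch of its logic in this era of the code base (the exact atomic helpers may vary by version):

static inline void vlib_worker_thread_barrier_check (void)
{
  if (PREDICT_FALSE (*vlib_worker_threads->wait_at_barrier))
    {
      /* tell the main thread this worker has parked */
      clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, 1);
      /* spin until the main thread releases the barrier */
      while (*vlib_worker_threads->wait_at_barrier)
        ;
      clib_smp_atomic_add (vlib_worker_threads->workers_at_barrier, -1);
    }
}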
static_always_inline void vlib_worker_thread_internal (vlib_main_t * vm)
{
vlib_node_main_t *nm = &vm->node_main;
u64 cpu_time_now = clib_cpu_time_now ();

while (1)
{
/*
 * Every iteration checks for a pending sync request. When the main
 * thread calls vlib_worker_thread_barrier_sync, the call returns once
 * all worker threads are spin-waiting; at that point the main thread
 * can safely manipulate shared data. After
 * vlib_worker_thread_barrier_release, the workers leave the spin-wait
 * and resume the main dispatch loop.
 */
vlib_worker_thread_barrier_check ();

//the blogger could not find a matching enqueue operation, so this call is effectively useless
vlib_frame_queue_dequeue_internal (vm);

//worker threads only dispatch the VLIB_NODE_TYPE_INPUT and VLIB_NODE_TYPE_INTERNAL node types
vlib_node_runtime_t *n;
vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
{
//the CLI binds NICs to specific cores, so this only receives packets from NICs assigned to this core
cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT,
VLIB_NODE_STATE_POLLING, /* frame */ 0,
cpu_time_now);
}

if (_vec_len (nm->pending_frames))
{
int i;
cpu_time_now = clib_cpu_time_now ();
for (i = 0; i < _vec_len (nm->pending_frames); i++)
{
vlib_pending_frame_t *p;

p = nm->pending_frames + i;

cpu_time_now = dispatch_pending_node (vm, p, cpu_time_now);
}
_vec_len (nm->pending_frames) = 0;
}
vlib_increment_main_loop_counter (vm);

/* Record time stamp in case there are no enabled nodes and above
calls do not update time stamp. */
cpu_time_now = clib_cpu_time_now ();
}
}
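
On the main-thread side, the counterpart pair looks roughly like this; a simplified sketch of vlib_worker_thread_barrier_sync and vlib_worker_thread_barrier_release with the deadlock-timeout and recursion guards of the real code omitted:

void vlib_worker_thread_barrier_sync (vlib_main_t * vm)
{
  u32 count = vec_len (vlib_mains) - 1;  /* number of worker threads */

  /* raise the flag each worker polls in vlib_worker_thread_barrier_check */
  *vlib_worker_threads->wait_at_barrier = 1;

  /* spin until every worker has parked; shared data is then safe to touch */
  while (*vlib_worker_threads->workers_at_barrier != count)
    ;
}

void vlib_worker_thread_barrier_release (vlib_main_t * vm)
{
  /* drop the flag; workers leave their spin loop and resume dispatching */
  *vlib_worker_threads->wait_at_barrier = 0;

  /* wait until every worker has actually left the barrier */
  while (*vlib_worker_threads->workers_at_barrier > 0)
    ;
}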

Note: unless otherwise stated, all articles are original. To keep information in sync with the source, please be sure to cite the source when reposting! Thanks for your cooperation :-)

Original link: http://zhaozhanxu.com/2016/11/12/VPP/2016-11-12-VPP-Multithread/

License: "Attribution-NonCommercial-ShareAlike 4.0" (CC BY-NC-SA 4.0). Please keep the original link and author when reposting.