想要看一看DPDK的抓包是怎么做的,我有没有改进的空间。用法这里不讲,可以查看官方文档。
通过了解,DPDK的抓包采用的是C/S架构:DPDK的librte_pdump库是server端,用来接收client端的指令并配置抓包,类似于Linux kernel所做的工作;而client端就是我们运行的抓包命令,类似于tcpdump命令。
librte_pdump
由基于DPDK的主程序去调用rte_pdump_init
创建server端。
/*
 * Server-side entry point of librte_pdump, called by the DPDK-based main
 * application. In this (abridged) excerpt it sets the server socket
 * directory, creates the server socket, starts the thread that runs
 * pdump_thread_main and gives that thread a name.
 *
 * NOTE(review): the results of pthread_create() and rte_thread_setname()
 * are stored in ret, but no check is visible in this excerpt; 0 is
 * returned unconditionally here.
 */
int
rte_pdump_init(const char *path)
{
...
/* Set the server-side path of the unix domain socket */
rte_pdump_set_socket_dir(path, RTE_PDUMP_SOCKET_SERVER);
/* Create the unix domain socket */
pdump_create_server_socket();
/* Start a thread that keeps receiving client messages so the capture callbacks can be installed on request */
ret = pthread_create(&pdump_thread, NULL, pdump_thread_main, NULL);
/* Name the thread "pdump-thread" to make debugging easier */
snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "pdump-thread");
ret = rte_thread_setname(pdump_thread, thread_name);
return 0;
}
这里面我们比较重视的是pdump_thread_main
,所以看一下该函数。
/*
 * Body of the server thread: waits for client requests on the pdump
 * socket, installs or removes the capture callbacks they ask for, and
 * sends the outcome back to the requesting client. Never returns.
 *
 * NOTE(review): the local declarations (cli_req, cli_addr, cli_len, resp,
 * ret) are not shown in this excerpt.
 */
static __attribute__((noreturn)) void *
pdump_thread_main(__rte_unused void *arg)
{
	/* Endless loop; there is no exit mechanism */
	for (;;) {
		/* The address length is an in/out argument: reset it per request */
		cli_len = sizeof(cli_addr);
		/* Receive a request from a client; ignore the datagram on failure */
		if (recvfrom(pdump_socket_fd, &cli_req,
				sizeof(struct pdump_request), 0,
				(struct sockaddr *)&cli_addr, &cli_len) < 0) {
			continue;
		}
		/*
		 * Install or remove the capture callbacks as requested and keep
		 * the result so the reply below reports what actually happened.
		 */
		ret = set_pdump_rxtx_cbs(&cli_req);
		/* Send the outcome back to the client */
		resp.ver = cli_req.ver;
		resp.res_op = cli_req.op;
		resp.err_value = ret;
		sendto(pdump_socket_fd, &resp,
				sizeof(struct pdump_response),
				0, (struct sockaddr *)&cli_addr, cli_len);
	}
}
接下来我们关注的是挂载抓包函数,所以看一下函数set_pdump_rxtx_cbs。
/*
 * Apply one client request: resolve the device name to a port, work out
 * the queue range, then register (or unregister) the RX and/or TX capture
 * callbacks.
 *
 * Returns 0 on success, or the first non-zero result reported by the port
 * lookup or by the callback (un)registration helpers.
 *
 * NOTE(review): the declarations of the other locals, and the code that
 * sets operation and flags from the request, are not shown in this excerpt.
 */
static int
set_pdump_rxtx_cbs(struct pdump_request *p)
{
	int ret = 0;

	/* Find the port from the device name */
	if (operation == ENABLE) {
		ret = rte_eth_dev_get_port_by_name(p->data.en_v1.device,
				&port);
		if (ret != 0)
			return ret;
		queue = p->data.en_v1.queue;
		ring = p->data.en_v1.ring;
		mp = p->data.en_v1.mp;
	} else {
		ret = rte_eth_dev_get_port_by_name(p->data.dis_v1.device,
				&port);
		if (ret != 0)
			return ret;
		queue = p->data.dis_v1.queue;
		ring = p->data.dis_v1.ring;
		mp = p->data.dis_v1.mp;
	}

	/* Capturing on all queues needs the device's queue counts */
	if (queue == RTE_PDUMP_ALL_QUEUES) {
		rte_eth_dev_info_get(port, &dev_info);
		nb_rx_q = dev_info.nb_rx_queues;
		nb_tx_q = dev_info.nb_tx_queues;
	}

	/* register RX callback */
	if (flags & RTE_PDUMP_FLAG_RX) {
		end_q = (queue == RTE_PDUMP_ALL_QUEUES) ? nb_rx_q : queue + 1;
		ret = pdump_regitser_rx_callbacks(end_q, port, queue, ring, mp,
				operation);
		if (ret != 0)
			return ret;
	}

	/* register TX callback */
	if (flags & RTE_PDUMP_FLAG_TX) {
		end_q = (queue == RTE_PDUMP_ALL_QUEUES) ? nb_tx_q : queue + 1;
		ret = pdump_regitser_tx_callbacks(end_q, port, queue, ring, mp,
				operation);
		if (ret != 0)
			return ret;
	}

	return ret;
}
我们主要操作就是挂载函数,分别是RX和TX处挂载回调函数,这块我们只关注一个就行,因为两者操作基本一致。
/*
 * Install (ENABLE) or remove (DISABLE) the pdump_rx callback on RX queues
 * of a port.
 *
 * end_q     - one past the last queue index to process
 * port      - port whose queues are affected
 * queue     - a single queue index, or RTE_PDUMP_ALL_QUEUES to start at 0
 * ring, mp  - ring and mempool stored in the per-queue callback state
 * operation - ENABLE or DISABLE
 *
 * Returns 0 on success, -1 if a callback could not be added, or the
 * negative result of rte_eth_remove_rx_callback() if a removal failed.
 * Queues processed before a failure are left as they are.
 */
static int
pdump_regitser_rx_callbacks(uint16_t end_q, uint8_t port, uint16_t queue,
		struct rte_ring *ring, struct rte_mempool *mp,
		uint16_t operation)
{
	uint16_t qid = (queue == RTE_PDUMP_ALL_QUEUES) ? 0 : queue;

	for (; qid < end_q; qid++) {
		/* Address of an array element: it can never be NULL */
		struct pdump_rxtx_cbs *cbs = &rx_cbs[port][qid];

		if (operation == ENABLE) {
			cbs->ring = ring;
			cbs->mp = mp;
			/* RX installs pdump_rx here; the TX variant installs pdump_tx */
			cbs->cb = rte_eth_add_first_rx_callback(port, qid,
					pdump_rx, cbs);
			if (cbs->cb == NULL)
				return -1;
		} else if (operation == DISABLE) {
			int ret = rte_eth_remove_rx_callback(port, qid, cbs->cb);

			/* Keep the handle on failure so removal can be retried */
			if (ret < 0)
				return ret;
			cbs->cb = NULL;
		}
	}
	return 0;
}
接收和发送分别挂载了函数pdump_rx
和pdump_tx
,而他们两个都是调用了函数pdump_copy
。
/*
 * Duplicate a burst of packets and hand the copies to the capture ring.
 *
 * pkts        - packets seen by the RX/TX callback (not modified here)
 * nb_pkts     - number of entries in pkts
 * user_params - the struct pdump_rxtx_cbs registered with the callback;
 *               supplies the destination ring and the mempool for copies
 *
 * Packets that cannot be copied are skipped; copies that do not fit into
 * the ring are freed.
 */
static inline void
pdump_copy(struct rte_mbuf **pkts, uint16_t nb_pkts, void *user_params)
{
	/*
	 * Nothing to do for an empty burst. Returning early also avoids
	 * declaring dup_bufs as a zero-length variable-length array, which
	 * is undefined behaviour in C.
	 */
	if (nb_pkts == 0)
		return;

	uint16_t d_pkts = 0;
	struct rte_mbuf *dup_bufs[nb_pkts];
	struct pdump_rxtx_cbs *cbs = user_params;
	struct rte_ring *ring = cbs->ring;
	struct rte_mempool *mp = cbs->mp;

	/* Make a copy of every packet */
	for (unsigned i = 0; i < nb_pkts; i++) {
		struct rte_mbuf *p = pdump_pktmbuf_copy(pkts[i], mp);
		if (p) {
			dup_bufs[d_pkts++] = p;
		}
	}

	/* Pass the copies on through the ring */
	int ring_enq = rte_ring_enqueue_burst(ring, (void *)dup_bufs, d_pkts);
	if (unlikely(ring_enq < d_pkts)) {
		RTE_LOG(DEBUG, PDUMP,
			"only %d of packets enqueued to ring\n", ring_enq);
		/* Free the copies the ring had no room for */
		do {
			rte_pktmbuf_free(dup_bufs[ring_enq]);
		} while (++ring_enq < d_pkts);
	}
}
至此,报文已经传递出去了,就需要看看client端怎么接收。
pdump
app/pdump/main.c
main-->enable_pdump
主要和server进行通信,然后配置抓包,完成后才开始抓包操作。
/*
 * Client side: applies the socket directories given on the command line
 * (if any), then enables capture for every configured tuple.
 * The RX-only and TX-only branches are elided ("...") in this excerpt.
 */
static void
enable_pdump(void)
{
/* This path usually has to be passed as an option because the server chooses it */
if (server_socket_path[0] != 0) {
rte_pdump_set_socket_dir(server_socket_path,
RTE_PDUMP_SOCKET_SERVER);
}
/* Optional: the default can be used; the server learns it during the exchange */
if (client_socket_path[0] != 0) {
rte_pdump_set_socket_dir(client_socket_path,
RTE_PDUMP_SOCKET_CLIENT);
}
/* Enable pdump. The function and arguments used depend on the options,
 * but they all end up calling pdump_create_client_socket.
 */
for (int i = 0; i < num_tuples; i++) {
struct pdump_tuples *pt = &pdump_t[i];
/* Both directions: RX and TX are enabled separately, each with its own ring */
if (pt->dir == RTE_PDUMP_FLAG_RXTX) {
if (pt->dump_by_type == DEVICE_ID) {
rte_pdump_enable_by_deviceid(
pt->device_id,
pt->queue,
RTE_PDUMP_FLAG_RX,
pt->rx_ring,
pt->mp, NULL);
rte_pdump_enable_by_deviceid(
pt->device_id,
pt->queue,
RTE_PDUMP_FLAG_TX,
pt->tx_ring,
pt->mp, NULL);
} else if (pt->dump_by_type == PORT_ID) {
rte_pdump_enable(pt->port, pt->queue,
RTE_PDUMP_FLAG_RX,
pt->rx_ring, pt->mp, NULL);
rte_pdump_enable(pt->port, pt->queue,
RTE_PDUMP_FLAG_TX,
pt->tx_ring, pt->mp, NULL);
}
} else if (pt->dir == RTE_PDUMP_FLAG_RX) {
...
} else if (pt->dir == RTE_PDUMP_FLAG_TX) {
...
}
}
}
/*
 * Send one request to the pdump server over a unix datagram socket and
 * wait for its reply.
 *
 * A temporary client socket is bound to the client path, the request is
 * sent to the server path, and the socket is closed and its path unlinked
 * again before returning. Capture can only start once the server has
 * answered.
 *
 * Returns the err_value carried in the server's reply, or -1 if the
 * socket could not be created or bound, or the request/reply exchange
 * failed.
 */
static int
pdump_create_client_socket(struct pdump_request *p)
{
	struct pdump_response server_resp;
	struct sockaddr_un addr, serv_addr, from;
	socklen_t serv_len = sizeof(serv_addr);
	socklen_t from_len = sizeof(from);
	int ret = -1;

	int socket_fd = socket(AF_UNIX, SOCK_DGRAM, 0);
	if (socket_fd < 0)
		return -1;

	/* Local (client) end of the exchange */
	memset(&addr, 0, sizeof(addr));
	pdump_get_socket_path(addr.sun_path, sizeof(addr.sun_path),
			RTE_PDUMP_SOCKET_CLIENT);
	addr.sun_family = AF_UNIX;
	if (bind(socket_fd, (struct sockaddr *)&addr, sizeof(addr)) < 0)
		goto out;

	/* Remote (server) end of the exchange */
	memset(&serv_addr, 0, sizeof(serv_addr));
	pdump_get_socket_path(serv_addr.sun_path,
			sizeof(serv_addr.sun_path),
			RTE_PDUMP_SOCKET_SERVER);
	serv_addr.sun_family = AF_UNIX;

	/* Send the configuration to the server ... */
	if (sendto(socket_fd, p, sizeof(struct pdump_request), 0,
			(struct sockaddr *)&serv_addr, serv_len) < 0)
		goto out;

	/* ... and wait for its answer; without one, capture must not start */
	if (recvfrom(socket_fd, &server_resp,
			sizeof(struct pdump_response), 0,
			(struct sockaddr *)&from, &from_len) < 0)
		goto out;

	ret = server_resp.err_value;

out:
	close(socket_fd);
	unlink(addr.sun_path);
	return ret;
}
main-->dump_packets-->pdump_rxtx
就开启了抓包的操作。
/*
 * Drain one burst of captured packets from the ring and transmit them on
 * queue 0 of the given vdev, updating the capture statistics.
 * Packets the vdev does not accept are freed and counted as freed.
 */
static inline void
pdump_rxtx(struct rte_ring *ring, uint8_t vdev_id, struct pdump_stats *stats)
{
	struct rte_mbuf *burst[BURST_SIZE];

	/* Fetch the captured packets */
	const uint16_t dequeued = rte_ring_dequeue_burst(ring,
			(void *)burst, BURST_SIZE);
	stats->dequeue_pkts += dequeued;
	if (dequeued == 0)
		return;

	/* Send them out; the target may be a device or a pcap file */
	const uint16_t sent = rte_eth_tx_burst(vdev_id, 0, burst, dequeued);
	stats->tx_pkts += sent;

	/* Release whatever could not be transmitted */
	for (uint16_t idx = sent; idx < dequeued; idx++) {
		rte_pktmbuf_free(burst[idx]);
		stats->freed_pkts++;
	}
}
TODO
希望在server端能加上类似Linux kernel的BPF报文过滤,client端可以支持各种filter语法。