想要看一看DPDK的抓包是怎么做的,我有没有改进的空间。用法这里不讲,可以查看官方文档

通过了解,DPDK的抓包是采用的C/S架构,DPDK有一个librte_pdump的库是server端,用来接收client端的指令去配置抓包,类似于linux kenrel做的工作,而client端就是我们运行的抓包命令,类似于tcpdump命令。

librte_pdump


由基于DPDK的主程序去调用rte_pdump_init创建server端。

int
rte_pdump_init(const char *path)
{
    ...

    /* 设置unix socket domain的server端路径 */
    rte_pdump_set_socket_dir(path, RTE_PDUMP_SOCKET_SERVER);

    /* 创建unix socket domain的socket */
    pdump_create_server_socket();

    /* 创建线程持续接收client端的信息,以便于在抓包时挂载抓包函数 */
    ret = pthread_create(&pdump_thread, NULL, pdump_thread_main, NULL);

    /* 配置线程名字为pdump-thread,便于调试 */
    snprintf(thread_name, RTE_MAX_THREAD_NAME_LEN, "pdump-thread");
    ret = rte_thread_setname(pdump_thread, thread_name);

    return 0;
}

这里面我们比较重视的是pdump_thread_main,所以看一下该函数。

static __attribute__((noreturn)) void *
pdump_thread_main(__rte_unused void *arg)
{
    /* 无限循环,没有退出机制 */
    for (;;) {
        /* 从客户端接收信息 */
        recvfrom(pdump_socket_fd, &cli_req,
                 sizeof(struct pdump_request), 0,
                 (struct sockaddr *)&cli_addr, &cli_len);

        /* 根据接收的信息挂载抓包函数 */
        set_pdump_rxtx_cbs(&cli_req);

        /* 给客户端返回一些信息 */
        resp.ver = cli_req.ver;
        resp.res_op = cli_req.op;
        resp.err_value = ret;
        sendto(pdump_socket_fd, &resp,
               sizeof(struct pdump_response),
               0, (struct sockaddr *)&cli_addr, cli_len);
    }
}

接下来我们关注的是挂载抓包函数,所以看一下函数et_pdump_rxtx_cbs

static int
set_pdump_rxtx_cbs(struct pdump_request *p)
{
    /* 通过设备名称找到端口信息 */
    if (operation == ENABLE) {
        rte_eth_dev_get_port_by_name(p->data.en_v1.device,
                                     &port);
        queue = p->data.en_v1.queue;
        ring = p->data.en_v1.ring;
        mp = p->data.en_v1.mp;
    } else {
        rte_eth_dev_get_port_by_name(p->data.dis_v1.device,
                                     &port);
        queue = p->data.dis_v1.queue;
        ring = p->data.dis_v1.ring;
        mp = p->data.dis_v1.mp;
    }

    /* 判断是抓所有队列的报文还是只抓一些队列的 */
    if (queue == RTE_PDUMP_ALL_QUEUES) {
        rte_eth_dev_info_get(port, &dev_info);
        nb_rx_q = dev_info.nb_rx_queues;
        nb_tx_q = dev_info.nb_tx_queues;
    }

    /* register RX callback */
    if (flags & RTE_PDUMP_FLAG_RX) {
        end_q = (queue == RTE_PDUMP_ALL_QUEUES) ? nb_rx_q : queue + 1;
        pdump_regitser_rx_callbacks(end_q, port, queue, ring, mp,
                                    operation);
    }

    /* register TX callback */
    if (flags & RTE_PDUMP_FLAG_TX) {
        end_q = (queue == RTE_PDUMP_ALL_QUEUES) ? nb_tx_q : queue + 1;
        pdump_regitser_tx_callbacks(end_q, port, queue, ring, mp,
                                    operation);
    }
}

我们主要操作就是挂载函数,分别是RX和TX处挂载回调函数,这块我们只关注一个就行,因为两者操作基本一致。

static int
pdump_regitser_rx_callbacks(uint16_t end_q, uint8_t port, uint16_t queue,
                            struct rte_ring *ring, struct rte_mempool *mp,
                            uint16_t operation)
{
    uint16_t qid = (queue == RTE_PDUMP_ALL_QUEUES) ? 0 : queue;
    for (; qid < end_q; qid++) {
        struct pdump_rxtx_cbs *cbs = &rx_cbs[port][qid];
        if (cbs && operation == ENABLE) {
            cbs->ring = ring;
            cbs->mp = mp;
            /* 此处挂载pdump_rx函数,而tx处挂载pdump_tx函数 */
            cbs->cb = rte_eth_add_first_rx_callback(port, qid,
                                                    pdump_rx, cbs);
        }
        if (cbs && operation == DISABLE) {
            int ret = rte_eth_remove_rx_callback(port, qid, cbs->cb);
            cbs->cb = NULL;
        }
    }

    return 0;
}

接收和发送分别挂载了函数pdump_rxpdump_tx,而他们两个都是调用了函数pdump_copy

static inline void
pdump_copy(struct rte_mbuf **pkts, uint16_t nb_pkts, void *user_params)
{
    uint16_t d_pkts = 0;
    struct rte_mbuf *dup_bufs[nb_pkts];

    struct pdump_rxtx_cbs *cbs  = user_params;
    struct rte_ring *ring = cbs->ring;
    struct rte_mempool *mp = cbs->mp;
    /* 拷贝一份报文 */
    for (unsigned i = 0; i < nb_pkts; i++) {
        struct rte_mbuf *p = pdump_pktmbuf_copy(pkts[i], mp);
        if (p) {
            dup_bufs[d_pkts++] = p;
        }
    }

    /* 通过ring发送 */
    int ring_enq = rte_ring_enqueue_burst(ring, (void *)dup_bufs, d_pkts);
    if (unlikely(ring_enq < d_pkts)) {
        RTE_LOG(DEBUG, PDUMP,
            "only %d of packets enqueued to ring\n", ring_enq);
        do {
            rte_pktmbuf_free(dup_bufs[ring_enq]);
        } while (++ring_enq < d_pkts);
    }
}

至此,报文已经传递出去了,就需要看看client端怎么接收。

pdump


app/pdump/main.c

main-->enable_pdump主要和server进行通信,然后配置抓包,完成后才开始抓包操作。

static void
enable_pdump(void)
{
    /* 这个路径通常需要参数配置,因为是server指定的 */
    if (server_socket_path[0] != 0) {
        rte_pdump_set_socket_dir(server_socket_path,
                                 RTE_PDUMP_SOCKET_SERVER);
    }
    /* 这块可以不指定,可以采用默认值,然后通信时告知server */
    if (client_socket_path[0] != 0) {
        rte_pdump_set_socket_dir(client_socket_path,
                                 RTE_PDUMP_SOCKET_CLIENT);
    }

    /* 启动pdump,因为参数不同,调用的函数和参数不同
     * 但是实质调用的函数都一致,就是pdump_create_client_socket
     */
    for (int i = 0; i < num_tuples; i++) {
        struct pdump_tuples *pt = &pdump_t[i];
        if (pt->dir == RTE_PDUMP_FLAG_RXTX) {
            if (pt->dump_by_type == DEVICE_ID) {
                rte_pdump_enable_by_deviceid(
                        pt->device_id,
                        pt->queue,
                        RTE_PDUMP_FLAG_RX,
                        pt->rx_ring,
                        pt->mp, NULL);
                rte_pdump_enable_by_deviceid(
                        pt->device_id,
                        pt->queue,
                        RTE_PDUMP_FLAG_TX,
                        pt->tx_ring,
                        pt->mp, NULL);
            } else if (pt->dump_by_type == PORT_ID) {
                rte_pdump_enable(pt->port, pt->queue,
                        RTE_PDUMP_FLAG_RX,
                        pt->rx_ring, pt->mp, NULL);
                rte_pdump_enable(pt->port, pt->queue,
                        RTE_PDUMP_FLAG_TX,
                        pt->tx_ring, pt->mp, NULL);
            }
        } else if (pt->dir == RTE_PDUMP_FLAG_RX) {
            ...
        } else if (pt->dir == RTE_PDUMP_FLAG_TX) {
            ...
        }
    }
}
static int
pdump_create_client_socket(struct pdump_request *p)
{
    int n;
    struct pdump_response server_resp;
    struct sockaddr_un addr, serv_addr, from;
    socklen_t addr_len, serv_len;

    int pid = getpid();

    int socket_fd = socket(AF_UNIX, SOCK_DGRAM, 0);

    pdump_get_socket_path(addr.sun_path, sizeof(addr.sun_path),
                          RTE_PDUMP_SOCKET_CLIENT);
    addr.sun_family = AF_UNIX;
    addr_len = sizeof(struct sockaddr_un);

    bind(socket_fd, (struct sockaddr *) &addr, addr_len);

    serv_len = sizeof(struct sockaddr_un);
    memset(&serv_addr, 0, sizeof(serv_addr));
    pdump_get_socket_path(serv_addr.sun_path,
                          sizeof(serv_addr.sun_path),
                          RTE_PDUMP_SOCKET_SERVER);
    serv_addr.sun_family = AF_UNIX;

    /*
     * 以上是创建socket,然后将配置内容发送给server
     * server配置好会给回复,然后就会进入到抓包的流程
     * 如果没有收到回复是不能抓包的
     */
    sendto(socket_fd, p, sizeof(struct pdump_request), 0,
           (struct sockaddr *)&serv_addr, serv_len);

    recvfrom(socket_fd, &server_resp,
             sizeof(struct pdump_response), 0,
             (struct sockaddr *)&from, &serv_len);

    close(socket_fd);
    unlink(addr.sun_path);
}

main-->dump_packets-->pdump_rxtx就开启了抓包的操作。

static inline void
pdump_rxtx(struct rte_ring *ring, uint8_t vdev_id, struct pdump_stats *stats)
{
    /* write input packets of port to vdev for pdump */
    struct rte_mbuf *rxtx_bufs[BURST_SIZE];

    /* 拿到报文 */
    const uint16_t nb_in_deq = rte_ring_dequeue_burst(ring,
            (void *)rxtx_bufs, BURST_SIZE);
    stats->dequeue_pkts += nb_in_deq;

    if (nb_in_deq) {
        /* 发送,可以是设备,可以是pcap文件 */
        uint16_t nb_in_txd = rte_eth_tx_burst(
                vdev_id,
                0, rxtx_bufs, nb_in_deq);
        stats->tx_pkts += nb_in_txd;

        if (unlikely(nb_in_txd < nb_in_deq)) {
            do {
                rte_pktmbuf_free(rxtx_bufs[nb_in_txd]);
                stats->freed_pkts++;
            } while (++nb_in_txd < nb_in_deq);
        }
    }
}

TODO

希望在server端能加上类似Linux kernel的bfp报文过滤。client可以支持各种filter语法。

最后修改:2021 年 08 月 18 日
如果觉得我的文章对你有用,请随意赞赏