了解版本为主要版本为1.5.12。主要从四个方面了解:主循环逻辑、TCP连接处理流程、HTTP连接处理流程。

主循环处理流程

主循环run_poll_loop

HAproxy的主循环在haproxy.c中的run_poll_loop()函数,代码如下:

/* Runs the polling loop */
void run_poll_loop()
{
    int next;
    tv_update_date(0,1);
    while (1)
    {
        /* check if we caught some signals and process them */
        signal_process_queue();
        /* Check if we can expire some tasks */
        wake_expired_tasks(&next);
        /* Process a few tasks */
        process_runnable_tasks(&next);
        /* stop when there's nothing left to do */
        if (jobs == 0)
            break;
        /* The poller will ensure it returns around <next> */
        cur_poller.poll(&cur_poller, next);
        fd_process_cached_events();
    }
}

主循环的结构比较清晰,就是循环的调用几个函数,并在适当的时候结束循环并退出:

  1. 处理信号队列。
  2. 超时任务。
  3. 处理可运行的任务。
  4. 检测是否可以结束循环。
  5. 执行 poll 处理 fd 的 IO 事件。
  6. 处理可能仍有 IO 事件的 fd.

signal_process_queue - 处理信号队对列

haproxy实现了自己的信号处理机制。接受到信号之后,将该信号放到信号队列中。在程序运行到signal_process_queue()时处理所有位于信号队列中的信号。

wake_expired_tasks - 唤醒超时任务

haproxy的顶层处理逻辑是task,task上存储着要处理的任务的全部信息。task的管理是采用队列方式,同时分为wait_queuerun_queue。顾名思义,wait_queue是需要等待一定时间的task的集合,而run_queue则代表需要立即执行的task的集合。

该函数就是检查wait_queue中那些超时的任务,并将其放到run_queue中。haproxy在执行的过程中,会因为一些情况导致需要将当前的任务通过调用task_queue等接口放到wait_queue中。

process_runnable_tasks - 处理可运行的任务

处理位于run_queue中的任务。

前面提到,wake_expired_tasks可能将一些超时的任务放到run_queue中。此外,haproxy执行的过程中,还有可能通过调用task_wakeup直接讲某个task放到run_queue中,这代表程序希望该任务下次尽可能快的被执行。

对于TCP或者HTTP业务流量的处理,该函数最终通过调用process_session来完成,包括解析已经接收到的数据, 并执行一系列load balance的特性,但不负责从socket收发数据。

jobs == 0 - 无任务可执行,结束循环

haproxy中用jobs记录当前要处理的任务总数,一个listener也会被计算在内。因此, 如果jobs为0的话,通常意味着haproxy要退出了,因为连listener都要释放了。 jobs的数值通常在process_session时更新。因此,是否可以退出循环,就放在了所有任务的process_session执行之后。

cur_poller.poll() - 执行poll处理fd的IO事件

haproxy启动阶段,会检测当前系统可以启用那种异步处理的机制,比如selectpollepollkqueue等,并注册对应poller的poll方法。epoll的相关函数接口在ev_epoll.c中。

这里就是执行已经注册的poller的poll方法,主要功能就是获取所有活动的fd,并调用对应的handler,完成接受新建连接、数据收发等功能。

处理可能仍有IO事件的fd

poller的poll方法执行时,程序会将某些符合条件以便再次执行IO处理的的fd放到fd_spec list[]中,fd_process_cached_events()函数会再次执行这些fd的io handler

TCP连接处理流程

关键数据结构 session

haproxy负责处理请求的核心数据结构是struct session,本文不对该数据结构进行分析。

从业务的处理的角度,简单介绍一下对session的理解:

  • haproxy每接收到client的一个连接,便会创建一个session结构,该结构一直伴随着连接的处理,直至连接被关闭,session才会被释放。
  • haproxy其他的数据结构,大多会通过引用的方式和session进行关联。
  • 一个业务session上会存在两个TCP连接,一个是client到haproxy,一个是haproxy到后端server.

此外,一个session,通常还要对应一个task,haproxy最终用来做调度的是通过task.

相关初始化

在haproxy正式处理请求之前,会有一系列初始化动作。这里介绍和请求处理相关的一些初始化。

初始化处理TCP连接的方法

初始化处理TCP协议的相关数据结构,主要是和socket相关的方法的声明。详细见下面proto_tcpv4 (proto_tcp.c)的初始化:

/* Note: must not be declared <const> as its list will be overwritten */
static struct protocol proto_tcpv4 = {
    .name = "tcpv4",
    .sock_domain = AF_INET,
    .sock_type = SOCK_STREAM,
    .sock_prot = IPPROTO_TCP,
    .sock_family = AF_INET,
    .sock_addrlen = sizeof(struct sockaddr_in),
    .l3_addrlen = 32/8,
    .accept = &listener_accept,
    .connect = tcp_connect_server,
    .bind = tcp_bind_listener,
    .bind_all = tcp_bind_listeners,
    .unbind_all = unbind_all_listeners,
    .enable_all = enable_all_listeners,
    .get_src = tcp_get_src,
    .get_dst = tcp_get_dst,
    .drain = tcp_drain,
    .pause = tcp_pause_listener,
    .listeners = LIST_HEAD_INIT(proto_tcpv4.listeners),
    .nb_listeners = 0,
};

初始化 listener

listener,顾名思义,就是用于负责处理监听相关的逻辑。

在haproxy解析bind配置的时候赋值给listener的proto成员。函数调用流程如下:

cfgparse.c
-> cfg_parse_listen
-> str2listener
-> tcpv4_add_listener
-> listener->proto = &proto_tcpv4;

由于这里初始化的是listener处理socket的一些方法。可以推断,haproxy接收client新建连接的入口函数应该是protocol结构体中的accpet方法。对于tcpv4来说,就是listener_accept()函数。

listener的其他初始化

cfgparse.c
-> check_config_validity
-> listener->accept = session_accept;
   listener->frontend = curproxy; (解析 frontend 时,会执行赋值: curproxy->accept = frontend_accept)
   listener->handler = process_session;

整个haproxy配置文件解析完毕,listener也已初始化完毕。可以简单梳理一下几个accept方法的设计逻辑:

  • stream_sock_accept(): 负责接收新建TCP连接,并触发listener自己的accept方法session_accept()
  • session_accept(): 负责创建session,并作session成员的初步初始化,并调用frontend的accept方法front_accetp()
  • frontend_accept(): 该函数主要负责session前端的TCP连接的初始化,包括socket设置,log设置,以及session部分成员的初始化。

下文分析TCP新建连接处理过程,基本上就是这三个函数的分析。

绑定所有已注册协议上的listeners

haproxy.c
-> protocol_bind_all
-> all registered protocol bind_all
-> tcp_bind_listeners (TCP)
-> tcp_bind_listener
-> [ fdtab[fd].iocb = listener->proto->accept ]

该函数指针指向proto_tcpv4结构体的accept成员,即函数listener_accept

启用所有已注册协议上的listeners

把所有 listeners 的 fd 加到 polling lists 中 haproxy.c -> protocol_enable_all -> all registered protocol enable_all -> enable_all_listeners (TCP) -> enable_listener 函数会将处于 LI_LISTEN 的 listener 的状态修改为 LI_READY,并调用 cur poller 的 set 方法, 比如使用 sepoll,就会调用 __fd_set.

TCP 连接的处理流程

接受新建连接

前面几个方面的分析,主要是为了搞清楚当请求到来时,处理过程中实际的函数调用关系。以下分析TCP建连过程。

haproxy.c
-> run_poll_loop
-> cur_poller.poll
-> _do_poll (如果配置使用的是 sepoll,则调用 ev_sepoll.c 中的 poll 方法)
-> fd_process_polled_events
-> fdtab[fd].iocb (TCP 协议的该函数指针指向 listener_accept )
-> listener_accept
-> 按照 global.tune.maxaccept 的设置尽量可能多执行系统调用 accept,然后再调用 l->accept(),即 listener 的 accept 方法 session_accept
-> session_accept

session_accept主要完成以下功能:

  • 调用conn_new分配一个connection结构。
  • cli_conn->t.sock.fd = cfd: 系统调用返回的 cfd 记录到连接。
  • cli_conn->addr.from = *addr: 记录ip地址。
  • 调用pool_alloc2分配一个session结构。
  • 调用task_new分配一个新任务。
  • 将新分配的session加入全局sessions链表中。
  • si_attach_connsi_conn_cb绑定到connection(si_conn_recv_cbsi_conn_send_cb)。
  • conn_attachsess_conn_cb绑定到connection(全为NULL)。
  • conn_ctrl_init主要是fdtab[fd].iocb = conn_fd_handler
  • session_complete中的session和task的初始化,若干重要成员的初始化如下:

    • t->process = l->handler:即t->process指向process_session
    • t->context = s:任务的上下文指向session。
    • s->listener = l:session的listener成员指向当前的listener。
    • s->si[]的初始化,将si_conn_cb绑定connection(si_conn_recv_cbsi_conn_send_cb)。
    • s->reqs->rep分别分配内存,并作对应的初始化。

      • s->req = pool_alloc2(pool2_buffer)。
      • s->rep = pool_alloc2(pool2_buffer)。
      • 从代码上来看,应该是各自独立分配tune.bufsize + sizeof struct buffer大小的内存。
    • 初始化s->txn
  • p->accept执行proxy的accept方法即frontend_accept

    • 设置session结构体的log成员。
    • 根据配置的情况,分别设置新建连接套接字的选项,包括TCP_NODELAY/KEEPALIVE/LINGER/SNDBUF/RCVBUF等等。
    • 如果mode是http的话,将session的txn成员做相关的设置和初始化。

TCP连接上的接收事件

haproxy.c
-> run_poll_loop
-> cur_poller.poll
-> _do_poll (如果配置使用的是 sepoll,则调用 ev_sepoll.c 中的 poll 方法)
-> fd_process_polled_events
-> fdtab[fd].iocb(fd) (该函数在建连阶段被初始化为四层协议的 read 方法,对于 TCP 协议,为 conn_fd_handler)
-> conn_fd_handler
-> conn->data->recv(对应si_conn_recv_cb)
-> si_conn_recv_cb

si_conn_recv_cb主要完成以下功能:

  • 找到当前连接的读缓冲,即当前 session 的 req buffer:struct channel *chn = si->ib
  • 根据配置,调用splice或者recv读取套接字上的数据,并填充到读缓冲中,即填充到从b->r(初始位置应该就是b->data)开始的内存中。
  • ret = conn->xprt->rcv_pipe(conn, chn->pipe, chn->to_forward); -> raw_sock_to_buf
  • 如果读取到0字节,则意味着接收到对端的关闭请求,调用stream_sock_shutr进行处理。

    • 读缓冲标记si->ib->flagsBF_SHUTR置位,清除当前fd的epoll读事件,不再从该fd读取。
    • 如果写缓冲si->ob->flagsBF_SHUTW已经置位,说明应该是由本地首先发起的关闭连接动作。

      • 将fd从fdset[]中清除,从epoll中移除fd,执行系统调用close(fd)fd.state置位FD_STCLOSE
        *stream interface的状态修改si->state = SI_ST_DIS
  • 唤醒任务task_wakeup,把当前任务加入到run queue中。随后检测runnable tasks时,就会处理该任务。

TCP连接上的发送事件

haproxy.c
-> run_poll_loop
-> cur_poller.poll
-> _do_poll (如果配置使用的是 sepoll,则调用 ev_sepoll.c 中的 poll 方法)
-> fdtab[fd].iocb(fd) (该函数在建连阶段被初始化为四层协议的 write 方法,对于 TCP 协议,为 conn_fd_handler)
-> conn_fd_handler
-> conn->data->send(对应si_conn_send_cb)
-> si_conn_send_cb

si_conn_send_cb主要完成以下功能:

  • 找到当前连接的写缓冲,即当前session的rep buffer:struct channel *chn = si->ob;
  • 将待发送的数据调用send系统调用发送出去。
  • 或者数据已经发送完毕,需要发送关闭连接的动作stream_sock_shutw->系统调用shutdown。
  • 唤醒任务task_wakeup,把当前任务加入到run queue中。随后检测runnable tasks时,就会处理该任务。

HTTP 请求的处理

haproxy.c
-> run_poll_loop
-> process_runnable_tasks,查找当前待处理的任务所有 tasks, 然后调用 task->process(大多时候就是 process_session) 进行处理
-> process_session

process_session主要完成以下功能:

  • 处理连接需要关闭的情形,分支resync_stream_interface
  • 处理请求,分支resync_request (read event)。

    • 根据s->req->analysers的标记位,调用不同的analyser进行处理请求。
    • ana_list & AN_REQ_WAIT_HTTP:http_wait_for_request
    • ana_list & AN_REQ_HTTP_PROCESS_FE:http_process_req_common
    • ana_list & AN_REQ_SWITCHING_RULES:process_switching_rules
  • 处理应答,分支resync_response(write event)。

    • 根据s->rep->analysers的标记位,调用不同的analyser进行处理请求。
    • ana_list & AN_RES_WAIT_HTTP:http_wait_for_response
    • ana_list & AN_RES_HTTP_PROCESS_BE:http_process_res_common
  • 处理forward buffer的相关动作。
  • 关闭req和rep的buffer,调用pool2_free释放session及其申请的相关内存,包括读写缓冲 (read 0 bytes)。

    • pool_free2(pool2_buffer, s->req);
    • pool_free2(pool2_buffer, s->rep);
    • pool_free2(pool2_session, s);
  • task从运行任务队列中清除,调用pool2_free释放 task申请的内存:task_delete(); task_free();

本文简单分析了TCP连接的处理过程,不侧重细节分析,而且缺少后端server的选择以及建连等,重在希望展示出一个haproxy处理TCP连接的框架。

HTTP请求处理

初始化session数据处理相关的设置

建连的处理基本上就是_do_poll->listener_accept->session_accept->fronend_accept()

其中session_accept()会设置新建fd的io handler

/* Add the various callbacks. Right now the transport layer is present
 * but not initialized. Also note we need to be careful as the stream
 * int is not initialized yet.
 */
conn_prepare(s->si[0].conn, &sess_conn_cb, l->proto, l->xprt, s);

fdtab[cfd].owner = s->si[0].conn; /*fd 对应的 owner 为 connection 结构*/
fdtab[cfd].iocb = conn_fd_handler;
conn_data_want_recv(s->si[0].conn);
if (conn_xprt_init(s->si[0].conn) < 0)
    goto out_free_task;

IPv4 http对应的listener的xprt和proto分别被初始化为

l->xprt = &raw_sock;
l->proto = &proto_tcpv4;

conn_prepare()就是将相关数据收发以及连接处理的函数都赋值到connection结构体上:

/* Assigns a connection with the appropriate data, ctrl, transport layers, and owner. */
static inline void conn_assign(struct connection *conn, const struct data_cb *data,
                               const struct protocol *ctrl, const struct xprt_ops *xprt,
                               void *owner)
{
    conn->data = data;
    conn->ctrl = ctrl;
    conn->xprt = xprt;
    conn->owner = owner;
}

/* prepares a connection with the appropriate data, ctrl, transport layers, and
 * owner. The transport state and context are set to 0.
 */
static inline void conn_prepare(struct connection *conn, const struct data_cb *data,
                                const struct protocol *ctrl, const struct xprt_ops *xprt,
                                void *owner)
{
    conn_assign(conn, data, ctrl, xprt, owner);
    conn->xprt_st = 0;
    conn->xprt_ctx = NULL;
}

经过初始化,session client端的connection结构体初始化完成:

  • conn->data指向sess_conn_cb。后面调用session_complete()会被再次赋值。
  • conn->ctrl指向l->proto, IPv4下为proto_tcpv4
  • conn->xprt执向l->xprt, 不启用SSL时为raw_sock,启用SSL时为ssl_sock
  • conn->owner指向session

接着调用session_complete完成建立一个session所需要的最后的初始化工作,其中包含调用frontend_accept,并将当前session对应的task放入runqueue中以待下次执行:

...
si_takeover_conn(&s->si[0], l->proto, l->xprt);
...
t->process = l->handler;
...
if (p->accept && (ret = p->accept(s)) <= 0) {
    /* Either we had an unrecoverable error (<0) or work is
     * finished (=0, eg: monitoring), in both situations,
     * we can release everything and close.
     */
    goto out_free_rep_buf;
}
...
task_wakeup(t, TASK_WOKEN_INIT);

其中si_takeover_conn完成为si分配连接的处理函数,实现如下:

static inline void si_takeover_conn(struct stream_interface *si, const struct protocol *ctrl, const struct xprt_ops *xprt)
{
    si->ops = &si_conn_ops;
    conn_assign(si->conn, &si_conn_cb, ctrl, xprt, si);
}

si_conn_cb的定义如下:

struct data_cb si_conn_cb = {
    .recv    = si_conn_recv_cb,
    .send    = si_conn_send_cb,
    .wake    = si_conn_wake_cb,
};

因此,si->conn->data指向了si_conn_cb。这个结构用在随后的recv/send中。

此外,session所对应的任务task在session_complete的最后通过调用task_wakeup()是在随后的循环中被执行。task的处理函数初始化为l->handlerprocess_session()

至此,一个新建session的client fd的io处理函数conn_fd_handler()及session的处理函数process_session()都已经正确初始化好了。

以后基本上就是这两个函数分别负责数据的读取,以及业务的处理。

接收client发送的请求数据

epoll中考虑的新建连接通常会尽可能快的传输数据,因此对于新建的fd,通常会尽快的执行io handler,即调用conn_fd_handler

是在ev_epoll.c中的_do_poll()中进行:

gettimeofday(&before_poll, NULL);
status = epoll_wait(epoll_fd, epoll_events, global.tune.maxpollevents, wait_time);
tv_update_date(wait_time, status);
measure_idle();

/* process polled events */

for (count = 0; count < status; count++) {
    unsigned int n;
    unsigned int e = epoll_events[count].events;
    fd = epoll_events[count].data.fd;
    ...
    /* Save number of updates to detect creation of new FDs. */
    old_updt = fd_nbupdt;
    fdtab[fd].iocb(fd);
    ...
    for (new_updt = fd_nbupdt; new_updt > old_updt; new_updt--) {
        fd = fd_updt[new_updt - 1];
        ...
        if (fdtab[fd].ev && fdtab[fd].iocb && fdtab[fd].owner)
            fdtab[fd].iocb(fd);
        ...
    }

上面代码中第一处执行iocb()的是由epoll_wait()返回的fd触发的。而第二次的iocb()则就是在前面iocb的执行过程中新建的fd,为了提高效率,则直接调用该fd的iocb(),也 就是conn_fd_handler()函数。

int conn_fd_handler(int fd)
{
    struct connection *conn = fdtab[fd].owner;
    ...
    if ((fdtab[fd].ev & (FD_POLL_IN | FD_POLL_HUP | FD_POLL_ERR)) &&
        conn->xprt &&
        !(conn->flags & (CO_FL_WAIT_RD|CO_FL_WAIT_ROOM|CO_FL_ERROR|CO_FL_HANDSHAKE))) {
        /* force detection of a flag change : it's impossible to have both
         * CONNECTED and WAIT_CONN so we're certain to trigger a change.
         */
        flags = CO_FL_WAIT_L4_CONN | CO_FL_CONNECTED;
        conn->data->recv(conn);
    }
    ...
}

根据的session_complete的初始化,上面代码conn->data->recv指向si_conn_recv_cb()。 该函数就是haproxy中负责接收数据的入口函数。相同的,si_conn_send_cb()就是haproxy中负责发送数据的入口函数。

si_conn_recv_cb()函数简单介绍如下:

if (conn->xprt->rcv_pipe &&
    chn->to_forward >= MIN_SPLICE_FORWARD && chn->flags & CF_KERN_SPLICING) {
    ...
    ret = conn->xprt->rcv_pipe(conn, chn->pipe, chn->to_forward);
    ...
}
...
while (!chn->pipe && !(conn->flags & (CO_FL_ERROR | CO_FL_SOCK_RD_SH | CO_FL_DATA_RD_SH | CO_FL_WAIT_RD | CO_FL_WAIT_ROOM | CO_FL_HANDSHAKE))) {

    ...
    ret = conn->xprt->rcv_buf(conn, chn->buf, max);
    ...
}

该函数主要根据数据的接收情况,选择调用xprt的rcv_pipe还是rcv_buf。前面已经分析过,conn->xprt指向了listner的xprt,不启用SSL就是raw_sock数据结构。

因此,数据的接收最终是通过调用raw_sockraw_sock_to_pipe或/和raw_sock_to_buf完成的。

连接处理过程

haproxy.c
-> run_poll_loop 调用 cur_poller.poll, 实际为_do_poll (如果配置使用的是 sepoll,则调用 ev_sepoll.c 中的 poll 方法)
--> _do_poll 中监听事件,触发之后根据事件数量依次调用 fd_process_polled_events
---> fd_process_polled_events 调用 fdtab[fd].iocb (TCP协议的该函数指针指向listener_accept)
----> listener_accept 按照 global.tune.maxaccept 的设置尽量可能多执行系统调用 accept,然后再调用 l->accept(),即 listener 的 accept 方法 session_accept
-----> session_accept 调用 session_complete
------> session_complete 调用 p->accept 即 frontend_accept
-------> frontend_accept 调用完毕后退回到 fd_process_polled_events
---> fd_process_polled_events 进入循环判定是否有新的fd加入,有的话调用fd对应的fdtab[fd].iocb(TCP协议的该函数指针指向 conn_fd_handler)
----> conn_fd_handler 调用 conn->data->recv(conn) (指向 si_conn_recv_cb)
-----> si_conn_recv_cb 调用 conn->xprt->rcv_buf(conn, chn->buf, max) (指向 raw_sock_to_buf)
------> raw_sock_to_buf 调用 recv(conn->t.sock.fd, bi_end(buf), try, 0) 接收数据,退回到 conn_fd_handler
----> conn_fd_handler 调用 conn->data->wake(conn) (指向 si_conn_wake_cb)
-----> si_conn_wake_cb 退回到 run_poll_loop
-> run_poll_loop 调用 process_runnable_tasks
最后修改:2021 年 08 月 18 日
如果觉得我的文章对你有用,请随意赞赏