Libuv 管道pipes

libuv的uv_pipe_t结构可能会让一些unix程序员产生困惑,因为它像魔术般变幻出|和pipe(7)。但这里的uv_pipe_t并不是IPC机制里的 匿名管道(在IPC里,pipe是 匿名管道,只允许父子进程之间通信。FIFO则允许没有亲戚关系的进程间通信,显然llibuv里的uv_pipe_t不是第一种)。uv_pipe_t背后有unix本地socket或者windows 具名管道的支持,可以实现多进程间的通信。下面会具体讨论。

Parent-child IPC
父进程与子进程可以通过单工或者双工管道通信,获得管道可以通过设置uv_stdio_container_t.flags为UV_CREATE_PIPE,UV_READABLE_PIPE或者UV_WRITABLE_PIPE的按位或的值。上述的读/写标记是对于子进程而言的。

Arbitrary process IPC
既然本地socket具有确定的名称,而且是以文件系统上的位置来标示的(例如,unix中socket是文件的一种存在形式),那么它就可以用来在不相关的进程间完成通信任务。被开源桌面环境使用的D-BUS系统也是使用了本地socket来作为事件通知的,例如,当消息来到,或者检测到硬件的时候,各种应用程序会被通知到。mysql服务器也运行着一个本地socket,等待客户端的访问。

当使用本地socket的时候,客户端/服务器模型通常和之前类似。在完成初始化后,发送和接受消息的方法和之前的tcp类似,接下来我们同样适用echo服务器的例子来说明。
pipe-echo-server/main.c

int main() {
    loop = uv_default_loop();

    uv_pipe_t server;
    uv_pipe_init(loop, &server, 0);

    signal(SIGINT, remove_sock);

    int r;
    if ((r = uv_pipe_bind(&server, "echo.sock"))) {
        fprintf(stderr, "Bind error %s\n", uv_err_name(r));
        return 1;
    }
    if ((r = uv_listen((uv_stream_t*) &server, 128, on_new_connection))) {
        fprintf(stderr, "Listen error %s\n", uv_err_name(r));
        return 2;
    }
    return uv_run(loop, UV_RUN_DEFAULT);
}

我们把socket命名为echo.sock,意味着它将会在本地文件夹中被创造。对于stream API来说,本地socekt表现得和tcp的socket差不多。你可以使用socat测试一下服务器:

$ socat - /path/to/socket

客户端如果想要和服务器端连接的话,应该使用:

void uv_pipe_connect(uv_connect_t *req, uv_pipe_t *handle, const char *name, uv_connect_cb cb);

上述函数,name应该为echo.sock。

Sending file descriptors over pipes
最酷的事情是本地socket可以传递文件描述符,也就是说进程间可以交换文件描述符。这样就允许进程将它们的I/O传递给其他进程。它的应用场景包括,负载均衡服务器,分派工作进程等,各种可以使得cpu使用最优化的应用。libuv当前只支持通过管道传输TCP sockets或者其他的pipes。

为了展示这个功能,我们将来实现一个由循环中的工人进程处理client端请求,的这么一个echo服务器程序。这个程序有一些复杂,在教程中只截取了部分的片段,为了更好地理解,我推荐你去读下完整的代码。

工人进程很简单,文件描述符将从主进程传递给它。

multi-echo-server/worker.c

uv_loop_t *loop;
uv_pipe_t queue;
int main() {
    loop = uv_default_loop();

    uv_pipe_init(loop, &queue, 1 /* ipc */);
    uv_pipe_open(&queue, 0);
    uv_read_start((uv_stream_t*)&queue, alloc_buffer, on_new_connection);
    return uv_run(loop, UV_RUN_DEFAULT);
}

queue是另一端连接上主进程的管道,因此,文件描述符可以传送过来。在uv_pipe_init中将ipc参数设置为1很关键,因为它标明了这个管道将被用来做进程间通信。因为主进程需要把文件handle赋给了工人进程作为标准输入,因此我们使用uv_pipe_open把stdin作为pipe(别忘了,0代表stdin)。
multi-echo-server/worker.c

void on_new_connection(uv_stream_t *q, ssize_t nread, const uv_buf_t *buf) {
    if (nread < 0) {
        if (nread != UV_EOF)
            fprintf(stderr, "Read error %s\n", uv_err_name(nread));
        uv_close((uv_handle_t*) q, NULL);
        return;
    }

    uv_pipe_t *pipe = (uv_pipe_t*) q;
    if (!uv_pipe_pending_count(pipe)) {
        fprintf(stderr, "No pending count\n");
        return;
    }

    uv_handle_type pending = uv_pipe_pending_type(pipe);
    assert(pending == UV_TCP);

    uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
    uv_tcp_init(loop, client);
    if (uv_accept(q, (uv_stream_t*) client) == 0) {
        uv_os_fd_t fd;
        uv_fileno((const uv_handle_t*) client, &fd);
        fprintf(stderr, "Worker %d: Accepted fd %d\n", getpid(), fd);
        uv_read_start((uv_stream_t*) client, alloc_buffer, echo_read);
    }
    else {
        uv_close((uv_handle_t*) client, NULL);
    }
}

首先,我们调用uv_pipe_pending_count来确定从handle中可以读取出数据。如果你的程序能够处理不同类型的handle,这时uv_pipe_pending_type就可以用来决定当前的类型。虽然在这里使用accept看起来很怪,但实际上是讲得通的。accept最常见的用途是从其他的文件描述符(监听的socket)获取文件描述符(client端)。这从原理上说,和我们现在要做的是一样的:从queue中获取文件描述符(client)。接下来,worker可以执行标准的echo服务器的工作了。

我们再来看看主进程,观察如何启动worker来达到负载均衡。

multi-echo-server/main.c

struct child_worker {
    uv_process_t req;
    uv_process_options_t options;
    uv_pipe_t pipe;
} *workers;

child_worker结构包裹着进程,和连接主进程和各个独立进程的管道。

multi-echo-server/main.c

void setup_workers() {
    round_robin_counter = 0;

    // ...

    // launch same number of workers as number of CPUs
    uv_cpu_info_t *info;
    int cpu_count;
    uv_cpu_info(&info, &cpu_count);
    uv_free_cpu_info(info, cpu_count);

    child_worker_count = cpu_count;

    workers = calloc(sizeof(struct child_worker), cpu_count);
    while (cpu_count--) {
        struct child_worker *worker = &workers[cpu_count];
        uv_pipe_init(loop, &worker->pipe, 1);

        uv_stdio_container_t child_stdio[3];
        child_stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE;
        child_stdio[0].data.stream = (uv_stream_t*) &worker->pipe;
        child_stdio[1].flags = UV_IGNORE;
        child_stdio[2].flags = UV_INHERIT_FD;
        child_stdio[2].data.fd = 2;

        worker->options.stdio = child_stdio;
        worker->options.stdio_count = 3;

        worker->options.exit_cb = close_process_handle;
        worker->options.file = args[0];
        worker->options.args = args;

        uv_spawn(loop, &worker->req, &worker->options); 
        fprintf(stderr, "Started worker %d\n", worker->req.pid);
    }
}

首先,我们使用酷炫的uv_cpu_info函数获取到当前的cpu的核心个数,所以我们也能启动一样数目的worker进程。再次强调一下,务必将uv_pipe_init的ipc参数设置为1。接下来,我们指定子进程的stdin是一个可读的管道(从子进程的角度来说)。接下来的一切就很直观了,worker进程被启动,等待着文件描述符被写入到他们的标准输入中。

在主进程的on_new_connection中,我们接收了client端的socket,然后把它传递给worker环中的下一个可用的worker进程。

multi-echo-server/main.c

void on_new_connection(uv_stream_t *server, int status) {
    if (status == -1) {
        // error!
        return;
    }

    uv_tcp_t *client = (uv_tcp_t*) malloc(sizeof(uv_tcp_t));
    uv_tcp_init(loop, client);
    if (uv_accept(server, (uv_stream_t*) client) == 0) {
        uv_write_t *write_req = (uv_write_t*) malloc(sizeof(uv_write_t));
        dummy_buf = uv_buf_init("a", 1);
        struct child_worker *worker = &workers[round_robin_counter];
        uv_write2(write_req, (uv_stream_t*) &worker->pipe, &dummy_buf, 1, (uv_stream_t*) client, NULL);
        round_robin_counter = (round_robin_counter + 1) % child_worker_count;
    }
    else {
        uv_close((uv_handle_t*) client, NULL);
    }
}

uv_write2能够在所有的情形上做了一个很好的抽象,我们只需要将client作为一个参数即可完成传输。现在,我们的多进程echo服务器已经可以运转起来啦。

感谢Kyle指出了uv_write2需要一个不为空的buffer。