Libuv 线程同步

Mutexes

libuv上的互斥量函数与pthread上存在一一映射。如果对pthread上的mutex不是很了解可以看这里。

线程mutex函数
  • UV_EXTERN int uv_mutex_init(uv_mutex_t* handle);
  • UV_EXTERN void uv_mutex_destroy(uv_mutex_t* handle);
  • UV_EXTERN void uv_mutex_lock(uv_mutex_t* handle);
  • UV_EXTERN int uv_mutex_trylock(uv_mutex_t* handle);
  • UV_EXTERN void uv_mutex_unlock(uv_mutex_t* handle);

uv_mutex_init与uv_mutex_trylock在成功执行后,返回0,或者在错误时,返回错误码。

如果libuv在编译的时候开启了调试模式,uv_mutex_destroy(), uv_mutex_lock() 和 uv_mutex_unlock()会在出错的地方调用abort()中断。类似的,uv_mutex_trylock()也同样会在错误发生时中断,而不是返回EAGAIN和EBUSY。

递归地调用互斥量函数在某些系统平台上是支持的,但是你不能太过度依赖。因为例如在BSD上递归地调用互斥量函数会返回错误,比如你准备使用互斥量函数给一个已经上锁的临界区再次上锁的时候,就会出错。比如,像下面这个例子:

uv_mutex_lock(a_mutex);
uv_thread_create(thread_id, entry, (void *)a_mutex);
uv_mutex_lock(a_mutex);
// more things here

可以用来等待其他线程初始化一些变量然后释放a_mutex锁,但是第二次调用uv_mutex_lock(), 在调试模式下会导致程序崩溃,或者是返回错误。

在linux中是支持递归上锁的,但是在libuv的API中并未实现。

Lock

读写锁是更细粒度的实现机制。两个读者线程可以同时从共享区中读取数据。当读者以读模式占有读写锁时,写者不能再占有它。当写者以写模式占有这个锁时,其他的写者或者读者都不能占有它。读写锁在数据库操作中非常常见,下面是一个玩具式的例子:

locks/main.c - simple rwlocks

#include <stdio.h>
#include <uv.h>

uv_barrier_t blocker;
uv_rwlock_t numlock;
int shared_num;

void reader(void *n)
{
    int num = *(int *)n;
    int i;
    for (i = 0; i < 20; i++) {
        uv_rwlock_rdlock(&numlock);
        printf("Reader %d: acquired lockn", num);
        printf("Reader %d: shared num = %dn", num, shared_num);
        uv_rwlock_rdunlock(&numlock);
        printf("Reader %d: released lockn", num);
    }
    uv_barrier_wait(&blocker);
}

void writer(void *n)
{
    int num = *(int *)n;
    int i;
    for (i = 0; i < 20; i++) {
        uv_rwlock_wrlock(&numlock);
        printf("Writer %d: acquired lockn", num);
        shared_num++;
        printf("Writer %d: incremented shared num = %dn", num, shared_num);
        uv_rwlock_wrunlock(&numlock);
        printf("Writer %d: released lockn", num);
    }
    uv_barrier_wait(&blocker);
}

int main()
{
    uv_barrier_init(&blocker, 4);

    shared_num = 0;
    uv_rwlock_init(&numlock);

    uv_thread_t threads[3];

    int thread_nums[] = {1, 2, 1};
    uv_thread_create(&threads[0], reader, &thread_nums[0]);
    uv_thread_create(&threads[1], reader, &thread_nums[1]);

    uv_thread_create(&threads[2], writer, &thread_nums[2]);

    uv_barrier_wait(&blocker);
    uv_barrier_destroy(&blocker);

    uv_rwlock_destroy(&numlock);
    return 0;
}

试着来执行一下上面的程序,看读者有多少次会同步执行。在有多个写者的时候,调度器会给予他们高优先级。因此,如果你加入两个读者,你会看到所有的读者趋向于在读者得到加锁机会前结束。

在上面的例子中,我们也使用了屏障。因此主线程来等待所有的线程都已经结束,最后再将屏障和锁一块回收。

其它同步量

libuv同样支持信号量,条件变量和屏障,而且API的使用方法和pthread中的用法很类似。

还有,libuv提供了一个简单易用的函数uv_once()。多个线程调用这个函数,参数可以使用一个uv_once_t和一个指向特定函数的指针,最终只有一个线程能够执行这个特定函数,并且这个特定函数只会被调用一次:

/* Initialize guard */
static uv_once_t once_only = UV_ONCE_INIT;

int i = 0;

void increment() {
   i++;
}

void thread1() {
   /* ... work */
   uv_once(once_only, increment);
}

void thread2() {
   /* ... work */
   uv_once(once_only, increment);
}

int main() {
   /* ... spawn threads */
}

当所有的线程执行完毕时,i == 1。

在libuv的v0.11.11版本里,推出了uv_key_t结构和操作线程局部存储TLS的API,使用方法同样和pthread类似。

libuv work queue

uv_queue_work()是一个便利的函数,它使得一个应用程序能够在不同的线程运行任务,当任务完成后,回调函数将会被触发。它看起来好像很简单,但是它真正吸引人的地方在于它能够使得任何第三方的库都能以event-loop的方式执行。当使用event-loop的时候,最重要的是不能让loop线程阻塞,或者是执行高cpu占用的程序,因为这样会使得loop慢下来,loop event的高效特性也不能得到很好地发挥。

然而,很多带有阻塞的特性的程序(比如最常见的I/O)使用开辟新线程来响应新请求(最经典的‘一个客户,一个线程‘模型)。使用event-loop可以提供另一种实现的方式。libuv提供了一个很好的抽象,使得你能够很好地使用它。

下面有一个很好的例子,灵感来自<>。我们将要执行fibonacci数列,并且睡眠一段时间,但是将阻塞和cpu占用时间长的任务分配到不同的线程,使得其不会阻塞event loop上的其他任务。
queue-work/main.c - lazy fibonacci

void fib(uv_work_t *req) {
    int n = *(int *) req->data;
    if (random() % 2)
        sleep(1);
    else
        sleep(3);
    long fib = fib_(n);
    fprintf(stderr, "%dth fibonacci is %lun", n, fib);
}

void after_fib(uv_work_t *req, int status) {
    fprintf(stderr, "Done calculating %dth fibonaccin", *(int *) req->data);
}

任务函数很简单,也还没有运行在线程之上。uv_work_t是关键线索,你可以通过void *data传递任何数据,使用它来完成线程之间的沟通任务。但是你要确信,当你在多个线程都在运行的时候改变某个东西的时候,能够使用适当的锁。

触发器是uv_queue_work:

int main() {
    loop = uv_default_loop();

    int data[FIB_UNTIL];
    uv_work_t req[FIB_UNTIL];
    int i;
    for (i = 0; i < FIB_UNTIL; i++) {
        data[i] = i;
        req[i].data = (void *) &data[i];
        uv_queue_work(loop, &req[i], fib, after_fib);
    }

    return uv_run(loop, UV_RUN_DEFAULT);
}

线程函数fbi()将会在不同的线程中运行,传入uv_work_t结构体参数,一旦fib()函数返回,after_fib()会被event loop中的线程调用,然后被传入同样的结构体。

为了封装阻塞的库,常见的模式是用baton来交换数据。

从libuv 0.9.4版后,添加了函数uv_cancel()。它可以用来取消工作队列中的任务。只有还未开始的任务可以被取消,如果任务已经开始执行或者已经执行完毕,uv_cancel()调用会失败。

当用户想要终止程序的时候,uv_cancel()可以用来清理任务队列中的等待执行的任务。例如,一个音乐播放器可以以歌手的名字对歌曲进行排序,如果这个时候用户想要退出这个程序,uv_cancel()就可以做到快速退出,而不用等待执行完任务队列后,再退出。

让我们对上述程序做一些修改,用来演示uv_cancel()的用法。首先让我们注册一个处理中断的函数。

queue-cancel/main.c

void signal_handler(uv_signal_t *req, int signum)
{
    printf("Signal received!n");
    int i;
    for (i = 0; i < FIB_UNTIL; i++) {
        uv_cancel((uv_req_t*) &fib_reqs[i]);
    }
    uv_signal_stop(req);
}

对于已经成功取消的任务,他的回调函数的参数status会被设置为UV_ECANCELED。

queue-cancel/main.c

void after_fib(uv_work_t *req, int status) {
    if (status == UV_ECANCELED)
        fprintf(stderr, "Calculation of %d cancelled.n", *(int *) req->data);
}

uv_cancel()函数同样可以用在uv_fs_t和uv_getaddrinfo_t请求上。对于一系列的文件系统操作函数来说,uv_fs_t.errorno会同样被设置为UV_ECANCELED。

一个良好设计的程序,应该能够终止一个已经开始运行的长耗时任务。
Such a worker could periodically check for a variable that only the main process sets to signal termination.