libuv网络I/O机制

两条故事线去探索http.Server从listen()connection事件的触发其背后的原理,主要是为了理解libuv在网络I/O方面的异步实现。一条故事线看TCP handle的I/O观察者是怎样加入到event loop的观察者队列,另一条故事线看隐藏于背后的event loop在liunx下如何利用系统的epoll机制注册并收集events从而调用观察者回调。

源码解读以linux系统为方向。我们知道libuv对不同平台的内部网络I/O异步机制进行了抽象,如Windows的IOCP,FreeBSD下的kqueue,linux下的epoll等。

从js层面来到C++层面

http.Server类继承net.Server类,以Server.listen()为例,其实调用的是net.Server.listen(),实际调用的是setupListenHandle(),包括createServerHandle()到C++层面创建handle和调用C++层面定义的listen()方法:

1
2
3
4
5
6
7
rval = createServerHandle(address, port, addressType, fd);
this._handle = rval;
this[async_id_symbol] = getNewAsyncId(this._handle);
this._handle.onconnection = onconnection;
this._handle.owner = this;
var err = this._handle.listen(backlog || 511);
nextTick(this[async_id_symbol], emitListeningNT, this); //触发listening事件

emitListeningNT里触发listening事件,其中handle是C/C++内建模块tcp_wrap.cc里定义的,这里还定义了onconnection,这个会在C/C++层面合适的时候调用,而onconnection()里会触发connection事件。

说到handle创建,onconnection的调用,那就要开始我们的第一条故事线:向event loop里加入观察者。

故事线一:向event loop观察者队列里加入观察者

观察者的产生

首先TCPWrap类的constructor:

1
2
3
4
5
6
7
8
TCPWrap::TCPWrap(Environment* env, Local<Object> object)
: ConnectionWrap(env,
object,
AsyncWrap::PROVIDER_TCPWRAP) {
int r = uv_tcp_init(env->event_loop(), &handle_);
CHECK_EQ(r, 0);
UpdateWriteQueueSize();
}

我们看到调用了uv_tcp_init()用于设置handle_,比如将env.event_loop()赋值给handle_->loop

TCPWrap继承ConnectionWrap

1
2
3
class TCPWrap : public ConnectionWrap<TCPWrap, uv_tcp_t>{
//
}

然后这里这里的handle_就是在ConnectionWrap里定义的:

1
2
3
4
5
6
7
namespace node {
template <typename WrapType, typename UVType>
class ConnectionWrap : public StreamWrap {
...
UVType handle_;
};

可以看到这里的handle_类型根据模板参数UVType来,这样&wrap->handle_就是一个uv_tcp_t观察者类型的指针了。

关于uv_tcp_t:

1
2
3
4
5
struct uv_tcp_s {
UV_HANDLE_FIELDS
UV_STREAM_FIELDS
UV_TCP_PRIVATE_FIELDS
};

UV_HANDLE_FIELDS定义如下:

http://7xsi10.com1.z0.glb.clouddn.com/uv_handle_fileds.png

UV_STREAM_FIELDSUV_STREAM_PRIVATE_FIELDS定义如下:
http://7xsi10.com1.z0.glb.clouddn.com/UV_STREAM_PRIVATE_FIELDS.png

其中io_watcher就是我们所说的I/O观察者了,connection_cb将是最终C++进入js层面需要调用的,后面会详解。

TCPWrap::Listen

这便是js层面调用的TCP.listen

1
2
3
4
5
6
7
8
9
10
11
void TCPWrap::Listen(const FunctionCallbackInfo<Value>& args) {
TCPWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap,
args.Holder(),
args.GetReturnValue().Set(UV_EBADF));
int backlog = args[0]->Int32Value();
int err = uv_listen(reinterpret_cast<uv_stream_t*>(&wrap->handle_),
backlog,
OnConnection);
args.GetReturnValue().Set(err);
}

我们看到这里它调用了uv_listen,而参数有强制转换为uv_stream_tuv_tcp_t指针,

stream.c里会根据handle具体类型,调用不同的函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
int uv_listen(uv_stream_t* stream, int backlog, uv_connection_cb cb) {
int err;
switch (stream->type) {
case UV_TCP:
err = uv_tcp_listen((uv_tcp_t*)stream, backlog, cb);
break;
case UV_NAMED_PIPE:
err = uv_pipe_listen((uv_pipe_t*)stream, backlog, cb);
break;
default:
err = -EINVAL;
}
if (err == 0)
uv__handle_start(stream); //会执行activehandles++
return err;
}

这里调用`uv_tcp_listen`:

1
2
3
4
5
6
7
8
9
10
11
if (listen(tcp->io_watcher.fd, backlog))
return -errno;
tcp->connection_cb = cb;
tcp->flags |= UV_HANDLE_BOUND;
/* Start listening for connections. */
tcp->io_watcher.cb = uv__server_io;
uv__io_start(tcp->loop, &tcp->io_watcher, POLLIN);
return 0;

uv__io_t io_watcher.cb被设置为uv__server_io,该函数会由event loop调用,后面会详解。而tcp->connection_cb = cb被设置为cb,即onConnection

io_watcheruv__io_t类型:

1
2
3
4
5
6
7
8
9
struct uv__io_s {
uv__io_cb cb;
void* pending_queue[2];
void* watcher_queue[2];
unsigned int pevents;
unsigned int events;
int fd;
UV_IO_PRIVATE_PLATFORM_FIELDS
};

再进入uv__io_start():

1
2
3
4
5
6
7
8
9
10
11
12
void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
w->pevents |= events;
maybe_resize(loop, w->fd + 1);
if (QUEUE_EMPTY(&w->watcher_queue))
QUEUE_INSERT_TAIL(&loop->watcher_queue, &w->watcher_queue);
if (loop->watchers[w->fd] == NULL) {
loop->watchers[w->fd] = w;
loop->nfds++;
}
}

可以看到这里,将events赋值给w->pevents,就是说加入到pending events里去,就是它对events事件感兴趣。这个会在后面epoll_wait()用到。

uv_tcp_t类型的_handleio_wathcer就加入到了_handle->loop(实际上就是event loop)的watcher_queue里。

至此,添加I/O观察者的任务就完成了,该函数执行完毕,TCP.listen()的调用就介绍,就会返回到js层面,去触发listening事件,可见,这个过程是同步执行的,那么listening事件真正的意义就是标志着I/O观察者成功加入到事件循环了。

而要谈真正意义上的异步,还得从另一条故事线出发,那便是隐藏于背后的event loop

故事线二:event loop

还记得我上一篇文章吗?在node.cc的最后一个inline Start()里:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
{
SealHandleScope seal(isolate);
bool more;
do {
v8_platform.PumpMessageLoop(isolate);
more = uv_run(env.event_loop(), UV_RUN_ONCE);
if (more == false) {
v8_platform.PumpMessageLoop(isolate);
EmitBeforeExit(&env);
more = uv_loop_alive(env.event_loop());
if (uv_run(env.event_loop(), UV_RUN_NOWAIT) != 0)
more = true;
}
} while (more == true);
}

uv_run()开启了event loop。

然后我们进入到uv_run()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
int uv_run(uv_loop_t* loop, uv_run_mode mode) {
int timeout;
int r;
int ran_pending;
r = uv__loop_alive(loop);
if (!r)
uv__update_time(loop);
//这里就是那个被称作event loop的while loop
while (r != 0 && loop->stop_flag == 0) {
uv__update_time(loop);
uv__run_timers(loop);
ran_pending = uv__run_pending(loop);
uv__run_idle(loop);
uv__run_prepare(loop);
timeout = 0;
if ((mode == UV_RUN_ONCE && !ran_pending) || mode == UV_RUN_DEFAULT)
timeout = uv_backend_timeout(loop);
uv__io_poll(loop, timeout);
uv__run_check(loop);
uv__run_closing_handles(loop);
if (mode == UV_RUN_ONCE) {
uv__update_time(loop);
uv__run_timers(loop);
}
r = uv__loop_alive(loop);
if (mode == UV_RUN_ONCE || mode == UV_RUN_NOWAIT)
break;
}
if (loop->stop_flag != 0)
loop->stop_flag = 0;
return r;
}

这里不断地循环,不断地判断event loop是否保持active,如果有handle存在,那么它将一直在我们背后运行。

而这里与我们有关的是uv__io_poll(),而poll这个词正暴露了它的内部实现机制epoll

首先进入了一个循环:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
while (!QUEUE_EMPTY(&loop->watcher_queue)) {
q = QUEUE_HEAD(&loop->watcher_queue);
QUEUE_REMOVE(q);
QUEUE_INIT(q);
w = QUEUE_DATA(q, uv__io_t, watcher_queue);
e.events = w->pevents;
e.data = w->fd;
if (w->events == 0)
op = UV__EPOLL_CTL_ADD;
else
op = UV__EPOLL_CTL_MOD;
if (uv__epoll_ctl(loop->backend_fd, op, w->fd, &e)) {
if (errno != EEXIST)
abort();
assert(op == UV__EPOLL_CTL_ADD);
abort();
}
w->events = w->pevents;
}

每次从观察者队列里取出队头的观察者队列,然后根据它才取出w(实际的io_wathcer);并取出观察者感兴趣的pending events和与之绑定的fd

然后调用epoll机制三大方法之一的uv__epoll_ctl去注册io观察者感兴趣的pending状态的events。

接着又会进去一个无限循环for(;;),首先调用uv__epoll_wait等待上面注册的事件发生:

1
2
3
4
nfds = uv__epoll_wait(loop->backend_fd,
events,
ARRAY_SIZE(events),
timeout);

epoll机制会在该fd产生观察者感兴趣的事件发生后返回,收集到事件会放到events数组里。

接着在这个循环里依次取出events(由epoll产生):

1
2
3
4
5
6
7
for (i = 0; i < nfds; i++) {
pe = events + i;
fd = pe->data;
w = loop->watchers[fd];
...
w->cb(loop, w, pe->events);
}

针对每个事件调用w->cb(loop, w, pe->events);而这个cb我们之前在`uv_tcp_listen()`里进行了设置:

1
tcp->io_watcher.cb = uv__server_io;

然后由于timeout在event loop里被设置为0,即表示立即返回,当下一次event loop来临时,根据epoll机制:


LT模式: 当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。

这样就不用担心两次事件循环之间的时间有事件来不及处理了。

这样这个与I/O有关的epoll操作结束,直到下一个event loop来临,又会去取观察者->注册感兴趣的事件->找epoll询问产生的事件->执行回调

回调进入js层面

顺藤摸瓜来到uv_server_io:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
uv__io_start(stream->loop, &stream->io_watcher, POLLIN);
while (uv__stream_fd(stream) != -1) {
assert(stream->accepted_fd == -1);
err = uv__accept(uv__stream_fd(stream));
if (err < 0) {
if (err == -EAGAIN || err == -EWOULDBLOCK)
return;
if (err == -ECONNABORTED)
continue; /* Ignore. Nothing we can do about that. */
if (err == -EMFILE || err == -ENFILE) {
err = uv__emfile_trick(loop, uv__stream_fd(stream));
if (err == -EAGAIN || err == -EWOULDBLOCK)
break;
}
stream->connection_cb(stream, err);
continue;
}

可以看到在调用回调函数之前,再次调用了uv__io_start(),这样就可以继续监听其他连接的到来了。

最后我们来看看stream->connection_cb(stream,err)的调用:

1
2
WrapType* wrap_data = static_cast<WrapType*>(handle->data);
wrap_data->MakeCallback(env->onconnection_string(), arraysize(argv), argv);

至此,通过在js层面通过Server.handle.onconnection设置的onconnection即这里的env->onconnection_string()就会被调用了。而在onconnection里:

1
2
3
4
5
6
7
8
9
10
11
12
13
var socket = new Socket({
handle: clientHandle,
allowHalfOpen: self.allowHalfOpen,
pauseOnCreate: self.pauseOnConnect
});
socket.readable = socket.writable = true;
self._connections++;
socket.server = self;
socket._server = self;
...
self.emit('connection', socket);

这样我们在最表层见到的connection事件,才会触发,再经过http的一层封装,才会触发request事件。

nice job!

总结

可以看到,调用listen是一个同步过程,即会调用C++层面的Listen,而这个Listen的作用就是将io观察者加入到loop->wathcer_queue里,完成后才会返回。

然后在V8执行js代码的背后,通过执行uv_run()开始event loop,背后的io异步,实际上是利用的epoll机制,里面是一个无限循环,epoll_wait()不断地监听watcher_queue里每个观察者期待的事件是否发生,如果发生,就生成events数组,events数组可能来自不同的fd,针对每一个fd分别调用它们所属观察者的回调函数。接着,进入下一个for循环。

可以看到暴露给应用层的异步网络I/O,内部实现还是同步的,因为epoll这种机制虽然是非阻塞的I/O多路复用,但是需要不断地去轮询事件的产生或者休眠,相当于还是阻塞了process。不过这些都是发生在V8外的event loop内部的,v8的线程没有被阻塞。而event loop的处理方式是要求epoll_wait()立即返回,通过循环的方式去调用它,这就有点类似于read方式了。

罗峡的博客 wechat
欢迎扫描上面的微信公众号二维码,关注我的个人公众号:全栈前端
坚持原创技术分享,您的支持将鼓励我继续创作!