Redis Source Code Study - the ae Server Module

Based on Redis 6.0.10

This article aims to cover a few topics:

  • What a high-performance server is built from
  • How I/O multiplexing is used
  • How Redis implements its timers

Network Service API Declarations

Starting a network server generally goes through a few mandatory steps (a minimal sketch of the bare syscall sequence follows the list):

  • bind(): bind the network address to a socket;
  • listen(): start listening on that address;
  • accept(): accept incoming socket connections;
  • read()/recv(): receive data from the socket;
  • write()/send(): send data to the socket;
  • shutdown() and close(): shut down the socket connection.
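
A minimal sketch of that sequence for a plain blocking TCP server (not Redis code, error handling omitted):

#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

int main(void) {
    int lfd = socket(AF_INET, SOCK_STREAM, 0);

    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_ANY);
    addr.sin_port = htons(6379);

    bind(lfd, (struct sockaddr *)&addr, sizeof(addr));   /* 1. bind the address */
    listen(lfd, 511);                                    /* 2. listen */

    int cfd = accept(lfd, NULL, NULL);                   /* 3. accept a connection */
    char buf[128];
    ssize_t n = read(cfd, buf, sizeof(buf));             /* 4. read request data */
    if (n > 0) write(cfd, buf, n);                       /* 5. write a reply */
    close(cfd);                                          /* 6. close */
    close(lfd);
    return 0;
}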

Searching the source shows the corresponding calls in anet.c; its header anet.h is Redis' wrapper over these socket functions:

// Connect to a TCP server
int anetTcpConnect(char *err, const char *addr, int port);
int anetTcpNonBlockConnect(char *err, const char *addr, int port);
int anetTcpNonBlockBindConnect(char *err, const char *addr, int port, const char *source_addr);
int anetTcpNonBlockBestEffortBindConnect(char *err, const char *addr, int port, const char *source_addr);
// Connect over a unix domain socket
int anetUnixConnect(char *err, const char *path);
int anetUnixNonBlockConnect(char *err, const char *path);
// Wrapper around read(): fetch incoming data
int anetRead(int fd, char *buf, int count);
// Hostname / address resolution
int anetResolve(char *err, char *host, char *ipbuf, size_t ipbuf_len);
int anetResolveIP(char *err, char *host, char *ipbuf, size_t ipbuf_len);
// Create a TCP listening server
int anetTcpServer(char *err, int port, char *bindaddr, int backlog);
// Create a TCP listening server (IPv6)
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog);
int anetUnixServer(char *err, char *path, mode_t perm, int backlog);
// Accept a TCP client connection
int anetTcpAccept(char *err, int serversock, char *ip, size_t ip_len, int *port);
int anetUnixAccept(char *err, int serversock);
// Send data
int anetWrite(int fd, char *buf, int count);
// Put the socket into non-blocking mode
int anetNonBlock(char *err, int fd);
// Put the socket into blocking mode
int anetBlock(char *err, int fd);
// Enable TCP_NODELAY (i.e. disable Nagle's algorithm)
int anetEnableTcpNoDelay(char *err, int fd);
// Disable TCP_NODELAY (i.e. re-enable Nagle's algorithm)
int anetDisableTcpNoDelay(char *err, int fd);
// Enable TCP keepalive
int anetTcpKeepAlive(char *err, int fd);
// Set a timeout for blocking sends
int anetSendTimeout(char *err, int fd, long long ms);
// Set a timeout for blocking receives
int anetRecvTimeout(char *err, int fd, long long ms);
// Get the peer address of a connection (as strings)
int anetPeerToString(int fd, char *ip, size_t ip_len, int *port);
int anetKeepAlive(char *err, int fd, int interval);
int anetSockName(int fd, char *ip, size_t ip_len, int *port);
int anetFormatAddr(char *fmt, size_t fmt_len, char *ip, int port);
int anetFormatPeer(int fd, char *fmt, size_t fmt_len);
int anetFormatSock(int fd, char *fmt, size_t fmt_len);

Searching for the callers of these anet* functions shows how Redis starts and drives its server.

The benefit of this wrapper layer is portability. As long as Redis internally agrees to manage connections only through these functions, everything underneath, such as the bind() and listen() system calls, can be swapped out. For example, when writing a user-space network stack with DPDK, the single-machine performance of the program can be raised substantially without changing the business logic above.

ae Server API Declarations

Following the outer callers of the anet* functions leads to the ae server API, declared in src/ae.h:

// Create an ae event loop
aeEventLoop *aeCreateEventLoop(int setsize);
// Destroy an ae event loop
void aeDeleteEventLoop(aeEventLoop *eventLoop);
// Stop an ae event loop
void aeStop(aeEventLoop *eventLoop);
// Register a file-descriptor event with an ae event loop
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData);
void aeDeleteFileEvent(aeEventLoop *eventLoop, int fd, int mask);
int aeGetFileEvents(aeEventLoop *eventLoop, int fd);
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc);
int aeDeleteTimeEvent(aeEventLoop *eventLoop, long long id);
int aeProcessEvents(aeEventLoop *eventLoop, int flags);
int aeWait(int fd, int mask, long long milliseconds);
void aeMain(aeEventLoop *eventLoop);
char *aeGetApiName(void);
void aeSetBeforeSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *beforesleep);
void aeSetAfterSleepProc(aeEventLoop *eventLoop, aeBeforeSleepProc *aftersleep);
int aeGetSetSize(aeEventLoop *eventLoop);
int aeResizeSetSize(aeEventLoop *eventLoop, int setsize);
void aeSetDontWait(aeEventLoop *eventLoop, int noWait);

Digging further shows that ae actually has a different implementation per operating system; as long as the ae server is only used through these API functions, the underlying I/O multiplexing framework can be swapped freely.
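
For reference, ae.c picks the backend at compile time with a chain of #ifdefs roughly like the following (evport on Solaris, epoll on Linux, kqueue on BSD/macOS, select as the fallback):

#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif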

Usage

The following sections walk through the source to show how Redis uses the ae server.

Creating the event_loop

In server.c, main() calls initServer() to initialize the server's resources.

int main(int argc, char **argv) {
// ...
// Initialize the server configuration
initServerConfig();
// ...
// Initialize the server
initServer();

if (!server.sentinel_mode) {
// Finish server initialization: create the I/O thread pool and leave it waiting
InitServerLast();
// ...
} else {
// Finish server initialization: create the I/O thread pool and leave it waiting
InitServerLast();
// ...
}

// Run the event loop
aeMain(server.el);
// Tear down resources and exit
aeDeleteEventLoop(server.el);
return 0;
}

initServer()

void initServer(void) {
// ...
// Create the event loop (the I/O multiplexing reactor)
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
if (server.el == NULL) {
serverLog(LL_WARNING,
"Failed creating the event loop. Error message: '%s'",
strerror(errno));
exit(1);
}
// ...
// Listen on the plain (non-TLS) addresses
/* Open the TCP listening socket for the user commands. */
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
// Listen on the TLS addresses; requires OpenSSL
if (server.tls_port != 0 &&
listenToPort(server.tls_port,server.tlsfd,&server.tlsfd_count) == C_ERR)
exit(1);
// ...
// Register the event loop's timer task (serverCron)
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}

/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
for (j = 0; j < server.ipfd_count; j++) {
// Register the accept callback with the event loop: acceptTcpHandler() will call accept()
// Only the readable event is monitored
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
for (j = 0; j < server.tlsfd_count; j++) {
// Register the accept callback with the event loop: acceptTLSHandler() will call accept()
if (aeCreateFileEvent(server.el, server.tlsfd[j], AE_READABLE,
acceptTLSHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.tlsfd file event.");
}
}
// ...
}

listenToPort()

int listenToPort(int port, int *fds, int *count) {
int j;

// server.bindaddr_count holds how many bind addresses were configured (config file / command line),
// and server.bindaddr holds the addresses themselves.
// If no address was configured, listen on all addresses by default.
/* Force binding of 0.0.0.0 if no bind address is specified, always
* entering the loop if j == 0. */
if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
for (j = 0; j < server.bindaddr_count || j == 0; j++) {
// Normally entered only when no bind address was configured
if (server.bindaddr[j] == NULL) {
int unsupported = 0;
// Bind and listen on the wildcard address here; anetTcp6Server() handles IPv6 only
/* Bind * for both IPv6 and IPv4, we enter here only if
* server.bindaddr_count == 0. */
fds[*count] = anetTcp6Server(server.neterr,port,NULL,
server.tcp_backlog);
if (fds[*count] != ANET_ERR) {
// Make the listening socket non-blocking
anetNonBlock(NULL,fds[*count]);
(*count)++;
} else if (errno == EAFNOSUPPORT) {
unsupported++;
serverLog(LL_WARNING,"Not listening to IPv6: unsupported");
}

// If the IPv6 bind succeeded, also bind IPv4.
// If the IPv6 bind failed because IPv6 is unsupported, still try IPv4.
// Otherwise (IPv6 failed for some other reason) fall through and return an error.
if (*count == 1 || unsupported) {
// Listen on the IPv4 wildcard address
/* Bind the IPv4 address as well. */
fds[*count] = anetTcpServer(server.neterr,port,NULL,
server.tcp_backlog);
if (fds[*count] != ANET_ERR) {
// Make the listening socket non-blocking
anetNonBlock(NULL,fds[*count]);
(*count)++;
} else if (errno == EAFNOSUPPORT) {
unsupported++;
serverLog(LL_WARNING,"Not listening to IPv4: unsupported");
}
}
// Both address families handled, leave the loop
/* Exit the loop if we were able to bind * on IPv4 and IPv6,
* otherwise fds[*count] will be ANET_ERR and we'll print an
* error and return to the caller with an error. */
if (*count + unsupported == 2) break;
// Bind and listen on a specific IPv6 address
} else if (strchr(server.bindaddr[j],':')) {
/* Bind IPv6 address. */
fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
server.tcp_backlog);
// Bind and listen on a specific IPv4 address
} else {
/* Bind IPv4 address. */
fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
server.tcp_backlog);
}
// Listening failed
if (fds[*count] == ANET_ERR) {
serverLog(LL_WARNING,
"Could not create server TCP listening socket %s:%d: %s",
server.bindaddr[j] ? server.bindaddr[j] : "*",
port, server.neterr);
if (errno == ENOPROTOOPT || errno == EPROTONOSUPPORT ||
errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT ||
errno == EAFNOSUPPORT || errno == EADDRNOTAVAIL)
continue;
return C_ERR;
}
// Make the socket that listens on an explicitly configured address non-blocking
anetNonBlock(NULL,fds[*count]);
(*count)++;
}
return C_OK;
}

An important point here is that every listening socket is made non-blocking. This lets Redis run timer jobs or other work when there are no connection events instead of leaving a thread idle, which keeps the thread count low, reduces context switches, and improves whole-machine performance.
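
For reference, making a descriptor non-blocking boils down to a small fcntl() dance; anetNonBlock() is essentially the following (a sketch, error reporting through the err buffer omitted):

#include <fcntl.h>

/* Flip O_NONBLOCK on the descriptor; after this, accept()/read()/write()
 * return immediately with EAGAIN/EWOULDBLOCK instead of blocking. */
static int set_nonblock(int fd) {
    int flags = fcntl(fd, F_GETFL);
    if (flags == -1) return -1;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}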

Accepting TCP Client Connections

Next, let's look at what Redis does when it accepts a client connection, i.e. its accept path.

acceptTcpHandler()

The callback that handles connection events; it is registered with the event_loop via aeCreateFileEvent().

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);

while(max--) {
// Accept the connection; the return value is the connected socket fd, cip receives the client's IP string and cport the client's port
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
// Log the client's IP address and port
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
// Hand off to the post-accept handler.
// connCreateAcceptedSocket() creates the connection object (the connection-layer abstraction) and sets its state; the connection uses the CT_Socket callback set.
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}

acceptCommonHandler()

At this point the TCP session already exists at the Linux protocol-stack level and the Redis-level session resources have been created, but the connection still needs detailed configuration and has to be added to the event_loop.

#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
client *c;
char conninfo[100];
UNUSED(ip);

// Check the connection state
if (connGetState(conn) != CONN_STATE_ACCEPTING) {
serverLog(LL_VERBOSE,
"Accepted client connection in error state: %s (conn: %s)",
connGetLastError(conn),
connGetInfo(conn, conninfo, sizeof(conninfo)));
connClose(conn);
return;
}

// Connection limit exceeded: report the error to the client and actively close the TCP connection
/* Limit the number of connections we take at the same time.
*
* Admission control will happen before a client is created and connAccept()
* called, because we don't want to even start transport-level negotiation
* if rejected. */
if (listLength(server.clients) + getClusterConnectionsCount()
>= server.maxclients)
{
char *err;
if (server.cluster_enabled)
err = "-ERR max number of clients + cluster "
"connections reached\r\n";
else
err = "-ERR max number of clients reached\r\n";

// This invokes the connSocketWrite() callback, writing to the client synchronously through the fd
/* That's a best effort error message, don't check write errors.
* Note that for TLS connections, no handshake was done yet so nothing
* is written and the connection will just drop. */
if (connWrite(conn,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++;
// This invokes the connSocketClose() callback, which removes the connection's events from the event_loop and closes it synchronously
connClose(conn);
return;
}

// Create the client object for this connection:
// store the connection in client->conn, install the application-level read callback, and register the client with the ae server
/* Create connection and client */
if ((c = createClient(conn)) == NULL) {
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (conn: %s)",
connGetLastError(conn),
connGetInfo(conn, conninfo, sizeof(conninfo)));
connClose(conn); /* May be already closed, just ignore errors */
return;
}

/* Last chance to keep flags */
c->flags |= flags;

// connAccept() ends up in connSocketAccept(),
// which just sets the connection state and then invokes clientAcceptHandler() through callHandler()
// to run the database-side part of accepting a client
/* Initiate accept.
*
* Note that connAccept() is free to do two things here:
* 1. Call clientAcceptHandler() immediately;
* 2. Schedule a future call to clientAcceptHandler().
*
* Because of that, we must do nothing else afterwards.
*/
if (connAccept(conn, clientAcceptHandler) == C_ERR) {
char conninfo[100];
if (connGetState(conn) == CONN_STATE_ERROR)
serverLog(LL_WARNING,
"Error accepting a client connection: %s (conn: %s)",
connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
freeClient(connGetPrivateData(conn));
return;
}
}

createClient()

client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));

/* passing NULL as conn it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
if (conn) {
connNonBlock(conn);
connEnableTcpNoDelay(conn);
// Defaults to 300s in the config file
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
// Install readQueryFromClient() as the connection's read handler; it is invoked through callHandler() and parses the application-level data.
// This also registers connSocketEventHandler() with the event_loop as the readable callback for this fd,
// and points the fd's clientData at the connection object.
// Reading in Redis involves three callbacks; they are explained below in the section on receiving client data.
connSetReadHandler(conn, readQueryFromClient);
// Store the client struct as the connection's private data
connSetPrivateData(conn, c);
}
// ...
// Append the client to the server.clients list (tail insertion) and remember its own node in client->client_list_node
if (conn) linkClient(c);
// Initialize the MULTI/EXEC (transaction) state
initClientMultiState(c);
return c;
}

clientAcceptHandler()

Runs the database-side part of accepting a client.

void clientAcceptHandler(connection *conn) {
client *c = connGetPrivateData(conn);

if (connGetState(conn) != CONN_STATE_CONNECTED) {
serverLog(LL_WARNING,
"Error accepting a client connection: %s",
connGetLastError(conn));
freeClientAsync(c);
return;
}

/* If the server is running in protected mode (the default) and there
* is no password set, nor a specific interface is bound, we don't accept
* requests from non loopback interfaces. Instead we try to explain the
* user what to do to fix it if needed. */
if (server.protected_mode &&
server.bindaddr_count == 0 &&
DefaultUser->flags & USER_FLAG_NOPASS &&
!(c->flags & CLIENT_UNIX_SOCKET))
{
char cip[NET_IP_STR_LEN+1] = { 0 };
connPeerToString(conn, cip, sizeof(cip)-1, NULL);

if (strcmp(cip,"127.0.0.1") && strcmp(cip,"::1")) {
char *err =
"-DENIED Redis is running in protected mode because protected "
"mode is enabled, no bind address was specified, no "
"authentication password is requested to clients. In this mode "
"connections are only accepted from the loopback interface. "
"If you want to connect from external computers to Redis you "
"may adopt one of the following solutions: "
"1) Just disable protected mode sending the command "
"'CONFIG SET protected-mode no' from the loopback interface "
"by connecting to Redis from the same host the server is "
"running, however MAKE SURE Redis is not publicly accessible "
"from internet if you do so. Use CONFIG REWRITE to make this "
"change permanent. "
"2) Alternatively you can just disable the protected mode by "
"editing the Redis configuration file, and setting the protected "
"mode option to 'no', and then restarting the server. "
"3) If you started the server manually just for testing, restart "
"it with the '--protected-mode no' option. "
"4) Setup a bind address or an authentication password. "
"NOTE: You only need to do one of the above things in order for "
"the server to start accepting connections from the outside.\r\n";
if (connWrite(c->conn,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++;
freeClientAsync(c);
return;
}
}

server.stat_numconnections++;
// Database-side handling is abstracted into the module event system; this is the most important part of the function
moduleFireServerEvent(REDISMODULE_EVENT_CLIENT_CHANGE,
REDISMODULE_SUBEVENT_CLIENT_CHANGE_CONNECTED,
c);
}

Receiving Data from TCP Clients

When receiving data, Redis can be seen as using three read-related callbacks: two of them live in CT_Socket, namely connSocketEventHandler() and connSocketRead(), plus readQueryFromClient().
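
For orientation, the read path chains them together like this (which the gdb backtraces below confirm):

/* aeProcessEvents()                                       ae.c
 *   -> fe->rfileProc == connSocketEventHandler()          connection.c (CT_Socket)
 *        -> conn->read_handler == readQueryFromClient()   networking.c
 *             -> connRead() -> connSocketRead() -> read(2)
 */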

connSocketEventHandler()

This is the outermost receive function: the callback the ae event_loop invokes when data arrives.

A gdb session shows:

Thread 1 "redis-server" hit Breakpoint 4, connSocketEventHandler (el=0x7ffff780b480, fd=8, 
clientData=0x7ffff78150c0, mask=1) at connection.c:261
261 if (conn->state == CONN_STATE_CONNECTING &&
(gdb) bt
#0 connSocketEventHandler (el=0x7ffff780b480, fd=8, clientData=0x7ffff78150c0, mask=1)
at connection.c:261
#1 0x00005555555929f7 in aeProcessEvents (eventLoop=eventLoop@entry=0x7ffff780b480,
flags=flags@entry=27) at ae.c:479
#2 0x0000555555592d3d in aeMain (eventLoop=0x7ffff780b480) at ae.c:539
#3 0x000055555558f536 in main (argc=3, argv=0x7fffffffe438) at server.c:5498

As the backtrace shows, this function is called back directly by aeProcessEvents(). My understanding is that this layer of callback acts as the overall controller of reading and writing: in the normal case the client's data is read, the request is parsed and computed, and the result is sent back to the client. In special situations, however, the order may need to be inverted. Going through the source, connSetWriteHandlerWithBarrier() is the only API that controls the inversion, and only clusterSendMessage() and handleClientsWithPendingWrites() call it, so the inversion only appears in places related to Redis' synchronization features; it will be analyzed in detail when those features are covered.

Now the implementation:

static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
UNUSED(el);
UNUSED(fd);
// clientData is the connection object, set up when createClient() installed the read handler
connection *conn = clientData;

if (conn->state == CONN_STATE_CONNECTING &&
(mask & AE_WRITABLE) && conn->conn_handler) {

int conn_error = connGetSocketError(conn);
if (conn_error) {
conn->last_errno = conn_error;
conn->state = CONN_STATE_ERROR;
} else {
conn->state = CONN_STATE_CONNECTED;
}

if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);

if (!callHandler(conn, conn->conn_handler)) return;
conn->conn_handler = NULL;
}

// Decide whether the order needs to be inverted. Normally the read task runs before the write task,
// but in some special cases, e.g. when an fsync to disk has to happen first, the write-back must run before the read.
/* Normally we execute the readable event first, and the writable
* event later. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if WRITE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsync'ing a file to disk,
* before replying to a client. */
int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;

int call_write = (mask & AE_WRITABLE) && conn->write_handler;
int call_read = (mask & AE_READABLE) && conn->read_handler;

// This runs readQueryFromClient()
/* Handle normal I/O flows */
if (!invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
// This runs the write handler (typically sendReplyToClient())
/* Fire the writable event. */
if (call_write) {
if (!callHandler(conn, conn->write_handler)) return;
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert && call_read) {
if (!callHandler(conn, conn->read_handler)) return;
}
}

readQueryFromClient()

This is the read_handler invoked by connSocketEventHandler().

It is the function that actually receives and processes data from the client.

The gdb backtrace looks like this:

Thread 1 "redis-server" hit Breakpoint 1, readQueryFromClient (conn=0x7ffff78150c0)
at networking.c:1995
1995 client *c = connGetPrivateData(conn);
(gdb) bt
#0 readQueryFromClient (conn=0x7ffff78150c0) at networking.c:1995
#1 0x000055555562b818 in callHandler (handler=<optimized out>, conn=0x7ffff78150c0)
at connhelpers.h:79
#2 connSocketEventHandler (el=<optimized out>, fd=<optimized out>,
clientData=0x7ffff78150c0, mask=<optimized out>) at connection.c:296
#3 0x00005555555929f7 in aeProcessEvents (eventLoop=eventLoop@entry=0x7ffff780b480,
flags=flags@entry=27) at ae.c:479
#4 0x0000555555592d3d in aeMain (eventLoop=0x7ffff780b480) at ae.c:539
#5 0x000055555558f536 in main (argc=3, argv=0x7fffffffe438) at server.c:5498

Now its implementation:

void readQueryFromClient(connection *conn) {
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;

// This involves the threaded I/O feature new in Redis 6.0: when I/O threads are enabled, the current client is put on the pending-read queue;
// later handleClientsWithPendingReadsUsingThreads() takes the queued clients and distributes them to the I/O threads,
// which pick them up in IOThreadMain() and ultimately call this very function again to read and parse the data.
/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
if (postponeClientRead(c)) return;

/* Update total number of reads on server */
server.stat_total_reads_processed++;

// Read at most 1024*16 bytes, i.e. a single read() fetches at most 16 KB
readlen = PROTO_IOBUF_LEN;
// A multibulk request with a large pending bulk argument may need the read length adjusted (see the comment below)
/* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query
* buffer contains exactly the SDS string representing the object, even
* at the risk of requiring more read(2) calls. This way the function
* processMultiBulkBuffer() can avoid copying buffers to create the
* Redis Object representing the argument. */
if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);

/* Note that the 'remaining' variable may be zero in some edge case,
* for example once we resume a blocked client after CLIENT PAUSE. */
if (remaining > 0 && remaining < readlen) readlen = remaining;
}

// Bytes already used in the sds receive buffer
qblen = sdslen(c->querybuf);
// Update querybuf_peak, which tracks the recent peak size of the query buffer (used when deciding whether to shrink it)
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
// Make sure the receive buffer has room for another readlen bytes
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
// connRead() wraps connSocketRead(), which simply read()s from the socket fd
nread = connRead(c->conn, c->querybuf+qblen, readlen);
if (nread == -1) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
// Normal EAGAIN return of a non-blocking read(); with the event_loop this branch is rarely hit
return;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
freeClientAsync(c);
return;
}
} else if (nread == 0) {
serverLog(LL_VERBOSE, "Client closed connection");
freeClientAsync(c);
return;
} else if (c->flags & CLIENT_MASTER) {
// In a replication setup, if this client is our master's connection, copy the data just read into client->pending_querybuf;
// it is later forwarded to the replicas via replicationFeedSlavesFromMasterStream() to keep master and replicas consistent.
/* Append the query buffer to the pending (not applied) buffer
* of the master. We'll use this buffer later in order to have a
* copy of the string applied by the last command executed. */
c->pending_querybuf = sdscatlen(c->pending_querybuf,
c->querybuf+qblen,nread);
}

sdsIncrLen(c->querybuf,nread);
c->lastinteraction = server.unixtime;
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
server.stat_net_input_bytes += nread;
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClientAsync(c);
return;
}

// Process the data received
/* There is more data in the client input buffer, continue parsing it
* in case to check if there is a full command to execute. */
processInputBuffer(c);
}

postponeClientRead()

Puts the client on the clients_pending_read queue; a later callback distributes the queued clients to the I/O threads for processing.

/* Return 1 if we want to handle the client read later using threaded I/O.
* This is called by the readable handler of the event loop.
* As a side effect of calling this function the client is put in the
* pending read clients and flagged as such. */
int postponeClientRead(client *c) {
if (server.io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
c->flags |= CLIENT_PENDING_READ;
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}

connSocketRead()

Reads from the socket fd; a wrapper around read().

static int connSocketRead(connection *conn, void *buf, size_t buf_len) {
int ret = read(conn->fd, buf, buf_len);
if (!ret) {
conn->state = CONN_STATE_CLOSED;
} else if (ret < 0 && errno != EAGAIN) {
conn->last_errno = errno;

/* Don't overwrite the state of a connection that is not already
* connected, not to mess with handler callbacks.
*/
if (conn->state == CONN_STATE_CONNECTED)
conn->state = CONN_STATE_ERROR;
}

return ret;
}

Sending Data to TCP Clients

Redis' send path is also fairly involved and goes through several callbacks; the main point here is the difference between write and write_handler in CT_Socket.

write sends data directly, while write_handler is the method used in special situations; it too ends up calling write. The special situations are mainly the write-barrier inversion and the case where the data cannot be sent in one go.

Redis does not send data immediately after receiving and parsing a request; it first stores the result in client->buf or client->reply, and the data is sent on the next loop iteration by beforeSleep().
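
For orientation, the normal write path looks like this (matching the gdb backtrace below); sendReplyToClient() is only installed as a writable-event handler when a reply cannot be sent in full:

/* aeProcessEvents()                                       ae.c
 *   -> beforeSleep()                                      server.c
 *        -> handleClientsWithPendingWritesUsingThreads()  networking.c
 *             -> handleClientsWithPendingWrites() or the I/O threads
 *                  -> writeToClient() -> connWrite() -> connSocketWrite() -> write(2)
 *
 * Leftover data: connSetWriteHandler*() installs sendReplyToClient(),
 * which the event loop fires on the next writable event.
 */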

Thread 1 "redis-server" hit Breakpoint 7, connSocketWrite (conn=0x7ffff78150c0, 
data=0x7ffff791c960, data_len=5) at connection.c:167
167 static int connSocketWrite(connection *conn, const void *data, size_t data_len) {
(gdb) bt
#0 connSocketWrite (conn=0x7ffff78150c0, data=0x7ffff791c960, data_len=5) at connection.c:167
#1 0x00005555555a76fd in connWrite (data_len=<optimized out>, data=<optimized out>,
conn=<optimized out>) at connection.h:140
#2 writeToClient (c=0x7ffff791c700, handler_installed=0) at networking.c:1379
#3 0x00005555555a7946 in handleClientsWithPendingWrites () at networking.c:1497
#4 0x00005555555ad565 in handleClientsWithPendingWritesUsingThreads () at networking.c:3189
#5 0x00005555555960b2 in beforeSleep (eventLoop=<optimized out>) at server.c:2201
#6 beforeSleep (eventLoop=<optimized out>) at server.c:2117
#7 0x00005555555928b9 in aeProcessEvents (eventLoop=eventLoop@entry=0x7ffff780b480,
flags=flags@entry=27) at ae.c:443
#8 0x0000555555592d3d in aeMain (eventLoop=0x7ffff780b480) at ae.c:539
#9 0x000055555558f536 in main (argc=3, argv=0x7fffffffe438) at server.c:5498

handleClientsWithPendingWritesUsingThreads()

There is not much to see in beforeSleep() itself; handleClientsWithPendingWritesUsingThreads() is where the sending actually happens.

int handleClientsWithPendingWritesUsingThreads(void) {
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0; /* Return ASAP if there are no clients. */

// When I/O threads are disabled or there are few clients to serve, the main thread sends the data directly
/* If I/O threads are disabled or we have few clients to serve, don't
* use I/O threads, but the boring synchronous code. */
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
// Flush the data recorded in each client
return handleClientsWithPendingWrites();
}

/* Start threads if needed. */
if (!server.io_threads_active) startThreadedIO();

if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);

/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;

/* Remove clients from the list of pending writes since
* they are going to be closed ASAP. */
if (c->flags & CLIENT_CLOSE_ASAP) {
listDelNode(server.clients_pending_write, ln);
continue;
}

int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}

/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}

// Send the data
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);

/* Wait for all the other threads to end their work. */
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O WRITE All threads finshed\n");

// If some data could not be fully sent, install sendReplyToClient() as the write_handler so it is sent later
/* Run the list of clients again to install the write handler where
* needed. */
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);

/* Install the write handler if there are pending writes in some
* of the clients. */
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);

/* Update processed count on server */
server.stat_io_writes_processed += processed;

return processed;
}

handleClientsWithPendingWrites()

This function walks the global server object's clients_pending_write field (a list of client objects), takes each client that has data to send, and calls writeToClient() to try to push out the reply data stored in the client.

Of course, it can happen that, because of the network or the client, redis-server cannot send a client's data at all, or can only send part of it. For example, if the client's application layer never drains its TCP receive buffer, then after the server has been sending for a while the client-side kernel buffer fills up and further sends fail; since the fd is non-blocking, the server's send() or write() returns immediately with -1 and errno set to EAGAIN. In either case the data cannot be fully sent this time, and that is when the writable event needs to be monitored.

/* This function is called just before entering the event loop, in the hope
* we can just write the replies to the client output buffer without any
* need to use a syscall in order to install the writable event handler,
* get it called, and so forth. */
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);

listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);

/* If a client is protected, don't do anything,
* that may trigger write error or recreate handler. */
if (c->flags & CLIENT_PROTECTED) continue;

/* Don't write to clients that are going to be closed anyway. */
if (c->flags & CLIENT_CLOSE_ASAP) continue;

// Send the data
/* Try to write buffers to the client socket. */
if (writeToClient(c,0) == C_ERR) continue;

// Not all of the data could be sent:
// register a writable event via connSetWriteHandlerWithBarrier(),
// which marks AE_WRITABLE in epoll
// and installs sendReplyToClient() as the callback to finish sending.
/* If after the synchronous writes above we still have data to
* output to the client, we need to install the writable handler. */
if (clientHasPendingReplies(c)) {
int ae_barrier = 0;
/* For the fsync=always policy, we want that a given FD is never
* served for reading and writing in the same event loop iteration,
* so that in the middle of receiving the query, and serving it
* to the client, we'll call beforeSleep() that will do the
* actual fsync of AOF to disk. the write barrier ensures that. */
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_barrier = 1;
}
if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
freeClientAsync(c);
}
}
}
return processed;
}

connSocketWrite() wraps write(), and connWrite() wraps connSocketWrite(). At the business level, data is always sent through writeToClient().

writeToClient()

writeToClient() first sends the data in the client's buf field; if that errors out, the client is freed. If all data can be sent, the writable event on the fd (if one was installed) is removed afterwards; and if the client has the CLIENT_CLOSE_AFTER_REPLY flag set, the client object is freed as soon as the reply has been sent.

/* Write data in output buffers to client. Return C_OK if the client
* is still valid after the call, C_ERR if it was freed because of some
* error. If handler_installed is set, it will attempt to clear the
* write event.
*
* This function is called by threads, but always with handler_installed
* set to 0. So when handler_installed is set to 0 the function must be
* thread safe. */
int writeToClient(client *c, int handler_installed) {
/* Update total number of writes on server */
server.stat_total_writes_processed++;

ssize_t nwritten = 0, totwritten = 0;
size_t objlen;
clientReplyBlock *o;

// While there is still pending reply data
while(clientHasPendingReplies(c)) {
if (c->bufpos > 0) {
// Send directly from the static buffer
nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;

/* If the buffer was sent, set bufpos to zero to continue with
* the remainder of the reply. */
if ((int)c->sentlen == c->bufpos) {
c->bufpos = 0;
c->sentlen = 0;
}
} else {
o = listNodeValue(listFirst(c->reply));
objlen = o->used;

if (objlen == 0) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
continue;
}

nwritten = connWrite(c->conn, o->buf + c->sentlen, objlen - c->sentlen);
if (nwritten <= 0) break;
c->sentlen += nwritten;
totwritten += nwritten;

/* If we fully sent the object on head go to the next one */
if (c->sentlen == objlen) {
c->reply_bytes -= o->size;
listDelNode(c->reply,listFirst(c->reply));
c->sentlen = 0;
/* If there are no longer objects in the list, we expect
* the count of reply bytes to be exactly zero. */
if (listLength(c->reply) == 0)
serverAssert(c->reply_bytes == 0);
}
}
/* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
* bytes, in a single threaded server it's a good idea to serve
* other clients as well, even if a very large request comes from
* super fast link that is always able to accept data (in real world
* scenario think about 'KEYS *' against the loopback interface).
*
* However if we are over the maxmemory limit we ignore that and
* just deliver as much data as it is possible to deliver.
*
* Moreover, we also send as much as possible if the client is
* a slave or a monitor (otherwise, on high-speed traffic, the
* replication/output buffer will grow indefinitely) */
if (totwritten > NET_MAX_WRITES_PER_EVENT &&
(server.maxmemory == 0 ||
zmalloc_used_memory() < server.maxmemory) &&
!(c->flags & CLIENT_SLAVE)) break;
}
server.stat_net_output_bytes += totwritten;
if (nwritten == -1) {
if (connGetState(c->conn) == CONN_STATE_CONNECTED) {
nwritten = 0;
} else {
serverLog(LL_VERBOSE,
"Error writing to client: %s", connGetLastError(c->conn));
freeClientAsync(c);
return C_ERR;
}
}
if (totwritten > 0) {
/* For clients representing masters we don't count sending data
* as an interaction, since we always send REPLCONF ACK commands
* that take some time to just fill the socket output buffer.
* We just rely on data / pings received for timeout detection. */
if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
}
// All data has been sent
if (!clientHasPendingReplies(c)) {
c->sentlen = 0;
// Everything is sent; handler_installed means this call was flushing leftovers from a previous round, so the installed write_handler must now be cleared
/* Note that writeToClient() is called in a threaded way, but
* aeDeleteFileEvent() is not thread safe: however writeToClient()
* is always called with handler_installed set to 0 from threads
* so we are fine. */
if (handler_installed) connSetWriteHandler(c->conn, NULL);

/* Close connection after entire reply has been sent. */
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
freeClientAsync(c);
return C_ERR;
}
}
return C_OK;
}

clientHasPendingReplies()

If bufpos is non-zero, there is still data in the static buffer that has not been fully sent.

c->reply holds reply objects; if the list is not empty, there is still data waiting to be sent.

/* Return true if the specified client has pending reply buffers to write to
* the socket. */
int clientHasPendingReplies(client *c) {
return c->bufpos || listLength(c->reply);
}

To sum up how Redis sends data:

When there is data to send to a client, there is no need to register a writable event up front and wait for it to fire before sending.

The usual pattern is to send right where the reply is produced; if the send cannot complete because the peer's TCP window is too small, the remaining data is stashed in a buffer and a writable event is registered, the send is retried when that event fires, and the writable event is removed once everything has gone out.

redis-server's send logic differs from this only slightly: the moment of sending is moved to a fixed point in the event_loop (here, before processing the events, i.e. in beforeSleep()); everything else is exactly the same.

The reason for not registering a writable event and waiting for it to fire is that under normal conditions both ends of a connection send and receive just fine, and one side is rarely blocked by the other end's TCP window being too small. If a writable event were registered, it would fire constantly, often with nothing to send, wasting system resources and the server's precious CPU time slices.

sendReplyToClient()

/* Write event handler. Just send data to the client. */
void sendReplyToClient(connection *conn) {
client *c = connGetPrivateData(conn);
writeToClient(c,1);
}

Timer Implementation

In Redis, timers are used for the cron tasks.

Redis' timer implementation is fairly crude: it does not keep timer events sorted by expiry to shorten the scan, but walks the whole timer event list every time. See the aeCreateTimeEvent() -> processTimeEvents() code analysis below for details.
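
As a usage note (a small sketch, not code from the Redis tree, requires "ae.h"): a time proc's return value is the delay in milliseconds before it fires again, or AE_NOMORE to delete the timer; this is how serverCron stays periodic by returning 1000/server.hz.

/* Periodic ae timer sketch: first fires 1 ms after registration,
 * then re-arms itself every 100 ms until it returns AE_NOMORE. */
static int myCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    /* ... periodic work ... */
    return 100;      /* next fire in 100 ms; return AE_NOMORE to stop */
}

/* registration, e.g. right after aeCreateEventLoop():
 *     aeCreateTimeEvent(el, 1, myCron, NULL, NULL);
 */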

Command Parsing Implementation

This section briefly covers how the RESP protocol spoken between Redis clients and the server is parsed.

In the client struct, reqtype is the current parser state-machine type, argc is the number of arguments of the current command, and argv holds pointers to the corresponding argument objects.
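
For reference, a command such as SET key val arrives on the wire in RESP multibulk form, which is why the parser below switches on a leading '*':

/* RESP multibulk encoding of: SET key val */
const char *req =
    "*3\r\n"          /* array of 3 bulk strings     */
    "$3\r\nSET\r\n"   /* bulk string of length 3     */
    "$3\r\nkey\r\n"
    "$3\r\nval\r\n";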

As its last step, readQueryFromClient() hands the data to processInputBuffer():

/* This function is called every time, in the client structure 'c', there is
* more query buffer to process, because we read more data from the socket
* or because a client was blocked and later reactivated, so there could be
* pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) {
// Parsing does not use a temporary position; it works directly off the client's own qb_pos
/* Keep processing while there is something in the input buffer */
while(c->qb_pos < sdslen(c->querybuf)) {
/* Return if clients are paused. */
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;

/* Immediately abort if the client is in the middle of something. */
if (c->flags & CLIENT_BLOCKED) break;

/* Don't process more buffers from clients that have already pending
* commands to execute in c->argv. */
if (c->flags & CLIENT_PENDING_COMMAND) break;

/* Don't process input from the master while there is a busy script
* condition on the slave. We want just to accumulate the replication
* stream (instead of replying -BUSY like we do with other clients) and
* later resume the processing. */
if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
* this flag has been set (i.e. don't process more commands).
*
* The same applies for clients we want to terminate ASAP. */
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;

/* Determine request type when unknown. */
if (!c->reqtype) {
// Does the data start with the '*' character?
if (c->querybuf[c->qb_pos] == '*') {
// Switch the parser state machine to multibulk mode
c->reqtype = PROTO_REQ_MULTIBULK;
} else {
// Switch the parser state machine to inline mode
c->reqtype = PROTO_REQ_INLINE;
}
}

// Reconstruct the command; the result is stored in argc and argv
if (c->reqtype == PROTO_REQ_INLINE) {
// Inline requests are handled by processInlineBuffer()
if (processInlineBuffer(c) != C_OK) break;
/* If the Gopher mode and we got zero or one argument, process
* the request in Gopher mode. To avoid data race, Redis won't
* support Gopher if enable io threads to read queries. */
if (server.gopher_enabled && !server.io_threads_do_reads &&
((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
c->argc == 0))
{
processGopherRequest(c);
resetClient(c);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
break;
}
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
// Multibulk requests are handled by processMultibulkBuffer()
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}

// Execute the reconstructed command
/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
resetClient(c);
} else {
/* If we are in the context of an I/O thread, we can't really
* execute the command here. All we can do is to flag the client
* as one that needs to process the command. */
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}

/* We are finally ready to execute the command. */
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid exiting this
* loop and trimming the client buffer later. So we return
* ASAP in that case. */
return;
}
}
}

/* Trim to pos */
if (c->qb_pos) {
sdsrange(c->querybuf,c->qb_pos,-1);
c->qb_pos = 0;
}
}

The reconstructed command is executed through processCommandAndResetClient().

processCommandAndResetClient()

Under the hood, processCommand() executes the command and commandProcessed() resets the state machine.

The rough flow of processCommand() is (a simplified sketch of the dispatch step follows the list):

  • First check whether it is the quit command; if so, append a reply to the output buffer (answering the Redis client) and set the CLIENT_CLOSE_AFTER_REPLY flag on the client, i.e. close the connection once the reply is sent;

  • Otherwise, look the command up in the global command table with lookupCommand(). On failure, append an error reply to the output buffer (an error here is not a program bug, it may simply be an invalid command from the client); if the command is found, execute it and append its reply.
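
The core of that lookup-and-dispatch step looks roughly like this (a simplified sketch inside the server.h context; the real processCommand() also handles authentication, cluster redirection, memory limits and more):

/* Simplified dispatch sketch, not the full processCommand() */
struct redisCommand *cmd = lookupCommand(c->argv[0]->ptr);
if (!cmd) {
    addReplyErrorFormat(c, "unknown command '%s'", (char*)c->argv[0]->ptr);
} else if ((cmd->arity > 0 && cmd->arity != c->argc) ||
           (c->argc < -cmd->arity)) {
    addReplyErrorFormat(c, "wrong number of arguments for '%s' command",
                        cmd->name);
} else {
    c->cmd = c->lastcmd = cmd;
    call(c, CMD_CALL_FULL);   /* runs cmd->proc(c) plus stats/propagation */
}

The wrapper around all of this, processCommandAndResetClient(), is: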

/* This function calls processCommand(), but also performs a few sub tasks
* for the client that are useful in that context:
*
* 1. It sets the current client to the client 'c'.
* 2. calls commandProcessed() if the command was handled.
*
* The function returns C_ERR in case the client was freed as a side effect
* of processing the command, otherwise C_OK is returned. */
int processCommandAndResetClient(client *c) {
int deadclient = 0;
server.current_client = c;
if (processCommand(c) == C_OK) {
commandProcessed(c);
}
if (server.current_client == NULL) deadclient = 1;
server.current_client = NULL;
/* freeMemoryIfNeeded may flush slave output buffers. This may
* result into a slave, that may be the active client, to be
* freed. */
return deadclient ? C_ERR : C_OK;
}

Command Reply Implementation

Having looked at command parsing, let's also touch on how command replies are produced.

As the command-parsing part already hinted, replies are produced through the addReply*() family of functions.

Take addReply() as the sample.

addReply()

/* Add the object 'obj' string representation to the client output buffer. */
void addReply(client *c, robj *obj) {
// Schedule the client to get its output flushed (see prepareClientToWrite() below)
if (prepareClientToWrite(c) != C_OK) return;

if (sdsEncodedObject(obj)) {
// Try the static output buffer first; fall back to the reply list
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
} else if (obj->encoding == OBJ_ENCODING_INT) {
/* For integer encoded strings we just convert it into a string
* using our optimized function, and attach the resulting string
* to the output buffer. */
char buf[32];
size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
if (_addReplyToBuffer(c,buf,len) != C_OK)
_addReplyProtoToList(c,buf,len);
} else {
serverPanic("Wrong obj->encoding in addReply()");
}
}

The addReply() family has two key pieces: prepareClientToWrite() and _addReplyToBuffer().

prepareClientToWrite()

int prepareClientToWrite(client *c) {
// ...
// clientHasPendingReplies() checks whether the output buffers still hold unsent replies,
// by looking at client->bufpos (int) and whether client->reply (a list) is non-empty
/* Schedule the client to write the output buffers to the socket, unless
* it should already be setup to do so (it has already pending data).
*
* If CLIENT_PENDING_READ is set, we're in an IO thread and should
* not install a write handler. Instead, it will be done by
* handleClientsWithPendingReadsUsingThreads() upon return.
*/
if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
// Set the CLIENT_PENDING_WRITE flag and add the client to the server.clients_pending_write list
clientInstallWriteHandler(c);

/* Authorize the caller to queue in the output buffer of this client. */
return C_OK;
}

In Redis, CLIENT_PENDING_WRITE marks a client that has data to send but no writable event registered yet.

_addReplyToBuffer()

/* Attempts to add the reply to the static buffer in the client struct.
* Returns C_ERR if the buffer is full, or the reply list is not empty,
* in which case the reply must be added to the reply list. */
int _addReplyToBuffer(client *c, const char *s, size_t len) {
size_t available = sizeof(c->buf)-c->bufpos;

if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;

// If the reply list already has entries, nothing more may go into the static buffer
/* If there already are entries in the reply list, we cannot
* add anything more to the static buffer. */
if (listLength(c->reply) > 0) return C_ERR;

/* Check that the buffer has enough space available for this string. */
if (len > available) return C_ERR;

memcpy(c->buf+c->bufpos,s,len);
c->bufpos+=len;
return C_OK;
}

The client->reply list stores pending reply blocks, while client->buf stores reply bytes directly, with its used length tracked in client->bufpos; client->buf is a fixed-size byte array of PROTO_REPLY_CHUNK_BYTES = 16*1024.

After writing into reply or buf, the client is added to server.clients_pending_write, and the data is flushed in the pending-write handling of the next loop iteration.

Flow Diagram

Implementation of Selected ae Server APIs

struct aeEventLoop

/* State of an event based program */
typedef struct aeEventLoop {
// Highest file descriptor currently registered in this event_loop
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
int flags;
} aeEventLoop;

struct aeFileEvent

An ordinary (file descriptor) I/O event.

/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
// Read-event callback
aeFileProc *rfileProc;
// Write-event callback
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;

struct aeFiredEvent

A fired (triggered) event, filled in by the poll backend.

/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;

struct aeTimeEvent

/* Time event structure */
typedef struct aeTimeEvent {
long long id; /* time event identifier. */
long when_sec; /* seconds */
long when_ms; /* milliseconds */
aeTimeProc *timeProc;
aeEventFinalizerProc *finalizerProc;
void *clientData;
struct aeTimeEvent *prev;
struct aeTimeEvent *next;
int refcount; /* refcount to prevent timer events from being
* freed in recursive time event calls. */
} aeTimeEvent;

aeCreateEventLoop()

// setsize is the maximum number of events this event_loop can track;
// by default it is 10000 (maxclients from the config file) + 128 (CONFIG_FDSET_INCR) = 10128
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
int i;

if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
// Pre-allocate memory for the event slots
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->flags = 0;
// Implemented differently per operating system; this is where the underlying I/O multiplexing backend is created
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
eventLoop->events[i].mask = AE_NONE;
return eventLoop;

err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}

aeApiCreate()

Creates the I/O multiplexing state.

Taking epoll as the example, the source is in src/ae_epoll.c:

// Holds the resources used by the specific I/O multiplexing backend; not exposed outside this file
typedef struct aeApiState {
int epfd;
struct epoll_event *events;
} aeApiState;

static int aeApiCreate(aeEventLoop *eventLoop) {
aeApiState *state = zmalloc(sizeof(aeApiState));

if (!state) return -1;
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
eventLoop->apidata = state;
return 0;
}

aeCreateFileEvent()

Adds a file-descriptor event to the event_loop.

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
// Make sure the file descriptor fits within setsize
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
// Slot in the user-space event array
aeFileEvent *fe = &eventLoop->events[fd];

// Register the event with the I/O multiplexing backend
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
// Update ae's own event-slot mask
fe->mask |= mask;
// Install the callbacks; both ifs are needed because mask may contain AE_READABLE and AE_WRITABLE at the same time
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}

aeApiAddEvent()

Adds a monitored event from user space into the I/O multiplexing backend.

Taking epoll as the example, the source is in src/ae_epoll.c:

static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
// Decide automatically whether this is a modify or an add
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;

ee.events = 0;
// Merge in ae's existing mask for this fd
mask |= eventLoop->events[fd].mask; /* Merge old events */
// Translate the mask into the corresponding epoll flags
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
// Add or modify the event in epoll
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}

aeCreateTimeEvent()

// milliseconds is the delay, in milliseconds, before the task fires
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;

te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
te->id = id;
// Compute the absolute expiry time and store it in the aeTimeEvent
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
// Doubly linked list, head insertion
te->prev = NULL;
te->next = eventLoop->timeEventHead;
te->refcount = 0;
if (te->next)
te->next->prev = te;
eventLoop->timeEventHead = te;
return id;
}

aeMain()

An endless loop; each iteration processes a round of events.

void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}

aeProcessEvents()

/* Process every pending time event, then every pending file event
* (that may be registered by time event callbacks just processed).
* Without special flags the function sleeps until some file event
* fires, or when the next time event occurs (if any).
*
* If flags is 0, the function does nothing and returns.
* if flags has AE_ALL_EVENTS set, all the kind of events are processed.
* if flags has AE_FILE_EVENTS set, file events are processed.
* if flags has AE_TIME_EVENTS set, time events are processed.
* if flags has AE_DONT_WAIT set the function returns ASAP until all
* the events that's possible to process without to wait are processed.
* if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
* if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called.
*
* The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;

/* Nothing to do? return ASAP */
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;

if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
// Find the timer event that expires soonest
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
// Convert it into a timeval; tvp later decides how long the poll may block
long now_sec, now_ms;

aeGetTime(&now_sec, &now_ms);
tvp = &tv;

/* How many milliseconds we need to wait for the next
* time event to fire? */
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;

if (ms > 0) {
tvp->tv_sec = ms/1000;
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else {
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}

if (eventLoop->flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
}

if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);

/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);

/* After sleep callback. */
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);

// Process every event that fired
for (j = 0; j < numevents; j++) {
// Use the fired fd to find the corresponding entry (with its callbacks) in the user-space event array
// (one might wonder whether the fired array could be skipped by passing event pointers back directly)
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */

/* Normally we execute the readable event first, and the writable
* event later. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if AE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsyncing a file to disk,
* before replying to a client. */
int invert = fe->mask & AE_BARRIER;

/* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid.
*
* Fire the readable event if the call sequence is not
* inverted. */
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}

/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}

/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}

processed++;
}
}
// Handle expired timer events
/* Check time events */
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);

return processed; /* return the number of processed file/time events */
}

aeSearchNearestTimer()

Finds the timer event with the nearest absolute expiry time.

/* Search the first timer to fire.
* This operation is useful to know how many time the select can be
* put in sleep without to delay any event.
* If there are no timers NULL is returned.
*
* Note that's O(N) since time events are unsorted.
* Possible optimizations (not needed by Redis so far, but...):
* 1) Insert the event in order, so that the nearest is just the head.
* Much better but still insertion or deletion of timers is O(N).
* 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)).
*/
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
aeTimeEvent *te = eventLoop->timeEventHead;
aeTimeEvent *nearest = NULL;

while(te) {
if (!nearest || te->when_sec < nearest->when_sec ||
(te->when_sec == nearest->when_sec &&
te->when_ms < nearest->when_ms))
nearest = te;
te = te->next;
}
return nearest;
}

aeApiPoll()

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;

retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;

numevents = retval;
// Copy every ready descriptor into the fired array
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;

if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE|AE_READABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE|AE_READABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
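
aeApiPoll() above is the epoll backend from ae_epoll.c. ae.c chooses one of several backends exposing the same aeApi* interface at compile time, so the event loop code never needs to know which multiplexer it is running on. The selection logic is roughly the following (paraphrased, not copied verbatim):

/* Roughly how ae.c picks the multiplexing backend at build time. */
#ifdef HAVE_EVPORT
#include "ae_evport.c"          /* Solaris event ports */
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"       /* Linux epoll */
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"  /* BSD / macOS kqueue */
        #else
        #include "ae_select.c"  /* portable select() fallback */
        #endif
    #endif
#endif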

processTimeEvents()

Processes expired timer events.

Expired timers are handled in roughly reverse creation (stack-like) order, because aeCreateTimeEvent() inserts new timers at the head of the doubly linked list, as the excerpt below shows.
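
A trimmed excerpt of the insertion logic in aeCreateTimeEvent() (field initialization and error handling omitted), showing why traversal from timeEventHead visits timers in reverse creation order:

/* Trimmed from aeCreateTimeEvent(): the new timer becomes the list head. */
te->prev = NULL;
te->next = eventLoop->timeEventHead;
if (te->next)
    te->next->prev = te;
eventLoop->timeEventHead = te;
return id;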

/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);

// If the system clock was moved into the future and then back, force every timer to run once by marking them all as already expired
/* If the system clock is moved to the future, and then set back to the
* right value, time events may be delayed in a random way. Often this
* means that scheduled operations will not be performed soon enough.
*
* Here we try to detect system clock skews, and force all the time
* events to be processed ASAP when this happens: the idea is that
* processing events earlier is less dangerous than delaying them
* indefinitely, and practice suggests it is. */
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now;

te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
// Walk through every timer event
while(te) {
long now_sec, now_ms;
long long id;

/* Remove events scheduled for deletion. */
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
/* If a reference exists for this timer event,
* don't free it. This is currently incremented
* for recursive timerProc calls */
if (te->refcount) {
te = next;
continue;
}
if (te->prev)
te->prev->next = te->next;
else
eventLoop->timeEventHead = te->next;
if (te->next)
te->next->prev = te->prev;
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}

// Defensive check that is currently a no-op (see the comment below)
/* Make sure we don't process time events created by time events in
* this iteration. Note that this check is currently useless: we always
* add new timers on the head, however if we change the implementation
* detail, this check may be useful again: we keep it here for future
* defense. */
if (te->id > maxId) {
te = te->next;
continue;
}
// Fire the timer if it has expired
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;

id = te->id;
te->refcount++;
retval = te->timeProc(eventLoop, id, te->clientData);
te->refcount--;
processed++;
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
te = te->next;
}
return processed;
}
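
A hedged usage sketch to make the return-value contract of a timeProc concrete: returning a number of milliseconds re-arms the timer through aeAddMillisecondsToNow(), while returning AE_NOMORE flags it as AE_DELETED_EVENT_ID so the next pass frees it. The callback name and the 100 ms interval are invented for this example:

/* Invented example: fire every 100 ms until the countdown reaches zero. */
static int myCronTimer(aeEventLoop *el, long long id, void *clientData) {
    int *remaining = clientData;      /* countdown owned by the caller */
    if ((*remaining)-- > 0)
        return 100;                   /* re-arm: fire again in 100 ms */
    return AE_NOMORE;                 /* done: mark for deletion */
}

static void installCronTimer(aeEventLoop *el, int *ticks) {
    /* First fire after 100 ms; no finalizerProc is needed here. */
    if (aeCreateTimeEvent(el, 100, myCronTimer, ticks, NULL) == AE_ERR) {
        /* only fails on allocation failure */
    }
}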

Network service API implementation

anetTcpServer()

Creates an IPv4 listening socket.

int anetTcpServer(char *err, int port, char *bindaddr, int backlog)
{
return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);
}

anetTcp6Server()

Creates an IPv6 listening socket.

int anetTcp6Server(char *err, int port, char *bindaddr, int backlog)
{
return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog);
}

_anetTcpServer()

Creates a socket from the configured IP and port and starts listening on it.

static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
int s = -1, rv;
char _port[6]; /* strlen("65535") */
struct addrinfo hints, *servinfo, *p;

snprintf(_port,6,"%d",port);
memset(&hints,0,sizeof(hints));
// Fill in the hints for getaddrinfo()
hints.ai_family = af;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE; /* No effect if bindaddr != NULL */

// If the caller passed bindaddr == NULL, getaddrinfo() returns the locally usable addresses
// Because of the hints, af = AF_INET yields only IPv4 addresses and af = AF_INET6 only IPv6 addresses
// If bindaddr is a concrete address, only that address is returned
if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
anetSetError(err, "%s", gai_strerror(rv));
return ANET_ERR;
}
for (p = servinfo; p != NULL; p = p->ai_next) {
// Create the listening socket from the resolved address info
if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
continue;

// Restrict an AF_INET6 socket to IPv6 only (no IPv4-mapped addresses)
if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
// Allow reusing the address even while old sockets sit in TIME_WAIT
if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
// Bind and listen on the socket
if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR;
goto end;
}
if (p == NULL) {
anetSetError(err, "unable to bind socket, errno: %d", errno);
goto error;
}

error:
if (s != -1) close(s);
s = ANET_ERR;
end:
// Release the address list allocated by getaddrinfo(); it must be freed with freeaddrinfo(), otherwise it leaks
freeaddrinfo(servinfo);
return s;
}
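
_anetTcpServer() relies on two small static helpers that this article does not list: anetV6Only() restricts an AF_INET6 socket to IPv6 traffic via IPV6_V6ONLY, and anetSetReuseAddr() sets SO_REUSEADDR so a restarting server can rebind while old sockets linger in TIME_WAIT. Simplified sketches (paraphrased, error handling trimmed):

/* Simplified: both helpers just wrap a setsockopt() call. */
static int anetSetReuseAddr(char *err, int fd) {
    int yes = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)) == -1) {
        anetSetError(err, "setsockopt SO_REUSEADDR: %s", strerror(errno));
        return ANET_ERR;
    }
    return ANET_OK;
}

static int anetV6Only(char *err, int s) {
    int yes = 1;
    if (setsockopt(s, IPPROTO_IPV6, IPV6_V6ONLY, &yes, sizeof(yes)) == -1) {
        anetSetError(err, "setsockopt IPV6_V6ONLY: %s", strerror(errno));
        return ANET_ERR;
    }
    return ANET_OK;
}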

anetListen()

Binds the address and starts listening.

static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
if (bind(s,sa,len) == -1) {
anetSetError(err, "bind: %s", strerror(errno));
close(s);
return ANET_ERR;
}

if (listen(s, backlog) == -1) {
anetSetError(err, "listen: %s", strerror(errno));
close(s);
return ANET_ERR;
}
return ANET_OK;
}

anetTcpAccept()

Accepts a connection and retrieves the peer's address and port.

int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
int fd;
struct sockaddr_storage sa;
socklen_t salen = sizeof(sa);
if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
return ANET_ERR;

if (sa.ss_family == AF_INET) {
struct sockaddr_in *s = (struct sockaddr_in *)&sa;
if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);
if (port) *port = ntohs(s->sin_port);
} else {
struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
if (port) *port = ntohs(s->sin6_port);
}
return fd;
}
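
To tie the anet and ae halves together: the listening fd returned by anetTcpServer() is registered with aeCreateFileEvent(), and its readable callback calls anetTcpAccept() for each new client. A simplified sketch under invented names (the real flow lives in server.c/networking.c and goes through the connection abstraction; assumes the usual anet.h/ae.h/unistd.h/errno.h includes):

/* Invented names; buffers sized like ANET_ERR_LEN / NET_IP_STR_LEN. */
static void readHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char buf[1024];
    ssize_t nread = read(fd, buf, sizeof(buf));
    if (nread == 0 || (nread == -1 && errno != EAGAIN)) {
        aeDeleteFileEvent(el, fd, AE_READABLE);   /* peer closed or hard error */
        close(fd);
        return;
    }
    if (nread == -1) return;   /* EAGAIN: wait for the next readable event */
    /* ... parse the query in buf, queue the reply ... */
}

static void acceptHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    char err[256], ip[46];
    int port;
    int cfd = anetTcpAccept(err, fd, ip, sizeof(ip), &port);
    if (cfd == ANET_ERR) return;
    anetNonBlock(err, cfd);                /* never block the event loop */
    anetEnableTcpNoDelay(err, cfd);        /* set TCP_NODELAY on the client fd */
    aeCreateFileEvent(el, cfd, AE_READABLE, readHandler, NULL);
}

static int setupListener(aeEventLoop *el, int port) {
    char err[256];
    int lfd = anetTcpServer(err, port, NULL, 511);  /* 511: redis.conf default backlog */
    if (lfd == ANET_ERR) return ANET_ERR;
    anetNonBlock(err, lfd);
    return aeCreateFileEvent(el, lfd, AE_READABLE, acceptHandler, NULL);
}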

anetGenericAccept()

Accepts a connection, retrying on EINTR.

static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
int fd;
while(1) {
fd = accept(s,sa,len);
if (fd == -1) {
if (errno == EINTR)
continue;
else {
anetSetError(err, "accept: %s", strerror(errno));
return ANET_ERR;
}
}
break;
}
return fd;
}

Miscellaneous

struct client

typedef struct client {
uint64_t id; /* Client incremental unique ID. */
connection *conn;
int resp; /* RESP protocol version. Can be 2 or 3. */
redisDb *db; /* Pointer to currently SELECTed DB. */
robj *name; /* As set by CLIENT SETNAME. */
sds querybuf; /* Buffer we use to accumulate client queries. */
size_t qb_pos; /* The position we have read in querybuf. */
sds pending_querybuf; /* If this client is flagged as master, this buffer
represents the yet not applied portion of the
replication stream that we are receiving from
the master. */
size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
int argc; /* Num of arguments of current command. */
robj **argv; /* Arguments of current command. */
size_t argv_len_sum; /* Sum of lengths of objects in argv list. */
struct redisCommand *cmd, *lastcmd; /* Last command executed. */
user *user; /* User associated with this connection. If the
user is set to NULL the connection can do
anything (admin). */
int reqtype; /* Request protocol type: PROTO_REQ_* */
int multibulklen; /* Number of multi bulk arguments left to read. */
long bulklen; /* Length of bulk argument in multi bulk request. */
list *reply; /* List of reply objects to send to the client. */
unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
size_t sentlen; /* Amount of bytes already sent in the current
buffer or object being sent. */
time_t ctime; /* Client creation time. */
time_t lastinteraction; /* Time of the last interaction, used for timeout */
time_t obuf_soft_limit_reached_time;
uint64_t flags; /* Client flags: CLIENT_* macros. */
int authenticated; /* Needed when the default user requires auth. */
int replstate; /* Replication state if this is a slave. */
int repl_put_online_on_ack; /* Install slave write handler on first ACK. */
int repldbfd; /* Replication DB file descriptor. */
off_t repldboff; /* Replication DB file offset. */
off_t repldbsize; /* Replication DB file size. */
sds replpreamble; /* Replication DB preamble. */
long long read_reploff; /* Read replication offset if this is a master. */
long long reploff; /* Applied replication offset if this is a master. */
long long repl_ack_off; /* Replication ack offset, if this is a slave. */
long long repl_ack_time;/* Replication ack time, if this is a slave. */
long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
copying this slave output buffer
should use. */
char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
int slave_listening_port; /* As configured with: REPLCONF listening-port */
char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
multiState mstate; /* MULTI/EXEC state */
int btype; /* Type of blocking op if CLIENT_BLOCKED. */
blockingState bpop; /* blocking state */
long long woff; /* Last write global replication offset. */
list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
sds peerid; /* Cached peer ID. */
listNode *client_list_node; /* list node in client list */
RedisModuleUserChangedFunc auth_callback; /* Module callback to execute
* when the authenticated user
* changes. */
void *auth_callback_privdata; /* Private data that is passed when the auth
* changed callback is executed. Opaque for
* Redis Core. */
void *auth_module; /* The module that owns the callback, which is used
* to disconnect the client if the module is
* unloaded for cleanup. Opaque for Redis Core.*/

/* If this client is in tracking mode and this field is non zero,
* invalidation messages for keys fetched by this client will be send to
* the specified client ID. */
uint64_t client_tracking_redirection;
rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
subscribed to in BCAST mode, in the
context of client side caching. */
/* In clientsCronTrackClientsMemUsage() we track the memory usage of
* each client and add it to the sum of all the clients of a given type,
* however we need to remember what was the old contribution of each
* client, and in which categoty the client was, in order to remove it
* before adding it the new value. */
uint64_t client_cron_last_memory_usage;
int client_cron_last_memory_type;
/* Response buffer */
int bufpos;
char buf[PROTO_REPLY_CHUNK_BYTES];
} client;