void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);
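    /* The accept loop itself was elided in this excerpt; roughly (from the
     * same Redis 6.x source) it drains up to MAX_ACCEPTS_PER_CALL pending
     * connections and hands each accepted fd to acceptCommonHandler(): */
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
                    "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}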
    // In acceptCommonHandler(): the connection count exceeds the limit, so
    // report an error to the client and proactively close the TCP connection
    /* Limit the number of connections we take at the same time.
     *
     * Admission control will happen before a client is created and connAccept()
     * called, because we don't want to even start transport-level negotiation
     * if rejected. */
    if (listLength(server.clients) + getClusterConnectionsCount() >= server.maxclients) {
        char *err;
        if (server.cluster_enabled)
            err = "-ERR max number of clients + cluster "
                  "connections reached\r\n";
        else
            err = "-ERR max number of clients reached\r\n";
        // This invokes the connSocketWrite() callback: the error is written to
        // the client synchronously, directly on the fd
        /* That's a best effort error message, don't check write errors.
         * Note that for TLS connections, no handshake was done yet so nothing
         * is written and the connection will just drop. */
        if (connWrite(conn,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        server.stat_rejected_conn++;
        // This invokes the connSocketClose() callback: it removes the
        // connection's events from the event loop and closes the connection
        // synchronously
        connClose(conn);
        return;
    }
    // Create the client resources for this connection: store the connection in
    // client->conn, install the application-level read callback, and register
    // the client with the ae event loop
    /* Create connection and client */
    if ((c = createClient(conn)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (conn: %s)",
            connGetLastError(conn),
            connGetInfo(conn, conninfo, sizeof(conninfo)));
        connClose(conn); /* May be already closed, just ignore errors */
        return;
    }
    /* Last chance to keep flags */
    c->flags |= flags;
    // connAccept() ends up in connSocketAccept(): it updates the connection
    // state and then invokes clientAcceptHandler() through callHandler(),
    // which performs the server-side (database-level) accept logic
    /* Initiate accept.
     *
     * Note that connAccept() is free to do two things here:
     * 1. Call clientAcceptHandler() immediately;
     * 2. Schedule a future call to clientAcceptHandler().
     *
     * Because of that, we must do nothing else afterwards. */
    if (connAccept(conn, clientAcceptHandler) == C_ERR) {
        char conninfo[100];
        if (connGetState(conn) == CONN_STATE_ERROR)
            serverLog(LL_WARNING,
                "Error accepting a client connection: %s (conn: %s)",
                connGetLastError(conn),
                connGetInfo(conn, conninfo, sizeof(conninfo)));
        freeClient(connGetPrivateData(conn));
        return;
    }
}
/* passing NULL as conn it is possible to create a non connected client.
 * This is useful since all the commands needs to be executed
 * in the context of a client. When commands are executed in other
 * contexts (for instance a Lua script) we need a non connected client. */
if (conn) {
    connNonBlock(conn);
    connEnableTcpNoDelay(conn);
    // Defaults to 300 seconds in the config file
    if (server.tcpkeepalive)
        connKeepAlive(conn,server.tcpkeepalive);
    // Install readQueryFromClient() as the connection's read callback; it is
    // invoked via callHandler() and parses the application-level data.
    // This also registers connSocketEventHandler() with the event loop as the
    // readable callback for this fd, and points the fd's clientData at the
    // connection. Reading in Redis involves three callbacks; they are covered
    // below in the section on receiving TCP client data.
    connSetReadHandler(conn, readQueryFromClient);
    // Store the client struct as the connection's private data
    connSetPrivateData(conn, c);
}
// ...
// Append the client to the tail of the server.clients list and remember its
// own position in client->client_list_node
if (conn) linkClient(c);
// Initialize the MULTI state (used for transactions?)
initClientMultiState(c);
return c;
}
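connNonBlock(), connEnableTcpNoDelay() and connKeepAlive() all end up in the anet.c helpers. A minimal sketch of the underlying syscalls (simplified and Linux-flavoured; the probe tuning mirrors anetKeepAlive(), but treat the details as approximate):

#include <fcntl.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

/* Roughly what the conn* wrappers boil down to (sketch, not the Redis code). */
static void setup_client_fd(int fd, int keepalive_interval) {
    /* connNonBlock(): O_NONBLOCK via fcntl */
    int flags = fcntl(fd, F_GETFL);
    fcntl(fd, F_SETFL, flags | O_NONBLOCK);

    /* connEnableTcpNoDelay(): disable Nagle so small replies go out at once */
    int yes = 1;
    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));

    /* connKeepAlive(): enable keepalive; on Linux also tune probe timing */
    setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes));
#ifdef __linux__
    int idle = keepalive_interval;       /* start probing after this many idle seconds */
    setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &idle, sizeof(idle));
    int intvl = keepalive_interval / 3;  /* interval between probes */
    if (intvl == 0) intvl = 1;
    setsockopt(fd, IPPROTO_TCP, TCP_KEEPINTVL, &intvl, sizeof(intvl));
    int cnt = 3;                         /* failed probes before dropping */
    setsockopt(fd, IPPROTO_TCP, TCP_KEEPCNT, &cnt, sizeof(cnt));
#endif
}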
if (connGetState(conn) != CONN_STATE_CONNECTED) {
    serverLog(LL_WARNING,
        "Error accepting a client connection: %s",
        connGetLastError(conn));
    freeClientAsync(c);
    return;
}
/* If the server is running in protected mode (the default) and there
 * is no password set, nor a specific interface is bound, we don't accept
 * requests from non loopback interfaces. Instead we try to explain the
 * user what to do to fix it if needed. */
if (server.protected_mode &&
    server.bindaddr_count == 0 &&
    DefaultUser->flags & USER_FLAG_NOPASS &&
    !(c->flags & CLIENT_UNIX_SOCKET))
{
    char cip[NET_IP_STR_LEN+1] = { 0 };
    connPeerToString(conn, cip, sizeof(cip)-1, NULL);
if (strcmp(cip,"127.0.0.1") && strcmp(cip,"::1")) { char *err = "-DENIED Redis is running in protected mode because protected " "mode is enabled, no bind address was specified, no " "authentication password is requested to clients. In this mode " "connections are only accepted from the loopback interface. " "If you want to connect from external computers to Redis you " "may adopt one of the following solutions: " "1) Just disable protected mode sending the command " "'CONFIG SET protected-mode no' from the loopback interface " "by connecting to Redis from the same host the server is " "running, however MAKE SURE Redis is not publicly accessible " "from internet if you do so. Use CONFIG REWRITE to make this " "change permanent. " "2) Alternatively you can just disable the protected mode by " "editing the Redis configuration file, and setting the protected " "mode option to 'no', and then restarting the server. " "3) If you started the server manually just for testing, restart " "it with the '--protected-mode no' option. " "4) Setup a bind address or an authentication password. " "NOTE: You only need to do one of the above things in order for " "the server to start accepting connections from the outside.\r\n"; if (connWrite(c->conn,err,strlen(err)) == -1) { /* Nothing to do, Just to avoid the warning... */ } server.stat_rejected_conn++; freeClientAsync(c); return; } }
Thread 1 "redis-server" hit Breakpoint 4, connSocketEventHandler (el=0x7ffff780b480, fd=8, clientData=0x7ffff78150c0, mask=1) at connection.c:261 261 if (conn->state == CONN_STATE_CONNECTING && (gdb) bt #0 connSocketEventHandler (el=0x7ffff780b480, fd=8, clientData=0x7ffff78150c0, mask=1) at connection.c:261 #1 0x00005555555929f7 in aeProcessEvents (eventLoop=eventLoop@entry=0x7ffff780b480, flags=flags@entry=27) at ae.c:479 #2 0x0000555555592d3d in aeMain (eventLoop=0x7ffff780b480) at ae.c:539 #3 0x000055555558f536 in main (argc=3, argv=0x7fffffffe438) at server.c:5498
        int conn_error = connGetSocketError(conn);
        if (conn_error) {
            conn->last_errno = conn_error;
            conn->state = CONN_STATE_ERROR;
        } else {
            conn->state = CONN_STATE_CONNECTED;
        }

        if (!conn->write_handler)
            aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);

        if (!callHandler(conn, conn->conn_handler)) return;
        conn->conn_handler = NULL;
    }
    // Decide whether the order must be inverted: normally the read handler
    // runs first and the write handler second, but in special cases -- e.g.
    // when data must be fsync'ed to disk first -- the write must be handled
    // before the read
    /* Normally we execute the readable event first, and the writable
     * event later. This is useful as sometimes we may be able
     * to serve the reply of a query immediately after processing the
     * query.
     *
     * However if WRITE_BARRIER is set in the mask, our application is
     * asking us to do the reverse: never fire the writable event
     * after the readable. In such a case, we invert the calls.
     * This is useful when, for instance, we want to do things
     * in the beforeSleep() hook, like fsync'ing a file to disk,
     * before replying to a client. */
    int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;
    int call_write = (mask & AE_WRITABLE) && conn->write_handler;
    int call_read = (mask & AE_READABLE) && conn->read_handler;
    // For a normal client the read handler is readQueryFromClient()
    /* Handle normal I/O flows */
    if (!invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
    // For a normal client the write handler is sendReplyToClient() (see below)
    /* Fire the writable event. */
    if (call_write) {
        if (!callHandler(conn, conn->write_handler)) return;
    }
    /* If we have to invert the call, fire the readable event now
     * after the writable one. */
    if (invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
}
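callHandler() shows up in all of these paths (and in the gdb backtraces below); it is a tiny helper in connhelpers.h. Roughly, from the Redis 6.x source (reproduced from memory, so treat the details as approximate), it refcount-protects the connection while the callback runs and performs any close that the callback scheduled:

/* connhelpers.h (approximate): invoke a connection callback safely.
 * Returns 1 if the connection is still valid afterwards, 0 if it was closed. */
static inline int callHandler(connection *conn, ConnectionCallbackFunc handler) {
    connIncrRefs(conn);                 /* keep conn alive across the callback */
    if (handler) handler(conn);
    connDecrRefs(conn);
    if (conn->flags & CONN_FLAG_CLOSE_SCHEDULED) {
        if (!connHasRefs(conn)) connClose(conn);
        return 0;                       /* caller must not touch conn anymore */
    }
    return 1;
}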
readQueryFromClient()
This is the read_handler() invoked by connSocketEventHandler().
It is the function that actually receives and processes data arriving from clients.
The gdb backtrace looks like this:
Thread 1 "redis-server" hit Breakpoint 1, readQueryFromClient (conn=0x7ffff78150c0) at networking.c:1995 1995 client *c = connGetPrivateData(conn); (gdb) bt #0 readQueryFromClient (conn=0x7ffff78150c0) at networking.c:1995 #1 0x000055555562b818 in callHandler (handler=<optimized out>, conn=0x7ffff78150c0) at connhelpers.h:79 #2 connSocketEventHandler (el=<optimized out>, fd=<optimized out>, clientData=0x7ffff78150c0, mask=<optimized out>) at connection.c:296 #3 0x00005555555929f7 in aeProcessEvents (eventLoop=eventLoop@entry=0x7ffff780b480, flags=flags@entry=27) at ae.c:479 #4 0x0000555555592d3d in aeMain (eventLoop=0x7ffff780b480) at ae.c:539 #5 0x000055555558f536 in main (argc=3, argv=0x7fffffffe438) at server.c:5498
    // This involves a feature new in Redis 6.0: with I/O threads enabled, the
    // client is first appended to the pending-read list;
    // handleClientsWithPendingReadsUsingThreads() later distributes those
    // clients to the I/O threads, each thread picks its clients up in
    // IOThreadMain(), and ultimately this same function is called again to
    // read and parse the data
    /* Check if we want to read from the client later when exiting from
     * the event loop. This is the case if threaded I/O is enabled. */
    if (postponeClientRead(c)) return;
    /* Update total number of reads on server */
    server.stat_total_reads_processed++;
    // Read at most 1024*16 bytes, i.e. a single read() fetches at most 16k
    readlen = PROTO_IOBUF_LEN;
    // A large multibulk argument may need to be accumulated over several reads
    /* If this is a multi bulk request, and we are processing a bulk reply
     * that is large enough, try to maximize the probability that the query
     * buffer contains exactly the SDS string representing the object, even
     * at the risk of requiring more read(2) calls. This way the function
     * processMultiBulkBuffer() can avoid copying buffers to create the
     * Redis Object representing the argument. */
    if (c->reqtype == PROTO_REQ_MULTIBULK && c->multibulklen &&
        c->bulklen != -1 && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
        /* Note that the 'remaining' variable may be zero in some edge case,
         * for example once we resume a blocked client after CLIENT PAUSE. */
        if (remaining > 0 && remaining < readlen) readlen = remaining;
    }
    // Bytes already used in the client's receive (sds) buffer
    qblen = sdslen(c->querybuf);
    // Track the recent peak of the query-buffer usage; clientsCron uses this
    // peak to decide when the buffer can safely be shrunk
    if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
    // Grow the receive buffer so there is room for readlen more bytes past the
    // current length
    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    // connRead() wraps connSocketRead(), which is a plain read() on the sockfd
    nread = connRead(c->conn, c->querybuf+qblen, readlen);
    if (nread == -1) {
        if (connGetState(conn) == CONN_STATE_CONNECTED) {
            // The normal EAGAIN-style return of a nonblocking read(); since an
            // event loop drives the reads, this branch is rarely taken
            return;
        } else {
            serverLog(LL_VERBOSE,
                "Reading from client: %s",connGetLastError(c->conn));
            freeClientAsync(c);
            return;
        }
    } else if (nread == 0) {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClientAsync(c);
        return;
    } else if (c->flags & CLIENT_MASTER) {
        // On a replica, data read from the master connection is also copied
        // into client->pending_querybuf and later forwarded to this server's
        // own slaves via replicationFeedSlavesFromMasterStream(), keeping the
        // replication chain consistent
        /* Append the query buffer to the pending (not applied) buffer
         * of the master. We'll use this buffer later in order to have a
         * copy of the string applied by the last command executed. */
        c->pending_querybuf = sdscatlen(c->pending_querybuf,
                                        c->querybuf+qblen,nread);
    }
    sdsIncrLen(c->querybuf,nread);
    c->lastinteraction = server.unixtime;
    if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
    server.stat_net_input_bytes += nread;
    if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
        sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();

        bytes = sdscatrepr(bytes,c->querybuf,64);
        serverLog(LL_WARNING,
            "Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)",
            ci, bytes);
        sdsfree(ci);
        sdsfree(bytes);
        freeClientAsync(c);
        return;
    }
    // Process the received data
    /* There is more data in the client input buffer, continue parsing it
     * in case to check if there is a full command to execute. */
    processInputBuffer(c);
}
/* Return 1 if we want to handle the client read later using threaded I/O.
 * This is called by the readable handler of the event loop.
 * As a side effect of calling this function the client is put in the
 * pending read clients and flagged as such. */
int postponeClientRead(client *c) {
    if (server.io_threads_active &&
        server.io_threads_do_reads &&
        !ProcessingEventsWhileBlocked &&
        !(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
    {
        c->flags |= CLIENT_PENDING_READ;
        listAddNodeHead(server.clients_pending_read,c);
        return 1;
    } else {
        return 0;
    }
}
    /* Don't overwrite the state of a connection that is not already
     * connected, not to mess with handler callbacks. */
    if (conn->state == CONN_STATE_CONNECTED)
        conn->state = CONN_STATE_ERROR;
}
Thread 1 "redis-server" hit Breakpoint 7, connSocketWrite (conn=0x7ffff78150c0, data=0x7ffff791c960, data_len=5) at connection.c:167 167 static int connSocketWrite(connection *conn, const void *data, size_t data_len) { (gdb) bt #0 connSocketWrite (conn=0x7ffff78150c0, data=0x7ffff791c960, data_len=5) at connection.c:167 #1 0x00005555555a76fd in connWrite (data_len=<optimized out>, data=<optimized out>, conn=<optimized out>) at connection.h:140 #2 writeToClient (c=0x7ffff791c700, handler_installed=0) at networking.c:1379 #3 0x00005555555a7946 in handleClientsWithPendingWrites () at networking.c:1497 #4 0x00005555555ad565 in handleClientsWithPendingWritesUsingThreads () at networking.c:3189 #5 0x00005555555960b2 in beforeSleep (eventLoop=<optimized out>) at server.c:2201 #6 beforeSleep (eventLoop=<optimized out>) at server.c:2117 #7 0x00005555555928b9 in aeProcessEvents (eventLoop=eventLoop@entry=0x7ffff780b480, flags=flags@entry=27) at ae.c:443 #8 0x0000555555592d3d in aeMain (eventLoop=0x7ffff780b480) at ae.c:539 #9 0x000055555558f536 in main (argc=3, argv=0x7fffffffe438) at server.c:5498
int handleClientsWithPendingWritesUsingThreads(void) {
    int processed = listLength(server.clients_pending_write);
    if (processed == 0) return 0; /* Return ASAP if there are no clients. */
    // With few pending clients, or with I/O threads disabled, the main thread
    // sends the data itself
    /* If I/O threads are disabled or we have few clients to serve, don't
     * use I/O threads, but the boring synchronous code. */
    if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
        // Flush the data buffered in each client
        return handleClientsWithPendingWrites();
    }
    /* Start threads if needed. */
    if (!server.io_threads_active) startThreadedIO();
if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed);
    /* Distribute the clients across N different lists. */
    listIter li;
    listNode *ln;
    listRewind(server.clients_pending_write,&li);
    int item_id = 0;
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;

        /* Remove clients from the list of pending writes since
         * they are going to be closed ASAP. */
        if (c->flags & CLIENT_CLOSE_ASAP) {
            listDelNode(server.clients_pending_write, ln);
            continue;
        }

        int target_id = item_id % server.io_threads_num;
        listAddNodeTail(io_threads_list[target_id],c);
        item_id++;
    }

    /* Give the start condition to the waiting threads, by setting the
     * start condition atomic var. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int j = 1; j < server.io_threads_num; j++) {
        int count = listLength(io_threads_list[j]);
        io_threads_pending[j] = count;
    }
    // Send the data
    /* Also use the main thread to process a slice of clients. */
    listRewind(io_threads_list[0],&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);
    /* Wait for all the other threads to end their work. */
    while(1) {
        unsigned long pending = 0;
        for (int j = 1; j < server.io_threads_num; j++)
            pending += io_threads_pending[j];
        if (pending == 0) break;
    }
    if (tio_debug) printf("I/O WRITE All threads finished\n");
    // Any client whose data was not fully flushed gets sendReplyToClient()
    // installed as its write_handler, so the remainder is sent on a later
    // event-loop iteration
    /* Run the list of clients again to install the write handler where
     * needed. */
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        /* Install the write handler if there are pending writes in some
         * of the clients. */
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);
    /* Update processed count on server */
    server.stat_io_writes_processed += processed;

    return processed;
}
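The worker side of the handshake above lives in IOThreadMain(). A condensed sketch of the Redis 6.x loop (abbreviated, details approximate): each thread spins on io_threads_pending[id], drains its io_threads_list[id] slice according to io_threads_op, then zeroes its counter — which is exactly what the main thread's wait loop polls for.

void *IOThreadMain(void *myid) {
    long id = (unsigned long)myid;

    while(1) {
        /* Busy-wait for work; the main thread sets io_threads_pending[id]. */
        for (int j = 0; j < 1000000; j++) {
            if (io_threads_pending[id] != 0) break;
        }
        /* Still nothing: let the main thread park us via the per-thread mutex. */
        if (io_threads_pending[id] == 0) {
            pthread_mutex_lock(&io_threads_mutex[id]);
            pthread_mutex_unlock(&io_threads_mutex[id]);
            continue;
        }

        /* Drain this thread's slice of clients. */
        listIter li;
        listNode *ln;
        listRewind(io_threads_list[id],&li);
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            if (io_threads_op == IO_THREADS_OP_WRITE) {
                writeToClient(c,0);
            } else if (io_threads_op == IO_THREADS_OP_READ) {
                readQueryFromClient(c->conn);
            }
        }
        listEmpty(io_threads_list[id]);
        io_threads_pending[id] = 0;   /* signal completion to the main thread */
    }
}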
/* This function is called just before entering the event loop, in the hope
 * we can just write the replies to the client output buffer without any
 * need to use a syscall in order to install the writable event handler,
 * get it called, and so forth. */
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);
        /* If a client is protected, don't do anything,
         * that may trigger write error or recreate handler. */
        if (c->flags & CLIENT_PROTECTED) continue;

        /* Don't write to clients that are going to be closed anyway. */
        if (c->flags & CLIENT_CLOSE_ASAP) continue;

        // Send the data
        /* Try to write buffers to the client socket. */
        if (writeToClient(c,0) == C_ERR) continue;
        // If not everything was flushed, register a writable event via
        // connSetWriteHandlerWithBarrier(): the fd is armed with AE_WRITABLE
        // in epoll and sendReplyToClient() is installed as the callback that
        // will send the rest
        /* If after the synchronous writes above we still have data to
         * output to the client, we need to install the writable handler. */
        if (clientHasPendingReplies(c)) {
            int ae_barrier = 0;
            /* For the fsync=always policy, we want that a given FD is never
             * served for reading and writing in the same event loop iteration,
             * so that in the middle of receiving the query, and serving it
             * to the client, we'll call beforeSleep() that will do the
             * actual fsync of AOF to disk. the write barrier ensures that. */
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_barrier = 1;
            }
            if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient,
                                               ae_barrier) == C_ERR)
            {
                freeClientAsync(c);
            }
        }
    }
    return processed;
}
/* Write data in output buffers to client. Return C_OK if the client
 * is still valid after the call, C_ERR if it was freed because of some
 * error. If handler_installed is set, it will attempt to clear the
 * write event.
 *
 * This function is called by threads, but always with handler_installed
 * set to 0. So when handler_installed is set to 0 the function must be
 * thread safe. */
int writeToClient(client *c, int handler_installed) {
    /* Update total number of writes on server */
    server.stat_total_writes_processed++;
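    /* Elided in this excerpt: the main write loop. Reconstructed roughly from
     * the same Redis 6.x source (treat details as approximate). It alternates
     * between the fixed buffer c->buf and the c->reply block list: */
    ssize_t nwritten = 0, totwritten = 0;
    size_t objlen;
    clientReplyBlock *o;

    while(clientHasPendingReplies(c)) {
        if (c->bufpos > 0) {
            /* First drain the fixed-size output buffer */
            nwritten = connWrite(c->conn,c->buf+c->sentlen,c->bufpos-c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;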
            /* If the buffer was sent, set bufpos to zero to continue with
             * the remainder of the reply. */
            if ((int)c->sentlen == c->bufpos) {
                c->bufpos = 0;
                c->sentlen = 0;
            }
        } else {
            o = listNodeValue(listFirst(c->reply));
            objlen = o->used;
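            /* Elided here (again reconstructed roughly from the same source):
             * write out the current reply block, skipping empty ones. */
            if (objlen == 0) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                continue;
            }

            nwritten = connWrite(c->conn, o->buf + c->sentlen,
                                 objlen - c->sentlen);
            if (nwritten <= 0) break;
            c->sentlen += nwritten;
            totwritten += nwritten;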
            /* If we fully sent the object on head go to the next one */
            if (c->sentlen == objlen) {
                c->reply_bytes -= o->size;
                listDelNode(c->reply,listFirst(c->reply));
                c->sentlen = 0;
                /* If there are no longer objects in the list, we expect
                 * the count of reply bytes to be exactly zero. */
                if (listLength(c->reply) == 0)
                    serverAssert(c->reply_bytes == 0);
            }
        }
        /* Note that we avoid to send more than NET_MAX_WRITES_PER_EVENT
         * bytes, in a single threaded server it's a good idea to serve
         * other clients as well, even if a very large request comes from
         * super fast link that is always able to accept data (in real world
         * scenario think about 'KEYS *' against the loopback interface).
         *
         * However if we are over the maxmemory limit we ignore that and
         * just deliver as much data as it is possible to deliver.
         *
         * Moreover, we also send as much as possible if the client is
         * a slave or a monitor (otherwise, on high-speed traffic, the
         * replication/output buffer will grow indefinitely) */
        if (totwritten > NET_MAX_WRITES_PER_EVENT &&
            (server.maxmemory == 0 ||
             zmalloc_used_memory() < server.maxmemory) &&
            !(c->flags & CLIENT_SLAVE)) break;
    }
    server.stat_net_output_bytes += totwritten;
    if (nwritten == -1) {
        if (connGetState(c->conn) == CONN_STATE_CONNECTED) {
            nwritten = 0;
        } else {
            serverLog(LL_VERBOSE,
                "Error writing to client: %s", connGetLastError(c->conn));
            freeClientAsync(c);
            return C_ERR;
        }
    }
    if (totwritten > 0) {
        /* For clients representing masters we don't count sending data
         * as an interaction, since we always send REPLCONF ACK commands
         * that take some time to just fill the socket output buffer.
         * We just rely on data / pings received for timeout detection. */
        if (!(c->flags & CLIENT_MASTER)) c->lastinteraction = server.unixtime;
    }
    // All data has been sent
    if (!clientHasPendingReplies(c)) {
        c->sentlen = 0;
        // handler_installed means this call was draining data left over from
        // an earlier attempt, for which a write_handler had been installed;
        // now that the output is empty, the callback must be removed
        /* Note that writeToClient() is called in a threaded way, but
         * aeDeleteFileEvent() is not thread safe: however writeToClient()
         * is always called with handler_installed set to 0 from threads
         * so we are fine. */
        if (handler_installed) connSetWriteHandler(c->conn, NULL);
        /* Close connection after entire reply has been sent. */
        if (c->flags & CLIENT_CLOSE_AFTER_REPLY) {
            freeClientAsync(c);
            return C_ERR;
        }
    }
    return C_OK;
}
clientHasPendingReplies()
If bufpos has not been reset to 0, part of the static buffer has not been sent yet.
c->reply holds reply objects; if the list is not empty, there is still data waiting to be sent.
/* Return true if the specified client has pending reply buffers to write to
 * the socket. */
int clientHasPendingReplies(client *c) {
    return c->bufpos || listLength(c->reply);
}
/* Write event handler. Just send data to the client. */
void sendReplyToClient(connection *conn) {
    client *c = connGetPrivateData(conn);
    writeToClient(c,1);
}
/* This function is called every time, in the client structure 'c', there is
 * more query buffer to process, because we read more data from the socket
 * or because a client was blocked and later reactivated, so there could be
 * pending query buffer, already representing a full command, to process. */
void processInputBuffer(client *c) {
    // Parsing does not rely on a temporary position variable; it advances the
    // client's own qb_pos directly
    /* Keep processing while there is something in the input buffer */
    while(c->qb_pos < sdslen(c->querybuf)) {
        /* Return if clients are paused. */
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
        /* Immediately abort if the client is in the middle of something. */
        if (c->flags & CLIENT_BLOCKED) break;

        /* Don't process more buffers from clients that have already pending
         * commands to execute in c->argv. */
        if (c->flags & CLIENT_PENDING_COMMAND) break;

        /* Don't process input from the master while there is a busy script
         * condition on the slave. We want just to accumulate the replication
         * stream (instead of replying -BUSY like we do with other clients) and
         * later resume the processing. */
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break;

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
         * written to the client. Make sure to not let the reply grow after
         * this flag has been set (i.e. don't process more commands).
         *
         * The same applies for clients we want to terminate ASAP. */
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
        /* Determine request type when unknown. */
        if (!c->reqtype) {
            // Does the data start with the character '*'?
            if (c->querybuf[c->qb_pos] == '*') {
                // Switch the parsing state machine to multibulk mode
                c->reqtype = PROTO_REQ_MULTIBULK;
            } else {
                // Switch the parsing state machine to inline mode
                c->reqtype = PROTO_REQ_INLINE;
            }
        }
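        /* For example (illustrative, not from the source): the command
         *   SET key value
         * arrives as "SET key value\r\n" in the inline protocol, but as
         *   "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n"
         * in the multibulk (RESP) encoding that real client libraries send:
         * '*' introduces the argument count and '$' the length of each
         * argument. */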
        // Reassemble the command; the result is stored in argc and argv
        if (c->reqtype == PROTO_REQ_INLINE) {
            // Inline requests are parsed by processInlineBuffer()
            if (processInlineBuffer(c) != C_OK) break;
            /* If the Gopher mode and we got zero or one argument, process
             * the request in Gopher mode. To avoid data race, Redis won't
             * support Gopher if enable io threads to read queries. */
            if (server.gopher_enabled && !server.io_threads_do_reads &&
                ((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
                  c->argc == 0))
            {
                processGopherRequest(c);
                resetClient(c);
                c->flags |= CLIENT_CLOSE_AFTER_REPLY;
                break;
            }
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) {
            // Multibulk requests are parsed by processMultibulkBuffer()
            if (processMultibulkBuffer(c) != C_OK) break;
        } else {
            serverPanic("Unknown request type");
        }
        // Execute the reassembled command
        /* Multibulk processing could see a <= 0 length. */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            /* If we are in the context of an I/O thread, we can't really
             * execute the command here. All we can do is to flag the client
             * as one that needs to process the command. */
            if (c->flags & CLIENT_PENDING_READ) {
                c->flags |= CLIENT_PENDING_COMMAND;
                break;
            }
            /* We are finally ready to execute the command. */
            if (processCommandAndResetClient(c) == C_ERR) {
                /* If the client is no longer valid, we avoid exiting this
                 * loop and trimming the client buffer later. So we return
                 * ASAP in that case. */
                return;
            }
        }
    }

    /* Trim to pos */
    if (c->qb_pos) {
        sdsrange(c->querybuf,c->qb_pos,-1);
        c->qb_pos = 0;
    }
}
/* This function calls processCommand(), but also performs a few sub tasks
 * for the client that are useful in that context:
 *
 * 1. It sets the current client to the client 'c'.
 * 2. calls commandProcessed() if the command was handled.
 *
 * The function returns C_ERR in case the client was freed as a side effect
 * of processing the command, otherwise C_OK is returned. */
int processCommandAndResetClient(client *c) {
    int deadclient = 0;
    server.current_client = c;
    if (processCommand(c) == C_OK) {
        commandProcessed(c);
    }
    if (server.current_client == NULL) deadclient = 1;
    server.current_client = NULL;
    /* freeMemoryIfNeeded may flush slave output buffers. This may
     * result into a slave, that may be the active client, to be
     * freed. */
    return deadclient ? C_ERR : C_OK;
}
/* Add the object 'obj' string representation to the client output buffer. */
void addReply(client *c, robj *obj) {
    // Make sure the client is scheduled to have its output flushed
    // (see prepareClientToWrite() below)
    if (prepareClientToWrite(c) != C_OK) return;
    if (sdsEncodedObject(obj)) {
        // Try the static buffer first; fall back to the reply list if it's full
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyProtoToList(c,obj->ptr,sdslen(obj->ptr));
    } else if (obj->encoding == OBJ_ENCODING_INT) {
        /* For integer encoded strings we just convert it into a string
         * using our optimized function, and attach the resulting string
         * to the output buffer. */
        char buf[32];
        size_t len = ll2string(buf,sizeof(buf),(long)obj->ptr);
        if (_addReplyToBuffer(c,buf,len) != C_OK)
            _addReplyProtoToList(c,buf,len);
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}
int prepareClientToWrite(client *c) {
    // ...
    // clientHasPendingReplies() checks whether the output buffers still hold
    // unsent replies, by testing client->bufpos (an int) and the length of
    // client->reply (a list)
    /* Schedule the client to write the output buffers to the socket, unless
     * it should already be setup to do so (it has already pending data).
     *
     * If CLIENT_PENDING_READ is set, we're in an IO thread and should
     * not install a write handler. Instead, it will be done by
     * handleClientsWithPendingReadsUsingThreads() upon return. */
    if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
        // Set the CLIENT_PENDING_WRITE flag and add the client to the
        // server.clients_pending_write list
        clientInstallWriteHandler(c);
    /* Authorize the caller to queue in the output buffer of this client. */
    return C_OK;
}
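clientInstallWriteHandler() itself is tiny; roughly, from the Redis 6.x source (reproduced from memory, so treat details as approximate):

void clientInstallWriteHandler(client *c) {
    /* Queue the client for output flushing only if not already queued and,
     * for slaves, only if the slave can actually receive writes right now. */
    if (!(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
    {
        c->flags |= CLIENT_PENDING_WRITE;
        listAddNodeHead(server.clients_pending_write,c);
    }
}

Despite its name, it does not register anything with the event loop; it only queues the client, and beforeSleep() later flushes the queue through handleClientsWithPendingWritesUsingThreads(), as the backtrace above shows.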
/* Attempts to add the reply to the static buffer in the client struct.
 * Returns C_ERR if the buffer is full, or the reply list is not empty,
 * in which case the reply must be added to the reply list. */
int _addReplyToBuffer(client *c, const char *s, size_t len) {
    size_t available = sizeof(c->buf)-c->bufpos;
if (c->flags & CLIENT_CLOSE_AFTER_REPLY) return C_OK;
    // Are there already unsent replies queued in the reply list?
    /* If there already are entries in the reply list, we cannot
     * add anything more to the static buffer. */
    if (listLength(c->reply) > 0) return C_ERR;
    /* Check that the buffer has enough space available for this string. */
    if (len > available) return C_ERR;

    memcpy(c->buf+c->bufpos,s,len);
    c->bufpos+=len;
    return C_OK;
}
/* State of an event based program */
typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;
struct aeFileEvent
Ordinary (file) events.
/* File event structure */
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    // Read-event callback
    aeFileProc *rfileProc;
    // Write-event callback
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;
struct aeFiredEvent
Events that have fired (become ready).
/* A fired event */
typedef struct aeFiredEvent {
    int fd;
    int mask;
} aeFiredEvent;
struct aeTimeEvent
/* Time event structure */
typedef struct aeTimeEvent {
    long long id; /* time event identifier. */
    long when_sec; /* seconds */
    long when_ms; /* milliseconds */
    aeTimeProc *timeProc;
    aeEventFinalizerProc *finalizerProc;
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
    int refcount; /* refcount to prevent timer events from being
                   * freed in recursive time event calls. */
} aeTimeEvent;
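How these structures get filled in: file and time events are registered through aeCreateFileEvent() and aeCreateTimeEvent(). A small usage sketch (the two callbacks are hypothetical; the signatures match ae.h):

/* Hypothetical callbacks, for illustration only. */
void myReadProc(aeEventLoop *el, int fd, void *clientData, int mask) {
    /* called when fd becomes readable */
}

int myTimerProc(aeEventLoop *el, long long id, void *clientData) {
    /* called when the timer fires; the return value is the number of
     * milliseconds until the next firing, or AE_NOMORE to delete it */
    return 100;   /* re-arm: fire again in 100 ms */
}

void setup(aeEventLoop *el, int fd) {
    /* Fills eventLoop->events[fd] with the mask and rfileProc */
    aeCreateFileEvent(el, fd, AE_READABLE, myReadProc, NULL);
    /* Links a new aeTimeEvent at the head of timeEventHead */
    aeCreateTimeEvent(el, 100, myTimerProc, NULL, NULL);
}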
    if (!state) return -1;
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    eventLoop->apidata = state;
    return 0;
}
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    // Automatically decide between adding and modifying the event
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}
/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * If flags is 0, the function does nothing and returns.
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * the events that's possible to process without to wait are processed.
 * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
 * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called.
 *
 * The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags) {
    int processed = 0, numevents;
    /* Nothing to do? return ASAP */
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // Find the timer task that will fire the earliest
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            // Convert the timestamp into the tvp value used below to decide
            // how long the poll may block
            long now_sec, now_ms;
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) {
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else {
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);
        // Iterate over all the events that fired in this round
        for (j = 0; j < numevents; j++) {
            // Use the fired event's fd to locate the corresponding registered
            // event (which holds the callbacks) in the user-side events array
            // (?? couldn't the fired queue be dropped and the event pointer
            // passed back directly?)
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */
            /* Normally we execute the readable event first, and the writable
             * event later. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsyncing a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }
            processed++;
        }
    }
    // Process the timer tasks that have come due
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    return processed; /* return the number of processed file/time events */
}
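For completeness, the driver of all this is tiny: aeMain() is essentially the following loop (the flag combination matches the flags=27 value visible in the backtraces above):

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}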
/* Search the first timer to fire.
 * This operation is useful to know how many time the select can be
 * put in sleep without to delay any event.
 * If there are no timers NULL is returned.
 *
 * Note that's O(N) since time events are unsorted.
 * Possible optimizations (not needed by Redis so far, but...):
 * 1) Insert the event in order, so that the nearest is just the head.
 *    Much better but still insertion or deletion of timers is O(N).
 * 2) Use a skiplist to have this operation as O(1) and insertion as O(log(N)). */
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop) {
    aeTimeEvent *te = eventLoop->timeEventHead;
    aeTimeEvent *nearest = NULL;

    while(te) {
        if (!nearest || te->when_sec < nearest->when_sec ||
                (te->when_sec == nearest->when_sec &&
                 te->when_ms < nearest->when_ms))
            nearest = te;
        te = te->next;
    }
    return nearest;
}
/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te;
    long long maxId;
    time_t now = time(NULL);
    // If the system clock was ever moved into the future and then corrected,
    // force every timer task to run once by marking them all as already due
    /* If the system clock is moved to the future, and then set back to the
     * right value, time events may be delayed in a random way. Often this
     * means that scheduled operations will not be performed soon enough.
     *
     * Here we try to detect system clock skews, and force all the time
     * events to be processed ASAP when this happens: the idea is that
     * processing events earlier is less dangerous than delaying them
     * indefinitely, and practice suggests it is. */
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    eventLoop->lastTime = now;
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    // Walk every timer task
    while(te) {
        long now_sec, now_ms;
        long long id;
        /* Remove events scheduled for deletion. */
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            /* If a reference exists for this timer event,
             * don't free it. This is currently incremented
             * for recursive timerProc calls */
            if (te->refcount) {
                te = next;
                continue;
            }
            if (te->prev)
                te->prev->next = te->next;
            else
                eventLoop->timeEventHead = te->next;
            if (te->next)
                te->next->prev = te->prev;
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }
        // Defensive code that is currently a no-op (see the comment below)
        /* Make sure we don't process time events created by time events in
         * this iteration. Note that this check is currently useless: we always
         * add new timers on the head, however if we change the implementation
         * detail, this check may be useful again: we keep it here for future
         * defense. */
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        // Check whether the timer has expired; if so, run it
        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;
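            /* The excerpt stops here. The remainder, roughly, from the same
             * Redis 6.x source (treat details as approximate): run the
             * callback and use its return value to re-arm or delete the timer. */
            id = te->id;
            te->refcount++;
            retval = te->timeProc(eventLoop, id, te->clientData);
            te->refcount--;
            processed++;
            if (retval != AE_NOMORE) {
                /* Periodic timer: fire again retval milliseconds from now */
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                /* One-shot timer: mark it for deletion on the next pass */
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        te = te->next;
    }
    return processed;
}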
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog) {
    int s = -1, rv;
    char _port[6];  /* strlen("65535") */
    struct addrinfo hints, *servinfo, *p;
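    /* The body was cut off above; in outline it is the classic
     * getaddrinfo/socket/bind/listen sequence (condensed sketch, error
     * handling trimmed -- see anet.c for the real code): */
    snprintf(_port, 6, "%d", port);
    memset(&hints, 0, sizeof(hints));
    hints.ai_family = af;            /* AF_INET or AF_INET6 */
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;     /* wildcard address if bindaddr is NULL */

    if ((rv = getaddrinfo(bindaddr, _port, &hints, &servinfo)) != 0) {
        anetSetError(err, "%s", gai_strerror(rv));
        return ANET_ERR;
    }
    for (p = servinfo; p != NULL; p = p->ai_next) {
        if ((s = socket(p->ai_family, p->ai_socktype, p->ai_protocol)) == -1)
            continue;
        /* SO_REUSEADDR, then bind() + listen(backlog) inside anetListen() */
        if (anetSetReuseAddr(err, s) == ANET_ERR) goto error;
        if (anetListen(err, s, p->ai_addr, p->ai_addrlen, backlog) == ANET_ERR)
            s = ANET_ERR;
        goto end;
    }
    /* (the 'error' label closes the socket; 'end' frees servinfo and returns s) */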
typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    connection *conn;
    int resp;               /* RESP protocol version. Can be 2 or 3. */
    redisDb *db;            /* Pointer to currently SELECTed DB. */
    robj *name;             /* As set by CLIENT SETNAME. */
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    size_t qb_pos;          /* The position we have read in querybuf. */
    sds pending_querybuf;   /* If this client is flagged as master, this buffer
                               represents the yet not applied portion of the
                               replication stream that we are receiving from
                               the master. */
    size_t querybuf_peak;   /* Recent (100ms or more) peak of querybuf size. */
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */
    size_t argv_len_sum;    /* Sum of lengths of objects in argv list. */
    struct redisCommand *cmd, *lastcmd;  /* Last command executed. */
    user *user;             /* User associated with this connection. If the
                               user is set to NULL the connection can do
                               anything (admin). */
    int reqtype;            /* Request protocol type: PROTO_REQ_* */
    int multibulklen;       /* Number of multi bulk arguments left to read. */
    long bulklen;           /* Length of bulk argument in multi bulk request. */
    list *reply;            /* List of reply objects to send to the client. */
    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    size_t sentlen;         /* Amount of bytes already sent in the current
                               buffer or object being sent. */
    time_t ctime;           /* Client creation time. */
    time_t lastinteraction; /* Time of the last interaction, used for timeout */
    time_t obuf_soft_limit_reached_time;
    uint64_t flags;         /* Client flags: CLIENT_* macros. */
    int authenticated;      /* Needed when the default user requires auth. */
    int replstate;          /* Replication state if this is a slave. */
    int repl_put_online_on_ack; /* Install slave write handler on first ACK. */
    int repldbfd;           /* Replication DB file descriptor. */
    off_t repldboff;        /* Replication DB file offset. */
    off_t repldbsize;       /* Replication DB file size. */
    sds replpreamble;       /* Replication DB preamble. */
    long long read_reploff; /* Read replication offset if this is a master. */
    long long reploff;      /* Applied replication offset if this is a master. */
    long long repl_ack_off; /* Replication ack offset, if this is a slave. */
    long long repl_ack_time;/* Replication ack time, if this is a slave. */
    long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                       copying this slave output buffer
                                       should use. */
    char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
    int slave_listening_port; /* As configured with: REPLCONF listening-port */
    char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
    int slave_capa;         /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
    multiState mstate;      /* MULTI/EXEC state */
    int btype;              /* Type of blocking op if CLIENT_BLOCKED. */
    blockingState bpop;     /* blocking state */
    long long woff;         /* Last write global replication offset. */
    list *watched_keys;     /* Keys WATCHED for MULTI/EXEC CAS */
    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
    sds peerid;             /* Cached peer ID. */
    listNode *client_list_node; /* list node in client list */
    RedisModuleUserChangedFunc auth_callback; /* Module callback to execute
                                               * when the authenticated user
                                               * changes. */
    void *auth_callback_privdata; /* Private data that is passed when the auth
                                   * changed callback is executed. Opaque for
                                   * Redis Core. */
    void *auth_module;      /* The module that owns the callback, which is used
                             * to disconnect the client if the module is
                             * unloaded for cleanup. Opaque for Redis Core. */

    /* If this client is in tracking mode and this field is non zero,
     * invalidation messages for keys fetched by this client will be send to
     * the specified client ID. */
    uint64_t client_tracking_redirection;
    rax *client_tracking_prefixes; /* A dictionary of prefixes we are already
                                      subscribed to in BCAST mode, in the
                                      context of client side caching. */
    /* In clientsCronTrackClientsMemUsage() we track the memory usage of
     * each client and add it to the sum of all the clients of a given type,
     * however we need to remember what was the old contribution of each
     * client, and in which category the client was, in order to remove it
     * before adding it the new value. */
    uint64_t client_cron_last_memory_usage;
    int client_cron_last_memory_type;
    /* Response buffer */
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
} client;