当Redis启动之后,会创建事件循环,不断地处理文件事件和时间事件。在注册服务端套接字时,会为其设置事件回调函数,当请求到来时,会调用这些回调函数来建立连接。本文就来分析一下连接的建立过程。
虽然Redis会监听Unix Socket和TLS套接字,但是实际工作中还是用TCP最多,所以本文只分析TCP方式。
TCP监听回调
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
// MAX_ACCEPTS_PER_CALL值为1000,表示每次事件循环最多只处理1000个连接。
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
// 每次最多只能处理1000个连接
while(max--) {
// 接收客户端的连接,并返回数据套接字文件描述符
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
/*
* connCreateAcceptedSocket函数创建并返回connection对象;
* acceptCommonHandler函数执行连接建立后的逻辑;
*/
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
接受请求
int anetTcpAccept(char *err, int s, char *ip, size_t ip_len, int *port) {
int fd;
struct sockaddr_storage sa;
socklen_t salen = sizeof(sa);
// 执行accept,接受请求
if ((fd = anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)) == -1)
return ANET_ERR;
if (sa.ss_family == AF_INET) {
struct sockaddr_in *s = (struct sockaddr_in *)&sa;
if (ip) inet_ntop(AF_INET,(void*)&(s->sin_addr),ip,ip_len);
if (port) *port = ntohs(s->sin_port);
} else {
struct sockaddr_in6 *s = (struct sockaddr_in6 *)&sa;
if (ip) inet_ntop(AF_INET6,(void*)&(s->sin6_addr),ip,ip_len);
if (port) *port = ntohs(s->sin6_port);
}
return fd;
}
static int anetGenericAccept(char *err, int s, struct sockaddr *sa, socklen_t *len) {
int fd;
while(1) {
// 调用accept系统调用,接受连接
fd = accept(s,sa,len);
if (fd == -1) {
if (errno == EINTR)
continue;
else {
anetSetError(err, "accept: %s", strerror(errno));
return ANET_ERR;
}
}
break;
}
return fd;
}
调用Linux的accept系统调用来接受连接请求。
创建连接
connection *connCreateAcceptedSocket(int fd) {
// 创建Connection对象
connection *conn = connCreateSocket();
conn->fd = fd;
// 设置连接状态
conn->state = CONN_STATE_ACCEPTING;
return conn;
}
connection *connCreateSocket() {
// 创建Connection对象
connection *conn = zcalloc(sizeof(connection));
// 设置连接类型,包含了处理客户端连接通道的函数
conn->type = &CT_Socket;
conn->fd = -1;
return conn;
}
ConnectionType CT_Socket = {
.ae_handler = connSocketEventHandler,
.close = connSocketClose,
.write = connSocketWrite,
.read = connSocketRead,
.accept = connSocketAccept,
.connect = connSocketConnect,
.set_write_handler = connSocketSetWriteHandler,
.set_read_handler = connSocketSetReadHandler,
.get_last_error = connSocketGetLastError,
.blocking_connect = connSocketBlockingConnect,
.sync_write = connSocketSyncWrite,
.sync_read = connSocketSyncRead,
.sync_readline = connSocketSyncReadLine,
.get_type = connSocketGetType
};
在上面函数中,创建了连接对象并设置type字段,后续会调用到里面的操作函数。
处理连接
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
client *c;
char conninfo[100];
UNUSED(ip);
// 判断连接状态
if (connGetState(conn) != CONN_STATE_ACCEPTING) {
serverLog(LL_VERBOSE,
"Accepted client connection in error state: %s (conn: %s)",
connGetLastError(conn),
connGetInfo(conn, conninfo, sizeof(conninfo)));
connClose(conn);
return;
}
/* Limit the number of connections we take at the same time.
*
* Admission control will happen before a client is created and connAccept()
* called, because we don't want to even start transport-level negotiation
* if rejected. */
// 如果客户端连接数量加上cluster连接数量已经超过了server.maxclients配置项。
if (listLength(server.clients) + getClusterConnectionsCount()
>= server.maxclients)
{
char *err;
if (server.cluster_enabled)
err = "-ERR max number of clients + cluster "
"connections reached\r\n";
else
err = "-ERR max number of clients reached\r\n";
/* That's a best effort error message, don't check write errors.
* Note that for TLS connections, no handshake was done yet so nothing
* is written and the connection will just drop. */
// 响应错误
if (connWrite(conn,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
// 拒绝计数自增
server.stat_rejected_conn++;
// 关闭连接
connClose(conn);
return;
}
/* Create connection and client */
// 创建client对象,用于存储客户端数据
if ((c = createClient(conn)) == NULL) {
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (conn: %s)",
connGetLastError(conn),
connGetInfo(conn, conninfo, sizeof(conninfo)));
connClose(conn); /* May be already closed, just ignore errors */
return;
}
/* Last chance to keep flags */
// 保存标记
c->flags |= flags;
/* Initiate accept.
*
* Note that connAccept() is free to do two things here:
* 1. Call clientAcceptHandler() immediately;
* 2. Schedule a future call to clientAcceptHandler().
*
* Because of that, we must do nothing else afterwards.
*/
// 接收客户端连接,并执行函数clientAcceptHandler。
if (connAccept(conn, clientAcceptHandler) == C_ERR) {
char conninfo[100];
if (connGetState(conn) == CONN_STATE_ERROR)
serverLog(LL_WARNING,
"Error accepting a client connection: %s (conn: %s)",
connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
freeClient(connGetPrivateData(conn));
return;
}
}
重点是调用了createClient函数来创建客户端对象。
创建客户端
client *createClient(connection *conn) {
// 创建client对象
client *c = zmalloc(sizeof(client));
/* passing NULL as conn it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
if (conn) {
connNonBlock(conn); // 设置为非阻塞模式
connEnableTcpNoDelay(conn); // 关闭TCP的delay选项
if (server.tcpkeepalive)
// 开启TCP的keepAlive选项,服务器定时向客户端发送ACK进行探测。
connKeepAlive(conn,server.tcpkeepalive);
// 设置读事件的处理函数,会往事件循环中新增事件
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
}
// 选择0号数据库
selectDb(c,0);
uint64_t client_id = ++server.next_client_id;
// 设置一系列的客户端属性
c->id = client_id;
c->resp = 2;
c->conn = conn;
c->name = NULL;
c->bufpos = 0;
c->qb_pos = 0;
c->querybuf = sdsempty();
c->pending_querybuf = sdsempty();
c->querybuf_peak = 0;
c->reqtype = 0; // 请求类型初始化为0
c->argc = 0;
c->argv = NULL;
c->argv_len_sum = 0;
c->cmd = c->lastcmd = NULL;
c->user = DefaultUser;
c->multibulklen = 0;
c->bulklen = -1;
c->sentlen = 0;
c->flags = 0;
c->ctime = c->lastinteraction = server.unixtime;
/* If the default user does not require authentication, the user is
* directly authenticated. */
c->authenticated = (c->user->flags & USER_FLAG_NOPASS) &&
!(c->user->flags & USER_FLAG_DISABLED);
c->replstate = REPL_STATE_NONE;
c->repl_put_online_on_ack = 0;
c->reploff = 0;
c->read_reploff = 0;
c->repl_ack_off = 0;
c->repl_ack_time = 0;
c->slave_listening_port = 0;
c->slave_ip[0] = '\0';
c->slave_capa = SLAVE_CAPA_NONE;
c->reply = listCreate();
c->reply_bytes = 0;
c->obuf_soft_limit_reached_time = 0;
listSetFreeMethod(c->reply,freeClientReplyValue);
listSetDupMethod(c->reply,dupClientReplyValue);
c->btype = BLOCKED_NONE;
c->bpop.timeout = 0;
c->bpop.keys = dictCreate(&objectKeyHeapPointerValueDictType,NULL);
c->bpop.target = NULL;
c->bpop.xread_group = NULL;
c->bpop.xread_consumer = NULL;
c->bpop.xread_group_noack = 0;
c->bpop.numreplicas = 0;
c->bpop.reploffset = 0;
c->woff = 0;
c->watched_keys = listCreate();
c->pubsub_channels = dictCreate(&objectKeyPointerValueDictType,NULL);
c->pubsub_patterns = listCreate();
c->peerid = NULL;
c->client_list_node = NULL;
c->client_tracking_redirection = 0;
c->client_tracking_prefixes = NULL;
c->client_cron_last_memory_usage = 0;
c->client_cron_last_memory_type = CLIENT_TYPE_NORMAL;
c->auth_callback = NULL;
c->auth_callback_privdata = NULL;
c->auth_module = NULL;
listSetFreeMethod(c->pubsub_patterns,decrRefCountVoid);
listSetMatchMethod(c->pubsub_patterns,listMatchObjects);
// linkClient函数将客户端添加到server.client、server.client_index中。
if (conn) linkClient(c);
// 初始化client事务上下文
initClientMultiState(c);
return c;
}
这里调用了connSetReadHandler函数来设置读事件的回调函数readQueryFromClient,这个回调会处理请求后续发送来的数据。
static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
// 这里的type参考connection.c中的CT_Socket
return conn->type->set_read_handler(conn, func);
}
连接的type就是上面提高过的CT_Socket,set_read_handler函数指针指向的是connSocketSetReadHandler。
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
if (func == conn->read_handler) return C_OK;
conn->read_handler = func;
if (!conn->read_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
else
// 将新连接请求加入事件循环中
if (aeCreateFileEvent(server.el,conn->fd,
AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}
在上面函数中,为新连接创建并注册了文件事件,这样后续发送来了数据,Redis的事件循环就会处理到了。
客户端请求处理
上面提到Redis为客户端连接设置的回调函数是readQueryFromClient。
/*
* 该函数处理客户端的读取请求。
*/
void readQueryFromClient(connection *conn) {
// 从连接对象中获取客户端
client *c = connGetPrivateData(conn);
int nread, readlen;
size_t qblen;
/* Check if we want to read from the client later when exiting from
* the event loop. This is the case if threaded I/O is enabled. */
// 如果开启了I/O线程,则交给I/O线程读取并解析请求数据。
if (postponeClientRead(c)) return;
/* Update total number of reads on server */
// 请求计数加1
server.stat_total_reads_processed++;
// 请求读取的最大字节数,默认是16KB
readlen = PROTO_IOBUF_LEN;
/* If this is a multi bulk request, and we are processing a bulk reply
* that is large enough, try to maximize the probability that the query
* buffer contains exactly the SDS string representing the object, even
* at the risk of requiring more read(2) calls. This way the function
* processMultiBulkBuffer() can avoid copying buffers to create the
* Redis Object representing the argument. */
/*
* 如果读取的是超大参数(长度超过PROTO_MBULK_BIG_ARG,即32KB),则需要保证查询缓冲区中只有当前参数数据。
* multibulklen属性不为0,说明发生了TCP拆包并且没有读取完一个完整的命令请求。
*/
if (c->reqtype == PROTO_REQ_MULTIBULK // 批量请求
&& c->multibulklen && // 是否还存在需要读取的批量参数
c->bulklen != -1 // 确保批量命令中的参数数量是有效的,即不是无效值-1
&& c->bulklen >= PROTO_MBULK_BIG_ARG // 判断请求的数据量是否太大(32KB)
)
{
// 计算还需要读取的数据长度,+2是为了把CRLF考虑在内
ssize_t remaining = (size_t)(c->bulklen+2)-sdslen(c->querybuf);
/* Note that the 'remaining' variable may be zero in some edge case,
* for example once we resume a blocked client after CLIENT PAUSE. */
// 设置本次需要读取的字节数
if (remaining > 0 && remaining < readlen) readlen = remaining;
}
// 获取请求缓冲区中已有数据的长度
qblen = sdslen(c->querybuf);
// 更新单次读取数据量峰值querybuf_peak
if (c->querybuf_peak < qblen) c->querybuf_peak = qblen;
// 扩容请求缓冲区,保证可用内存不小于读取字节数readlen
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
// 从socket中读取数据,该函数返回实际读取字节数
nread = connRead(c->conn, c->querybuf+qblen, readlen);
if (nread == -1) {
if (connGetState(conn) == CONN_STATE_CONNECTED) {
return;
} else {
serverLog(LL_VERBOSE, "Reading from client: %s",connGetLastError(c->conn));
freeClientAsync(c);
return;
}
} else if (nread == 0) {
serverLog(LL_VERBOSE, "Client closed connection");
freeClientAsync(c);
return;
} else if (c->flags & CLIENT_MASTER) {
/* Append the query buffer to the pending (not applied) buffer
* of the master. We'll use this buffer later in order to have a
* copy of the string applied by the last command executed. */
c->pending_querybuf = sdscatlen(c->pending_querybuf,
c->querybuf+qblen,nread);
}
// 将读到的字节数加到sds的长度上
sdsIncrLen(c->querybuf,nread);
// 设置该客户端上次交互的时间
c->lastinteraction = server.unixtime;
// 如果当前节点是主节点,则更新相关变量
if (c->flags & CLIENT_MASTER) c->read_reploff += nread;
// 累积入站的请求数据量的大小
server.stat_net_input_bytes += nread;
// 如果请求缓冲区的大小超过了最大值,
if (sdslen(c->querybuf) > server.client_max_querybuf_len) {
sds ci = catClientInfoString(sdsempty(),c), bytes = sdsempty();
bytes = sdscatrepr(bytes,c->querybuf,64);
serverLog(LL_WARNING,"Closing client that reached max query buffer length: %s (qbuf initial bytes: %s)", ci, bytes);
sdsfree(ci);
sdsfree(bytes);
freeClientAsync(c);
// 直接返回
return;
}
/* There is more data in the client input buffer, continue parsing it
* in case to check if there is a full command to execute. */
// 处理已读取数据
processInputBuffer(c);
}
该函数主要工作就是解析请求的数据,细节比较多,注释中已经给出了说明,不再赘述。最后调用processInputBuffer来处理客户端请求。但在介绍处理过程之前,在上面函数前面部分,调用了postponeClientRead函数来判断是否能将当前请求交给I/O子线程来处理。
int postponeClientRead(client *c) {
// 如果启用了I/O线程
if (server.io_threads_active && // 开启IO线程
server.io_threads_do_reads && // 开启IO线程来处理读请求
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ)))
{
c->flags |= CLIENT_PENDING_READ;
/*
* 将客户端添加到pending_read队列中,���加入全局的一个列表中,
* 在事件循环的beforeSleep函数中会调用到
*/
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
如果Redis开启了I/O线程,那么则会使用I/O线程来处理请求。
I/O子线程处理请求
在事件循环调用的beforeSleep函数中,会调用handleClientsWithPendingReadsUsingThreads来将客户端请求分派给I/O子线程来处理。注意此时这个方法还是主线程在执行。
/*
* server.c中的beforeSleep函数中会调用该函数。
*/
int handleClientsWithPendingReadsUsingThreads(void) {
// 如果没有启用IO线程,或者没有开启多线程读IO配置,则直接退出
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
if (tio_debug) printf("%d TOTAL READ pending clients\n", processed);
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_read,&li);
int item_id = 0;
// 将待处理的客户端划分到各个线程的客户端队列中,
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 通过取余法,将客户端进行分派
int target_id = item_id % server.io_threads_num;
// 将客户端添加到对应的线程列表中
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
/* Give the start condition to the waiting threads, by setting the
* start condition atomic var. */
io_threads_op = IO_THREADS_OP_READ;
// 记录每个IO线程���处理客户端数量
for (int j = 1; j < server.io_threads_num; j++) {
int count = listLength(io_threads_list[j]);
io_threads_pending[j] = count;
}
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
// 主线程也需要分派到的客户端
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
listEmpty(io_threads_list[0]);
/* Wait for all the other threads to end their work. */
// 不断地检测每个线程的io_threads_pending,直到所有都为0,这时所有线程都已处理完。
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += io_threads_pending[j];
if (pending == 0) break;
}
if (tio_debug) printf("I/O READ All threads finshed\n");
/* Run the list of clients again to process the new buffers. */
/*
* 执行到这里,所有的客户端请求数据已经被读取并解析完成。这里遍历所有的客户端,并执行命令。
* 这里也可以发现,只有IO操作会交给IO线程去执行,命令的执行还是由主线程单线程执行的。
*
*/
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_READ;
listDelNode(server.clients_pending_read,ln);
if (c->flags & CLIENT_PENDING_COMMAND) {
c->flags &= ~CLIENT_PENDING_COMMAND;
// 执行命令并重置客户端
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
}
// 处理输入的数据
processInputBuffer(c);
}
/* Update processed count on server */
server.stat_io_reads_processed += processed;
return processed;
}
上面函数中会从全局链表中取出待处理的请求,将其转移到I/O线程自己的链表中,再次强调一遍,此时还是主线程在执行这个操作,最后主线程会等待所有I/O线程处理完读操作。调用processInputBuffer函数来处理请求,这样就和主线程处理请求过程重合了。从这里也可以看出来,I/O线程只是处理读写请求中的数据,而命令只有主线程在执行。
处理请求数据
void processInputBuffer(client *c) {
/* Keep processing while there is something in the input buffer */
// 一直循环,直到读取完缓冲区
while(c->qb_pos < sdslen(c->querybuf)) {
/*
* 下面的几个if(...) break; 语句块主要是判断不执行命令的情况。
* 如果满足对应的条件,那么会直接退出。
*/
/* Return if clients are paused. */
if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break;
/* Immediately abort if the client is in the middle of something. */
if (c->flags & CLIENT_BLOCKED) break;
/* Don't process more buffers from clients that have already pending
* commands to execute in c->argv. */
if (c->flags & CLIENT_PENDING_COMMAND) break;
/* Don't process input from the master while there is a busy script
* condition on the slave. We want just to accumulate the replication
* stream (instead of replying -BUSY like we do with other clients) and
* later resume the processing. */
if (server.lua_timedout && c->flags & CLIENT_MASTER) break;
/* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is
* written to the client. Make sure to not let the reply grow after
* this flag has been set (i.e. don't process more commands).
*
* The same applies for clients we want to terminate ASAP. */
if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break;
/* Determine request type when unknown. */
// 如果还未确认请求的类型,那么根据请求中的第一个字符是否为'*'来设置不同的类型。
if (!c->reqtype) {
// 如果以*开头,则请求类型为PROTO_REQ_MULTIBULK。
if (c->querybuf[c->qb_pos] == '*') {
c->reqtype = PROTO_REQ_MULTIBULK;
} else { // 单行请求
c->reqtype = PROTO_REQ_INLINE;
}
}
if (c->reqtype == PROTO_REQ_INLINE) {
if (processInlineBuffer(c) != C_OK) break;
/* If the Gopher mode and we got zero or one argument, process
* the request in Gopher mode. To avoid data race, Redis won't
* support Gopher if enable io threads to read queries. */
if (server.gopher_enabled && !server.io_threads_do_reads &&
((c->argc == 1 && ((char*)(c->argv[0]->ptr))[0] == '/') ||
c->argc == 0))
{
processGopherRequest(c);
resetClient(c);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
break;
}
} else if (c->reqtype == PROTO_REQ_MULTIBULK) {
// 从请求报文中解析命令参数
if (processMultibulkBuffer(c) != C_OK) break;
} else {
serverPanic("Unknown request type");
}
/* Multibulk processing could see a <= 0 length. */
if (c->argc == 0) {
resetClient(c);
} else {
/* If we are in the context of an I/O thread, we can't really
* execute the command here. All we can do is to flag the client
* as one that needs to process the command. */
if (c->flags & CLIENT_PENDING_READ) {
c->flags |= CLIENT_PENDING_COMMAND;
break;
}
/* We are finally ready to execute the command. */
/*
* 执行命令并重置客户端。
*/
if (processCommandAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid exiting this
* loop and trimming the client buffer later. So we return
* ASAP in that case. */
return;
}
}
}
/* Trim to pos */
if (c->qb_pos) {
// 将qb_pos所指的内容拷贝到数组的下标0处,即将有效内容往前移动
sdsrange(c->querybuf,c->qb_pos,-1);
// 重置为0
c->qb_pos = 0;
}
}
int processCommandAndResetClient(client *c) {
int deadclient = 0;
server.current_client = c;
// 执行命令
if (processCommand(c) == C_OK) {
// 命令执行完毕之后,执行后续的其他操作。
commandProcessed(c);
}
if (server.current_client == NULL) deadclient = 1;
server.current_client = NULL;
/* freeMemoryIfNeeded may flush slave output buffers. This may
* result into a slave, that may be the active client, to be
* freed. */
return deadclient ? C_ERR : C_OK;
}
关键点在于调用了processCommand函数来处理请求。
int processCommand(client *c) {
// 执行模块过滤器
moduleCallCommandFilters(c);
/* The QUIT command is handled separately. Normal command procs will
* go through checking for replication and QUIT will cause trouble
* when FORCE_REPLICATION is enabled and would be implemented in
* a regular command proc. */
// 针对quit命令特殊处理
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
// 从server.commands命令字典中查找对应的redisCommand,并检查参数是否满足命令要求。
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) { // 如果没找到命令
sds args = sdsempty();
int i;
// 将命令内容写入到args中,最多只写128个字节;避免请求什么就完整响应,如果被恶意攻击发送长度很长的命令,完整响应岂不是很傻;
for (i=1; i < c->argc && sdslen(args) < 128; i++)
args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
// 拒绝执行命令
rejectCommandFormat(c,"unknown command `%s`, with args beginning with: %s",
(char*)c->argv[0]->ptr, args);
sdsfree(args);
return C_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) { // 命令参数不符合要求,则拒绝执行
rejectCommandFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return C_OK;
}
int is_write_command = (c->cmd->flags & CMD_WRITE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_WRITE));
int is_denyoom_command = (c->cmd->flags & CMD_DENYOOM) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_flags & CMD_DENYOOM));
int is_denystale_command = !(c->cmd->flags & CMD_STALE) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_STALE));
int is_denyloading_command = !(c->cmd->flags & CMD_LOADING) ||
(c->cmd->proc == execCommand && (c->mstate.cmd_inv_flags & CMD_LOADING));
/* Check if the user is authenticated. This check is skipped in case
* the default user is flagged as "nopass" and is active. */
int auth_required = (!(DefaultUser->flags & USER_FLAG_NOPASS) ||
(DefaultUser->flags & USER_FLAG_DISABLED)) &&
!c->authenticated;
if (auth_required) {
/* AUTH and HELLO and no auth modules are valid even in
* non-authenticated state. */
// 如果客户端未通过认证,那么只能执行AUTH命令。
if (!(c->cmd->flags & CMD_NO_AUTH)) {
rejectCommand(c,shared.noautherr);
return C_OK;
}
}
/* Check if the user can run this command according to the current
* ACLs. */
int acl_keypos;
int acl_retval = ACLCheckCommandPerm(c,&acl_keypos);
// 根据ACL权限控制列表,检查该客户端用户是否有权限执行该命令。
if (acl_retval != ACL_OK) {
addACLLogEntry(c,acl_retval,acl_keypos,NULL);
if (acl_retval == ACL_DENIED_CMD)
rejectCommandFormat(c,
"-NOPERM this user has no permissions to run "
"the '%s' command or its subcommand", c->cmd->name);
else
rejectCommandFormat(c,
"-NOPERM this user has no permissions to access "
"one of the keys used as arguments");
return C_OK;
}
/* If cluster is enabled perform the cluster redirection here.
* However we don't perform the redirection if:
* 1) The sender of this command is our master.
* 2) The command has no key arguments. */
if (server.cluster_enabled &&
!(c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_LUA &&
server.lua_caller->flags & CLIENT_MASTER) &&
!(c->cmd->getkeys_proc == NULL && c->cmd->firstkey == 0 &&
c->cmd->proc != execCommand))
{
int hashslot;
int error_code;
clusterNode *n = getNodeByQuery(c,c->cmd,c->argv,c->argc,
&hashslot,&error_code);
/*
* 如果是在cluster模式下,且当前节点不是该命令的键的存储节点,
* 则返回ASK或MOVED通知客户端请求真正的存储节点。
*/
if (n == NULL || n != server.cluster->myself) {
if (c->cmd->proc == execCommand) {
discardTransaction(c);
} else {
flagTransaction(c);
}
clusterRedirectClient(c,n,hashslot,error_code);
return C_OK;
}
}
/* Handle the maxmemory directive.
*
* Note that we do not want to reclaim memory if we are here re-entering
* the event loop since there is a busy Lua script running in timeout
* condition, to avoid mixing the propagation of scripts with the
* propagation of DELs due to eviction. */
// 如果设置了能够使用的最大内存大小,则进行数据淘汰。
if (server.maxmemory && !server.lua_timedout) {
// 进行数据淘汰,并判断是否oom
int out_of_memory = freeMemoryIfNeededAndSafe() == C_ERR;
/* freeMemoryIfNeeded may flush slave output buffers. This may result
* into a slave, that may be the active client, to be freed. */
if (server.current_client == NULL) return C_ERR;
int reject_cmd_on_oom = is_denyoom_command;
/* If client is in MULTI/EXEC context, queuing may consume an unlimited
* amount of memory, so we want to stop that.
* However, we never want to reject DISCARD, or even EXEC (unless it
* contains denied commands, in which case is_denyoom_command is already
* set. */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand &&
c->cmd->proc != discardCommand) {
reject_cmd_on_oom = 1;
}
// 如果数据淘汰失败,则拒绝命令。
if (out_of_memory && reject_cmd_on_oom) {
rejectCommand(c, shared.oomerr);
return C_OK;
}
/* Save out_of_memory result at script start, otherwise if we check OOM
* until first write within script, memory used by lua stack and
* arguments might interfere. */
if (c->cmd->proc == evalCommand || c->cmd->proc == evalShaCommand) {
server.lua_oom = out_of_memory;
}
}
/* Make sure to use a reasonable amount of memory for client side
* caching metadata. */
/*
* Redis Tracking机制要求服务器记录客户端查询过的键,
* 如果记录的键的数量大于了配置的上限,那么随机删除一些键,并向对应客户端发送失效消息。
*/
if (server.tracking_clients) trackingLimitUsedSlots();
/* Don't accept write commands if there are problems persisting on disk
* and if this is a master instance. */
// 如果当前节点是主节点,且存在持久化错误,那么拒绝执行命令。
int deny_write_type = writeCommandsDeniedByDiskError();
if (deny_write_type != DISK_ERROR_TYPE_NONE &&
server.masterhost == NULL &&
(is_write_command ||c->cmd->proc == pingCommand))
{
if (deny_write_type == DISK_ERROR_TYPE_RDB)
rejectCommand(c, shared.bgsaveerr);
else
rejectCommandFormat(c,
"-MISCONF Errors writing to the AOF file: %s",
strerror(server.aof_last_write_errno));
return C_OK;
}
/* Don't accept write commands if there are not enough good slaves and
* user configured the min-slaves-to-write option. */
// 如果当前节点是主节点,且正常的从节点小于配置的下限,那么拒绝执行命令。
if (server.masterhost == NULL &&
server.repl_min_slaves_to_write &&
server.repl_min_slaves_max_lag &&
is_write_command &&
server.repl_good_slaves_count < server.repl_min_slaves_to_write)
{
rejectCommand(c, shared.noreplicaserr);
return C_OK;
}
/* Don't accept write commands if this is a read only slave. But
* accept write commands if this is our master. */
// 如果该服务器是从节点,而且客户端不是主节点,那么拒绝命令。
if (server.masterhost && server.repl_slave_ro &&
!(c->flags & CLIENT_MASTER) &&
is_write_command)
{
rejectCommand(c, shared.roslaveerr);
return C_OK;
}
/* Only allow a subset of commands in the context of Pub/Sub if the
* connection is in RESP2 mode. With RESP3 there are no limits. */
// 客户端处于Pub/Sub模式下,而且使用的是RESP2协议,则只支持几种命令,拒绝其他的命令。
if ((c->flags & CLIENT_PUBSUB && c->resp == 2) &&
c->cmd->proc != pingCommand &&
c->cmd->proc != subscribeCommand &&
c->cmd->proc != unsubscribeCommand &&
c->cmd->proc != psubscribeCommand &&
c->cmd->proc != punsubscribeCommand) {
rejectCommandFormat(c,
"Can't execute '%s': only (P)SUBSCRIBE / "
"(P)UNSUBSCRIBE / PING / QUIT are allowed in this context",
c->cmd->name);
return C_OK;
}
/* Only allow commands with flag "t", such as INFO, SLAVEOF and so on,
* when slave-serve-stale-data is no and we are a slave with a broken
* link with master. */
// 如果当前节点是从节点,且与主节点处于断连状态,则拒绝查询数据的命令。
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED &&
server.repl_serve_stale_data == 0 &&
is_denystale_command)
{
rejectCommand(c, shared.masterdownerr);
return C_OK;
}
/* Loading DB? Return an error if the command has not the
* CMD_LOADING flag. */
// 正在加载数据,只有特定命令能够执行。
if (server.loading && is_denyloading_command) {
rejectCommand(c, shared.loadingerr);
return C_OK;
}
/* Lua script too slow? Only allow a limited number of commands.
* Note that we need to allow the transactions commands, otherwise clients
* sending a transaction with pipelining without error checking, may have
* the MULTI plus a few initial commands refused, then the timeout
* condition resolves, and the bottom-half of the transaction gets
* executed, see Github PR #7022. */
// 服务器处于Lua脚本超时状态,只有特定命令能够执行。
if (server.lua_timedout &&
c->cmd->proc != authCommand &&
c->cmd->proc != helloCommand &&
c->cmd->proc != replconfCommand &&
c->cmd->proc != multiCommand &&
c->cmd->proc != discardCommand &&
c->cmd->proc != watchCommand &&
c->cmd->proc != unwatchCommand &&
!(c->cmd->proc == shutdownCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'n') &&
!(c->cmd->proc == scriptCommand &&
c->argc == 2 &&
tolower(((char*)c->argv[1]->ptr)[0]) == 'k'))
{
rejectCommand(c, shared.slowscripterr);
return C_OK;
}
/* Exec the command */
/*
* 如果除当前client处于事务上下文中,那么除了EXEC、DISCARD、MULTI和WATCH外的命令会被加入到事务队列中。
*/
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
/*
* 调用call函数执行命令。
*/
call(c,CMD_CALL_FULL);
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
return C_OK;
}
这个函数非常长,包含了Module、Tracking、ACL、PUB/SUB、Lua脚本、主从、Cluster、事务等几乎所有Redis支持的特性。但重点在于最下面调用的call函数。
void call(client *c, int flags) {
long long dirty;
ustime_t start, duration;
int client_old_flags = c->flags;
struct redisCommand *real_cmd = c->cmd;
server.fixed_time_expire++;
/* Send the command to clients in MONITOR mode if applicable.
* Administrative commands are considered too dangerous to be shown. */
if (listLength(server.monitors) &&
!server.loading &&
!(c->cmd->flags & (CMD_SKIP_MONITOR|CMD_ADMIN)))
{
// 发送命令信息给监控模式下的客户端
replicationFeedMonitors(c,server.monitors,c->db->id,c->argv,c->argc);
}
/* Initialization: clear the flags that must be set by the command on
* demand, and initialize the array for additional commands propagation. */
/*
* 重置传���控制标志,这些标志应该只在命令执行过程中开启。
* 由于call可以递归调用,所以执行命令前先清除这些标志。
*/
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
redisOpArray prev_also_propagate = server.also_propagate;
redisOpArrayInit(&server.also_propagate);
/* Call the command. */
dirty = server.dirty;
updateCachedTime(0);
start = server.ustime;
/*
* 执行命令的处理逻辑
*/
c->cmd->proc(c);
duration = ustime()-start;
dirty = server.dirty-dirty;
if (dirty < 0) dirty = 0;
/* After executing command, we will close the client after writing entire
* reply if it is set 'CLIENT_CLOSE_AFTER_COMMAND' flag. */
// 替换标记
if (c->flags & CLIENT_CLOSE_AFTER_COMMAND) {
c->flags &= ~CLIENT_CLOSE_AFTER_COMMAND;
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
}
/* When EVAL is called loading the AOF we don't want commands called
* from Lua to go into the slowlog or to populate statistics. */
// 如果当前正在加载数据,并且当前命令执行的是Lua脚本,则清除慢日志。
if (server.loading && c->flags & CLIENT_LUA)
flags &= ~(CMD_CALL_SLOWLOG | CMD_CALL_STATS);
/* If the caller is Lua, we want to force the EVAL caller to propagate
* the script if the command flag or client flag are forcing the
* propagation. */
// 如果当前是Lua脚本伪客户端
if (c->flags & CLIENT_LUA && server.lua_caller) {
if (c->flags & CLIENT_FORCE_REPL)
server.lua_caller->flags |= CLIENT_FORCE_REPL;
if (c->flags & CLIENT_FORCE_AOF)
server.lua_caller->flags |= CLIENT_FORCE_AOF;
}
/* Log the command into the Slow log if needed, and populate the
* per-command statistics that we show in INFO commandstats. */
// 记录慢日志并统计命令信息
if (flags & CMD_CALL_SLOWLOG && !(c->cmd->flags & CMD_SKIP_SLOWLOG)) {
char *latency_event = (c->cmd->flags & CMD_FAST) ?
"fast-command" : "command";
latencyAddSampleIfNeeded(latency_event,duration/1000);
slowlogPushEntryIfNeeded(c,c->argv,c->argc,duration);
}
if (flags & CMD_CALL_STATS) {
/* use the real command that was executed (cmd and lastamc) may be
* different, in case of MULTI-EXEC or re-written commands such as
* EXPIRE, GEOADD, etc. */
real_cmd->microseconds += duration;
real_cmd->calls++;
}
/* Propagate the command into the AOF and replication link */
// 将命令记录到AOF文件或复制到从服务器中
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;
/* Check if the command operated changes in the data set. If so
* set for replication / AOF propagation. */
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);
/* If the client forced AOF / replication of the command, set
* the flags regardless of the command effects on the data set. */
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
/* However prevent AOF / replication propagation if the command
* implementation called preventCommandPropagation() or similar,
* or if we don't have the call() flags to do so. */
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
/* Call propagate() only if at least one of AOF / replication
* propagation is needed. Note that modules commands handle replication
* in an explicit way, so we never replicate them automatically. */
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
// 执行命令传播
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags);
}
/* Restore the old replication flags, since call() can be executed
* recursively. */
// 恢复上面清除了的标志
c->flags &= ~(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
c->flags |= client_old_flags &
(CLIENT_FORCE_AOF|CLIENT_FORCE_REPL|CLIENT_PREVENT_PROP);
/* Handle the alsoPropagate() API to handle commands that want to propagate
* multiple separated commands. Note that alsoPropagate() is not affected
* by CLIENT_PREVENT_PROP flag. */
// also_propagate中存放了一系列需额外传播的命令,则将它记录到AOF或复制到从服务器中。
if (server.also_propagate.numops) {
int j;
redisOp *rop;
if (flags & CMD_CALL_PROPAGATE) {
int multi_emitted = 0;
/* Wrap the commands in server.also_propagate array,
* but don't wrap it if we are already in MULTI context,
* in case the nested MULTI/EXEC.
*
* And if the array contains only one command, no need to
* wrap it, since the single command is atomic. */
if (server.also_propagate.numops > 1 &&
!(c->cmd->flags & CMD_MODULE) &&
!(c->flags & CLIENT_MULTI) &&
!(flags & CMD_CALL_NOWRAP))
{
execCommandPropagateMulti(c);
multi_emitted = 1;
}
for (j = 0; j < server.also_propagate.numops; j++) {
rop = &server.also_propagate.ops[j];
int target = rop->target;
/* Whatever the command wish is, we honor the call() flags. */
if (!(flags&CMD_CALL_PROPAGATE_AOF)) target &= ~PROPAGATE_AOF;
if (!(flags&CMD_CALL_PROPAGATE_REPL)) target &= ~PROPAGATE_REPL;
if (target)
// 执行命令传播
propagate(rop->cmd,rop->dbid,rop->argv,rop->argc,target);
}
if (multi_emitted) {
execCommandPropagateExec(c);
}
}
redisOpArrayFree(&server.also_propagate);
}
server.also_propagate = prev_also_propagate;
/* If the client has keys tracking enabled for client side caching,
* make sure to remember the keys it fetched via this command. */
/*
* 如果执行的是一个查询命令,那么Redis要记住该命令。后续当查询的键发生变化时,需要通知客户端。
* 这是Redis 6新增的Tracking机制。
*/
if (c->cmd->flags & CMD_READONLY) {
client *caller = (c->flags & CLIENT_LUA && server.lua_caller) ?
server.lua_caller : c;
if (caller->flags & CLIENT_TRACKING &&
!(caller->flags & CLIENT_TRACKING_BCAST))
{
trackingRememberKeys(caller);
}
}
server.fixed_time_expire--;
server.stat_numcommands++;
/* Record peak memory after each command and before the eviction that runs
* before the next command. */
size_t zmalloc_used = zmalloc_used_memory();
if (zmalloc_used > server.stat_peak_memory)
server.stat_peak_memory = zmalloc_used;
}
这个函数也是很长,但最关键的就是下面这一行。
c->cmd->proc(c);
不同的命令有不同的处理函数,所以分析到这里就暂时不再进一步分析了。