Like many other middleware systems, Redis is event-driven. After the Redis server starts, it eventually enters an event loop in which events are processed continuously. Redis supports two kinds of events: file events and time events. This article analyzes how the event loop is implemented in Redis.
The main function
int main(int argc, char **argv) {
printf("Welcome to lzip-redis-6.0.9\n");
struct timeval tv;
int j;
#ifdef REDIS_TEST
if (argc == 3 && !strcasecmp(argv[1], "test")) {
if (!strcasecmp(argv[2], "ziplist")) {
return ziplistTest(argc, argv);
} else if (!strcasecmp(argv[2], "quicklist")) {
quicklistTest(argc, argv);
} else if (!strcasecmp(argv[2], "intset")) {
return intsetTest(argc, argv);
} else if (!strcasecmp(argv[2], "zipmap")) {
return zipmapTest(argc, argv);
} else if (!strcasecmp(argv[2], "sha1test")) {
return sha1Test(argc, argv);
} else if (!strcasecmp(argv[2], "util")) {
return utilTest(argc, argv);
} else if (!strcasecmp(argv[2], "endianconv")) {
return endianconvTest(argc, argv);
} else if (!strcasecmp(argv[2], "crc64")) {
return crc64Test(argc, argv);
} else if (!strcasecmp(argv[2], "zmalloc")) {
return zmalloc_test(argc, argv);
}
return -1; /* test not found */
}
#endif
/* We need to initialize our libraries, and the server configuration. */
#ifdef INIT_SETPROCTITLE_REPLACEMENT
spt_init(argc, argv);
#endif
setlocale(LC_COLLATE,"");
tzset(); /* Populates 'timezone' global. */
// Set the out-of-memory handler
zmalloc_set_oom_handler(redisOutOfMemoryHandler);
srand(time(NULL)^getpid());
gettimeofday(&tv,NULL);
crc64_init();
uint8_t hashseed[16];
getRandomBytes(hashseed,sizeof(hashseed));
dictSetHashFunctionSeed(hashseed);
// Check whether this Redis server is started in sentinel mode
server.sentinel_mode = checkForSentinelMode(argc,argv);
// Initialize the server configuration
initServerConfig();
// Initialize the ACL subsystem
ACLInit(); /* The ACL subsystem must be initialized ASAP because the
basic networking code and client creation depends on it. */
// Initialize the module system
moduleInitModulesSystem();
// Initialize TLS
tlsInit();
/* Store the executable path and arguments in a safe place in order
* to be able to restart the server later. */
// Record the executable path and startup arguments so the server can be restarted later
server.executable = getAbsolutePath(argv[0]);
// Allocate memory for the argument array
server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
server.exec_argv[argc] = NULL;
// Duplicate each argument
for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);
/* We need to init sentinel right now as parsing the configuration file
* in sentinel mode will have the effect of populating the sentinel
* data structures with master nodes to monitor. */
// If started in sentinel mode, initialize the sentinel machinery.
if (server.sentinel_mode) {
initSentinelConfig();
initSentinel();
}
/* Check if we need to start in redis-check-rdb/aof mode. We just execute
* the program main. However the program is part of the Redis executable
* so that we can easily execute an RDB check on loading errors. */
// Special handling for two executables that check and repair RDB and AOF files respectively.
if (strstr(argv[0],"redis-check-rdb") != NULL)
redis_check_rdb_main(argc,argv,NULL);
else if (strstr(argv[0],"redis-check-aof") != NULL)
redis_check_aof_main(argc,argv);
if (argc >= 2) {
// Start parsing from the second argument
j = 1; /* First option to parse in argv[] */
// Create an empty sds string
sds options = sdsempty();
char *configfile = NULL;
/* Handle special options --help and --version */
// Handle the special command-line options
if (strcmp(argv[1], "-v") == 0 ||
strcmp(argv[1], "--version") == 0) version();
if (strcmp(argv[1], "--help") == 0 ||
strcmp(argv[1], "-h") == 0) usage();
if (strcmp(argv[1], "--test-memory") == 0) {
if (argc == 3) {
memtest(atoi(argv[2]),50);
exit(0);
} else {
fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
exit(1);
}
}
/* First argument is the config file name? */
// If this argument does not start with '--', it specifies the config file
if (argv[j][0] != '-' || argv[j][1] != '-') {
configfile = argv[j];
// Get the absolute path of the config file
server.configfile = getAbsolutePath(configfile);
/* Replace the config file in server.exec_argv with
* its absolute path. */
// Replace the argument with the config file's absolute path
zfree(server.exec_argv[j]);
server.exec_argv[j] = zstrdup(server.configfile);
j++;
}
/* All the other options are parsed and conceptually appended to the
* configuration file. For instance --port 6380 will generate the
* string "port 6380\n" to be parsed after the actual file name
* is parsed, if any. */
// Read the remaining command-line options and append them to a single string; each option starts on a new line and its arguments follow it separated by spaces.
while(j != argc) {
if (argv[j][0] == '-' && argv[j][1] == '-') { // A long option (starts with --)
/* Option name */
// Skip the "--check-rdb" option
if (!strcmp(argv[j], "--check-rdb")) {
/* Argument has no options, need to skip for parsing. */
j++;
continue;
}
// Append a newline to the sds to separate this option from the previous one
if (sdslen(options)) options = sdscat(options,"\n");
// Append the option name; the +2 skips the leading --
options = sdscat(options,argv[j]+2);
// and append a space
options = sdscat(options," ");
} else {
/* Option argument */
// Append the option argument to the options sds,
options = sdscatrepr(options,argv[j],strlen(argv[j]));
// and append a space
options = sdscat(options," ");
}
j++;
}
// In sentinel mode the config cannot be read from STDIN (a configfile of "-"); log an error and exit.
if (server.sentinel_mode && configfile && *configfile == '-') {
serverLog(LL_WARNING,
"Sentinel config from STDIN not allowed.");
serverLog(LL_WARNING,
"Sentinel needs config file on disk to save state. Exiting...");
exit(1);
}
// Reset the server's saveParams
resetServerSaveParams();
// Load the configuration from the config file plus the extra options
loadServerConfig(configfile,options);
// Free the options sds
sdsfree(options);
}
// Check whether the server is supervised by upstart or systemd; if not and server.daemonize is set, it must run as a daemon.
server.supervised = redisIsSupervised(server.supervised_mode);
int background = server.daemonize && !server.supervised;
if (background) daemonize(); // Start as a daemon
serverLog(LL_WARNING, "oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo");
serverLog(LL_WARNING,
"Redis version=%s, bits=%d, commit=%s, modified=%d, pid=%d, just started",
REDIS_VERSION,
(sizeof(long) == 8) ? 64 : 32,
redisGitSHA1(),
strtol(redisGitDirty(),NULL,10) > 0,
(int)getpid());
if (argc == 1) {
serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
} else {
serverLog(LL_WARNING, "Configuration loaded");
}
// Read the current process's OOM score from the proc filesystem
readOOMScoreAdj();
// Initialize the server
initServer();
// Create the pid file
if (background || server.pidfile) createPidFile();
redisSetProcTitle(argv[0]);
redisAsciiArt();
// Check the TCP backlog setting
checkTcpBacklogSettings();
if (!server.sentinel_mode) { // Not running in sentinel mode
/* Things not needed when running in Sentinel mode. */
serverLog(LL_WARNING,"Server initialized");
#ifdef __linux__
// Check Linux memory-related kernel parameters
linuxMemoryWarnings();
#endif
// Load the modules specified in the config file
moduleLoadFromQueue();
// Load the ACL user list
ACLLoadUsersAtStartup();
// Create the background threads and I/O threads
InitServerLast();
// Load the AOF or RDB file from disk
loadDataFromDisk();
if (server.cluster_enabled) {
// In cluster mode, also verify that the loaded data is valid for this node
if (verifyClusterConfigWithData() == C_ERR) {
serverLog(LL_WARNING,
"You can't have keys in a DB different than DB 0 when in "
"Cluster mode. Exiting.");
exit(1);
}
}
if (server.ipfd_count > 0 || server.tlsfd_count > 0)
serverLog(LL_NOTICE,"Ready to accept connections");
if (server.sofd > 0)
serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
if (server.supervised_mode == SUPERVISED_SYSTEMD) {
if (!server.masterhost) {
redisCommunicateSystemd("STATUS=Ready to accept connections\n");
redisCommunicateSystemd("READY=1\n");
} else {
redisCommunicateSystemd("STATUS=Waiting for MASTER <-> REPLICA sync\n");
}
}
} else { // Running in sentinel mode
InitServerLast();
// Start the sentinel machinery
sentinelIsRunning();
if (server.supervised_mode == SUPERVISED_SYSTEMD) {
redisCommunicateSystemd("STATUS=Ready to accept connections\n");
redisCommunicateSystemd("READY=1\n");
}
}
/* Warning the user about suspicious maxmemory setting. */
if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
}
/*
 * Set CPU affinity: try to bind the Redis main thread to the CPU list
 * configured in server.server_cpulist, which reduces unnecessary context
 * switches and improves performance.
 */
redisSetCpuAffinity(server.server_cpulist);
// Apply the configured oom-score-adj setting to the current process
setOOMScoreAdj(-1);
// Enter the event loop; this call only returns once the loop is stopped.
aeMain(server.el);
// Delete the event loop and the events registered in it
aeDeleteEventLoop(server.el);
// Exit the program
return 0;
}
The code above is the Redis server's main function, i.e. the main startup path, and it covers quite a lot. For this article we mainly need to look at two functions called from it: initServer and aeMain.
Initializing the server
void initServer(void) {
int j;
// Register UNIX signal handlers so the process can react when these signals arrive.
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
setupSignalHandlers();
// Make the thread killable at any time (respond to cancellation) so it can be terminated to stop the program
makeThreadKillable();
if (server.syslog_enabled) { // If syslog is enabled
// Open a connection to the system logger so syslog messages can be written
openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
server.syslog_facility);
}
/* Initialization after setting defaults from the config system. */
// Initialize the server fields that hold runtime state
server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
server.hz = server.config_hz;
server.pid = getpid();
server.in_fork_child = CHILD_TYPE_NONE;
server.main_thread_id = pthread_self();
server.current_client = NULL;
server.fixed_time_expire = 0;
server.clients = listCreate();
server.clients_index = raxNew();
server.clients_to_close = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
server.clients_pending_write = listCreate();
server.clients_pending_read = listCreate();
server.clients_timeout_table = raxNew();
server.slaveseldb = -1; /* Force to emit the first SELECT command. */
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
server.clients_paused = 0;
server.events_processed_while_blocked = 0;
server.system_memory_size = zmalloc_get_memory_size();
if ((server.tls_port || server.tls_replication || server.tls_cluster)
&& tlsConfigure(&server.tls_ctx_config) == C_ERR) {
serverLog(LL_WARNING, "Failed to configure TLS. Check logs for more info.");
exit(1);
}
// Create the shared objects: small integers, commonly used strings, and so on.
createSharedObjects();
// Raise the limit on open file descriptors to avoid errors when there are many client connections.
adjustOpenFilesLimit();
/*
 * Create the event loop object; the argument becomes the size of the event
 * arrays inside the loop:
 * server.maxclients: the maximum number of client connections, i.e. room for
 *                    the clients' file events.
 * CONFIG_FDSET_INCR: 128, extra room reserved for the server's own descriptors
 *                    (listening sockets, pipes, log files, and so on).
 */
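/* For reference (Redis 6.0 server.h, quoted from memory, so treat it as a
 * sketch rather than an exact copy):
 *   #define CONFIG_MIN_RESERVED_FDS 32
 *   #define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96)
 * which is where the value 128 mentioned above comes from. */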
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
if (server.el == NULL) {
serverLog(LL_WARNING,
"Failed creating the event loop. Error message: '%s'",
strerror(errno));
exit(1);
}
// Allocate the array of redisDb structs, with dbnum elements
server.db = zmalloc(sizeof(redisDb)*server.dbnum);
/* Open the TCP listening socket for the user commands. */
// TCP listening
if (server.port != 0 &&
// Listen on the configured addresses and port
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
// Listen on the TLS port (TLS connections are supported since 6.0)
if (server.tls_port != 0 &&
listenToPort(server.tls_port,server.tlsfd,&server.tlsfd_count) == C_ERR)
exit(1);
/* Open the listening Unix domain socket. */
if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */
// If a unix socket path is configured, open the unix domain socket server
server.sofd = anetUnixServer(server.neterr,server.unixsocket,
server.unixsocketperm, server.tcp_backlog);
if (server.sofd == ANET_ERR) {
serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
exit(1);
}
anetNonBlock(NULL,server.sofd);
}
/* Abort if there are no listening sockets at all. */
if (server.ipfd_count == 0 && server.tlsfd_count == 0 && server.sofd < 0) {
serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
exit(1);
}
/* Create the Redis databases, and initialize other internal state. */
// Initialize the server.db databases that hold the data
for (j = 0; j < server.dbnum; j++) {
server.db[j].dict = dictCreate(&dbDictType,NULL);
server.db[j].expires = dictCreate(&keyptrDictType,NULL);
server.db[j].expires_cursor = 0;
server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
server.db[j].id = j;
server.db[j].avg_ttl = 0;
server.db[j].defrag_later = listCreate();
listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree);
}
// Initialize the LRU/LFU sample pool used by the approximate LRU/LFU algorithms.
evictionPoolAlloc(); /* Initialize the LRU keys pool. */
server.pubsub_channels = dictCreate(&keylistDictType,NULL);
server.pubsub_patterns = listCreate();
server.pubsub_patterns_dict = dictCreate(&keylistDictType,NULL);
listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
server.cronloops = 0;
server.rdb_child_pid = -1;
server.aof_child_pid = -1;
server.module_child_pid = -1;
server.rdb_child_type = RDB_CHILD_TYPE_NONE;
server.rdb_pipe_conns = NULL;
server.rdb_pipe_numconns = 0;
server.rdb_pipe_numconns_writing = 0;
server.rdb_pipe_buff = NULL;
server.rdb_pipe_bufflen = 0;
server.rdb_bgsave_scheduled = 0;
server.child_info_pipe[0] = -1;
server.child_info_pipe[1] = -1;
server.child_info_data.magic = 0;
// Reset the AOF rewrite buffer
aofRewriteBufferReset();
server.aof_buf = sdsempty();
server.lastsave = time(NULL); /* At startup we consider the DB saved. */
server.lastbgsave_try = 0; /* At startup we never tried to BGSAVE. */
server.rdb_save_time_last = -1;
server.rdb_save_time_start = -1;
server.dirty = 0;
resetServerStats();
/* A few stats we don't want to reset: server startup time, and peak mem. */
server.stat_starttime = time(NULL);
server.stat_peak_memory = 0;
server.stat_rdb_cow_bytes = 0;
server.stat_aof_cow_bytes = 0;
server.stat_module_cow_bytes = 0;
for (int j = 0; j < CLIENT_TYPE_COUNT; j++)
server.stat_clients_type_memory[j] = 0;
server.cron_malloc_stats.zmalloc_used = 0;
server.cron_malloc_stats.process_rss = 0;
server.cron_malloc_stats.allocator_allocated = 0;
server.cron_malloc_stats.allocator_active = 0;
server.cron_malloc_stats.allocator_resident = 0;
server.lastbgsave_status = C_OK;
server.aof_last_write_status = C_OK;
server.aof_last_write_errno = 0;
server.repl_good_slaves_count = 0;
/* Create the timer callback, this is our way to process many background
* operations incrementally, like clients timeout, eviction of unaccessed
* expired keys and so forth. */
// Create a time event whose handler is serverCron; it takes care of Redis's periodic tasks such as expiring keys and generating RDB files.
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
/* Create an event handler for accepting new connections in TCP and Unix
* domain sockets. */
// Register an AE_READABLE handler for each listening socket.
for (j = 0; j < server.ipfd_count; j++) {
// The handler is acceptTcpHandler
if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.ipfd file event.");
}
}
for (j = 0; j < server.tlsfd_count; j++) {
// The handler is acceptTLSHandler
if (aeCreateFileEvent(server.el, server.tlsfd[j], AE_READABLE,
acceptTLSHandler,NULL) == AE_ERR)
{
serverPanic(
"Unrecoverable error creating server.tlsfd file event.");
}
}
// The handler is acceptUnixHandler
if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
/* Register a readable event for the pipe used to awake the event loop
* when a blocked client in a module needs attention. */
// Register a handler for the pipe used to wake up the event loop; the handler is moduleBlockedClientPipeReadable.
if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
/* Register before and after sleep handlers (note this needs to be done
* before loading persistence since it is used by processEventsWhileBlocked. */
// Register the hooks that run before and after the event loop blocks in epoll_wait
aeSetBeforeSleepProc(server.el,beforeSleep);
aeSetAfterSleepProc(server.el,afterSleep);
/* Open the AOF file if needed. */
if (server.aof_state == AOF_ON) {
// If AOF is enabled, open the AOF file first.
server.aof_fd = open(server.aof_filename,
O_WRONLY|O_APPEND|O_CREAT,0644);
if (server.aof_fd == -1) {
serverLog(LL_WARNING, "Can't open the append-only file: %s",
strerror(errno));
exit(1);
}
}
/* 32 bit instances are limited to 4GB of address space, so if there is
* no explicit limit in the user provided configuration we set a limit
* at 3 GB using maxmemory with 'noeviction' policy'. This avoids
* useless crashes of the Redis instance for out of memory. */
// Special handling for 32-bit systems; this case is not analyzed here.
if (server.arch_bits == 32 && server.maxmemory == 0) {
serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
}
// If started in cluster mode, initialize the cluster machinery.
if (server.cluster_enabled) clusterInit();
// Initialize the server.repl_scriptcache_dict field.
replicationScriptCacheInit();
// Initialize the Lua scripting engine
scriptingInit(1);
// Initialize the slow log
slowlogInit();
// Initialize the latency monitor
latencyMonitorInit();
}
In this function we mainly care about how the event loop is created and how events are registered.
Creating the event loop
aeEventLoop *aeCreateEventLoop(int setsize) {
// The event loop object
aeEventLoop *eventLoop;
int i;
// Allocate the event loop object
if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
// Allocate the file event array
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
// Allocate the fired (ready) event array
eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
eventLoop->setsize = setsize;
eventLoop->lastTime = time(NULL);
// Initialize the head of the time event linked list
eventLoop->timeEventHead = NULL;
eventLoop->timeEventNextId = 0;
eventLoop->stop = 0;
eventLoop->maxfd = -1;
eventLoop->beforesleep = NULL;
eventLoop->aftersleep = NULL;
eventLoop->flags = 0;
/*
 * The concrete I/O multiplexing backend is chosen according to the platform;
 * on Linux this is normally the ae_epoll.c implementation, which creates the
 * epoll instance internally.
 */
if (aeApiCreate(eventLoop) == -1) goto err;
/* Events with mask == AE_NONE are not set. So let's initialize the
* vector with it. */
for (i = 0; i < setsize; i++)
// Initialize each event mask to AE_NONE
eventLoop->events[i].mask = AE_NONE;
return eventLoop;
err:
if (eventLoop) {
zfree(eventLoop->events);
zfree(eventLoop->fired);
zfree(eventLoop);
}
return NULL;
}
This function first creates the aeEventLoop object and then calls aeApiCreate to create the epoll instance.
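For reference, the aeEventLoop structure defined in ae.h (Redis 6.0) looks roughly like the following; the fields match the ones initialized above. This is quoted from memory, so treat it as a sketch rather than an exact copy of the source.
typedef struct aeEventLoop {
    int maxfd;                  /* highest file descriptor currently registered */
    int setsize;                /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;            /* used to detect system clock skew */
    aeFileEvent *events;        /* registered file events, indexed by fd */
    aeFiredEvent *fired;        /* events reported ready by aeApiPoll */
    aeTimeEvent *timeEventHead; /* head of the time event linked list */
    int stop;
    void *apidata;              /* polling-API-specific data (aeApiState for epoll) */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;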
static int aeApiCreate(aeEventLoop *eventLoop) {
// Allocate the aeApiState struct, which will later hold the ready events
aeApiState *state = zmalloc(sizeof(aeApiState));
if (!state) return -1;
// Allocate the events array used to store ready epoll events
state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
if (!state->events) {
zfree(state);
return -1;
}
// Create the epoll instance
state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
if (state->epfd == -1) {
zfree(state->events);
zfree(state);
return -1;
}
// Store the newly created aeApiState in the event loop
eventLoop->apidata = state;
return 0;
}
In this function, the epoll_create system call is used to create the epoll instance.
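For reference, aeApiState in ae_epoll.c is just a thin wrapper around the epoll fd and the buffer for ready events (again quoted from memory, treat it as a sketch):
typedef struct aeApiState {
    int epfd;                   /* fd returned by epoll_create */
    struct epoll_event *events; /* buffer filled with ready events by epoll_wait */
} aeApiState;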
Listening on addresses
After the event loop object has been created, listenToPort is called to listen on the configured addresses and return the socket fds; when events are registered below, these fds are added to epoll.
int listenToPort(int port, int *fds, int *count) {
int j;
/* Force binding of 0.0.0.0 if no bind address is specified, always
* entering the loop if j == 0. */
// If no bind address was configured, set it to NULL
if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
for (j = 0; j < server.bindaddr_count || j == 0; j++) {
// Case where no bind address is configured
if (server.bindaddr[j] == NULL) {
int unsupported = 0;
/* Bind * for both IPv6 and IPv4, we enter here only if
* server.bindaddr_count == 0. */
fds[*count] = anetTcp6Server(server.neterr,port,NULL,
server.tcp_backlog);
if (fds[*count] != ANET_ERR) {
anetNonBlock(NULL,fds[*count]);
(*count)++;
} else if (errno == EAFNOSUPPORT) {
unsupported++;
serverLog(LL_WARNING,"Not listening to IPv6: unsupported");
}
if (*count == 1 || unsupported) {
/* Bind the IPv4 address as well. */
fds[*count] = anetTcpServer(server.neterr,port,NULL,
server.tcp_backlog);
if (fds[*count] != ANET_ERR) {
anetNonBlock(NULL,fds[*count]);
(*count)++;
} else if (errno == EAFNOSUPPORT) {
unsupported++;
serverLog(LL_WARNING,"Not listening to IPv4: unsupported");
}
}
/* Exit the loop if we were able to bind * on IPv4 and IPv6,
* otherwise fds[*count] will be ANET_ERR and we'll print an
* error and return to the caller with an error. */
if (*count + unsupported == 2) break;
} else if (strchr(server.bindaddr[j],':')) {
// Bind an IPv6 address
/* Bind IPv6 address. */
fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
server.tcp_backlog);
} else {
// Bind an IPv4 address
/* Bind IPv4 address. */
fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
server.tcp_backlog);
}
if (fds[*count] == ANET_ERR) {
serverLog(LL_WARNING,
"Could not create server TCP listening socket %s:%d: %s",
server.bindaddr[j] ? server.bindaddr[j] : "*",
port, server.neterr);
if (errno == ENOPROTOOPT || errno == EPROTONOSUPPORT ||
errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT ||
errno == EAFNOSUPPORT || errno == EADDRNOTAVAIL)
continue;
return C_ERR;
}
// Put the socket into non-blocking mode
anetNonBlock(NULL,fds[*count]);
(*count)++;
}
return C_OK;
}
Whether it is IPv4 or IPv6, the call eventually ends up in _anetTcpServer; only the af parameter differs.
int anetTcpServer(char *err, int port, char *bindaddr, int backlog)
{
return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);
}
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog)
{
return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog);
}
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
int s = -1, rv;
char _port[6]; /* strlen("65535") */
struct addrinfo hints, *servinfo, *p;
snprintf(_port,6,"%d",port);
memset(&hints,0,sizeof(hints));
hints.ai_family = af;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE; /* No effect if bindaddr != NULL */
// getaddrinfo resolves a hostname or dotted-decimal IP string into numeric address structures
if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
anetSetError(err, "%s", gai_strerror(rv));
return ANET_ERR;
}
// Iterate over the resolved addresses
for (p = servinfo; p != NULL; p = p->ai_next) {
// Call the socket system call to create the socket
if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
continue;
// anetV6Only restricts the socket to sending and receiving IPv6 packets only
if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
// Enable SO_REUSEADDR so the port can be reused right after it is released
if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
// Call anetListen to bind and listen on the port
if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR;
goto end;
}
if (p == NULL) {
anetSetError(err, "unable to bind socket, errno: %d", errno);
goto error;
}
error:
if (s != -1) close(s);
s = ANET_ERR;
end:
freeaddrinfo(servinfo);
return s;
}
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
// Call the bind system call to bind the address and port
if (bind(s,sa,len) == -1) {
anetSetError(err, "bind: %s", strerror(errno));
close(s);
return ANET_ERR;
}
// Call the listen system call to start listening
if (listen(s, backlog) == -1) {
anetSetError(err, "listen: %s", strerror(errno));
close(s);
return ANET_ERR;
}
return ANET_OK;
}
In _anetTcpServer, a socket is first created with the socket system call according to the address and protocol to listen on, then the address is bound and listen is called, and finally the socket's fd (file descriptor) is returned. Back in listenToPort, the fd is stored in the array passed in by the caller, which for plain TCP is server.ipfd.
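As a minimal illustration of that sequence (this is not Redis code; listen_on is a hypothetical helper), the bare system calls wrapped by _anetTcpServer/anetListen for an IPv4 wildcard listener look like this:
/* Illustrative sketch only: socket -> SO_REUSEADDR -> bind -> listen. */
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>
static int listen_on(int port, int backlog) {
    int s = socket(AF_INET, SOCK_STREAM, 0);
    if (s == -1) return -1;
    int yes = 1;
    /* Roughly what anetSetReuseAddr does. */
    setsockopt(s, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
    struct sockaddr_in sa;
    memset(&sa, 0, sizeof(sa));
    sa.sin_family = AF_INET;
    sa.sin_port = htons(port);
    sa.sin_addr.s_addr = htonl(INADDR_ANY);
    /* Roughly what anetListen does: bind, then listen. */
    if (bind(s, (struct sockaddr *)&sa, sizeof(sa)) == -1 ||
        listen(s, backlog) == -1) {
        close(s);
        return -1;
    }
    return s; /* The caller would also set O_NONBLOCK, like anetNonBlock. */
}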
Registering events
Time events
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
aeTimeProc *proc, void *clientData,
aeEventFinalizerProc *finalizerProc)
{
// Get the id of the new time event
long long id = eventLoop->timeEventNextId++;
aeTimeEvent *te;
// Allocate the time event object
te = zmalloc(sizeof(*te));
if (te == NULL) return AE_ERR;
// Set the event id
te->id = id;
aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
// Set the handler function
te->timeProc = proc;
te->finalizerProc = finalizerProc;
te->clientData = clientData;
// Insert at the head of the time event linked list
te->prev = NULL;
te->next = eventLoop->timeEventHead;
te->refcount = 0;
if (te->next)
te->next->prev = te;
// Make the new event the head of the list
eventLoop->timeEventHead = te;
return id;
}
Registering a time event is actually quite simple: create the event object, set a few fields, and add it to the event loop's time event linked list. Note that insertion happens at the head, i.e. the new event becomes the head of the list; this property will come up again when we analyze how the event is processed. Also, the routine passed in here, i.e. the function that handles the time event, is serverCron in src/server.c.
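For reference, a time event node in ae.h (Redis 6.0) roughly looks like the following; the fields correspond to the ones set in aeCreateTimeEvent above (quoted from memory, treat it as a sketch):
typedef struct aeTimeEvent {
    long long id;                        /* time event identifier */
    long when_sec;                       /* seconds part of the fire time */
    long when_ms;                        /* milliseconds part of the fire time */
    aeTimeProc *timeProc;                /* handler, e.g. serverCron */
    aeEventFinalizerProc *finalizerProc; /* called when the event is deleted */
    void *clientData;
    struct aeTimeEvent *prev;
    struct aeTimeEvent *next;
    int refcount;                        /* protects the event during nested calls */
} aeTimeEvent;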
File events
Whether it is TCP or a local Unix socket, the file event is registered through aeCreateFileEvent.
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
// If fd is not smaller than setsize, the file event array is too small (too many file events registered), so return an error.
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
// Get the file event object for this fd
aeFileEvent *fe = &eventLoop->events[fd];
// Register the event for this fd with the underlying multiplexing layer
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
// Update the file event's fields
fe->mask |= mask;
// Set the event handlers
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
Unlike time events, which live in a linked list, file event objects are stored in the events array, indexed by fd. So the event object is looked up by fd, a few fields are set, and aeApiAddEvent is called to register the event at the epoll level.
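For reference, each slot in that array is an aeFileEvent, which in ae.h (Redis 6.0) is roughly defined as follows (quoted from memory, treat it as a sketch):
typedef struct aeFileEvent {
    int mask;              /* AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc; /* handler for readable events */
    aeFileProc *wfileProc; /* handler for writable events */
    void *clientData;
} aeFileEvent;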
In aeApiAddEvent, the epoll_ctl system call registers the listening socket with epoll; later, the event loop checks whether ready events have occurred on it, which is how new connections get handled.
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0}; /* avoid valgrind warning */
/* If the fd was already monitored for some event, we need a MOD
* operation. Otherwise we need an ADD operation. */
// Decide the epoll operation (add or modify) depending on whether the fd is already monitored
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
// Translate the AE-layer event types into epoll event types
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
// Set the interesting events for read and write respectively
if (mask & AE_READABLE) ee.events |= EPOLLIN;
if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
ee.data.fd = fd;
// Call the epoll_ctl system call to perform the underlying epoll operation
if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
return 0;
}
In this function, the AE-layer event mask is converted into epoll's event mask, and then the epoll_ctl system call is issued.
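As a hedged illustration of the ADD/MOD logic (myWriteHandler and privdata are hypothetical names, not Redis identifiers): suppose an fd already has AE_READABLE registered and a write handler is added later.
/* Hypothetical handler; the signature follows aeFileProc. */
void myWriteHandler(aeEventLoop *el, int fd, void *clientData, int mask);

/* fd already has AE_READABLE registered, so the old mask is merged and
 * aeApiAddEvent issues epoll_ctl(EPOLL_CTL_MOD, ...) with EPOLLIN|EPOLLOUT. */
aeCreateFileEvent(server.el, fd, AE_WRITABLE, myWriteHandler, privdata);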
Starting the event loop
When Redis has finished its startup work, it finally calls aeMain to enter the event loop.
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
/*
 * The event loop: keep processing events until the loop is stopped.
 */
while (!eventLoop->stop) {
// Process all event types and run the before/after sleep callbacks
aeProcessEvents(eventLoop, AE_ALL_EVENTS|
AE_CALL_BEFORE_SLEEP|
AE_CALL_AFTER_SLEEP);
}
}
Redis keeps calling aeProcessEvents to process events.
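Before reading the code, it helps to keep the flags in mind; in ae.h (Redis 6.0) they are roughly defined as follows (quoted from memory, treat it as a sketch):
#define AE_FILE_EVENTS (1<<0)
#define AE_TIME_EVENTS (1<<1)
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)
#define AE_DONT_WAIT (1<<2)
#define AE_CALL_BEFORE_SLEEP (1<<3)
#define AE_CALL_AFTER_SLEEP (1<<4)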
// The function executed on each iteration of the event loop
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
int processed = 0, numevents;
/* Nothing to do? return ASAP */
// If neither time events nor file events are requested, return immediately
if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
/* Note that we want call select() even if there are no
* file events to process as long as we want to process time
* events, in order to sleep until the next time event is ready
* to fire. */
// Call the multiplexing API if there are monitored fds, or if we may need to sleep until the next time event
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
struct timeval tv, *tvp;
// The two if blocks below compute the maximum time we may block
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
/*
 * Find the nearest time event.
 * Redis normally has only one time event: the periodic execution of serverCron.
 */
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
long now_sec, now_ms;
// Get the current time, with millisecond precision
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
/* How many milliseconds we need to wait for the next
* time event to fire? */
// Compute how many milliseconds remain until the next time event should fire
long long ms =
(shortest->when_sec - now_sec)*1000 +
shortest->when_ms - now_ms;
if (ms > 0) { // Split the remaining milliseconds into seconds and microseconds
tvp->tv_sec = ms/1000;
// tv_usec is expressed in microseconds, so the leftover milliseconds are multiplied by 1000
tvp->tv_usec = (ms % 1000)*1000;
} else {
tvp->tv_sec = 0;
tvp->tv_usec = 0;
}
} else { // No upcoming time event was found
/* If we have to check for events but need to return
* ASAP because of AE_DONT_WAIT we need to set the timeout
* to zero */
if (flags & AE_DONT_WAIT) { // With this flag set we must not block; the event loop just keeps calling this function.
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else { // Without the DONT_WAIT flag we can block indefinitely
/* Otherwise we can block */
tvp = NULL; /* wait forever */
}
}
/*
 * If the event loop itself has the AE_DONT_WAIT flag set, it will not block either.
 */
if (eventLoop->flags & AE_DONT_WAIT) {
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
}
// Run the beforeSleep hook before the process blocks
if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
eventLoop->beforesleep(eventLoop);
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
/*
 * Ask the underlying I/O multiplexing layer which events are ready;
 * aeApiPoll returns the number of ready file events.
 * The second argument carries the maximum time it may block.
 */
numevents = aeApiPoll(eventLoop, tvp);
/* After sleep callback. */
// Run the afterSleep hook after the process stops blocking
if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
eventLoop->aftersleep(eventLoop);
// Process all the ready events
for (j = 0; j < numevents; j++) {
// Get the ready event
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
/* Normally we execute the readable event first, and the writable
* event later. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if AE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsyncing a file to disk,
* before replying to a client. */
// Check whether the AE_BARRIER flag is set
int invert = fe->mask & AE_BARRIER;
/* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid.
*
* Fire the readable event if the call sequence is not
* inverted. */
/*
 * The rfileProc and wfileProc below were set in aeCreateFileEvent.
 */
// Without the AE_BARRIER flag, handle the READABLE event first
if (!invert && fe->mask & mask & AE_READABLE) {
// Handle the read event
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
}
/* Fire the writable event. */
// Then the WRITABLE event
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
// Handle the write event
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
// With AE_BARRIER set, handle the readable event after the writable one.
if (invert) {
fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
if ((fe->mask & mask & AE_READABLE) &&
(!fired || fe->wfileProc != fe->rfileProc))
{
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
}
/* Check time events */
if (flags & AE_TIME_EVENTS)
// Process time events
processed += processTimeEvents(eventLoop);
return processed; /* return the number of processed file/time events */
}
The steps of this function are as follows:
- Compute the maximum time epoll_wait may block; it must not block too long, otherwise time event processing would be delayed.
- Run the beforeSleep hook.
- Fetch the ready events (aeApiPoll).
- Run the afterSleep hook.
- Process the file events.
- Process the time events.
The two hooks are defined in src/server.c and are set in the initServer function introduced above. For file events the main function of interest is acceptTcpHandler, and for time events it is serverCron; both were set when the events were registered, as shown in the initServer code above.
Each of these steps does quite a lot of work, and they will be analyzed in separate follow-up articles.
Summary
This article analyzed how the event loop in Redis is created and started, how events are registered, and how the listening addresses are bound. Inside the event loop, ready file events and time events are processed continuously, and the two hooks run around the blocking call. Overall the outline is quite clear; with it in mind, it becomes easy to see how each part's implementation connects back to this main line when analyzing it.