The Redis Server Event Loop

Like many other middleware systems, Redis is event-driven. After the Redis server starts, it finally enters an event loop in which it keeps processing events. Redis supports two kinds of events: file events and time events. This article walks through how the event loop is implemented in Redis.
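
Before reading the real code, here is a minimal, self-contained sketch of the pattern (an illustration only, not Redis code): wait for I/O readiness with epoll, but never block past the moment the next timer is due, then handle whichever kind of event is ready.

c
illustration only (not Redis source)
#include <stdio.h>
#include <time.h>
#include <unistd.h>
#include <sys/epoll.h>

int main(void) {
    int epfd = epoll_create(1024);
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = STDIN_FILENO };
    epoll_ctl(epfd, EPOLL_CTL_ADD, STDIN_FILENO, &ev);  /* one "file event" */

    time_t next_timer = time(NULL) + 1;                 /* one "time event" */
    for (;;) {
        /* Block at most until the next time event is due. */
        int timeout_ms = (int)(next_timer - time(NULL)) * 1000;
        if (timeout_ms < 0) timeout_ms = 0;

        struct epoll_event fired;
        int n = epoll_wait(epfd, &fired, 1, timeout_ms);
        if (n > 0) {                                     /* a file event is ready */
            char buf[128];
            if (read(fired.data.fd, buf, sizeof(buf)) <= 0) break;
        }
        if (time(NULL) >= next_timer) {                  /* the time event is due */
            printf("tick\n");
            next_timer = time(NULL) + 1;
        }
    }
    close(epfd);
    return 0;
}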


The main Function

v6.0.9
c
src/server.c
int main(int argc, char **argv) {
    printf("Welcome to lzip-redis-6.0.9\n");
    struct timeval tv;
    int j;

#ifdef REDIS_TEST
    if (argc == 3 && !strcasecmp(argv[1], "test")) {
        if (!strcasecmp(argv[2], "ziplist")) {
            return ziplistTest(argc, argv);
        } else if (!strcasecmp(argv[2], "quicklist")) {
            quicklistTest(argc, argv);
        } else if (!strcasecmp(argv[2], "intset")) {
            return intsetTest(argc, argv);
        } else if (!strcasecmp(argv[2], "zipmap")) {
            return zipmapTest(argc, argv);
        } else if (!strcasecmp(argv[2], "sha1test")) {
            return sha1Test(argc, argv);
        } else if (!strcasecmp(argv[2], "util")) {
            return utilTest(argc, argv);
        } else if (!strcasecmp(argv[2], "endianconv")) {
            return endianconvTest(argc, argv);
        } else if (!strcasecmp(argv[2], "crc64")) {
            return crc64Test(argc, argv);
        } else if (!strcasecmp(argv[2], "zmalloc")) {
            return zmalloc_test(argc, argv);
        }

        return -1; /* test not found */
    }
#endif

    /* We need to initialize our libraries, and the server configuration. */
#ifdef INIT_SETPROCTITLE_REPLACEMENT
    spt_init(argc, argv);
#endif
    setlocale(LC_COLLATE,"");
    tzset(); /* Populates 'timezone' global. */
    // Install the handler called when memory allocation fails (out of memory)
    zmalloc_set_oom_handler(redisOutOfMemoryHandler);
    srand(time(NULL)^getpid());
    gettimeofday(&tv,NULL);
    crc64_init();

    uint8_t hashseed[16];
    getRandomBytes(hashseed,sizeof(hashseed));
    dictSetHashFunctionSeed(hashseed);
    // Check whether this Redis server is being started in sentinel mode
    server.sentinel_mode = checkForSentinelMode(argc,argv);
    // Initialize the server configuration
    initServerConfig();
    // Initialize the ACL subsystem
    ACLInit(); /* The ACL subsystem must be initialized ASAP because the
                  basic networking code and client creation depends on it. */
    // Initialize the module system
    moduleInitModulesSystem();
    // Initialize TLS
    tlsInit();

    /* Store the executable path and arguments in a safe place in order
     * to be able to restart the server later. */
    // Record the executable path and startup arguments so the server can be restarted later
    server.executable = getAbsolutePath(argv[0]);
    // Allocate the argument array
    server.exec_argv = zmalloc(sizeof(char*)*(argc+1));
    server.exec_argv[argc] = NULL;
    // Copy the arguments
    for (j = 0; j < argc; j++) server.exec_argv[j] = zstrdup(argv[j]);

    /* We need to init sentinel right now as parsing the configuration file
     * in sentinel mode will have the effect of populating the sentinel
     * data structures with master nodes to monitor. */
    // If started in sentinel mode, initialize the sentinel machinery.
    if (server.sentinel_mode) {
        initSentinelConfig();
        initSentinel();
    }

    /* Check if we need to start in redis-check-rdb/aof mode. We just execute
     * the program main. However the program is part of the Redis executable
     * so that we can easily execute an RDB check on loading errors. */

    // Handle the two special executables that check and repair RDB and AOF files.
    if (strstr(argv[0],"redis-check-rdb") != NULL)
        redis_check_rdb_main(argc,argv,NULL);
    else if (strstr(argv[0],"redis-check-aof") != NULL)
        redis_check_aof_main(argc,argv);

    if (argc >= 2) {
        // Start parsing from the second argument
        j = 1; /* First option to parse in argv[] */
        // Create an empty sds string
        sds options = sdsempty();
        char *configfile = NULL;

        /* Handle special options --help and --version */
        // Handle the special command-line options
        if (strcmp(argv[1], "-v") == 0 ||
            strcmp(argv[1], "--version") == 0) version();
        if (strcmp(argv[1], "--help") == 0 ||
            strcmp(argv[1], "-h") == 0) usage();
        if (strcmp(argv[1], "--test-memory") == 0) {
            if (argc == 3) {
                memtest(atoi(argv[2]),50);
                exit(0);
            } else {
                fprintf(stderr,"Please specify the amount of memory to test in megabytes.\n");
                fprintf(stderr,"Example: ./redis-server --test-memory 4096\n\n");
                exit(1);
            }
        }

        /* First argument is the config file name? */
        // If the second argument does not start with '--', it is the config file path
        if (argv[j][0] != '-' || argv[j][1] != '-') {
            configfile = argv[j];
            // Get the absolute path of the config file
            server.configfile = getAbsolutePath(configfile);
            /* Replace the config file in server.exec_argv with
             * its absolute path. */
            // Replace the argument with the config file's absolute path
            zfree(server.exec_argv[j]);
            server.exec_argv[j] = zstrdup(server.configfile);
            j++;
        }

        /* All the other options are parsed and conceptually appended to the
         * configuration file. For instance --port 6380 will generate the
         * string "port 6380\n" to be parsed after the actual file name
         * is parsed, if any. */
        // Read the options from the command line and append them to one string: each option starts on a new line, followed by its arguments separated by spaces.
        while(j != argc) {
            if (argv[j][0] == '-' && argv[j][1] == '-') { // a long option (starts with --)
                /* Option name */
                // Skip the "--check-rdb" option
                if (!strcmp(argv[j], "--check-rdb")) {
                    /* Argument has no options, need to skip for parsing. */
                    j++;
                    continue;
                }
                // Append a newline to the sds to separate options from each other
                if (sdslen(options)) options = sdscat(options,"\n");
                // Append the option name; the +2 skips the leading --
                options = sdscat(options,argv[j]+2);
                // followed by a space
                options = sdscat(options," ");
            } else {
                /* Option argument */
                // Append the option's argument to the options sds,
                options = sdscatrepr(options,argv[j],strlen(argv[j]));
                // followed by a space
                options = sdscat(options," ");
            }
            j++;
        }
        // In sentinel mode the config cannot come from stdin (a config file name of "-"): sentinel needs a config file on disk to save its state, so error out.
        if (server.sentinel_mode && configfile && *configfile == '-') {
            serverLog(LL_WARNING,
                "Sentinel config from STDIN not allowed.");
            serverLog(LL_WARNING,
                "Sentinel needs config file on disk to save state.  Exiting...");
            exit(1);
        }
        // Reset the server's save parameters
        resetServerSaveParams();
        // Load configuration from the config file and the command-line options
        loadServerConfig(configfile,options);
        // Free the options sds
        sdsfree(options);
    }

    // Detect whether the server is supervised by upstart or systemd; if it is not and server.daemonize is set, it has to daemonize itself.
    server.supervised = redisIsSupervised(server.supervised_mode);
    int background = server.daemonize && !server.supervised;
    if (background) daemonize(); // run as a daemon

    serverLog(LL_WARNING, "oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo");
    serverLog(LL_WARNING,
        "Redis version=%s, bits=%d, commit=%s, modified=%d, pid=%d, just started",
            REDIS_VERSION,
            (sizeof(long) == 8) ? 64 : 32,
            redisGitSHA1(),
            strtol(redisGitDirty(),NULL,10) > 0,
            (int)getpid());

    if (argc == 1) {
        serverLog(LL_WARNING, "Warning: no config file specified, using the default config. In order to specify a config file use %s /path/to/%s.conf", argv[0], server.sentinel_mode ? "sentinel" : "redis");
    } else {
        serverLog(LL_WARNING, "Configuration loaded");
    }

    // Read the current process's OOM score adjustment from the proc filesystem
    readOOMScoreAdj();
    // Initialize the server
    initServer();
    // Create the pid file
    if (background || server.pidfile) createPidFile();
    redisSetProcTitle(argv[0]);
    redisAsciiArt();
    // Check the TCP backlog setting
    checkTcpBacklogSettings();

    if (!server.sentinel_mode) { // not started in sentinel mode
        /* Things not needed when running in Sentinel mode. */
        serverLog(LL_WARNING,"Server initialized");
    #ifdef __linux__
        // Check Linux memory-related kernel parameters
        linuxMemoryWarnings();
    #endif
        // Load the modules specified in the config file
        moduleLoadFromQueue();
        // Load the ACL user list
        ACLLoadUsersAtStartup();
        // Create the background threads and the I/O threads
        InitServerLast();
        // Load the AOF or RDB file from disk
        loadDataFromDisk();
        if (server.cluster_enabled) {
            // In cluster mode, also verify that the loaded data is consistent with the cluster configuration
            if (verifyClusterConfigWithData() == C_ERR) {
                serverLog(LL_WARNING,
                    "You can't have keys in a DB different than DB 0 when in "
                    "Cluster mode. Exiting.");
                exit(1);
            }
        }
        if (server.ipfd_count > 0 || server.tlsfd_count > 0)
            serverLog(LL_NOTICE,"Ready to accept connections");
        if (server.sofd > 0)
            serverLog(LL_NOTICE,"The server is now ready to accept connections at %s", server.unixsocket);
        if (server.supervised_mode == SUPERVISED_SYSTEMD) {
            if (!server.masterhost) {
                redisCommunicateSystemd("STATUS=Ready to accept connections\n");
                redisCommunicateSystemd("READY=1\n");
            } else {
                redisCommunicateSystemd("STATUS=Waiting for MASTER <-> REPLICA sync\n");
            }
        }
    } else { // started in sentinel mode
        InitServerLast();
        // Start the sentinel machinery
        sentinelIsRunning();
        if (server.supervised_mode == SUPERVISED_SYSTEMD) {
            redisCommunicateSystemd("STATUS=Ready to accept connections\n");
            redisCommunicateSystemd("READY=1\n");
        }
    }

    /* Warning the user about suspicious maxmemory setting. */
    if (server.maxmemory > 0 && server.maxmemory < 1024*1024) {
        serverLog(LL_WARNING,"WARNING: You specified a maxmemory value that is less than 1MB (current value is %llu bytes). Are you sure this is what you really want?", server.maxmemory);
    }

    /*
     * Set CPU affinity: try to pin the Redis main thread to the CPUs listed in server.server_cpulist.
     * This reduces unnecessary thread migrations and improves performance.
     */
    redisSetCpuAffinity(server.server_cpulist);
    // Apply the configured oom-score-adj for this process (the -1 argument means "use the class of the current process")
    setOOMScoreAdj(-1);

    // Start the event loop; this call only returns when the loop is told to exit.
    aeMain(server.el);
    // Delete the event loop and the events registered in it
    aeDeleteEventLoop(server.el);
    // Exit the program
    return 0;
}

Above is the Redis server's main function, i.e. the main startup path, which covers quite a lot of ground. This article focuses on two of the functions called there: initServer and aeMain.

Initializing the Server

v6.0.9
c
src/server.c
void initServer(void) {
    int j;

    // Register UNIX signal handlers so the process can react to these signals.
    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    setupSignalHandlers();
    // Make the thread cancellable at any time, so threads can be killed and the program stopped
    makeThreadKillable();

    if (server.syslog_enabled) { // if syslog is enabled
        // Open a connection to the system logger so messages can be written to syslog
        openlog(server.syslog_ident, LOG_PID | LOG_NDELAY | LOG_NOWAIT,
            server.syslog_facility);
    }

    /* Initialization after setting defaults from the config system. */
    // Initialize the server fields that hold runtime state
    server.aof_state = server.aof_enabled ? AOF_ON : AOF_OFF;
    server.hz = server.config_hz;
    server.pid = getpid();
    server.in_fork_child = CHILD_TYPE_NONE;
    server.main_thread_id = pthread_self();
    server.current_client = NULL;
    server.fixed_time_expire = 0;
    server.clients = listCreate();
    server.clients_index = raxNew();
    server.clients_to_close = listCreate();
    server.slaves = listCreate();
    server.monitors = listCreate();
    server.clients_pending_write = listCreate();
    server.clients_pending_read = listCreate();
    server.clients_timeout_table = raxNew();
    server.slaveseldb = -1; /* Force to emit the first SELECT command. */
    server.unblocked_clients = listCreate();
    server.ready_keys = listCreate();
    server.clients_waiting_acks = listCreate();
    server.get_ack_from_slaves = 0;
    server.clients_paused = 0;
    server.events_processed_while_blocked = 0;
    server.system_memory_size = zmalloc_get_memory_size();

    if ((server.tls_port || server.tls_replication || server.tls_cluster)
                && tlsConfigure(&server.tls_ctx_config) == C_ERR) {
        serverLog(LL_WARNING, "Failed to configure TLS. Check logs for more info.");
        exit(1);
    }

    // Create the shared objects: small integers, commonly used strings, and so on.
    createSharedObjects();
    // Raise the process's open-file-descriptor limit (via setrlimit), so that a large number of client connections does not cause errors.
    adjustOpenFilesLimit();
    /*
     * Create the event loop object; the argument becomes the length of the event arrays.
     * server.maxclients: the maximum number of client connections, i.e. the client file events.
     * CONFIG_FDSET_INCR: 128, extra slots reserved for the server's own descriptors (listening sockets, log files, pipes, and so on).
     */
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    if (server.el == NULL) {
        serverLog(LL_WARNING,
            "Failed creating the event loop. Error message: '%s'",
            strerror(errno));
        exit(1);
    }
    // Allocate the array of redisDb structs, with dbnum entries
    server.db = zmalloc(sizeof(redisDb)*server.dbnum);

    /* Open the TCP listening socket for the user commands. */
    // TCP listening
    if (server.port != 0 &&
        // listen on the configured address and port
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);
    // Listen on the TLS port (TLS connections are supported since 6.0)
    if (server.tls_port != 0 &&
        listenToPort(server.tls_port,server.tlsfd,&server.tlsfd_count) == C_ERR)
        exit(1);

    /* Open the listening Unix domain socket. */
    if (server.unixsocket != NULL) {
        unlink(server.unixsocket); /* don't care if this fails */
        // If a unix socket is configured, start the unix socket server
        server.sofd = anetUnixServer(server.neterr,server.unixsocket,
            server.unixsocketperm, server.tcp_backlog);
        if (server.sofd == ANET_ERR) {
            serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
            exit(1);
        }
        anetNonBlock(NULL,server.sofd);
    }

    /* Abort if there are no listening sockets at all. */
    if (server.ipfd_count == 0 && server.tlsfd_count == 0 && server.sofd < 0) {
        serverLog(LL_WARNING, "Configured to not listen anywhere, exiting.");
        exit(1);
    }

    /* Create the Redis databases, and initialize other internal state. */
    // Initialize the databases in server.db, which store the data
    for (j = 0; j < server.dbnum; j++) {
        server.db[j].dict = dictCreate(&dbDictType,NULL);
        server.db[j].expires = dictCreate(&keyptrDictType,NULL);
        server.db[j].expires_cursor = 0;
        server.db[j].blocking_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].ready_keys = dictCreate(&objectKeyPointerValueDictType,NULL);
        server.db[j].watched_keys = dictCreate(&keylistDictType,NULL);
        server.db[j].id = j;
        server.db[j].avg_ttl = 0;
        server.db[j].defrag_later = listCreate();
        listSetFreeMethod(server.db[j].defrag_later,(void (*)(void*))sdsfree);
    }
    // Initialize the LRU/LFU sample pool used by the approximate LRU/LFU algorithms.
    evictionPoolAlloc(); /* Initialize the LRU keys pool. */
    server.pubsub_channels = dictCreate(&keylistDictType,NULL);
    server.pubsub_patterns = listCreate();
    server.pubsub_patterns_dict = dictCreate(&keylistDictType,NULL);
    listSetFreeMethod(server.pubsub_patterns,freePubsubPattern);
    listSetMatchMethod(server.pubsub_patterns,listMatchPubsubPattern);
    server.cronloops = 0;
    server.rdb_child_pid = -1;
    server.aof_child_pid = -1;
    server.module_child_pid = -1;
    server.rdb_child_type = RDB_CHILD_TYPE_NONE;
    server.rdb_pipe_conns = NULL;
    server.rdb_pipe_numconns = 0;
    server.rdb_pipe_numconns_writing = 0;
    server.rdb_pipe_buff = NULL;
    server.rdb_pipe_bufflen = 0;
    server.rdb_bgsave_scheduled = 0;
    server.child_info_pipe[0] = -1;
    server.child_info_pipe[1] = -1;
    server.child_info_data.magic = 0;
    // Reset the AOF rewrite buffer
    aofRewriteBufferReset();
    server.aof_buf = sdsempty();
    server.lastsave = time(NULL); /* At startup we consider the DB saved. */
    server.lastbgsave_try = 0;    /* At startup we never tried to BGSAVE. */
    server.rdb_save_time_last = -1;
    server.rdb_save_time_start = -1;
    server.dirty = 0;
    resetServerStats();
    /* A few stats we don't want to reset: server startup time, and peak mem. */
    server.stat_starttime = time(NULL);
    server.stat_peak_memory = 0;
    server.stat_rdb_cow_bytes = 0;
    server.stat_aof_cow_bytes = 0;
    server.stat_module_cow_bytes = 0;
    for (int j = 0; j < CLIENT_TYPE_COUNT; j++)
        server.stat_clients_type_memory[j] = 0;
    server.cron_malloc_stats.zmalloc_used = 0;
    server.cron_malloc_stats.process_rss = 0;
    server.cron_malloc_stats.allocator_allocated = 0;
    server.cron_malloc_stats.allocator_active = 0;
    server.cron_malloc_stats.allocator_resident = 0;
    server.lastbgsave_status = C_OK;
    server.aof_last_write_status = C_OK;
    server.aof_last_write_errno = 0;
    server.repl_good_slaves_count = 0;

    /* Create the timer callback, this is our way to process many background
     * operations incrementally, like clients timeout, eviction of unaccessed
     * expired keys and so forth. */
    // Create a time event whose handler is serverCron, which takes care of Redis's periodic tasks such as expiring keys and generating RDB files.
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }

    /* Create an event handler for accepting new connections in TCP and Unix
     * domain sockets. */
    // Register an AE_READABLE handler for each listening socket.
    for (j = 0; j < server.ipfd_count; j++) {
        // the handler is acceptTcpHandler
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
            acceptTcpHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.ipfd file event.");
            }
    }
    for (j = 0; j < server.tlsfd_count; j++) {
        // the handler is acceptTLSHandler
        if (aeCreateFileEvent(server.el, server.tlsfd[j], AE_READABLE,
            acceptTLSHandler,NULL) == AE_ERR)
            {
                serverPanic(
                    "Unrecoverable error creating server.tlsfd file event.");
            }
    }
    // the handler is acceptUnixHandler
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");


    /* Register a readable event for the pipe used to awake the event loop
     * when a blocked client in a module needs attention. */
    // Register a handler for the pipe used to wake up the event loop; the handler is moduleBlockedClientPipeReadable.
    if (aeCreateFileEvent(server.el, server.module_blocked_pipe[0], AE_READABLE,
        moduleBlockedClientPipeReadable,NULL) == AE_ERR) {
            serverPanic(
                "Error registering the readable event for the module "
                "blocked clients subsystem.");
    }

    /* Register before and after sleep handlers (note this needs to be done
     * before loading persistence since it is used by processEventsWhileBlocked. */
    // Register the hooks that run before and after the event loop blocks in epoll_wait
    aeSetBeforeSleepProc(server.el,beforeSleep);
    aeSetAfterSleepProc(server.el,afterSleep);

    /* Open the AOF file if needed. */
    if (server.aof_state == AOF_ON) {
        // If AOF is enabled, open the AOF file first.
        server.aof_fd = open(server.aof_filename,
                               O_WRONLY|O_APPEND|O_CREAT,0644);
        if (server.aof_fd == -1) {
            serverLog(LL_WARNING, "Can't open the append-only file: %s",
                strerror(errno));
            exit(1);
        }
    }

    /* 32 bit instances are limited to 4GB of address space, so if there is
     * no explicit limit in the user provided configuration we set a limit
     * at 3 GB using maxmemory with 'noeviction' policy'. This avoids
     * useless crashes of the Redis instance for out of memory. */
    // Special handling for 32-bit systems; not analyzed here.
    if (server.arch_bits == 32 && server.maxmemory == 0) {
        serverLog(LL_WARNING,"Warning: 32 bit instance detected but no memory limit set. Setting 3 GB maxmemory limit with 'noeviction' policy now.");
        server.maxmemory = 3072LL*(1024*1024); /* 3 GB */
        server.maxmemory_policy = MAXMEMORY_NO_EVICTION;
    }

    // If started in cluster mode, initialize the cluster machinery.
    if (server.cluster_enabled) clusterInit();
    // Initialize server.repl_scriptcache_dict.
    replicationScriptCacheInit();
    // Initialize the Lua scripting engine
    scriptingInit(1);
    // Initialize the slow log
    slowlogInit();
    // Initialize latency monitoring
    latencyMonitorInit();
}

In the function above, the parts to focus on are the creation of the event loop and the registration of events.

Creating the Event Loop

v6.0.9
c
src/ae.c
aeEventLoop *aeCreateEventLoop(int setsize) {
    // the event loop object
    aeEventLoop *eventLoop;
    int i;

    // Allocate the event loop object
    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
    // Allocate the file event array
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
    // Allocate the fired (ready) event array
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
    eventLoop->setsize = setsize;
    eventLoop->lastTime = time(NULL);
    // Initialize the head of the time event list
    eventLoop->timeEventHead = NULL;
    eventLoop->timeEventNextId = 0;
    eventLoop->stop = 0;
    eventLoop->maxfd = -1;
    eventLoop->beforesleep = NULL;
    eventLoop->aftersleep = NULL;
    eventLoop->flags = 0;
    /*
     * Pick the concrete I/O multiplexing implementation for the current system; on Linux this is normally ae_epoll.c.
     * It creates the epoll instance internally.
     */
    if (aeApiCreate(eventLoop) == -1) goto err;
    /* Events with mask == AE_NONE are not set. So let's initialize the
     * vector with it. */
    for (i = 0; i < setsize; i++)
        // initialize the event mask to AE_NONE (no event registered)
        eventLoop->events[i].mask = AE_NONE;
    return eventLoop;

err:
    if (eventLoop) {
        zfree(eventLoop->events);
        zfree(eventLoop->fired);
        zfree(eventLoop);
    }
    return NULL;
}

In this function, the aeEventLoop object is created first, and then aeApiCreate is called to create the epoll instance.
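
For reference, the aeEventLoop structure that this function fills in looks roughly as follows (abridged from src/ae.h of this version, with the comments paraphrased for this article):

c
src/ae.h (abridged)
typedef struct aeEventLoop {
    int maxfd;                   /* highest file descriptor currently registered */
    int setsize;                 /* max number of file descriptors tracked */
    long long timeEventNextId;   /* id to assign to the next time event */
    time_t lastTime;             /* used to detect system clock skew */
    aeFileEvent *events;         /* registered file events, indexed by fd */
    aeFiredEvent *fired;         /* file events reported ready by aeApiPoll */
    aeTimeEvent *timeEventHead;  /* linked list of time events */
    int stop;
    void *apidata;               /* state of the I/O multiplexing layer, e.g. aeApiState for epoll */
    aeBeforeSleepProc *beforesleep;
    aeBeforeSleepProc *aftersleep;
    int flags;
} aeEventLoop;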

v6.0.9
c
src/ae_epoll.c
static int aeApiCreate(aeEventLoop *eventLoop) {
    // Allocate the aeApiState struct, which will later hold the ready events
    aeApiState *state = zmalloc(sizeof(aeApiState));

    if (!state) return -1;
    // Allocate the events array used to receive ready events from epoll_wait
    state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
    if (!state->events) {
        zfree(state);
        return -1;
    }
    // Create the epoll instance
    state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
    if (state->epfd == -1) {
        zfree(state->events);
        zfree(state);
        return -1;
    }
    // Store the new aeApiState in the event loop
    eventLoop->apidata = state;
    return 0;
}

In this function, the epoll_create system call is used to create the epoll instance.
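
The aeApiState mentioned above is the epoll backend's private state; abridged from src/ae_epoll.c, it only holds the epoll fd and the buffer that epoll_wait fills with ready events:

c
src/ae_epoll.c (abridged)
typedef struct aeApiState {
    int epfd;                    /* file descriptor of the epoll instance */
    struct epoll_event *events;  /* buffer filled by epoll_wait with ready events */
} aeApiState;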

Listening on Addresses

After the event loop object has been created, listenToPort is called to listen on the configured addresses and return the socket fds; when events are registered below, these fds are added to epoll.

v6.0.9
c
src/server.c
int listenToPort(int port, int *fds, int *count) {
    int j;

    /* Force binding of 0.0.0.0 if no bind address is specified, always
     * entering the loop if j == 0. */
    // If no bind address was configured, set the first entry to NULL
    if (server.bindaddr_count == 0) server.bindaddr[0] = NULL;
    for (j = 0; j < server.bindaddr_count || j == 0; j++) {
        // the case where no bind address was configured
        if (server.bindaddr[j] == NULL) {
            int unsupported = 0;
            /* Bind * for both IPv6 and IPv4, we enter here only if
             * server.bindaddr_count == 0. */
            fds[*count] = anetTcp6Server(server.neterr,port,NULL,
                server.tcp_backlog);
            if (fds[*count] != ANET_ERR) {
                anetNonBlock(NULL,fds[*count]);
                (*count)++;
            } else if (errno == EAFNOSUPPORT) {
                unsupported++;
                serverLog(LL_WARNING,"Not listening to IPv6: unsupported");
            }

            if (*count == 1 || unsupported) {
                /* Bind the IPv4 address as well. */
                fds[*count] = anetTcpServer(server.neterr,port,NULL,
                    server.tcp_backlog);
                if (fds[*count] != ANET_ERR) {
                    anetNonBlock(NULL,fds[*count]);
                    (*count)++;
                } else if (errno == EAFNOSUPPORT) {
                    unsupported++;
                    serverLog(LL_WARNING,"Not listening to IPv4: unsupported");
                }
            }
            /* Exit the loop if we were able to bind * on IPv4 and IPv6,
             * otherwise fds[*count] will be ANET_ERR and we'll print an
             * error and return to the caller with an error. */
            if (*count + unsupported == 2) break;
        } else if (strchr(server.bindaddr[j],':')) {
            // Bind an IPv6 address
            /* Bind IPv6 address. */
            fds[*count] = anetTcp6Server(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        } else {
            // Bind an IPv4 address
            /* Bind IPv4 address. */
            fds[*count] = anetTcpServer(server.neterr,port,server.bindaddr[j],
                server.tcp_backlog);
        }
        if (fds[*count] == ANET_ERR) {
            serverLog(LL_WARNING,
                "Could not create server TCP listening socket %s:%d: %s",
                server.bindaddr[j] ? server.bindaddr[j] : "*",
                port, server.neterr);
                if (errno == ENOPROTOOPT     || errno == EPROTONOSUPPORT ||
                    errno == ESOCKTNOSUPPORT || errno == EPFNOSUPPORT ||
                    errno == EAFNOSUPPORT    || errno == EADDRNOTAVAIL)
                    continue;
            return C_ERR;
        }
        // Put the socket in non-blocking mode
        anetNonBlock(NULL,fds[*count]);
        (*count)++;
    }
    return C_OK;
}

Whether IPv4 or IPv6 is used, the call eventually ends up in the _anetTcpServer function; only the af (address family) argument differs.

v6.0.9
anetTcpServer
_anetTcpServer
anetListen
c
src/anet.c
int anetTcpServer(char *err, int port, char *bindaddr, int backlog)
{
    return _anetTcpServer(err, port, bindaddr, AF_INET, backlog);
}
int anetTcp6Server(char *err, int port, char *bindaddr, int backlog)
{
    return _anetTcpServer(err, port, bindaddr, AF_INET6, backlog);
}
c
src/anet.c
static int _anetTcpServer(char *err, int port, char *bindaddr, int af, int backlog)
{
    int s = -1, rv;
    char _port[6];  /* strlen("65535") */
    struct addrinfo hints, *servinfo, *p;

    snprintf(_port,6,"%d",port);
    memset(&hints,0,sizeof(hints));
    hints.ai_family = af;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;    /* No effect if bindaddr != NULL */

    // getaddrinfo resolves a host name or dotted-decimal IP string into numeric address structures
    if ((rv = getaddrinfo(bindaddr,_port,&hints,&servinfo)) != 0) {
        anetSetError(err, "%s", gai_strerror(rv));
        return ANET_ERR;
    }
    // Walk the list of resolved addresses
    for (p = servinfo; p != NULL; p = p->ai_next) {
        // Call the socket system call to open a socket
        if ((s = socket(p->ai_family,p->ai_socktype,p->ai_protocol)) == -1)
            continue;

        // anetV6Only restricts this socket to sending and receiving IPv6 packets only
        if (af == AF_INET6 && anetV6Only(err,s) == ANET_ERR) goto error;
        // Enable SO_REUSEADDR so the port can be reused immediately after it is released
        if (anetSetReuseAddr(err,s) == ANET_ERR) goto error;
        // Call anetListen to bind and listen on the port
        if (anetListen(err,s,p->ai_addr,p->ai_addrlen,backlog) == ANET_ERR) s = ANET_ERR;
        goto end;
    }
    if (p == NULL) {
        anetSetError(err, "unable to bind socket, errno: %d", errno);
        goto error;
    }

error:
    if (s != -1) close(s);
    s = ANET_ERR;
end:
    freeaddrinfo(servinfo);
    return s;
}
c
src/anet.c
static int anetListen(char *err, int s, struct sockaddr *sa, socklen_t len, int backlog) {
    // Call the bind system call to bind the address and port
    if (bind(s,sa,len) == -1) {
        anetSetError(err, "bind: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }

    // Call the listen system call to start listening on the port
    if (listen(s, backlog) == -1) {
        anetSetError(err, "listen: %s", strerror(errno));
        close(s);
        return ANET_ERR;
    }
    return ANET_OK;
}

In _anetTcpServer, a socket is first created (via the socket system call) for the address and protocol to listen on; the socket is then bound and put into the listening state (via bind and listen in anetListen), and finally the socket's fd (file descriptor) is returned. Back in listenToPort, that fd is stored into the array passed in by the caller; for TCP this array is server.ipfd.

Registering Events

Time Events

v6.0.9
c
src/ae.c
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
        aeTimeProc *proc, void *clientData,
        aeEventFinalizerProc *finalizerProc)
{
    // Assign an id to the new time event
    long long id = eventLoop->timeEventNextId++;
    aeTimeEvent *te;

    // Allocate the time event object
    te = zmalloc(sizeof(*te));
    if (te == NULL) return AE_ERR;
    // Set the event id
    te->id = id;
    aeAddMillisecondsToNow(milliseconds,&te->when_sec,&te->when_ms);
    // Set the handler functions
    te->timeProc = proc;
    te->finalizerProc = finalizerProc;
    te->clientData = clientData;
    // Insert at the head of the time event list
    te->prev = NULL;
    te->next = eventLoop->timeEventHead;
    te->refcount = 0;
    if (te->next)
        te->next->prev = te;
    // Make the new event the head of the list
    eventLoop->timeEventHead = te;
    return id;
}

Registering a time event is quite simple: create the event object, set a few fields, and add it to the event loop's time event list. Note that new events are inserted at the head of the list, so the newest event becomes the list head; this property will come up again when we look at how time events are processed. Also, the handler passed in here, i.e. the function that processes the time event, is the serverCron function in src/server.c.
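
As a hypothetical illustration of the API (myCron below is not a Redis function), a time event handler has the aeTimeProc signature, and its return value tells the loop how many milliseconds later to fire it again, or AE_NOMORE to delete it; this is how serverCron keeps rescheduling itself:

c
illustration only (not Redis source)
#include "ae.h"   /* for aeEventLoop, aeCreateTimeEvent, AE_NOMORE */

/* Hypothetical periodic task; not part of Redis. */
static int myCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
    (void)eventLoop; (void)id; (void)clientData;
    /* ... do some periodic work ... */
    return 100;   /* fire again in ~100 ms; returning AE_NOMORE would remove the event */
}

/* Register it to fire for the first time 1 ms from now, just like serverCron:
 *     aeCreateTimeEvent(server.el, 1, myCron, NULL, NULL);
 */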

File Events

Whether for TCP sockets or for the local Unix socket, file events are registered through aeCreateFileEvent.

v6.0.9
c
src/ae.c
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    // If fd is not smaller than setsize, the file event array is too small (too many file events registered), so return an error.
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    // Get the file event object for this fd
    aeFileEvent *fe = &eventLoop->events[fd];

    // Register the event for this fd with the underlying multiplexing layer
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
    // Update the file event object's fields
    fe->mask |= mask;
    // Set the event handlers
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

Unlike time events, which are kept in a linked list, file event objects are stored in the events array, with the fd as the array index. So the event object is looked up by fd, a few of its fields are set, and aeApiAddEvent is called to register the event at the epoll level.
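
As a hedged, hypothetical illustration of the same API on the writable side (sendReplyHandler and watchClientForWrites below are made up; in the real 6.0 code client sockets go through the connection layer, which ends up calling aeCreateFileEvent in much the same way):

c
illustration only (not Redis source)
#include "ae.h"   /* for aeEventLoop, aeCreateFileEvent, AE_WRITABLE, AE_ERR, AE_OK */

/* Hypothetical handler invoked when the fd becomes writable. */
static void sendReplyHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    (void)el; (void)privdata; (void)mask;
    /* write buffered reply data to fd here */
    (void)fd;
}

/* Start watching a connected client fd for writability. */
static int watchClientForWrites(aeEventLoop *el, int client_fd, void *client) {
    if (aeCreateFileEvent(el, client_fd, AE_WRITABLE,
                          sendReplyHandler, client) == AE_ERR) {
        return AE_ERR;  /* e.g. fd >= setsize, or epoll_ctl failed */
    }
    return AE_OK;
}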

In aeApiAddEvent, the epoll_ctl system call is used to register the listening socket's fd with epoll; later, the event loop checks whether ready events have occurred on it, which is how new connections are handled.

v6.0.9
c
src/ae_epoll.c
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0}; /* avoid valgrind warning */
    /* If the fd was already monitored for some event, we need a MOD
     * operation. Otherwise we need an ADD operation. */
    // Depending on whether the fd is already monitored, choose the epoll operation (add or modify)
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;

    // Translate the AE event types into epoll event types
    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    // Set the events of interest for reads and writes
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;
    // Call the epoll_ctl system call to perform the actual epoll operation
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

This function translates the AE-level event types into epoll event types and then invokes the epoll_ctl system call.

Starting the Event Loop

When Redis finishes its startup work, it finally calls aeMain to enter the event loop.

v6.0.9
c
src/ae.c
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    /*
     * The event loop: keep processing events until the loop is stopped.
     */
    while (!eventLoop->stop) {
        // process every kind of event, and run the callbacks before and after sleeping
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|
                                   AE_CALL_BEFORE_SLEEP|
                                   AE_CALL_AFTER_SLEEP);
    }
}

Redis keeps calling aeProcessEvents to process events.

v6.0.9
c
src/ae.c
// the function executed on every iteration of the event loop
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;

    /* Nothing to do? return ASAP */
    // If neither time events nor file events are requested, return immediately
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;

    /* Note that we want call select() even if there are no
     * file events to process as long as we want to process time
     * events, in order to sleep until the next time event is ready
     * to fire. */
    // Only poll if there are file events registered, or if we want to block waiting for time events
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;

        // The two if blocks below compute the maximum time we may block
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            /*
             * Find the time event that will fire soonest.
             * Normally Redis has only one time event: the periodic execution of serverCron.
             */
            shortest = aeSearchNearestTimer(eventLoop);
        if (shortest) {
            long now_sec, now_ms;

            // Get the current time, with millisecond precision
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;

            /* How many milliseconds we need to wait for the next
             * time event to fire? */
            // Compute how many milliseconds remain until the next time event fires
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;

            if (ms > 0) { // convert the remaining milliseconds into a timeval
                tvp->tv_sec = ms/1000;
                // tv_usec is in microseconds, so the leftover milliseconds are multiplied by 1000
                tvp->tv_usec = (ms % 1000)*1000;
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        } else { // no upcoming time event was found
            /* If we have to check for events but need to return
             * ASAP because of AE_DONT_WAIT we need to set the timeout
             * to zero */
            if (flags & AE_DONT_WAIT) { // with this flag set the call must not block; the event loop just keeps invoking this function
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else { // without the DONT_WAIT flag we can block indefinitely
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }

        /*
         * If the event loop itself has the AE_DONT_WAIT flag set, it must not block either.
         */
        if (eventLoop->flags & AE_DONT_WAIT) {
            tv.tv_sec = tv.tv_usec = 0;
            tvp = &tv;
        }

        // Before blocking, run the beforeSleep hook
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        /*
         * Ask the underlying I/O multiplexing layer which events are ready; aeApiPoll returns the number of ready file events.
         * The second argument carries the maximum time it may block.
         */
        numevents = aeApiPoll(eventLoop, tvp);

        /* After sleep callback. */
        // After waking up, run the afterSleep hook
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
            eventLoop->aftersleep(eventLoop);

        // Process all ready events
        for (j = 0; j < numevents; j++) {
            // get the ready event
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event later. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsyncing a file to disk,
             * before replying to a client. */
            // Check whether the AE_BARRIER flag is set
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            /*
             * The rfileProc and wfileProc below were set in aeCreateFileEvent
             */
            // If AE_BARRIER is not set, process the READABLE event first
            if (!invert && fe->mask & mask & AE_READABLE) {
                // handle the read event
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
            }

            /* Fire the writable event. */
            // if it is a WRITABLE event
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    // handle the write event
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            // If AE_BARRIER is set, process the readable event after the writable one.
            if (invert) {
                fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                if ((fe->mask & mask & AE_READABLE) &&
                    (!fired || fe->wfileProc != fe->rfileProc))
                {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }
    }
    /* Check time events */
    if (flags & AE_TIME_EVENTS)
        // process time events
        processed += processTimeEvents(eventLoop);

    return processed; /* return the number of processed file/time events */
}

The steps of this function are:

  • Compute the maximum time epoll_wait may block; it must not block too long, otherwise time events would be delayed.
  • Run the beforeSleep hook.
  • Call aeApiPoll to collect the ready file events.
  • Run the afterSleep hook.
  • Process the ready file events.
  • Process the time events.

The two hook functions are defined in src/server.c and are installed in the initServer function introduced above. For file events the function to focus on is acceptTcpHandler, and for time events it is serverCron; both were set when the events were registered, see the initServer code above.
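
The aeApiPoll call in the middle is the epoll-specific piece that fills eventLoop->fired. Roughly, what src/ae_epoll.c does is the following (an abridged sketch; error handling omitted):

c
src/ae_epoll.c (abridged sketch)
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    /* Block for at most tvp, or forever when tvp == NULL. */
    retval = epoll_wait(state->epfd, state->events, eventLoop->setsize,
                        tvp ? (tvp->tv_sec * 1000 + tvp->tv_usec / 1000) : -1);
    if (retval > 0) {
        numevents = retval;
        for (int j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events + j;

            /* Map epoll event types back to the AE abstraction. */
            if (e->events & EPOLLIN)  mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & (EPOLLERR | EPOLLHUP)) mask |= AE_READABLE | AE_WRITABLE;

            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}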

Each of these steps involves quite a lot of work; they will be analyzed in separate articles later.

Summary

This article analyzed how the Redis event loop is created and started, how events are registered, and how the listening addresses are bound and listened on. Inside the event loop, ready file events and time events are processed continuously, and the two hook functions are executed around the blocking call. The overall flow is quite clear; keeping this backbone in mind makes it easy to see how each part connects back to the main line when analyzing the individual pieces.