MySQL是一个多线程程序,使用了Reactor模型。主线程在完成了启动初始化操作后,最后会进入一个事件循环,等待客户端的连接请求。本文分析MySQL是怎么进入事件循环的,以及在这个事件循环中是怎么处理连接请求的?
main函数
MySQL的服务端的main函数很简单,这是因为所有逻辑都被抽离到了mysqld_main函数中了,后者非常复杂,代码量达到了1000行左右。
#include <iostream>
extern int mysqld_main(int argc, char **argv);
/*
* 服务端的入口函数
*/
int main(int argc, char **argv) {
// 自己添加的一行代码
std::cout << "Welcome to lzip-mysql-8.0.32" << std::endl;
return mysqld_main(argc, argv);
}
mysqld_main函数
正如上面提到的,该函数非常复杂,主要是做一些启动过程中的初始化操作,本文主要关注其中涉及连接器的操作。
int mysqld_main(int argc, char **argv)
#endif
{
/*
* 初始化网络,内部会创建用于监听的套接字,下面会对这些套接字进行监听。
*/
if (network_init()) unireg_abort(MYSQLD_ABORT_EXIT);
/*
* 执行事件循环
*/
mysqld_socket_acceptor->connection_event_loop();
}
调用network_init函数会创建监听器和Acceptor对象,然后在完成其他操作后会进入Acceptor的事件循环中。
事件循环
/*
* MySQL服务端的事件循环
*/
void connection_event_loop() {
// 获取连接处理器管理器,是个单例对象。
Connection_handler_manager *mgr =
Connection_handler_manager::get_instance();
while (!connection_events_loop_aborted()) {
// 获取客户端的连接请求
Channel_info *channel_info = m_listener->listen_for_connection_event();
// 处理新连接,连接处理器管理器会将请求交给请求处理器来处理
if (channel_info != nullptr) mgr->process_new_connection(channel_info);
}
}
该函数主要分为两部分:
- 获取连接处理器管理器;
- 事件循环主体:
- 获取连接请求
- 处理连接请求
获取连接处理器管理器
// 是在Connection_handler_manager::init()函数中初始化的,在本文将对应的.cc文件中。
static Connection_handler_manager *m_instance;
// 获取单例对象
static Connection_handler_manager *get_instance() {
assert(m_instance != nullptr);
return m_instance;
}
m_instance是在Connection_handler_manager::init函数中初始化的。这个init函数的被调用链条如下:
Connection_handler_manager::init <- get_options <- init_common_variables <- mysqld_main
后面三个函数都是定义在mysqld.cc文件中的,其中mysqld_main函数就是上面介绍的,这样就和主线接上了。
/*
* 该变量用于决定使用何种连接处理器。
* 这里的默认值意味着:每个连接都由一个专属的线程来处理。
* 是指创建新连接的时候创建全新的线程吗?
*/
ulong Connection_handler_manager::thread_handling =
SCHEDULER_ONE_THREAD_PER_CONNECTION;
bool Connection_handler_manager::init() {
/*
This is a static member function.
Per_thread_connection_handler's static members need to be initialized
even if One_thread_connection_handler is used instead.
*/
// 初始化默认类型处理器
Per_thread_connection_handler::init();
/*
* 创建连接处理器
*/
Connection_handler *connection_handler = nullptr;
/*
* 这里的thread_handling在上面有赋值,
* 另外可以通过在MySQL客户端执行 SHOW VARIABLES LIKE "thread_handling" 来查询。
*/
switch (Connection_handler_manager::thread_handling) {
case SCHEDULER_ONE_THREAD_PER_CONNECTION: // 默认是这个
// 不启用线程池,每一个连接用单独的线程处理
connection_handler = new (std::nothrow) Per_thread_connection_handler();
break;
case SCHEDULER_NO_THREADS:
// 启用线程池,所有连接用同一个线程处理
connection_handler = new (std::nothrow) One_thread_connection_handler();
break;
default:
assert(false);
}
if (connection_handler == nullptr) {
// This is a static member function.
Per_thread_connection_handler::destroy();
return true;
}
/*
* 创建连接处理器管理器对象,该对象是单例的。
* 该类构造函数是private的,这里在静态函数init中创建对象,是单例模式的体现。
*/
m_instance =
new (std::nothrow) Connection_handler_manager(connection_handler);
if (m_instance == nullptr) {
delete connection_handler;
// This is a static member function.
Per_thread_connection_handler::destroy();
return true;
}
return false;
}
这里的重点是会根据连接处理器的线程模型thread_handling来选择不同的处理器类型,代码中定义了默认值SCHEDULER_ONE_THREAD_PER_CONNECTION,即默认的处理器类型是Per_thread_connection_handler。另外可以通过下面的方式来查看:
mysql> SHOW VARIABLES LIKE 'thread_handling';
+-----------------+---------------------------+
| Variable_name | Value |
+-----------------+---------------------------+
| thread_handling | one-thread-per-connection |
+-----------------+---------------------------+
1 row in set (0.00 sec)
这里的one-thread-per-connection 就表示使用的是Per_thread_connection_handler。 另外在Connection_handler_manager::init函数中还初始化了Per_thread_connection_handler。
void Per_thread_connection_handler::init() {
// 初始化一些互斥量、条件量
#ifdef HAVE_PSI_INTERFACE
int count = static_cast<int>(array_elements(all_per_thread_mutexes));
mysql_mutex_register("sql", all_per_thread_mutexes, count);
count = static_cast<int>(array_elements(all_per_thread_conds));
mysql_cond_register("sql", all_per_thread_conds, count);
#endif
mysql_mutex_init(key_LOCK_thread_cache, &LOCK_thread_cache,
MY_MUTEX_INIT_FAST);
mysql_cond_init(key_COND_thread_cache, &COND_thread_cache);
mysql_cond_init(key_COND_flush_thread_cache, &COND_flush_thread_cache);
// 创建用于保存待处理信道的列表
waiting_channel_info_list = new (std::nothrow) std::list<Channel_info *>;
assert(waiting_channel_info_list != nullptr);
}
在该函数中会初始化用于保存待处理信道的列表,后续会从该列表中取出信道来进行处理。
获取连接请求
Channel_info *Mysqld_socket_listener::listen_for_connection_event() {
#ifdef HAVE_POLL
/*
* 通过poll系统调用,获取已经就绪的事件。
* 为什么MySQL不使用epoll?因为MySQL的性能瓶颈不在于网络连接的接收,而更多的是在磁盘读写上。
*
* 这里传入的timeout是-1,表示如果没请求则会阻塞在这里。
*/
int retval = poll(&m_poll_info.m_fds[0], m_socket_vector.size(), -1);
#endif
/* Is this a new connection request ? */
// 获取请求对应的socket
const Listen_socket *listen_socket = get_listen_socket();
/*
When poll/select returns control flow then at least one ready server socket
must exist. Check that get_ready_socket() returns a valid socket.
*/
assert(listen_socket != nullptr);
MYSQL_SOCKET connect_sock;
// 接受连接请求,将信息写到connect_sock这个MYSQL_SOCKET对象中
if (accept_connection(listen_socket->m_socket, &connect_sock)) {
}
// 创建表示客户端连接的通道
Channel_info *channel_info = nullptr;
// 这里是根据socket的类型,来创建不同的信道。
if (listen_socket->m_socket_type == Socket_type::UNIX_SOCKET) // 本地交互
channel_info = new (std::nothrow) Channel_info_local_socket(connect_sock);
else // TCP/IP交互
channel_info = new (std::nothrow) Channel_info_tcpip_socket(
connect_sock, (listen_socket->m_socket_interface ==
Socket_interface_type::ADMIN_INTERFACE));
return channel_info;
}
在该函数主要做了以下步骤:
- 调用poll获取就绪事件:直接调用了Linux中的poll系统调用,之所以不使用epoll,是因为MySQL的性能瓶颈不在网络连接处理上,而是在磁盘操作上。
- 获取请求对应的socket:处理poll系统调用的结果。
- 接受请求:调用accept系统调用来接受请求。
- 创建连接信道并返回:封装连接请求对应的socket为MySQL中的信道,会区分是本地连接还是TCP/IP连接。
获取socket
const Listen_socket *Mysqld_socket_listener::get_listen_socket() const {
/*
In case admin interface was set up, then first check whether an admin socket
ready to accept a new connection. Doing this way provides higher priority
to admin interface over other listeners.
*/
#ifdef HAVE_POLL
uint start_index = 0;
// 如果设置了admin的监听地址,且没有使用单独的线程来处理admin请求,
if (!m_admin_bind_address.address.empty() &&
!m_use_separate_thread_for_admin) {
// 如果存在admin请求,则先处理
if (m_poll_info.m_fds[0].revents & POLLIN) {
// 返回admin请求的socket
return &m_socket_vector[0];
} else
start_index = 1;
}
// 遍历普通的监听地址
for (uint i = start_index; i < m_socket_vector.size(); ++i) {
// 如果有请求进来
if (m_poll_info.m_fds[i].revents & POLLIN) {
// 则返回请求对应的socket
return &m_socket_vector[i];
}
}
#else // HAVE_POLL
#endif // HAVE_POLL
return nullptr;
;
}
可以看到,在该方法中,会优先处理admin请求。
接受请求
static bool accept_connection(MYSQL_SOCKET listen_sock,
MYSQL_SOCKET *connect_sock) {
struct sockaddr_storage c_addr;
for (uint retry = 0; retry < MAX_ACCEPT_RETRY; retry++) {
socket_len_t length = sizeof(struct sockaddr_storage);
// 接受连接请求
*connect_sock =
mysql_socket_accept(key_socket_client_connection, listen_sock,
(struct sockaddr *)(&c_addr), &length);
// mysql_socket_getfd函数在获取刚刚获取的连接请求的socket的fd
if (mysql_socket_getfd(*connect_sock) != INVALID_SOCKET ||
(socket_errno != SOCKET_EINTR && socket_errno != SOCKET_EAGAIN))
break;
}
if (mysql_socket_getfd(*connect_sock) == INVALID_SOCKET) {
/*
accept(2) failed on the listening port, after many retries.
There is not much details to report about the client,
increment the server global status variable.
*/
// 这里的与运算相当于是在取余,当连接错误每发生255次时,记录一下错误日志。
if ((connection_errors_accept++ & 255) == 0) { // This can happen often
return true;
}
return false;
}
#define mysql_socket_accept(K, FD, AP, LP) \
inline_mysql_socket_accept(__FILE__, __LINE__, K, FD, AP, LP)
static inline MYSQL_SOCKET inline_mysql_socket_accept(
#ifdef HAVE_PSI_SOCKET_INTERFACE
const char *src_file, uint src_line, PSI_socket_key key,
#endif
MYSQL_SOCKET socket_listen, struct sockaddr *addr, socklen_t *addr_len) {
MYSQL_SOCKET socket_accept = MYSQL_INVALID_SOCKET;
socklen_t addr_length = (addr_len != nullptr) ? *addr_len : 0;
#ifdef HAVE_PSI_SOCKET_INTERFACE
if (socket_listen.m_psi != nullptr) {
if (socket_listen.m_psi->m_enabled) {
/* Instrumentation start */
PSI_socket_locker *locker;
PSI_socket_locker_state state;
locker = PSI_SOCKET_CALL(start_socket_wait)(&state, socket_listen.m_psi,
PSI_SOCKET_CONNECT, (size_t)0,
src_file, src_line);
/* Instrumented code */
// 接受连接请求
socket_accept.fd = accept(socket_listen.fd, addr, &addr_length);
return socket_accept;
}
}
#endif
/* Non instrumented code */
socket_accept.fd = accept(socket_listen.fd, addr, &addr_length);
return socket_accept;
}
创建信道
Channel_info类有两个子类:
- Channel_info_local_socket:用于表示本地基于unix socket的连接。
- Channel_info_tcpip_socket:用于表示TCP/IP的连接。该类中有一个字段表示该连接是否是admin连接,MySQL会特殊处理admin连接,后续会在处理连接请求的时候看到admin连接的特殊之处。
cppsql/conn_handler/socket_connection.cc
/* * 标识是否是admin连接,admin连接用于实现一些管理功能。 * admin连接是不受最大连接数的限制的,也不会影响最大连接数。 * 也就是说接入了admin连接,不会占用1个普通连接的名额,普通连接能够该连多少还是多少。 */ bool m_is_admin_conn;
处理连接请求
void Connection_handler_manager::process_new_connection(
Channel_info *channel_info) {
if (connection_events_loop_aborted() ||
// 增加连接计数,如果超过了最大连接数,则发送错误码。
!check_and_incr_conn_count(channel_info->is_admin_connection())) {
// 客户端收到的1040错误码,就是这里的ER_CON_COUNT_ERROR
channel_info->send_error_and_close_channel(ER_CON_COUNT_ERROR, 0, true);
delete channel_info;
return;
}
// 通过连接处理器添加连接
if (m_connection_handler->add_connection(channel_info)) {
inc_aborted_connects();
delete channel_info;
}
}
连接数量判断
bool Connection_handler_manager::check_and_incr_conn_count(
bool is_admin_connection) {
bool connection_accepted = true;
mysql_mutex_lock(&LOCK_connection_count);
if (connection_count > max_connections && !is_admin_connection) {
connection_accepted = false;
// 连接错误计数增1
m_connection_errors_max_connection++;
} else { // 如果连接数没有超过最大值,或者是admin连接
// 连接数增1
++connection_count;
// 如果连接数超了,那说明当前请求只能是admin请求了
if (connection_count > max_used_connections) {
// 更新一下最大连接数,因为admin连接是不会占用普通连接的名额的。
max_used_connections = connection_count;
// 重置一下达到最大连接数的时间
max_used_connections_time = time(nullptr);
}
}
mysql_mutex_unlock(&LOCK_connection_count);
return connection_accepted;
}
在该函数中,判断了连接的数量是否超过了限制,admin连接不受该限制。 可以通过下面的命令来查看最大连接数量:
mysql> SHOW VARIABLES LIKE 'max_connections';
+-----------------+-------+
| Variable_name | Value |
+-----------------+-------+
| max_connections | 151 |
+-----------------+-------+
1 row in set (0.00 sec)
处理连接
这里以默认的处理器Per_thread_connection_handler为例。
bool Per_thread_connection_handler::add_connection(Channel_info *channel_info) {
int error = 0;
my_thread_handle id;
// 检查空闲线程,如果存在可用的空闲线程,则这里直接返回了。
if (!check_idle_thread_and_enqueue_connection(channel_info)) return false;
/*
There are no idle threads available to take up the new
connection. Create a new thread to handle the connection
*/
// 设置一下创建处理该信道的新线程的时间
channel_info->set_prior_thr_create_utime();
/*
* 创建新线程,并以handle_connection函数作为入口函数。
* 这里传入的第一个参数是线程的key,该key的name名称为one_connection,
* 所以在实际操作中可以筛选名称叫作这个的线程,看看实际有多少个处理客户端请求的线程。
*
* 另外,这里是通过mysql_thread_create宏来创建新线程,这样在开启PSI线程接口的情况下,会使用PFS的线程监控机制。
*/
error =
mysql_thread_create(key_thread_one_connection, &id, &connection_attrib,
handle_connection, (void *)channel_info);
return false;
}
这里主要做了2件事情:
- 检查是否有空闲线程,如果有的话,就直接使用。
- 如果没有空闲线程,则创建新的线程。可以看到这里设置了新线程所要执行的入口函数是handle_connection函数。
检查空闲线程
bool Per_thread_connection_handler::check_idle_thread_and_enqueue_connection(
Channel_info *channel_info) {
bool res = true;
mysql_mutex_lock(&LOCK_thread_cache);
/*
* 判断是否可以由已创建的线程来处理连接。
* 如果进入阻塞的线程计数大于被唤醒的线程计数,那么说明存在空闲线程可以处理新连接,则不用创建新线程。
*/
if (Per_thread_connection_handler::blocked_pthread_count > wake_pthread) {
DBUG_PRINT("info", ("waiting_channel_info_list->push %p", channel_info));
// 将连接请求添加到待处理的信道列表中
waiting_channel_info_list->push_back(channel_info);
// 增加唤醒计数
wake_pthread++;
// 唤醒线程
mysql_cond_signal(&COND_thread_cache);
// 表示不用创建新的线程
res = false;
}
mysql_mutex_unlock(&LOCK_thread_cache);
return res;
}
创建新的线程
#define mysql_thread_create(K, P1, P2, P3, P4) \
inline_mysql_thread_create(K, 0, P1, P2, P3, P4)
static inline int inline_mysql_thread_create(
PSI_thread_key key [[maybe_unused]],
unsigned int sequence_number [[maybe_unused]], my_thread_handle *thread,
const my_thread_attr_t *attr, my_start_routine start_routine, void *arg) {
int result;
// 如果开启了PSI线程接口
#ifdef HAVE_PSI_THREAD_INTERFACE
/*
* 调用pfs.cc文件中的pfs_spawn_thread_vc函数,在该函数中会创建新线程来执行pfs_spawn_thread函数,
* 而在pfs_spawn_thread函数中,才会执行这里的start_routine指向的函数。
* 具体的差别就在就在使用PSI线程接口的话,会通过PFS_thread来实现线程监控功能,具体信息参考pfs_spawn_thread函数。
*
* 最终还是要调用到my_thread_create函数,所以不管有无开启PSI线程接口,都会调用到该函数来创建新线程。
*/
result = PSI_THREAD_CALL(spawn_thread)(key, sequence_number, thread, attr,
start_routine, arg);
#else // 没有开启PSI线程接口
// 直接调用my_thread_create函数
result = my_thread_create(thread, attr, start_routine, arg);
#endif
return result;
}
#define PSI_THREAD_CALL(M) pfs_##M##_vc
会调用到pfs_spawn_thread_vc函数,在该函数中会创建新的线程来执行pfs_spawn_thread函数。为什么执行的是pfs_spawn_thread函数,而不是上层调用传递过来的函数呢?别着急,下面很快就会分析到了。
int pfs_spawn_thread_vc(PSI_thread_key key, PSI_thread_seqnum seqnum,
my_thread_handle *thread, const my_thread_attr_t *attr,
void *(*start_routine)(void *), void *arg) {
PFS_spawn_thread_arg *psi_arg;
PFS_thread *parent;
/* psi_arg can not be global, and can not be a local variable. */
// 创建参数对象
psi_arg = (PFS_spawn_thread_arg *)my_malloc(
PSI_NOT_INSTRUMENTED, sizeof(PFS_spawn_thread_arg), MYF(MY_WME));
if (unlikely(psi_arg == nullptr)) {
return EAGAIN;
}
// 设置参数对象的一些属性
psi_arg->m_child_key = key;
psi_arg->m_child_seqnum = seqnum;
psi_arg->m_child_identity = (arg ? arg : thread);
// 将实际的入口函数封装在psi线程参数对象里面
psi_arg->m_user_start_routine = start_routine;
// 将实际的入口函数的参数封装在psi线程参数对象里面
psi_arg->m_user_arg = arg;
// 获取父级pfs线程
parent = my_thread_get_THR_PFS();
// 设置与父级pfs线程相关的属性
if (parent != nullptr) {
/*
Make a copy of the parent attributes.
This is required, because instrumentation for this thread (the parent)
may be destroyed before the child thread instrumentation is created.
*/
psi_arg->m_thread_internal_id = parent->m_thread_internal_id;
psi_arg->m_user_name = parent->m_user_name;
psi_arg->m_host_name = parent->m_host_name;
} else {
psi_arg->m_thread_internal_id = 0;
psi_arg->m_user_name.reset();
psi_arg->m_host_name.reset();
}
// 创建新线程,并以 pfs_spawn_thread 函数作为入口
int result = my_thread_create(thread, attr, pfs_spawn_thread, psi_arg);
if (unlikely(result != 0)) {
// 如果线程创建失败,则释放掉psi参数对象
my_free(psi_arg);
}
return result;
}
其中调用的my_thread_create函数是真正创建线程的函数,会调用glibc提供的pthread接口来创建操作系统该层面的线程。
int my_thread_create(my_thread_handle *thread, const my_thread_attr_t *attr,
my_start_routine func, void *arg) {
#ifndef _WIN32
// 调用glibc库函数创建线程
return pthread_create(&thread->thread, attr, func, arg);
#else
#endif
}
上面说到新线程的入口函数是pfs_spawn_thread,那这里面做了什么事情呢?
// 该函数是PFS线程的入口函数,封装了实际线程的目标入口函数。
static void *pfs_spawn_thread(void *arg) {
PFS_spawn_thread_arg *typed_arg = (PFS_spawn_thread_arg *)arg;
void *user_arg;
void *(*user_start_routine)(void *);
PFS_thread *pfs;
/* First, attach instrumentation to this newly created pthread. */
PFS_thread_class *klass = find_thread_class(typed_arg->m_child_key);
if (likely(klass != nullptr)) {
// 创建PFS_thread对象,注意并不是创建真正的操作系统线程,而是一个包装对象而已。
pfs = create_thread(klass, typed_arg->m_child_seqnum,
typed_arg->m_child_identity, 0);
if (likely(pfs != nullptr)) {
// 关联实际的线程
pfs->m_thread_os_id = my_thread_os_id();
clear_thread_account(pfs);
pfs->m_parent_thread_internal_id = typed_arg->m_thread_internal_id;
pfs->m_user_name = typed_arg->m_user_name;
pfs->m_host_name = typed_arg->m_host_name;
set_thread_account(pfs);
}
} else {
pfs = nullptr;
}
my_thread_set_THR_PFS(pfs);
pfs_notify_thread_create((PSI_thread *)pfs);
/*
Secondly, free the memory allocated in spawn_thread_v1().
It is preferable to do this before invoking the user
routine, to avoid memory leaks at shutdown, in case
the server exits without waiting for this thread.
*/
// 从参数对象中获取实际的入口函数和参数
user_start_routine = typed_arg->m_user_start_routine;
user_arg = typed_arg->m_user_arg;
// 释放psi参数对象,因为该拿到的信息都已经拿到了,该对象没有用了。
my_free(typed_arg);
/* Then, execute the user code for this thread. */
/*
* 执行实际的入口函数
* 既然PFS线程用于监控的目的,那么在入口函数中又是怎么使用PFS的呢?
* 比如是怎么获取到PFS_thread对象的呢?
*/
(*user_start_routine)(user_arg);
return nullptr;
}
原来在pfs_spawn_thread_vc函数中把上层传递过来的入口函数和参数封装到参数对象中,然后传递给pfs_spawn_thread函数。pfs_spawn_thread函数会从参数对象中取出实际的入口函数和参数,然后调用执行。之所以不直接调用实际的入口函数,目的是封装一层PFS线程监控。
总结
本文首先分析了MySQL服务层的Acceptor的创建过程,随后介绍了Acceptor中的事件循环的执行流程:接受请求并处理请求。本文所介绍的所有内容都是在MySQL的主线程mysqld中执行的,子线程会调用handle_connection函数来处理请求。