목차
MariaDB线程池源码分析
心中无码
0  前言
1  相关参数
2  thread handling设置
3  线程池初始化——tp_init
4  添加新连接——tp_add_connection
5  工作线程——worker_main
6  事件获取——get_event
7  事件处理——handle_event
8  线程等待
9  唤醒等待线程
10  小结
References
데이터 베이스 MySQL 튜토리얼 MariaDB线程池源码分析_MySQL

MariaDB线程池源码分析_MySQL

Jun 01, 2016 pm 01:34 PM

MariaDB

bitsCN.com

 

MariaDB线程池源码分析

心中无码

 

0  前言

MySQL5.5的Enterprise版本以plugin的方式引入了thread pool,在并发请求数达到一定 数量的时候,性能相比社区版貌似有不少提高, 可以看下这个性能对比。

 

在引入线程池之前,MySQL支持的线程处理方式(thread_handling参数控制)有no-threads和one-thread-per-connection两种方式,no-threads方式是指任一时刻最多只有一个连接可以连接到server,一般用于实验性质。 one-thread-per-connection是指针对每个连接创建一个线程来处理这个连接的所有请求,直到连接断开,线程 结束。是thread_handling的默认方式。

 

one-thread-per-connection存在的问题就是需要为每个连接创建一个新的thread,当并发连接数达到一定 程度,性能会有明显下降,因为过多的线程会导致频繁的上下文切换,CPU cache命中率降低和锁的竞争 更加激烈。

 

解决one-thread-per-connection的方法就是降低线程数,这样就需要多个连接共用线程,这便引入了线程 池的概念。线程池中的线程是针对请求的,而不是针对连接的,也就是说几个连接可能使用相同的线程处理 各自的请求。

 

MariaDB在5.5引入了一个动态的线程池方案,可以根据当前请求的并发情况自动增加或减少线程数,还好 MariaDB完全开源,本文结合MariaDB的代码来介绍下thread pool的实现。这里使用的MariaDB 10.0的 代码树。

 

1  相关参数

MySQL的参数都写在sys_vars.cc文件下。

static Sys_var_uint Sys_threadpool_idle_thread_timeout(  "thread_pool_idle_timeout",  "Timeout in seconds for an idle thread in the thread pool."  "Worker thread will be shut down after timeout",  GLOBAL_VAR(threadpool_idle_timeout), CMD_LINE(REQUIRED_ARG),  VALID_RANGE(1, UINT_MAX), DEFAULT(60), BLOCK_SIZE(1));static Sys_var_uint Sys_threadpool_oversubscribe(  "thread_pool_oversubscribe",  "How many additional active worker threads in a group are allowed.",  GLOBAL_VAR(threadpool_oversubscribe), CMD_LINE(REQUIRED_ARG),  VALID_RANGE(1, 1000), DEFAULT(3), BLOCK_SIZE(1));static Sys_var_uint Sys_threadpool_size( "thread_pool_size", "Number of thread groups in the pool. " "This parameter is roughly equivalent to maximum number of concurrently " "executing threads (threads in a waiting state do not count as executing).",  GLOBAL_VAR(threadpool_size), CMD_LINE(REQUIRED_ARG),  VALID_RANGE(1, MAX_THREAD_GROUPS), DEFAULT(my_getncpus()), BLOCK_SIZE(1),  NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),  ON_UPDATE(fix_threadpool_size));static Sys_var_uint Sys_threadpool_stall_limit( "thread_pool_stall_limit", "Maximum query execution time in milliseconds," "before an executing non-yielding thread is considered stalled." "If a worker thread is stalled, additional worker thread " "may be created to handle remaining clients.",  GLOBAL_VAR(threadpool_stall_limit), CMD_LINE(REQUIRED_ARG),  VALID_RANGE(10, UINT_MAX), DEFAULT(500), BLOCK_SIZE(1),  NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(0),   ON_UPDATE(fix_threadpool_stall_limit));
로그인 후 복사

这几个参数都有相应的描述,这里再稍微具体介绍一下。
thread_pool_size: 线程池的分组(group)个数。MariaDB的线程池并不是说一整个大池子,而是分成了不同的group,而且是按照到来connection的顺序进行分组的,如第一个connection分配到group[0],那么第二个connection就分配到group[1],是一种Round Robin的轮询分配方式。默认值是CPU core个数。

 

thread_pool_idle_timeout: 线程最大空闲时间,如果某个线程空闲的时间大于这个参数,则线程退出。

 

thread_pool_stall_limit: 监控间隔时间,thread pool有个监控线程,每隔这个时间,会检查每个group的线程可用数等状态,然后进行相应的处理,如wake up或者create thread。

 

thread_pool_oversubscribe: 允许的每个group上的活跃的线程数,注意这并不是每个group上的最大线程数,而只是可以处理请求的线程数。

 

 

2  thread handling设置

thread pool模式其实是新增了一种thread_handling的方式,即在配置文件中设置:

[mysqld]thread_handling=pool-of-threads.....
로그인 후 복사
 

MySQL内部是有一个scheduler_functions结构体,不论thread_handling是哪种方式,都是通过设置这个 结构体中的函数来进行不同的调度。

/** scheduler_functions结构体 */struct scheduler_functions{  uint max_threads, *connection_count;  ulong *max_connections;  bool (*init)(void);  bool (*init_new_connection_thread)(void);  void (*add_connection)(THD *thd);  void (*thd_wait_begin)(THD *thd, int wait_type);  void (*thd_wait_end)(THD *thd);  void (*post_kill_notification)(THD *thd);  bool (*end_thread)(THD *thd, bool cache_thread);  void (*end)(void);};static int get_options(int *argc_ptr, char ***argv_ptr){  ...  /** 根据thread_handling选项的设置,选择不同的处理方式*/if (thread_handling max_threads= threadpool_max_threads;  func->max_connections= arg_max_connections;  func->connection_count= arg_connection_count;  scheduler_init();}
로그인 후 복사

上面可以看到设置了thread_scheduler的处理函数为tp_scheduler_functions,即 为thread pool方式,这种方式对应的初始函数为tp_init, 创建新连接的函数为 tp_add_connection,等待开始函数为tp_wait_begin,等待结束函数为tp_wait_end. 这里说明下等待函数的意义,等待函数一般是在等待磁盘I/O,等待锁资源,SLEEP,或者等待 网络消息的时候,调用wait_begin,在等待结束后调用wait_end,那么为什么要等待的时候 调用等待函数呢?这个在后面进行介绍。

 

上面讲的其实和thread pool关系不是很大,下面开始thread pool流程的介绍。thread pool涉及 到的源码在emphsql/threadpool_common.cc和emphsql/threadpool_unix.cc, 对于windows而言,还有emphsql/threadpool_win.cc.

 

3  线程池初始化——tp_init

 
>tp_init| >thread_group_init| >start_timer
로그인 후 복사

tp_init非常简单,首先是调用了thread_group_init进行组的初始化, 然后调用的start_timer开启了监控线程timer_thread。 至此为止,thread pool里面只有一个监控线程启动,而没有任何工作线程, 直到有新的连接到来。

 

4  添加新连接——tp_add_connection

 
void tp_add_connection(THD *thd){  DBUG_ENTER("tp_add_connection");    threads.append(thd);  mysql_mutex_unlock(&LOCK_thread_count);  connection_t *connection= alloc_connection(thd);  if (connection)  {    thd->event_scheduler.data= connection;          /* Assign connection to a group. */    thread_group_t *group=       &all_groups[thd->thread_id%group_count];        connection->thread_group=group;          mysql_mutex_lock(&group->mutex);    group->connection_count++;    mysql_mutex_unlock(&group->mutex);        /*       Add connection to the work queue.Actual logon        will be done by a worker thread.    */    queue_put(group, connection);  }  else  {    /* Allocation failed */    threadpool_remove_connection(thd);  }   DBUG_VOID_RETURN;}
로그인 후 복사

但server的主监听线程监听到有客户端的connect时,会调用tp_add_connection函数进行处理。 首先根据thread_id对group_count取模,找到其所属的group,然后调用queue_put将此connection 放入到group中的queue中。这里涉及到两个新的结构体,connection_t和thread_group_t。

struct connection_t{  THD *thd;  thread_group_t *thread_group;  connection_t *next_in_queue;  connection_t **prev_in_queue;  ulonglong abs_wait_timeout; //等待超时时间  bool logged_in; //是否进行了登录验证  bool bound_to_poll_descriptor; //是否添加到了epoll进行监听  bool waiting; //是否在等待状态,如I/O, sleep};struct thread_group_t {  mysql_mutex_t mutex;  connection_queue_t queue;  //connection请求链表  worker_list_t waiting_threads; //group中正在等待被唤醒的thread  worker_thread_t *listener;  //当前group中用于监听的线程  pthread_attr_t *pthread_attr;  int  pollfd;  //epoll 文件描述符,用于绑定group中的所有连接  int  thread_count;  //线程数  int  active_thread_count;//活跃线程数  int  connection_count; //连接数  /* Stats for the deadlock detection timer routine.*/  int io_event_count;  //epoll产生的事件数  int queue_event_count; //工作线程消化的事件数  ulonglong last_thread_creation_time;  int  shutdown_pipe[2];  bool shutdown;  bool stalled; // 工作线程是否处于停滞状态  } MY_ALIGNED(512);
로그인 후 복사

上面对这些参数进行了说明,理解这些参数的意义,才能了解这个动态thread pool的管理机制, 因为每个参数都会影响到thread pool的增长或收缩。

 

介绍完结构体,继续回到新的连接到来,这时会调用queue_put函数,将此connection放到 group的队列queue中。

static void queue_put(thread_group_t *thread_group, connection_t *connection){  DBUG_ENTER("queue_put");  mysql_mutex_lock(&thread_group->mutex);  thread_group->queue.push_back(connection);  if (thread_group->active_thread_count == 0)    wake_or_create_thread(thread_group);  mysql_mutex_unlock(&thread_group->mutex);  DBUG_VOID_RETURN;}
로그인 후 복사

注意,这时候有个active_thread_count的判断,如果没有活跃的线程,那么就无法处理 这个新到的请求啊,这时就需要调用wake_or_create_thread,这个函数首先会尝试唤醒group 等待线程链表waiting_threads中的线程,如果没有等待中的线程,则需要创建一个线程。 至此,新到的connection被挂到了group的queue上,这样一个连接算是add进队列了,那么如何 处理这个连接呢?我们继续往下看。

 

5  工作线程——worker_main

由于是第一个连接到来,那么肯定没有waiting_threads,此时会调用create_worker 函数创建一个工作线程。我们直接来看下工作线程。

 
static void *worker_main(void *param){  ...  DBUG_ENTER("worker_main");    thread_group_t *thread_group = (thread_group_t *)param;  /* Run event loop */  for(;;)  {    connection_t *connection;    struct timespec ts;    set_timespec(ts,threadpool_idle_timeout);    connection = get_event(&this_thread, thread_group, &ts);    if (!connection)      break;    this_thread.event_count++;    handle_event(connection);  }  ....  my_thread_end();  return NULL;}
로그인 후 복사

上面是整个工作线程的逻辑,可以看到是一个循环,get_event用来获取新的需要处理的 connection,然后调用handle_event进行处理相应的connection。one thread per connection 中每个线程也是一个循环体,这两者之间的区别就是,thread pool的循环等待的是一个可用的event, 并不局限于某个固定的connection的event,而one thread per connection的循环等待是等待固定的 connection上的event,这就是两者最大的区别。

 

6  事件获取——get_event

 

工作线程通过get_event获取需要处理的connection,

 
connection_t *get_event(worker_thread_t *current_thread,   thread_group_t *thread_group,  struct timespec *abstime){   ...  for(;;)   {  ...      /** 从QUEUE中获取connection */      connection = queue_get(thread_group);      if(connection) {        fprintf(stderr, "Thread %x get a new connection./n", (unsigned int)pthread_self());        break;      }      ...      /**监听epoll */    if(!thread_group->listener)    {      thread_group->listener= current_thread;      thread_group->active_thread_count--;      mysql_mutex_unlock(&thread_group->mutex);      fprintf(stderr, "Thread %x waiting for a new event./n", (unsigned int)pthread_self());      connection = listener(current_thread, thread_group);      fprintf(stderr, "Thread %x get a new event for connection %p./n",              (unsigned int)pthread_self(), connection);      mysql_mutex_lock(&thread_group->mutex);      thread_group->active_thread_count++;      /* There is no listener anymore, it just returned. */      thread_group->listener= NULL;      break;    }    ...}
로그인 후 복사
 

这个get_event的函数逻辑稍微有点多,这里只抽取了获取事件的两个点, 我们接着按照第一个连接到来是的情形进行说明, 第一个连接到来,queue中有了一个connection,这是get_event便会从queue中获取到一个 connection,返回给worker_main线程。worker_main接着调用handle_event进行事件处理。

 

每个新的connection连接到服务器后,其socket会绑定到group的epoll中,所以,如果queue中 没有connection,需要从epool中获取,每个group的所有连接的socket都绑定在group的epool 中,所以任何一个时刻,最多只有一个线程能够监听epoll,如果epoll监听到有event的话,也会返回 相应的connection,然后再调用handle_event进行处理。

 

7  事件处理——handle_event

 

handle_event的逻辑比较简单,就是根据connection_t上是否登录过,进行分支,如果没 登录过,说明是新到的连接,则进行验证,否则直接进行请求处理。

static void handle_event(connection_t *connection){  DBUG_ENTER("handle_event");  int err;  if (!connection->logged_in) //处理登录  {    err= threadpool_add_connection(connection->thd);    connection->logged_in= true;  }  else  //处理请求  {    err= threadpool_process_request(connection->thd);  }  if(err)    goto end;  set_wait_timeout(connection);  /** 设置socket到epoll的监听 */  err= start_io(connection);end:  if (err)    connection_abort(connection);  DBUG_VOID_RETURN;}static int start_io(connection_t *connection){   int fd = mysql_socket_getfd(connection->thd->net.vio->mysql_socket);  ...      /* 绑定到epoll *。  if (!connection->bound_to_poll_descriptor)  {    connection->bound_to_poll_descriptor= true;    return io_poll_associate_fd(group->pollfd, fd, connection);  }    return io_poll_start_read(group->pollfd, fd, connection);}
로그인 후 복사

注意,在handle_event之后,会调用start_io,这个函数很重要,这个函数会将新 到的connection的socket绑定到group的epoll上进行监听。

 

8  线程等待

 

当group中的线程没有任务执行时,所有线程都会在get_event处等待,但是有两种等待方式, 一种是在epoll上等待事件,每个group中只有一个线程会做这个事情,且这个会一直等待,直到有新 的事件到来。另一种就是等待一定的时间, 即参数thread_pool_idle_time这个时间,如果超过了这个时间,那么当前的线程的get_event就会 返回空,然后worker_main线程就会退出。如果在线程等待的过程被唤醒的话,那么就会继续在 get_event中进行循环,等待新的事件。

 

9  唤醒等待线程

 

有两种方式会唤醒等待的线程,一种是监控线程timer_thread,另一种就是一些active的线程碰到 需要等待的时候,会调用tp_wait_begin,这个函数如果判断当前没有active的thread且没有thread监听 epoll,则会调用wake_or_create_thread。

 

监控线程timer_thread用于定期监控group中的thread使用情况,具体的检查函数是check_stall.

void check_stall(thread_group_t *thread_group){  ...  /** 如果没有线程监听epoll且自上次检查到现在没有新的event事件产生,说明所有的  活跃线程都在 忙于执行长任务,则需要唤醒或创建工作线程 */  if (!thread_group->listener && !thread_group->io_event_count)  {    wake_or_create_thread(thread_group);    mysql_mutex_unlock(&thread_group->mutex);    return;  }    /*  Reset io event count */  thread_group->io_event_count= 0;  /** 如果队列queue中有请求,且自上次检查到现在queue中的请求没有被消化,  则说明所有活跃线程忙于执行长任务,需要唤醒或创建工作线程*/  if (!thread_group->queue.is_empty() && !thread_group->queue_event_count)  {    thread_group->stalled= true;    wake_or_create_thread(thread_group);  }    /* Reset queue event count */  thread_group->queue_event_count= 0;    mysql_mutex_unlock(&thread_group->mutex);}
로그인 후 복사
 

10  小结

MariaDB的thread pool的实现相对比较简单,总体上就是将group中所有的connection的socket挂在 group的epoll_fd上进行事件监听,监听到的事件或被当前线程执行,或者被push到group的queue上 被其他线程执行。

 

监控线程timer_thread定期的根据需要去唤醒等待线程或创建新的线程,来达到动态增加的thread的 目的。而thread的收缩则是通过线程等待事件超时来完成的。

 

btw,在跟踪代码的过程中,也发现了使用thread pool时导致server crash的情况,提交了个 bug给MariaDB,发现当天就有回复, 并立刻修复push到source tree上了,看来MariaDB的团队反映够迅速的,赞一个。

 

References

[1]
Thread pool in MariaDB 5.5
[3]
The Thread Pool Plugin
[3]
Thread Pool Worklog
 




File translated fromTEXby TTH,version 4.03.
On 25 May 2013, 01:39.

bitsCN.com
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.

핫 AI 도구

Undresser.AI Undress

Undresser.AI Undress

사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover

AI Clothes Remover

사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool

Undress AI Tool

무료로 이미지를 벗다

Clothoff.io

Clothoff.io

AI 옷 제거제

AI Hentai Generator

AI Hentai Generator

AI Hentai를 무료로 생성하십시오.

인기 기사

R.E.P.O. 에너지 결정과 그들이하는 일 (노란색 크리스탈)
2 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 최고의 그래픽 설정
2 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 아무도들을 수없는 경우 오디오를 수정하는 방법
2 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌

뜨거운 도구

메모장++7.3.1

메모장++7.3.1

사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전

SublimeText3 중국어 버전

중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기

스튜디오 13.0.1 보내기

강력한 PHP 통합 개발 환경

드림위버 CS6

드림위버 CS6

시각적 웹 개발 도구

SublimeText3 Mac 버전

SublimeText3 Mac 버전

신 수준의 코드 편집 소프트웨어(SublimeText3)

Alter Table 문을 사용하여 MySQL에서 테이블을 어떻게 변경합니까? Alter Table 문을 사용하여 MySQL에서 테이블을 어떻게 변경합니까? Mar 19, 2025 pm 03:51 PM

이 기사는 MySQL의 Alter Table 문을 사용하여 열 추가/드롭 테이블/열 변경 및 열 데이터 유형 변경을 포함하여 테이블을 수정하는 것에 대해 설명합니다.

MySQL 연결에 대한 SSL/TLS 암호화를 어떻게 구성합니까? MySQL 연결에 대한 SSL/TLS 암호화를 어떻게 구성합니까? Mar 18, 2025 pm 12:01 PM

기사는 인증서 생성 및 확인을 포함하여 MySQL에 대한 SSL/TLS 암호화 구성에 대해 설명합니다. 주요 문제는 자체 서명 인증서의 보안 영향을 사용하는 것입니다. [문자 수 : 159]

MySQL에서 큰 데이터 세트를 어떻게 처리합니까? MySQL에서 큰 데이터 세트를 어떻게 처리합니까? Mar 21, 2025 pm 12:15 PM

기사는 MySQL에서 파티셔닝, 샤딩, 인덱싱 및 쿼리 최적화를 포함하여 대규모 데이터 세트를 처리하기위한 전략에 대해 설명합니다.

인기있는 MySQL GUI 도구는 무엇입니까 (예 : MySQL Workbench, Phpmyadmin)? 인기있는 MySQL GUI 도구는 무엇입니까 (예 : MySQL Workbench, Phpmyadmin)? Mar 21, 2025 pm 06:28 PM

기사는 MySQL Workbench 및 Phpmyadmin과 같은 인기있는 MySQL GUI 도구에 대해 논의하여 초보자 및 고급 사용자를위한 기능과 적합성을 비교합니다. [159 자].

드롭 테이블 문을 사용하여 MySQL에서 테이블을 어떻게 드롭합니까? 드롭 테이블 문을 사용하여 MySQL에서 테이블을 어떻게 드롭합니까? Mar 19, 2025 pm 03:52 PM

이 기사에서는 Drop Table 문을 사용하여 MySQL에서 테이블을 떨어 뜨리는 것에 대해 설명하여 예방 조치와 위험을 강조합니다. 백업 없이는 행동이 돌이킬 수 없으며 복구 방법 및 잠재적 생산 환경 위험을 상세하게합니다.

외국 키를 사용하여 관계를 어떻게 표현합니까? 외국 키를 사용하여 관계를 어떻게 표현합니까? Mar 19, 2025 pm 03:48 PM

기사는 외국 열쇠를 사용하여 데이터베이스의 관계를 나타내고 모범 사례, 데이터 무결성 및 피할 수있는 일반적인 함정에 중점을 둡니다.

JSON 열에서 인덱스를 어떻게 생성합니까? JSON 열에서 인덱스를 어떻게 생성합니까? Mar 21, 2025 pm 12:13 PM

이 기사에서는 PostgreSQL, MySQL 및 MongoDB와 같은 다양한 데이터베이스에서 JSON 열에서 인덱스를 작성하여 쿼리 성능을 향상시킵니다. 특정 JSON 경로를 인덱싱하는 구문 및 이점을 설명하고 지원되는 데이터베이스 시스템을 나열합니다.

일반적인 취약점 (SQL 주입, 무차별 적 공격)에 대해 MySQL을 어떻게 보호합니까? 일반적인 취약점 (SQL 주입, 무차별 적 공격)에 대해 MySQL을 어떻게 보호합니까? Mar 18, 2025 pm 12:00 PM

기사는 준비된 명령문, 입력 검증 및 강력한 암호 정책을 사용하여 SQL 주입 및 무차별 적 공격에 대한 MySQL 보안에 대해 논의합니다 (159 자)

See all articles