1. Task node
/* Signature of a task function: called with the opaque argument stored in the task. */
typedef void (*cb_fun)(void *);

/*
 * Task node: one unit of work, linked into a singly linked chain of tasks.
 */
typedef struct task {
    void *argv;        /* argument passed to the task function; the address
                          must remain valid until the task has finished */
    cb_fun handler;    /* task function.
                          NOTE(review): the original comment said the return
                          value must be 0 (non-zero being reserved for adding
                          threads and destroying the pool), but cb_fun returns
                          void, so that remark looks stale -- confirm. */
    struct task *next; /* next task in the chain */
} zoey_task_t;
handler is a function pointer to the actual task function, argv is the argument passed to that function, and next points to the next task.
2. Task Queue
/*
 * Task queue: a singly linked list of pending tasks plus its size limits.
 */
typedef struct task_queue {
    zoey_task_t *head;       /* head of the queue */
    zoey_task_t **tail;      /* tail of the queue: address of the link slot
                                that zoey_threadpool_add_task() writes the
                                next appended task into */
    unsigned int maxtasknum; /* maximum number of tasks allowed */
    unsigned int curtasknum; /* current number of tasks */
} zoey_task_queue_t;
head is the head pointer of the task queue, tail is the tail pointer of the task queue, maxtasknum is the maximum number of tasks allowed in the queue, and curtasknum is the current number of tasks in the queue.
3. Thread pool
typedef struct threadpool { pthread_mutex_t mutex; //互斥锁 pthread_cond_t cond; //条件锁 zoey_task_queue_t tasks;//任务队列 unsigned int threadnum; //线程数 unsigned int thread_stack_size; //线程堆栈大小 }zoey_threadpool_t;
mutex is a mutex and cond is a condition variable. Together they ensure that taking tasks from the pool and adding tasks to it are mutually exclusive.
tasks is the task queue.
threadnum is the number of threads in the thread pool.
thread_stack_size is the thread stack size.
4. Startup configuration
/*
 * Startup configuration: parameters supplied when the thread pool is
 * initialized.
 */
typedef struct threadpool_conf {
    unsigned int threadnum;         /* number of threads */
    unsigned int thread_stack_size; /* thread stack size */
    unsigned int maxtasknum;        /* maximum number of tasks allowed */
} zoey_threadpool_conf_t;
The startup configuration structure holds the parameters used when initializing the thread pool.
5. Initialize the thread pool
First check that the parameters are valid, then initialize the mutex, the condition variable, and the key (a pthread_key_t). The key is used to read and write a per-thread variable, and that variable controls whether the thread exits.
Finally, create the threads.
/*
 * Create and start a thread pool.
 *
 * conf: startup parameters; validated with z_conf_check() before use.
 *
 * Returns the new pool handle, or NULL if the configuration is rejected,
 * the allocation fails, or any initialization step fails. On failure every
 * resource acquired so far is released again.
 */
zoey_threadpool_t *zoey_threadpool_init(zoey_threadpool_conf_t *conf)
{
    zoey_threadpool_t *pool;

    /* check that the parameters are valid */
    if (z_conf_check(conf) == -1) {
        return NULL;
    }

    /* allocate the pool handle */
    pool = malloc(sizeof *pool);
    if (pool == NULL) {
        return NULL;
    }

    /* basic pool parameters */
    pool->threadnum = conf->threadnum;
    pool->thread_stack_size = conf->thread_stack_size;
    pool->tasks.maxtasknum = conf->maxtasknum;
    pool->tasks.curtasknum = 0;
    z_task_queue_init(&pool->tasks);

    /* create the pthread_key_t used to access the per-thread variable */
    if (z_thread_key_create() != 0) {
        goto fail_pool;
    }

    /* initialize the mutex */
    if (z_thread_mutex_create(&pool->mutex) != 0) {
        goto fail_key;
    }

    /* initialize the condition variable */
    if (z_thread_cond_create(&pool->cond) != 0) {
        goto fail_mutex;
    }

    /* create the worker threads */
    if (z_threadpool_create(pool) != 0) {
        goto fail_cond;
    }

    return pool;

    /* unwind in reverse order of acquisition */
fail_cond:
    z_thread_cond_destroy(&pool->cond);
fail_mutex:
    z_thread_mutex_destroy(&pool->mutex);
fail_key:
    z_thread_key_destroy();
fail_pool:
    free(pool);
    return NULL;
}
6. Add a task
First allocate a task node and fill it in, then append the node to the task queue, increment the current task count, and notify a waiting thread that a new task is available. The queue update and the notification are performed while holding the lock.
/*
 * Append a task to the pool's queue and wake one waiting thread.
 *
 * pool:    the thread pool.
 * handler: task function to run.
 * argv:    argument handed to handler; must stay valid until the task ends.
 *
 * Returns 0 on success. Returns -1 if pool is NULL, the node cannot be
 * allocated, the mutex cannot be locked, the queue already holds maxtasknum
 * tasks, or the condition variable cannot be signalled. On every failure
 * path the queue is left exactly as it was and the node is freed.
 */
int zoey_threadpool_add_task(zoey_threadpool_t *pool, cb_fun handler, void *argv)
{
    zoey_task_t *task;
    zoey_task_t **prev_tail;

    if (pool == NULL) {
        return -1;
    }

    /* allocate a task node and fill it in */
    task = malloc(sizeof *task);
    if (task == NULL) {
        return -1;
    }
    task->handler = handler;
    task->argv = argv;
    task->next = NULL;

    /* lock */
    if (pthread_mutex_lock(&pool->mutex) != 0) {
        free(task);
        return -1;
    }

    /* has the queue reached its task limit? */
    if (pool->tasks.curtasknum >= pool->tasks.maxtasknum) {
        pthread_mutex_unlock(&pool->mutex);
        free(task);
        return -1;
    }

    /* append the node at the tail of the queue */
    prev_tail = pool->tasks.tail;
    *prev_tail = task;
    pool->tasks.tail = &task->next;
    pool->tasks.curtasknum++;

    /* notify a blocked thread */
    if (pthread_cond_signal(&pool->cond) != 0) {
        /*
         * Unlink the node before freeing it; otherwise the queue would keep
         * a pointer to freed memory. The mutex is still held, so (assuming
         * workers dequeue under this same mutex) nobody has seen the node.
         */
        *prev_tail = NULL;
        pool->tasks.tail = prev_tail;
        pool->tasks.curtasknum--;
        pthread_mutex_unlock(&pool->mutex);
        free(task);
        return -1;
    }

    /* unlock */
    pthread_mutex_unlock(&pool->mutex);
    return 0;
}
7. Destroy the thread pool
Destroying the thread pool actually works by adding tasks to the task queue, except that each added task tells a thread to exit. The z_threadpool_exit_cb function sets lock to 0 and then exits the thread. A lock value of 0 means that this thread has exited, so the next thread can then be told to exit. After all threads have exited, all resources are released.
/*
 * Stop every thread of the pool, then release the pool's resources.
 *
 * One exit task (z_threadpool_exit_cb) is queued per thread. Each exit task
 * receives the address of a flag that this function sets to 1 beforehand;
 * the function polls until the flag reads 0 before queueing the next one.
 *
 * A NULL pool is ignored.
 *
 * NOTE(review): if queueing an exit task fails, the function returns at once
 * and leaves the pool, mutex, condition variable and key allocated, exactly
 * as the original did. Callers cannot detect this because the function
 * returns void.
 * NOTE(review): volatile is not a thread-synchronization primitive. An atomic
 * flag would be the proper tool, but that also requires changing
 * z_threadpool_exit_cb, which is defined elsewhere.
 */
void zoey_threadpool_destroy(zoey_threadpool_t *pool)
{
    unsigned int n;
    volatile unsigned int lock;

    if (pool == NULL) {
        return;
    }

    /* z_threadpool_exit_cb makes the thread that runs it exit */
    for (n = 0; n < pool->threadnum; n++) {
        lock = 1;
        /* explicit cast: the task argument is a plain void * */
        if (zoey_threadpool_add_task(pool, z_threadpool_exit_cb, (void *)&lock) != 0) {
            return;
        }
        /* wait until the exit task has cleared the flag */
        while (lock) {
            usleep(1);
        }
    }

    z_thread_mutex_destroy(&pool->mutex);
    z_thread_cond_destroy(&pool->cond);
    z_thread_key_destroy();
    free(pool);
}
8. Add a thread
This is very simple: create one more thread and increase the thread count, all while holding the lock.
/*
 * Add one thread to the pool.
 *
 * Calls z_thread_add() while holding the pool mutex.
 *
 * Returns -1 if the mutex cannot be locked; otherwise returns whatever
 * z_thread_add() returned.
 */
int zoey_thread_add(zoey_threadpool_t *pool)
{
    int rc;

    if (pthread_mutex_lock(&pool->mutex) != 0) {
        return -1;
    }

    rc = z_thread_add(pool);
    pthread_mutex_unlock(&pool->mutex);

    return rc;
}
9. Change the maximum task limit of the task queue
When num = 0, the number of tasks is unlimited.
/*
 * Change the maximum number of tasks the queue may hold.
 *
 * pool: the thread pool.
 * num:  new limit, passed on to z_change_maxtask_num(); the usage example
 *       passes 0 to mean "no upper limit".
 *
 * The change is made while holding the pool mutex. If the mutex cannot be
 * locked the limit is left unchanged; the function returns void, so this
 * failure is not reported to the caller.
 */
void zoey_set_max_tasknum(zoey_threadpool_t *pool, unsigned int num)
{
    if (pthread_mutex_lock(&pool->mutex) != 0) {
        return; /* a void function must not return a value */
    }

    z_change_maxtask_num(pool, num); /* change the maximum task limit */

    pthread_mutex_unlock(&pool->mutex);
}
10. Usage example
int main() { int array[10000] = {0}; int i = 0; zoey_threadpool_conf_t conf = {5,0,5}; //实例化启动参数 zoey_threadpool_t *pool = zoey_threadpool_init(&conf);//初始化线程池 if (pool == null){ return 0; } for (; i < 10000; i++){ array[i] = i; if (i == 80){ zoey_thread_add(pool); //增加线程 zoey_thread_add(pool); } if (i == 100){ zoey_set_max_tasknum(pool, 0); //改变最大任务数 0为不做上限 } while(1){ if (zoey_threadpool_add_task(pool, testfun, &array[i]) == 0){ break; } printf("error in i = %d\n",i); } } zoey_threadpool_destroy(pool); while(1){ sleep(5); } return 0; }
The above is the detailed content of What is the source code of nginx thread pool?. For more information, please follow other related articles on the PHP Chinese website!