1.任務節點
typedef void (*cb_fun)(void *); //任务结构体 typedef struct task { void *argv; //任务函数的参数(任务执行结束前,要保证参数地址有效) cb_fun handler; //任务函数(返回值必须为0 非0值用作增加线程,和销毁线程池) struct task *next; //任务链指针 }zoey_task_t;
handler為函數指針,是實際的任務函數,argv為該函數的參數,next指向下一個任務。
2.任務隊列
typedef struct task_queue { zoey_task_t *head; //队列头 zoey_task_t **tail; //队列尾 unsigned int maxtasknum; //最大任务限制 unsigned int curtasknum; //当前任务数 }zoey_task_queue_t;
head為任務隊列頭指針,tail為任務隊列尾指針,maxtasknum為隊列最大任務數限制,curtasknum為隊列當前任務數。
3.執行緒池
typedef struct threadpool { pthread_mutex_t mutex; //互斥锁 pthread_cond_t cond; //条件锁 zoey_task_queue_t tasks;//任务队列 unsigned int threadnum; //线程数 unsigned int thread_stack_size; //线程堆栈大小 }zoey_threadpool_t;
mutex為互斥鎖定 cond為條件鎖定。 mutex和cond共同保證線程池任務的互斥領取或新增。
tasks指向任務佇列。
threadnum為執行緒池的執行緒數
thread_stack_size為執行緒堆疊大小
4.啟動設定##
//配置参数 typedef struct threadpool_conf { unsigned int threadnum; //线程数 unsigned int thread_stack_size;//线程堆栈大小 unsigned int maxtasknum;//最大任务限制 }zoey_threadpool_conf_t;
4.啟動設定
##zoey_threadpool_t* zoey_threadpool_init(zoey_threadpool_conf_t *conf) { zoey_threadpool_t *pool = null; int error_flag_mutex = 0; int error_flag_cond = 0; pthread_attr_t attr; do{ if (z_conf_check(conf) == -1){ //检查参数是否合法 break; } pool = (zoey_threadpool_t *)malloc(sizeof(zoey_threadpool_t));//申请线程池句柄 if (pool == null){ break; } //初始化线程池基本参数 pool->threadnum = conf->threadnum; pool->thread_stack_size = conf->thread_stack_size; pool->tasks.maxtasknum = conf->maxtasknum; pool->tasks.curtasknum = 0; z_task_queue_init(&pool->tasks); if (z_thread_key_create() != 0){//创建一个pthread_key_t,用以访问线程全局变量。 free(pool); break; } if (z_thread_mutex_create(&pool->mutex) != 0){ //初始化互斥锁 z_thread_key_destroy(); free(pool); break; } if (z_thread_cond_create(&pool->cond) != 0){ //初始化条件锁 z_thread_key_destroy(); z_thread_mutex_destroy(&pool->mutex); free(pool); break; } if (z_threadpool_create(pool) != 0){ //创建线程池 z_thread_key_destroy(); z_thread_mutex_destroy(&pool->mutex); z_thread_cond_destroy(&pool->cond); free(pool); break; } return pool; }while(0); return null; }
」啟動設定結構 體是初始化執行緒池時的一些參數。
5.初始化執行緒池先檢查參數是否合法,然後初始化mutex,cond,key(pthread_key_t)。 key用來讀寫執行緒全域變量,此全域變數控制執行緒是否退出。
最後建立執行緒。 int zoey_threadpool_add_task(zoey_threadpool_t *pool, cb_fun handler, void* argv)
{
zoey_task_t *task = null;
//申请一个任务节点并赋值
task = (zoey_task_t *)malloc(sizeof(zoey_task_t));
if (task == null){
return -1;
}
task->handler = handler;
task->argv = argv;
task->next = null;
if (pthread_mutex_lock(&pool->mutex) != 0){ //加锁
free(task);
return -1;
}
do{
if (pool->tasks.curtasknum >= pool->tasks.maxtasknum){//判断工作队列中的任务数是否达到限制
break;
}
//将任务节点尾插到任务队列
*(pool->tasks.tail) = task;
pool->tasks.tail = &task->next;
pool->tasks.curtasknum++;
//通知阻塞的线程
if (pthread_cond_signal(&pool->cond) != 0){
break;
}
//解锁
pthread_mutex_unlock(&pool->mutex);
return 0;
}while(0);
pthread_mutex_unlock(&pool->mutex);
free(task);
return -1;
}
先申請一個任務節點,實例化後將節點加入任務佇列,並將目前任務佇列數並通知其他行程有新任務。整個過程加鎖。 void zoey_threadpool_destroy(zoey_threadpool_t *pool)
{
unsigned int n = 0;
volatile unsigned int lock;
//z_threadpool_exit_cb函数会使对应线程退出
for (; n < pool->threadnum; n++){
lock = 1;
if (zoey_threadpool_add_task(pool, z_threadpool_exit_cb, &lock) != 0){
return;
}
while (lock){
usleep(1);
}
}
z_thread_mutex_destroy(&pool->mutex);
z_thread_cond_destroy(&pool->cond);
z_thread_key_destroy();
free(pool);
}
銷毀執行緒池其實也是向任務佇列新增任務,只不過新增的任務是讓執行緒退出。 z_threadpool_exit_cb函數會將lock置0後退出線程,lock為0表示此線程
# 已經退出,接著退出下一個線程。退出完執行緒釋放所有資源。 int zoey_thread_add(zoey_threadpool_t *pool)
{
int ret = 0;
if (pthread_mutex_lock(&pool->mutex) != 0){
return -1;
}
ret = z_thread_add(pool);
pthread_mutex_unlock(&pool->mutex);
return ret;
}
很簡單,再產生一個執行緒以及執行緒數 即可。加鎖。 void zoey_set_max_tasknum(zoey_threadpool_t *pool,unsigned int num)
{
if (pthread_mutex_lock(&pool->mutex) != 0){
return -1;
}
z_change_maxtask_num(pool, num); //改变最大任务限制
pthread_mutex_unlock(&pool->mutex);
}
# 當num=0時設定執行緒數為無限大。 以上是nginx線程池源碼是什麼的詳細內容。更多資訊請關注PHP中文網其他相關文章!int main()
{
int array[10000] = {0};
int i = 0;
zoey_threadpool_conf_t conf = {5,0,5}; //实例化启动参数
zoey_threadpool_t *pool = zoey_threadpool_init(&conf);//初始化线程池
if (pool == null){
return 0;
}
for (; i < 10000; i++){
array[i] = i;
if (i == 80){
zoey_thread_add(pool); //增加线程
zoey_thread_add(pool);
}
if (i == 100){
zoey_set_max_tasknum(pool, 0); //改变最大任务数 0为不做上限
}
while(1){
if (zoey_threadpool_add_task(pool, testfun, &array[i]) == 0){
break;
}
printf("error in i = %d\n",i);
}
}
zoey_threadpool_destroy(pool);
while(1){
sleep(5);
}
return 0;
}