執行緒分類
執行緒依照其調度者可以分為使用者級執行緒和核心級執行緒兩種。
(1)用戶級線程
用戶級線程主要解決的是上下文切換的問題,它的調度演算法和調度過程全部由用戶自行選擇決定,在運行時不需要特定的核心支援。在這裡,作業系統往往會提供一個用戶空間的線程庫,該線程庫提供了線程的創建、調度、撤銷等功能,而核心仍然僅對進程進行管理。如果一個行程中的某一個執行緒調用了一個阻塞的系統調用,那麼該行程包含該行程中的其他所有執行緒也同時被阻塞。這種使用者級執行緒的主要缺點是在一個行程中的多個執行緒的調度中無法發揮多處理器的優勢。
(2)核心級線程
這種執行緒允許不同進程中的執行緒按照相同相對優先調度方法進行調度,這樣就可以發揮多處理器的並發優勢。
現在大多數系統都採用使用者級執行緒與核心級執行緒並存的方法。一個使用者級線程可以對應一個或幾個核心級線程,也就是「一對一」或「多對一」模型。這樣既可滿足多處理機系統的需要,也可以盡量減少調度開銷。
Linux的執行緒實作是在核外進行的,核內提供的是建立進程的介面do_fork()。核心提供了兩個系統呼叫clone()和fork(),最後都用不同的參數呼叫do_fork()核內API。當然,要實現線程,沒有核心對多進程(其實是輕量級進程)共享資料段的支援是不行的,因此,do_fork()提供了很多參數,包括CLONE_VM(共享記憶體空間)、CLONE_FS(共享文件系統資訊)、 CLONE_FILES(共享檔案描述符表)、CLONE_SIGHAND(共享訊號句柄表)和CLONE_PID(共享進程ID,僅對核內進程,即0號進程有效)。當使用fork系統呼叫時,核心呼叫do_fork()不使用任何共享屬性,進程擁有獨立的運行環境,而使用 pthread_create()來建立執行緒時,則最終設定了所有這些屬性來呼叫__clone(),而這些參數又全部傳給核內的do_fork(),從而創建的「進程」擁有共享的運行環境,只有棧是獨立的,由__clone()傳入。
Linux執行緒在核內是以輕量級進程的形式存在的,擁有獨立的進程表項,而所有的創建、同步、刪除等操作都在核外pthread庫中進行。 pthread 函式庫使用一個管理執行緒(__pthread_manager(),每個行程獨立且唯一)來管理執行緒的建立和終止,為執行緒分配執行緒ID,發送執行緒相關的訊號(例如Cancel),而主執行緒(pthread_create())的呼叫者則透過管道將請求訊息傳給管理線程。
主要函數說明
1.執行緒的建立與退出
pthread_create 執行緒建立函數
int pthread_create (pthread_t * thread_id,__const
pthread_attr_t * __attr,void *(*__start_routine) (void *),void *__restrict
__arg);
線程創建函數第一個參數為指向線程標識符的指針,第二個參數用來設定線程屬性,第三個參數是線程運行函數的起始地址,最後一個參數是運行函數的參數。這裡,我們的函數thread 不需要參數,所以最後一個參數設為空指標。第二個參數我們也設為空指針,這樣會產生預設屬性的線程。當創建線程成功時,函數返回0,若不為0 則表示建立執行緒失敗,常見的錯誤回傳代碼為EAGAIN 和EINVAL。前者表示系統限制創建新的線程,例如線程數目過多了;後者表示第二個參數代表的線程屬性值非法。建立執行緒成功後,新建立的執行緒則執行參數三和參數四確定的函數,原來的執行緒繼續執行下一行程式碼。
pthread_join 函數,來等待一個執行緒的結束。
函數原型為:int pthread_join (pthread_t __th, void
**__thread_return)
第一個參數為被等待的執行緒標識符,第二個參數為一個使用者定義的指針,它可以用來儲存被等待執行緒的回傳值。這個函數是一個執行緒阻塞的函數,呼叫它的函數會一直等待到被等待的執行緒結束為止,當函數回傳時,被等待執行緒的資源被收回。執行緒只能被一個執行緒等待終止,並且應處於joinable狀態(非detached)。
pthread_exit
函數
一個執行緒的結束有兩種途徑,一種是執行緒運行的函數結束了,呼叫它的執行緒也就結束了;
另一種方式是透過函數pthread_exit
來實現。它的函數原型為:void pthread_exit (void *__retval)唯一的參數是函數的回傳程式碼,只要pthread_join
中的第二個參數thread_return
不是NULL,這個值會被傳遞給thread_return。最後要說明的是,一個執行緒不能被多個執行緒等待,否則第一個接收到訊號的執行緒成功返回,其餘呼叫pthread_join
的執行緒則回傳錯誤代碼ESRCH。
2.線程屬性
pthread_create函數的第二個參數執行緒的屬性。將該值設為NULL,也就是採用預設屬性,執行緒的多項屬性都是可以更改的。這些屬性主要包括綁定屬性、分離屬性、堆疊位址、堆疊大小、優先權。其中系統預設的屬性為非綁定、非分離、缺省1M
的堆疊、與父行程同樣層級的優先權。以下首先對綁定屬性和分離屬性的基本概念進行解說。
綁定屬性:Linux中採用「一對一」的執行緒機制,也就是一個使用者執行緒對應一個核心執行緒。綁定屬性就是指一個使用者執行緒固定地分配給一個核心線程,因為CPU時間片的調度是面向核心線程
(也就是輕量級進程)的,因此具有綁定屬性的執行緒可以保證在需要的時候總有一個核心執行緒與之對應。而與之相對的非綁定屬性就是指使用者執行緒和核心執行緒的關係不是始終固定的,而是由系統來控制分配的。
分離屬性:分離屬性是用來決定一個執行緒以什麼樣的方式來終止自己。在非分離情況下,當一個執行緒結束時,它所佔用的系統資源並沒有被釋放,也就是沒有真正的終止。只有當pthread_join()函數傳回時,建立的執行緒才能釋放自己佔用的系統資源。而在分離屬性情況下,在一個執行緒結束時立即釋放它所佔有的系統資源。
這裡要注意的一點是,如果設定一個執行緒的分離屬性,而這個執行緒運行又非常快,那麼它很可能在pthread_create
函數回傳之前就終止了,它終止以後就可能將執行緒號和系統資源移交給其他的執行緒使用,這時呼叫pthread_create 的執行緒就得到了錯誤的執行緒號。
設定綁定屬性:
int pthread_attr_init(pthread_attr_t *attr)
int
pthread_attr_setscope(pthread_attr_t *attr, int scope)
int
pthread_attr_getscope(pthread_attr_t *tattr, int
*scope)
scope:PTHREAD_SCOPE_SYSTEM:綁定,此執行緒與系統中所有的執行緒競爭
PTHREAD_SCOPE_PROCESS:非綁定,此執行緒與進程中的其他執行緒競爭
設定分離屬性:
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate)
int
pthread_attr_getdetachstate(const pthread_attr_t *tattr,int
*detachstate)
detachstate PTHREAD_CREATE_DETACHED:分離 PTHREAD
_CREATE_JOINABLE:非分離
設定排程策略:
int pthread_attr_setschedpolicy(pthread_attr_t * tattr, int policy)
int
pthread_attr_getschedpolicy(pthread_attr_t * tattr, int *policy)
policy
SCHED_FIFO:先入先出 SCHED_RR:循環 SCHED_OTHER:實作定義的方法
設定優先權:
int pthread_attr_setschedparam (pthread_attr_t *attr, struct sched_param_setschedparam (pthread_attr_t *attr, struct sched_param
*param)
int pthread_attr_getschedparam (pthread_attr_t *attr, struct
sched_param *param)
3.執行緒存取控制
1)互斥鎖(mutex)
透過鎖定機制實現執行緒間的同步。同一時刻只允許一個執行緒執行一個關鍵部分的程式碼。
1 int pthread_mutex_init(pthread_mutex_t *mutex,const pthread_mutex_attr_t
*mutexattr);
2 int pthread_mutex_lock(pthread_mutex_t *mutex);
3 int
pthread_mutex_unlock(pthread_mutex_t *mutex);
4 int
pthread_mutex_destroy(pthread_mutex_t *mutex);
(1)先初始化鎖init()或靜態賦值pthread_mutex_t
mutex=PTHREAD_MUTEX_INITIALIER
(2)加鎖,lock,trylock,lock阻塞等待鎖,trylock立即返回EBUSY
(3)解鎖,unlock需滿足是加鎖狀態,且由加鎖線程解鎖
(4)清除鎖, destroy(此時鎖必需unlock,否則返回EBUSY)
mutex 分為遞歸(recursive) 和非遞歸(non-recursive)兩種,這是POSIX 的叫法,另外的名字是可重入(Reentrant)
與非可重入。這兩種mutex 作為線程間(inter-thread) 的同步工具時沒有區別,它們的惟一區別在於:同一個線程可以重複對recursive
mutex 加鎖,但不能重複對non-recursive mutex
加鎖。
首選非遞歸mutex,絕對不是為了性能,而是為了體現設計意圖。 non-recursive 和recursive
的效能差異其實不大,因為少用一個計數器,前者略快一點而已。在同一個線程裡多次對non-recursive mutex
加鎖會立刻導致死鎖,我認為這是它的優點,能幫助我們思考代碼對鎖的期求,並且及早(在編碼階段)發現問題。毫無疑問recursive mutex
使用起來要方便一些,因為不用考慮一個執行緒會自己把自己把自己鎖死了,我猜這也是Java 和Windows 預設提供recursive mutex 的原因。 (Java
語言自帶的intrinsic lock 是可重入的,它的concurrent
庫裡提供ReentrantLock,Windows的CRITICAL_SECTION 也是可重入的。似乎它們都不提供輕量級的non-recursive
mutex。 )
2)條件變數(cond)
利用執行緒間共享的全域變數進行同步的一種機制。
1 int pthread_cond_init(pthread_cond_t *cond,pthread_condattr_t *cond_attr);
2 int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);
3
int pthread_cond_timedwait(pthread_cond_t *cond,pthread_mutex_t *mutex,const
timespec *abstime);
4 int pthread_cond_destroy(pthread_cond_t *cond);
5
int pthread_cond_signal(pthread_cond_t *cond);
6 int
pthread_cond_broadcast(pthread_cond_t *cond); //解除所有執行緒的阻斷
(1)初始化. init()或pthread_cond_t
cond=PTHREAD_COND_INITIALIER;屬性置為NULL
(2)等待條件成立.
pthread_cond_wait,pthread_cond_timedwait.
wait()釋放鎖定,並阻塞等待條件變數為真
timedwait()設定等待時間,仍未signal,返回ETIMEOUT(加鎖保證只有一個執行緒wait)
(3)啟動條件變數:pthread_cond_signal,pthread_cond_broadcast(3)啟動條件變數:pthread_cond_signal,pthread_cond_broadcast(啟動所有等待執行緒)
(4)清除條件變數:destroy;
無執行緒等待,否則回傳EBUSY
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex); int pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);
這兩個函數一定要在mutex的鎖定區域內使用。
呼叫 pthread_cond_signal() 釋放被條件阻塞的執行緒時,如果沒有任何執行緒基於條件變數阻塞,則呼叫pthread_cond_signal()不起作用。而對於 Windows,當調用 SetEvent 觸發 Auto-reset 的 Event 條件時,如果沒有被條件阻塞的線程,那麼此函數仍然起作用,條件變數會處於觸發狀態。
Linux下生產者消費者問題(使用互斥鎖和條件變數):
#include <stdio.h> #include <stdlib.h> #include <time.h> #include "pthread.h" #define BUFFER_SIZE 16 struct prodcons { int buffer[BUFFER_SIZE]; pthread_mutex_t lock; //mutex ensuring exclusive access to buffer int readpos,writepos; //position for reading and writing pthread_cond_t notempty; //signal when buffer is not empty pthread_cond_t notfull; //signal when buffer is not full }; //initialize a buffer void init(struct prodcons* b) { pthread_mutex_init(&b->lock,NULL); pthread_cond_init(&b->notempty,NULL); pthread_cond_init(&b->notfull,NULL); b->readpos = 0; b->writepos = 0; } //store an integer in the buffer void put(struct prodcons* b, int data) { pthread_mutex_lock(&b->lock); //wait until buffer is not full while((b->writepos+1)%BUFFER_SIZE == b->readpos) { printf("wait for not full\n"); pthread_cond_wait(&b->notfull,&b->lock); } b->buffer[b->writepos] = data; b->writepos++; b->writepos %= BUFFER_SIZE; pthread_cond_signal(&b->notempty); //signal buffer is not empty pthread_mutex_unlock(&b->lock); } //read and remove an integer from the buffer int get(struct prodcons* b) { int data; pthread_mutex_lock(&b->lock); //wait until buffer is not empty while(b->writepos == b->readpos) { printf("wait for not empty\n"); pthread_cond_wait(&b->notempty,&b->lock); } data=b->buffer[b->readpos]; b->readpos++; b->readpos %= BUFFER_SIZE; pthread_cond_signal(&b->notfull); //signal buffer is not full pthread_mutex_unlock(&b->lock); return data; } #define OVER -1 struct prodcons buffer; void * producer(void * data) { int n; for(n=0; n<50; ++n) { printf("put-->%d\n",n); put(&buffer,n); } put(&buffer,OVER); printf("producer stopped\n"); return NULL; } void * consumer(void * data) { int n; while(1) { int d = get(&buffer); if(d == OVER) break; printf("get-->%d\n",d); } printf("consumer stopped\n"); return NULL; } int main() { pthread_t tha,thb; void * retval; init(&buffer); pthread_creare(&tha,NULL,producer,0); pthread_creare(&thb,NULL,consumer,0); pthread_join(tha,&retval); pthread_join(thb,&retval); return 0; }
3)信號量
如同進程一樣,線程也可以透過信號量來實現通信,雖然是輕量級的。
信號量函數的名字都以"sem_"打頭。線程使用的基本信號量函數有四個。
#include <semaphore.h> int sem_init(sem_t *sem , int pshared, unsigned int value);
這是對由sem指定的信號量進行初始化,設定好它的共享選項(linux只支援為0,即表示它是當前進程的局部信號量),然後給它一個初始值VALUE。
兩個原子操作函數:這兩個函數都要用一個由sem_init呼叫初始化的信號量物件的指標做參數。
int sem_wait(sem_t *sem); //给信号量减1,对一个值为0的信号量调用sem_wait,这个函数将会等待直到有其它线程使它不再是0为止。 int sem_post(sem_t *sem); //给信号量的值加1 int sem_destroy(sem_t *sem);
這個函數的作用是再我們用完信號量後都它進行清理。歸還自己佔有的一切資源。
用信號量實現生產者消費者:
這裡使用4個信號量,其中兩個信號量occupied和empty分別用於解決生產者和消費者線程之間的同步問題,pmut用於多個生產者之間互斥問題,cmut是用於多位消費者之間互斥問題。其中empty初始化為N(有界緩區的空間元數),occupied初始化為0,pmut和cmut初始化為1。
參考代碼:
#define BSIZE 64 typedef struct { char buf[BSIZE]; sem_t occupied; sem_t empty; int nextin; int nextout; sem_t pmut; sem_t cmut; }buffer_t; buffer_t buffer; void init(buffer_t * b) { sem_init(&b->occupied, 0, 0); sem_init(&b->empty,0, BSIZE); sem_init(&b->pmut, 0, 1); sem_init(&b->cmut, 0, 1); b->nextin = b->nextout = 0; } void producer(buffer_t *b, char item) { sem_wait(&b->empty); sem_wait(&b->pmut); b->buf[b->nextin] = item; b->nextin++; b->nextin %= BSIZE; sem_post(&b->pmut); sem_post(&b->occupied); } char consumer(buffer_t *b) { char item; sem_wait(&b->occupied); sem_wait(&b->cmut); item = b->buf[b->nextout]; b->nextout++; b->nextout %= BSIZE; sem_post(&b->cmut); sem_post(&b->empty); return item; }