1. The three most important thread synchronization mechanisms
1. Semaphores
2. Mutex locks
3. Condition variables
2. Implement a wrapper class for each of the three synchronization mechanisms
```cpp
#ifndef LOCKER_H
#define LOCKER_H

#include <exception>
#include <pthread.h>
#include <semaphore.h>

/* Wrapper for a POSIX semaphore */
class sem
{
public:
    sem()
    {
        // Initial value 0; the second argument 0 means the semaphore
        // is shared between threads of this process, not between processes.
        if( sem_init( &sem_like, 0, 0 ) != 0 )
        {
            throw std::exception();
        }
    }
    ~sem()
    {
        sem_destroy( &sem_like );
    }
    bool wait()
    {
        return sem_wait( &sem_like ) == 0;
    }
    bool post()
    {
        return sem_post( &sem_like ) == 0;
    }

private:
    sem_t sem_like;
};

/* Wrapper for a mutex */
class locker
{
public:
    locker()
    {
        if( pthread_mutex_init( &mutex_like, NULL ) != 0 )
        {
            throw std::exception();
        }
    }
    ~locker()
    {
        pthread_mutex_destroy( &mutex_like );
    }
    bool lock()
    {
        return pthread_mutex_lock( &mutex_like ) == 0;
    }
    bool unlock()
    {
        return pthread_mutex_unlock( &mutex_like ) == 0;
    }

private:
    pthread_mutex_t mutex_like;
};

/* Wrapper for a condition variable */
class cond
{
public:
    cond()
    {
        if( pthread_mutex_init( &mutex_like, NULL ) != 0 )
        {
            throw std::exception();
        }
        if( pthread_cond_init( &cond_like, NULL ) != 0 )
        {
            // Release the already-initialized mutex before rethrowing
            pthread_mutex_destroy( &mutex_like );
            throw std::exception();
        }
    }
    ~cond()
    {
        pthread_mutex_destroy( &mutex_like );
        pthread_cond_destroy( &cond_like );
    }
    bool wait()
    {
        // Note: this wrapper waits on its own internal mutex, which does not
        // protect any caller-side state; typical pthread code passes the
        // caller's mutex to pthread_cond_wait instead.
        int flag = 0;
        pthread_mutex_lock( &mutex_like );
        flag = pthread_cond_wait( &cond_like, &mutex_like );
        pthread_mutex_unlock( &mutex_like );
        return flag == 0;
    }
    bool signal()
    {
        return pthread_cond_signal( &cond_like ) == 0;
    }

private:
    pthread_mutex_t mutex_like;
    pthread_cond_t cond_like;
};

#endif
```
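For reference, a minimal producer/consumer sketch using the `sem` and `locker` wrappers above (the `g_queue`, `producer`, and `consumer` names are hypothetical, added only for illustration): the semaphore counts queued items while the mutex guards the queue itself, which is the same pairing the thread pool below depends on.

```cpp
#include <cstdio>
#include <list>
#include <pthread.h>
#include "locker.h"

std::list< int > g_queue;   // shared work queue (example data)
locker g_queue_lock;        // guards g_queue
sem g_items;                // counts items available in g_queue

void* producer( void* )
{
    for( int i = 0; i < 5; ++i )
    {
        g_queue_lock.lock();
        g_queue.push_back( i );
        g_queue_lock.unlock();
        g_items.post();     // wake one waiting consumer
    }
    return NULL;
}

void* consumer( void* )
{
    for( int i = 0; i < 5; ++i )
    {
        g_items.wait();     // block until an item is available
        g_queue_lock.lock();
        int item = g_queue.front();
        g_queue.pop_front();
        g_queue_lock.unlock();
        printf( "consumed %d\n", item );
    }
    return NULL;
}

int main()
{
    pthread_t p, c;
    pthread_create( &p, NULL, producer, NULL );
    pthread_create( &c, NULL, consumer, NULL );
    pthread_join( p, NULL );
    pthread_join( c, NULL );
    return 0;
}
```

Note that the `cond` wrapper is not used here: because it holds its own internal mutex, it cannot atomically protect a caller-side predicate, so the semaphore-plus-mutex pairing is what the thread pool below actually relies on.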
3. Use a std::list container to store all pending requests; requests are processed in FIFO order
```cpp
#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <list>
#include <vector>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "locker.h"

template< typename T >
class threadpool
{
public:
    threadpool( int thread_number = 8, int max_requests = 10000 );
    ~threadpool();
    bool append( T* request );

private:
    static void* worker( void* arg );
    void run();

private:
    int thread_number_like;                // number of threads in the pool
    int max_requests_like;                 // maximum number of pending requests
    std::vector< pthread_t > threads_like; // thread container
    std::list< T* > workqueue_like;        // request queue
    locker queuelocker_like;               // mutex protecting the request queue
    sem queuestat_like;                    // semaphore synchronizing the queue with idle threads
    bool stop_like;                        // when true, all threads terminate and the pool is empty
};

template< typename T >
threadpool< T >::threadpool( int thread_number, int max_requests )
    : thread_number_like( thread_number ), max_requests_like( max_requests ),
      stop_like( false )
{
    if( ( thread_number <= 0 ) || ( max_requests <= 0 ) )
    {
        throw std::exception();
    }
    threads_like.resize( thread_number_like );
    if( ( size_t )thread_number_like != threads_like.size() )
    {
        throw std::exception();
    }
    for( int i = 0; i < thread_number_like; ++i )
    {
        printf( "create the %dth thread\n", i );
        // Create the worker thread
        if( pthread_create( &threads_like[i], NULL, worker, this ) != 0 )
        {
            threads_like.resize( 0 );
            throw std::exception();
        }
        // Detach it so its resources are reclaimed automatically on exit
        if( pthread_detach( threads_like[i] ) )
        {
            threads_like.resize( 0 );
            throw std::exception();
        }
    }
}

template< typename T >
threadpool< T >::~threadpool()
{
    // Note: workers blocked in queuestat_like.wait() only observe this flag
    // after the semaphore is posted again.
    stop_like = true;
}

template< typename T >
bool threadpool< T >::append( T* request )
{
    queuelocker_like.lock();
    if( workqueue_like.size() > ( size_t )max_requests_like )
    {
        queuelocker_like.unlock();
        return false;
    }
    workqueue_like.push_back( request );
    queuelocker_like.unlock();
    queuestat_like.post();
    return true;
}

template< typename T >
void* threadpool< T >::worker( void* arg )
{
    // A static function cannot call the non-static member run() directly,
    // so the pool object is passed in through arg
    threadpool* pool = ( threadpool* )arg;
    pool->run(); // the body of each thread
    return pool;
}

template< typename T >
void threadpool< T >::run()
{
    while( !stop_like )
    {
        queuestat_like.wait();
        queuelocker_like.lock();
        if( workqueue_like.empty() )
        {
            queuelocker_like.unlock();
            continue;
        }
        T* request = workqueue_like.front();
        workqueue_like.pop_front();
        queuelocker_like.unlock();
        if( !request )
        {
            continue;
        }
        request->process(); // run the handler for this request
    }
}

#endif
```
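A minimal usage sketch of the pool (the `task` type and `main` below are hypothetical, written only to show the required interface): the pool's only requirement on the template parameter is a `process()` member.

```cpp
#include <cstdio>
#include <unistd.h>
#include "threadpool.h"

// Hypothetical request type; the pool only requires a process() member.
struct task
{
    int id;
    explicit task( int i ) : id( i ) {}
    void process()
    {
        printf( "processing task %d\n", id );
    }
};

int main()
{
    threadpool< task > pool( 4, 100 ); // 4 threads, up to 100 queued requests
    task tasks[8] = { task(0), task(1), task(2), task(3),
                      task(4), task(5), task(6), task(7) };
    for( int i = 0; i < 8; ++i )
    {
        pool.append( &tasks[i] ); // enqueue; a worker picks it up in FIFO order
    }
    sleep( 1 ); // crude wait so the detached workers can drain the queue
    return 0;
}
```

Because the workers are detached and the destructor does not wait for the queue to drain, the `sleep` call is a crude stand-in; a production version would join its threads or signal shutdown explicitly.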
Note: 1. In this thread pool model, each thread handles one request at a time
2. This approach ensures that user requests are handled in a timely fashion and places lighter performance demands on the request-processing function: the handler does not need to be non-blocking, because a delay in processing one request does not stall the handling of other requests (provided, of course, that the number of threads can be increased dynamically).
3. This method is not optimal for high-concurrency servers. A model like nginx's, in which one process responds to many user requests, has two main advantages: 1. the number of processes is fixed, so memory is not exhausted by running many threads or processes at once; 2. the number of nginx worker processes generally matches the number of CPU cores, and each process can be bound to one core, avoiding the system overhead of process or thread switching (a minimal sketch of such core binding follows below).
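As a rough illustration of the core-binding point above, here is a minimal sketch assuming Linux and glibc: `bind_to_core` is a hypothetical helper, not nginx's actual code, built on `sched_setaffinity`, the same mechanism behind nginx's `worker_cpu_affinity` directive.

```cpp
// Minimal sketch: pin the calling process to one CPU core (Linux-specific).
// bind_to_core is a hypothetical helper, not taken from nginx's source.
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <sched.h>
#include <cstdio>

bool bind_to_core( int core_id )
{
    cpu_set_t set;
    CPU_ZERO( &set );
    CPU_SET( core_id, &set );
    // pid 0 means "the calling process"
    return sched_setaffinity( 0, sizeof( set ), &set ) == 0;
}

int main()
{
    if( bind_to_core( 0 ) )
    {
        printf( "pinned to core 0; the scheduler will no longer migrate this process\n" );
    }
    return 0;
}
```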