Queue模組及原始碼分析
Queue模組是提供佇列操作的模組,佇列是執行緒間最常用的交換資料的形式。此模組提供了三種佇列:
Queue.Queue(maxsize):先進先出,maxsize是佇列的大小,其值為非正數時為無線循環佇列
Queue.LifoQueue(maxsize):後進先出,相當於堆疊
Queue.PriorityQueue(maxsize):優先權佇列。
其中LifoQueue,PriorityQueue是Queue的子類。三者擁有以下共同的方法:
qsize():傳回近似的隊列大小。為什麼要加「近似」二字呢?因為當該值大於0的時候並不保證並發執行的時候get()方法不被阻塞,同樣,對於put()方法有效。
empty():傳回布林值,佇列為空時,傳回True,反之回傳False。
full():當設定了佇列大小的時候,如果佇列滿了,則傳回True,否則傳回False。
put(item[,block[,timeout]]):向佇列裡加入元素item,block設定為False的時候,如果佇列滿了則拋出Full異常。如果block設定為True,timeout設定為None時,則會一種等到有空位的時候再加入進隊列;否則會根據timeout設定的逾時值拋出Full異常。
put_nowwait(item):等價與put(item,False)。 block設定為False的時候,如果隊列為空,則拋出Empty異常。如果block設定為True,timeout設定為None時,則會一種等到有空位的時候再加入進隊列;否則會根據timeout設定的逾時值拋出Empty異常。
get([block[,timeout]]):從佇列中刪除元素並傳回該元素的值,如果timeout是一個正數,它會阻塞最多超時秒數,並且如果在該時間內沒有可用的項目,則引發Empty異常。
get_nowwait():等價於get(False)
task_done():發送訊號表示入列任務已完成,經常在消費者執行緒中使用。
join():阻塞直至隊列所有元素處理完畢,然後再處理其它操作。
(一)原始碼分析
Queue模組用起來很簡單很簡單,但我覺得有必要把該模組的相關原始碼貼出來分析下,會學到不少東西,看看大神們寫的程式碼多麼美觀,多麼結構化模組化,再想想自己寫的程式碼,都是淚呀,來學習學習。為了縮減篇幅,原始碼的註解部分被刪減掉。
from time import time as _time try: import threading as _threading except ImportError: import dummy_threading as _threading from collections import deque import heapq __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] class Empty(Exception): "Exception raised by Queue.get(block=0)/get_nowait()." pass class Full(Exception): "Exception raised by Queue.put(block=0)/put_nowait()." pass class Queue: def __init__(self, maxsize=0): self.maxsize = maxsize self._init(maxsize) self.mutex = _threading.Lock() self.not_empty = _threading.Condition(self.mutex) self.not_full = _threading.Condition(self.mutex) self.all_tasks_done = _threading.Condition(self.mutex) self.unfinished_tasks = def get_nowait(self): return self.get(False) def _init(self, maxsize): self.queue = deque() def _qsize(self, len=len): return len(self.queue) def _put(self, item): self.queue.append(item) def _get(self): return self.queue.popleft()
透過後面的幾個函數分析知道,Queue物件是在collections模組的queue基礎上(關於collections模組參考 Python:使用Counter進行計數統計及collections模組),加上threading模組模組互斥的。
deque是一個雙端佇列,很適用於佇列和堆疊。上面的Queue物件就是一個先進先出的佇列,所以首先_init()函數定義了一個雙端佇列,然後它的定義了_put()和_get()函數,它們分別是從雙端佇列右邊增加元素、左邊刪除元素,這就構成了一個先進先出隊列,同理很容易想到LifoQueue(後進先出隊列)的實現了,保證隊列右邊添加右邊刪除就可以。可以貼出原始碼看看。
class LifoQueue(Queue): '''Variant of Queue that retrieves most recently added entries first.''' def _init(self, maxsize): self.queue = [] def _qsize(self, len=len): return len(self.queue) def _put(self, item): self.queue.append(item) def _get(self): return self.queue.pop()
雖然它的"queue"沒有用queue(),用列表也是一樣的,因為列表append()和pop()操作是在最右邊添加元素和刪除最右邊元素。
再來看看PriorityQueue,他是個優先權隊列,這裡用到了heapq模組的heappush()和heappop()兩個函數。 heapq模組對堆這種資料結構進行了模組化,可以建立這種資料結構,同時heapq模組也提供了相應的方法來對堆做操作。其中_init()函數裡self.queue=[]可以看成是建立了一個空堆。 heappush() 往堆中插入一條新的值 ,heappop() 從堆中彈出最小值 ,這就可以實現優先權(關於heapq模組這裡這是簡單的介紹)。原始碼如下:
class PriorityQueue(Queue): '''Variant of Queue that retrieves open entries in priority order (lowest first). Entries are typically tuples of the form: (priority number, data). ''' def _init(self, maxsize): self.queue = [] def _qsize(self, len=len): return len(self.queue) def _put(self, item, heappush=heapq.heappush): heappush(self.queue, item) def _get(self, heappop=heapq.heappop): return heappop(self.queue)
基本的資料結構分析完了,接著分析其它的部分。
mutex 是個threading.Lock()對象,是互斥鎖;not_empty、 not_full 、all_tasks_done這三個都是threading.Condition()對象,條件變量,而且維護的是同一把鎖對象mutex(關於threading模組中Lock物件與Condition物件可參考上篇部落格文章Python:執行緒、進程與協程(2)-threading模組)。
其中:
self.mutex互斥鎖:任何獲取隊列的狀態(empty(),qsize()等),或者修改隊列的內容的操作(get,put等)都必須持有該互斥鎖。 acquire()取得鎖,release()釋放鎖。同時此互斥鎖被三個條件變數共同維護。
self.not_empty條件變數:執行緒新增資料到佇列後,會呼叫self.not_empty.notify()通知其它線程,然後喚醒一個移除元素的執行緒。
self.not_full條件變數:當一個元素被移除出佇列時,會喚醒一個已加入元素的執行緒。
self.all_tasks_done條件變數:在未完成任務的數量被刪除至0時,通知所有任務完成
self.unfinished_tasks :定義未完成任務數量
源代码如下:
def put(self, item, block=True, timeout=None): self.not_full.acquire() #not_full获得锁 try: if self.maxsize > 0: #如果队列长度有限制 if not block: #如果没阻塞 if self._qsize() == self.maxsize: #如果队列满了抛异常 raise Full elif timeout is None: #有阻塞且超时为空,等待 while self._qsize() == self.maxsize: self.not_full.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: #如果有阻塞,且超时非负时,结束时间=当前时间+超时时间 endtime = _time() + timeout while self._qsize() == self.maxsize: remaining = endtime - _time() if remaining <= 0.0: #到时后,抛异常 raise Full #如果没到时,队列是满的就会一直被挂起,直到有“位置”腾出 self.not_full.wait(remaining) self._put(item) #调用_put方法,添加元素 self.unfinished_tasks += 1 #未完成任务+1 self.not_empty.notify() #通知非空,唤醒非空挂起的任务 finally: self.not_full.release() #not_full释放锁
默认情况下block为True,timeout为None。如果队列满则会等待,未满则会调用_put方法将进程加入deque中(后面介绍),并且未完成任务加1还会通知队列非空。
如果设置block参数为Flase,队列满时则会抛异常。如果设置了超时那么在时间到之前进行阻塞,时间一到抛异常。这个方法使用not_full对象进行操作。
(2)get()
源码如下:
def get(self, block=True, timeout=None): self.not_empty.acquire() #not_empty获得锁 try: if not block: #不阻塞时 if not self._qsize(): #队列为空时抛异常 raise Empty elif timeout is None: #不限时时,队列为空则会等待 while not self._qsize(): self.not_empty.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = _time() + timeout while not self._qsize(): remaining = endtime - _time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) item = self._get() #调用_get方法,移除并获得项目 self.not_full.notify() #通知非满 return item #返回项目 finally: self.not_empty.release() #释放锁
逻辑跟put()函数一样,参数默认情况下队列空了则会等待,否则将会调用_get方法(往下看)移除并获得一个项,最后返回这个项。这个方法使用not_empty对象进行操作。
不过我觉得put()与get()两个函数结合起来理解比较好。not_full与not_empty代表的是两种不同操作类型的线程,not_full可以理解成is-not-full,即队列是否满了,默认是没有满,没有满时not_full这个条件变量才能获取锁,并做一些条件判断,只有符合条件才能向队列里加元素,添加成功后就会通知not_empty条件变量队列里不是空的,“我”刚刚添加进了一个元素,满足可以执行删除动作的基本条件了(队列不是空的,想想如果是空的执行删除动作就没有意义了),同时唤醒一些被挂起的执行移除动作的线程,让这些线程重新判断条件,如果条件准许就会执行删除动作,然后又通知not_full条件变量,告诉“它”队列不是满的,因为“我”刚才删除了一个元素(想想如果队列满了添加元素就添加不进呀,就没意义了),满足了添加元素的基本条件(队列不是满的),同时唤醒一些被挂起的执行添加动作的线程,这些线程又会进行条件判断,符合条件就会添加元素,否则继续挂起,依次类推,同时这样也保证了线程的安全。正与前面所说,当一个元素被移除出队列时,会唤醒一个添加元素的线程;当添加一个元素时会唤醒一个删除元素的线程。
(3)task_done()
源码如下:
def task_done(self): self.all_tasks_done.acquire() #获得锁 try: unfinished = self.unfinished_tasks - 1 #判断队列中一个线程的任务是否全部完成 if unfinished <= 0: #是则进行通知,或在过量调用时报异常 if unfinished < 0: raise ValueError('task_done() called too many times') self.all_tasks_done.notify_all() self.unfinished_tasks = unfinished #否则未完成任务数量-1 finally: self.all_tasks_done.release() #最后释放锁
这个方法判断队列中一个线程的任务是否全部完成,首先会通过all_tasks_done对象获得锁,如果是则进行通知,最后释放锁。
(4)join()
源码如下:
def join(self): self.all_tasks_done.acquire() try: while self.unfinished_tasks: #如果有未完成的任务,将调用wait()方法等待 self.all_tasks_done.wait() finally: self.all_tasks_done.release()
阻塞方法,当队列中有未完成进程时,调用join方法来阻塞,直到他们都完成。
其它的方法都比较简单,也比较好理解,有兴趣可以去看看Queue.py里的源码,要注意的是任何获取队列的状态(empty(),qsize()等),或者修改队列的内容的操作(get,put等)都必须持有互斥锁mutex。
(二)简单例子
实现一个线程不断生成一个随机数到一个队列中
实现一个线程从上面的队列里面不断的取出奇数
实现另外一个线程从上面的队列里面不断取出偶数
import random,threading,time from Queue import Queue is_product = True class Producer(threading.Thread): """生产数据""" def __init__(self, t_name, queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): while 1: if self.data.full(): global is_product is_product = False else: if self.data.qsize() <= 7:#队列长度小于等于7时添加元素 is_product = True for i in range(2): #每次向队列里添加两个元素 randomnum=random.randint(1,99) print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum) self.data.put(randomnum,False) #将数据依次存入队列 time.sleep(1) print "deque length is %s"%self.data.qsize() else: if is_product: for i in range(2): # randomnum = random.randint(1, 99) print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum) self.data.put(randomnum,False) # 将数据依次存入队列 time.sleep(1) print "deque length is %s" % self.data.qsize() else: pass print "%s: %s finished!" %(time.ctime(), self.getName()) #Consumer thread class Consumer_even(threading.Thread): def __init__(self,t_name,queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): while 1: if self.data.qsize() > 7:#队列长度大于7时开始取元素 val_even = self.data.get(False) if val_even%2==0: print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even) time.sleep(2) else: self.data.put(val_even) time.sleep(2) print "deque length is %s" % self.data.qsize() else: pass class Consumer_odd(threading.Thread): def __init__(self,t_name,queue): threading.Thread.__init__(self, name=t_name) self.data=queue def run(self): while 1: if self.data.qsize() > 7: val_odd = self.data.get(False) if val_odd%2!=0: print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd) time.sleep(2) else: self.data.put(val_odd) time.sleep(2) print "deque length is %s" % self.data.qsize() else: pass #Main thread def main(): queue = Queue(20) producer = Producer('Pro.', queue) consumer_even = Consumer_even('Con_even.', queue) consumer_odd = Consumer_odd('Con_odd.',queue) producer.start() consumer_even.start() consumer_odd.start() producer.join() consumer_even.join() consumer_odd.join() if __name__ == '__main__': main()

熱AI工具

Undresser.AI Undress
人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover
用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool
免費脫衣圖片

Clothoff.io
AI脫衣器

AI Hentai Generator
免費產生 AI 無盡。

熱門文章

熱工具

記事本++7.3.1
好用且免費的程式碼編輯器

SublimeText3漢化版
中文版,非常好用

禪工作室 13.0.1
強大的PHP整合開發環境

Dreamweaver CS6
視覺化網頁開發工具

SublimeText3 Mac版
神級程式碼編輯軟體(SublimeText3)

熱門話題

HadiDB:輕量級、高水平可擴展的Python數據庫HadiDB(hadidb)是一個用Python編寫的輕量級數據庫,具備高度水平的可擴展性。安裝HadiDB使用pip安裝:pipinstallhadidb用戶管理創建用戶:createuser()方法創建一個新用戶。 authentication()方法驗證用戶身份。 fromhadidb.operationimportuseruser_obj=user("admin","admin")user_obj.

直接通過 Navicat 查看 MongoDB 密碼是不可能的,因為它以哈希值形式存儲。取回丟失密碼的方法:1. 重置密碼;2. 檢查配置文件(可能包含哈希值);3. 檢查代碼(可能硬編碼密碼)。

2小時內可以學會Python的基本編程概念和技能。 1.學習變量和數據類型,2.掌握控制流(條件語句和循環),3.理解函數的定義和使用,4.通過簡單示例和代碼片段快速上手Python編程。

Python在web開發、數據科學、機器學習、自動化和腳本編寫等領域有廣泛應用。 1)在web開發中,Django和Flask框架簡化了開發過程。 2)數據科學和機器學習領域,NumPy、Pandas、Scikit-learn和TensorFlow庫提供了強大支持。 3)自動化和腳本編寫方面,Python適用於自動化測試和系統管理等任務。

MySQL數據庫性能優化指南在資源密集型應用中,MySQL數據庫扮演著至關重要的角色,負責管理海量事務。然而,隨著應用規模的擴大,數據庫性能瓶頸往往成為製約因素。本文將探討一系列行之有效的MySQL性能優化策略,確保您的應用在高負載下依然保持高效響應。我們將結合實際案例,深入講解索引、查詢優化、數據庫設計以及緩存等關鍵技術。 1.數據庫架構設計優化合理的數據庫架構是MySQL性能優化的基石。以下是一些核心原則:選擇合適的數據類型選擇最小的、符合需求的數據類型,既能節省存儲空間,又能提升數據處理速度

作為數據專業人員,您需要處理來自各種來源的大量數據。這可能會給數據管理和分析帶來挑戰。幸運的是,兩項 AWS 服務可以提供幫助:AWS Glue 和 Amazon Athena。

啟動 Redis 服務器的步驟包括:根據操作系統安裝 Redis。通過 redis-server(Linux/macOS)或 redis-server.exe(Windows)啟動 Redis 服務。使用 redis-cli ping(Linux/macOS)或 redis-cli.exe ping(Windows)命令檢查服務狀態。使用 Redis 客戶端,如 redis-cli、Python 或 Node.js,訪問服務器。

要從 Redis 讀取隊列,需要獲取隊列名稱、使用 LPOP 命令讀取元素,並處理空隊列。具體步驟如下:獲取隊列名稱:以 "queue:" 前綴命名,如 "queue:my-queue"。使用 LPOP 命令:從隊列頭部彈出元素並返回其值,如 LPOP queue:my-queue。處理空隊列:如果隊列為空,LPOP 返回 nil,可先檢查隊列是否存在再讀取元素。
