Queue模块及源码分析
Queue模块是提供队列操作的模块,队列是线程间最常用的交换数据的形式。该模块提供了三种队列:
Queue.Queue(maxsize):先进先出,maxsize是队列的大小,其值为非正数时为无线循环队列
Queue.LifoQueue(maxsize):后进先出,相当于栈
Queue.PriorityQueue(maxsize):优先级队列。
其中LifoQueue,PriorityQueue是Queue的子类。三者拥有以下共同的方法:
qsize():返回近似的队列大小。为什么要加“近似”二字呢?因为当该值大于0的时候并不保证并发执行的时候get()方法不被阻塞,同样,对于put()方法有效。
empty():返回布尔值,队列为空时,返回True,反之返回False。
full():当设定了队列大小的时候,如果队列满了,则返回True,否则返回False。
put(item[,block[,timeout]]):向队列里添加元素item,block设置为False的时候,如果队列满了则抛出Full异常。如果block设置为True,timeout设置为None时,则会一种等到有空位的时候再添加进队列;否则会根据timeout设定的超时值抛出Full异常。
put_nowwait(item):等价与put(item,False)。block设置为False的时候,如果队列为空,则抛出Empty异常。如果block设置为True,timeout设置为None时,则会一种等到有空位的时候再添加进队列;否则会根据timeout设定的超时值抛出Empty异常。
get([block[,timeout]]):从队列中删除元素并返回该元素的值,如果timeout是一个正数,它会阻塞最多超时秒数,并且如果在该时间内没有可用的项目,则引发Empty异常。
get_nowwait():等价于get(False)
task_done():发送信号表明入列任务已完成,经常在消费者线程中用到。
join():阻塞直至队列所有元素处理完毕,然后再处理其它操作。
(一)源码分析
Queue模块用起来很简单很简单,但我觉得有必要把该模块的相关源代码贴出来分析下,会学到不少东西,看看大神们写的代码多么美观,多么结构化模块化,再想想自己写的代码,都是泪呀,来学习学习。为了缩减篇幅,源码的注释部分被删减掉。
from time import time as _time try: import threading as _threading except ImportError: import dummy_threading as _threading from collections import deque import heapq __all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue'] class Empty(Exception): "Exception raised by Queue.get(block=0)/get_nowait()." pass class Full(Exception): "Exception raised by Queue.put(block=0)/put_nowait()." pass class Queue: def __init__(self, maxsize=0): self.maxsize = maxsize self._init(maxsize) self.mutex = _threading.Lock() self.not_empty = _threading.Condition(self.mutex) self.not_full = _threading.Condition(self.mutex) self.all_tasks_done = _threading.Condition(self.mutex) self.unfinished_tasks = def get_nowait(self): return self.get(False) def _init(self, maxsize): self.queue = deque() def _qsize(self, len=len): return len(self.queue) def _put(self, item): self.queue.append(item) def _get(self): return self.queue.popleft()
通过后面的几个函数分析知道,Queue对象是在collections模块的queue基础上(关于collections模块参考 Python:使用Counter进行计数统计及collections模块),加上threading模块互斥锁和条件变量封装的。
deque是一个双端队列,很适用于队列和栈。上面的Queue对象就是一个先进先出的队列,所以首先_init()函数定义了一个双端队列,然后它的定义了_put()和_get()函数,它们分别是从双端队列右边添加元素、左边删除元素,这就构成了一个先进先出队列,同理很容易想到LifoQueue(后进先出队列)的实现了,保证队列右边添加右边删除就可以。可以贴出源代码看看。
class LifoQueue(Queue): '''Variant of Queue that retrieves most recently added entries first.''' def _init(self, maxsize): self.queue = [] def _qsize(self, len=len): return len(self.queue) def _put(self, item): self.queue.append(item) def _get(self): return self.queue.pop()
虽然它的"queue"没有用queue(),用列表也是一样的,因为列表append()和pop()操作是在最右边添加元素和删除最右边元素。
再来看看PriorityQueue,他是个优先级队列,这里用到了heapq模块的heappush()和heappop()两个函数。heapq模块对堆这种数据结构进行了模块化,可以建立这种数据结构,同时heapq模块也提供了相应的方法来对堆做操作。其中_init()函数里self.queue=[]可以看作是建立了一个空堆。heappush() 往堆中插入一条新的值 ,heappop() 从堆中弹出最小值 ,这就可以实现优先级(关于heapq模块这里这是简单的介绍)。源代码如下:
class PriorityQueue(Queue): '''Variant of Queue that retrieves open entries in priority order (lowest first). Entries are typically tuples of the form: (priority number, data). ''' def _init(self, maxsize): self.queue = [] def _qsize(self, len=len): return len(self.queue) def _put(self, item, heappush=heapq.heappush): heappush(self.queue, item) def _get(self, heappop=heapq.heappop): return heappop(self.queue)
基本的数据结构分析完了,接着分析其它的部分。
mutex 是个threading.Lock()对象,是互斥锁;not_empty、 not_full 、all_tasks_done这三个都是threading.Condition()对象,条件变量,而且维护的是同一把锁对象mutex(关于threading模块中Lock对象和Condition对象可参考上篇博文Python:线程、进程与协程(2)——threading模块)。
其中:
self.mutex互斥锁:任何获取队列的状态(empty(),qsize()等),或者修改队列的内容的操作(get,put等)都必须持有该互斥锁。acquire()获取锁,release()释放锁。同时该互斥锁被三个条件变量共同维护。
self.not_empty条件变量:线程添加数据到队列中后,会调用self.not_empty.notify()通知其它线程,然后唤醒一个移除元素的线程。
self.not_full条件变量:当一个元素被移除出队列时,会唤醒一个添加元素的线程。
self.all_tasks_done条件变量 :在未完成任务的数量被删除至0时,通知所有任务完成
self.unfinished_tasks : 定义未完成任务数量
再来看看主要方法:
(1)put()
源代码如下:
def put(self, item, block=True, timeout=None): self.not_full.acquire() #not_full获得锁 try: if self.maxsize > 0: #如果队列长度有限制 if not block: #如果没阻塞 if self._qsize() == self.maxsize: #如果队列满了抛异常 raise Full elif timeout is None: #有阻塞且超时为空,等待 while self._qsize() == self.maxsize: self.not_full.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: #如果有阻塞,且超时非负时,结束时间=当前时间+超时时间 endtime = _time() + timeout while self._qsize() == self.maxsize: remaining = endtime - _time() if remaining <= 0.0: #到时后,抛异常 raise Full #如果没到时,队列是满的就会一直被挂起,直到有“位置”腾出 self.not_full.wait(remaining) self._put(item) #调用_put方法,添加元素 self.unfinished_tasks += 1 #未完成任务+1 self.not_empty.notify() #通知非空,唤醒非空挂起的任务 finally: self.not_full.release() #not_full释放锁
默认情况下block为True,timeout为None。如果队列满则会等待,未满则会调用_put方法将进程加入deque中(后面介绍),并且未完成任务加1还会通知队列非空。
如果设置block参数为Flase,队列满时则会抛异常。如果设置了超时那么在时间到之前进行阻塞,时间一到抛异常。这个方法使用not_full对象进行操作。
(2)get()
源码如下:
def get(self, block=True, timeout=None): self.not_empty.acquire() #not_empty获得锁 try: if not block: #不阻塞时 if not self._qsize(): #队列为空时抛异常 raise Empty elif timeout is None: #不限时时,队列为空则会等待 while not self._qsize(): self.not_empty.wait() elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: endtime = _time() + timeout while not self._qsize(): remaining = endtime - _time() if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) item = self._get() #调用_get方法,移除并获得项目 self.not_full.notify() #通知非满 return item #返回项目 finally: self.not_empty.release() #释放锁
逻辑跟put()函数一样,参数默认情况下队列空了则会等待,否则将会调用_get方法(往下看)移除并获得一个项,最后返回这个项。这个方法使用not_empty对象进行操作。
不过我觉得put()与get()两个函数结合起来理解比较好。not_full与not_empty代表的是两种不同操作类型的线程,not_full可以理解成is-not-full,即队列是否满了,默认是没有满,没有满时not_full这个条件变量才能获取锁,并做一些条件判断,只有符合条件才能向队列里加元素,添加成功后就会通知not_empty条件变量队列里不是空的,“我”刚刚添加进了一个元素,满足可以执行删除动作的基本条件了(队列不是空的,想想如果是空的执行删除动作就没有意义了),同时唤醒一些被挂起的执行移除动作的线程,让这些线程重新判断条件,如果条件准许就会执行删除动作,然后又通知not_full条件变量,告诉“它”队列不是满的,因为“我”刚才删除了一个元素(想想如果队列满了添加元素就添加不进呀,就没意义了),满足了添加元素的基本条件(队列不是满的),同时唤醒一些被挂起的执行添加动作的线程,这些线程又会进行条件判断,符合条件就会添加元素,否则继续挂起,依次类推,同时这样也保证了线程的安全。正与前面所说,当一个元素被移除出队列时,会唤醒一个添加元素的线程;当添加一个元素时会唤醒一个删除元素的线程。
(3)task_done()
源码如下:
def task_done(self): self.all_tasks_done.acquire() #获得锁 try: unfinished = self.unfinished_tasks - 1 #判断队列中一个线程的任务是否全部完成 if unfinished <= 0: #是则进行通知,或在过量调用时报异常 if unfinished < 0: raise ValueError('task_done() called too many times') self.all_tasks_done.notify_all() self.unfinished_tasks = unfinished #否则未完成任务数量-1 finally: self.all_tasks_done.release() #最后释放锁
这个方法判断队列中一个线程的任务是否全部完成,首先会通过all_tasks_done对象获得锁,如果是则进行通知,最后释放锁。
(4)join()
源码如下:
def join(self): self.all_tasks_done.acquire() try: while self.unfinished_tasks: #如果有未完成的任务,将调用wait()方法等待 self.all_tasks_done.wait() finally: self.all_tasks_done.release()
阻塞方法,当队列中有未完成进程时,调用join方法来阻塞,直到他们都完成。
其它的方法都比较简单,也比较好理解,有兴趣可以去看看Queue.py里的源码,要注意的是任何获取队列的状态(empty(),qsize()等),或者修改队列的内容的操作(get,put等)都必须持有互斥锁mutex。
(二)简单例子
实现一个线程不断生成一个随机数到一个队列中
实现一个线程从上面的队列里面不断的取出奇数
实现另外一个线程从上面的队列里面不断取出偶数
import random,threading,time from Queue import Queue is_product = True class Producer(threading.Thread): """生产数据""" def __init__(self, t_name, queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): while 1: if self.data.full(): global is_product is_product = False else: if self.data.qsize() <= 7:#队列长度小于等于7时添加元素 is_product = True for i in range(2): #每次向队列里添加两个元素 randomnum=random.randint(1,99) print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum) self.data.put(randomnum,False) #将数据依次存入队列 time.sleep(1) print "deque length is %s"%self.data.qsize() else: if is_product: for i in range(2): # randomnum = random.randint(1, 99) print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum) self.data.put(randomnum,False) # 将数据依次存入队列 time.sleep(1) print "deque length is %s" % self.data.qsize() else: pass print "%s: %s finished!" %(time.ctime(), self.getName()) #Consumer thread class Consumer_even(threading.Thread): def __init__(self,t_name,queue): threading.Thread.__init__(self,name=t_name) self.data=queue def run(self): while 1: if self.data.qsize() > 7:#队列长度大于7时开始取元素 val_even = self.data.get(False) if val_even%2==0: print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even) time.sleep(2) else: self.data.put(val_even) time.sleep(2) print "deque length is %s" % self.data.qsize() else: pass class Consumer_odd(threading.Thread): def __init__(self,t_name,queue): threading.Thread.__init__(self, name=t_name) self.data=queue def run(self): while 1: if self.data.qsize() > 7: val_odd = self.data.get(False) if val_odd%2!=0: print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd) time.sleep(2) else: self.data.put(val_odd) time.sleep(2) print "deque length is %s" % self.data.qsize() else: pass #Main thread def main(): queue = Queue(20) producer = Producer('Pro.', queue) consumer_even = Consumer_even('Con_even.', queue) consumer_odd = Consumer_odd('Con_odd.',queue) producer.start() consumer_even.start() consumer_odd.start() producer.join() consumer_even.join() consumer_odd.join() if __name__ == '__main__': main()

Alat AI Hot

Undresser.AI Undress
Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover
Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool
Gambar buka pakaian secara percuma

Clothoff.io
Penyingkiran pakaian AI

AI Hentai Generator
Menjana ai hentai secara percuma.

Artikel Panas

Alat panas

Notepad++7.3.1
Editor kod yang mudah digunakan dan percuma

SublimeText3 versi Cina
Versi Cina, sangat mudah digunakan

Hantar Studio 13.0.1
Persekitaran pembangunan bersepadu PHP yang berkuasa

Dreamweaver CS6
Alat pembangunan web visual

SublimeText3 versi Mac
Perisian penyuntingan kod peringkat Tuhan (SublimeText3)

Topik panas



MySQL mempunyai versi komuniti percuma dan versi perusahaan berbayar. Versi komuniti boleh digunakan dan diubahsuai secara percuma, tetapi sokongannya terhad dan sesuai untuk aplikasi dengan keperluan kestabilan yang rendah dan keupayaan teknikal yang kuat. Edisi Enterprise menyediakan sokongan komersil yang komprehensif untuk aplikasi yang memerlukan pangkalan data yang stabil, boleh dipercayai, berprestasi tinggi dan bersedia membayar sokongan. Faktor yang dipertimbangkan apabila memilih versi termasuk kritikal aplikasi, belanjawan, dan kemahiran teknikal. Tidak ada pilihan yang sempurna, hanya pilihan yang paling sesuai, dan anda perlu memilih dengan teliti mengikut keadaan tertentu.

Hadidb: Pangkalan data Python yang ringan, tinggi, Hadidb (Hadidb) adalah pangkalan data ringan yang ditulis dalam Python, dengan tahap skalabilitas yang tinggi. Pasang HadIdb menggunakan pemasangan PIP: Pengurusan Pengguna PipInstallHadidB Buat Pengguna: CreateUser () Kaedah untuk membuat pengguna baru. Kaedah pengesahan () mengesahkan identiti pengguna. dariHadidb.OperationImportuserer_Obj = user ("admin", "admin") user_obj.

Tidak mustahil untuk melihat kata laluan MongoDB secara langsung melalui Navicat kerana ia disimpan sebagai nilai hash. Cara mendapatkan kata laluan yang hilang: 1. Tetapkan semula kata laluan; 2. Periksa fail konfigurasi (mungkin mengandungi nilai hash); 3. Semak Kod (boleh kata laluan Hardcode).

MySQL boleh berjalan tanpa sambungan rangkaian untuk penyimpanan dan pengurusan data asas. Walau bagaimanapun, sambungan rangkaian diperlukan untuk interaksi dengan sistem lain, akses jauh, atau menggunakan ciri -ciri canggih seperti replikasi dan clustering. Di samping itu, langkah -langkah keselamatan (seperti firewall), pengoptimuman prestasi (pilih sambungan rangkaian yang betul), dan sandaran data adalah penting untuk menyambung ke Internet.

Sambungan MySQL mungkin disebabkan oleh sebab -sebab berikut: Perkhidmatan MySQL tidak dimulakan, firewall memintas sambungan, nombor port tidak betul, nama pengguna atau kata laluan tidak betul, alamat pendengaran di my.cnf dikonfigurasi dengan tidak wajar, dan lain -lain. Langkah -langkah penyelesaian masalah termasuk: 1. 2. Laraskan tetapan firewall untuk membolehkan MySQL mendengar port 3306; 3. Sahkan bahawa nombor port adalah konsisten dengan nombor port sebenar; 4. Periksa sama ada nama pengguna dan kata laluan betul; 5. Pastikan tetapan alamat mengikat di my.cnf betul.

MySQL Workbench boleh menyambung ke MariaDB, dengan syarat bahawa konfigurasi adalah betul. Mula -mula pilih "MariaDB" sebagai jenis penyambung. Dalam konfigurasi sambungan, tetapkan host, port, pengguna, kata laluan, dan pangkalan data dengan betul. Apabila menguji sambungan, periksa bahawa perkhidmatan MariaDB dimulakan, sama ada nama pengguna dan kata laluan betul, sama ada nombor port betul, sama ada firewall membenarkan sambungan, dan sama ada pangkalan data itu wujud. Dalam penggunaan lanjutan, gunakan teknologi penyatuan sambungan untuk mengoptimumkan prestasi. Kesilapan biasa termasuk kebenaran yang tidak mencukupi, masalah sambungan rangkaian, dan lain -lain. Apabila kesilapan debugging, dengan teliti menganalisis maklumat ralat dan gunakan alat penyahpepijatan. Mengoptimumkan konfigurasi rangkaian dapat meningkatkan prestasi

Panduan Pengoptimuman Prestasi Pangkalan Data MySQL Dalam aplikasi yang berintensifkan sumber, pangkalan data MySQL memainkan peranan penting dan bertanggungjawab untuk menguruskan urus niaga besar-besaran. Walau bagaimanapun, apabila skala aplikasi berkembang, kemunculan prestasi pangkalan data sering menjadi kekangan. Artikel ini akan meneroka satu siri strategi pengoptimuman prestasi MySQL yang berkesan untuk memastikan aplikasi anda tetap cekap dan responsif di bawah beban tinggi. Kami akan menggabungkan kes-kes sebenar untuk menerangkan teknologi utama yang mendalam seperti pengindeksan, pengoptimuman pertanyaan, reka bentuk pangkalan data dan caching. 1. Reka bentuk seni bina pangkalan data dan seni bina pangkalan data yang dioptimumkan adalah asas pengoptimuman prestasi MySQL. Berikut adalah beberapa prinsip teras: Memilih jenis data yang betul dan memilih jenis data terkecil yang memenuhi keperluan bukan sahaja dapat menjimatkan ruang penyimpanan, tetapi juga meningkatkan kelajuan pemprosesan data.

Sebagai profesional data, anda perlu memproses sejumlah besar data dari pelbagai sumber. Ini boleh menimbulkan cabaran kepada pengurusan data dan analisis. Nasib baik, dua perkhidmatan AWS dapat membantu: AWS Glue dan Amazon Athena.
