首页 后端开发 Python教程 Queue模块及源码分析

Queue模块及源码分析

Nov 03, 2016 pm 05:31 PM
python queue

    Queue模块是提供队列操作的模块,队列是线程间最常用的交换数据的形式。该模块提供了三种队列:

Queue.Queue(maxsize):先进先出,maxsize是队列的大小,其值为非正数时为无线循环队列

Queue.LifoQueue(maxsize):后进先出,相当于栈

Queue.PriorityQueue(maxsize):优先级队列。

其中LifoQueue,PriorityQueue是Queue的子类。三者拥有以下共同的方法:

qsize():返回近似的队列大小。为什么要加“近似”二字呢?因为当该值大于0的时候并不保证并发执行的时候get()方法不被阻塞,同样,对于put()方法有效。

empty():返回布尔值,队列为空时,返回True,反之返回False。

full():当设定了队列大小的时候,如果队列满了,则返回True,否则返回False。

put(item[,block[,timeout]]):向队列里添加元素item,block设置为False的时候,如果队列满了则抛出Full异常。如果block设置为True,timeout设置为None时,则会一种等到有空位的时候再添加进队列;否则会根据timeout设定的超时值抛出Full异常。

put_nowwait(item):等价与put(item,False)。block设置为False的时候,如果队列为空,则抛出Empty异常。如果block设置为True,timeout设置为None时,则会一种等到有空位的时候再添加进队列;否则会根据timeout设定的超时值抛出Empty异常。

get([block[,timeout]]):从队列中删除元素并返回该元素的值,如果timeout是一个正数,它会阻塞最多超时秒数,并且如果在该时间内没有可用的项目,则引发Empty异常。

get_nowwait():等价于get(False)

task_done():发送信号表明入列任务已完成,经常在消费者线程中用到。

join():阻塞直至队列所有元素处理完毕,然后再处理其它操作。

(一)源码分析

    Queue模块用起来很简单很简单,但我觉得有必要把该模块的相关源代码贴出来分析下,会学到不少东西,看看大神们写的代码多么美观,多么结构化模块化,再想想自己写的代码,都是泪呀,来学习学习。为了缩减篇幅,源码的注释部分被删减掉。

from time import time as _time
try:
    import threading as _threading
except ImportError:
    import dummy_threading as _threading
from collections import deque
import heapq
 
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
 
class Empty(Exception):
    "Exception raised by Queue.get(block=0)/get_nowait()."
    pass
 
class Full(Exception):
    "Exception raised by Queue.put(block=0)/put_nowait()."
    pass
 
class Queue:
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        self.mutex = _threading.Lock()
        self.not_empty = _threading.Condition(self.mutex)
        self.not_full = _threading.Condition(self.mutex)
        self.all_tasks_done = _threading.Condition(self.mutex)
        self.unfinished_tasks = 
       
    def get_nowait(self):
        return self.get(False)
    def _init(self, maxsize):
        self.queue = deque()
    def _qsize(self, len=len):
        return len(self.queue)
    def _put(self, item):
        self.queue.append(item)
    def _get(self):
        return self.queue.popleft()
登录后复制

通过后面的几个函数分析知道,Queue对象是在collections模块的queue基础上(关于collections模块参考 Python:使用Counter进行计数统计及collections模块),加上threading模块互斥锁和条件变量封装的。

deque是一个双端队列,很适用于队列和栈。上面的Queue对象就是一个先进先出的队列,所以首先_init()函数定义了一个双端队列,然后它的定义了_put()和_get()函数,它们分别是从双端队列右边添加元素、左边删除元素,这就构成了一个先进先出队列,同理很容易想到LifoQueue(后进先出队列)的实现了,保证队列右边添加右边删除就可以。可以贴出源代码看看。

class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''
 
    def _init(self, maxsize):
        self.queue = []
 
    def _qsize(self, len=len):
        return len(self.queue)
 
    def _put(self, item):
        self.queue.append(item)
 
    def _get(self):
        return self.queue.pop()
登录后复制

虽然它的"queue"没有用queue(),用列表也是一样的,因为列表append()和pop()操作是在最右边添加元素和删除最右边元素。

再来看看PriorityQueue,他是个优先级队列,这里用到了heapq模块的heappush()和heappop()两个函数。heapq模块对堆这种数据结构进行了模块化,可以建立这种数据结构,同时heapq模块也提供了相应的方法来对堆做操作。其中_init()函数里self.queue=[]可以看作是建立了一个空堆。heappush() 往堆中插入一条新的值 ,heappop() 从堆中弹出最小值 ,这就可以实现优先级(关于heapq模块这里这是简单的介绍)。源代码如下:

class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).
 
    Entries are typically tuples of the form:  (priority number, data).
    '''
 
    def _init(self, maxsize):
        self.queue = []
 
    def _qsize(self, len=len):
        return len(self.queue)
 
    def _put(self, item, heappush=heapq.heappush):
        heappush(self.queue, item)
 
    def _get(self, heappop=heapq.heappop):
        return heappop(self.queue)
登录后复制

基本的数据结构分析完了,接着分析其它的部分。

mutex 是个threading.Lock()对象,是互斥锁;not_empty、 not_full 、all_tasks_done这三个都是threading.Condition()对象,条件变量,而且维护的是同一把锁对象mutex(关于threading模块中Lock对象和Condition对象可参考上篇博文Python:线程、进程与协程(2)——threading模块)。

其中:

self.mutex互斥锁:任何获取队列的状态(empty(),qsize()等),或者修改队列的内容的操作(get,put等)都必须持有该互斥锁。acquire()获取锁,release()释放锁。同时该互斥锁被三个条件变量共同维护。

self.not_empty条件变量:线程添加数据到队列中后,会调用self.not_empty.notify()通知其它线程,然后唤醒一个移除元素的线程。

self.not_full条件变量:当一个元素被移除出队列时,会唤醒一个添加元素的线程。

self.all_tasks_done条件变量 :在未完成任务的数量被删除至0时,通知所有任务完成

self.unfinished_tasks : 定义未完成任务数量


再来看看主要方法:

(1)put()

源代码如下:

def put(self, item, block=True, timeout=None):
        self.not_full.acquire()                  #not_full获得锁
        try:
            if self.maxsize > 0:                 #如果队列长度有限制
                if not block:                    #如果没阻塞
                    if self._qsize() == self.maxsize:   #如果队列满了抛异常
                        raise Full
                elif timeout is None:           #有阻塞且超时为空,等待
                    while self._qsize() == self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("&#39;timeout&#39; must be a non-negative number")
                else:        #如果有阻塞,且超时非负时,结束时间=当前时间+超时时间
                    endtime = _time() + timeout
                    while self._qsize() == self.maxsize:
                        remaining = endtime - _time()
                        if remaining <= 0.0:       #到时后,抛异常
                            raise Full
                            #如果没到时,队列是满的就会一直被挂起,直到有“位置”腾出
                        self.not_full.wait(remaining)
            self._put(item)                    #调用_put方法,添加元素
            self.unfinished_tasks += 1         #未完成任务+1
            self.not_empty.notify()             #通知非空,唤醒非空挂起的任务
        finally:
            self.not_full.release()            #not_full释放锁
登录后复制

默认情况下block为True,timeout为None。如果队列满则会等待,未满则会调用_put方法将进程加入deque中(后面介绍),并且未完成任务加1还会通知队列非空。

如果设置block参数为Flase,队列满时则会抛异常。如果设置了超时那么在时间到之前进行阻塞,时间一到抛异常。这个方法使用not_full对象进行操作。

(2)get()

源码如下:

def get(self, block=True, timeout=None):
         
        self.not_empty.acquire()                #not_empty获得锁
        try:
            if not block:                       #不阻塞时
                if not self._qsize():           #队列为空时抛异常
                    raise Empty
            elif timeout is None:               #不限时时,队列为空则会等待
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("&#39;timeout&#39; must be a non-negative number")
            else:
                endtime = _time() + timeout
                while not self._qsize():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()                  #调用_get方法,移除并获得项目
            self.not_full.notify()              #通知非满
            return item                        #返回项目
        finally:
            self.not_empty.release()            #释放锁
登录后复制

逻辑跟put()函数一样,参数默认情况下队列空了则会等待,否则将会调用_get方法(往下看)移除并获得一个项,最后返回这个项。这个方法使用not_empty对象进行操作。

不过我觉得put()与get()两个函数结合起来理解比较好。not_full与not_empty代表的是两种不同操作类型的线程,not_full可以理解成is-not-full,即队列是否满了,默认是没有满,没有满时not_full这个条件变量才能获取锁,并做一些条件判断,只有符合条件才能向队列里加元素,添加成功后就会通知not_empty条件变量队列里不是空的,“我”刚刚添加进了一个元素,满足可以执行删除动作的基本条件了(队列不是空的,想想如果是空的执行删除动作就没有意义了),同时唤醒一些被挂起的执行移除动作的线程,让这些线程重新判断条件,如果条件准许就会执行删除动作,然后又通知not_full条件变量,告诉“它”队列不是满的,因为“我”刚才删除了一个元素(想想如果队列满了添加元素就添加不进呀,就没意义了),满足了添加元素的基本条件(队列不是满的),同时唤醒一些被挂起的执行添加动作的线程,这些线程又会进行条件判断,符合条件就会添加元素,否则继续挂起,依次类推,同时这样也保证了线程的安全。正与前面所说,当一个元素被移除出队列时,会唤醒一个添加元素的线程;当添加一个元素时会唤醒一个删除元素的线程。

(3)task_done()

源码如下:

def task_done(self):
    
        self.all_tasks_done.acquire()       #获得锁
        try:
            unfinished = self.unfinished_tasks - 1  #判断队列中一个线程的任务是否全部完成
            if unfinished <= 0:                     #是则进行通知,或在过量调用时报异常
                if unfinished < 0:
                    raise ValueError(&#39;task_done() called too many times&#39;)
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished      #否则未完成任务数量-1
        finally:
            self.all_tasks_done.release()           #最后释放锁
登录后复制

这个方法判断队列中一个线程的任务是否全部完成,首先会通过all_tasks_done对象获得锁,如果是则进行通知,最后释放锁。


(4)join()

源码如下:

def join(self):
 
    self.all_tasks_done.acquire()
    try:
        while self.unfinished_tasks:        #如果有未完成的任务,将调用wait()方法等待
            self.all_tasks_done.wait()
    finally:
        self.all_tasks_done.release()
登录后复制

阻塞方法,当队列中有未完成进程时,调用join方法来阻塞,直到他们都完成。


其它的方法都比较简单,也比较好理解,有兴趣可以去看看Queue.py里的源码,要注意的是任何获取队列的状态(empty(),qsize()等),或者修改队列的内容的操作(get,put等)都必须持有互斥锁mutex。

(二)简单例子

实现一个线程不断生成一个随机数到一个队列中

实现一个线程从上面的队列里面不断的取出奇数

实现另外一个线程从上面的队列里面不断取出偶数

import random,threading,time
from Queue import Queue
is_product = True
class Producer(threading.Thread):
    """生产数据"""
    def __init__(self, t_name, queue):
       threading.Thread.__init__(self,name=t_name)
       self.data=queue
    def run(self):
        while 1:
 
            if self.data.full():
                global is_product
                is_product = False
            else:
                if self.data.qsize() <= 7:#队列长度小于等于7时添加元素
                    is_product = True
                    for i in range(2): #每次向队列里添加两个元素
 
                        randomnum=random.randint(1,99)
                        print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum)
                        self.data.put(randomnum,False) #将数据依次存入队列
                        time.sleep(1)
                        print "deque length is %s"%self.data.qsize()
                else:
                    if is_product:
                        for i in range(2):  #
 
                            randomnum = random.randint(1, 99)
                            print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum)
                            self.data.put(randomnum,False)  # 将数据依次存入队列
                            time.sleep(1)
                            print "deque length is %s" % self.data.qsize()
                    else:
                        pass
 
        print "%s: %s finished!" %(time.ctime(), self.getName())
 
#Consumer thread
class Consumer_even(threading.Thread):
    def __init__(self,t_name,queue):
        threading.Thread.__init__(self,name=t_name)
        self.data=queue
    def run(self):
        while 1:
            if self.data.qsize() > 7:#队列长度大于7时开始取元素
                val_even = self.data.get(False)
                if val_even%2==0:
                    print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even)
                    time.sleep(2)
                else:
                    self.data.put(val_even)
                    time.sleep(2)
                print "deque length is %s" % self.data.qsize()
            else:
                pass
 
 
class Consumer_odd(threading.Thread):
    def __init__(self,t_name,queue):
        threading.Thread.__init__(self, name=t_name)
        self.data=queue
    def run(self):
        while 1:
            if self.data.qsize() > 7:
                val_odd = self.data.get(False)
                if val_odd%2!=0:
                    print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd)
                    time.sleep(2)
                else:
                    self.data.put(val_odd)
                    time.sleep(2)
                print "deque length is %s" % self.data.qsize()
            else:
                pass
 
#Main thread
def main():
    queue = Queue(20)
    producer = Producer(&#39;Pro.&#39;, queue)
    consumer_even = Consumer_even(&#39;Con_even.&#39;, queue)
    consumer_odd = Consumer_odd(&#39;Con_odd.&#39;,queue)
    producer.start()
    consumer_even.start()
    consumer_odd.start()
    producer.join()
    consumer_even.join()
    consumer_odd.join()
 
if __name__ == &#39;__main__&#39;:
    main()
登录后复制


本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
3 周前 By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解锁Myrise中的所有内容
4 周前 By 尊渡假赌尊渡假赌尊渡假赌

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

mysql 是否要付费 mysql 是否要付费 Apr 08, 2025 pm 05:36 PM

MySQL 有免费的社区版和收费的企业版。社区版可免费使用和修改,但支持有限,适合稳定性要求不高、技术能力强的应用。企业版提供全面商业支持,适合需要稳定可靠、高性能数据库且愿意为支持买单的应用。选择版本时考虑的因素包括应用关键性、预算和技术技能。没有完美的选项,只有最合适的方案,需根据具体情况谨慎选择。

mysql安装后怎么使用 mysql安装后怎么使用 Apr 08, 2025 am 11:48 AM

文章介绍了MySQL数据库的上手操作。首先,需安装MySQL客户端,如MySQLWorkbench或命令行客户端。1.使用mysql-uroot-p命令连接服务器,并使用root账户密码登录;2.使用CREATEDATABASE创建数据库,USE选择数据库;3.使用CREATETABLE创建表,定义字段及数据类型;4.使用INSERTINTO插入数据,SELECT查询数据,UPDATE更新数据,DELETE删除数据。熟练掌握这些步骤,并学习处理常见问题和优化数据库性能,才能高效使用MySQL。

mySQL下载完安装不了 mySQL下载完安装不了 Apr 08, 2025 am 11:24 AM

MySQL安装失败的原因主要有:1.权限问题,需以管理员身份运行或使用sudo命令;2.依赖项缺失,需安装相关开发包;3.端口冲突,需关闭占用3306端口的程序或修改配置文件;4.安装包损坏,需重新下载并验证完整性;5.环境变量配置错误,需根据操作系统正确配置环境变量。解决这些问题,仔细检查每个步骤,就能顺利安装MySQL。

mysql下载文件损坏无法安装的修复方案 mysql下载文件损坏无法安装的修复方案 Apr 08, 2025 am 11:21 AM

MySQL下载文件损坏,咋整?哎,下载个MySQL都能遇到文件损坏,这年头真是不容易啊!这篇文章就来聊聊怎么解决这个问题,让大家少走弯路。读完之后,你不仅能修复损坏的MySQL安装包,还能对下载和安装过程有更深入的理解,避免以后再踩坑。先说说为啥下载文件会损坏这原因可多了去了,网络问题是罪魁祸首,下载过程中断、网络不稳定都可能导致文件损坏。还有就是下载源本身的问题,服务器文件本身就坏了,你下载下来当然也是坏的。另外,一些杀毒软件过度“热情”的扫描也可能造成文件损坏。诊断问题:确定文件是否真的损坏

如何针对高负载应用程序优化 MySQL 性能? 如何针对高负载应用程序优化 MySQL 性能? Apr 08, 2025 pm 06:03 PM

MySQL数据库性能优化指南在资源密集型应用中,MySQL数据库扮演着至关重要的角色,负责管理海量事务。然而,随着应用规模的扩大,数据库性能瓶颈往往成为制约因素。本文将探讨一系列行之有效的MySQL性能优化策略,确保您的应用在高负载下依然保持高效响应。我们将结合实际案例,深入讲解索引、查询优化、数据库设计以及缓存等关键技术。1.数据库架构设计优化合理的数据库架构是MySQL性能优化的基石。以下是一些核心原则:选择合适的数据类型选择最小的、符合需求的数据类型,既能节省存储空间,又能提升数据处理速度

mysql安装后怎么优化数据库性能 mysql安装后怎么优化数据库性能 Apr 08, 2025 am 11:36 AM

MySQL性能优化需从安装配置、索引及查询优化、监控与调优三个方面入手。1.安装后需根据服务器配置调整my.cnf文件,例如innodb_buffer_pool_size参数,并关闭query_cache_size;2.创建合适的索引,避免索引过多,并优化查询语句,例如使用EXPLAIN命令分析执行计划;3.利用MySQL自带监控工具(SHOWPROCESSLIST,SHOWSTATUS)监控数据库运行状况,定期备份和整理数据库。通过这些步骤,持续优化,才能提升MySQL数据库性能。

mysql 需要互联网吗 mysql 需要互联网吗 Apr 08, 2025 pm 02:18 PM

MySQL 可在无需网络连接的情况下运行,进行基本的数据存储和管理。但是,对于与其他系统交互、远程访问或使用高级功能(如复制和集群)的情况,则需要网络连接。此外,安全措施(如防火墙)、性能优化(选择合适的网络连接)和数据备份对于连接到互联网的 MySQL 数据库至关重要。

Navicat查看MongoDB数据库密码的方法 Navicat查看MongoDB数据库密码的方法 Apr 08, 2025 pm 09:39 PM

直接通过 Navicat 查看 MongoDB 密码是不可能的,因为它以哈希值形式存储。取回丢失密码的方法:1. 重置密码;2. 检查配置文件(可能包含哈希值);3. 检查代码(可能硬编码密码)。

See all articles