ホームページ バックエンド開発 Python チュートリアル キューモジュールとソースコードの分析

キューモジュールとソースコードの分析

Nov 03, 2016 pm 05:31 PM
python queue

キュー モジュールは、キュー操作を提供するモジュールです。キューは、スレッド間でデータを交換する最も一般的に使用される形式です。このモジュールは 3 つのキューを提供します:

Queue.Queue(maxsize): 先入れ先出し、maxsize はキューのサイズであり、その値が正でない数値の場合、それはワイヤレス循環キュー

Queue です。 LifoQueue(maxsize): 後入れ先出し、スタックと同等

Queue.PriorityQueue(maxsize): 優先キュー。

その中で、LifoQueue と PriorityQueue は Queue のサブクラスです。 3 つは次の共通メソッドを持っています:

qsize(): おおよそのキュー サイズを返します。なぜ「約」という言葉を追加するのでしょうか?値が 0 より大きい場合、get() メソッドが同時実行中にブロックされないことが保証されないため、同様に put() メソッドに対しても有効です。

empty(): キューが空の場合は True を返し、そうでない場合は False を返します。

full(): キュー サイズが設定されている場合、キューがいっぱいの場合は True を返し、それ以外の場合は False を返します。

put(item[,block[,timeout]]): 要素 item をキューに追加します。 block が False に設定されている場合、キューがいっぱいの場合は Full 例外がスローされます。 block が True に設定され、timeout が None に設定されている場合、スペースが空くまで待機してからキューに追加されます。それ以外の場合は、timeout で設定されたタイムアウト値に基づいて完全例外がスローされます。

put_nowwait(item): put(item,False) と同等。 block が False に設定されている場合、キューが空の場合は、Empty 例外がスローされます。 block が True に設定され、timeout が None に設定されている場合、スペースができるまで待機してからキューに追加します。それ以外の場合は、timeout で設定されたタイムアウト値に基づいて空の例外がスローされます。

get([block[,timeout]]): キューから要素を削除し、その要素の値を返します。timeout が正の数の場合、最大 timeout 秒間ブロックされ、その範囲内に使用可能な項目がない場合は、ブロックされます。 time になると、空の例外がスローされます。

get_nowwait(): get(False) と同等

task_done(): キューに登録されているタスクが完了したことを示すシグナルを送信します。これはコンシューマー スレッドでよく使用されます。

join(): キューのすべての要素が処理されるまでブロックし、その後、他の操作を処理します。

(1) ソースコード分析

Queue モジュールの使い方は非常に簡単ですが、モジュールの関連するソースコードを投稿して分析する必要があると思います。 by the masters は美しく、なんと構造化され、モジュール化されているので、自分が書いたコードのことを考えると泣けてきます。ぜひ学びに来てください。長さを減らすため、ソースコードのコメント部分を削除しました。

from time import time as _time
try:
    import threading as _threading
except ImportError:
    import dummy_threading as _threading
from collections import deque
import heapq
 
__all__ = ['Empty', 'Full', 'Queue', 'PriorityQueue', 'LifoQueue']
 
class Empty(Exception):
    "Exception raised by Queue.get(block=0)/get_nowait()."
    pass
 
class Full(Exception):
    "Exception raised by Queue.put(block=0)/put_nowait()."
    pass
 
class Queue:
    def __init__(self, maxsize=0):
        self.maxsize = maxsize
        self._init(maxsize)
        self.mutex = _threading.Lock()
        self.not_empty = _threading.Condition(self.mutex)
        self.not_full = _threading.Condition(self.mutex)
        self.all_tasks_done = _threading.Condition(self.mutex)
        self.unfinished_tasks = 
       
    def get_nowait(self):
        return self.get(False)
    def _init(self, maxsize):
        self.queue = deque()
    def _qsize(self, len=len):
        return len(self.queue)
    def _put(self, item):
        self.queue.append(item)
    def _get(self):
        return self.queue.popleft()
ログイン後にコピー

以下の関数の分析を通じて、Queue オブジェクトがコレクション モジュール (コレクション モジュールについては、Python: 統計とコレクション モジュールをカウントするためのカウンターの使用を参照) のキューに加えて、スレッド モジュールに基づいていることがわかります。ミューテックスロックと条件変数のカプセル化。

deque は両端キューであり、キューやスタックに非常に適しています。上記の Queue オブジェクトは先入れ先出しキューであるため、最初に _init() 関数が両端キューを定義し、次に、右側の _put() 関数と _get() 関数を定義します。同様に、左側の要素の追加と削除は、LifoQueue (後入れ先出しキュー) の実装を考えるのが簡単です。キューの右側が追加され、右側が削除されていることを確認してください。ソースコードを投稿してご覧ください。

class LifoQueue(Queue):
    '''Variant of Queue that retrieves most recently added entries first.'''
 
    def _init(self, maxsize):
        self.queue = []
 
    def _qsize(self, len=len):
        return len(self.queue)
 
    def _put(self, item):
        self.queue.append(item)
 
    def _get(self):
        return self.queue.pop()
ログイン後にコピー

その「キュー」は queue() を使用しませんが、リストの append() および Pop() 操作は右端の要素に要素を追加し、右端の要素を削除するため、リストでも同じです。

PriorityQueue を見てみましょう。ここでは heapq モジュールの heappush() 関数と heappop() 関数が使用されます。 heapq モジュールはヒープのデータ構造をモジュール化し、このデータ構造を構築できると同時に、ヒープを操作するための対応するメソッドも提供します。このうち、_init()関数のself.queue=[]は空のヒープを作成しているとみなすことができます。 heappush() は新しい値をヒープに挿入し、 heappop() はヒープから最小値をポップします。これにより優先度を達成できます (ここでは heapq モジュールの簡単な紹介を示します)。ソースコードは次のとおりです。

class PriorityQueue(Queue):
    '''Variant of Queue that retrieves open entries in priority order (lowest first).
 
    Entries are typically tuples of the form:  (priority number, data).
    '''
 
    def _init(self, maxsize):
        self.queue = []
 
    def _qsize(self, len=len):
        return len(self.queue)
 
    def _put(self, item, heappush=heapq.heappush):
        heappush(self.queue, item)
 
    def _get(self, heappop=heapq.heappop):
        return heappop(self.queue)
ログイン後にコピー

基本的なデータ構造が分析され、その後、他の部分が分析されます。

mutex は threading.Lock() オブジェクト、ミューテックス ロックです。not_empty、not_full、および all_tasks_done はすべて threading.Condition() オブジェクトと条件変数であり、同じロック オブジェクトのミューテックスを維持します (ロック オブジェクトのスレッド モジュールについて)および Condition オブジェクトについては、以前のブログ投稿「Python: スレッド、プロセス、コルーチン (2) - スレッド モジュール」を参照してください。

その中には:

self.mutex mutex lock: キューのステータスを取得する操作 (empty()、qsize() など)、またはキューの内容を変更する操作 (get、put など) は、このミューテックス ロックを保持します。 acquire() はロックを取得し、release() はロックを解放します。同時に、ミューテックス ロックは 3 つの条件変数によって共同で維持されます。

self.not_empty 条件変数: スレッドはデータをキューに追加した後、self.not_empty.notify() を呼び出して他のスレッドに通知し、要素を削除するスレッドを起動します。

self.not_full 条件変数: 要素がキューから削除されると、要素を追加するスレッドが起動されます。

self.all_tasks_done 条件変数: 未完了のタスクの数が削除されて 0 になったら、すべてのタスクに完了を通知します

self.unfinished_tasks : 未完了のタスクの数を定義します


main メソッドをもう一度見てみましょう:

(1 )put()

源代码如下:

def put(self, item, block=True, timeout=None):
        self.not_full.acquire()                  #not_full获得锁
        try:
            if self.maxsize > 0:                 #如果队列长度有限制
                if not block:                    #如果没阻塞
                    if self._qsize() == self.maxsize:   #如果队列满了抛异常
                        raise Full
                elif timeout is None:           #有阻塞且超时为空,等待
                    while self._qsize() == self.maxsize:
                        self.not_full.wait()
                elif timeout < 0:
                    raise ValueError("&#39;timeout&#39; must be a non-negative number")
                else:        #如果有阻塞,且超时非负时,结束时间=当前时间+超时时间
                    endtime = _time() + timeout
                    while self._qsize() == self.maxsize:
                        remaining = endtime - _time()
                        if remaining <= 0.0:       #到时后,抛异常
                            raise Full
                            #如果没到时,队列是满的就会一直被挂起,直到有“位置”腾出
                        self.not_full.wait(remaining)
            self._put(item)                    #调用_put方法,添加元素
            self.unfinished_tasks += 1         #未完成任务+1
            self.not_empty.notify()             #通知非空,唤醒非空挂起的任务
        finally:
            self.not_full.release()            #not_full释放锁
ログイン後にコピー

默认情况下block为True,timeout为None。如果队列满则会等待,未满则会调用_put方法将进程加入deque中(后面介绍),并且未完成任务加1还会通知队列非空。

如果设置block参数为Flase,队列满时则会抛异常。如果设置了超时那么在时间到之前进行阻塞,时间一到抛异常。这个方法使用not_full对象进行操作。

(2)get()

源码如下:

def get(self, block=True, timeout=None):
         
        self.not_empty.acquire()                #not_empty获得锁
        try:
            if not block:                       #不阻塞时
                if not self._qsize():           #队列为空时抛异常
                    raise Empty
            elif timeout is None:               #不限时时,队列为空则会等待
                while not self._qsize():
                    self.not_empty.wait()
            elif timeout < 0:
                raise ValueError("&#39;timeout&#39; must be a non-negative number")
            else:
                endtime = _time() + timeout
                while not self._qsize():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()                  #调用_get方法,移除并获得项目
            self.not_full.notify()              #通知非满
            return item                        #返回项目
        finally:
            self.not_empty.release()            #释放锁
ログイン後にコピー

逻辑跟put()函数一样,参数默认情况下队列空了则会等待,否则将会调用_get方法(往下看)移除并获得一个项,最后返回这个项。这个方法使用not_empty对象进行操作。

不过我觉得put()与get()两个函数结合起来理解比较好。not_full与not_empty代表的是两种不同操作类型的线程,not_full可以理解成is-not-full,即队列是否满了,默认是没有满,没有满时not_full这个条件变量才能获取锁,并做一些条件判断,只有符合条件才能向队列里加元素,添加成功后就会通知not_empty条件变量队列里不是空的,“我”刚刚添加进了一个元素,满足可以执行删除动作的基本条件了(队列不是空的,想想如果是空的执行删除动作就没有意义了),同时唤醒一些被挂起的执行移除动作的线程,让这些线程重新判断条件,如果条件准许就会执行删除动作,然后又通知not_full条件变量,告诉“它”队列不是满的,因为“我”刚才删除了一个元素(想想如果队列满了添加元素就添加不进呀,就没意义了),满足了添加元素的基本条件(队列不是满的),同时唤醒一些被挂起的执行添加动作的线程,这些线程又会进行条件判断,符合条件就会添加元素,否则继续挂起,依次类推,同时这样也保证了线程的安全。正与前面所说,当一个元素被移除出队列时,会唤醒一个添加元素的线程;当添加一个元素时会唤醒一个删除元素的线程。

(3)task_done()

源码如下:

def task_done(self):
    
        self.all_tasks_done.acquire()       #获得锁
        try:
            unfinished = self.unfinished_tasks - 1  #判断队列中一个线程的任务是否全部完成
            if unfinished <= 0:                     #是则进行通知,或在过量调用时报异常
                if unfinished < 0:
                    raise ValueError(&#39;task_done() called too many times&#39;)
                self.all_tasks_done.notify_all()
            self.unfinished_tasks = unfinished      #否则未完成任务数量-1
        finally:
            self.all_tasks_done.release()           #最后释放锁
ログイン後にコピー

这个方法判断队列中一个线程的任务是否全部完成,首先会通过all_tasks_done对象获得锁,如果是则进行通知,最后释放锁。


(4)join()

源码如下:

def join(self):
 
    self.all_tasks_done.acquire()
    try:
        while self.unfinished_tasks:        #如果有未完成的任务,将调用wait()方法等待
            self.all_tasks_done.wait()
    finally:
        self.all_tasks_done.release()
ログイン後にコピー

阻塞方法,当队列中有未完成进程时,调用join方法来阻塞,直到他们都完成。


其它的方法都比较简单,也比较好理解,有兴趣可以去看看Queue.py里的源码,要注意的是任何获取队列的状态(empty(),qsize()等),或者修改队列的内容的操作(get,put等)都必须持有互斥锁mutex。

(二)简单例子

实现一个线程不断生成一个随机数到一个队列中

实现一个线程从上面的队列里面不断的取出奇数

实现另外一个线程从上面的队列里面不断取出偶数

import random,threading,time
from Queue import Queue
is_product = True
class Producer(threading.Thread):
    """生产数据"""
    def __init__(self, t_name, queue):
       threading.Thread.__init__(self,name=t_name)
       self.data=queue
    def run(self):
        while 1:
 
            if self.data.full():
                global is_product
                is_product = False
            else:
                if self.data.qsize() <= 7:#队列长度小于等于7时添加元素
                    is_product = True
                    for i in range(2): #每次向队列里添加两个元素
 
                        randomnum=random.randint(1,99)
                        print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum)
                        self.data.put(randomnum,False) #将数据依次存入队列
                        time.sleep(1)
                        print "deque length is %s"%self.data.qsize()
                else:
                    if is_product:
                        for i in range(2):  #
 
                            randomnum = random.randint(1, 99)
                            print "%s: %s is producing %d to the queue!" % (time.ctime(), self.getName(), randomnum)
                            self.data.put(randomnum,False)  # 将数据依次存入队列
                            time.sleep(1)
                            print "deque length is %s" % self.data.qsize()
                    else:
                        pass
 
        print "%s: %s finished!" %(time.ctime(), self.getName())
 
#Consumer thread
class Consumer_even(threading.Thread):
    def __init__(self,t_name,queue):
        threading.Thread.__init__(self,name=t_name)
        self.data=queue
    def run(self):
        while 1:
            if self.data.qsize() > 7:#队列长度大于7时开始取元素
                val_even = self.data.get(False)
                if val_even%2==0:
                    print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(),self.getName(),val_even)
                    time.sleep(2)
                else:
                    self.data.put(val_even)
                    time.sleep(2)
                print "deque length is %s" % self.data.qsize()
            else:
                pass
 
 
class Consumer_odd(threading.Thread):
    def __init__(self,t_name,queue):
        threading.Thread.__init__(self, name=t_name)
        self.data=queue
    def run(self):
        while 1:
            if self.data.qsize() > 7:
                val_odd = self.data.get(False)
                if val_odd%2!=0:
                    print "%s: %s is consuming. %d in the queue is consumed!" % (time.ctime(), self.getName(), val_odd)
                    time.sleep(2)
                else:
                    self.data.put(val_odd)
                    time.sleep(2)
                print "deque length is %s" % self.data.qsize()
            else:
                pass
 
#Main thread
def main():
    queue = Queue(20)
    producer = Producer(&#39;Pro.&#39;, queue)
    consumer_even = Consumer_even(&#39;Con_even.&#39;, queue)
    consumer_odd = Consumer_odd(&#39;Con_odd.&#39;,queue)
    producer.start()
    consumer_even.start()
    consumer_odd.start()
    producer.join()
    consumer_even.join()
    consumer_odd.join()
 
if __name__ == &#39;__main__&#39;:
    main()
ログイン後にコピー


このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

AI Hentai Generator

AI Hentai Generator

AIヘンタイを無料で生成します。

ホットツール

メモ帳++7.3.1

メモ帳++7.3.1

使いやすく無料のコードエディター

SublimeText3 中国語版

SublimeText3 中国語版

中国語版、とても使いやすい

ゼンドスタジオ 13.0.1

ゼンドスタジオ 13.0.1

強力な PHP 統合開発環境

ドリームウィーバー CS6

ドリームウィーバー CS6

ビジュアル Web 開発ツール

SublimeText3 Mac版

SublimeText3 Mac版

神レベルのコード編集ソフト(SublimeText3)

mysqlは支払う必要がありますか mysqlは支払う必要がありますか Apr 08, 2025 pm 05:36 PM

MySQLには、無料のコミュニティバージョンと有料エンタープライズバージョンがあります。コミュニティバージョンは無料で使用および変更できますが、サポートは制限されており、安定性要件が低く、技術的な能力が強いアプリケーションに適しています。 Enterprise Editionは、安定した信頼性の高い高性能データベースを必要とするアプリケーションに対する包括的な商業サポートを提供し、サポートの支払いを喜んでいます。バージョンを選択する際に考慮される要因には、アプリケーションの重要性、予算編成、技術スキルが含まれます。完璧なオプションはなく、最も適切なオプションのみであり、特定の状況に応じて慎重に選択する必要があります。

hadidb:pythonの軽量で水平方向にスケーラブルなデータベース hadidb:pythonの軽量で水平方向にスケーラブルなデータベース Apr 08, 2025 pm 06:12 PM

hadidb:軽量で高レベルのスケーラブルなPythonデータベースHadIDB(HadIDB)は、Pythonで記述された軽量データベースで、スケーラビリティが高くなっています。 PIPインストールを使用してHADIDBをインストールする:PIPINSTALLHADIDBユーザー管理CREATEユーザー:CREATEUSER()メソッド新しいユーザーを作成します。 Authentication()メソッドは、ユーザーのIDを認証します。 fromhadidb.operationimportuseruser_obj = user( "admin"、 "admin")user_obj。

MongoDBデータベースパスワードを表示するNAVICATの方法 MongoDBデータベースパスワードを表示するNAVICATの方法 Apr 08, 2025 pm 09:39 PM

Hash値として保存されているため、Navicatを介してMongoDBパスワードを直接表示することは不可能です。紛失したパスワードを取得する方法:1。パスワードのリセット。 2。構成ファイルを確認します(ハッシュ値が含まれる場合があります)。 3.コードを確認します(パスワードをハードコードできます)。

mysqlはインターネットが必要ですか? mysqlはインターネットが必要ですか? Apr 08, 2025 pm 02:18 PM

MySQLは、基本的なデータストレージと管理のためにネットワーク接続なしで実行できます。ただし、他のシステムとのやり取り、リモートアクセス、または複製やクラスタリングなどの高度な機能を使用するには、ネットワーク接続が必要です。さらに、セキュリティ対策(ファイアウォールなど)、パフォーマンスの最適化(適切なネットワーク接続を選択)、およびデータバックアップは、インターネットに接続するために重要です。

MySQLを解く方法は、ローカルホストに接続できません MySQLを解く方法は、ローカルホストに接続できません Apr 08, 2025 pm 02:24 PM

MySQL接続は、次の理由が原因である可能性があります。MySQLサービスは開始されず、ファイアウォールは接続をインターセプトし、ポート番号が間違っています。ユーザー名またはパスワードが間違っています。My.cnfのリスニングアドレスは不適切に構成されています。トラブルシューティング手順には以下が含まれます。 2.ファイアウォール設定を調整して、MySQLがポート3306をリッスンできるようにします。 3.ポート番号が実際のポート番号と一致していることを確認します。 4.ユーザー名とパスワードが正しいかどうかを確認します。 5. my.cnfのバインドアドレス設定が正しいことを確認してください。

MySQLワークベンチはMariadBに接続できますか MySQLワークベンチはMariadBに接続できますか Apr 08, 2025 pm 02:33 PM

MySQLワークベンチは、構成が正しい場合、MariadBに接続できます。最初にコネクタタイプとして「mariadb」を選択します。接続構成では、ホスト、ポート、ユーザー、パスワード、およびデータベースを正しく設定します。接続をテストするときは、ユーザー名とパスワードが正しいかどうか、ポート番号が正しいかどうか、ファイアウォールが接続を許可するかどうか、データベースが存在するかどうか、MariadBサービスが開始されていることを確認してください。高度な使用法では、接続プーリングテクノロジーを使用してパフォーマンスを最適化します。一般的なエラーには、不十分な権限、ネットワーク接続の問題などが含まれます。エラーをデバッグするときは、エラー情報を慎重に分析し、デバッグツールを使用します。ネットワーク構成を最適化すると、パフォーマンスが向上する可能性があります

高負荷アプリケーションのMySQLパフォーマンスを最適化する方法は? 高負荷アプリケーションのMySQLパフォーマンスを最適化する方法は? Apr 08, 2025 pm 06:03 PM

MySQLデータベースパフォーマンス最適化ガイドリソース集約型アプリケーションでは、MySQLデータベースが重要な役割を果たし、大規模なトランザクションの管理を担当しています。ただし、アプリケーションのスケールが拡大すると、データベースパフォーマンスのボトルネックが制約になることがよくあります。この記事では、一連の効果的なMySQLパフォーマンス最適化戦略を検討して、アプリケーションが高負荷の下で効率的で応答性の高いままであることを保証します。実際のケースを組み合わせて、インデックス作成、クエリ最適化、データベース設計、キャッシュなどの詳細な主要なテクノロジーを説明します。 1.データベースアーキテクチャの設計と最適化されたデータベースアーキテクチャは、MySQLパフォーマンスの最適化の基礎です。いくつかのコア原則は次のとおりです。適切なデータ型を選択し、ニーズを満たす最小のデータ型を選択すると、ストレージスペースを節約するだけでなく、データ処理速度を向上させることもできます。

Amazon AthenaでAWS接着クローラーの使用方法 Amazon AthenaでAWS接着クローラーの使用方法 Apr 09, 2025 pm 03:09 PM

データの専門家として、さまざまなソースから大量のデータを処理する必要があります。これは、データ管理と分析に課題をもたらす可能性があります。幸いなことに、AWS GlueとAmazon Athenaの2つのAWSサービスが役立ちます。

See all articles