キュー メッセージの取得#None | #str |
|
put 関数の概要: データを渡します。これには、文字列型のパラメーター メッセージが 1 つあります。
get 関数の概要: キュー内のデータを受信するために使用されます。 (実際、これは一般的な json シナリオです。多くのデータ送信は文字列です。キューの挿入と取得には文字列が使用されるため、json はこのシナリオに非常に適しています。)
次のステップは、キューの使用を練習してみましょう。
プロセス間通信 - キューのデモ ケース
コード例は次のとおりです。
# coding:utf-8
import json
import multiprocessing
class Work(object): # 定义一个 Work 类
def __init__(self, queue): # 构造函数传入一个 '队列对象' --> queue
self.queue = queue
def send(self, message): # 定义一个 send(发送) 函数,传入 message
# [这里有个隐藏的bug,就是只判断了传入的是否字符串类型;如果传入的是函数、类、集合等依然会报错]
if not isinstance(message, str): # 判断传入的 message 是否为字符串,若不是,则进行 json 序列化
message = json.dumps(message)
self.queue.put(message) # 利用 queue 的队列实例化对象将 message 发送出去
def receive(self): # 定义一个 receive(接收) 函数,不需传入参数,但是因为接收是一个源源不断的过程,所以需要使用 while 循环
while 1:
result = self.queue.get() # 获取 '队列对象' --> queue 传入的message
# 由于我们接收的 message 可能不是一个字符串,所以要进程异常的捕获
try: # 如果传入的 message 符合 JSON 格式将赋值给 res ;若不符合,则直接使用 result 赋值 res
res = json.loads(result)
except:
res = result
print('接收到的信息为:{}'.format(res))
if __name__ == '__main__':
queue = multiprocessing.Queue()
work = Work(queue)
send = multiprocessing.Process(target=work.send, args=({'message': '这是一条测试的消息'},))
receive = multiprocessing.Process(target=work.receive)
send.start()
receive.start()
ログイン後にコピー
キューを使用してプロセス間通信を確立するときに例外が発生しました
しかし、ここでは以下に示すようなエラー メッセージが表示されます:
エラー スクリーンショットの例は次のとおりです:
ここでのエラー メッセージは、ファイルがは見つかりませんでした。実際、キューを使用して put() と get() を実行すると、目に見えないロックが追加されます。これは、上の図の円内の .SemLock です。このエラーの具体的な原因を気にする必要はなく、この問題を解決するのは実際には非常に簡単です。
FileNotFoundError: [Errno 2] そのようなファイルまたはディレクトリはありません例外解決策
プロセスをブロックする必要があるのは、送信または受信サブプロセスの 1 つだけです。そのうちの 1 つをブロックするだけです。 . これが理論上の状況です。ただし、受信サブプロセスは while ループであり、常に実行されるため、送信サブプロセスに結合を追加するだけで済みます。
解決図は次のとおりです。
PS: エラーの問題は解決されましたが、プログラムは正常に終了しませんでした。
実際、受信プロセスは while ループであるため、いつ処理されるかわかりませんし、すぐに終了する方法もありません。したがって、受信プロセスで terminate() 関数を使用して受信側を終了する必要があります。
実行結果は次のとおりです。
send 関数にデータをバッチで追加します。
新しい関数を作成して、メッセージ送信のためのバッチ追加をシミュレートする for ループ
次に、バッチでのデータ送信をシミュレートするスレッドをこの関数に追加します。
サンプル コードは次のとおりです。
# coding:utf-8
import json
import time
import multiprocessing
class Work(object): # 定义一个 Work 类
def __init__(self, queue): # 构造函数传入一个 '队列对象' --> queue
self.queue = queue
def send(self, message): # 定义一个 send(发送) 函数,传入 message
# [这里有个隐藏的bug,就是只判断了传入的是否字符串类型;如果传入的是函数、类、集合等依然会报错]
if not isinstance(message, str): # 判断传入的 message 是否为字符串,若不是,则进行 json 序列化
message = json.dumps(message)
self.queue.put(message) # 利用 queue 的队列实例化对象将 message 发送出去
def send_all(self): # 定义一个 send_all(发送)函数,然后通过for循环模拟批量发送的 message
for i in range(20):
self.queue.put('第 {} 次循环,发送的消息为:{}'.format(i, i))
time.sleep(1)
def receive(self): # 定义一个 receive(接收) 函数,不需传入参数,但是因为接收是一个源源不断的过程,所以需要使用 while 循环
while 1:
result = self.queue.get() # 获取 '队列对象' --> queue 传入的message
# 由于我们接收的 message 可能不是一个字符串,所以要进程异常的捕获
try: # 如果传入的 message 符合 JSON 格式将赋值给 res ;若不符合,则直接使用 result 赋值 res
res = json.loads(result)
except:
res = result
print('接收到的信息为:{}'.format(res))
if __name__ == '__main__':
queue = multiprocessing.Queue()
work = Work(queue)
send = multiprocessing.Process(target=work.send, args=({'message': '这是一条测试的消息'},))
receive = multiprocessing.Process(target=work.receive)
send_all = multiprocessing.Process(target=work.send_all,)
send_all.start() # 这里因为 send 只执行了1次,然后就结束了。而 send_all 却要循环20次,它的执行时间是最长的,信息也是发送的最多的
send.start()
receive.start()
# send.join() # 使用 send 的阻塞会造成 send_all 循环还未结束 ,receive.terminate() 函数接收端就会终结。
send_all.join() # 所以我们只需要阻塞最长使用率的进程就可以了
receive.terminate()
ログイン後にコピー
実行結果は次のとおりです。
上の図から、 2 つのプロセス send と send_all インスタンス化された queue の Queue オブジェクトを介してメッセージを送信でき、同じ受信関数によって 2 つのプロセスによって渡されたメッセージも出力されます。
セクション
この章では、キューを使用してプロセス間通信を実現することに成功し、キューの操作スキルも習得しました。キューでは、一方の端 (ここでは送信側を示しています) が put メソッドを通じて関連情報を追加し、もう一方の端が get メソッドを使用して関連情報を取得します。2 つのプロセスは相互に連携して 1 つのプロセスの効果を達成します。コミュニケーション。
キューに加えて、プロセスはパイプ、セマフォ、共有メモリを使用して通信することもできます。興味がある場合は、これらの方法について学ぶことができます。自分で拡張することもできます。
プロセス間通信のその他の方法 - 補足
Python は、シグナル、パイプ、メッセージ キュー、セマフォ、共有メモリ、ソケットなどを含む、プロセス間で通信するためのさまざまな方法を提供します。
##大きく分けて Queue と Pipe の 2 つのメソッドがあり、Queue は複数のプロセス間の通信を実現するために使用され、Pipe は 2 つのプロセス間の通信を実現します。 1. パイプ: 匿名パイプと名前付きパイプに分けられます匿名パイプ: カーネル内の固定サイズのバッファに適用されます。プログラムは書き込みと読み取りの権限を持ちます。一般に、fockを使用します。親プロセスと子プロセス間の通信を実装する関数です。名前付きパイプ: メモリ上の固定サイズのバッファを適用します。プログラムは書き込みと読み取りの権限を持ちます。血のつながりのないプロセス間でも通信できます。プロセス間特長: バイトストリーム指向、ライフサイクルはカーネルに従う、同期相互排他機構内蔵、半二重一方向通信、2本のパイプで双方向通信を実現 #1 つの書き換え方法は次のとおりです: 運用中 システム カーネル内にキューが確立され、複数のデータグラム要素が含まれており、複数のプロセスが特定のハンドルを介してキューにアクセスできます。メッセージ キューを使用して、あるプロセスから別のプロセスにデータを送信できます。各データ ブロックにはタイプがあるとみなされ、受信プロセスによって受信されるデータ ブロックは異なるタイプを持つ可能性があります。メッセージ キューにもパイプと同じ欠点があります。つまり、各メッセージの最大長には上限があり、各メッセージ キューの合計バイト数には上限があり、メッセージ キューの合計バイト数にも上限があります。システム上のメッセージ キューの総数
特徴: メッセージ キューはグローバル リンク リストとして考えることができます。リンク リスト ノードはデータグラムのタイプと内容を保存し、メッセージの識別子でマークされます。キュー; メッセージ キューを使用すると、1 つ以上のプロセスがメッセージの書き込みまたは読み取りを行うことができます。メッセージ キューのライフ サイクルはカーネルによって異なります。メッセージ キューは双方向通信を実現できます
3. セマフォ: カーネル内にセマフォ コレクション (基本的に配列) を作成します。配列 (セマフォ) の要素はすべて 1 です。-1 を実行するには P 演算を使用し、1 を実行するには V 演算を使用します。
P(sv): sv の値が 0 より大きい場合は 1 減算し、値が 0 の場合はプログラムの実行を一時停止します
V(sv): 他にある場合SV を待っているプロセスが一時停止されている場合は、実行を再開します。SV を待っているためにプロセスが一時停止されていない場合は、それに 1 を追加します。
PV 操作は、相互排他を実現するために同じプロセスに使用されます。同期を実現するプロセス
機能: 重要なリソースを保護
4. 共有メモリ: 同じ物理メモリを異なるプロセスの仮想アドレス空間にマッピングして、同期を実現します。異なるプロセス間の同期 同じリソースの共有。プロセス間通信方法に関しては、共有メモリが IPC の最も便利で最速の形式であると言えます。
特徴: ユーザー モードからカーネル モードへのデータの頻繁な切り替えやコピーとは異なり、データを直接読み取ります。メモリ それは問題ありません。共有メモリは重要なリソースであるため、操作が必要な場合にはアトミック性が保証される必要があります。セマフォまたはミューテックス ロックを使用できます。