這裡舉一個通訊機制的例子:我們都很熟悉通信這個詞,例如一個人想打電話給他的女友。一旦通話建立,便會形成一個隱式的佇列(請注意這個術語)。此時這個人就會透過對話的方式不停的將訊息告訴女友,而這個人的女友也是在傾聽。我認為在大多數情況下,情況可能是倒過來的。
這裡可以將他們兩個人比作是兩個進程,"這個人"的進程需要將訊息發送給"女友"的進程,就需要一個隊列的幫助。由於女友需要時刻接收隊列中的信息,因此她可以同時進行其他事情,這意味著兩個進程之間的通信主要依賴隊列。
這個隊列可以支援發送訊息與接收訊息,「這個人"負責發送訊息,反之"女友」 負責的是接收訊息。
既然佇列才是重點,那麼就來看看佇列要如何建立。
仍使用 multiprocessing 模組,呼叫該模組的 Queue 函數來實現佇列的建立。
函數名稱 | 介紹 | 參數 | 傳回值 |
---|---|---|---|
Queue | 隊列的建立 | mac_count |
函數名稱 | #介紹 | ||
---|---|---|---|
put | 將訊息放入佇列 | message | |
get | 取得佇列訊息 | 無 |
put 函數功能介紹:將資料傳入。它有一個參數 message ,是一個字串類型。
get 函數功能介紹:用來接收佇列中的資料。 (其實這裡就是常用的json場景,有很多的資料傳輸都是字串的,佇列的插入與取得就是使用的字串,所以json 就非常適用這個場景。)
接下來就來練習一下隊列的使用。
程式碼範例如下:
# coding:utf-8 import json import multiprocessing class Work(object): # 定义一个 Work 类 def __init__(self, queue): # 构造函数传入一个 '队列对象' --> queue self.queue = queue def send(self, message): # 定义一个 send(发送) 函数,传入 message # [这里有个隐藏的bug,就是只判断了传入的是否字符串类型;如果传入的是函数、类、集合等依然会报错] if not isinstance(message, str): # 判断传入的 message 是否为字符串,若不是,则进行 json 序列化 message = json.dumps(message) self.queue.put(message) # 利用 queue 的队列实例化对象将 message 发送出去 def receive(self): # 定义一个 receive(接收) 函数,不需传入参数,但是因为接收是一个源源不断的过程,所以需要使用 while 循环 while 1: result = self.queue.get() # 获取 '队列对象' --> queue 传入的message # 由于我们接收的 message 可能不是一个字符串,所以要进程异常的捕获 try: # 如果传入的 message 符合 JSON 格式将赋值给 res ;若不符合,则直接使用 result 赋值 res res = json.loads(result) except: res = result print('接收到的信息为:{}'.format(res)) if __name__ == '__main__': queue = multiprocessing.Queue() work = Work(queue) send = multiprocessing.Process(target=work.send, args=({'message': '这是一条测试的消息'},)) receive = multiprocessing.Process(target=work.receive) send.start() receive.start()
使用佇列建立進程間通訊遇到的異常
但是這裡會出現一個報錯,如下圖:
報錯截圖範例如下:
這裡的報錯提示是檔案沒有被發現的意思。其實這裡是我們使用 隊列做 put() 和 get()的時候 有一把無形的鎖加了上去,就是上圖中圈中的 .SemLock 。我們不需要去關心造成這個錯誤的具體原因,要解決這個問題其實也很簡單。
FileNotFoundError: [Errno 2] No such file or directory 異常的解決
需要阻塞進程的只是send 或receive 子進程中的一個,只要阻塞其中一個即可,這是理論上的情況。但是我們的 receive子程序是一個 while循環,它會一直執行,所以只需要給 send 子程序加上一個 join 即可。
解決示意圖如下:
PS:雖然解決了報錯問題,但是程式沒有正常退出。
其實由於我們的 receive 進程是個 while循環,並不知道要處理到什麼時候,沒有辦法立刻終止。所以我們需要在 receive 進程 使用 terminate() 函數來終結接收端。
運行結果如下:
新建一個函數,寫入for迴圈模擬批次新增要傳送的訊息
然後再給這個模擬批次發送資料的函數加入一個執行緒。
範例程式碼如下:
# coding:utf-8 import json import time import multiprocessing class Work(object): # 定义一个 Work 类 def __init__(self, queue): # 构造函数传入一个 '队列对象' --> queue self.queue = queue def send(self, message): # 定义一个 send(发送) 函数,传入 message # [这里有个隐藏的bug,就是只判断了传入的是否字符串类型;如果传入的是函数、类、集合等依然会报错] if not isinstance(message, str): # 判断传入的 message 是否为字符串,若不是,则进行 json 序列化 message = json.dumps(message) self.queue.put(message) # 利用 queue 的队列实例化对象将 message 发送出去 def send_all(self): # 定义一个 send_all(发送)函数,然后通过for循环模拟批量发送的 message for i in range(20): self.queue.put('第 {} 次循环,发送的消息为:{}'.format(i, i)) time.sleep(1) def receive(self): # 定义一个 receive(接收) 函数,不需传入参数,但是因为接收是一个源源不断的过程,所以需要使用 while 循环 while 1: result = self.queue.get() # 获取 '队列对象' --> queue 传入的message # 由于我们接收的 message 可能不是一个字符串,所以要进程异常的捕获 try: # 如果传入的 message 符合 JSON 格式将赋值给 res ;若不符合,则直接使用 result 赋值 res res = json.loads(result) except: res = result print('接收到的信息为:{}'.format(res)) if __name__ == '__main__': queue = multiprocessing.Queue() work = Work(queue) send = multiprocessing.Process(target=work.send, args=({'message': '这是一条测试的消息'},)) receive = multiprocessing.Process(target=work.receive) send_all = multiprocessing.Process(target=work.send_all,) send_all.start() # 这里因为 send 只执行了1次,然后就结束了。而 send_all 却要循环20次,它的执行时间是最长的,信息也是发送的最多的 send.start() receive.start() # send.join() # 使用 send 的阻塞会造成 send_all 循环还未结束 ,receive.terminate() 函数接收端就会终结。 send_all.join() # 所以我们只需要阻塞最长使用率的进程就可以了 receive.terminate()
運行結果如下:
#從上圖我們可以看到send 與send_all 兩個進程都可以透過queue這個實例化的Queue 物件傳送訊息,同樣的receive接收函數也會將兩個行程傳入的message 列印輸出出來。
在這一章節,我們成功運用佇列實現了跨進程通信,同時也掌握了佇列的操作技巧。一個隊列中,有一端(這裡我們示範的是 send端)透過 put方法實現添加相關的信息,另一端使用 get 方法獲取相關的信息;兩個進程相互配合達到一個進程通信的效果。
除了佇列,進程之間還可以使用管道、信號量和共享記憶體等方式進行通信,如果您有興趣,可以了解這些方法。可以自行拓展一下。
python提供了多種進程通訊的方式,包括訊號,管道,訊息佇列,信號量,共享內存,socket等
主要Queue和Pipe這兩種方式,Queue用於多個進程間實現通信,Pipe是兩個進程的通信。
1.管道:分為匿名管道和命名管道
匿名管道:在核心中申請一塊固定大小的緩衝區,程式擁有寫入和讀取的權利,一般使用fock函數實現父子程序的通訊
命名管道:在記憶體中申請一塊固定大小的緩衝區,程式擁有寫入和讀取的權利,沒有血緣關係的進程也可以進程間通訊
特點:面向位元組流;生命週期隨核心;自帶同步互斥機制;半雙工,單向通信,兩個管道實現雙向通信
#一種重寫方式是:在操作系統核心中建立一個佇列,佇列包含多個資料封包元素,多個行程可以透過特定句柄來存取該佇列。訊息佇列可以用來將資料從一個行程傳送到另一個行程。每個資料塊被認為是有一個類型,接收者進程接收的資料塊可以有不同的類型。訊息佇列也有管道一樣的不足,就是每個訊息的最大長度是有上限的,每個訊息佇列的總的位元組數是有上限的,系統上訊息佇列的總數也有一個上限
特點:訊息佇列可以被認為是一個全域的一個鍊錶,鍊錶節點中存放著資料封包的類型和內容,有訊息佇列的識別碼進行標記;訊息佇列允許一個或多個行程寫入或讀取訊息;訊息佇列的生命週期隨核心;訊息佇列可實現雙向通訊
3.信號量:在內核中建立一個信號量集合(本質上是數組),數組的元素(信號量)都是1,使用P操作進行-1,使用V操作1
P(sv):如果sv的值大於零,就給它減1;如果它的值為零,就掛起該程式的執行
V(sv):如果有其他進程因等待sv而被掛起,就讓它恢復運行,如果沒有進程因等待sv而掛起,就給它加1
#PV操作用於同一個進程,實現互斥;PV操作用於不同進程,實現同步
功能:對臨界資源進行保護
4.共享記憶體:將同一塊實體記憶體一塊映射到不同的進程的虛擬位址空間中,實現不同進程間對同一資源的共享。說到進程間通訊方式,共享記憶體可以說是最有用的,也是最快速的IPC形式
特點:不同從用戶態到內核態的頻繁切換和拷貝數據,直接從記憶體中讀取就可以;共享記憶體是臨界資源,所以需要操作時必須要確保原子性。使用信號量或互斥鎖都可以.
以上是Python進程間的通訊方式是什麼的詳細內容。更多資訊請關注PHP中文網其他相關文章!