讓我們使用 Python 來發展不同的訊息傳遞模式。
您需要觀看以下影片才能按照逐步命令進行操作。
慢慢來;確保在運行命令之前仔細檢查它們。
我在我的 GCP 虛擬機上運行本教程,但也可以在本地運行它 ✅
本教學使用 ZeroMQ 介紹 Python3 中套接字的概念。 ZeroMQ 是一種開發套接字的簡單方法,允許分散式進程透過發送訊息相互通訊。
我們今天將研究的訊息傳遞模式如下:
? 注意: 使用套接字可能會很棘手,使用相同的連接埠號碼/相同的套接字一次又一次運行相同的程式碼,可能會導致連接「掛起」(伺服器看起來像是正在運行,但它不能接受連接)。發生這種情況是因為我們沒有正確關閉和銷毀之前的連接。
解決這個問題最合適的方法是關閉套接字並銷毀 ZeroMQ 上下文。有關更多詳細信息,請參閱第 2 階段和第 3 階段的 try – catch 區塊。
在本教學中,您可能會遇到此類問題,例如,在同一連接埠中多次執行相同伺服器。如果您遇到掛起問題,建議您終止 Python 進程,清理 TCP 連接埠號碼,然後再次執行伺服器(請參閱步驟 11)。
讓我們先建立一個新的虛擬機,然後我們將安裝Python3。
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
出現提示時輸入:Y。
如今許多應用程式都包含跨網路的元件,因此訊息傳遞至關重要。今天我們將使用 TCP 進行訊息傳輸。
您可以使用 VSC 存取您的虛擬機,也可以使用 SSH 執行命令並使用 pico 編輯文件,在我的例子中,我將使用 SSH。
?確保仔細複製代碼。
我們需要建立第一個ZeroMQ 伺服器,該伺服器一次只允許與一個客戶端綁定。
建立一個名為pair-server.py的新文件,然後輸入以下程式碼。
程式碼使用 zmq.PAIR 模式建立一個新套接字,然後將伺服器綁定到特定的 IP 連接埠(我們已經在 GCP 中開啟)。請注意,在我們停止伺服器之前,伺服器不會停止運作。
查看評論以了解其工作原理。
確保更改 ;這是 GCP 虛擬機器的 內部 IP 位址;用戶端連接埠應與伺服器連接埠相同。
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
先不要運行伺服器,首先讓我們建立客戶端。
建立客戶端並花一點時間檢查評論。我將其命名為pair-client.py。
確保更改 ;連接埠應與伺服器中的連接埠相同。
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
我們需要兩個個終端視窗來運作PAIR範例。我們將在一個視窗上運行伺服器,在另一個視窗上運行客戶端。現在,按如下方式運行它。
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
檢查輸出,我們剛剛建立了一個新的 PAIR 套接字。
在再次運行之前,我們需要清除 TCP 連線。為此,請使用以下命令。
$ python3 pair-server.py
?備註:
我們一次只能運行一個PAIR,這意味著我們不能有多個客戶端,記住這是一個PAIR,第一個客戶端將鎖定套接字.
如果我們運行伺服器一次,客戶端運行兩次,第二個客戶端將“掛起”,這意味著第二個客戶端將等待新伺服器連線。
如果我們想要多次運行該對,我們需要終止伺服器並清除 TCP 連線。
PAIR 當客戶端需要獨佔存取伺服器時是理想的選擇。
我們可以將多個伺服器作為一對連接到多個客戶端,但我們需要使用不同的連接埠號碼進行連接。
每個階段都是相互獨立的,因此,停止伺服器,清除 TCP 端口,然後進入下一階段。
讓我們建立一個客戶端-伺服器連接,其中多個客戶端將連接到單一伺服器。這是最受歡迎的訊息傳遞模式。
$ python3 pair-client.py
現在我們將開發兩個功能相同的客戶端。
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
讓我們建立該客戶端的副本並進行對應的編輯。執行以下命令來製作新副本。
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
然後編輯req-client2.py並將客戶端1改為客戶端2。
讓我們編輯列印和套接字訊息(第 8 行和第 9 行)
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 1 ", request,"...") socket.send_string("Hello from client 1") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
要執行此範例,我們需要三個 個終端窗口,一個用於伺服器,兩個用於客戶端。在第一個終端機中執行以下命令。
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
檢查視窗的輸出,我們剛剛建立了兩個與一台伺服器通訊的客戶端。您可以擁有任意數量的客戶端,您將需要建立客戶端,即使具有連接到一台伺服器的不同功能。
? 備註:
客戶端-伺服器是最廣泛使用的模式,當我們安裝和執行 Apache HTTP 伺服器時,我們已經在第 1 類中使用了它。
停止伺服器並清理 TCP 連接埠 5555
- 殺死伺服器:
重擊
$ sudo fusion -k 5555/tcp
發布-訂閱模式是一種非常常見的方法,用於控制向訂閱上下文的許多客戶端廣播數據,伺服器將數據發送到一個或多個客戶端。
$ python3 pair-server.py
讓我們先建立一個簡單的範例。
$ python3 pair-client.py
讓我們建立一個新文件,命名為 pub_server.py。
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
建立一個新檔案 pub_client.py。
* 此腳本接受來自命令列的三個參數(即 IP 和兩個連接埠)。
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 1 ", request,"...") socket.send_string("Hello from client 1") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
我們已準備好運行我們的pub-sub應用程式!我們需要三個個終端視窗。在第一個終端機中運作:
$ cp req-client1.py req-client2.py
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 2 ", request,"...") socket.send_string("Hello from client 2") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
讓我們運行客戶端以透過郵遞區號連接並訂閱數據,例如 10001 (NYC)。請記住,客戶端腳本訂閱了兩個伺服器實例。執行下一個指令:
$ python3 rep-server.py
$ python3 req-client1.py
$ python3 req-client2.py
推/拉套接字可讓您將訊息分發給排列在管道中的多個工作人員。這對於並行運行程式碼非常有用。 Push 套接字會將訊息均勻分發到其 Pull 用戶端,客戶端將回應傳送到另一個稱為收集器的伺服器。
這相當於生產者/消費者模型,但是消費者計算的結果不會發送到上游,而是下游到另一個拉取/消費者套接字。
我們將實現以下功能。
生產者將向消費者推送 0 到 10 的隨機數。
同一消費者的兩個實例將拉取數字並執行繁重的任務。
任務可以是任何繁重的計算,例如矩陣乘法。
為了簡單起見,我們的「繁重任務」將只傳回相同的數字。
消費者會將各個結果(繁重的任務計算)推送到結果收集器,該收集器將匯總結果。
為了簡單起見,結果收集器的實例將拉取結果併計算每個消費者的部分總和。如果需要,我們可以輕鬆地將兩個部分和加起來。
讓我們來看一個簡單的例子。
此範例示範了分散式處理並行處理的潛力。
首先,讓我們建立在連接埠 5555 上運行的名為 Producer.py 的生產者,確保您調整了您的 .
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
然後建立consumer.py如下。不要忘記更改程式碼中的兩個 s。
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
最後,讓我們開發collector.py,再改.
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
確保沒有縮排錯誤!
$ python3 pair-server.py
首先,我們需要運行collector.py,收集器將等待資料被收集,直到我們啟動生產者。
$ python3 pair-client.py
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
幹得好! ?您使用 ZeroMQ 來開發訊息傳遞模式!
以上是使用 ZeroMQ 在分散式系統中傳送訊息的詳細內容。更多資訊請關注PHP中文網其他相關文章!