首页 > 后端开发 > Python教程 > 使用 ZeroMQ 在分布式系统中发送消息

使用 ZeroMQ 在分布式系统中发送消息

Barbara Streisand
发布: 2024-11-21 07:33:11
原创
494 人浏览过

使用 ZeroMQ 在分布式系统中发送消息

让我们使用 Python 来开发不同的消息传递模式。

您需要观看以下视频才能按照分步命令进行操作。

慢慢来;确保在运行命令之前仔细检查它们。

  • 以下视频演示了本教程中使用的命令。

Messaging in distributed systems using ZeroMQ

我在我的 GCP 虚拟机上运行本教程,但也可以在本地运行它 ✅

本教程使用 ZeroMQ 介绍 Python3 中套接字的概念。 ZeroMQ 是一种开发套接字的简单方法,允许分布式进程通过发送消息相互通信。

  • 最简单的形式是,一个套接字(节点)“监听”特定的 IP 端口,同时另一个套接字伸出来形成连接。使用套接字,我们可以拥有一对一、一对多和多对多连接模式。

我们今天将研究的消息传递模式如下:

  • 配对: 排他性、一对一的通信,两个同伴相互通信。通信是双向的,套接字中没有存储特定的状态。服务器监听某个端口,客户端连接到该端口。

Messaging in distributed systems using ZeroMQ

  • 客户端 – 服务器:客户端连接到一台或多台服务器。该模式允许请求-响应模式。客户端发送请求“zmq.REQ”并接收回复。

Messaging in distributed systems using ZeroMQ

  • 发布/订阅: 一种传统的通信模式,消息的发送者(称为发布者)将消息发送到特定的接收者(称为订阅者)。消息的发布无需知道该知识的订阅者是什么或是否存在。多个订阅者订阅由发布者发布的消息/主题,或者一个订阅者可以连接到多个发布者。

Messaging in distributed systems using ZeroMQ

  • 推拉套接字(又名管道):让您将消息分发给排列在管道中的多个工作人员。 Push 套接字会将发送的消息均匀分发到其 Pull 客户端。这相当于生产者/消费者模型,但是消费者计算的结果不会发送到上游,而是下游到另一个拉取/消费者套接字。

Messaging in distributed systems using ZeroMQ

注意: 使用套接字可能会很棘手,使用相同的端口号/相同的套接字一次又一次运行相同的代码,可能会导致连接“挂起”(服务器看起来像是正在运行,但它不能接受连接)。发生这种情况是因为我们没有正确关闭和销毁之前的连接。

解决这个问题最合适的方法是关闭套接字并销毁 ZeroMQ 上下文。有关更多详细信息,请参阅第 2 阶段和第 3 阶段的 try – catch 块。

在本教程中,您可能会遇到此类问题,例如,在同一端口中多次运行同一服务器。如果您遇到挂起问题,建议您终止 Python 进程,清理 TCP 端口号,然后再次运行服务器(请参阅步骤 11)。

第 1 阶段:将服务器与客户端配对

让我们首先创建一个新的虚拟机,然后我们将安装Python3。

  • 保留虚拟机内部 IP 的副本,在本教程中我们将使用内部 IP 地址。
    1. 打开一个新的终端连接并运行以下命令(一个接一个)。最后一个命令安装 ZeroMQ。
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
登录后复制
登录后复制
登录后复制
登录后复制

出现提示时输入:Y。

如今许多应用程序都包含跨网络的组件,因此消息传递至关重要。今天我们将使用 TCP 进行消息传输。

您可以使用 VSC 访问您的虚拟机,也可以使用 SSH 运行命令并使用 pico 编辑文件,在我的例子中,我将使用 SSH。

?确保仔细复制代码。

我们需要创建第一个ZeroMQ 服务器,该服务器一次只允许与一个客户端绑定。

  • 创建一个名为pair-server.py的新文件,然后输入以下代码。

  • 代码使用 zmq.PAIR 模式创建一个新套接字,然后将服务器绑定到特定的 IP 端口(我们已经在 GCP 中打开)。请注意,在我们停止服务器之前,服务器不会停止运行。

  • 查看评论以了解其工作原理。

  • 确保更改 ;这是 GCP 虚拟机的 内部 IP 地址;客户端端口应与服务器端口相同。

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
登录后复制
登录后复制
登录后复制
登录后复制

先不要运行服务器,首先让我们创建客户端。

创建客户端并花一点时间检查评论。我将其命名为pair-client.py。

确保更改 ;端口应与服务器中的端口相同。

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
登录后复制
登录后复制
登录后复制
登录后复制

我们需要两个个终端窗口来运行PAIR示例。我们将在一个窗口上运行服务器,在另一个窗口上运行客户端。现在,按如下方式运行它。

  • 运行服务器
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
登录后复制
登录后复制
登录后复制
登录后复制
  • 运行客户端
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()
登录后复制
登录后复制
登录后复制

检查输出,我们刚刚创建了一个新的 PAIR 套接字。

  • 当客户端完成连接时,脚本将终止。然后停止服务器(ctrl c)并杀死它。

在再次运行之前,我们需要清除 TCP 连接。为此,请使用以下命令。

$ python3 pair-server.py
登录后复制
登录后复制
登录后复制

?备注:

  • 我们一次只能运行一个PAIR,这意味着我们不能有多个客户端,记住这是一个PAIR,第一个客户端将锁定套接字.

  • 如果我们运行服务器一次,客户端运行两次,第二个客户端将“挂起”,这意味着第二个客户端将等待新服务器连接。

  • 如果我们想要多次运行该对,我们需要终止服务器并清除 TCP 连接。

  • PAIR 当客户端需要独占访问服务器时是理想的选择。

  • 我们可以将多个服务器作为一对连接到多个客户端,但我们需要使用不同的端口号进行连接。

每个阶段都是相互独立的,因此,停止服务器,清除 TCP 端口,然后进入下一阶段。

第 2 阶段:将服务器与多个客户端配对

让我们创建一个客户端-服务器连接,其中多个客户端将连接到单个服务器。这是最流行的消息传递模式。

  • 让我们在 REP-REQ(回复请求)模式的上下文中创建一个服务器。
  • 我们将调用服务器rep-server.py,使用端口5555。
$ python3 pair-client.py
登录后复制
登录后复制
登录后复制

现在我们将开发两个功能相同的客户端。

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
登录后复制
登录后复制
登录后复制
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
登录后复制
登录后复制
登录后复制

让我们创建该客户端的副本并进行相应的编辑。运行以下命令来制作新副本。

* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
登录后复制
登录后复制
登录后复制

然后编辑req-client2.py并将客户端1更改为客户端2。

让我们编辑打印和套接字消息(第 8 行和第 9 行)

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()
登录后复制
登录后复制

要运行此示例,我们需要三个 个终端窗口,一个用于服务器,两个用于客户端。在第一个终端中运行以下命令。

  • 让我们启动服务器
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
登录后复制
登录后复制
登录后复制
登录后复制
  • 让我们启动第一个客户端
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
登录后复制
登录后复制
登录后复制
登录后复制
  • 让我们启动第二个客户端
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()
登录后复制
登录后复制
登录后复制

检查窗口的输出,我们刚刚创建了两个与一台服务器通信的客户端。您可以拥有任意数量的客户端,您将需要创建客户端,即使具有连接到一台服务器的不同功能。

备注:

  • 客户端-服务器是最广泛使用的模式,当我们安装和运行 Apache HTTP 服务器时,我们已经在第 1 类中使用了它。

  • 停止服务器并清理 TCP 端口 5555

    • 杀死服务器:


重击
$ sudo fusion -k 5555/tcp

第 3 阶段:将服务器与客户端配对

发布-订阅模式是一种非常常见的方法,用于控制向订阅上下文的许多客户端广播数据,服务器将数据发送到一个或多个客户端。

$ python3 pair-server.py
登录后复制
登录后复制
登录后复制

让我们首先创建一个简单的示例。

$ python3 pair-client.py
登录后复制
登录后复制
登录后复制

让我们创建一个新文件,命名为 pub_server.py。

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
登录后复制
登录后复制
登录后复制
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
登录后复制
登录后复制
登录后复制
  • 该命令将指示 python 以特定的方式运行服务器
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
登录后复制
登录后复制
登录后复制

创建一个新文件 pub_client.py。
* 该脚本接受来自命令行的三个参数(即 IP 和两个端口)。

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()
登录后复制
登录后复制

我们已准备好运行我们的pub-sub应用程序!我们需要三个个终端窗口。在第一个终端中运行:

$ cp req-client1.py req-client2.py
登录后复制
  • 在第二个终端中运行:
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 2 ", request,"...")
        socket.send_string("Hello from client 2")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()
登录后复制
  • 每个服务器都会生成天气数据。例如:
    • 邮政编码,例如:10001
    • 温带,例如:-68

让我们运行客户端以通过邮政编码连接并订阅数据,例如 10001 (NYC)。请记住,客户端脚本订阅了两个服务器实例。运行下一个命令:

$ python3 rep-server.py
登录后复制
  • 完成杀死服务器(ctrl z)并清除 TCP 端口后,运行以下命令:
$ python3 req-client1.py
登录后复制
$ python3 req-client2.py
登录后复制
第 4 阶段:推/拉:使用管道模式**

推/拉套接字可让您将消息分发给排列在管道中的多个工作人员。这对于并行运行代码非常有用。 Push 套接字会将消息均匀分发到其 Pull 客户端,客户端将响应发送到另一个称为收集器的服务器。

Messaging in distributed systems using ZeroMQ

  • 这相当于生产者/消费者模型,但是消费者计算的结果不会发送到上游,而是下游到另一个拉取/消费者套接字。

  • 我们将实现以下功能。

  • 生产者将向消费者推送 0 到 10 的随机数。

  • 同一消费者的两个实例将拉取数字并执行繁重的任务。

  • 任务可以是任何繁重的计算,例如矩阵乘法。

  • 为了简单起见,我们的“繁重任务”将只返回相同的数字。

  • 消费者会将各个结果(繁重的任务计算)推送到结果收集器,该收集器将汇总结果。

  • 为了简单起见,结果收集器的实例将拉取结果并计算每个消费者的部分总和。如果需要,我们可以轻松地将两个部分和相加。

  • 让我们看一个简单的例子。

    • 生产者生成 [1,2,3,4,5]。
    • 消费者1接收到[2,4],然后计算一个繁重的任务并将结果转发给结果收集器。
    • 消费者2收到[1,3,5],然后计算一个繁重的任务,并将结果转发给结果收集器。
    • 结果收集器计算计数和部分总和,例如:
    • Consumer1[2,4],这意味着从 Consumer1 收到 2 个数字,它们的总和为 6
    • Consumer2[1,3,5],表示从该 Consumer2 收到 3 个数字,其总和为 9
  • 此示例演示了分布式处理并行处理的潜力。

首先,让我们创建在端口 5555 上运行的名为 Producer.py 的生产者,确保您调整了您的 .

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
登录后复制
登录后复制
登录后复制
登录后复制

然后创建consumer.py如下。不要忘记更改代码中的两个 s。

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
登录后复制
登录后复制
登录后复制
登录后复制

最后,让我们开发collector.py,再次更改.

import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()
登录后复制
登录后复制
登录后复制

确保没有缩进错误!

$ python3 pair-server.py
登录后复制
登录后复制
登录后复制

首先,我们需要运行collector.py,收集器将等待数据被收集,直到我们启动生产者。

$ python3 pair-client.py
登录后复制
登录后复制
登录后复制
  • 然后,我们将一一启动消费者,在不同的终端窗口中运行每个命令。
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
登录后复制
登录后复制
登录后复制
  • 在另一个终端中运行相同的命令。
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
登录后复制
登录后复制
登录后复制
  • 最后,我们将启动生产者,开始将数据发送到我们的管道。
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
登录后复制
登录后复制
登录后复制

干得好! ?您使用 ZeroMQ 来开发消息传递模式!

以上是使用 ZeroMQ 在分布式系统中发送消息的详细内容。更多信息请关注PHP中文网其他相关文章!

来源:dev.to
本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
作者最新文章
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板