Home > Backend Development > Python Tutorial > Messaging in distributed systems using ZeroMQ

Messaging in distributed systems using ZeroMQ

Barbara Streisand
Release: 2024-11-21 07:33:11
Original
506 people have browsed it

Messaging in distributed systems using ZeroMQ

Let's use Python to develop the different messaging patterns.

You will need to watch the following video to follow the step by step commands.

Take your time; make sure you double-check the commands before you run them.

  • The following video demonstrates the commands used in this tutorial.

Messaging in distributed systems using ZeroMQ

I run this tutorial on my GCP VM, but feel free to run it locally ✅

This tutorial introduces the concepts of sockets in Python3 using ZeroMQ. ZeroMQ is an easy way to develop sockets to allow distributed processes to communicate with each other by sending messages.

  • In its simplest form, a socket (node) “listens” on a specific IP port, while another socket reaches out to form a connection. Using sockets, we can have on-to-one, one-to-many and many-to-many connection patterns.

The patterns of messaging that we will examine today are the following:

  • Pair: Exclusive, one to one communication, where two peers communicate with each other. The communication is bidirectional and there is no specific state stored in the socket. The server listens on a certain port, and the client connects to it.

Messaging in distributed systems using ZeroMQ

  • Client – Server: A client connects to one or more servers. This pattern allows REQUEST – RESPONSE mode. A client sends a request “zmq.REQ” and receives a reply.

Messaging in distributed systems using ZeroMQ

  • Publish/Subscribe: A traditional communication pattern where senders of messages, called publishers, send messages to specific receivers, called subscribers. Messages are published without the knowledge of what or if any subscriber of that knowledge exists. Multiple subscribers subscribe to messages/topics being published by a publisher or one subscriber can connect to multiple publishers.

Messaging in distributed systems using ZeroMQ

  • Push and Pull sockets (aka Pipelines): Let you distribute messages to multiple workers, arranged in a pipeline. A Push socket will distribute sent messages to its Pull clients evenly. This is equivalent to producer/consumer model, but the results computed by consumer are not sent upstream but downstream to another pull/consumer socket.

Messaging in distributed systems using ZeroMQ

? Note: Working with Sockets could be tricky, running again and again the same code, using the same port number/same socket, could lead to a connection that “hangs” (the server looks like it is running, but it cannot accept connections). This happens because we did not close and destroy the previous connections correctly.

The most appropriate way to address this is to close the socket and destroy the ZeroMQ context. Refer to try – catch blocks of Phase 2 and Phase 3 for more details.

In this tutorial, you might experience such issues, e.g., running multiple times the same server in the same port. If you face hanging problems, you are advised to kill the Python process, clean the TCP port number, and run the server once more (see step 11).

Phase 1: Pairing a server to a client

Let us start by creating a new VM, then we will install Python3.

  • Keep a copy of the VM internal IP, for this tutorial we will use the internal IP address.
    1. Open a new terminal connection and run the following commands (one after the other). The last command installs ZeroMQ.
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
Copy after login
Copy after login
Copy after login
Copy after login

Type: Y when prompted.

Many applications these days consist of components that stretch across networks, so messaging is essential. Today we will use TCP for message transferring.

You can access your VM using VSC, or you can run the commands using the SSH and edit files with pico, in my case I will use SSH.

? Make sure you copy code carefully.

We will need to create our first ZeroMQ server, the server will allow binding with only one client at a time.

  • Create a new file called pair-server.py, then enter the following code.

  • The code creates a new socket using the zmq.PAIR pattern, then binds the server to a particular IP port (that we already openned in GCP). Note that the server will not stop running until we stop it.

  • Have a look at the comments to understand how this works.

  • Make sure that you change the ; that is the internal IP address of the GCP VM; the client port should be the same with the server.

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
Copy after login
Copy after login
Copy after login
Copy after login

Do not run the server yet, first let us create the client.

Create the client and take a minute to examine the comments. I will call it pair-client.py.

Make sure that you change the ; the port should be the same as in the server.

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
Copy after login
Copy after login
Copy after login
Copy after login

We will need two terminal windows to run the PAIR example. We will run the server on one window and the client on the other. Now, run it as follows.

  • Run the server
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
Copy after login
Copy after login
Copy after login
Copy after login
  • Run the client
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()
Copy after login
Copy after login
Copy after login

Examine the output, we just created a new PAIR socket.

  • The script will terminate when the client completes its connection. Then stop the server (ctrl c) and kill it.

We will need to clear the TCP connection before we run it again. To do this, use the following command.

$ python3 pair-server.py
Copy after login
Copy after login
Copy after login

? Notes:

  • We can run only one PAIR at a time, this mean that we cannot have multiple clients, remember this is a PAIR, the first client will lock the socket.

  • If we run the server once, and the client twice, the second client will “hang”, this means that the second client it will wait for a new server to connect.

  • If we want to run the pair more than once, we will need to kill the server and clear the TCP connection.

  • PAIRs are ideal when a client needs to have exclusive access to a server.

  • We can have multiple servers to multiple clients as PAIRs, but we will need to use different PORT numbers for the connections.

Each phase is independent of each other, so, stop the server, clear the TCP ports, and move to the next phase.

Phase 2: Pairing a server to multiple clients

Let us create a client-server connection, where multiple clients will connect to a single server. This is the most popular messaging pattern.

  • Let’s create a server in the context of REP-REQ (reply to a request) pattern.
  • We will call the server rep-server.py, using port 5555.
$ python3 pair-client.py
Copy after login
Copy after login
Copy after login

Now we will develop two Clients that will be identical in terms of their functionality.

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
Copy after login
Copy after login
Copy after login
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
Copy after login
Copy after login
Copy after login

Let us create a copy of this client and edit it accordingly. Run the following command to make a new copy.

* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Copy after login
Copy after login
Copy after login

Then edit the req-client2.py and change client 1 to client 2.

Let's edit the print and socket messages (lines 8 and 9)

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()
Copy after login
Copy after login

To run this example, we will need three terminal windows, one for the server and two for the clients. Run the following in the first terminal.

  • Let's start the server
$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
Copy after login
Copy after login
Copy after login
Copy after login
  • Let's start the first client
# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
Copy after login
Copy after login
Copy after login
Copy after login
  • Let's start the second client
import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()
Copy after login
Copy after login
Copy after login

Check the output of the windows, we just created two clients talking to one server. You can have as many clients as you want, you will need to make clients, even with different functionalities that connect to one server.

? Notes:

  • Client – Server is the most widely used pattern, we already used it in class 1 when we installed and ran the Apache HTTP server.

  • Stop the server and clean TCP port 5555

    • Kill the server:


bash
$ sudo fuser -k 5555/tcp

Phase 3: Pairing a server to a client

The publish – subscribe pattern is a very common way to control broadcasting of data to many clients that are subscribed to a context, in a way that servers send data to one or more clients.

$ python3 pair-server.py
Copy after login
Copy after login
Copy after login

Let us first create a simple example.

$ python3 pair-client.py
Copy after login
Copy after login
Copy after login

Let us create a new file, call it pub_server.py.

$ sudo fuser -k 5555/tcp # 5555 refers to your port number
Copy after login
Copy after login
Copy after login
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
Copy after login
Copy after login
Copy after login
  • This command will instruct python to run a server in specific and
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Copy after login
Copy after login
Copy after login

Create a new file pub_client.py.
* The script accepts three arguments from the command line (that are the IP and the two ports).

import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 1 ", request,"...")
    socket.send_string("Hello from client 1")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()
Copy after login
Copy after login

We are ready to run our pub-sub application! We will need three terminal windows. In the first terminal run:

$ cp req-client1.py req-client2.py
Copy after login
  • In the second terminal run:
import zmq
import time
context = zmq.Context()
socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST)
socket.setsockopt(zmq.LINGER, 0)
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
for request in range (1,10):
    print("Sending request Client 2 ", request,"...")
        socket.send_string("Hello from client 2")
    message = socket.recv()
    print("Received reply ", request, "[", message, "]")
socket.close()
context.destroy()
Copy after login
  • Each server generates weather data. For example:
    • The postal code, e.g.: 10001
    • The temperate, e.g.: -68

Let’s run the client to connect and subscribe to data by postal code e.g., 10001 (NYC). Remember the client script subscribes to both server instances. Run the next command:

$ python3 rep-server.py
Copy after login
  • When you finish kill the servers (ctrl z) and clear the TCP ports running the next commands:
$ python3 req-client1.py
Copy after login
$ python3 req-client2.py
Copy after login
Phase 4: Push/Pull: Using a Pipeline pattern**

Push/Pull sockets let you distribute messages to multiple workers, arranged in a pipeline. This is very useful for running code in parallel. A Push socket will distribute messages to its Pull clients evenly, and the clients will send a response to another server, called collector.

Messaging in distributed systems using ZeroMQ

  • This is equivalent to producer/consumer model, but the results computed by consumer are not sent upstream but downstream to another pull/consumer socket.

  • We will implement the following functionality.

  • The producer will PUSH random numbers from 0 to 10 to the consumers.

  • Two instances of the same consumer will PULL the numbers and will perform a heavy task.

  • The task could be any heavy calculation e.g., matrix multiplication.

  • For simplicity, our “heavy task” will just return the same number.

  • The consumers will PUSH the individual results (heavy task calculations) to a Result Collector, that will aggregate the results.

  • For simplicity, an instance of the Result Collector will PULL the results and calculate the partial sum of each consumer. We can easily sum the two partial sums if needed.

  • Let us see a simple example.

    • Producer generates [1,2,3,4,5].
    • Consumer 1 receives [2,4], then calculates a heavy task and forwards results to the Result Collector.
    • Consumer 2 receives [1,3,5], then calculates a heavy task and forwards results to the Result Collector.
    • The result collector calculates the counts and partial sums e.g.:
    • Consumer1[2,4], this means 2 numbers received from Consumer1 and their sum is 6.
    • Consumer2[1,3,5], that means 3 numbers received from this Consumer2 and their sum is 9.
  • This example demonstrates the potential of distributed processing for parallel processing.

Firstly, let us create the producer called producer.py running on port 5555 make sure you adapt your .

$ sudo apt update
$ sudo apt install software-properties-common
$ sudo apt install python3.8
$ sudo apt-get -y install python3-pip
$ pip3 install pyzmq
Copy after login
Copy after login
Copy after login
Copy after login

Then create the consumer.py is as follows. Do not forget to change the two s in code.

# import the library
import zmq
import time
# Initialize a new context that is the way to create a socket
context = zmq.Context()
# We will build a PAIR connection
socket = context.socket(zmq.PAIR) # We create a PAIR server
# Do not worry about this for the moment...
socket.setsockopt(zmq.LINGER, 0) 
# Create a new socket and "bind" it in the following address
# Make sure you update the address
socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT
# Keep the socket alive for ever...
while True:
    # Send a text message to the client (send_string)
    socket.send_string("Server message to Client")
    # Receive a message, store it in msg and then print it
    msg = socket.recv()
    print(msg)
    # Sleep for 1 second, so when we run it, we can see the results
    time.sleep(1)
Copy after login
Copy after login
Copy after login
Copy after login

Finally, let us develop the collector.py, again change the .

import zmq
import time
# Same as before, initialize a socket
context = zmq.Context()
socket = context.socket(zmq.PAIR) # We create a PAIR server
socket.setsockopt(zmq.LINGER, 0)
# Connect to the IP that we already bind in the server
socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555")
# A counter will help us control our connection
# For example connect until you send 10 messages, then disconnect...
count = 0
while count<10:
    msg = socket.recv()
    print(msg)
    socket.send_string("Hello from Client")
    socket.send_string("This is a client message to server")
    print("Counter: ",count)
    count+=1
    time.sleep(1)
# Destroy the context socket and then close the connection
context.destroy()
socket.close()
Copy after login
Copy after login
Copy after login

Make sure you don’t have an indentation error!

$ python3 pair-server.py
Copy after login
Copy after login
Copy after login

Firstly, we need to run the collector.py, the collector will be waiting for data to be collected until we start the producer.

$ python3 pair-client.py
Copy after login
Copy after login
Copy after login
  • Then, we will start the consumers one by one, run each command in a different terminal window.
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
Copy after login
Copy after login
Copy after login
  • Run the same command in another terminal.
import zmq
import time
try: # Try to create a new connection
    context = zmq.Context()
    socket = context.socket(zmq.REP) # We create a REP server
    # Here we set a linger period for the socket
    # Linger 0: no waiting period for new messages
    socket.setsockopt(zmq.LINGER, 0)
    socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555")
    while True: # Wait for next request from client
        message = socket.recv()
        print("Received request: ", message)
        time.sleep (1)  
        socket.send_string("Hi from Server")
except KeyboardInterrupt: # “ctr+c” to break and close the socket!
    context.destroy()
    socket.close()
Copy after login
Copy after login
Copy after login
  • Finally, we will start our producer that will start sending data to our pipeline.
* **Client 1** will send a “Client 1 Hello world” request

* **Client 2** will send a “Client 2 Hello world” request to the server. 
* Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Copy after login
Copy after login
Copy after login

Well done! ? You used ZeroMQ to develop messaging patterns!

The above is the detailed content of Messaging in distributed systems using ZeroMQ. For more information, please follow other related articles on the PHP Chinese website!

source:dev.to
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Latest Articles by Author
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template