Let's use Python to develop the different messaging patterns.
You will need to watch the following video to follow the step by step commands.
Take your time; make sure you double-check the commands before you run them.
I run this tutorial on my GCP VM, but feel free to run it locally ✅
This tutorial introduces the concepts of sockets in Python3 using ZeroMQ. ZeroMQ is an easy way to develop sockets to allow distributed processes to communicate with each other by sending messages.
The patterns of messaging that we will examine today are the following:
? Note: Working with Sockets could be tricky, running again and again the same code, using the same port number/same socket, could lead to a connection that “hangs” (the server looks like it is running, but it cannot accept connections). This happens because we did not close and destroy the previous connections correctly.
The most appropriate way to address this is to close the socket and destroy the ZeroMQ context. Refer to try – catch blocks of Phase 2 and Phase 3 for more details.
In this tutorial, you might experience such issues, e.g., running multiple times the same server in the same port. If you face hanging problems, you are advised to kill the Python process, clean the TCP port number, and run the server once more (see step 11).
Let us start by creating a new VM, then we will install Python3.
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
Type: Y when prompted.
Many applications these days consist of components that stretch across networks, so messaging is essential. Today we will use TCP for message transferring.
You can access your VM using VSC, or you can run the commands using the SSH and edit files with pico, in my case I will use SSH.
? Make sure you copy code carefully.
We will need to create our first ZeroMQ server, the server will allow binding with only one client at a time.
Create a new file called pair-server.py, then enter the following code.
The code creates a new socket using the zmq.PAIR pattern, then binds the server to a particular IP port (that we already openned in GCP). Note that the server will not stop running until we stop it.
Have a look at the comments to understand how this works.
Make sure that you change the ; that is the internal IP address of the GCP VM; the client port should be the same with the server.
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
Do not run the server yet, first let us create the client.
Create the client and take a minute to examine the comments. I will call it pair-client.py.
Make sure that you change the ; the port should be the same as in the server.
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
We will need two terminal windows to run the PAIR example. We will run the server on one window and the client on the other. Now, run it as follows.
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
Examine the output, we just created a new PAIR socket.
We will need to clear the TCP connection before we run it again. To do this, use the following command.
$ python3 pair-server.py
? Notes:
We can run only one PAIR at a time, this mean that we cannot have multiple clients, remember this is a PAIR, the first client will lock the socket.
If we run the server once, and the client twice, the second client will “hang”, this means that the second client it will wait for a new server to connect.
If we want to run the pair more than once, we will need to kill the server and clear the TCP connection.
PAIRs are ideal when a client needs to have exclusive access to a server.
We can have multiple servers to multiple clients as PAIRs, but we will need to use different PORT numbers for the connections.
Each phase is independent of each other, so, stop the server, clear the TCP ports, and move to the next phase.
Let us create a client-server connection, where multiple clients will connect to a single server. This is the most popular messaging pattern.
$ python3 pair-client.py
Now we will develop two Clients that will be identical in terms of their functionality.
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
Let us create a copy of this client and edit it accordingly. Run the following command to make a new copy.
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Then edit the req-client2.py and change client 1 to client 2.
Let's edit the print and socket messages (lines 8 and 9)
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 1 ", request,"...") socket.send_string("Hello from client 1") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
To run this example, we will need three terminal windows, one for the server and two for the clients. Run the following in the first terminal.
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
Check the output of the windows, we just created two clients talking to one server. You can have as many clients as you want, you will need to make clients, even with different functionalities that connect to one server.
? Notes:
Client – Server is the most widely used pattern, we already used it in class 1 when we installed and ran the Apache HTTP server.
Stop the server and clean TCP port 5555
- Kill the server:
bash
$ sudo fuser -k 5555/tcp
The publish – subscribe pattern is a very common way to control broadcasting of data to many clients that are subscribed to a context, in a way that servers send data to one or more clients.
$ python3 pair-server.py
Let us first create a simple example.
$ python3 pair-client.py
Let us create a new file, call it pub_server.py.
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Create a new file pub_client.py.
* The script accepts three arguments from the command line (that are the IP and the two ports).
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 1 ", request,"...") socket.send_string("Hello from client 1") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
We are ready to run our pub-sub application! We will need three terminal windows. In the first terminal run:
$ cp req-client1.py req-client2.py
import zmq import time context = zmq.Context() socket = context.socket(zmq.REQ) # We create a REQ client (REQUEST) socket.setsockopt(zmq.LINGER, 0) socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") for request in range (1,10): print("Sending request Client 2 ", request,"...") socket.send_string("Hello from client 2") message = socket.recv() print("Received reply ", request, "[", message, "]") socket.close() context.destroy()
Let’s run the client to connect and subscribe to data by postal code e.g., 10001 (NYC). Remember the client script subscribes to both server instances. Run the next command:
$ python3 rep-server.py
$ python3 req-client1.py
$ python3 req-client2.py
Push/Pull sockets let you distribute messages to multiple workers, arranged in a pipeline. This is very useful for running code in parallel. A Push socket will distribute messages to its Pull clients evenly, and the clients will send a response to another server, called collector.
This is equivalent to producer/consumer model, but the results computed by consumer are not sent upstream but downstream to another pull/consumer socket.
We will implement the following functionality.
The producer will PUSH random numbers from 0 to 10 to the consumers.
Two instances of the same consumer will PULL the numbers and will perform a heavy task.
The task could be any heavy calculation e.g., matrix multiplication.
For simplicity, our “heavy task” will just return the same number.
The consumers will PUSH the individual results (heavy task calculations) to a Result Collector, that will aggregate the results.
For simplicity, an instance of the Result Collector will PULL the results and calculate the partial sum of each consumer. We can easily sum the two partial sums if needed.
Let us see a simple example.
This example demonstrates the potential of distributed processing for parallel processing.
Firstly, let us create the producer called producer.py running on port 5555 make sure you adapt your .
$ sudo apt update $ sudo apt install software-properties-common $ sudo apt install python3.8 $ sudo apt-get -y install python3-pip $ pip3 install pyzmq
Then create the consumer.py is as follows. Do not forget to change the two s in code.
# import the library import zmq import time # Initialize a new context that is the way to create a socket context = zmq.Context() # We will build a PAIR connection socket = context.socket(zmq.PAIR) # We create a PAIR server # Do not worry about this for the moment... socket.setsockopt(zmq.LINGER, 0) # Create a new socket and "bind" it in the following address # Make sure you update the address socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") # IP:PORT # Keep the socket alive for ever... while True: # Send a text message to the client (send_string) socket.send_string("Server message to Client") # Receive a message, store it in msg and then print it msg = socket.recv() print(msg) # Sleep for 1 second, so when we run it, we can see the results time.sleep(1)
Finally, let us develop the collector.py, again change the .
import zmq import time # Same as before, initialize a socket context = zmq.Context() socket = context.socket(zmq.PAIR) # We create a PAIR server socket.setsockopt(zmq.LINGER, 0) # Connect to the IP that we already bind in the server socket.connect("tcp://<INTERNAL_VM_ADDRESS>:5555") # A counter will help us control our connection # For example connect until you send 10 messages, then disconnect... count = 0 while count<10: msg = socket.recv() print(msg) socket.send_string("Hello from Client") socket.send_string("This is a client message to server") print("Counter: ",count) count+=1 time.sleep(1) # Destroy the context socket and then close the connection context.destroy() socket.close()
Make sure you don’t have an indentation error!
$ python3 pair-server.py
Firstly, we need to run the collector.py, the collector will be waiting for data to be collected until we start the producer.
$ python3 pair-client.py
$ sudo fuser -k 5555/tcp # 5555 refers to your port number
import zmq import time try: # Try to create a new connection context = zmq.Context() socket = context.socket(zmq.REP) # We create a REP server # Here we set a linger period for the socket # Linger 0: no waiting period for new messages socket.setsockopt(zmq.LINGER, 0) socket.bind("tcp://<INTERNAL_VM_ADDRESS>:5555") while True: # Wait for next request from client message = socket.recv() print("Received request: ", message) time.sleep (1) socket.send_string("Hi from Server") except KeyboardInterrupt: # “ctr+c” to break and close the socket! context.destroy() socket.close()
* **Client 1** will send a “Client 1 Hello world” request * **Client 2** will send a “Client 2 Hello world” request to the server. * Let us create a file called `req-client1.py`, then edit as follows, again make sure you change the <INTERNAL_VM_ADDRESS>.
Well done! ? You used ZeroMQ to develop messaging patterns!
The above is the detailed content of Messaging in distributed systems using ZeroMQ. For more information, please follow other related articles on the PHP Chinese website!