RabbitMQ is a server based on MQ. Python can be used for program control through the Pika library. Here we will explain in detail the remote result return of Python's operation of the RabbitMQ server message queue:
Let's talk about the author's test first. Environment: Ubuntu14.04 + Python 2.7.4
RabbitMQ server
sudo apt-get install rabbitmq-server
Python requires the Pika library to use RabbitMQ
sudo pip install pika
Remote result return
No result is returned after the message sending end sends the message. If you just send a message, of course there is no problem, but in practice, the receiving end is often required to process the received message and return it to the sending end.
Processing method description: Before sending information, the sending end generates a temporary queue for receiving messages. This queue is used to receive the returned results. In fact, the concepts of receiving end and sending end are relatively blurred here, because the sending end also needs to receive messages, and the receiving end also needs to send messages, so here I use another example to demonstrate this process.
Example content: Assume there is a control center and a computing node. The control center will send a natural number N to the computing node. The computing node adds 1 to the N value and returns it to the control center. Here, center.py is used to simulate the control center, and compute.py is used to simulate the computing nodes.
compute.py code analysis
#!/usr/bin/env python #coding=utf8 import pika #连接rabbitmq服务器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #定义队列 channel.queue_declare(queue='compute_queue') print ' [*] Waiting for n' #将n值加1 def increase(n): return n + 1 #定义接收到消息的处理方法 def request(ch, method, properties, body): print " [.] increase(%s)" % (body,) response = increase(int(body)) #将计算结果发送回控制中心 ch.basic_publish(exchange='', routing_key=properties.reply_to, body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(request, queue='compute_queue') channel.start_consuming()
The code of the calculation node is relatively simple. It is worth mentioning that the original receiving methods are all direct The message is printed out, a calculation of plus one is performed here, and the result is sent back to the control center.
center.py code analysis
#!/usr/bin/env python #coding=utf8 import pika class Center(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() #定义接收返回消息的队列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) #定义接收到返回消息的处理方法 def on_response(self, ch, method, props, body): self.response = body def request(self, n): self.response = None #发送计算请求,并声明返回队列 self.channel.basic_publish(exchange='', routing_key='compute_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, ), body=str(n)) #接收返回的数据 while self.response is None: self.connection.process_data_events() return int(self.response) center = Center() print " [x] Requesting increase(30)" response = center.request(30) print " [.] Got %r" % (response,)
The above example code defines the queue and processing method for receiving returned data, and when sending a request Assign the queue to reply_to. This parameter is used to obtain the return queue in the calculation node code.
Open two terminals, one runs the code python compute.py, and the other terminal runs center.py. If the execution is successful, you should be able to see the effect.
During the test, the author encountered some minor problems, that is, the return queue was not specified when center.py sent the message. As a result, compute.py reported an error when sending back the data after calculating the results, prompting that the routing_key was not exists, and an error is reported when running again. Use rabbitmqctl list_queues to view the queue and find that the compute_queue queue has 1 piece of data. This data will be reprocessed every time compute.py is re-run. Later, I used /etc/init.d/rabbitmq-server restart to restart rabbitmq and everything was fine.
Correlation number correlation id
The previous example demonstrated an example of remote result return, but one thing was not mentioned, which is the correlation id. What is this?
Assume that there are multiple computing nodes, and the control center starts multiple threads, sends numbers to these computing nodes, requires calculation results and returns them, but the control center only opens one queue, and all threads start from this queue. To obtain a message, how does each thread determine that the message received corresponds to that thread? This is the use of correlation id. Correlation is translated into Chinese as mutual correlation, which also expresses this meaning.
Correlation id operating principle: The control center sets the correlation id when sending a calculation request, and then the calculation node returns the calculation result together with the received correlation id, so that the control center can identify the request through the correlation id. In fact, correlation id can also be understood as the unique identification code of the request.
Example content: The control center starts multiple threads, and each thread initiates a calculation request. Through the correlation id, each thread can accurately receive the corresponding calculation results.
compute.py code analysis
Compared with the previous article, only one place needs to be modified: when sending the calculation results back to the control center, add the parameter correlation_id setting, the value of this parameter In fact, it was sent from the control center, and it is just sent back again. The code is as follows:
#!/usr/bin/env python #coding=utf8 import pika #连接rabbitmq服务器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() #定义队列 channel.queue_declare(queue='compute_queue') print ' [*] Waiting for n' #将n值加1 def increase(n): return n + 1 #定义接收到消息的处理方法 def request(ch, method, props, body): print " [.] increase(%s)" % (body,) response = increase(int(body)) #将计算结果发送回控制中心,增加correlation_id的设定 ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(request, queue='compute_queue') channel.start_consuming()
center.py code analysis
The control center code is slightly more complicated, and there are three key places:
Use python's uuid to generate a unique correlation_id.
When sending a calculation request, set the parameter correlation_id.
Define a dictionary to save the returned data, and the key value is the correlation_id generated by the corresponding thread.
The code is as follows:
#!/usr/bin/env python #coding=utf8 import pika, threading, uuid #自定义线程类,继承threading.Thread class MyThread(threading.Thread): def __init__(self, func, num): super(MyThread, self).__init__() self.func = func self.num = num def run(self): print " [x] Requesting increase(%d)" % self.num response = self.func(self.num) print " [.] increase(%d)=%d" % (self.num, response) #控制中心类 class Center(object): def __init__(self): self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() #定义接收返回消息的队列 result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue) #返回的结果都会存储在该字典里 self.response = {} #定义接收到返回消息的处理方法 def on_response(self, ch, method, props, body): self.response[props.correlation_id] = body def request(self, n): corr_id = str(uuid.uuid4()) self.response[corr_id] = None #发送计算请求,并设定返回队列和correlation_id self.channel.basic_publish(exchange='', routing_key='compute_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = corr_id, ), body=str(n)) #接收返回的数据 while self.response[corr_id] is None: self.connection.process_data_events() return int(self.response[corr_id]) center = Center() #发起5次计算请求 nums= [10, 20, 30, 40 ,50] threads = [] for num in nums: threads.append(MyThread(center.request, num)) for thread in threads: thread.start() for thread in threads: thread.join()
The author opened two terminals to run compute.py, opened a terminal to run center.py, and finally output the results The screenshot is as follows:
You can see that although the obtained results are not output sequentially, the results correspond to the source data.
The example here is to create a queue and use the correlation id to identify each request. There is also a way to not use the correlation id, which is to create a temporary queue every time a request is made. However, this consumes too much performance and is not officially recommended.
For more Python operations on remote result return of the RabbitMQ server message queue, please pay attention to the PHP Chinese website!