


Python+Pika+RabbitMQ environment deployment and implementation of work queue
The Chinese translation of rabbitmq mainly refers to the letters mq: Message Queue, which means message queue. There is also the word "rabbit" in front of it, which means rabbit. It is the same as the python language called python. Foreigners are quite humorous. The rabbitmq service is similar to the mysql and apache services, but the functions provided are different. rabbimq is used to provide a service for sending messages, which can be used to communicate between different applications.
Install rabbitmq
First install rabbitmq. Under ubuntu 12.04, you can install it directly through apt-get:
sudo apt-get install rabbitmq-server
After installation, the rabbitmq service has been started. Next, let’s look at an example of writing Hello World! in python. The content of the example is to send "Hello World!" from send.py to rabbitmq, and receive.py receives the information sent by send.py from rabbitmq.
P stands for produce, which means producer, and can also be called sender. In the example, it is shown as send.py; C stands for consumer, which means consumer. Meaning, it can also be called the receiver, which is represented by receive.py in the example; the red one in the middle represents the meaning of the queue, which is represented by the hello queue in the example.
Python uses the rabbitmq service. You can use the ready-made class libraries pika, txAMQP or py-amqplib. Here, pika is chosen.
Install pika
To install pika, you can use pip to install it. pip is the software management package of python. If it is not installed, you can install it through apt-get
sudo apt-get install python-pip
Install pika via pip:
sudo pip install pika
send.py code
Connect to the rabbitmq server. Because it is tested locally, just use localhost.
connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel()
Declare the message queue in which messages will be delivered. If messages are sent to a queue that does not exist, rabbitmq will automatically clear these messages.
channel.queue_declare(queue='hello')
Send a message to the hello queue declared above, where exchange represents the exchanger, which can accurately specify which queue the message should be sent to, and routing_key is set to the name of the queue , the body is the content to be sent, and we will not pay attention to the specific sending details for now.
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
Close connection
connection.close()
Full code
#!/usr/bin/env python #coding=utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello') channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print " [x] Sent 'Hello World!'" connection.close()
Let’s execute this program first. If the execution is successful, rabbitmqctl should successfully add the hello queue, and there should be a message in the queue. Use the rabbitmqctl command to check it
rabbitmqctl list_queues
Output the following information on the author's computer:
receive.py code
is the same as the previous two steps of send.py, both of which require connecting to the server first and then declaring the message queue, which will not be discussed here. Posted the same code. Receiving messages is more complicated, and you need to define a callback function to process it. The callback function here is to print out the information.def callback(ch, method, properties, body): print "Received %r" % (body,)
channel.basic_consume(callback, queue='hello', no_ack=True)
channel.start_consuming()
#!/usr/bin/env python #coding=utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel() channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print " [x] Received %r" % (body,) channel.basic_consume(callback, queue='hello', no_ack=True) print ' [*] Waiting for messages. To exit press CTRL+C' channel.start_consuming()
Work queue example
1. Preparation
import sys message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='hello', body=message) print " [x] Sent %r" % (message,)
import time def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done"
$ python new_task.py First message. $ python new_task.py Second message.. $ python new_task.py Third message... $ python new_task.py Fourth message.... $ python new_task.py Fifth message.....
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'First message.' [x] Received 'Third message...' [x] Received 'Fifth message.....'
$ python worker.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Second message..' [x] Received 'Fourth message....'
从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。
2.消息确认(Message acknowledgment)
消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:
def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep(5) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag)
这边停顿5秒,可以方便ctrl+c退出。
去除no_ack=True参数或者设置为False也可以。
channel.basic_consume(callback, queue='hello', no_ack=False)
用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。
3.消息持久化存储(Message durability)
虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:
channel.queue_declare(queue='hello', durable=True)
但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:
channel.queue_declare(queue='task_queue', durable=True)
在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:
channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
4.公平调度(Fair dispatch)
上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。
channel.basic_qos(prefetch_count=1)
new_task.py完整代码
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) message = ' '.join(sys.argv[1:]) or "Hello World!" channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent )) print " [x] Sent %r" % (message,) connection.close() worker.py完整代码 #!/usr/bin/env python import pika import time connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.queue_declare(queue='task_queue', durable=True) print ' [*] Waiting for messages. To exit press CTRL+C' def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume(callback, queue='task_queue') channel.start_consuming()
更多Python+Pika+RabbitMQ environment deployment and implementation of work queue相关文章请关注PHP中文网!

Hot AI Tools

Undresser.AI Undress
AI-powered app for creating realistic nude photos

AI Clothes Remover
Online AI tool for removing clothes from photos.

Undress AI Tool
Undress images for free

Clothoff.io
AI clothes remover

AI Hentai Generator
Generate AI Hentai for free.

Hot Article

Hot Tools

Notepad++7.3.1
Easy-to-use and free code editor

SublimeText3 Chinese version
Chinese version, very easy to use

Zend Studio 13.0.1
Powerful PHP integrated development environment

Dreamweaver CS6
Visual web development tools

SublimeText3 Mac version
God-level code editing software (SublimeText3)

Hot Topics

This tutorial demonstrates how to use Python to process the statistical concept of Zipf's law and demonstrates the efficiency of Python's reading and sorting large text files when processing the law. You may be wondering what the term Zipf distribution means. To understand this term, we first need to define Zipf's law. Don't worry, I'll try to simplify the instructions. Zipf's Law Zipf's law simply means: in a large natural language corpus, the most frequently occurring words appear about twice as frequently as the second frequent words, three times as the third frequent words, four times as the fourth frequent words, and so on. Let's look at an example. If you look at the Brown corpus in American English, you will notice that the most frequent word is "th

This article explains how to use Beautiful Soup, a Python library, to parse HTML. It details common methods like find(), find_all(), select(), and get_text() for data extraction, handling of diverse HTML structures and errors, and alternatives (Sel

Python provides a variety of ways to download files from the Internet, which can be downloaded over HTTP using the urllib package or the requests library. This tutorial will explain how to use these libraries to download files from URLs from Python. requests library requests is one of the most popular libraries in Python. It allows sending HTTP/1.1 requests without manually adding query strings to URLs or form encoding of POST data. The requests library can perform many functions, including: Add form data Add multi-part file Access Python response data Make a request head

Dealing with noisy images is a common problem, especially with mobile phone or low-resolution camera photos. This tutorial explores image filtering techniques in Python using OpenCV to tackle this issue. Image Filtering: A Powerful Tool Image filter

PDF files are popular for their cross-platform compatibility, with content and layout consistent across operating systems, reading devices and software. However, unlike Python processing plain text files, PDF files are binary files with more complex structures and contain elements such as fonts, colors, and images. Fortunately, it is not difficult to process PDF files with Python's external modules. This article will use the PyPDF2 module to demonstrate how to open a PDF file, print a page, and extract text. For the creation and editing of PDF files, please refer to another tutorial from me. Preparation The core lies in using external module PyPDF2. First, install it using pip: pip is P

This tutorial demonstrates how to leverage Redis caching to boost the performance of Python applications, specifically within a Django framework. We'll cover Redis installation, Django configuration, and performance comparisons to highlight the bene

Natural language processing (NLP) is the automatic or semi-automatic processing of human language. NLP is closely related to linguistics and has links to research in cognitive science, psychology, physiology, and mathematics. In the computer science

This article compares TensorFlow and PyTorch for deep learning. It details the steps involved: data preparation, model building, training, evaluation, and deployment. Key differences between the frameworks, particularly regarding computational grap
