


Tutorial: working with the RabbitMQ message queue in Python
RabbitMQ is a complete, reusable enterprise messaging system based on AMQP, released as open source under the Mozilla Public License. This article walks through how to work with the RabbitMQ message queue from Python and can serve as a reference for anyone who needs it.
Preface
MQ stands for Message Queue, an application-to-application communication method. Applications communicate by writing messages (application-specific data) to queues and reading them back, without needing a dedicated connection between them. Messaging means that programs communicate by sending data in messages rather than by calling each other directly, as they would with techniques such as remote procedure calls. Queuing means that applications communicate through queues, which removes the requirement that the sending and receiving applications run at the same time.
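The decoupling described above can be illustrated without any broker at all. The following is a minimal in-process sketch that uses Python's standard queue module as a stand-in for a message queue; the function names producer, consumer and run_demo are made up for this illustration and are not part of RabbitMQ or pika. The producer finishes before the consumer even starts, yet no message is lost.

```python
import queue
import threading


def producer(q, count):
    """Write `count` messages to the queue, then a sentinel marking the end."""
    for i in range(count):
        q.put("hello,world" + str(i))
    q.put(None)  # sentinel: tells the consumer there is nothing more to read


def consumer(q, received):
    """Read messages until the sentinel arrives and collect them in `received`."""
    while True:
        message = q.get()
        if message is None:
            break
        received.append(message)


def run_demo(count=3):
    q = queue.Queue()
    received = []
    # The producer runs to completion first; the queue holds the messages.
    producer(q, count)
    # Only now does the consumer start, in its own thread.
    worker = threading.Thread(target=consumer, args=(q, received))
    worker.start()
    worker.join()
    return received
```

Calling run_demo(3) returns the three messages in the order they were sent. RabbitMQ plays the same role as the Queue object here, but across processes and machines.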
Application scenarios:
RabbitMQ is one of the most popular message queues at present, and it has client libraries for many languages. As a .NET developer, I find it necessary to learn and understand this tool. There are roughly three usage scenarios for message queues:
1. System integration and distributed system design. Subsystems are connected through messages, and this approach has gradually developed into an architectural style of its own: an architecture built on message passing.
2. Cases where synchronous processing seriously hurts throughput, such as logging. If we need to record every user action in the system, writing the logs synchronously will inevitably slow down responses. If we send the log messages to a message queue instead, the logging subsystem can consume them asynchronously.
3. High availability of the system, such as e-commerce flash sales. When the application server or database server receives a very large number of requests at one moment, the system may go down. If the requests are first placed in a message queue and the servers then consume them at their own pace, the load is smoothed out and the availability of the system improves.
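Scenario 2 can be sketched in a single process with the standard library, which ships a queue-based logging handler. This is only an analogy for what a broker does between separate services; the ListHandler class, the logger name user_behavior_demo and the function log_user_actions are hypothetical names chosen for the example.

```python
import logging
import logging.handlers
import queue


class ListHandler(logging.Handler):
    """Stand-in for a slow log store; it simply collects the messages."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(record.getMessage())


def log_user_actions(actions):
    log_queue = queue.Queue()
    sink = ListHandler()
    # The listener thread plays the role of the logging subsystem (the consumer).
    listener = logging.handlers.QueueListener(log_queue, sink)
    logger = logging.getLogger("user_behavior_demo")
    logger.setLevel(logging.INFO)
    logger.propagate = False
    # The request path only enqueues the record, which is cheap.
    handler = logging.handlers.QueueHandler(log_queue)
    logger.addHandler(handler)
    listener.start()
    try:
        for action in actions:
            logger.info("user action: %s", action)
    finally:
        listener.stop()  # waits until every queued record has been handled
        logger.removeHandler(handler)
    return sink.lines
```

The code that logs never waits for the sink; it only pays the cost of putting a record on the queue.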
1. Installation environment
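First, install RabbitMQ on Linux:

    # Environment: CentOS 7
    yum install rabbitmq-server        # install RabbitMQ
    systemctl start rabbitmq-server    # start the service
    systemctl enable rabbitmq-server   # start automatically at boot
    systemctl stop firewalld           # temporarily stop the firewall

Then use pip to install pika, the Python 3 client library:

    pip3 install pika

After installation, you can open http://115.xx.xx.xx:15672/ to view and manage RabbitMQ through its built-in web interface, provided the management plugin has been enabled (rabbitmq-plugins enable rabbitmq_management). The default administrator username and password are both guest. Note that since RabbitMQ 3.3 the guest account can, by default, only connect from localhost, so remote access needs a separate user or a changed configuration.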
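Before moving on, it can help to confirm that the server is reachable from the machine that will run the Python code. The sketch below only tests whether the TCP ports accept connections; it says nothing about credentials or permissions. The helper names port_is_open and check_rabbitmq are made up for this example. Port 5672 is the AMQP port used by the scripts in this article and 15672 is the management web interface mentioned above.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts
        return False


def check_rabbitmq(host):
    """Check the AMQP port (5672) and the management port (15672)."""
    return {port: port_is_open(host, port) for port in (5672, 15672)}
```

For example, check_rabbitmq('115.xx.xx.xx') (with the real address filled in) returns a dictionary mapping each port to True or False. A False for 5672 usually points to the service not running or a firewall still blocking the port.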
2. Adding messages to a queue

    #!/usr/bin/env python3
    # coding=utf-8
    # @Time    : 2017/6/13 19:25
    # @Author  : Shawn
    # @Blog    : https://blog.just666.cn
    # @Email   : shawnbluce@gmail.com
    # @purpose : RabbitMQ_Producer
    import pika

    # Create the connection object
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))

    # Create the channel object
    channel = connection.channel()

    # Declare a queue; it is created if it does not exist yet
    channel.queue_declare(queue='test_queue')

    # Publish the messages
    for i in range(10):
        channel.basic_publish(exchange='', routing_key='test_queue', body='hello,world' + str(i))
        print("sent...")

    # Close the connection
    connection.close()
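3. Getting messages from a queue

    #!/usr/bin/env python3
    # coding=utf-8
    # @Time    : 2017/6/13 19:40
    # @Author  : Shawn
    # @Blog    : https://blog.just666.cn
    # @Email   : shawnbluce@gmail.com
    # @purpose : RabbitMQ_Consumer
    import pika

    credentials = pika.PlainCredentials('guest', 'guest')
    # Connect to the RabbitMQ server
    connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
    channel = connection.channel()

    # Declare a queue; it is created if it does not exist yet
    channel.queue_declare(queue='test_queue')

    # Define a callback function
    def callback(ch, method, properties, body):
        print(body.decode('utf-8'))
        # no_ack=False below means manual acknowledgement, so confirm the message here
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # Tell RabbitMQ to deliver messages to callback.
    # This is the pika 0.x signature; pika 1.0 and later use
    # basic_consume(queue='test_queue', on_message_callback=callback, auto_ack=False)
    channel.basic_consume(callback, queue='test_queue', no_ack=False)
    print('waiting...')

    # Start receiving and block; callback is only invoked when the queue has messages.
    # Press Ctrl+C to exit.
    channel.start_consuming()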
4. What if the consumer goes offline?
Imagine a situation like this:
The consumer fetches n messages from the queue and is about to process them when the machine crashes. What then? RabbitMQ provides an ACK (acknowledgement) that the consumer uses to confirm that processing has finished, somewhat like an ACK in networking. When a consumer receives a message, the queue does not remove it immediately but waits for the corresponding ACK. Once the consumer has finished processing, it sends an ACK to tell RabbitMQ that the message has been handled and can be deleted, and only then does RabbitMQ remove it from the queue. So even if the consumer goes offline, nothing is lost: the unacknowledged messages remain in the queue for other consumers to process.
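In Python, this is controlled by the no_ack argument in the consumer's call channel.basic_consume(callback, queue='test_queue', no_ack=False). With no_ack=False, RabbitMQ keeps each message until the consumer explicitly acknowledges it, so the callback must call ch.basic_ack(delivery_tag=method.delivery_tag) once processing is done. With no_ack=True, no acknowledgement is expected: RabbitMQ treats a message as handled as soon as it has been delivered, so messages that are being processed when the consumer crashes are lost. In pika 1.0 and later the argument is named auto_ack.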
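As a minimal sketch of manual acknowledgement, the consumer below is written against the pika 1.x keyword names (on_message_callback, auto_ack), which differ from the older call quoted in this article. The process function is a hypothetical placeholder for real work, and main is defined but not called so that nothing connects on import.

```python
def process(body):
    """Hypothetical processing step; here it only decodes the payload."""
    return body.decode('utf-8')


def on_message(ch, method, properties, body):
    """Acknowledge only after processing succeeded; otherwise ask for redelivery."""
    try:
        print(process(body))
    except Exception:
        # Processing failed: hand the message back to the queue
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
    else:
        # Processing finished: RabbitMQ may now delete the message
        ch.basic_ack(delivery_tag=method.delivery_tag)


def main(host='115.xx.xx.xx'):
    import pika  # local import: only needed when actually connecting

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    channel.queue_declare(queue='test_queue')
    # auto_ack=False selects manual acknowledgement (pika 1.x naming)
    channel.basic_consume(queue='test_queue',
                          on_message_callback=on_message,
                          auto_ack=False)
    channel.start_consuming()
```

Run main() with the real host to start consuming. Requeueing with requeue=True is the simplest choice, but a message that can never be processed will then be redelivered indefinitely, so a real consumer would probably limit retries.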
5. What if RabbitMQ crashes?
Acknowledgements protect against a consumer crash, but what if RabbitMQ itself crashes? Messages would still be lost. To guard against this, we can enable persistence, so that RabbitMQ stores the data on disk and the queue is still there after a restart. In Python it is done like this:
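We normally declare a queue with channel.queue_declare(queue='test_queue'). To make the queue durable, declare it with channel.queue_declare(queue='test_queue', durable=True). This line cannot simply be dropped into the existing code, however: a queue named test_queue already exists, and RabbitMQ does not allow the same queue to be redeclared with different parameters, so use a new queue name for the durable queue. Even then, a durable declaration alone only guarantees that the queue itself survives a RabbitMQ restart; the messages in it are still lost unless each message is also published as persistent: channel.basic_publish(exchange='', routing_key="test_queue", body=message, properties=pika.BasicProperties(delivery_mode=2)).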
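Putting the two settings together, a durable queue plus persistent messages might look like the sketch below. The helper publish_persistent and the queue name durable_test_queue are hypothetical; a new name is used because test_queue was already declared as non-durable. The function main is defined but not called, so nothing connects until you invoke it.

```python
def publish_persistent(channel, queue_name, body):
    """Declare a durable queue and publish one persistent message to it."""
    import pika  # local import: only needed when actually publishing

    # durable=True: the queue definition survives a broker restart
    channel.queue_declare(queue=queue_name, durable=True)
    channel.basic_publish(
        exchange='',
        routing_key=queue_name,
        body=body,
        # delivery_mode=2 marks the message itself as persistent
        properties=pika.BasicProperties(delivery_mode=2),
    )


def main(host='115.xx.xx.xx'):
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    try:
        publish_persistent(connection.channel(), 'durable_test_queue', 'hello,world')
    finally:
        connection.close()
```

Both parts are needed: the durable flag protects the queue, and delivery_mode=2 protects the individual message. Consumers of this queue must declare it with durable=True as well, otherwise the declaration conflicts with the existing one.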
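6. The simplest publish/subscribe
The simplest form of publish/subscribe in RabbitMQ is called fanout mode. Subscribers subscribe to a channel (in RabbitMQ terms, an exchange), the publisher publishes messages to it, and every subscriber receives each message. Because the messages are delivered to the temporary, randomly named queues that the subscribers create, the subscribers must be started before the publisher.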
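Why the start order matters can be seen in a toy in-memory model of a fanout exchange. This is not how RabbitMQ is implemented, only a sketch of the routing rule; the class ToyFanoutExchange and the function demo are invented for the illustration. Each bound queue gets its own copy of a message, and a message published while nothing is bound is simply dropped, which matches RabbitMQ's default handling of unroutable messages.

```python
from collections import deque


class ToyFanoutExchange:
    """Toy in-memory model of a fanout exchange (not RabbitMQ's implementation)."""

    def __init__(self):
        self.queues = {}

    def bind(self, queue_name):
        """Create a queue, bind it to the exchange and return it."""
        self.queues[queue_name] = deque()
        return self.queues[queue_name]

    def publish(self, message):
        """Copy the message into every bound queue; return how many received it."""
        for bound_queue in self.queues.values():
            bound_queue.append(message)
        return len(self.queues)


def demo():
    exchange = ToyFanoutExchange()
    dropped = exchange.publish('too early')      # no subscriber has bound a queue yet
    q1 = exchange.bind('subscriber-1')
    q2 = exchange.bind('subscriber-2')
    delivered = exchange.publish('Hello Python')  # both subscribers get a copy
    return dropped, delivered, list(q1), list(q2)
```

In demo(), the first message reaches no queue, while the second lands in both subscriber queues.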
Publisher code:

    #!/usr/bin/env python3
    # coding=utf-8
    # @Time    : 2017/6/13 20:21
    # @Author  : Shawn
    # @Blog    : https://blog.just666.cn
    # @Email   : shawnbluce@gmail.com
    # @purpose : RabbitMQ_Publisher
    import pika

    # Create the connection object
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='115.xx.xx.xx'))

    # Create the channel object
    channel = connection.channel()

    # Declare the exchange: exchange is its name, exchange_type its type
    # (older pika versions also accepted the keyword type=)
    channel.exchange_declare(exchange='my_fanout', exchange_type='fanout')

    message = 'Hello Python'

    # Send the message to the exchange
    channel.basic_publish(exchange='my_fanout',  # the exchange to publish to
                          routing_key='',        # not needed for fanout; ignored if set
                          body=message)

    connection.close()
Subscriber code:

    #!/usr/bin/env python3
    # coding=utf-8
    # @Time    : 2017/6/13 20:20
    # @Author  : Shawn
    # @Blog    : https://blog.just666.cn
    # @Email   : shawnbluce@gmail.com
    # @purpose : RabbitMQ_Subscriber
    import pika

    credentials = pika.PlainCredentials('guest', 'guest')
    # Connect to RabbitMQ
    connection = pika.BlockingConnection(pika.ConnectionParameters('115.xx.xx.xx', 5672, '/', credentials))
    channel = connection.channel()

    # Declare the exchange: exchange is its name, exchange_type its type
    # (older pika versions also accepted the keyword type=)
    channel.exchange_declare(exchange='my_fanout', exchange_type='fanout')

    # Create a queue with a random, server-generated name.
    # exclusive=True makes it a temporary queue that is deleted once the consumer disconnects.
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue

    # Bind the queue to the exchange
    channel.queue_bind(exchange='my_fanout', queue=queue_name)

    # Define the callback
    def callback(ch, method, properties, body):
        print(body.decode('utf-8'))

    # Receive messages from the queue (pika 0.x signature; pika 1.0 and later use
    # basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True))
    channel.basic_consume(callback, queue=queue_name, no_ack=True)

    channel.start_consuming()