Home Backend Development Python Tutorial Use of Python rabbitmq (2)

Use of Python rabbitmq (2)

Jan 17, 2017 pm 02:51 PM

The previous article introduced the installation of rabbitmq and the classic hello world! Example. Here we will have an understanding of Work Queues. Because it is a continuation of the previous article, if you have not read the previous article, this article may be difficult to understand. The address of the previous article is: How to install rabbitmq and python on ubuntu


Messages can also be understood as tasks, the message sender can be understood as the task assigner, and the message receiver It can be understood as a worker. When the worker receives a task and has not completed it, the task allocator sends another task, and it is too busy, so multiple workers are needed to handle these tasks together. Workers are called work queues. The structure diagram is as follows:

Use of Python rabbitmq (2)

rabbitmq’s python instance work queue


Preparation


In the example program, use new_task.py to simulate the task allocator and worker.py to simulate the worker.


Modify send.py, receive information from the command line parameters, and send

1

2

3

4

5

6

import sys

message= ' '.join(sys.argv[1:])or "Hello World!"

channel.basic_publish(exchange='',

routing_key='hello',

body=message)

print " [x] Sent %r" % (message,)

Copy after login

Modify the callback function of receive.py.

1

2

3

4

5

import time

def callback(ch, method, properties, body):

print " [x] Received %r" % (body,)

time.sleep( body.count('.') )

print " [x] Done"

Copy after login

First open two terminals here, both run worker.py, and are in the listening state. This is equivalent to two workers. Open the third terminal and run new_task.py

1

2

3

4

5

$ python new_task.py First message.

$ python new_task.py Second message..

$ python new_task.py Third message...

$ python new_task.py Fourth message....

$ python new_task.py Fifth message.....

Copy after login

Observe that worker.py receives tasks. One worker receives 3 tasks:

1

2

3

4

5

$ python worker.py

[*] Waiting for messages. To exit press CTRL+C

[x] Received 'First message.'

[x] Received 'Third message...'

[x] Received 'Fifth message.....'

Copy after login

The other worker receives 2 Task:

1

2

3

4

$ python worker.py

[*] Waiting for messages. To exit press CTRL+C

[x] Received 'Second message..'

[x] Received 'Fourth message....'

Copy after login

From the above point of view, each worker will be assigned tasks in turn. So if a worker dies while processing a task, the task is not completed and should be handed over to other workers. So there should be a mechanism that will provide feedback when a worker completes a task.

Message acknowledgment (Message acknowledgment)


Message acknowledgment means that when the worker completes the task, it will be fed back to rabbitmq. Modify the callback function in worker.py:

1

2

3

4

5

def callback(ch, method, properties, body):

print " [x] Received %r" % (body,)

time.sleep(5)

print " [x] Done"

ch.basic_ack(delivery_tag= method.delivery_tag)

Copy after login


Pause here for 5 seconds to facilitate ctrl+c exit.

You can also remove the no_ack=True parameter or set it to False.

1

channel.basic_consume(callback, queue='hello', no_ack=False)

Copy after login

Run with this code, even if one of the workers exits with ctrl+c, the task being executed will not be lost, and rabbitmq will redistribute the task to other workers.


Message durability (Message durability)


Although there is a message feedback mechanism, if rabbitmq itself hangs If you drop it, the mission will still be lost. Therefore, tasks need to be stored persistently. Declare persistent storage:

1

channel.queue_declare(queue='hello', durable=True)

Copy after login

But this program will execute an error because the hello queue already exists and is non-persistent. RabbitMQ does not allow the use of different parameters to redefine existing queues. Redefine a queue:

1

channel.queue_declare(queue='task_queue', durable=True)

Copy after login

When sending a task, use delivery_mode=2 to mark the task as persistent storage:


1

2

3

4

5

6

channel.basic_publish(exchange='',

routing_key="task_queue",

body=message,

properties=pika.BasicProperties(

delivery_mode= 2,# make message persistent

))

Copy after login

Fair dispatch


In the above example, although each worker is assigned to tasks in turn, each The tasks are not necessarily the same. Some tasks may be heavier and take a longer time to execute; some tasks may be lighter and take a shorter time to execute. It would be best if it can be scheduled fairly. Use basic_qos to set prefetch_count=1 so that rabbitmq will not assign multiple tasks to workers at the same time. That is, only after the worker completes the task, will it receive the task again.

1

channel.basic_qos(prefetch_count=1)

Copy after login

new_task.py complete code

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

#!/usr/bin/env python

import pika

import sys

connection= pika.BlockingConnection(pika.ConnectionParameters(

host='localhost'))

channel= connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message= ' '.join(sys.argv[1:])or "Hello World!"

channel.basic_publish(exchange='',

routing_key='task_queue',

body=message,

properties=pika.BasicProperties(

delivery_mode= 2,# make message persistent

))

print " [x] Sent %r" % (message,)

connection.close()

Copy after login

worker.py complete code

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

#!/usr/bin/env python

import pika

import time

connection= pika.BlockingConnection(pika.ConnectionParameters(

host='localhost'))

channel= connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):

print " [x] Received %r" % (body,)

time.sleep( body.count('.') )

print " [x] Done"

ch.basic_ack(delivery_tag= method.delivery_tag)

channel.basic_qos(prefetch_count=1)

channel.basic_consume(callback,

queue='task_queue')

channel.start_consuming()

Copy after login

The above is the content of using Python rabbitmq (2). For more related content, please pay attention to PHP Chinese website (www.php.cn)!


Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

AI Hentai Generator

AI Hentai Generator

Generate AI Hentai for free.

Hot Article

R.E.P.O. Energy Crystals Explained and What They Do (Yellow Crystal)
3 weeks ago By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Best Graphic Settings
3 weeks ago By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. How to Fix Audio if You Can't Hear Anyone
3 weeks ago By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25: How To Unlock Everything In MyRise
4 weeks ago By 尊渡假赌尊渡假赌尊渡假赌

Hot Tools

Notepad++7.3.1

Notepad++7.3.1

Easy-to-use and free code editor

SublimeText3 Chinese version

SublimeText3 Chinese version

Chinese version, very easy to use

Zend Studio 13.0.1

Zend Studio 13.0.1

Powerful PHP integrated development environment

Dreamweaver CS6

Dreamweaver CS6

Visual web development tools

SublimeText3 Mac version

SublimeText3 Mac version

God-level code editing software (SublimeText3)

Do mysql need to pay Do mysql need to pay Apr 08, 2025 pm 05:36 PM

MySQL has a free community version and a paid enterprise version. The community version can be used and modified for free, but the support is limited and is suitable for applications with low stability requirements and strong technical capabilities. The Enterprise Edition provides comprehensive commercial support for applications that require a stable, reliable, high-performance database and willing to pay for support. Factors considered when choosing a version include application criticality, budgeting, and technical skills. There is no perfect option, only the most suitable option, and you need to choose carefully according to the specific situation.

HadiDB: A lightweight, horizontally scalable database in Python HadiDB: A lightweight, horizontally scalable database in Python Apr 08, 2025 pm 06:12 PM

HadiDB: A lightweight, high-level scalable Python database HadiDB (hadidb) is a lightweight database written in Python, with a high level of scalability. Install HadiDB using pip installation: pipinstallhadidb User Management Create user: createuser() method to create a new user. The authentication() method authenticates the user's identity. fromhadidb.operationimportuseruser_obj=user("admin","admin")user_obj.

Navicat's method to view MongoDB database password Navicat's method to view MongoDB database password Apr 08, 2025 pm 09:39 PM

It is impossible to view MongoDB password directly through Navicat because it is stored as hash values. How to retrieve lost passwords: 1. Reset passwords; 2. Check configuration files (may contain hash values); 3. Check codes (may hardcode passwords).

Does mysql need the internet Does mysql need the internet Apr 08, 2025 pm 02:18 PM

MySQL can run without network connections for basic data storage and management. However, network connection is required for interaction with other systems, remote access, or using advanced features such as replication and clustering. Additionally, security measures (such as firewalls), performance optimization (choose the right network connection), and data backup are critical to connecting to the Internet.

Can mysql workbench connect to mariadb Can mysql workbench connect to mariadb Apr 08, 2025 pm 02:33 PM

MySQL Workbench can connect to MariaDB, provided that the configuration is correct. First select "MariaDB" as the connector type. In the connection configuration, set HOST, PORT, USER, PASSWORD, and DATABASE correctly. When testing the connection, check that the MariaDB service is started, whether the username and password are correct, whether the port number is correct, whether the firewall allows connections, and whether the database exists. In advanced usage, use connection pooling technology to optimize performance. Common errors include insufficient permissions, network connection problems, etc. When debugging errors, carefully analyze error information and use debugging tools. Optimizing network configuration can improve performance

How to optimize MySQL performance for high-load applications? How to optimize MySQL performance for high-load applications? Apr 08, 2025 pm 06:03 PM

MySQL database performance optimization guide In resource-intensive applications, MySQL database plays a crucial role and is responsible for managing massive transactions. However, as the scale of application expands, database performance bottlenecks often become a constraint. This article will explore a series of effective MySQL performance optimization strategies to ensure that your application remains efficient and responsive under high loads. We will combine actual cases to explain in-depth key technologies such as indexing, query optimization, database design and caching. 1. Database architecture design and optimized database architecture is the cornerstone of MySQL performance optimization. Here are some core principles: Selecting the right data type and selecting the smallest data type that meets the needs can not only save storage space, but also improve data processing speed.

How to solve mysql cannot connect to local host How to solve mysql cannot connect to local host Apr 08, 2025 pm 02:24 PM

The MySQL connection may be due to the following reasons: MySQL service is not started, the firewall intercepts the connection, the port number is incorrect, the user name or password is incorrect, the listening address in my.cnf is improperly configured, etc. The troubleshooting steps include: 1. Check whether the MySQL service is running; 2. Adjust the firewall settings to allow MySQL to listen to port 3306; 3. Confirm that the port number is consistent with the actual port number; 4. Check whether the user name and password are correct; 5. Make sure the bind-address settings in my.cnf are correct.

How to use AWS Glue crawler with Amazon Athena How to use AWS Glue crawler with Amazon Athena Apr 09, 2025 pm 03:09 PM

As a data professional, you need to process large amounts of data from various sources. This can pose challenges to data management and analysis. Fortunately, two AWS services can help: AWS Glue and Amazon Athena.

See all articles