Maison > développement back-end > Tutoriel Python > Déploiement de l'environnement Python Pika RabbitMQ et mise en œuvre de la file d'attente de travail

Déploiement de l'environnement Python Pika RabbitMQ et mise en œuvre de la file d'attente de travail

高洛峰
Libérer: 2017-03-01 14:04:50
original
1631 Les gens l'ont consulté

La traduction chinoise de lapinmq fait principalement référence aux lettres mq : Message Queue, qui signifie file d'attente de messages. Il y a aussi le mot "lapin" devant, qui signifie lapin. C'est la même chose que le langage python appelé python. Les étrangers sont assez humoristiques. Le service RabbitMQ est similaire aux services MySQL et Apache, mais les fonctions fournies sont différentes. rabbimq est utilisé pour fournir un service d'envoi de messages, qui peut être utilisé pour communiquer entre différentes applications.

Installez Rabbitmq
Installez d'abord Rabbitmq Sous Ubuntu 12.04, vous pouvez l'installer directement via apt-get :

sudo apt-get install rabbitmq-server
Copier après la connexion

Après l'installation, le service lapinmq a été démarré. Examinons ensuite un exemple d’écriture de Hello World en python. Le contenu de l'exemple est d'envoyer "Hello World!" de send.py à lapinmq, et recevoir.py reçoit les informations envoyées par send.py depuis lapinmq.

Python Pika RabbitMQ环境部署及实现工作队列

P signifie product, qui signifie producteur, et peut également être appelé expéditeur. Dans l'exemple, il est affiché sous la forme send.py ; , ce qui signifie consommateur. Signification, il peut également être appelé le récepteur, qui est représenté par recevoir.py dans l'exemple ; le rouge au milieu représente la signification de la file d'attente, qui est représentée par la file d'attente hello dans l'exemple ;

Python utilise le service Rabbitmq. Vous pouvez utiliser les bibliothèques de classes prêtes à l'emploi pika, txAMQP ou py-amqplib. Ici, j'ai choisi pika.

Installer pika

Pour installer pika, vous pouvez utiliser pip est un package de gestion de logiciel python. S'il n'est pas installé, vous pouvez l'installer via apt-. get

sudo apt-get install python-pip
Copier après la connexion

Installer pika via pip :

sudo pip install pika
Copier après la connexion

send.py Le code

se connecte au serveur Rabbitmq Parce qu'il est testé localement, utilisez simplement localhost.

connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
Copier après la connexion

Déclare une file d'attente de messages dans laquelle les messages seront livrés. Si des messages sont envoyés vers une file d'attente qui n'existe pas, Rabbitmq effacera automatiquement ces messages.

channel.queue_declare(queue='hello')
Copier après la connexion

Envoyer le message à la file d'attente hello déclarée ci-dessus, où échange représente l'échangeur, qui peut spécifier avec précision à quelle file d'attente le message doit être envoyé, et router_key est défini sur la file d'attente. Le nom et le corps sont le contenu à envoyer. Nous ne prêterons pas attention aux détails spécifiques de l'envoi pour l'instant.

channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
Copier après la connexion

Fermer la connexion

connection.close()
Copier après la connexion

Code complet

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print " [x] Sent 'Hello World!'"
connection.close()
Copier après la connexion

Exécutez d'abord ce programme. Si l'exécution réussit, Rabbitmqctl devrait ajouter avec succès la file d'attente hello et il devrait y avoir un message dans la file d'attente. Utilisez la commande Rabbitmqctl. pour le vérifier.

rabbitmqctl list_queues
Copier après la connexion

Sortez les informations suivantes sur l'ordinateur de l'auteur :

Python Pika RabbitMQ环境部署及实现工作队列


Il y a bien une file d'attente bonjour, et il y a un message dans la file d'attente. Ensuite, utilisez recevoir.py pour obtenir les informations dans la file d'attente.

le code receive.py

est le même que les deux étapes précédentes de send.py, qui nécessitent toutes deux d'abord de se connecter au serveur, puis de déclarer la file d'attente des messages , qui ne sera pas discuté ici. Publié le même code.

La réception de messages est plus compliquée et vous devez définir une fonction de rappel pour les traiter. La fonction de rappel ici consiste à imprimer les informations.

def callback(ch, method, properties, body):
  print "Received %r" % (body,)
Copier après la connexion

Dites à Rabbitmq d'utiliser le rappel pour recevoir des messages

channel.basic_consume(callback, queue='hello', no_ack=True)
Copier après la connexion

Commencez à recevoir des informations et entrez dans l'état de blocage. Ce n'est que lorsqu'il y a des informations dans la file d'attente que le rappel sera appelé pour traitement. Appuyez sur ctrl c pour quitter.

channel.start_consuming()
Copier après la connexion

Code complet

#!/usr/bin/env python
#coding=utf8
import pika
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='hello')
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
 
channel.basic_consume(callback, queue='hello', no_ack=True)
 
print ' [*] Waiting for messages. To exit press CTRL+C'
channel.start_consuming()
Copier après la connexion

Exécutez le programme et vous pouvoir recevoir Le message Hello World! est envoyé à la file d'attente hello, puis imprimé à l'écran. Changez de terminal et exécutez à nouveau send.py. Vous pouvez voir que contain.py recevra à nouveau des informations.

Exemple de file d'attente de travail

1 Préparation

Dans l'exemple de programme, utilisez new_task .py pour simuler. l'allocateur de tâches et worker.py pour simuler le travailleur.

Modifiez send.py pour recevoir les informations des paramètres de ligne de commande et envoyez

import sys
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='hello',
           body=message)
print " [x] Sent %r" % (message,)
Copier après la connexion

Modifiez la fonction de rappel de recevoir.py.

import time
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
Copier après la connexion

Ici, ouvrez d'abord deux terminaux, tous deux exécutent worker.py et sont en état d'écoute. Ce côté équivaut à deux travailleurs. Ouvrez le troisième terminal et exécutez new_task.py

$ python new_task.py First message.
$ python new_task.py Second message..
$ python new_task.py Third message...
$ python new_task.py Fourth message....
$ python new_task.py Fifth message.....
Copier après la connexion

Observez que travailleur.py reçoit des tâches et qu'un travailleur reçoit 3 tâches :

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'First message.'
 [x] Received 'Third message...'
 [x] Received 'Fifth message.....'
Copier après la connexion

Un autre travailleur a reçu 2 tâches :

$ python worker.py
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Second message..'
 [x] Received 'Fourth message....'
Copier après la connexion

从上面来看,每个工作者,都会依次分配到任务。那么如果一个工作者,在处理任务的时候挂掉,这个任务就没有完成,应当交由其他工作者处理。所以应当有一种机制,当一个工作者完成任务时,会反馈消息。

2.消息确认(Message acknowledgment)

消息确认就是当工作者完成任务后,会反馈给rabbitmq。修改worker.py中的回调函数:

def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep(5)
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
Copier après la connexion

这边停顿5秒,可以方便ctrl+c退出。

去除no_ack=True参数或者设置为False也可以。

channel.basic_consume(callback, queue='hello', no_ack=False)
Copier après la connexion

用这个代码运行,即使其中一个工作者ctrl+c退出后,正在执行的任务也不会丢失,rabbitmq会将任务重新分配给其他工作者。

3.消息持久化存储(Message durability)

虽然有了消息反馈机制,但是如果rabbitmq自身挂掉的话,那么任务还是会丢失。所以需要将任务持久化存储起来。声明持久化存储:

channel.queue_declare(queue='hello', durable=True)
Copier après la connexion

但是这个程序会执行错误,因为hello这个队列已经存在,并且是非持久化的,rabbitmq不允许使用不同的参数来重新定义存在的队列。重新定义一个队列:

channel.queue_declare(queue='task_queue', durable=True)
Copier après la connexion

在发送任务的时候,用delivery_mode=2来标记任务为持久化存储:

channel.basic_publish(exchange='',
           routing_key="task_queue",
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
Copier après la connexion

4.公平调度(Fair dispatch)

上面实例中,虽然每个工作者是依次分配到任务,但是每个任务不一定一样。可能有的任务比较重,执行时间比较久;有的任务比较轻,执行时间比较短。如果能公平调度就最好了,使用basic_qos设置prefetch_count=1,使得rabbitmq不会在同一时间给工作者分配多个任务,即只有工作者完成任务之后,才会再次接收到任务。

channel.basic_qos(prefetch_count=1)
Copier après la connexion

new_task.py完整代码

#!/usr/bin/env python
import pika
import sys
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
 
message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
           routing_key='task_queue',
           body=message,
           properties=pika.BasicProperties(
             delivery_mode = 2, # make message persistent
           ))
print " [x] Sent %r" % (message,)
connection.close()
worker.py完整代码

#!/usr/bin/env python
import pika
import time
 
connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
channel = connection.channel()
 
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
 
def callback(ch, method, properties, body):
  print " [x] Received %r" % (body,)
  time.sleep( body.count('.') )
  print " [x] Done"
  ch.basic_ack(delivery_tag = method.delivery_tag)
 
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
           queue='task_queue')
 
channel.start_consuming()
Copier après la connexion


更多Python+Pika+RabbitMQ环境部署及实现工作队列相关文章请关注PHP中文网!

Étiquettes associées:
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal