目录
HelloWorld
简介
code
工作队列(任务队列)
消息ack
消息持久化
公平的消息分发
群发
exchange
临时队列
绑定exchange 和 队列
在发送消息是使用刚刚创建的 logs exchange
路由
使用direct exchange
使用topic exchange
 RPC
首页 后端开发 Python教程 RabbitMQ快速入门python教程

RabbitMQ快速入门python教程

Mar 09, 2017 am 09:28 AM
pika python rabbitmq

HelloWorld

简介

RabbitMQ:接受消息再传递消息,可以视为一个“邮局”。发送者和接受者通过队列来进行交互,队列的大小可以视为无限的,多个发送者可以发生给一个队列,多个接收者也可以从一个队列中接受消息。

code

rabbitmq使用的协议是amqp,用于python的推荐客户端是pika

pip install pika -i https://pypi.douban.com/simple/
登录后复制

send.py

# coding: utf8
import pika

# 建立一个连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
           'localhost'))  # 连接本地的RabbitMQ服务器
channel = connection.channel()  # 获得channel
登录后复制

这里链接的是本机的,如果想要连接其他机器上的服务器,只要填入地址或主机名即可。

接下来我们开始发送消息了,注意要确保接受消息的队列是存在的,否则rabbitmq就丢弃掉该消息

channel.queue_declare(queue='hello')  # 在RabbitMQ中创建hello这个队列
channel.basic_publish(exchange='',  # 使用默认的exchange来发送消息到队列
                  routing_key='hello',  # 发送到该队列 hello 中
                  body='Hello World!')  # 消息内容

connection.close()  # 关闭 同时flush
登录后复制

RabbitMQ默认需要1GB的空闲磁盘空间,否则发送会失败。

这时已在本地队列hello中存放了一个消息,如果使用 rabbitmqctl list_queues 可看到

hello 1
登录后复制

说明有一个hello队列 里面存放了一个消息

receive.py

# coding: utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()
登录后复制

还是先链接到服务器,和之前发送时相同

channel.queue_declare(queue='hello')  # 此处就是声明了 来确保该队列 hello 存在 可以多次声明 这里主要是为了防止接受程序先运行时出错

def callback(ch, method, properties, body):  # 用于接收到消息后的回调
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',  # 收指定队列hello的消息
                      no_ack=True)  #在处理完消息后不发送ack给服务器
channel.start_consuming()  # 启动消息接受 这会进入一个死循环
登录后复制

工作队列(任务队列)

工作队列是用于分发耗时任务给多个工作进程的。不立即做那些耗费资源的任务(需要等待这些任务完成),而是安排这些任务之后执行。例如我们把task作为message发送到队列里,启动工作进程来接受并最终执行,且可启动多个工作进程来工作。这适用于web应用,即不应在一个http请求的处理窗口内完成复杂任务。

channel.basic_publish(exchange='',
                  routing_key='task_queue',
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # 使得消息持久化
                  ))
登录后复制

分配消息的方式为 轮询 即每个工作进程获得相同的消息数。

消息ack

如果消息分配给某个工作进程,但是该工作进程未处理完成就崩溃了,可能该消息就丢失了,因为rabbitmq一旦把一个消息分发给工作进程,它就把该消息删掉了。

为了预防消息丢失,rabbitmq提供了ack,即工作进程在收到消息并处理后,发送ack给rabbitmq,告知rabbitmq这时候可以把该消息从队列中删除了。如果工作进程挂掉 了,rabbitmq没有收到ack,那么会把该消息 重新分发给其他工作进程。不需要设置timeout,即使该任务需要很长时间也可以处理。

ack默认是开启的,之前我们的工作进程显示指定了no_ack=True

channel.basic_consume(callback, queue='hello')  # 会启用ack
登录后复制

带ack的callback:

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack
登录后复制

消息持久化

但是,有时RabbitMQ重启了,消息也会丢失。可在创建队列时设置持久化:
(队列的性质一旦确定无法改变)

channel.queue_declare(queue='task_queue', durable=True)
登录后复制

同时在发送消息时也得设置该消息的持久化属性:

channel.basic_publish(exchange='',

                  routing_key="task_queue",
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # make message persistent
                  ))
登录后复制

但是,如果在RabbitMQ刚接收到消息还没来得及存储,消息还是会丢失。同时,RabbitMQ也不是在接受到每个消息都进行存盘操作。如果还需要更完善的保证,需要使用publisher confirm。

公平的消息分发

轮询模式的消息分发可能并不公平,例如奇数的消息都是繁重任务的话,某些进程则会一直运行繁  重任务。即使某工作进程上有积压的消息未处理,如很多都没发ack,但是RabbitMQ还是会按照顺序发消息给它。可以在接受进程中加设置:

channel.basic_qos(prefetch_count=1)
登录后复制

告知RabbitMQ,这样在一个工作进程没回发ack情况下是不会再分配消息给它。

群发

一般情况下,一条消息是发送给一个工作进程,然后完成,有时想把一条消息同时发送给多个进程:

exchange

发送者是不是直接发送消息到队列中的,事实上发生者根本不知道消息会发送到那个队列,发送者只能把消息发送到exchange里。exchange一方面收生产者的消息,另一方面把他们推送到队列中。所以作为exchange,它需要知道当收到消息时它需要做什么,是应该把它加到一个特殊的队列中还是放到很多的队列中,或者丢弃。exchange有direct、topic、headers、fanout等种类,而群发使用的即fanout。之前在发布消息时,exchange的值为 '' 即使用default exchange。

channel.exchange_declare(exchange='logs', type='fanout')  # 该exchange会把消息发送给所有它知道的队列中
登录后复制

临时队列

result = channel.queue_declare()  # 创建一个随机队列
result = channel.queue_declare(exclusive=True)  # 创建一个随机队列,同时在没有接收者连接该队列后则销毁它
queue_name = result.method.queue
登录后复制

这样result.method.queue即是队列名称,在发送或接受时即可使用。

绑定exchange 和 队列

channel.queue_bind(exchange='logs',
               queue='hello')
登录后复制

logs在发送消息时给hello也发一份。

在发送消息是使用刚刚创建的 logs exchange

   channel.basic_publish(exchange='logs',
                  routing_key='',
                  body=message)
登录后复制

路由

之前已经使用过bind,即建立exchange和queue的关系(该队列对来自该exchange的消息有兴趣),bind时可另外指定routing_key选项。

使用direct exchange

将对应routing key的消息发送到绑定相同routing key的队列中

channel.exchange_declare(exchange='direct_logs',
                     type='direct')
登录后复制

发送函数,发布不同severity的消息:

channel.basic_publish(exchange='direct_logs',
                  routing_key=severity,
                  body=message)
登录后复制

接受函数中绑定对应severity的:

channel.queue_bind(exchange='direct_logs',
                   queue=queue_name,
                   routing_key=severity)
登录后复制

使用topic exchange

之前使用的direct exchange 只能绑定一个routing key,可以使用这种可以拿.隔开routing key的topic exchange,例如:

"stock.usd.nyse" "nyse.vmw"
登录后复制

和direct exchange一样,在接受者那边绑定的key与发送时指定的routing key相同即可,另外有些特殊的值:

* 代表1个单词
# 代表0个或多个单词
登录后复制

如果发送者发出的routing key都是3个部分的,如:celerity.colour.species。

Q1:
*.orange.*  对应的是中间的colour都为orange的

Q2:
*.*.rabbit  对应的是最后部分的species为rabbit的
lazy.#      对应的是第一部分是lazy的
登录后复制

qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,对于lazy.pink.rabbit虽然匹配到了Q2两次,但是只会发送一次。如果绑定时直接绑定#,则会收到所有的。

 RPC

在远程机器上运行一个函数然后获得结果。

1、客户端启动 同时设置一个临时队列用于接受回调,绑定该队列

    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response, no_ack=True,
                               queue=self.callback_queue)
登录后复制

2、客户端发送rpc请求,同时附带reply_to对应回调队列,correlation_id设置为每个请求的唯一id(虽然说可以为每一次RPC请求都创建一个回调队列,但是这样效率不高,如果一个客户端只使用一个队列,则需要使用correlation_id来匹配是哪个请求),之后阻塞在回调队列直到收到回复

注意:如果收到了非法的correlation_id直接丢弃即可,因为有这种情况--服务器已经发了响应但是还没发ack就挂了,等一会服务器重启了又会重新处理该任务,又发了一遍相应,但是这时那个请求已经被处理掉了

channel.basic_publish(exchange='',
                       routing_key='rpc_queue',
                       properties=pika.BasicProperties(
                             reply_to = self.callback_queue,
                             correlation_id = self.corr_id,
                             ),
                       body=str(n))  # 发出调用

while self.response is None:  # 这边就相当于阻塞了
    self.connection.process_data_events()  # 查看回调队列
return int(self.response)
登录后复制

3、请求会发送到rpc_queue队列
4、RPC服务器从rpc_queue中取出,执行,发送回复

channel.basic_consume(on_request, queue='rpc_queue')  # 绑定 等待请求

# 处理之后:
ch.basic_publish(exchange='',
                 routing_key=props.reply_to,
                 properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                 body=str(response))  # 发送回复到回调队列
ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack
登录后复制

5、客户端从回调队列中取出数据,检查correlation_id,执行相应操作

if self.corr_id == props.correlation_id:
        self.response = body
登录后复制

                                               

以上是RabbitMQ快速入门python教程的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
4 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
4 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
4 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.聊天命令以及如何使用它们
4 周前 By 尊渡假赌尊渡假赌尊渡假赌

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

Python:游戏,Guis等 Python:游戏,Guis等 Apr 13, 2025 am 12:14 AM

Python在游戏和GUI开发中表现出色。1)游戏开发使用Pygame,提供绘图、音频等功能,适合创建2D游戏。2)GUI开发可选择Tkinter或PyQt,Tkinter简单易用,PyQt功能丰富,适合专业开发。

PHP和Python:比较两种流行的编程语言 PHP和Python:比较两种流行的编程语言 Apr 14, 2025 am 12:13 AM

PHP和Python各有优势,选择依据项目需求。1.PHP适合web开发,尤其快速开发和维护网站。2.Python适用于数据科学、机器学习和人工智能,语法简洁,适合初学者。

debian readdir如何与其他工具集成 debian readdir如何与其他工具集成 Apr 13, 2025 am 09:42 AM

Debian系统中的readdir函数是用于读取目录内容的系统调用,常用于C语言编程。本文将介绍如何将readdir与其他工具集成,以增强其功能。方法一:C语言程序与管道结合首先,编写一个C程序调用readdir函数并输出结果:#include#include#includeintmain(intargc,char*argv[]){DIR*dir;structdirent*entry;if(argc!=2){

Python和时间:充分利用您的学习时间 Python和时间:充分利用您的学习时间 Apr 14, 2025 am 12:02 AM

要在有限的时间内最大化学习Python的效率,可以使用Python的datetime、time和schedule模块。1.datetime模块用于记录和规划学习时间。2.time模块帮助设置学习和休息时间。3.schedule模块自动化安排每周学习任务。

Nginx SSL证书更新Debian教程 Nginx SSL证书更新Debian教程 Apr 13, 2025 am 07:21 AM

本文将指导您如何在Debian系统上更新NginxSSL证书。第一步:安装Certbot首先,请确保您的系统已安装certbot和python3-certbot-nginx包。若未安装,请执行以下命令:sudoapt-getupdatesudoapt-getinstallcertbotpython3-certbot-nginx第二步:获取并配置证书使用certbot命令获取Let'sEncrypt证书并配置Nginx:sudocertbot--nginx按照提示选

Debian上GitLab的插件开发指南 Debian上GitLab的插件开发指南 Apr 13, 2025 am 08:24 AM

在Debian上开发GitLab插件需要一些特定的步骤和知识。以下是一个基本的指南,帮助你开始这个过程。安装GitLab首先,你需要在Debian系统上安装GitLab。可以参考GitLab的官方安装手册。获取API访问令牌在进行API集成之前,首先需要获取GitLab的API访问令牌。打开GitLab仪表盘,在用户设置中找到“AccessTokens”选项,生成一个新的访问令牌。将生成的

Debian OpenSSL如何配置HTTPS服务器 Debian OpenSSL如何配置HTTPS服务器 Apr 13, 2025 am 11:03 AM

在Debian系统上配置HTTPS服务器涉及几个步骤,包括安装必要的软件、生成SSL证书、配置Web服务器(如Apache或Nginx)以使用SSL证书。以下是一个基本的指南,假设你使用的是ApacheWeb服务器。1.安装必要的软件首先,确保你的系统是最新的,并安装Apache和OpenSSL:sudoaptupdatesudoaptupgradesudoaptinsta

apache属于什么服务 apache属于什么服务 Apr 13, 2025 pm 12:06 PM

Apache是互联网幕后的英雄,不仅是Web服务器,更是一个支持巨大流量、提供动态内容的强大平台。它通过模块化设计提供极高的灵活性,可根据需要扩展各种功能。然而,模块化也带来配置和性能方面的挑战,需要谨慎管理。Apache适合需要高度可定制、满足复杂需求的服务器场景。

See all articles