RabbitMQ: 메시지를 받아 전달하는 것은 "우체국"으로 간주할 수 있습니다. 발신자와 수신자는 대기열을 통해 상호 작용합니다. 대기열의 크기는 무제한으로 간주될 수 있습니다. 여러 발신자가 대기열에 메시지를 보낼 수 있으며 여러 수신자가 대기열에서 메시지를 받을 수도 있습니다.
rabbitmq에서 사용하는 프로토콜은 amqp이고, Python에 권장되는 클라이언트는 pika입니다.
pip install pika -i https://pypi.douban.com/simple/
send.py
# coding: utf8 import pika # 建立一个连接 connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) # 连接本地的RabbitMQ服务器 channel = connection.channel() # 获得channel
링크는 다음과 같습니다. 이 머신의 경우 다른 머신의 서버에 연결하려면 주소나 호스트 이름만 입력하면 됩니다.
이때 hello 큐에 메시지가 저장되어 있습니다. Rabbitmqctl list_queues를 사용하면 hello 큐에 메시지가 저장되어 있음을 나타내는
channel.queue_declare(queue='hello') # 在RabbitMQ中创建hello这个队列 channel.basic_publish(exchange='', # 使用默认的exchange来发送消息到队列 routing_key='hello', # 发送到该队列 hello 中 body='Hello World!') # 消息内容 connection.close() # 关闭 同时flush
를 볼 수 있습니다
receive .py
hello 1
이전에 보낼 때와 동일하게 먼저 서버에 연결
# coding: utf8 import pika connection = pika.BlockingConnection(pika.ConnectionParameters( 'localhost')) channel = connection.channel()
작업 대기열(작업 대기열)
작업 대기열은 다음과 같습니다. 시간이 많이 걸리는 작업을 여러 작업 프로세스에 배포하는 데 사용됩니다. 리소스를 많이 소모하는 작업을 즉시 수행하는 대신(이러한 작업이 완료될 때까지 기다려야 함) 나중에 실행되도록 해당 작업을 예약하세요. 예를 들어 작업을 대기열에 메시지로 보내고, 작업자 프로세스를 시작하여 이를 수락하고 최종적으로 실행하며, 여러 작업자 프로세스를 시작하여 작업할 수 있습니다. 이는 http 요청 처리 창 내에서 복잡한 작업을 완료해서는 안 되는 웹 애플리케이션에 적용됩니다.
channel.queue_declare(queue='hello') # 此处就是声明了 来确保该队列 hello 存在 可以多次声明 这里主要是为了防止接受程序先运行时出错 def callback(ch, method, properties, body): # 用于接收到消息后的回调 print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', # 收指定队列hello的消息 no_ack=True) #在处理完消息后不发送ack给服务器 channel.start_consuming() # 启动消息接受 这会进入一个死循环
메시지 승인
메시지가 작업자 프로세스에 할당되었지만 처리가 완료되기 전에 작업자 프로세스가 충돌하는 경우, Rabbitmq가 작업자 프로세스에 메시지를 배포하면 메시지가 손실될 수 있습니다. , 메시지를 삭제합니다.
ack는 기본적으로 활성화되어 있습니다. 이전에는 작업자 프로세스에서 no_ack=True
channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # 使得消息持久化 ))
ack:
channel.basic_consume(callback, queue='hello') # 会启用ack
메시지 지속성
을 사용한 콜백을 지정했습니다. RabbitMQ가 다시 시작되고 메시지가 손실됩니다. 큐를 생성할 때 지속성을 설정할 수 있습니다.
(큐의 성격은 일단 결정되면 변경할 수 없습니다.)def callback(ch, method, properties, body): print " [x] Received %r" % (body,) time.sleep( body.count('.') ) print " [x] Done" ch.basic_ack(delivery_tag = method.delivery_tag) # 发送ack
channel .basic_publish(exchange='',
channel.queue_declare(queue='task_queue', durable=True)
그러나 RabbitMQ가 메시지를 방금 받았고 저장할 시간이 없다면 메시지는 여전히 손실됩니다. 동시에 RabbitMQ는 수신한 모든 메시지를 저장하지 않습니다. 더 완전한 보장이 필요한 경우 게시자 확인을 사용해야 합니다.
공정한 메시지 배포
폴링 모드 메시지 배포는 공정하지 않을 수 있습니다. 예를 들어 홀수 개의 메시지가 무거운 작업인 경우 일부 프로세스는 항상 무거운 작업을 실행합니다. 예를 들어 작업자 프로세스에서 처리되지 않은 메시지의 백로그가 있어도 RabbitMQ는 여전히 순서대로 메시지를 보낼 수 있습니다. <.>
routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
channel.basic_qos(prefetch_count=1)
channel.exchange_declare(exchange='logs', type='fanout') # 该exchange会把消息发送给所有它知道的队列中
result = channel.queue_declare() # 创建一个随机队列 result = channel.queue_declare(exclusive=True) # 创建一个随机队列,同时在没有接收者连接该队列后则销毁它 queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue='hello')
channel.basic_publish(exchange='logs', routing_key='', body=message)
channel.exchange_declare(exchange='direct_logs', type='direct')
"stock.usd.nyse" "nyse.vmw"
和direct exchange一样,在接受者那边绑定的key与发送时指定的routing key相同即可,另外有些特殊的值:
* 代表1个单词 # 代表0个或多个单词
如果发送者发出的routing key都是3个部分的,如:celerity.colour.species。
Q1: *.orange.* 对应的是中间的colour都为orange的 Q2: *.*.rabbit 对应的是最后部分的species为rabbit的 lazy.# 对应的是第一部分是lazy的
qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,对于lazy.pink.rabbit虽然匹配到了Q2两次,但是只会发送一次。如果绑定时直接绑定#,则会收到所有的。
在远程机器上运行一个函数然后获得结果。
1、客户端启动 同时设置一个临时队列用于接受回调,绑定该队列
self.connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) self.channel = self.connection.channel() result = self.channel.queue_declare(exclusive=True) self.callback_queue = result.method.queue self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)
2、客户端发送rpc请求,同时附带reply_to对应回调队列,correlation_id设置为每个请求的唯一id(虽然说可以为每一次RPC请求都创建一个回调队列,但是这样效率不高,如果一个客户端只使用一个队列,则需要使用correlation_id来匹配是哪个请求),之后阻塞在回调队列直到收到回复
注意:如果收到了非法的correlation_id直接丢弃即可,因为有这种情况--服务器已经发了响应但是还没发ack就挂了,等一会服务器重启了又会重新处理该任务,又发了一遍相应,但是这时那个请求已经被处理掉了
channel.basic_publish(exchange='', routing_key='rpc_queue', properties=pika.BasicProperties( reply_to = self.callback_queue, correlation_id = self.corr_id, ), body=str(n)) # 发出调用 while self.response is None: # 这边就相当于阻塞了 self.connection.process_data_events() # 查看回调队列 return int(self.response)
3、请求会发送到rpc_queue队列
4、RPC服务器从rpc_queue中取出,执行,发送回复
channel.basic_consume(on_request, queue='rpc_queue') # 绑定 等待请求 # 处理之后: ch.basic_publish(exchange='', routing_key=props.reply_to, properties=pika.BasicProperties(correlation_id = \ props.correlation_id), body=str(response)) # 发送回复到回调队列 ch.basic_ack(delivery_tag = method.delivery_tag) # 发送ack
5、客户端从回调队列中取出数据,检查correlation_id,执行相应操作
if self.corr_id == props.correlation_id: self.response = body
위 내용은 RabbitMQ 빠른 시작 Python 튜토리얼의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!