목차
HelloWorld
소개
code
메시지가 배포되는 방식은 폴링입니다. 즉, 각 작업자 프로세스는 동일한 수의 메시지를 받습니다.
메시지 손실을 방지하기 위해 Rabbitmq는 ack를 제공합니다. 즉, 작업자 프로세스가 메시지를 수신하고 처리한 후 ack를 Rabbitmq에 보내 이 메시지가 큐에서 삭제될 수 있음을 Rabbitmq에 알립니다. 시간. 작업자 프로세스가 종료되고 Rabbitmq가 응답을 받지 못하면 메시지가 다른 작업자 프로세스로 재배포됩니다. 작업 시간이 오래 걸리더라도 처리할 수 있으므로 시간 초과를 설정할 필요가 없습니다.
메시지를 보낼 때
 RPC
백엔드 개발 파이썬 튜토리얼 RabbitMQ 빠른 시작 Python 튜토리얼

RabbitMQ 빠른 시작 Python 튜토리얼

Mar 09, 2017 am 09:28 AM
pika python rabbitmq

HelloWorld

소개

RabbitMQ: 메시지를 받아 전달하는 것은 "우체국"으로 간주할 수 있습니다. 발신자와 수신자는 대기열을 통해 상호 작용합니다. 대기열의 크기는 무제한으로 간주될 수 있습니다. 여러 발신자가 대기열에 메시지를 보낼 수 있으며 여러 수신자가 대기열에서 메시지를 받을 수도 있습니다.

code

rabbitmq에서 사용하는 프로토콜은 amqp이고, Python에 권장되는 클라이언트는 pika입니다.

pip install pika -i https://pypi.douban.com/simple/
로그인 후 복사

send.py

# coding: utf8
import pika

# 建立一个连接
connection = pika.BlockingConnection(pika.ConnectionParameters(
           'localhost'))  # 连接本地的RabbitMQ服务器
channel = connection.channel()  # 获得channel
로그인 후 복사

링크는 다음과 같습니다. 이 머신의 경우 다른 머신의 서버에 연결하려면 주소나 호스트 이름만 입력하면 됩니다.

이때 hello 큐에 메시지가 저장되어 있습니다. Rabbitmqctl list_queues를 사용하면 hello 큐에 메시지가 저장되어 있음을 나타내는

channel.queue_declare(queue='hello')  # 在RabbitMQ中创建hello这个队列
channel.basic_publish(exchange='',  # 使用默认的exchange来发送消息到队列
                  routing_key='hello',  # 发送到该队列 hello 中
                  body='Hello World!')  # 消息内容

connection.close()  # 关闭 同时flush
로그인 후 복사

를 볼 수 있습니다

receive .py

hello 1
로그인 후 복사

이전에 보낼 때와 동일하게 먼저 서버에 연결

# coding: utf8
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
               'localhost'))
channel = connection.channel()
로그인 후 복사

작업 대기열(작업 대기열)

작업 대기열은 다음과 같습니다. 시간이 많이 걸리는 작업을 여러 작업 프로세스에 배포하는 데 사용됩니다. 리소스를 많이 소모하는 작업을 즉시 수행하는 대신(이러한 작업이 완료될 때까지 기다려야 함) 나중에 실행되도록 해당 작업을 예약하세요. 예를 들어 작업을 대기열에 메시지로 보내고, 작업자 프로세스를 시작하여 이를 수락하고 최종적으로 실행하며, 여러 작업자 프로세스를 시작하여 작업할 수 있습니다. 이는 http 요청 처리 창 내에서 복잡한 작업을 완료해서는 안 되는 웹 애플리케이션에 적용됩니다.

channel.queue_declare(queue='hello')  # 此处就是声明了 来确保该队列 hello 存在 可以多次声明 这里主要是为了防止接受程序先运行时出错

def callback(ch, method, properties, body):  # 用于接收到消息后的回调
    print(" [x] Received %r" % body)

channel.basic_consume(callback,
                      queue='hello',  # 收指定队列hello的消息
                      no_ack=True)  #在处理完消息后不发送ack给服务器
channel.start_consuming()  # 启动消息接受 这会进入一个死循环
로그인 후 복사

메시지가 배포되는 방식은 폴링입니다. 즉, 각 작업자 프로세스는 동일한 수의 메시지를 받습니다.

메시지 승인

메시지가 작업자 프로세스에 할당되었지만 처리가 완료되기 전에 작업자 프로세스가 충돌하는 경우, Rabbitmq가 작업자 프로세스에 메시지를 배포하면 메시지가 손실될 수 있습니다. , 메시지를 삭제합니다.

메시지 손실을 방지하기 위해 Rabbitmq는 ack를 제공합니다. 즉, 작업자 프로세스가 메시지를 수신하고 처리한 후 ack를 Rabbitmq에 보내 이 메시지가 큐에서 삭제될 수 있음을 Rabbitmq에 알립니다. 시간. 작업자 프로세스가 종료되고 Rabbitmq가 응답을 받지 못하면 메시지가 다른 작업자 프로세스로 재배포됩니다. 작업 시간이 오래 걸리더라도 처리할 수 있으므로 시간 초과를 설정할 필요가 없습니다.

ack는 기본적으로 활성화되어 있습니다. 이전에는 작업자 프로세스에서 no_ack=True

channel.basic_publish(exchange='',
                  routing_key='task_queue',
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # 使得消息持久化
                  ))
로그인 후 복사

ack:

channel.basic_consume(callback, queue='hello')  # 会启用ack
로그인 후 복사

메시지 지속성

을 사용한 콜백을 지정했습니다. RabbitMQ가 다시 시작되고 메시지가 손실됩니다. 큐를 생성할 때 지속성을 설정할 수 있습니다.

(큐의 성격은 일단 결정되면 변경할 수 없습니다.)

def callback(ch, method, properties, body):
    print " [x] Received %r" % (body,)
    time.sleep( body.count('.') )
    print " [x] Done"
    ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack
로그인 후 복사

동시에 메시지를 보낼 때 메시지의 지속성 속성도 설정해야 합니다. :


channel .basic_publish(exchange='',

channel.queue_declare(queue='task_queue', durable=True)
로그인 후 복사

그러나 RabbitMQ가 메시지를 방금 받았고 저장할 시간이 없다면 메시지는 여전히 손실됩니다. 동시에 RabbitMQ는 수신한 모든 메시지를 저장하지 않습니다. 더 완전한 보장이 필요한 경우 게시자 확인을 사용해야 합니다.

공정한 메시지 배포

폴링 모드 메시지 배포는 공정하지 않을 수 있습니다. 예를 들어 홀수 개의 메시지가 무거운 작업인 경우 일부 프로세스는 항상 무거운 작업을 실행합니다. 예를 들어 작업자 프로세스에서 처리되지 않은 메시지의 백로그가 있어도 RabbitMQ는 여전히 순서대로 메시지를 보낼 수 있습니다. <.>

                  routing_key="task_queue",
                  body=message,
                  properties=pika.BasicProperties(
                     delivery_mode = 2, # make message persistent
                  ))
로그인 후 복사

작업자 프로세스가 응답을 다시 보내지 않으면 더 이상 메시지가 할당되지 않음을 RabbitMQ에 알립니다. 작업자 프로세스를 거친 후 동시에 여러 프로세스에 메시지를 보내고 싶을 때가 있습니다.

교환

실제로 보낸 사람이 메시지를 대기열로 직접 보냅니까? 발신자는 메시지가 어느 대기열로 전송될지 알지 못한 채 메시지를 교환기로만 보낼 수 있습니다. 한편으로 교환기는 생산자로부터 메시지를 수신하고 다른 한편으로는 해당 메시지를 대기열에 푸시합니다. 따라서 Exchange에서는 메시지가 수신된 시점을 알아야 하며, Exchange가 직접, 주제, 헤더, 팬아웃을 가지고 있는지, 특수 대기열에 추가해야 하는지를 알아야 합니다. 및 기타 유형이 있으며 대량 전송에 사용되는 것은 게시 전 팬아웃입니다. 메시지를 보낼 때 교환 값은 '' 즉,

channel.basic_qos(prefetch_count=1)
로그인 후 복사
임시 대기열

을 사용합니다.
channel.exchange_declare(exchange='logs', type='fanout')  # 该exchange会把消息发送给所有它知道的队列中
로그인 후 복사

이런 방식으로 result.method.queue는 전송 또는 수신 시 사용할 수 있는 대기열 이름입니다.

교환 및 대기열 바인딩

result = channel.queue_declare()  # 创建一个随机队列
result = channel.queue_declare(exclusive=True)  # 创建一个随机队列,同时在没有接收者连接该队列后则销毁它
queue_name = result.method.queue
로그인 후 복사

로그도 복사본을 보냅니다. 안녕하세요 메시지를 보낼 때

새로 생성된 로그 교환을 사용합니다

메시지를 보낼 때

라우팅

바인드는 이전에 사용되었습니다. 즉, 교환 간의 관계입니다. 큐가 설정됩니다(큐는 교환의 메시지에 관심이 있습니다). 바인딩할 때 Routing_key 옵션을 지정하여 라우팅 키에 해당하는 메시지를 바인딩된 큐로 보낼 수도 있습니다. 동일한 라우팅 키

channel.queue_bind(exchange='logs',
               queue='hello')
로그인 후 복사

보내기 기능, 다른 심각도의 메시지 게시:

   channel.basic_publish(exchange='logs',
                  routing_key='',
                  body=message)
로그인 후 복사
허용 기능 심각도에 해당하는 바인딩:

channel.exchange_declare(exchange='direct_logs',
                     type='direct')
로그인 후 복사

주제 교환 사용

이전에 사용된 직접 교환은 하나의 라우팅 키만 바인딩할 수 있습니다. 라우팅 키를 구분하는 이 주제 교환을 사용할 수 있습니다. 예:

"stock.usd.nyse" "nyse.vmw"
로그인 후 복사

和direct exchange一样,在接受者那边绑定的key与发送时指定的routing key相同即可,另外有些特殊的值:

* 代表1个单词
# 代表0个或多个单词
로그인 후 복사

如果发送者发出的routing key都是3个部分的,如:celerity.colour.species。

Q1:
*.orange.*  对应的是中间的colour都为orange的

Q2:
*.*.rabbit  对应的是最后部分的species为rabbit的
lazy.#      对应的是第一部分是lazy的
로그인 후 복사

qucik.orange.rabbit Q1 Q2都可接收到,quick.orange.fox 只有Q1能接受到,对于lazy.pink.rabbit虽然匹配到了Q2两次,但是只会发送一次。如果绑定时直接绑定#,则会收到所有的。

 RPC

在远程机器上运行一个函数然后获得结果。

1、客户端启动 同时设置一个临时队列用于接受回调,绑定该队列

    self.connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    self.channel = self.connection.channel()
    result = self.channel.queue_declare(exclusive=True)
    self.callback_queue = result.method.queue
    self.channel.basic_consume(self.on_response, no_ack=True,
                               queue=self.callback_queue)
로그인 후 복사

2、客户端发送rpc请求,同时附带reply_to对应回调队列,correlation_id设置为每个请求的唯一id(虽然说可以为每一次RPC请求都创建一个回调队列,但是这样效率不高,如果一个客户端只使用一个队列,则需要使用correlation_id来匹配是哪个请求),之后阻塞在回调队列直到收到回复

注意:如果收到了非法的correlation_id直接丢弃即可,因为有这种情况--服务器已经发了响应但是还没发ack就挂了,等一会服务器重启了又会重新处理该任务,又发了一遍相应,但是这时那个请求已经被处理掉了

channel.basic_publish(exchange='',
                       routing_key='rpc_queue',
                       properties=pika.BasicProperties(
                             reply_to = self.callback_queue,
                             correlation_id = self.corr_id,
                             ),
                       body=str(n))  # 发出调用

while self.response is None:  # 这边就相当于阻塞了
    self.connection.process_data_events()  # 查看回调队列
return int(self.response)
로그인 후 복사

3、请求会发送到rpc_queue队列
4、RPC服务器从rpc_queue中取出,执行,发送回复

channel.basic_consume(on_request, queue='rpc_queue')  # 绑定 等待请求

# 处理之后:
ch.basic_publish(exchange='',
                 routing_key=props.reply_to,
                 properties=pika.BasicProperties(correlation_id = \
                                                     props.correlation_id),
                 body=str(response))  # 发送回复到回调队列
ch.basic_ack(delivery_tag = method.delivery_tag)  # 发送ack
로그인 후 복사

5、客户端从回调队列中取出数据,检查correlation_id,执行相应操作

if self.corr_id == props.correlation_id:
        self.response = body
로그인 후 복사

                                               

위 내용은 RabbitMQ 빠른 시작 Python 튜토리얼의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.

핫 AI 도구

Undresser.AI Undress

Undresser.AI Undress

사실적인 누드 사진을 만들기 위한 AI 기반 앱

AI Clothes Remover

AI Clothes Remover

사진에서 옷을 제거하는 온라인 AI 도구입니다.

Undress AI Tool

Undress AI Tool

무료로 이미지를 벗다

Clothoff.io

Clothoff.io

AI 옷 제거제

AI Hentai Generator

AI Hentai Generator

AI Hentai를 무료로 생성하십시오.

인기 기사

R.E.P.O. 에너지 결정과 그들이하는 일 (노란색 크리스탈)
4 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 최고의 그래픽 설정
4 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 아무도들을 수없는 경우 오디오를 수정하는 방법
4 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. 채팅 명령 및 사용 방법
4 몇 주 전 By 尊渡假赌尊渡假赌尊渡假赌

뜨거운 도구

메모장++7.3.1

메모장++7.3.1

사용하기 쉬운 무료 코드 편집기

SublimeText3 중국어 버전

SublimeText3 중국어 버전

중국어 버전, 사용하기 매우 쉽습니다.

스튜디오 13.0.1 보내기

스튜디오 13.0.1 보내기

강력한 PHP 통합 개발 환경

드림위버 CS6

드림위버 CS6

시각적 웹 개발 도구

SublimeText3 Mac 버전

SublimeText3 Mac 버전

신 수준의 코드 편집 소프트웨어(SublimeText3)

웹 사이트 성과를 향상시키기 위해 Debian Apache Logs를 사용하는 방법 웹 사이트 성과를 향상시키기 위해 Debian Apache Logs를 사용하는 방법 Apr 12, 2025 pm 11:36 PM

이 기사는 데비안 시스템에서 Apache Logs를 분석하여 웹 사이트 성능을 향상시키는 방법을 설명합니다. 1. 로그 분석 기본 사항 Apache Log는 IP 주소, 타임 스탬프, 요청 URL, HTTP 메소드 및 응답 코드를 포함한 모든 HTTP 요청의 자세한 정보를 기록합니다. 데비안 시스템 에서이 로그는 일반적으로 /var/log/apache2/access.log 및 /var/log/apache2/error.log 디렉토리에 있습니다. 로그 구조를 이해하는 것은 효과적인 분석의 첫 번째 단계입니다. 2. 로그 분석 도구 다양한 도구를 사용하여 Apache 로그를 분석 할 수 있습니다.

파이썬 : 게임, Guis 등 파이썬 : 게임, Guis 등 Apr 13, 2025 am 12:14 AM

Python은 게임 및 GUI 개발에서 탁월합니다. 1) 게임 개발은 Pygame을 사용하여 드로잉, 오디오 및 기타 기능을 제공하며 2D 게임을 만드는 데 적합합니다. 2) GUI 개발은 Tkinter 또는 PYQT를 선택할 수 있습니다. Tkinter는 간단하고 사용하기 쉽고 PYQT는 풍부한 기능을 가지고 있으며 전문 개발에 적합합니다.

PHP 및 Python : 두 가지 인기있는 프로그래밍 언어를 비교합니다 PHP 및 Python : 두 가지 인기있는 프로그래밍 언어를 비교합니다 Apr 14, 2025 am 12:13 AM

PHP와 Python은 각각 고유 한 장점이 있으며 프로젝트 요구 사항에 따라 선택합니다. 1.PHP는 웹 개발, 특히 웹 사이트의 빠른 개발 및 유지 보수에 적합합니다. 2. Python은 간결한 구문을 가진 데이터 과학, 기계 학습 및 인공 지능에 적합하며 초보자에게 적합합니다.

Debian Readdir가 다른 도구와 통합하는 방법 Debian Readdir가 다른 도구와 통합하는 방법 Apr 13, 2025 am 09:42 AM

데비안 시스템의 readdir 함수는 디렉토리 컨텐츠를 읽는 데 사용되는 시스템 호출이며 종종 C 프로그래밍에 사용됩니다. 이 기사에서는 ReadDir를 다른 도구와 통합하여 기능을 향상시키는 방법을 설명합니다. 방법 1 : C 언어 프로그램을 파이프 라인과 결합하고 먼저 C 프로그램을 작성하여 readDir 함수를 호출하고 결과를 출력하십시오.#포함#포함#포함#포함#includinTmain (intargc, char*argv []) {dir*dir; structdirent*entry; if (argc! = 2) {

DDOS 공격 탐지에서 데비안 스나이퍼의 역할 DDOS 공격 탐지에서 데비안 스나이퍼의 역할 Apr 12, 2025 pm 10:42 PM

이 기사에서는 DDOS 공격 탐지 방법에 대해 설명합니다. "Debiansniffer"의 직접적인 적용 사례는 발견되지 않았지만 DDOS 공격 탐지에 다음과 같은 방법을 사용할 수 있습니다. 효과적인 DDOS 공격 탐지 기술 : 트래픽 분석을 기반으로 한 탐지 : 갑작스런 트래픽 성장, 특정 포트에서의 연결 감지 등의 비정상적인 네트워크 트래픽 패턴을 모니터링하여 DDOS 공격을 식별합니다. 예를 들어, Pyshark 및 Colorama 라이브러리와 결합 된 Python 스크립트는 실시간으로 네트워크 트래픽을 모니터링하고 경고를 발행 할 수 있습니다. 통계 분석에 기반한 탐지 : 데이터와 같은 네트워크 트래픽의 통계적 특성을 분석하여

파이썬과 시간 : 공부 시간을 최대한 활용 파이썬과 시간 : 공부 시간을 최대한 활용 Apr 14, 2025 am 12:02 AM

제한된 시간에 Python 학습 효율을 극대화하려면 Python의 DateTime, Time 및 Schedule 모듈을 사용할 수 있습니다. 1. DateTime 모듈은 학습 시간을 기록하고 계획하는 데 사용됩니다. 2. 시간 모듈은 학습과 휴식 시간을 설정하는 데 도움이됩니다. 3. 일정 모듈은 주간 학습 작업을 자동으로 배열합니다.

NGINX SSL 인증서 업데이트 Debian Tutorial NGINX SSL 인증서 업데이트 Debian Tutorial Apr 13, 2025 am 07:21 AM

이 기사에서는 Debian 시스템에서 NginxSSL 인증서를 업데이트하는 방법에 대해 안내합니다. 1 단계 : CertBot을 먼저 설치하십시오. 시스템에 CERTBOT 및 PYTHON3-CERTBOT-NGINX 패키지가 설치되어 있는지 확인하십시오. 설치되지 않은 경우 다음 명령을 실행하십시오. sudoapt-getupdatesudoapt-getinstallcertbotpython3-certbot-nginx 2 단계 : 인증서 획득 및 구성 rectbot 명령을 사용하여 nginx를 획득하고 nginx를 구성하십시오.

Debian OpenSSL에서 HTTPS 서버를 구성하는 방법 Debian OpenSSL에서 HTTPS 서버를 구성하는 방법 Apr 13, 2025 am 11:03 AM

데비안 시스템에서 HTTPS 서버를 구성하려면 필요한 소프트웨어 설치, SSL 인증서 생성 및 SSL 인증서를 사용하기 위해 웹 서버 (예 : Apache 또는 Nginx)를 구성하는 등 여러 단계가 포함됩니다. 다음은 Apacheweb 서버를 사용하고 있다고 가정하는 기본 안내서입니다. 1. 필요한 소프트웨어를 먼저 설치하고 시스템이 최신 상태인지 확인하고 Apache 및 OpenSSL을 설치하십시오 : Sudoaptupdatesudoaptupgradesudoaptinsta

See all articles