RabbitMQ は、erlang によって開発された AMQP (Advanced Message Queuing Protocol) のオープンソース実装です。
AMQP: Advanced Message Queuing Protocol は、メッセージ指向ミドルウェア向けに設計されたアプリケーション層プロトコルのオープン標準です。メッセージ ミドルウェアは主にコンポーネント間の分離に使用され、メッセージの送信者はメッセージ コンシューマの存在を知る必要はなく、その逆も同様です。 AMQP の主な機能は、メッセージ指向、キュー指向、ルーティング (ポイントツーポイントおよびパブリッシュ/サブスクライブを含む)、信頼性、およびセキュリティです。 RabbitMQ は、オープン ソースの AMQP 実装です。サーバーは Erlang 言語で書かれており、Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP などのさまざまなクライアントをサポートしています。 AJAXをサポートしています。分散システムでメッセージを保存および転送するために使用され、使いやすさ、拡張性、高可用性の点で優れたパフォーマンスを発揮します。
RabbitMQ はもともと金融システムに由来し、分散システムでメッセージを保存および転送するために使用されており、使いやすさ、拡張性、高可用性の点で優れたパフォーマンスを発揮します。具体的な機能は次のとおりです:
- 可靠性(Reliability) RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。 - 灵活的路由(Flexible Routing) 在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。 - 消息集群(Clustering) 多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。 - 高可用(Highly Available Queues) 队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。 - 多种协议(Multi-protocol) RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。 - 多语言客户端(Many Clients) RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。 - 管理界面(Management UI) RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。 - 跟踪机制(Tracing) 如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。 - 插件机制(Plugin System) RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
- Message 消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。 - Publisher 消息的生产者,也是一个向交换器发布消息的客户端应用程序。 - Exchange 交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。 - Routing Key 路由关键字,exchange根据这个关键字进行消息投递。 - Binding 绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。 - Queue 消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。 - Connection 网络连接,比如一个TCP连接。 - Channel 信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。 - Consumer 消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。 - Virtual Host 虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。 - Broker 表示消息队列服务器实体。它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
Exchange には、メッセージを配布する際のさまざまな種類に基づいたさまざまな配布戦略があります。タイプ: ダイレクト、ファンアウト、トピック、ヘッダーの 4 種類。 headers はルーティング キーの代わりに AMQP メッセージのヘッダーと一致します。さらに、headers スイッチはダイレクト スイッチとまったく同じですが、パフォーマンスははるかに劣ります。現在はほとんど使用されていないため、他の 3 つのタイプを直接見てください。 :
メッセージ内のルーティング キーが Binding のバインディング キーと一致する場合、エクスチェンジャーはメッセージを対応するキューに送信します。ルーティング キーはキュー名と正確に一致します。キューがスイッチにバインドされており、ルーティング キーが「dog」である必要がある場合、ルーティング キー「dog」でマークされたメッセージのみが転送されます。「dog.puppy」は転送されません。 「dog.puppy」や「dog.puppy」も転送されません。「dog.guard」など。完全一致のユニキャスト モードです。
ファンアウト タイプ エクスチェンジャーに送信されたすべてのメッセージは、すべてのバインドされたキューに分散されます。ファンアウト交換機はルーティング キーを処理せず、単にキューを交換機にバインドするだけであり、交換機に送信された各メッセージは、交換機にバインドされているすべてのキューに転送されます。サブネット ブロードキャストと同様に、サブネット内の各ホストはメッセージのコピーを取得します。ファンアウト タイプは、メッセージを最も速く転送します。
topic 交換機は、パターン マッチングを通じてメッセージのルーティング キー属性を割り当て、ルーティング キーを特定のパターンと照合します。このとき、キューには次のものが必要です。モデル上の にバインドされます。ルーティング キーとバインド キーの文字列をドットで区切られた単語に分割します。また、記号「#」と記号「"」という 2 つのワイルドカード文字も認識します。 #0 個以上の単語に一致します。ただし、1 つの単語に限ります。
ConnectionFactory、Connection、および Channel は、RabbitMQ が提供する API の最も基本的なオブジェクトです。
RabbbitMQ の分散メカニズムは拡張に非常に適しており、特別に設計されています。同時実行プログラムの場合 はい、負荷が増加した場合は、タスク処理用のコンシューマをさらに作成するだけで済みます。
実際のアプリケーションでは、コンシューマはシステム内のキュー メッセージを受信する場合があります。 , ただし、処理が完了する前にシステムがクラッシュ (またはその他の事故が発生) した場合、メッセージが失われる可能性があります。この状況を回避するために、メッセージを消費した後にコンシューマが RabbitMQ にレシートを送信するように要求できます。RabbitMQ は、メッセージ確認応答 (メッセージ確認応答) を受信した後でのみメッセージをキューから削除します。RabbitMQ がレシートを受信しない場合、およびコンシューマの RabbitMQ 接続の切断が検出された場合、RabbitMQ は処理のために他のコンシューマ (複数のコンシューマが存在する場合) にメッセージを送信します。ここにはタイムアウトの概念がありません。コンシューマがメッセージを処理する時間がどれだけ長くても、RabbitMQ 接続が切断されない限り、メッセージは他のコンシューマに送信されません。ここで別の問題が発生します。開発者がビジネス ロジックの処理後に RabbitMQ にレシートを送信するのを忘れた場合、深刻なバグが発生します。キューにますます多くのメッセージが蓄積され、コンシューマーが再起動した後、これらのメッセージが繰り返し消費され、繰り返し実行されます。ビジネス ロジック...
さらに、pub メッセージには ack がありません。
如果我们希望即使在RabbitMQ服务重启的情况下,也不会丢失消息,我们可以将Queue与Message都设置为可持久化的(durable),这样可以保证绝大部分情况下我们的RabbitMQ消息不会丢失。但依然解决不了小概率丢失事件的发生(比如RabbitMQ服务器已经接收到生产者的消息,但还没来得及持久化该消息时RabbitMQ服务器就断电了),如果我们需要对这种小概率事件也要管理起来,那么我们要用到事务。由于这里仅为RabbitMQ的简单介绍,所以这里将不讲解RabbitMQ相关的事务。
要持久化队列queue的持久化需要在声明时指定durable=True;
这里要注意,队列的名字一定要是Broker中不存在的,不然不能改变此队列的任何属性.
队列和交换机有一个创建时候指定的标志durable,durable的唯一含义就是具有这个标志的队列和交换机会在重启之后重新建立,它不表示说在队列中的消息会在重启后恢复
消息持久化包括3部分
- 1.exchange持久化,在声明时指定durable => true hannel.ExchangeDeclare(ExchangeName,"direct",durable:true,autoDelete:false,arguments:null);//声明消息队列,且为可持久化的 - 2.queue持久化,在声明时指定durable => true channel.QueueDeclare(QueueName,durable:true,exclusive:false,autoDelete:false,arguments:null);//声明消息队列,且为可持久化的 - 3.消息持久化,在投递时指定delivery_mode => 2(1是非持久化). channel.basic_publish(exchange='', routing_key="task_queue", body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))
如果exchange和queue都是持久化的,那么它们之间的binding也是持久化的,如果exchange和queue两者之间有一个持久化,一个非持久化,则不允许建立绑定.
注意:一旦创建了队列和交换机,就不能修改其标志了,例如,创建了一个non-durable的队列,然后想把它改变成durable的,唯一的办法就是删除这个队列然后重现创建。
关于持久化的进一步讨论: 为了数据不丢失,我们采用了: 在数据处理结束后发送ack,这样RabbitMQ Server会认为Message Deliver 成功。 持久化queue,可以防止RabbitMQ Server 重启或者crash引起的数据丢失。 持久化Message,理由同上。 但是这样能保证数据100%不丢失吗?答案是否定的。问题就在与RabbitMQ需要时间去把这些信息存到磁盘上,这个time window虽然短,但是它的确还是有。在这个时间窗口内如果数据没有保存,数据还会丢失。还有另一个原因就是RabbitMQ并不是为每个Message都做fsync:它可能仅仅是把它保存到Cache里,还没来得及保存到物理磁盘上。因此这个持久化还是有问题。但是对于大多数应用来说,这已经足够了。当然为了保持一致性,你可以把每次的publish放到一个transaction中。这个transaction的实现需要user defined codes。那么商业系统会做什么呢?一种可能的方案是在系统panic时或者异常重启时或者断电时,应该给各个应用留出时间去flash cache,保证每个应用都能exit gracefully。
你可能也注意到了,分发机制不是那么优雅,默认状态下,RabbitMQ将第n个Message分发给第n个Consumer。n是取余后的,它不管Consumer是否还有unacked Message,只是按照这个默认的机制进行分发.
那么如果有个Consumer工作比较重,那么就会导致有的Consumer基本没事可做,有的Consumer却毫无休息的机会,那么,Rabbit是如何处理这种问题呢?
前面我们讲到如果有多个消费者同时订阅同一个Queue中的消息,Queue中的消息会被平摊给多个消费者。这时如果每个消息的处理时间不同,就有可能会导致某些消费者一直在忙,而另外一些消费者很快就处理完手头工作并一直空闲的情况。我们可以通过设置prefetchCount来限制Queue每次发送给每个消费者的消息数,比如我们设置prefetchCount=1,则Queue每次给每个消费者发送一条消息;消费者处理完这条消息后Queue会再给该消费者发送一条消息。
通过basic.qos方法设置prefetch_count=1,这样RabbitMQ就会使得每个Consumer在同一个时间点最多处理一个Message,换句话说,在接收到该Consumer的ack前,它不会将新的Message分发给它
1
channel.basic_qos(prefetch_count=1)
注意,这种方法可能会导致queue满。当然,这种情况下你可能需要添加更多的Consumer,或者创建更多的virtualHost来细化你的设计。
RabbitMQ使用ProtoBuf序列化消息,它可作为RabbitMQ的Message的数据格式进行传输,由于是结构化的数据,这样就极大的方便了Consumer的数据高效处理,当然也可以使用XML,与XML相比, ProtoBuf有以下优势:
1.简单
2.size小了3-10倍
3.速度快了20-100倍
4.易于编程
6.减少了语义的歧义.
,ProtoBuf具有速度和空间的优势,使得它现在应用非常广泛
MQ本身是基于异步的消息处理,前面的示例中所有的生产者(P)将消息发送到RabbitMQ后不会知道消费者(C)处理成功或者失败(甚至连有没有消费者来处理这条消息都不知道)。 但实际的应用场景中,我们很可能需要一些同步处理,需要同步等待服务端将我的消息处理完成后再进行下一步处理。这相当于RPC(Remote Procedure Call,远程过程调用)。在RabbitMQ中也支持RPC。
RabbitMQ 中实现RPC 的机制是:
客户端发送请求(消息)时,在消息的属性(MessageProperties ,在AMQP 协议中定义了14中properties ,这些属性会随着消息一起发送)中设置两个值replyTo (一个Queue 名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue 中)和correlationId (此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
服务器端收到消息并处理
服务器端处理完消息后,将生成一条应答消息到replyTo 指定的Queue ,同时带上correlationId 属性
客户端之前已订阅replyTo 指定的Queue ,从中收到服务器的应答消息后,根据其中的correlationId 属性分析哪条请求被执行了,根据执行结果进行后续业务处理
インターネット上の現在の情報によると、RabbitMQ、activeM、ZeroMQ の中で、全体として RabbitMQ は最初に選んだ。
ZeroMq はサポートしていませんが、ActiveMq と RabbitMq は両方ともサポートしています。永続メッセージとは主に、不可抗力でマシンが停止した場合でもメッセージが失われないメカニズムを指します。
信頼性、柔軟なルーティング、クラスタリング、トランザクション、高可用性キュー、メッセージ分類、問題追跡、ビジュアル管理ツール、プラグイン システムなどを実現するための包括的なテクノロジー。
RabbitMq / Kafka が最高、ActiveMq が 2 番目、ZeroMq が最悪です。もちろんZeroMqでも実現できますが、実装するには手動でコードを書く必要があり、コード量も少なくありません。特に信頼性においては、耐久性、配信確認、発行元確認、高可用性が挙げられます。
RabbitMQ の実装言語は本質的に高い同時実行性と高可用性を備えた Erlang 言語であるため、RabbitMQ が最高であることは間違いありません。
RabbitMq は Kafka よりも成熟しており、可用性、安定性、信頼性の点で、(理論的には) RabbitMq の方が Kafka よりも優れています。
さらに、Kafka の位置付けは主にログなどです。Kafka の設計の本来の目的はログを処理することであるため、ログ (メッセージ) システムの重要なコンポーネントと見なすことができ、対象が非常に絞られているため、ビジネスをしている場合は、RabbitMq を選択することをお勧めします。
また、Kafka のパフォーマンス (スループット、TPS) は RabbitMq よりもはるかに優れています。
選択の最終要約:
システムに Kafka または RabbitMq のオプションがすでにあり、それが現在のビジネスに完全に対応できる場合は、車輪の追加と再発明を繰り返し行う必要がないことをお勧めします。
Kafka と RabbitMq の間でチームやビジネスに合ったものを選択できます。これが最も重要なことです。しかし、あらゆることを考慮すると、現段階では第 3 の選択肢がないことは疑いの余地がありません。
Laravel 関連の技術記事の詳細については、Laravel チュートリアル 列にアクセスして学習してください。
以上がRabbitMQ のアプリケーション シナリオと基本原理の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。