RabbitMq については誰もがよく知っています。この記事では主に RabbitMQ を学習した後の RabbitMQ.Client のカプセル化について説明します。記事の最後では、カプセル化されたコンポーネントとデモを紹介します。
以下の図からわかるように、パブリッシャー (Publisher) はまずメッセージをエクスチェンジ (Exchange) に送信し、次にエクスチェンジから指定されたキュー (Queue) にメッセージを送信します。エクスチェンジとキューの間にはバインド関係があります。事前に宣言され、最終的に消費されます。顧客 (顧客) は、メッセージをサブスクライブするか、アクティブにフェッチすることによって、指定されたキューからメッセージを消費します。
したがって、今述べたサブスクリプションとアクティブな取得は、プッシュ (受動的) とプル (アクティブ) として理解できます。
プッシュでは、メッセージがキューに追加されている限り、アイドル状態のコンシューマーにそれを消費するように通知されます。 (私が探さなければ、あなたが私を探すのをただ待ちます、オブザーバーモード) プルの場合、コンシューマには通知されませんが、コンシューマはラウンドロビンまたは定期的にキュー メッセージをフェッチする主導権を握ります。 (必要なときだけあなたのところに行きます)
使用シナリオの例を示します。注文システムと出荷システムの 2 つのシステムがあるとします。商品をタイムリーに出荷するには、出荷メッセージの指示が注文システムから開始される必要があります。キューをサブスクライブし、指示がある限り処理します。
ただし、プログラムではネットワークや DB タイムアウトなどの例外が発生する場合があり、この場合はメッセージが失敗キューにスローされます。この場合、再送信メカニズムが必要になります。ただし、 while(IsPostSuccess == True) はやりたくありません。例外が発生する限り、一定時間以内に例外が発生し、そのような再試行は無意味だからです。
現時点では、時間内にメッセージを処理する必要はありません。失敗したキュー メッセージを定期的に、または数分ごと (失敗数 * 間隔分) に取得して再送信する JOB があります。
パッケージの公開
リーリー
コードを表示次回は、ネイティブ テストの公開速度のスクリーンショットです。
4.2W/Sは安定した速度で、デシリアライズ(ToJson)も若干速くなります。
サブスクリプションパッケージ
リーリー
コードを表示次回は、ネイティブ テストの公開速度のスクリーンショットです。
速い場合は1.9K/S、遅い場合は1.7K/Sです。
プルパッケージ
リーリー
コードを表示
速いときは1.8K/s、安定しているときは1.5K/sです。
首先说明下,RabbitMq只是提供了这个RPC的功能,但是并不是真正的RPC,为什么这么说:
1、传统Rpc隐藏了调用细节,像调用本地方法一样传参、抛出异常
2、RabbitMq的Rpc是基于消息的,消费者消费后,通过新队列返回响应结果。
/// <summary> /// RPC客户端 /// </summary> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="routingKey"></param> /// <param name="body"></param> /// <param name="isProperties"></param> /// <returns></returns> public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false) { var channel = GetModel(exchange, queue, routingKey, isProperties); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queue, true, consumer); try { var correlationId = Guid.NewGuid().ToString(); var basicProperties = channel.CreateBasicProperties(); basicProperties.ReplyTo = queue; basicProperties.CorrelationId = correlationId; channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8()); var sw = Stopwatch.StartNew(); while (true) { var ea = consumer.Queue.Dequeue(); if (ea.BasicProperties.CorrelationId == correlationId) { return ea.Body.DeserializeUtf8(); } if (sw.ElapsedMilliseconds > 30000) throw new Exception("等待响应超时"); } } catch (Exception ex) { throw ex.GetInnestException(); } } /// <summary> /// RPC服务端 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="isProperties"></param> /// <param name="handler"></param> /// <param name="isDeadLetter"></param> public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter) { //队列声明 var channel = GetModel(queue, isProperties); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var msgStr = body.DeserializeUtf8(); var msg = msgStr.FromJson<T>(); var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { msg = handler(msg); } catch (Exception ex) { ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq"); } finally { channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8()); channel.BasicAck(ea.DeliveryTag, false); } }; channel.BasicConsume(queue, false, consumer); }
View Code
可以用,但不建议去用。可以考虑其他的RPC框架。grpc、thrift等。
本篇文章,没有过多的写RabbitMq的知识点,因为园子的学习笔记实在太多了。下面把我的代码奉上 http://www.php.cn/ 。如果有发现写得不对的地方麻烦在评论指出,我会及时修改以免误导别人。
如果本篇文章您有用,请点击一下推荐,谢谢大家阅读。