首頁 後端開發 php教程 .net平台的rabbitmq使用封裝

.net平台的rabbitmq使用封裝

Mar 07, 2017 pm 02:40 PM

前言

RabbitMq大家再熟悉不過,這篇文章主要整對rabbitmq學習後封裝RabbitMQ.Client的一個分享。文章最後,我會把封裝組件和demo奉上。

Rabbitmq的運作

從下圖可以看出,發布者(Publisher)是把訊息先發送到交換器(Exchange),再從交換器發送到指定隊列(Queue),而先前已經聲明交換器與隊列綁定關係,最後消費者(Customer)透過訂閱主動取指定佇列訊息進行消費。

那麼剛剛提到的訂閱和主動取可以理解成,推(被動),拉(主動)。

推,只要隊列增加一則訊息,就會通知空閒的消費者進行消費。 (我不找你,就等你找我,觀察者模式)

拉,不會通知消費者,而是由消費者主動輪循或定時去取隊列訊息。 (我需要才去找你)

使用場景我舉個例子,假如有兩套系統 訂單系統和發貨系統,從訂單系統發起發貨消息指令,為了及時發貨,發貨系統需要訂閱隊列,只要有指令就處理。

可是程式偶爾會出異常,例如網路或DB超時了,把訊息丟到失敗佇列,這個時候需要重發機制。但是我又不想while(IsPostSuccess == True),因為只要出異常了,會在某個時間段內都會有異常,這樣的重試是沒意義的。

這時候不需要及時的去處理訊息,有個JOB計時或每隔幾分鐘(失敗次數*間隔分鐘)去取失敗佇列訊息,進行重發。

Publish(發布)的封裝

步驟:初始化連結->聲明交換器->聲明佇列->換機器與佇列綁定->發布訊息。注意的是,我將Model存到了ConcurrentDictionary裡面,因為聲明與綁定是非常耗時的,其次,往重複的佇列發送訊息是不需要重新初始化的。

 1         /// <summary> 2         /// 交换器声明 3         /// </summary> 4         /// <param name="iModel"></param> 5         /// <param name="exchange">交换器</param> 6         /// <param name="type">交换器类型: 7         /// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全 8         /// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的 9         /// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog10         /// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都11         /// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout12         /// 交换机转发消息是最快的。13         /// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多14         /// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”15         /// 只会匹配到“audit.irs”。</param>16         /// <param name="durable">持久化</param>17         /// <param name="autoDelete">自动删除</param>18         /// <param name="arguments">参数</param>19         private static void ExchangeDeclare(IModel iModel, string exchange, string type = ExchangeType.Direct,20             bool durable = true,21             bool autoDelete = false, IDictionary<string, object> arguments = null)22         {23             exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim();24             iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments);25         }26 27         /// <summary>28         /// 队列声明29         /// </summary>30         /// <param name="channel"></param>31         /// <param name="queue">队列</param>32         /// <param name="durable">持久化</param>33         /// <param name="exclusive">排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,34         /// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可35         /// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连36         /// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者37         /// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。</param>38         /// <param name="autoDelete">自动删除</param>39         /// <param name="arguments">参数</param>40         private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false,41             bool autoDelete = false, IDictionary<string, object> arguments = null)42         {43             queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim();44             channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);45         }46 47         /// <summary>48         /// 获取Model49         /// </summary>50         /// <param name="exchange">交换机名称</param>51         /// <param name="queue">队列名称</param>52         /// <param name="routingKey"></param>53         /// <param name="isProperties">是否持久化</param>54         /// <returns></returns>55         private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false)56         {57             return ModelDic.GetOrAdd(queue, key =>58             {59                 var model = _conn.CreateModel();60                 ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties);61                 QueueDeclare(model, queue, isProperties);62                 model.QueueBind(queue, exchange, routingKey);63                 ModelDic[queue] = model;64                 return model;65             });66         }67 68         /// <summary>69         /// 发布消息70         /// </summary>71         /// <param name="routingKey">路由键</param>72         /// <param name="body">队列信息</param>73         /// <param name="exchange">交换机名称</param>74         /// <param name="queue">队列名</param>75         /// <param name="isProperties">是否持久化</param>76         /// <returns></returns>77         public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false)78         {79             var channel = GetModel(exchange, queue, routingKey, isProperties);80 81             try82             {83                 channel.BasicPublish(exchange, routingKey, null, body.SerializeUtf8());84             }85             catch (Exception ex)86             {87                 throw ex.GetInnestException();88             }89         }
登入後複製

View Code

下次是本機測試的發布速度截圖:

4.2W/S屬於穩定速度,把反序列化(ToJson)會稍微快一些。

 

Subscribe(訂閱)的封裝

發布的時候是申明了交換器和隊列並綁定,然而訂閱的時候只需要聲明隊列就可。從下面程式碼能看到,捕獲到異常的時候,會把訊息送到自訂的「死信隊列」裡,由另外的JOB進行定時重發,因此,finally是應答成功的。

        /// <summary>
        /// 获取Model        /// </summary>
        /// <param name="queue">队列名称</param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        private static IModel GetModel(string queue, bool isProperties = false)
        {            return ModelDic.GetOrAdd(queue, value =>
             {                 var model = _conn.CreateModel();
                 QueueDeclare(model, queue, isProperties);                 //每次消费的消息数
                 model.BasicQos(0, 1, false);

                 ModelDic[queue] = model;                 return model;
             });
        }    

        /// <summary>
        /// 接收消息        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="queue">队列名称</param>
        /// <param name="isProperties"></param>
        /// <param name="handler">消费处理</param>
        /// <param name="isDeadLetter"></param>
        public void Subscribe<T>(string queue, bool isProperties, Action<T> handler, bool isDeadLetter) where T : class
        {            //队列声明
            var channel = GetModel(queue, isProperties);            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {                var body = ea.Body;                var msgStr = body.DeserializeUtf8();                var msg = msgStr.FromJson<T>();                try
                {
                    handler(msg);
                }                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");                    if (!isDeadLetter)
                        PublishToDead<DeadLetterQueue>(queue, msgStr, ex);
                }                finally
                {
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }
登入後複製

View Code

下次是本機測試的發布速度截圖:

快的時候有1.9K/S,慢的時候有1.7K/S

 

Pull(拉)的封裝

直接上代碼:

        /// <summary>
        /// 获取消息        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="handler">消费处理</param>
        private void Poll<T>(string exchange, string queue, string routingKey, Action<T> handler) where T : class
        {            var channel = GetModel(exchange, queue, routingKey);            var result = channel.BasicGet(queue, false);            if (result.IsNull())                return;            var msg = result.Body.DeserializeUtf8().FromJson<T>();            try
            {
                handler(msg);
            }            catch (Exception ex)
            {
                ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
            }            finally
            {
                channel.BasicAck(result.DeliveryTag, false);
            }
        }
登入後複製

View Code

快的時候有1.8K/s,穩定是1.5K/S

 

Rpc(远程调用)的封装

首先说明下,RabbitMq只是提供了这个RPC的功能,但是并不是真正的RPC,为什么这么说:

1、传统Rpc隐藏了调用细节,像调用本地方法一样传参、抛出异常

2、RabbitMq的Rpc是基于消息的,消费者消费后,通过新队列返回响应结果。

        /// <summary>
        /// RPC客户端        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="body"></param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false)
        {            var channel = GetModel(exchange, queue, routingKey, isProperties);            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue, true, consumer);            try
            {                var correlationId = Guid.NewGuid().ToString();                var basicProperties = channel.CreateBasicProperties();
                basicProperties.ReplyTo = queue;
                basicProperties.CorrelationId = correlationId;

                channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8());                var sw = Stopwatch.StartNew();                while (true)
                {                    var ea = consumer.Queue.Dequeue();                    if (ea.BasicProperties.CorrelationId == correlationId)
                    {                        return ea.Body.DeserializeUtf8();
                    }                    if (sw.ElapsedMilliseconds > 30000)                        throw new Exception("等待响应超时");
                }
            }            catch (Exception ex)
            {                throw ex.GetInnestException();
            }
        }    

        /// <summary>
        /// RPC服务端        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="isProperties"></param>
        /// <param name="handler"></param>
        /// <param name="isDeadLetter"></param>
        public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter)
        {            //队列声明
            var channel = GetModel(queue, isProperties);            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {                var body = ea.Body;                var msgStr = body.DeserializeUtf8();                var msg = msgStr.FromJson<T>();                var props = ea.BasicProperties;                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;                try
                {
                    msg = handler(msg);
                }                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
                }                finally
                {
                    channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8());
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }
登入後複製

View Code

   可以用,但不建议去用。可以考虑其他的RPC框架。grpc、thrift等。

 结尾

本篇文章,没有过多的写RabbitMq的知识点,因为园子的学习笔记实在太多了。下面把我的代码奉上 http://www.php.cn/ 。如果有发现写得不对的地方麻烦在评论指出,我会及时修改以免误导别人。

如果本篇文章您有用,请点击一下推荐,谢谢大家阅读。

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

Video Face Swap

Video Face Swap

使用我們完全免費的人工智慧換臉工具,輕鬆在任何影片中換臉!

熱門文章

<🎜>:泡泡膠模擬器無窮大 - 如何獲取和使用皇家鑰匙
3 週前 By 尊渡假赌尊渡假赌尊渡假赌
北端:融合系統,解釋
3 週前 By 尊渡假赌尊渡假赌尊渡假赌
Mandragora:巫婆樹的耳語 - 如何解鎖抓鉤
3 週前 By 尊渡假赌尊渡假赌尊渡假赌

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

熱門話題

Java教學
1666
14
CakePHP 教程
1425
52
Laravel 教程
1325
25
PHP教程
1273
29
C# 教程
1252
24
如何利用React和RabbitMQ建立可靠的訊息應用 如何利用React和RabbitMQ建立可靠的訊息應用 Sep 28, 2023 pm 08:24 PM

如何利用React和RabbitMQ建立可靠的訊息傳遞應用程式引言:現代化的應用程式需要支援可靠的訊息傳遞,以實現即時更新和資料同步等功能。 React是一種流行的JavaScript庫,用於建立使用者介面,而RabbitMQ是一種可靠的訊息傳遞中間件。本文將介紹如何結合React和RabbitMQ建立可靠的訊息傳遞應用,並提供具體的程式碼範例。 RabbitMQ概述:

集邦諮詢:英偉達 Blackwell 平台產品帶動台積電今年 CoWoS 產能提升 150% 集邦諮詢:英偉達 Blackwell 平台產品帶動台積電今年 CoWoS 產能提升 150% Apr 17, 2024 pm 08:00 PM

本站4月17日消息,集邦諮詢(TrendForce)近日發布報告,認為英偉達Blackwell新平台產品需求看漲,預估帶動台積電2024年CoWoS封裝總產能提升逾150%。英偉達Blackwell新平台產品包含B系列的GPU,以及整合英偉達自家GraceArmCPU的GB200加速卡等。集邦諮詢確認為供應鏈目前非常看好GB200,預估2025年出貨量預計超過百萬片,在英偉達高階GPU中的佔比達到40-50%。在英偉達計畫下半年交付GB200以及B100等產品,但上游晶圓封裝方面須進一步採用更複

分享幾個.NET開源的AI和LLM相關專案框架 分享幾個.NET開源的AI和LLM相關專案框架 May 06, 2024 pm 04:43 PM

當今人工智慧(AI)技術的發展如火如荼,它們在各個領域都展現了巨大的潛力和影響力。今天大姚給大家分享4個.NET開源的AI模型LLM相關的專案框架,希望能為大家提供一些參考。 https://github.com/YSGStudyHards/DotNetGuide/blob/main/docs/DotNet/DotNetProjectPicks.mdSemanticKernelSemanticKernel是一種開源的軟體開發工具包(SDK),旨在將大型語言模型(LLM)如OpenAI、Azure

C#的就業前景如何 C#的就業前景如何 Oct 19, 2023 am 11:02 AM

無論您是初學者還是有經驗的專業人士,掌握C#將為您的職業發展鋪平道路。

AMD 'Strix Halo” FP11 封裝尺寸曝光:和英特爾 LGA1700 相當,比 Phoenix 大 60% AMD 'Strix Halo” FP11 封裝尺寸曝光:和英特爾 LGA1700 相當,比 Phoenix 大 60% Jul 18, 2024 am 02:04 AM

本站7月9日訊息,AMDZen5架構「Strix」系列處理器會有兩種封裝方案,其中較小的StrixPoint將採用FP8封裝,而StrixHalo將會採用FP11封裝。圖源:videocardz訊息源@Olrak29_最新曝料稱StrixHalo的FP11封裝尺寸為37.5mm*45mm(1687平方毫米),和英特爾AlderLake、RaptorLakeCPU的LGA-1700封裝尺寸相同。 AMD最新的PhoenixAPU採用FP8封裝方案,尺寸為25*40mm,這意味著StrixHalo的F

Golang與RabbitMQ實現即時資料同步的解決方案 Golang與RabbitMQ實現即時資料同步的解決方案 Sep 27, 2023 pm 10:41 PM

Golang與RabbitMQ實現即時數據同步的解決方案引言:當今時代,隨著互聯網的普及和數據量的爆發式增長,即時數據的同步變得越來越重要。為了解決資料非同步傳輸和資料同步的問題,許多公司開始採用訊息佇列的方式來實現資料的即時同步。本文將介紹基於Golang和RabbitMQ的即時資料同步的解決方案,並提供具體的程式碼範例。一、什麼是RabbitMQ? Rabbi

C++ 函式如何透過封裝程式碼來提高 GUI 開發的效率? C++ 函式如何透過封裝程式碼來提高 GUI 開發的效率? Apr 25, 2024 pm 12:27 PM

透過封裝程式碼,C++函數可以提高GUI開發效率:程式碼封裝:函數將程式碼分組到獨立單元,使程式碼易於理解和維護。可重複使用性:函數可建立通用功能供應用程式中重複使用,減少重複編寫和錯誤。簡潔程式碼:封裝程式碼讓主邏輯簡潔,方便閱讀和除錯。

PHP中的封裝技術及應用 PHP中的封裝技術及應用 Oct 12, 2023 pm 01:43 PM

PHP中的封裝技術及應用封裝是物件導向程式設計中的重要概念,它指的是將資料和資料的操作封裝在一起,以便提供對外部程式的統一存取介面。在PHP中,封裝可以透過存取控制修飾符和類別的定義來實現。本文將介紹PHP中的封裝技術及其應用場景,並提供一些具體的程式碼範例。一、封裝的存取控制修飾符在PHP中,封裝主要透過存取控制修飾符來實現。 PHP提供了三個存取控制修飾符,

See all articles