RabbitMq大家再熟悉不過,這篇文章主要整對rabbitmq學習後封裝RabbitMQ.Client的一個分享。文章最後,我會把封裝組件和demo奉上。
從下圖可以看出,發布者(Publisher)是把訊息先發送到交換器(Exchange),再從交換器發送到指定隊列(Queue),而先前已經聲明交換器與隊列綁定關係,最後消費者(Customer)透過訂閱或主動取指定佇列訊息進行消費。
那麼剛剛提到的訂閱和主動取可以理解成,推(被動),拉(主動)。
推,只要隊列增加一則訊息,就會通知空閒的消費者進行消費。 (我不找你,就等你找我,觀察者模式)
拉,不會通知消費者,而是由消費者主動輪循或定時去取隊列訊息。 (我需要才去找你)
使用場景我舉個例子,假如有兩套系統 訂單系統和發貨系統,從訂單系統發起發貨消息指令,為了及時發貨,發貨系統需要訂閱隊列,只要有指令就處理。
可是程式偶爾會出異常,例如網路或DB超時了,把訊息丟到失敗佇列,這個時候需要重發機制。但是我又不想while(IsPostSuccess == True),因為只要出異常了,會在某個時間段內都會有異常,這樣的重試是沒意義的。
這時候不需要及時的去處理訊息,有個JOB計時或每隔幾分鐘(失敗次數*間隔分鐘)去取失敗佇列訊息,進行重發。
步驟:初始化連結->聲明交換器->聲明佇列->換機器與佇列綁定->發布訊息。注意的是,我將Model存到了ConcurrentDictionary裡面,因為聲明與綁定是非常耗時的,其次,往重複的佇列發送訊息是不需要重新初始化的。
1 /// <summary> 2 /// 交换器声明 3 /// </summary> 4 /// <param name="iModel"></param> 5 /// <param name="exchange">交换器</param> 6 /// <param name="type">交换器类型: 7 /// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全 8 /// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的 9 /// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog10 /// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都11 /// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout12 /// 交换机转发消息是最快的。13 /// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多14 /// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”15 /// 只会匹配到“audit.irs”。</param>16 /// <param name="durable">持久化</param>17 /// <param name="autoDelete">自动删除</param>18 /// <param name="arguments">参数</param>19 private static void ExchangeDeclare(IModel iModel, string exchange, string type = ExchangeType.Direct,20 bool durable = true,21 bool autoDelete = false, IDictionary<string, object> arguments = null)22 {23 exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim();24 iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments);25 }26 27 /// <summary>28 /// 队列声明29 /// </summary>30 /// <param name="channel"></param>31 /// <param name="queue">队列</param>32 /// <param name="durable">持久化</param>33 /// <param name="exclusive">排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,34 /// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可35 /// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连36 /// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者37 /// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。</param>38 /// <param name="autoDelete">自动删除</param>39 /// <param name="arguments">参数</param>40 private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false,41 bool autoDelete = false, IDictionary<string, object> arguments = null)42 {43 queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim();44 channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);45 }46 47 /// <summary>48 /// 获取Model49 /// </summary>50 /// <param name="exchange">交换机名称</param>51 /// <param name="queue">队列名称</param>52 /// <param name="routingKey"></param>53 /// <param name="isProperties">是否持久化</param>54 /// <returns></returns>55 private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false)56 {57 return ModelDic.GetOrAdd(queue, key =>58 {59 var model = _conn.CreateModel();60 ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties);61 QueueDeclare(model, queue, isProperties);62 model.QueueBind(queue, exchange, routingKey);63 ModelDic[queue] = model;64 return model;65 });66 }67 68 /// <summary>69 /// 发布消息70 /// </summary>71 /// <param name="routingKey">路由键</param>72 /// <param name="body">队列信息</param>73 /// <param name="exchange">交换机名称</param>74 /// <param name="queue">队列名</param>75 /// <param name="isProperties">是否持久化</param>76 /// <returns></returns>77 public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false)78 {79 var channel = GetModel(exchange, queue, routingKey, isProperties);80 81 try82 {83 channel.BasicPublish(exchange, routingKey, null, body.SerializeUtf8());84 }85 catch (Exception ex)86 {87 throw ex.GetInnestException();88 }89 }
View Code
下次是本機測試的發布速度截圖:
4.2W/S屬於穩定速度,把反序列化(ToJson)會稍微快一些。
發布的時候是申明了交換器和隊列並綁定,然而訂閱的時候只需要聲明隊列就可。從下面程式碼能看到,捕獲到異常的時候,會把訊息送到自訂的「死信隊列」裡,由另外的JOB進行定時重發,因此,finally是應答成功的。
/// <summary> /// 获取Model /// </summary> /// <param name="queue">队列名称</param> /// <param name="isProperties"></param> /// <returns></returns> private static IModel GetModel(string queue, bool isProperties = false) { return ModelDic.GetOrAdd(queue, value => { var model = _conn.CreateModel(); QueueDeclare(model, queue, isProperties); //每次消费的消息数 model.BasicQos(0, 1, false); ModelDic[queue] = model; return model; }); } /// <summary> /// 接收消息 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="queue">队列名称</param> /// <param name="isProperties"></param> /// <param name="handler">消费处理</param> /// <param name="isDeadLetter"></param> public void Subscribe<T>(string queue, bool isProperties, Action<T> handler, bool isDeadLetter) where T : class { //队列声明 var channel = GetModel(queue, isProperties); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var msgStr = body.DeserializeUtf8(); var msg = msgStr.FromJson<T>(); try { handler(msg); } catch (Exception ex) { ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq"); if (!isDeadLetter) PublishToDead<DeadLetterQueue>(queue, msgStr, ex); } finally { channel.BasicAck(ea.DeliveryTag, false); } }; channel.BasicConsume(queue, false, consumer); }
View Code
下次是本機測試的發布速度截圖:
快的時候有1.9K/S,慢的時候有1.7K/S
直接上代碼:
/// <summary> /// 获取消息 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="routingKey"></param> /// <param name="handler">消费处理</param> private void Poll<T>(string exchange, string queue, string routingKey, Action<T> handler) where T : class { var channel = GetModel(exchange, queue, routingKey); var result = channel.BasicGet(queue, false); if (result.IsNull()) return; var msg = result.Body.DeserializeUtf8().FromJson<T>(); try { handler(msg); } catch (Exception ex) { ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq"); } finally { channel.BasicAck(result.DeliveryTag, false); } }
View Code
快的時候有1.8K/s,穩定是1.5K/S
首先说明下,RabbitMq只是提供了这个RPC的功能,但是并不是真正的RPC,为什么这么说:
1、传统Rpc隐藏了调用细节,像调用本地方法一样传参、抛出异常
2、RabbitMq的Rpc是基于消息的,消费者消费后,通过新队列返回响应结果。
/// <summary> /// RPC客户端 /// </summary> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="routingKey"></param> /// <param name="body"></param> /// <param name="isProperties"></param> /// <returns></returns> public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false) { var channel = GetModel(exchange, queue, routingKey, isProperties); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queue, true, consumer); try { var correlationId = Guid.NewGuid().ToString(); var basicProperties = channel.CreateBasicProperties(); basicProperties.ReplyTo = queue; basicProperties.CorrelationId = correlationId; channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8()); var sw = Stopwatch.StartNew(); while (true) { var ea = consumer.Queue.Dequeue(); if (ea.BasicProperties.CorrelationId == correlationId) { return ea.Body.DeserializeUtf8(); } if (sw.ElapsedMilliseconds > 30000) throw new Exception("等待响应超时"); } } catch (Exception ex) { throw ex.GetInnestException(); } } /// <summary> /// RPC服务端 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="isProperties"></param> /// <param name="handler"></param> /// <param name="isDeadLetter"></param> public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter) { //队列声明 var channel = GetModel(queue, isProperties); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var msgStr = body.DeserializeUtf8(); var msg = msgStr.FromJson<T>(); var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { msg = handler(msg); } catch (Exception ex) { ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq"); } finally { channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8()); channel.BasicAck(ea.DeliveryTag, false); } }; channel.BasicConsume(queue, false, consumer); }
View Code
可以用,但不建议去用。可以考虑其他的RPC框架。grpc、thrift等。
本篇文章,没有过多的写RabbitMq的知识点,因为园子的学习笔记实在太多了。下面把我的代码奉上 http://www.php.cn/ 。如果有发现写得不对的地方麻烦在评论指出,我会及时修改以免误导别人。
如果本篇文章您有用,请点击一下推荐,谢谢大家阅读。