Die .net-Plattform Rabbitmq verwendet Kapselung
Vorwort
Jeder ist mit RabbitMq vertraut. In diesem Artikel wird hauptsächlich die Kapselung von RabbitMQ.Client nach dem Erlernen von RabbitMq beschrieben. Am Ende des Artikels werde ich die gekapselten Komponenten und die Demo vorstellen.
Betrieb von Rabbitmq
Wie aus der folgenden Abbildung ersichtlich ist, sendet der Herausgeber (Herausgeber) die Nachricht zunächst an die Börse (Exchange) und sendet sie dann von der Börse an die angegebene Warteschlange (Warteschlange). Es besteht eine Bindungsbeziehung zwischen der Börse und der Warteschlange wurde zuvor deklariert, und der endgültige Verbrauch Der Kunde (Kunde) abonniert über oder ruft aktiv bestimmte Warteschlangennachrichten für den Verbrauch ab.
Dann kann das gerade erwähnte Abonnement und der aktive Abruf als Push (passiv) und Pull (aktiv) verstanden werden.
Push: Solange eine Nachricht zur Warteschlange hinzugefügt wird, werden inaktive Verbraucher zum Konsumieren benachrichtigt. (Wenn ich nicht nach dir suche, warte ich einfach darauf, dass du nach mir suchst, Beobachtermodus)
Pull: Der Verbraucher wird nicht benachrichtigt, aber der Verbraucher ergreift die Initiative, um die Warteschlangennachrichten im Round-Robin-Verfahren oder in regelmäßigen Abständen abzurufen. (Ich gehe nur zu dir, wenn ich es brauche)Lassen Sie mich ein Beispiel für ein Nutzungsszenario geben: Ein Bestellsystem und ein Versandsystem. Anweisungen zum Versand von Nachrichten werden vom Bestellsystem initiiert Abonnieren Sie die Warteschlange und verarbeiten Sie sie, solange Anweisungen vorliegen.
Allerdings stößt das Programm gelegentlich auf Ausnahmen, wie z. B. eine Netzwerk- oder DB-Zeitüberschreitung, und die Nachricht wird in die Fehlerwarteschlange geworfen. In diesem Fall ist ein Mechanismus zum erneuten Senden erforderlich. Aber ich möchte while(IsPostSuccess == True) nicht machen, denn solange eine Ausnahme auftritt, wird es innerhalb eines bestimmten Zeitraums Ausnahmen geben und ein solcher Wiederholungsversuch ist bedeutungslos.
Zu diesem Zeitpunkt besteht keine Notwendigkeit, die Nachricht rechtzeitig zu verarbeiten. Es gibt einen JOB, um die fehlgeschlagene Warteschlangennachricht regelmäßig oder alle paar Minuten (Anzahl der Fehler * Intervallminuten) abzurufen und erneut zu senden.
Paket veröffentlichen
Schritte: Link initialisieren – Austauscher deklarieren – Warteschlange deklarieren – Maschine und Warteschlangenbindung ändern – Nachricht veröffentlichen. Beachten Sie, dass ich das Modell in ConcurrentDictionary gespeichert habe, da Deklaration und Bindung sehr zeitaufwändig sind. Zweitens erfordert das Senden von Nachrichten an wiederholte Warteschlangen keine Neuinitialisierung.
1 /// <summary> 2 /// 交换器声明 3 /// </summary> 4 /// <param name="iModel"></param> 5 /// <param name="exchange">交换器</param> 6 /// <param name="type">交换器类型: 7 /// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全 8 /// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的 9 /// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog10 /// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都11 /// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout12 /// 交换机转发消息是最快的。13 /// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多14 /// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”15 /// 只会匹配到“audit.irs”。</param>16 /// <param name="durable">持久化</param>17 /// <param name="autoDelete">自动删除</param>18 /// <param name="arguments">参数</param>19 private static void ExchangeDeclare(IModel iModel, string exchange, string type = ExchangeType.Direct,20 bool durable = true,21 bool autoDelete = false, IDictionary<string, object> arguments = null)22 {23 exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim();24 iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments);25 }26 27 /// <summary>28 /// 队列声明29 /// </summary>30 /// <param name="channel"></param>31 /// <param name="queue">队列</param>32 /// <param name="durable">持久化</param>33 /// <param name="exclusive">排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,34 /// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可35 /// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连36 /// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者37 /// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。</param>38 /// <param name="autoDelete">自动删除</param>39 /// <param name="arguments">参数</param>40 private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false,41 bool autoDelete = false, IDictionary<string, object> arguments = null)42 {43 queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim();44 channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);45 }46 47 /// <summary>48 /// 获取Model49 /// </summary>50 /// <param name="exchange">交换机名称</param>51 /// <param name="queue">队列名称</param>52 /// <param name="routingKey"></param>53 /// <param name="isProperties">是否持久化</param>54 /// <returns></returns>55 private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false)56 {57 return ModelDic.GetOrAdd(queue, key =>58 {59 var model = _conn.CreateModel();60 ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties);61 QueueDeclare(model, queue, isProperties);62 model.QueueBind(queue, exchange, routingKey);63 ModelDic[queue] = model;64 return model;65 });66 }67 68 /// <summary>69 /// 发布消息70 /// </summary>71 /// <param name="routingKey">路由键</param>72 /// <param name="body">队列信息</param>73 /// <param name="exchange">交换机名称</param>74 /// <param name="queue">队列名</param>75 /// <param name="isProperties">是否持久化</param>76 /// <returns></returns>77 public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false)78 {79 var channel = GetModel(exchange, queue, routingKey, isProperties);80 81 try82 {83 channel.BasicPublish(exchange, routingKey, null, body.SerializeUtf8());84 }85 catch (Exception ex)86 {87 throw ex.GetInnestException();88 }89 }
Code anzeigen
Das nächste Mal ist ein Screenshot der Veröffentlichungsgeschwindigkeit des nativen Tests:
Abonnementpaket
Beim Veröffentlichen werden der Austauscher und die Warteschlange deklariert und gebunden. Beim Abonnieren müssen Sie jedoch nur die Warteschlange deklarieren. Wie Sie dem folgenden Code entnehmen können, wird die Nachricht beim Abfangen einer Ausnahme an die benutzerdefinierte „Warteschlange für unzustellbare Nachrichten“ gesendet und regelmäßig von einem anderen JOB erneut gesendet. Daher ist die endgültige Antwort erfolgreich.
/// <summary> /// 获取Model /// </summary> /// <param name="queue">队列名称</param> /// <param name="isProperties"></param> /// <returns></returns> private static IModel GetModel(string queue, bool isProperties = false) { return ModelDic.GetOrAdd(queue, value => { var model = _conn.CreateModel(); QueueDeclare(model, queue, isProperties); //每次消费的消息数 model.BasicQos(0, 1, false); ModelDic[queue] = model; return model; }); } /// <summary> /// 接收消息 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="queue">队列名称</param> /// <param name="isProperties"></param> /// <param name="handler">消费处理</param> /// <param name="isDeadLetter"></param> public void Subscribe<T>(string queue, bool isProperties, Action<T> handler, bool isDeadLetter) where T : class { //队列声明 var channel = GetModel(queue, isProperties); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var msgStr = body.DeserializeUtf8(); var msg = msgStr.FromJson<T>(); try { handler(msg); } catch (Exception ex) { ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq"); if (!isDeadLetter) PublishToDead<DeadLetterQueue>(queue, msgStr, ex); } finally { channel.BasicAck(ea.DeliveryTag, false); } }; channel.BasicConsume(queue, false, consumer); }
Code anzeigen
Das nächste Mal ist ein Screenshot der Veröffentlichungsgeschwindigkeit des nativen Tests:
Paket ziehen
Gehen Sie direkt zum Code:
/// <summary> /// 获取消息 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="routingKey"></param> /// <param name="handler">消费处理</param> private void Poll<T>(string exchange, string queue, string routingKey, Action<T> handler) where T : class { var channel = GetModel(exchange, queue, routingKey); var result = channel.BasicGet(queue, false); if (result.IsNull()) return; var msg = result.Body.DeserializeUtf8().FromJson<T>(); try { handler(msg); } catch (Exception ex) { ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq"); } finally { channel.BasicAck(result.DeliveryTag, false); } }
Code anzeigen
Rpc(远程调用)的封装
首先说明下,RabbitMq只是提供了这个RPC的功能,但是并不是真正的RPC,为什么这么说:
1、传统Rpc隐藏了调用细节,像调用本地方法一样传参、抛出异常
2、RabbitMq的Rpc是基于消息的,消费者消费后,通过新队列返回响应结果。
/// <summary> /// RPC客户端 /// </summary> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="routingKey"></param> /// <param name="body"></param> /// <param name="isProperties"></param> /// <returns></returns> public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false) { var channel = GetModel(exchange, queue, routingKey, isProperties); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queue, true, consumer); try { var correlationId = Guid.NewGuid().ToString(); var basicProperties = channel.CreateBasicProperties(); basicProperties.ReplyTo = queue; basicProperties.CorrelationId = correlationId; channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8()); var sw = Stopwatch.StartNew(); while (true) { var ea = consumer.Queue.Dequeue(); if (ea.BasicProperties.CorrelationId == correlationId) { return ea.Body.DeserializeUtf8(); } if (sw.ElapsedMilliseconds > 30000) throw new Exception("等待响应超时"); } } catch (Exception ex) { throw ex.GetInnestException(); } } /// <summary> /// RPC服务端 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="exchange"></param> /// <param name="queue"></param> /// <param name="isProperties"></param> /// <param name="handler"></param> /// <param name="isDeadLetter"></param> public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter) { //队列声明 var channel = GetModel(queue, isProperties); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var msgStr = body.DeserializeUtf8(); var msg = msgStr.FromJson<T>(); var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { msg = handler(msg); } catch (Exception ex) { ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq"); } finally { channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8()); channel.BasicAck(ea.DeliveryTag, false); } }; channel.BasicConsume(queue, false, consumer); }
View Code
可以用,但不建议去用。可以考虑其他的RPC框架。grpc、thrift等。
结尾
本篇文章,没有过多的写RabbitMq的知识点,因为园子的学习笔记实在太多了。下面把我的代码奉上 http://www.php.cn/ 。如果有发现写得不对的地方麻烦在评论指出,我会及时修改以免误导别人。
如果本篇文章您有用,请点击一下推荐,谢谢大家阅读。

Heiße KI -Werkzeuge

Undresser.AI Undress
KI-gestützte App zum Erstellen realistischer Aktfotos

AI Clothes Remover
Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

Undress AI Tool
Ausziehbilder kostenlos

Clothoff.io
KI-Kleiderentferner

Video Face Swap
Tauschen Sie Gesichter in jedem Video mühelos mit unserem völlig kostenlosen KI-Gesichtstausch-Tool aus!

Heißer Artikel

Heiße Werkzeuge

Notepad++7.3.1
Einfach zu bedienender und kostenloser Code-Editor

SublimeText3 chinesische Version
Chinesische Version, sehr einfach zu bedienen

Senden Sie Studio 13.0.1
Leistungsstarke integrierte PHP-Entwicklungsumgebung

Dreamweaver CS6
Visuelle Webentwicklungstools

SublimeText3 Mac-Version
Codebearbeitungssoftware auf Gottesniveau (SublimeText3)

Heiße Themen





So erstellen Sie eine zuverlässige Messaging-Anwendung mit React und RabbitMQ Einführung: Moderne Anwendungen müssen zuverlässiges Messaging unterstützen, um Funktionen wie Echtzeitaktualisierungen und Datensynchronisierung zu erreichen. React ist eine beliebte JavaScript-Bibliothek zum Erstellen von Benutzeroberflächen, während RabbitMQ eine zuverlässige Messaging-Middleware ist. In diesem Artikel wird erläutert, wie Sie React und RabbitMQ kombinieren, um eine zuverlässige Messaging-Anwendung zu erstellen, und es werden spezifische Codebeispiele bereitgestellt. RabbitMQ-Übersicht:

Laut Nachrichten dieser Website vom 17. April hat TrendForce kürzlich einen Bericht veröffentlicht, in dem es davon ausgeht, dass die Nachfrage nach den neuen Blackwell-Plattformprodukten von Nvidia optimistisch ist und die gesamte CoWoS-Verpackungsproduktionskapazität von TSMC im Jahr 2024 voraussichtlich um mehr als 150 % steigen wird. Zu den neuen Blackwell-Plattformprodukten von NVIDIA gehören GPUs der B-Serie und GB200-Beschleunigerkarten, die NVIDIAs eigene GraceArm-CPU integrieren. TrendForce bestätigt, dass die Lieferkette derzeit sehr optimistisch in Bezug auf GB200 ist. Es wird geschätzt, dass die Auslieferungen im Jahr 2025 eine Million Einheiten überschreiten werden, was 40-50 % der High-End-GPUs von Nvidia ausmacht. Nvidia plant, in der zweiten Jahreshälfte Produkte wie GB200 und B100 auszuliefern, aber vorgelagerte Waferverpackungen müssen noch komplexere Produkte einführen.

Ganz gleich, ob Sie Anfänger oder erfahrener Profi sind: Die Beherrschung von C# ebnet den Weg für Ihre Karriere.

Einführung in die Lösung für die Echtzeit-Datensynchronisierung zwischen Golang und RabbitMQ: In der heutigen Zeit, mit der Popularität des Internets und dem explosionsartigen Wachstum des Datenvolumens, wird die Echtzeit-Datensynchronisierung immer wichtiger. Um die Probleme der asynchronen Datenübertragung und Datensynchronisierung zu lösen, haben viele Unternehmen damit begonnen, Nachrichtenwarteschlangen zu verwenden, um eine Echtzeitsynchronisierung von Daten zu erreichen. In diesem Artikel wird eine Echtzeit-Datensynchronisierungslösung basierend auf Golang und RabbitMQ vorgestellt und spezifische Codebeispiele bereitgestellt. 1. Was ist RabbitMQ? Rabbi

Die Entwicklung von Technologien der künstlichen Intelligenz (KI) ist heute in vollem Gange und sie haben in verschiedenen Bereichen großes Potenzial und Einfluss gezeigt. Heute wird Dayao Ihnen 4 .NET Open-Source-KI-Modell-LLM-bezogene Projekt-Frameworks vorstellen und hofft, Ihnen einige Referenzen zu geben. https://github.com/YSGStudyHards/DotNetGuide/blob/main/docs/DotNet/DotNetProjectPicks.mdSemanticKernelSemanticKernel ist ein Open-Source-Softwareentwicklungskit (SDK), das für die Integration großer Sprachmodelle (LLM) wie OpenAI und Azure entwickelt wurde

Diese Website berichtete am 9. Juli, dass die Prozessoren der AMD Zen5-Architektur „Strix“ über zwei Verpackungslösungen verfügen werden: Der kleinere StrixPoint wird das FP8-Paket verwenden, während der StrixHalo das FP11-Paket verwenden wird. Quelle: videocardz source @Olrak29_ Die neueste Enthüllung ist, dass die FP11-Gehäusegröße von StrixHalo 37,5 mm * 45 mm (1687 Quadratmillimeter) beträgt, was der LGA-1700-Gehäusegröße der AlderLake- und RaptorLake-CPUs von Intel entspricht. AMDs neueste Phoenix APU verwendet eine FP8-Gehäuselösung mit einer Größe von 25*40 mm, was bedeutet, dass StrixHalos F

GolangRabbitMQ: Der architektonische Entwurf und die Implementierung eines hochverfügbaren Nachrichtenwarteschlangensystems erfordern spezifische Codebeispiele. Einführung: Mit der kontinuierlichen Entwicklung der Internettechnologie und ihrer breiten Anwendung sind Nachrichtenwarteschlangen zu einem unverzichtbaren Bestandteil moderner Softwaresysteme geworden. Als Werkzeug zur Erzielung von Entkopplung, asynchroner Kommunikation, fehlertoleranter Verarbeitung und anderen Funktionen bietet die Nachrichtenwarteschlange Unterstützung für hohe Verfügbarkeit und Skalierbarkeit für verteilte Systeme. Als effiziente und prägnante Programmiersprache wird Golang häufig zum Aufbau von Systemen mit hoher Parallelität und hoher Leistung verwendet.

Durch die Kapselung von Code können C++-Funktionen die Effizienz der GUI-Entwicklung verbessern: Code-Kapselung: Funktionen gruppieren Code in unabhängige Einheiten, wodurch der Code leichter zu verstehen und zu warten ist. Wiederverwendbarkeit: Funktionen schaffen gemeinsame Funktionalität, die anwendungsübergreifend wiederverwendet werden kann, wodurch Duplikate und Fehler reduziert werden. Prägnanter Code: Der gekapselte Code macht die Hauptlogik prägnant und einfach zu lesen und zu debuggen.
