Maison développement back-end tutoriel php La plateforme .net RabbitMQ utilise l'encapsulation

La plateforme .net RabbitMQ utilise l'encapsulation

Mar 07, 2017 pm 02:40 PM

Préface

Tout le monde connaît RabbitMq. Cet article partage principalement l'encapsulation de RabbitMQ.Client après avoir appris RabbitMq. A la fin de l'article, je présenterai les composants encapsulés et la démo.

Fonctionnement de RabbitMQ

Comme le montre la figure ci-dessous, l'éditeur (Publisher) envoie d'abord le message à l'échange (Exchange), puis l'envoie de l'échange à la file d'attente spécifiée (File d'attente). a été déclaré auparavant, et la consommation finale Le client (Client) s'abonne à via ou récupère activement les messages de file d'attente spécifiés pour la consommation.

Ensuite, l'abonnement et la récupération active que nous venons de mentionner peuvent être compris comme push (passif) et pull (actif).

Push, tant qu'un message est ajouté à la file d'attente, les consommateurs inactifs seront informés de consommer. (Si je ne te cherche pas, j'attendrai juste que tu me cherches, mode observateur)

Tirez, le consommateur ne sera pas averti, mais le consommateur prendra l'initiative de prendre le message de la file d'attente de manière circulaire ou régulière. (Je ne vais vers toi que quand j'en ai besoin)

Permettez-moi de vous donner un exemple de scénario d'utilisation. Supposons qu'il existe deux systèmes : un système de commande et un système d'expédition. Les instructions de message d'expédition sont lancées à partir du système de commande. Afin d'expédier les marchandises dans les délais, le système d'expédition doit le faire. abonnez-vous à la file d'attente et traitez-les tant qu'il y a des instructions.

Cependant, le programme rencontre parfois des exceptions, telles qu'un délai d'attente du réseau ou de la base de données, et le message est jeté dans la file d'attente des échecs. Dans ce cas, un mécanisme de renvoi est nécessaire. Mais je ne veux pas faire while(IsPostSuccess == True), car tant qu'une exception se produit, il y aura des exceptions dans un certain laps de temps, et une telle nouvelle tentative n'a aucun sens.

À l'heure actuelle, il n'est pas nécessaire de traiter le message à temps. Il existe un JOB pour récupérer le message de file d'attente ayant échoué régulièrement ou toutes les quelques minutes (nombre d'échecs * minutes d'intervalle) et le renvoyer.

Publier le package

Étapes : Initialiser le lien->Déclarer l'échangeur->Déclarer la file d'attente->Modifier la liaison de la machine et de la file d'attente->Publier le message. Notez que j'ai enregistré le modèle dans ConcurrentDictionary car la déclaration et la liaison prennent beaucoup de temps. Deuxièmement, l'envoi de messages à des files d'attente répétées ne nécessite pas de réinitialisation.

 1         /// <summary> 2         /// 交换器声明 3         /// </summary> 4         /// <param name="iModel"></param> 5         /// <param name="exchange">交换器</param> 6         /// <param name="type">交换器类型: 7         /// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全 8         /// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的 9         /// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog10         /// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都11         /// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout12         /// 交换机转发消息是最快的。13         /// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多14         /// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”15         /// 只会匹配到“audit.irs”。</param>16         /// <param name="durable">持久化</param>17         /// <param name="autoDelete">自动删除</param>18         /// <param name="arguments">参数</param>19         private static void ExchangeDeclare(IModel iModel, string exchange, string type = ExchangeType.Direct,20             bool durable = true,21             bool autoDelete = false, IDictionary<string, object> arguments = null)22         {23             exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim();24             iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments);25         }26 27         /// <summary>28         /// 队列声明29         /// </summary>30         /// <param name="channel"></param>31         /// <param name="queue">队列</param>32         /// <param name="durable">持久化</param>33         /// <param name="exclusive">排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,34         /// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可35         /// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连36         /// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者37         /// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。</param>38         /// <param name="autoDelete">自动删除</param>39         /// <param name="arguments">参数</param>40         private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false,41             bool autoDelete = false, IDictionary<string, object> arguments = null)42         {43             queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim();44             channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);45         }46 47         /// <summary>48         /// 获取Model49         /// </summary>50         /// <param name="exchange">交换机名称</param>51         /// <param name="queue">队列名称</param>52         /// <param name="routingKey"></param>53         /// <param name="isProperties">是否持久化</param>54         /// <returns></returns>55         private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false)56         {57             return ModelDic.GetOrAdd(queue, key =>58             {59                 var model = _conn.CreateModel();60                 ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties);61                 QueueDeclare(model, queue, isProperties);62                 model.QueueBind(queue, exchange, routingKey);63                 ModelDic[queue] = model;64                 return model;65             });66         }67 68         /// <summary>69         /// 发布消息70         /// </summary>71         /// <param name="routingKey">路由键</param>72         /// <param name="body">队列信息</param>73         /// <param name="exchange">交换机名称</param>74         /// <param name="queue">队列名</param>75         /// <param name="isProperties">是否持久化</param>76         /// <returns></returns>77         public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false)78         {79             var channel = GetModel(exchange, queue, routingKey, isProperties);80 81             try82             {83                 channel.BasicPublish(exchange, routingKey, null, body.SerializeUtf8());84             }85             catch (Exception ex)86             {87                 throw ex.GetInnestException();88             }89         }
Copier après la connexion

Afficher le code

La prochaine fois, une capture d'écran de la vitesse de publication du test natif :

4,2 W/S est une vitesse stable et la désérialisation (ToJson) sera légèrement plus rapide.

Forfait d'abonnement

Lors de la publication, l'échangeur et la file d'attente sont déclarés et liés, mais lors de la souscription, il vous suffit de déclarer la file d'attente. Comme vous pouvez le voir dans le code ci-dessous, lorsqu'une exception est interceptée, le message sera envoyé à la "file d'attente des lettres mortes" personnalisée et renvoyé régulièrement par un autre JOB. Par conséquent, la réponse finale est réussie.

        /// <summary>
        /// 获取Model        /// </summary>
        /// <param name="queue">队列名称</param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        private static IModel GetModel(string queue, bool isProperties = false)
        {            return ModelDic.GetOrAdd(queue, value =>
             {                 var model = _conn.CreateModel();
                 QueueDeclare(model, queue, isProperties);                 //每次消费的消息数
                 model.BasicQos(0, 1, false);

                 ModelDic[queue] = model;                 return model;
             });
        }    

        /// <summary>
        /// 接收消息        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="queue">队列名称</param>
        /// <param name="isProperties"></param>
        /// <param name="handler">消费处理</param>
        /// <param name="isDeadLetter"></param>
        public void Subscribe<T>(string queue, bool isProperties, Action<T> handler, bool isDeadLetter) where T : class
        {            //队列声明
            var channel = GetModel(queue, isProperties);            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {                var body = ea.Body;                var msgStr = body.DeserializeUtf8();                var msg = msgStr.FromJson<T>();                try
                {
                    handler(msg);
                }                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");                    if (!isDeadLetter)
                        PublishToDead<DeadLetterQueue>(queue, msgStr, ex);
                }                finally
                {
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }
Copier après la connexion

Afficher le code

La prochaine fois, une capture d'écran de la vitesse de publication du test natif :

Lorsqu’il est rapide, il est de 1,9 K/S, et lorsqu’il est lent, il est de 1,7 K/S.

Tirer le paquet

Accédez directement au code :

        /// <summary>
        /// 获取消息        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="handler">消费处理</param>
        private void Poll<T>(string exchange, string queue, string routingKey, Action<T> handler) where T : class
        {            var channel = GetModel(exchange, queue, routingKey);            var result = channel.BasicGet(queue, false);            if (result.IsNull())                return;            var msg = result.Body.DeserializeUtf8().FromJson<T>();            try
            {
                handler(msg);
            }            catch (Exception ex)
            {
                ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
            }            finally
            {
                channel.BasicAck(result.DeliveryTag, false);
            }
        }
Copier après la connexion

Afficher le code

Lorsqu’il est rapide, il est de 1,8 K/s, et lorsqu’il est stable, il est de 1,5 K/S.

Rpc(远程调用)的封装

首先说明下,RabbitMq只是提供了这个RPC的功能,但是并不是真正的RPC,为什么这么说:

1、传统Rpc隐藏了调用细节,像调用本地方法一样传参、抛出异常

2、RabbitMq的Rpc是基于消息的,消费者消费后,通过新队列返回响应结果。

        /// <summary>
        /// RPC客户端        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="body"></param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false)
        {            var channel = GetModel(exchange, queue, routingKey, isProperties);            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue, true, consumer);            try
            {                var correlationId = Guid.NewGuid().ToString();                var basicProperties = channel.CreateBasicProperties();
                basicProperties.ReplyTo = queue;
                basicProperties.CorrelationId = correlationId;

                channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8());                var sw = Stopwatch.StartNew();                while (true)
                {                    var ea = consumer.Queue.Dequeue();                    if (ea.BasicProperties.CorrelationId == correlationId)
                    {                        return ea.Body.DeserializeUtf8();
                    }                    if (sw.ElapsedMilliseconds > 30000)                        throw new Exception("等待响应超时");
                }
            }            catch (Exception ex)
            {                throw ex.GetInnestException();
            }
        }    

        /// <summary>
        /// RPC服务端        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="isProperties"></param>
        /// <param name="handler"></param>
        /// <param name="isDeadLetter"></param>
        public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter)
        {            //队列声明
            var channel = GetModel(queue, isProperties);            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {                var body = ea.Body;                var msgStr = body.DeserializeUtf8();                var msg = msgStr.FromJson<T>();                var props = ea.BasicProperties;                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;                try
                {
                    msg = handler(msg);
                }                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
                }                finally
                {
                    channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8());
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }
Copier après la connexion

View Code

   可以用,但不建议去用。可以考虑其他的RPC框架。grpc、thrift等。

 结尾

本篇文章,没有过多的写RabbitMq的知识点,因为园子的学习笔记实在太多了。下面把我的代码奉上 http://www.php.cn/ 。如果有发现写得不对的地方麻烦在评论指出,我会及时修改以免误导别人。

如果本篇文章您有用,请点击一下推荐,谢谢大家阅读。

Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn

Outils d'IA chauds

Undresser.AI Undress

Undresser.AI Undress

Application basée sur l'IA pour créer des photos de nu réalistes

AI Clothes Remover

AI Clothes Remover

Outil d'IA en ligne pour supprimer les vêtements des photos.

Undress AI Tool

Undress AI Tool

Images de déshabillage gratuites

Clothoff.io

Clothoff.io

Dissolvant de vêtements AI

AI Hentai Generator

AI Hentai Generator

Générez AI Hentai gratuitement.

Article chaud

R.E.P.O. Crystals d'énergie expliqués et ce qu'ils font (cristal jaune)
1 Il y a quelques mois By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Meilleurs paramètres graphiques
1 Il y a quelques mois By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Comment réparer l'audio si vous n'entendez personne
1 Il y a quelques mois By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Commandes de chat et comment les utiliser
1 Il y a quelques mois By 尊渡假赌尊渡假赌尊渡假赌

Outils chauds

Bloc-notes++7.3.1

Bloc-notes++7.3.1

Éditeur de code facile à utiliser et gratuit

SublimeText3 version chinoise

SublimeText3 version chinoise

Version chinoise, très simple à utiliser

Envoyer Studio 13.0.1

Envoyer Studio 13.0.1

Puissant environnement de développement intégré PHP

Dreamweaver CS6

Dreamweaver CS6

Outils de développement Web visuel

SublimeText3 version Mac

SublimeText3 version Mac

Logiciel d'édition de code au niveau de Dieu (SublimeText3)

Comment créer une application de messagerie fiable avec React et RabbitMQ Comment créer une application de messagerie fiable avec React et RabbitMQ Sep 28, 2023 pm 08:24 PM

Comment créer une application de messagerie fiable avec React et RabbitMQ Introduction : Les applications modernes doivent prendre en charge une messagerie fiable pour obtenir des fonctionnalités telles que les mises à jour en temps réel et la synchronisation des données. React est une bibliothèque JavaScript populaire pour créer des interfaces utilisateur, tandis que RabbitMQ est un middleware de messagerie fiable. Cet article explique comment combiner React et RabbitMQ pour créer une application de messagerie fiable et fournit des exemples de code spécifiques. Présentation de RabbitMQ :

TrendForce : les produits de la plate-forme Blackwell de Nvidia permettent d'augmenter la capacité de production CoWoS de TSMC de 150 % cette année TrendForce : les produits de la plate-forme Blackwell de Nvidia permettent d'augmenter la capacité de production CoWoS de TSMC de 150 % cette année Apr 17, 2024 pm 08:00 PM

Selon les informations de ce site du 17 avril, TrendForce a récemment publié un rapport estimant que la demande pour les nouveaux produits de la plate-forme Blackwell de Nvidia est haussière et devrait entraîner une augmentation de la capacité totale de production d'emballages CoWoS de TSMC de plus de 150 % en 2024. Les nouveaux produits de la plate-forme Blackwell de NVIDIA comprennent des GPU de série B et des cartes accélératrices GB200 intégrant le propre processeur GraceArm de NVIDIA. TrendForce confirme que la chaîne d'approvisionnement est actuellement très optimiste quant au GB200. On estime que les livraisons en 2025 devraient dépasser le million d'unités, représentant 40 à 50 % des GPU haut de gamme de Nvidia. Nvidia prévoit de livrer des produits tels que le GB200 et le B100 au second semestre, mais le conditionnement des plaquettes en amont doit encore adopter des produits plus complexes.

Quelles sont les perspectives d'emploi du C# ? Quelles sont les perspectives d'emploi du C# ? Oct 19, 2023 am 11:02 AM

Que vous soyez débutant ou professionnel expérimenté, la maîtrise du C# ouvrira la voie à votre carrière.

Solution de synchronisation des données en temps réel entre Golang et RabbitMQ Solution de synchronisation des données en temps réel entre Golang et RabbitMQ Sep 27, 2023 pm 10:41 PM

Introduction à la solution de synchronisation des données en temps réel entre Golang et RabbitMQ : À l'ère actuelle, avec la popularité d'Internet et la croissance explosive du volume de données, la synchronisation des données en temps réel est devenue de plus en plus importante. Afin de résoudre les problèmes de transmission asynchrone et de synchronisation des données, de nombreuses entreprises ont commencé à utiliser des files d'attente de messages pour réaliser une synchronisation des données en temps réel. Cet article présentera une solution de synchronisation de données en temps réel basée sur Golang et RabbitMQ et fournira des exemples de code spécifiques. 1. Qu'est-ce que RabbitMQ ? Rabbin

Partagez plusieurs frameworks de projets open source .NET liés à l'IA et au LLM Partagez plusieurs frameworks de projets open source .NET liés à l'IA et au LLM May 06, 2024 pm 04:43 PM

Le développement des technologies d’intelligence artificielle (IA) bat son plein aujourd’hui et elles ont montré un grand potentiel et une grande influence dans divers domaines. Aujourd'hui, Dayao partagera avec vous 4 cadres de projets liés au modèle d'IA open source .NET LLM, dans l'espoir de vous fournir une référence. https://github.com/YSGStudyHards/DotNetGuide/blob/main/docs/DotNet/DotNetProjectPicks.mdSemanticKernelSemanticKernel est un kit de développement logiciel (SDK) open source conçu pour intégrer de grands modèles de langage (LLM) tels qu'OpenAI, Azure

Golang RabbitMQ : Conception architecturale et mise en œuvre d'un système de file d'attente de messages hautement disponible Golang RabbitMQ : Conception architecturale et mise en œuvre d'un système de file d'attente de messages hautement disponible Sep 28, 2023 am 08:18 AM

GolangRabbitMQ : La conception architecturale et la mise en œuvre d'un système de file d'attente de messages hautement disponible nécessitent des exemples de code spécifiques Introduction : Avec le développement continu de la technologie Internet et sa large application, les files d'attente de messages sont devenues un élément indispensable des systèmes logiciels modernes. En tant qu'outil permettant de mettre en œuvre le découplage, la communication asynchrone, le traitement tolérant aux pannes et d'autres fonctions, la file d'attente de messages offre une haute disponibilité et une prise en charge de l'évolutivité pour les systèmes distribués. En tant que langage de programmation efficace et concis, Golang est largement utilisé pour créer des systèmes à haute concurrence et hautes performances.

Taille du boîtier AMD « Strix Halo » FP11 exposée : équivalent à Intel LGA1700, 60 % plus grand que Phoenix Taille du boîtier AMD « Strix Halo » FP11 exposée : équivalent à Intel LGA1700, 60 % plus grand que Phoenix Jul 18, 2024 am 02:04 AM

Ce site Web a rapporté le 9 juillet que les processeurs de la série « Strix » à architecture AMD Zen5 auront deux solutions de packaging. Le plus petit StrixPoint utilisera le package FP8, tandis que le StrixHalo utilisera le package FP11. Source : source videocardz @Olrak29_ La dernière révélation est que la taille du boîtier FP11 de StrixHalo est de 37,5 mm x 45 mm (1 687 millimètres carrés), ce qui est la même que la taille du boîtier LGA-1700 des processeurs Intel AlderLake et RaptorLake. Le dernier APU Phoenix d'AMD utilise une solution de packaging FP8 d'une taille de 25*40 mm, ce qui signifie que le F de StrixHalo

Technologie d'emballage et application en PHP Technologie d'emballage et application en PHP Oct 12, 2023 pm 01:43 PM

La technologie d'encapsulation et l'encapsulation d'applications en PHP sont un concept important dans la programmation orientée objet. Elle fait référence à l'encapsulation de données et d'opérations sur les données afin de fournir une interface d'accès unifiée aux programmes externes. En PHP, l'encapsulation peut être réalisée via des modificateurs de contrôle d'accès et des définitions de classe. Cet article présentera la technologie d'encapsulation dans PHP et ses scénarios d'application, et fournira quelques exemples de code spécifiques. 1. Modificateurs de contrôle d'accès encapsulés En PHP, l'encapsulation est principalement réalisée via des modificateurs de contrôle d'accès. PHP fournit trois modificateurs de contrôle d'accès,

See all articles