File d'attente de messages
La file d'attente de messages (anglais : File d'attente de messages) est une méthode de communication inter-processus ou de communication entre différents threads du même processus. La file d'attente logicielle est utilisée pour traiter. a Une série d'entrées, généralement provenant de l'utilisateur. La file d'attente des messages fournit un protocole de communication asynchrone. Chaque enregistrement de la file d'attente contient des informations détaillées, notamment l'heure d'apparition, le type de périphérique d'entrée et les paramètres d'entrée spécifiques, c'est-à-dire : l'expéditeur et le destinataire du message n'ont pas besoin. pour interagir avec la file d'attente des messages en même temps. Le message reste dans la file d'attente jusqu'à ce que le destinataire le récupère.
Pour faire simple, la file d'attente stocke les commandes que nous devons traiter mais n'obtient pas les résultats du traitement à temps
Implémentation
En fait, les files d'attente de messages sont souvent stockées dans des structures de listes chaînées. Les processus autorisés peuvent écrire ou lire des messages à partir de la file d'attente des messages. Actuellement, il existe de nombreuses implémentations open source de files d'attente de messages, notamment JBoss Messaging, JORAM, Apache ActiveMQ, Sun Open Message Queue, Apache Qpid et HTTPSQS. Avantages, inconvénientsLa file d'attente des messages elle-même est asynchrone, ce qui permet au destinataire de récupérer le message longtemps après son envoi, ce qui est différent de la plupart des protocoles de communication. Par exemple, le protocole HTTP utilisé sur le WWW est synchrone car le client doit attendre que le serveur réponde après avoir effectué une requête. Cependant, il existe de nombreuses situations dans lesquelles nous avons besoin de protocoles de communication asynchrones. Par exemple, un processus informe un autre processus qu'un événement s'est produit, mais n'a pas besoin d'attendre une réponse. Cependant, la nature asynchrone de la file d'attente de messages crée également un inconvénient, à savoir que le destinataire doit interroger la file d'attente de messages pour recevoir le dernier message. Par rapport aux signaux, les files d'attente de messages peuvent transmettre plus d'informations. Par rapport aux canaux, les files d'attente de messages fournissent des données formatées, ce qui peut réduire la charge de travail des développeurs. Mais les files d’attente de messages ont toujours des limites de taille. Lecture des messages de la file d'attente Il existe principalement deux types (1) Push par le serveur ; (2) Pull par le client Pull : principalement le client interroge régulièrement ; obtenez le traitement des messages ; Push : avertissez activement les abonnés via l'abonnement aux événements pour le traitement Stockage des messagesLe stockage simple est obtenu grâce à une liste de liens en mémoire que vous pouvez également utiliser ; Les bases de données, telles que Redis, peuvent également être conservées dans des fichiers locaux ; Comment assurer la cohérence du traitement asynchroneBien que l'objectif principal de la file d'attente soit de stocker les messages, elle asynchrone également les appels. et mises en œuvre. Mais si vous souhaitez obtenir une cohérence dans le traitement des messages, un bon moyen consiste à distinguer l'ordre du traitement métier, comme l'exploitation de la base de données maître-esclave, le maître est responsable de l'écriture et l'esclave est responsable de la lecture. chance d'obtenir les résultats souhaités en lisant la base de données immédiatement après l'écriture ;En même temps, nous devons utiliser des états intermédiaires. Lorsque plusieurs états intermédiaires rencontrent les résultats de l'appel en même temps, ils seront traités au moment opportun. le "message d'exception" sera conservé jusqu'à la prochaine opération ; Le code ci-dessusÉtablir la file d'attente principale de prise en charge des messages{ public delegate void MessageQueueEventNotifyHandler(Message.BaseMessage message); public class MessageQueue:Queue<BaseMessage> { public static MessageQueue GlobalQueue = new MessageQueue(); private Timer timer = new Timer(); public MessageQueue() { this.timer.Interval = 5000; this.timer.Elapsed += Notify; this.timer.Enabled = true; } private void Notify(object sender, ElapsedEventArgs e) { lock (this) { if (this.Count > 0) { //this.messageNotifyEvent.GetInvocationList()[0].DynamicInvoke(this.Dequeue()); var message = this.Dequeue(); this.messageNotifyEvent(message); } } } private MessageQueueEventNotifyHandler messageNotifyEvent; public event MessageQueueEventNotifyHandler MessageNotifyEvent { add { this.messageNotifyEvent += value; } remove { if (this.messageNotifyEvent != null) { this.messageNotifyEvent -= value; } } } } }
public const string OrderCodePrefix = "P"; public void Submit(Message.BaseMessage message) { Order order = message.Body as Order; if (order.OrderCode.StartsWith(OrderCodePrefix)) { System.Console.WriteLine("这个是个正确的以({0})开头的订单:{1}", OrderCodePrefix,order.OrderCode); } else { System.Console.WriteLine("这个是个错误的订单,没有以({0})开头:{1}",OrderCodePrefix,order.OrderCode); } }
public class OrderServiceProxy:IOrderService { public void Submit(Message.BaseMessage message) { MessageQueue.MessageQueue.GlobalQueue.Enqueue(message); } }
OrderService orderService = new OrderService(); MessageQueue.MessageQueue.GlobalQueue.MessageNotifyEvent += orderService.Submit; var orders = new List<Order>() { new Order(){OrderCode="P001"}, new Order(){OrderCode="P002"}, new Order(){OrderCode="B003"} }; OrderServiceProxy proxy = new OrderServiceProxy(); orders.ForEach(order => proxy.Submit(new Message.BaseMessage() { Body=order})); Console.ReadLine();