Les fonctions de MQ incluent le découplage, l'asynchrone, etc.
Habituellement, les producteurs sont uniquement responsables de la production des messages et ne se soucient pas de savoir qui reçoit les messages ou quels sont les résultats de la consommation ; les consommateurs sont uniquement responsables de la réception des messages spécifiés pour le traitement commercial et ne se soucient pas d'où viennent les messages et répondent à l'entreprise. statut de traitement. Mais il y a une tâche particulière dans notre projet.En tant que producteur de messages, nous devons recevoir le résultat de la réponse du consommateur après avoir produit le message (pour le dire franchement, c'est similaire à l'utilisation par MQ de la réponse synchrone aux demandes d'appel). recherche, le mode de réponse de MQ (modèle de réponse directe) a été créé pour ce modèle commercial.
Dépendances :
Je ne liste que les dépendances principales requises par RabbitMq
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
Configuration :
Aucune autre configuration spéciale, car la réponse est juste une méthode interactive de lapinmq
spring: rabbitmq: host: 10.50.40.116 port: 5673 username: admin password: admin
package com.leilei.demo; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @author lei * @create 2022-09-19 21:44 * @desc mq配置 **/ @Configuration public class RabbitMqConfig { @Bean public Queue bizQueue() { return new Queue("bizQueue"); } @Bean public Queue replyQueue() { return new Queue("replyQueue"); } @Bean FanoutExchange bizExchange() { return new FanoutExchange("bizExchange"); } }
Classe affaires :
@Data @NoArgsConstructor @AllArgsConstructor public class Vehicle implements Serializable { private Integer id; private String name; }
Choses que la fin de la production des messages doit faire : produire des messages et les accepter Message réponse de consommation
(1) Produire un message
1. Produire des messages, en fonction du scénario commercial, choisissez de générer ou non un ID de message personnalisé unique au monde
2. (Réponse)
/** * 生产消息 * * @param * @return void * @author lei * @date 2022-09-19 21:59:18 */ public void replySend() { MessageProperties messageProperties = new MessageProperties(); messageProperties.setReplyTo("replyQueue"); //todo 根据业务,做一个严谨的全局唯一ID,我这里暂时用UUID String correlationId = UUID.randomUUID().toString(); // 我这里指定了唯一消息ID,看业务场景,消费者消费响应后,生产者端可根据消息ID做业务处理 messageProperties.setCorrelationId(correlationId); Vehicle vehicle = new Vehicle(1, "川A0001"); Message message = new Message(JSON.toJSONString(vehicle).getBytes(), messageProperties); rabbitTemplate.convertAndSend("bizExchange","",message); System.out.println("生产者发送消息,自定义消息ID为:" + correlationId); }
(2) Acceptez la réponse de réponse
Une fois que le consommateur a consommé le message, le résultat du traitement sera envoyé dans une file d'attente. En lisant cette file d'attente, nous pouvons obtenir le résultat de la réponse correspondante. message pour le traitement commercial
/** * 接收消息响应 * * @param message * @return void * @author lei * @date 2022-09-19 21:59:27 */ @RabbitListener(queues = "replyQueue") public void replyResponse(Message message) { String s = new String(message.getBody()); String correlationId = message.getMessageProperties().getCorrelationId(); System.out.println("收到客户端响应消息ID:" + correlationId); //todo 根据消息ID可判断这是哪一个消息的响应,我们就可做业务操作 System.out.println("收到客户端响应消息:" + s); }
Ce que le consommateur du message doit faire est : accepter le message, puis effectuer le traitement commercial et répondre au message
De manière générale, nous consommons mq La méthode d'écoute n'a pas besoin de valeur de retour. Nous utilisons ici l'annotation sendTo, et le message auquel il faut répondre doit être défini comme valeur de retour. L'annotation sendTo spécifie à quelle file d'attente répondre. to
Points clés :
1. L'annotation sendTo spécifie la file d'attente correspondante (à noter qu'elle est cohérente avec la fin de production)
2. Le contenu de la valeur de retour défini par la méthode est le message. à répondre, qui sera finalement envoyé à l'annotation sendTo pour spécifier la file d'attente correspondante
3 Cette méthode L'inconvénient est que le côté consommateur est très important, car la file d'attente cible spécifiée par sendTo peut être écrite à l'aveugle, ce qui empêche le côté producteur de recevoir correctement la réponse au message, mais je crois que cela ne se fait pas dans les projets généraux
/** * 方式1 SendTo指定响应队列 * * @param message * @return String * @author lei * @date 2022-09-19 16:17:52 */ @RabbitListener(queues ="bizQueue") @SendTo("replyQueue") public String handleEmailMessage(Message message) { try { String msg=new String(message.getBody(), StandardCharsets.UTF_8); log.info("---consumer接收到消息----{}",msg); return "客户端响应消息:"+msg+"处理完成!"; } catch (Exception e) { log.error("处理业务消息失败",e); } return null; }
Tout comme la méthode consommateur ordinaire, vous n'avez besoin que de l'annotation RabbitListener pour écouter la file d'attente professionnelle, mais vous devez également obtenir l'adresse ReplyTo en fonction du message, puis envoyer manuellement le message dans votre propre méthode consommateur
. 1. Avantages, vous pouvez ressentir plus fortement l'interactivité de la demande et de la réponse du message, et le processus semble plus clair
2 Inconvénients, le code n'est pas élégant
/** * 方式2 message消息获取内部reply rabbitmq手动发送 * * @param message * @return String * @author lei * @date 2022-09-19 16:17:52 */ @RabbitListener(queues = "bizQueue") public void handleEmailMessage2(Message message) { try { String msg = new String(message.getBody(), StandardCharsets.UTF_8); log.info("---consumer接收到消息----{}", msg); String replyTo = message.getMessageProperties().getReplyTo(); System.out.println("接收到的reply:" + replyTo); rabbitTemplate.convertAndSend(replyTo, "客户端响应消息:" + msg + "处理完成!", x -> { x.getMessageProperties().setCorrelationId(message.getMessageProperties().getCorrelationId()); return x; }); } catch (Exception e) { log.error("处理业务消息失败",e); } }
. Cette méthode est en fait cohérente avec 1, mais je l'ai testée. Étant donné que le message du producteur spécifie l'adresse de ReplyTo, le consommateur n'a pas besoin de la spécifier à nouveau manuellement, c'est-à-dire où produire le message, s'il doit répondre et où. pour envoyer le message de réponse. L'extrémité de production est vide d'elle-même, et le consommateur n'a qu'à traiter sa propre affaire et renvoyer les résultats
/** * 方式三 方法有返回值,返回要响应的数据 (reply 由生产者发送消息时指定,消费者不做任何处理) * * @param message * @return String * @author lei * @date 2022-09-19 23:17:47 */ @RabbitListener(queues ="bizQueue") public String handleEmailMessage3(Message message) { try { String msg=new String(message.getBody(), StandardCharsets.UTF_8); log.info("---consumer接收到消息----{}",msg); return "客户端响应消息:"+msg+"处理完成!"; } catch (Exception e) { log.error("处理业务消息失败",e); } return null; }
Message de production :
Message de consommation. et réponse :
Réponse reçue :
Lien :
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!