SpringBoot
intègre RabbitMQ
pour implémenter l'envoi de messages. SpringBoot
整合 RabbitMQ
实现消息的发送。
1.添加 maven
依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2.添加 application.yml 配置文件
spring: rabbitmq: host: 192.168.3.19 port: 5672 username: admin password: xxxx
3.配置交换机、队列以及绑定
@Bean public DirectExchange myExchange() { DirectExchange directExchange = new DirectExchange("myExchange"); return directExchange; } @Bean public Queue myQueue() { Queue queue = new Queue("myQueue"); return queue; } @Bean public Binding binding() { return BindingBuilder.bind(myQueue()).to(myExchange()).with("myRoutingKey"); }
4.生产发送消息
@Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/send") public String send(String message) { rabbitTemplate.convertAndSend("myExchange","myRoutingKey",message); System.out.println("【发送消息】" + message) return "【send message】" + message; }
5.消费者接收消息
@RabbitListener(queuesToDeclare = @Queue("myQueue")) public void process(String msg, Channel channel, Message message) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(); String time = sdf.format(date); System.out.println("【接收信息】" + msg + " 当前时间" + time);
6.调用生产端发送消息 hello
,控制台输出:
【发送消息】hello
【接收信息】hello 当前时间2022-05-12 10:21:14
说明消息已经被成功接收。
一条消息的从生产到消费,消息丢失可能发生在以下几个阶段:
生产端丢失: 生产者无法传输到 RabbitMQ
存储端丢失: RabbitMQ
存储自身挂了
消费端丢失:存储由于网络问题,无法发送到消费端,或者消费挂了,无法发送正常消费
RabbitMQ
从生产端、储存端、消费端都对可靠性传输做很好的支持。
生产阶段通过请求确认机制,来确保消息的可靠传输。当发送消息到 RabbitMQ 服务器 之后,RabbitMQ 收到消息之后,给发送返回一个请求确认,表示RabbitMQ 服务器已成功的接收到了消息。
配置application.yml
spring: rabbitmq: # 消息确认机制 生产者 -> 交换机 publisher-confirms: true # 消息返回机制 交换机 -> 队列 publisher-returns: true
配置
@Configuration @Slf4j public class RabbitConfig { @Autowired private ConnectionFactory connectionFactory; @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("【correlationData】:" + correlationData); log.info("【ack】" + ack); log.info("【cause】" + cause); if (ack) { log.info("【发送成功】"); } else { log.info("【发送失败】correlationData:" + correlationData + " cause:" + cause); } } }); rabbitTemplate.setMandatory(true); rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.warn("【消息发送失败】"); log.info("【message】" + message); log.info("【replyCode】" + replyCode); } }); return rabbitTemplate; } }
消息从 生产者 到 交换机, 有confirmCallback
确认模式。发送消息成功后消息会调用方法confirm(CorrelationData correlationData, boolean ack, String cause)
,根据 ack
判断消息是否发送成功。
消息从 交换机 到 队列,有returnCallback
退回模式。
发送消息 product message
控制台输出如下:
【发送消息】product message
【接收信息】product message 当前时间2022-05-12 11:27:56
【correlationData】:null
【ack】true
【cause】null
【发送成功】
这里有两个方案:
发送消息后立马关闭 broke,后者把网络关闭,但是broker关闭之后控制台一直就会报错,发送消息也报500错误。
发送不存在的交换机:
// myExchange 修改成 myExchangexxxxx rabbitTemplate.convertAndSend("myExchangexxxxx","myRoutingKey",message);
结果:
【correlationData】:null
【ack】false
【cause】channel error; protocol method: #method(reply-code=404, reply-text=NOT_FOUND - no exchange 'myExchangexxxxx' in vhost '/', class-id=60, method-id=40)
【发送失败】
当发送失败可以对消息进行重试
交换机正确,发送不存在的队列:
交换机接收到消息,返回成功通知,控制台输出:
【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]
【ack】true
【cause】null
【发送成功】
交换机没有找到队列,返回失败信息:
【消息发送失败】
【message】product message
【replyCode】312
开启队列持久化,创建的队列和交换机默认配置是持久化的。首先把队列和交换机设置正确,修改消费监听的队列,使得消息存放在队列里。
修改队列的持久化,修改成非持久化:
@Bean public Queue myQueue() { Queue queue = new Queue("myQueue",false); return queue; }
发送消息之后,消息存放在队列中,然后重启 RabbitMQ
,消息不存在了。
设置队列持久化:
@Bean public Queue myQueue() { Queue queue = new Queue("myQueue",true); return queue; }
重启之后,队列的消息还存在。
消费端默认开始 ack
maven
spring: rabbitmq: # 手动消息确认 listener: simple: acknowledge-mode: manual
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
@RabbitListener(queuesToDeclare = @Queue("myQueue")) public void process(String msg, Channel channel, Message message) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date date = new Date(); String time = sdf.format(date); System.out.println("【接收信息】" + msg + " 当前时间" + time); System.out.println(message.getMessageProperties().getDeliveryTag()); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { e.printStackTrace(); } }
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
bonjour
, la sortie de la console :
[Envoyer un message] bonjourIndique que le message a été reçu avec succès.
[Recevoir un message] bonjour Heure actuelle 2022- 05-12 10:21:14
Analyse de perte de messages
🎜🎜🎜De la production à la consommation d'un message, la perte de message peut survenir dans les étapes suivantes : 🎜 RabbitMQ
🎜RabbitMQ
Le stockage lui-même est en panne🎜RabbitMQ
fournit un bon support pour une transmission fiable depuis le côté production, côté stockage et côté consommateur. 🎜🎜Phase de production🎜🎜La phase de production utilise le mécanisme de confirmation de demande pour assurer une transmission fiable des messages. Après avoir envoyé un message au serveur RabbitMQ, RabbitMQ reçoit le message et renvoie une confirmation de demande à l'expéditeur, indiquant que le serveur RabbitMQ a reçu avec succès le message. 🎜🎜Configurez application.yml🎜rrreee🎜Configurez 🎜rrreee🎜Messages du producteur au switch, avec le mode de confirmation confirmCallback
. Une fois l'envoi réussi du message, le message appellera la méthode confirm(CorrelationData corrélationData, boolean ack, String cause)
et déterminera si le message est envoyé avec succès en fonction de ack
. 🎜🎜Les messages du switch vers la file d'attente ont le mode de retour returnCallback
. 🎜🎜Envoyer un message message produit
Le résultat de la console est le suivant : 🎜🎜【Envoyer un message】message produit
【Recevoir un message】message produit Heure actuelle 2022-05-12 11 : 27 : 56
[correlationData]:null
[ack]true
[cause]null
[Envoyé avec succès]🎜
🎜[correlationData]:null🎜Vous pouvez réessayer le message en cas d'échec de l'envoi🎜🎜Le commutateur est correct, envoi vers une file d'attente qui n'existe pas :🎜🎜Le commutateur reçoit le message, renvoie une notification de réussite , et la sortie de la console :🎜🎜【correlationData】:CorrelationData [id=7d468b47-b422-4523-b2a2-06b14aef073c]
[ack]false
[cause] erreur de canal ; méthode de protocole : #method(reply-code=404,answer-text=NOT_FOUND - pas d'échange 'myExchangexxxxx' dans vhost '/', class-id=60, method-id=40)[Échec de l'envoi]🎜
🎜[Échec de l'envoi du message]
[message]message du produit
[replyCode]312🎜🎜RabbitMQ🎜🎜Activez la persistance de la file d'attente, les files d'attente et les commutateurs créésla configuration par défaut sont persistants. Tout d'abord, définissez correctement la file d'attente et le commutateur, puis modifiez la file d'attente pour le suivi de la consommation afin que les messages soient stockés dans la file d'attente. 🎜🎜Modifier la persistance de la file d'attente en non-persistance : 🎜rrreee🎜Après l'envoi du message, le message est stocké dans la file d'attente, puis redémarreRabbitMQ
, et le message n'existe plus.
Définir la persistance de la file d'attente : 🎜rrreee🎜Après le redémarrage, les messages dans la file d'attente existeront toujours. 🎜🎜Côté consommateur🎜🎜Le côté consommateur démarre le mode de confirmation automatiqueack
par défaut. Lorsque le message de la file d'attente est reçu par le consommateur, le message dans la file d'attente sera automatiquement supprimé, qu'il y en ait ou non. message du côté du consommateur. Par conséquent, afin de garantir que le consommateur peut consommer avec succès les messages, changez le mode automatique en mode de confirmation manuelle : 🎜🎜Modifiez le fichier application.yml🎜rrreee🎜Après avoir consommé et reçu le message, une confirmation manuelle est requise : 🎜rrreeerrreee🎜Si non ajouté : 🎜rrreee🎜Envoyer deux messages 🎜🎜Une fois le message reçu, il n'y a pas de confirmation et il est remis dans la file d'attente : 🎜🎜🎜🎜🎜Redémarrez le projet. Après cela, le message de la file d'attente sera envoyé au projet. consommateur, mais sans confirmation d'accusé de réception, il continuera à être remis dans la file d'attente. 🎜Après avoir ajouté
channel.basicAck
, redémarrez le projetchannel.basicAck
之后,再重启项目队列消息就被删除了
basicAck
方法最后一个参数multiple
表示是删除之前的队列。
multiple
设置成true
Le message de la file d'attente est supprimé
🎜basicAck
Le dernier paramètre de la méthodemultiple
signifie que la file d'attente avant la suppression. 🎜🎜multiple
est défini surtrue
et toutes les files d'attente suivantes sont effacées 🎜🎜🎜🎜
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!