그래서 제가 앞장서서 나눔에 참여하기 위해 기침기침기침~, 사실 그 KPI
를 위한 것이 아니고, 그냥 모두와 함께 배우고 싶어요!
KPI
。不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多做技术交流还是很有助于个人成长的。于是乎我主动报名参加了分享,咳咳咳~ ,真的不是为了那点
KPI
,就是想和大伙一起学习学习!相关学习推荐:Java视频教程
这次我分享的是
springboot
+rabbitmq
如何实现消息确认机制,以及在实际开发中的一点踩坑经验,其实整体的内容比较简单,有时候事情就是这么神奇,越是简单的东西就越容易出错。可以看到使用了
RabbitMQ
以后,我们的业务链路明显变长了,虽然做到了系统间的解耦,但可能造成消息丢失的场景也增加了。例如:
消息生产者 - > rabbitmq服务器(消息发送失败)
rabbitmq服务器自身故障导致消息丢失
消息消费者 - > rabbitmq服务(消费消息失败)
所以说能不使用中间件就尽量不要用,如果为了用而用只会徒增烦恼。开启消息确认机制以后,尽管很大程度上保证了消息的准确送达,但由于频繁的确认交互,rabbitmq
整体效率变低,吞吐量下降严重,不是非常重要的消息真心不建议你用消息确认机制。
下边我们先来实现
springboot
+rabbitmq
消息确认机制,再对遇到的问题做具体分析。一、准备环境
1、引入 rabbitmq 依赖包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>로그인 후 복사2、修改 application.properties 配置
配置中需要开启
发送端
和消费端
的消息确认。spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 发送者开启 confirm 确认机制 spring.rabbitmq.publisher-confirms=true# 发送者开启 return 确认机制 spring.rabbitmq.publisher-returns=true#################################################### # 设置消费端手动 ack spring.rabbitmq.listener.simple.acknowledge-mode=manual # 是否支持重试 spring.rabbitmq.listener.simple.retry.enabled=true로그인 후 복사3、定义 Exchange 和 Queue
定义交换机
confirmTestExchange
和队列confirm_test_queue
,并将队列绑定在交换机上。@Configurationpublic class QueueConfig { @Bean(name = "confirmTestQueue") public Queue confirmTestQueue() { return new Queue("confirm_test_queue", true, false, false); } @Bean(name = "confirmTestExchange") public FanoutExchange confirmTestExchange() { return new FanoutExchange("confirmTestExchange"); } @Bean public Binding confirmTestFanoutExchangeAndQueue( @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange, @Qualifier("confirmTestQueue") Queue confirmTestQueue) { return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange); }}로그인 후 복사
rabbitmq
的消息确认分为两部分:发送消息确认 和 消息接收确认。二、消息发送确认
发送消息确认:用来确认生产者
producer
将消息发送到broker
,broker
上的交换机exchange
再投递给队列queue
的过程中,消息是否成功投递。消息从
producer
到rabbitmq broker
有一个confirmCallback
确认模式。消息从
exchange
到queue
投递失败有一个returnCallback
退回模式。我们可以利用这两个
Callback
来确保消的100%送达。1、 ConfirmCallback确认模式
消息只要被
rabbitmq broker
接收到就会触发confirmCallback
回调 。@Slf4j @Componentpublic class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.error("消息发送异常!"); } else { log.info("发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause); } }}로그인 후 복사实现接口
ConfirmCallback
,重写其confirm()
方法,方法内有三个参数correlationData
、ack
、cause
。
correlationData
:对象内部只有一个id
属性,用来表示当前消息的唯一性。ack
:消息投递到broker
的状态,true
表示成功。cause
:表示投递失败的原因。但消息被
blockquote >🎜이번에는broker
接收到只能表示已经到达 MQ服务器,并不能保证消息一定会被投递到目标queue
里。所以接下来需要用到returnCallback
관련 학습 권장사항: Java 동영상 튜토리얼springboot
+rabbitmq
가 메시지 확인 메커니즘을 구현하는 방법과 실제 개발에서의 몇 가지 함정 경험을 공유합니다. 실제로 전체적인 내용은 비교적 간단합니다. . 때로는 상황이 너무 마술적이고 단순할수록 잘못될 가능성이 더 높습니다. 🎜🎜RabbitMQ
를 사용한 후 비즈니스 링크가 확실히 길어진 것을 볼 수 있습니다. 시스템 간의 분리가 이루어졌지만 메시지 손실을 일으킬 수 있는 시나리오도 늘어났습니다. 예: 🎜🎜
- 🎜메시지 생성자 - >rabbitmq 서버(메시지 전송 실패) 🎜
- 🎜rabbitmq 서버 자체 오류로 인해 메시지 손실이 발생함🎜
- 🎜메시지 소비 작성자 - > Rabbitmq 서비스(메시지 사용 실패) 🎜
따라서 미들웨어를 사용할 수 없다면 사용하지 마세요. 단지 목적으로 사용한다면 더 많은 문제를 야기할 뿐입니다. 메시지 확인 메커니즘을 켠 후에는 메시지의 정확한 전달이 대부분 보장되지만 빈번한 확인 상호 작용으로 인해rabbitmq
의 전체 효율성이 낮아지고 처리량이 심각하게 떨어지며 이는 그다지 중요하지 않습니다. 메시지 확인 메커니즘을 사용하는 것은 권장되지 않습니다. 🎜
🎜먼저springboot
+rabbitmq
메시지 확인 메커니즘을 구현한 다음 발생한 문제를 자세히 분석해 보겠습니다. 🎜1. 환경 준비
span >1.rabbitmq 종속성 패키지를 도입합니다.
@Slf4j @Componentpublic class ReturnCallbackService implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey); }}로그인 후 복사로그인 후 복사2. application.properties 구성을 수정합니다.
🎜필요합니다. <코드>발신자 및소비자
의 메시지 확인 구성에서 활성화됩니다. 🎜@Autowired private RabbitTemplate rabbitTemplate; @Autowired private ConfirmCallbackService confirmCallbackService; @Autowired private ReturnCallbackService returnCallbackService; public void sendMessage(String exchange, String routingKey, Object msg) { /** * 确保消息发送失败后可以重新返回到队列中 * 注意:yml需要配置 publisher-returns: true */ rabbitTemplate.setMandatory(true); /** * 消费者确认收到消息后,手动ack回执回调处理 */ rabbitTemplate.setConfirmCallback(confirmCallbackService); /** * 消息投递到队列失败回调处理 */ rabbitTemplate.setReturnCallback(returnCallbackService); /** * 发送消息 */ rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; }, new CorrelationData(UUID.randomUUID().toString())); }로그인 후 복사로그인 후 복사3. Exchange 및 대기열 정의
🎜confirmTestExchange
스위치 및confirm_test_queue 대기열 정의
를 선택하고 대기열을 스위치에 바인딩합니다. 🎜@Slf4j @Component @RabbitListener(queues = "confirm_test_queue")public class ReceiverMessage1 { @RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("小富收到消息:{}", msg); //TODO 具体业务 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { log.error("消息已重复处理失败,拒绝再次接收..."); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息 } else { log.error("消息即将再次返回队列处理..."); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }}로그인 후 복사로그인 후 복사🎜🎜🎜< h2 >2. 메시지 전송 확인🎜메시지 전송 확인:rabbitmq
의 메시지 확인은 메시지 전송 확인과 메시지 수신 확인 두 부분으로 나누어집니다. 🎜Producer
가 메시지를 보냈는지 확인하는 데 사용됩니다. message 메시지가브로커
에 성공적으로 전달되었는지 여부,브로커
의 스위치exchange
가queue< 대기열로 전달됩니다. /코드>. 🎜🎜<code>Producer
에서rabbitmq Broker
로 보내는 메시지에는confirmCallback
확인 모드가 있습니다. 🎜🎜exchange
에서queue
로의 메시지 전달 실패에는returnCallback
반환 모드가 있습니다. 🎜🎜이 두 개의콜백
을 사용하여 메시지 전달을 100% 보장할 수 있습니다. 🎜1. 확인 콜백 확인 모드
🎜 메시지는rabbitmq 브로커<에 의해 수신되는 한 트리거됩니다. /code>< code>confirmCallback
콜백. 🎜🎜void basicAck(long deliveryTag, boolean multiple)로그인 후 복사로그인 후 복사ConfirmCallback
인터페이스를 구현하고 해당confirm()
메서드를 다시 작성합니다. 이 메서드에는correlationData
및ack라는 세 가지 매개변수가 있습니다.
,원인
. 🎜🎜그러나
correlationData
: 개체 내부에는 현재 메시지의 고유성을 나타내는 데 사용되는id
속성이 하나만 있습니다.ack
:브로커
로의 메시지 전달 상태,true
는 성공을 나타냅니다.cause
: 전송 실패의 이유를 나타냅니다.broker
가 수신한 메시지는 MQ 서버에 도착했다는 의미일 뿐이며 메시지가 대상에 전달된다는 보장은 없습니다. 코드>큐. 따라서 다음에는returnCallback
을 사용해야 합니다. 🎜2、 ReturnCallback 退回模式
如果消息未能投递到目标
queue
里将触发回调returnCallback
,一旦向queue
投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。@Slf4j @Componentpublic class ReturnCallbackService implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey); }}로그인 후 복사로그인 후 복사实现接口
ReturnCallback
,重写returnedMessage()
方法,方法有五个参数message
(消息体)、replyCode
(响应code)、replyText
(响应内容)、exchange
(交换机)、routingKey
(队列)。下边是具体的消息发送,在
rabbitTemplate
中设置Confirm
和Return
回调,我们通过setDeliveryMode()
对消息做持久化处理,为了后续测试创建一个CorrelationData
对象,添加一个id
为10000000000
。@Autowired private RabbitTemplate rabbitTemplate; @Autowired private ConfirmCallbackService confirmCallbackService; @Autowired private ReturnCallbackService returnCallbackService; public void sendMessage(String exchange, String routingKey, Object msg) { /** * 确保消息发送失败后可以重新返回到队列中 * 注意:yml需要配置 publisher-returns: true */ rabbitTemplate.setMandatory(true); /** * 消费者确认收到消息后,手动ack回执回调处理 */ rabbitTemplate.setConfirmCallback(confirmCallbackService); /** * 消息投递到队列失败回调处理 */ rabbitTemplate.setReturnCallback(returnCallbackService); /** * 发送消息 */ rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; }, new CorrelationData(UUID.randomUUID().toString())); }로그인 후 복사로그인 후 복사三、消息接收确认
消息接收确认要比消息发送确认简单一点,因为只有一个消息回执(
ack
)的过程。使用@RabbitHandler
注解标注的方法要增加channel
(信道)、message
两个参数。@Slf4j @Component @RabbitListener(queues = "confirm_test_queue")public class ReceiverMessage1 { @RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("小富收到消息:{}", msg); //TODO 具体业务 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { log.error("消息已重复处理失败,拒绝再次接收..."); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息 } else { log.error("消息即将再次返回队列处理..."); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } }}로그인 후 복사로그인 후 복사消费消息有三种回执方法,我们来分析一下每种方法的含义。
1、basicAck
basicAck
:表示成功确认,使用此回执方法后,消息会被rabbitmq broker
删除。void basicAck(long deliveryTag, boolean multiple)로그인 후 복사로그인 후 복사
deliveryTag
:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag
都会增加。手动消息确认模式下,我们可以对指定deliveryTag
的消息进行ack
、nack
、reject
等操作。
multiple
:是否批量确认,值为true
则会一次性ack
所有小于当前消息deliveryTag
的消息。举个栗子: 假设我先发送三条消息
deliveryTag
分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag
为8,multiple
设置为 true,会将5、6、7、8的消息全部进行确认。2、basicNack
basicNack
:表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。void basicNack(long deliveryTag, boolean multiple, boolean requeue)로그인 후 복사
deliveryTag
:表示消息投递序号。
multiple
:是否批量确认。
requeue
:值为true
消息将重新入队列。3、basicReject
basicReject
:拒绝消息,与basicNack
区别在于不能进行批量操作,其他用法很相似。void basicReject(long deliveryTag, boolean requeue)로그인 후 복사
deliveryTag
:表示消息投递序号。
requeue
:值为true
消息将重新入队列。四、测试
发送消息测试一下消息确认机制是否生效,从执行结果上看发送者发消息后成功回调,消费端成功的消费了消息。
用抓包工具Wireshark
观察一下rabbitmq
amqp协议交互的变化,也多了ack
的过程。五、踩坑日志
1、不消息确认
这是一个非常没技术含量的坑,但却是非常容易犯错的地方。
开启消息确认机制,消费消息别忘了
channel.basicAck
,否则消息会一直存在,导致重复消费。2、消息无限投递
在我最开始接触消息确认机制的时候,消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息,
int a = 1 / 0
发生异常后将消息重新投入队列。@RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("消费者 2 号收到:{}", msg); int a = 1 / 0; channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } }로그인 후 복사但是有个问题是,业务代码一旦出现
bug
99.9%的情况是不会自动修复,一条消息会被无限投递进队列,消费端无限执行,导致了死循环。本地的
CPU
被瞬间打满了,大家可以想象一下当时在生产环境导致服务死机,我是有多慌。
而且rabbitmq management
只有一条未被确认的消息。经过测试分析发现,当消息重新投递到消息队列时,这条消息不会回到队列尾部,仍是在队列头部。
消费者会立刻消费这条消息,业务处理再抛出异常,消息再重新入队,如此反复进行。导致消息队列处理出现阻塞,导致正常消息也无法运行。
而我们当时的解决方案是,先将消息进行应答,此时消息队列会删除该条消息,同时我们再次发送该消息到消息队列,异常消息就放在了消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 重新发送消息到队尾channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSON.toJSONBytes(msg));로그인 후 복사但这种方法并没有解决根本问题,错误消息还是会时不时报错,后面优化设置了消息重试次数,达到了重试上限以后,手动确认,队列删除此消息,并将消息持久化入
MySQL
并推送报警,进行人工处理和定时任务做补偿。3、重复消费
如何保证 MQ 的消费是幂等性,这个需要根据具体业务而定,可以借助
MySQL
、或者redis
将消息持久化,通过再消息中的唯一性属性校验。
위 내용은 springboot + Rabbitmq에서 메시지로 확인하는 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!