springboot + Rabbitmqのメッセージで確認する方法

coldplay.xixi
リリース: 2020-07-02 09:13:43
転載
2383 人が閲覧しました

springboot + Rabbitmqのメッセージで確認する方法

最近、社内の技術的な雰囲気を活性化するためと言って、部署がもっと技術共有セッションを開催するよう呼びかけていますが、ずっと見てきた私は、この T M はブラッシング専用であることを知っておいてくださいKPI。しかし、そうは言っても、これは確かに良いことであり、退屈な議論の会議をする代わりに、より多くの技術的な意見交換を行うことは、個人の成長に非常に役立ちます。

それで、私は率先して共有に参加するためにサインアップしました、咳咳咳〜、それは本当にそのためではありません KPI、私はただみんなと一緒に学びたいだけです!

springboot + Rabbitmqのメッセージで確認する方法

関連する学習に関する推奨事項: Java ビデオ チュートリアル

今回共有する内容は次のとおりです。 springboot rabbitmq メッセージ確認機構の実装方法と、実際の開発での落とし穴体験 実際、全体的な内容は比較的シンプルです。間違いを犯しやすくなります。

RabbitMQ を使用した後、ビジネス リンクが明らかに長くなったことがわかります。システム間の分離は達成されましたが、メッセージ損失を引き起こす可能性のあるシナリオも増加しました。例:

  • #メッセージプロデューサー -> Rabbitmq サーバー (メッセージ送信失敗)

  • rabbitmq サーバー自体の障害によりメッセージ損失が発生しました

  • メッセージ コンシューマー - > Rabbitmq サービス (メッセージの消費に失敗しました)

springboot + Rabbitmqのメッセージで確認する方法したがって、ミドルウェアを使用できない場合は、使用しないでください。使用するためだけに使用すると、トラブルが増えるだけです。メッセージ確認メカニズムがオンになった後は、メッセージの正確な配信がかなりの程度保証されますが、頻繁な確認対話により、
rabbitmq 全体の効率が低下し、スループットが大幅に低下します。非常に重要なメッセージは実際には評価されません。メッセージ確認メカニズムを使用することをお勧めします。


まず、

springboot rabbitmq メッセージ確認メカニズムを実装してから、発生した問題を詳細に分析しましょう。

1.環境を準備します

1.rabbitmq 依存関係パッケージを導入します
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId></dependency>
ログイン後にコピー

2. application.properties 構成を変更します。

構成では、

senderconsumer のメッセージ確認を有効にする必要があります。

spring.rabbitmq.host=127.0.0.1spring.rabbitmq.port=5672spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# 发送者开启 confirm 确认机制
spring.rabbitmq.publisher-confirms=true# 发送者开启 return 确认机制
spring.rabbitmq.publisher-returns=true####################################################
# 设置消费端手动 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重试
spring.rabbitmq.listener.simple.retry.enabled=true
ログイン後にコピー

3. Exchange と Queue を定義します

switch

confirmTestExchange と queueconfirm_test_queue を定義し、キューを on にバインドしますスイッチ。

@Configurationpublic class QueueConfig {

    @Bean(name = "confirmTestQueue")
    public Queue confirmTestQueue() {
        return new Queue("confirm_test_queue", true, false, false);
    }

    @Bean(name = "confirmTestExchange")
    public FanoutExchange confirmTestExchange() {
        return new FanoutExchange("confirmTestExchange");
    }

    @Bean    public Binding confirmTestFanoutExchangeAndQueue(
            @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
            @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
        return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
    }}
ログイン後にコピー

rabbitmq のメッセージ確認は、メッセージ送信確認とメッセージ受信確認の 2 つの部分に分かれています。

springboot + Rabbitmqのメッセージで確認する方法

#2. メッセージ送信確認メッセージ送信確認:プロデューサー

プロデューサー##を確認するために使用されます。 # メッセージを

brokerbroker 上の交換 exchange に送信し、それをキュー queue に配信するプロセスで、チェックを入れます。メッセージが正常に配信されたかどうか。 Producer から

rabbitmq Broker

へのメッセージには、confirmCallback 確認モードがあります。 exchange から

queue

へのメッセージ配信失敗に対する returnCallback フォールバック モードがあります。 これら 2 つの Callback を使用すると、メッセージを 100% 確実に配信できます。

1. confirmCallback 確認モード

メッセージが rabbitmq ブローカー によって受信される限り、

confirmCallback

コールバックは引き起こされる。 <div class="code" style="position:relative; padding:0px; margin:0px;"><pre class="brush:php;toolbar:false;">@Slf4j @Componentpublic class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.error(&quot;消息发送异常!&quot;); } else { log.info(&quot;发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}&quot;, correlationData.getId(), ack, cause); } }}</pre><div class="contentsignin">ログイン後にコピー</div></div> インターフェイス confirmCallback を実装し、その

confirm()

メソッドを書き換えます。メソッドには correlationDataack という 3 つのパラメータがあります。 ## #、 ###原因###。 correlationData: オブジェクト内には id 属性が 1 つだけあり、現在のメッセージの一意性を示すために使用されます。

  • ack: brokertrue
  • へのメッセージ配信のステータスは成功を示します。
  • cause: 配信失敗の理由を示します。 しかし、メッセージが
  • broker
  • によって受信されたということは、メッセージが MQ サーバーに到着したことを意味するだけであり、メッセージがターゲット ## に配信されるという保証はありません。 #列###。したがって、次に returnCallback を使用する必要があります。
  • 2、 ReturnCallback 退回模式

    如果消息未能投递到目标 queue 里将触发回调 returnCallback ,一旦向 queue 投递消息未成功,这里一般会记录下当前消息的详细投递数据,方便后续做重发或者补偿等操作。

    @Slf4j
    @Componentpublic class ReturnCallbackService implements RabbitTemplate.ReturnCallback {
    
        @Override    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
        }}
    ログイン後にコピー

    实现接口ReturnCallback,重写 returnedMessage() 方法,方法有五个参数message(消息体)、replyCode(响应code)、replyText(响应内容)、exchange(交换机)、routingKey(队列)。

    下边是具体的消息发送,在rabbitTemplate中设置 ConfirmReturn 回调,我们通过setDeliveryMode()对消息做持久化处理,为了后续测试创建一个 CorrelationData对象,添加一个id10000000000

    @Autowired    private RabbitTemplate rabbitTemplate;
    
        @Autowired    private ConfirmCallbackService confirmCallbackService;
    
        @Autowired    private ReturnCallbackService returnCallbackService;
    
        public void sendMessage(String exchange, String routingKey, Object msg) {
    
            /**
             * 确保消息发送失败后可以重新返回到队列中
             * 注意:yml需要配置 publisher-returns: true
             */
            rabbitTemplate.setMandatory(true);
    
            /**
             * 消费者确认收到消息后,手动ack回执回调处理
             */
            rabbitTemplate.setConfirmCallback(confirmCallbackService);
    
            /**
             * 消息投递到队列失败回调处理
             */
            rabbitTemplate.setReturnCallback(returnCallbackService);
    
            /**
             * 发送消息
             */
            rabbitTemplate.convertAndSend(exchange, routingKey, msg,
                    message -> {
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        return message;
                    },
                    new CorrelationData(UUID.randomUUID().toString()));
        }
    ログイン後にコピー

    三、消息接收确认

    消息接收确认要比消息发送确认简单一点,因为只有一个消息回执(ack)的过程。使用@RabbitHandler注解标注的方法要增加 channel(信道)、message 两个参数。

    @Slf4j
    @Component
    @RabbitListener(queues = "confirm_test_queue")public class ReceiverMessage1 {
    
        @RabbitHandler    public void processHandler(String msg, Channel channel, Message message) throws IOException {
    
            try {
                log.info("小富收到消息:{}", msg);
    
                //TODO 具体业务
    
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    
            }  catch (Exception e) {
    
                if (message.getMessageProperties().getRedelivered()) {
    
                    log.error("消息已重复处理失败,拒绝再次接收...");
    
                    channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息
                } else {
    
                    log.error("消息即将再次返回队列处理...");
    
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); 
                }
            }
        }}
    ログイン後にコピー

    消费消息有三种回执方法,我们来分析一下每种方法的含义。

    1、basicAck

    basicAck:表示成功确认,使用此回执方法后,消息会被rabbitmq broker 删除。

    void basicAck(long deliveryTag, boolean multiple)
    ログイン後にコピー

    deliveryTag:表示消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加。手动消息确认模式下,我们可以对指定deliveryTag的消息进行acknackreject等操作。

    multiple:是否批量确认,值为 true 则会一次性 ack所有小于当前消息 deliveryTag 的消息。

    举个栗子: 假设我先发送三条消息deliveryTag分别是5、6、7,可它们都没有被确认,当我发第四条消息此时deliveryTag为8,multiple设置为 true,会将5、6、7、8的消息全部进行确认。

    2、basicNack

    basicNack :表示失败确认,一般在消费消息业务异常时用到此方法,可以将消息重新投递入队列。

    void basicNack(long deliveryTag, boolean multiple, boolean requeue)
    ログイン後にコピー

    deliveryTag:表示消息投递序号。

    multiple:是否批量确认。

    requeue:值为 true 消息将重新入队列。

    3、basicReject

    basicReject:拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。

    void basicReject(long deliveryTag, boolean requeue)
    ログイン後にコピー

    deliveryTag:表示消息投递序号。

    requeue:值为 true 消息将重新入队列。

    四、测试

    发送消息测试一下消息确认机制是否生效,从执行结果上看发送者发消息后成功回调,消费端成功的消费了消息。
    springboot + Rabbitmqのメッセージで確認する方法
    用抓包工具Wireshark 观察一下rabbitmq amqp协议交互的变化,也多了 ack 的过程。
    springboot + Rabbitmqのメッセージで確認する方法

    五、踩坑日志

    1、不消息确认

    这是一个非常没技术含量的坑,但却是非常容易犯错的地方。

    开启消息确认机制,消费消息别忘了channel.basicAck,否则消息会一直存在,导致重复消费。
    springboot + Rabbitmqのメッセージで確認する方法

    2、消息无限投递

    在我最开始接触消息确认机制的时候,消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息, int a = 1 / 0 发生异常后将消息重新投入队列。

    @RabbitHandler    public void processHandler(String msg, Channel channel, Message message) throws IOException {
    
            try {
                log.info("消费者 2 号收到:{}", msg);
    
                int a = 1 / 0;
    
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    
            } catch (Exception e) {
    
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        }
    ログイン後にコピー

    但是有个问题是,业务代码一旦出现 bug 99.9%的情况是不会自动修复,一条消息会被无限投递进队列,消费端无限执行,导致了死循环。

    springboot + Rabbitmqのメッセージで確認する方法

    本地的CPU被瞬间打满了,大家可以想象一下当时在生产环境导致服务死机,我是有多慌。

    springboot + Rabbitmqのメッセージで確認する方法
    而且rabbitmq management 只有一条未被确认的消息。

    springboot + Rabbitmqのメッセージで確認する方法

    经过测试分析发现,当消息重新投递到消息队列时,这条消息不会回到队列尾部,仍是在队列头部。

    消费者会立刻消费这条消息,业务处理再抛出异常,消息再重新入队,如此反复进行。导致消息队列处理出现阻塞,导致正常消息也无法运行。

    而我们当时的解决方案是,先将消息进行应答,此时消息队列会删除该条消息,同时我们再次发送该消息到消息队列,异常消息就放在了消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行。

    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);// 重新发送消息到队尾channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                        message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
                        JSON.toJSONBytes(msg));
    ログイン後にコピー

    但这种方法并没有解决根本问题,错误消息还是会时不时报错,后面优化设置了消息重试次数,达到了重试上限以后,手动确认,队列删除此消息,并将消息持久化入MySQL并推送报警,进行人工处理和定时任务做补偿。

    3、重复消费

    如何保证 MQ 的消费是幂等性,这个需要根据具体业务而定,可以借助MySQL、或者redis 将消息持久化,通过再消息中的唯一性属性校验。

    以上がspringboot + Rabbitmqのメッセージで確認する方法の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

関連ラベル:
ソース:learnku.com
このウェブサイトの声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
人気のチュートリアル
詳細>
最新のダウンロード
詳細>
ウェブエフェクト
公式サイト
サイト素材
フロントエンドテンプレート