メッセージ蓄積の生成シナリオ:
プロデューサによって生成されるメッセージの速度は、消費者の消費速度よりも高速です。解決策: コンシューマーの数または速度を増やします。
消費者がいない場合。解決策: デッドレターキュー、メッセージの有効期間を設定します。これは、メッセージに有効期間を設定するのと同じです。指定された時間内に消費がない場合、メッセージは自動的に期限切れになります。期限が切れると、クライアント コールバック監視メソッドが実行され、メッセージがデータベース テーブル レコードに保存されます。補償は後から実現します。
<dependencies> <!-- springboot-web组件 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- 添加springboot对amqp的支持 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <!--fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.49</version> </dependency> </dependencies>
server: # 服务启动端口配置 port: 8081 servlet: # 应用访问路径 context-path: / spring: #增加application.druid.yml 的配置文件 # profiles: # active: rabbitmq rabbitmq: ####连接地址 host: www.kaicostudy.com ####端口号 port: 5672 ####账号 username: kaico ####密码 password: kaico ### 地址 virtual-host: /kaicoStudy ###模拟演示死信队列 kaico: dlx: exchange: kaico_order_dlx_exchange queue: kaico_order_dlx_queue routingKey: kaico.order.dlx ###备胎交换机 order: exchange: kaico_order_exchange queue: kaico_order_queue routingKey: kaico.order
@Configuration public class DeadLetterMQConfig { /** * 订单交换机 */ @Value("${kaico.order.exchange}") private String orderExchange; /** * 订单队列 */ @Value("${kaico.order.queue}") private String orderQueue; /** * 订单路由key */ @Value("${kaico.order.routingKey}") private String orderRoutingKey; /** * 死信交换机 */ @Value("${kaico.dlx.exchange}") private String dlxExchange; /** * 死信队列 */ @Value("${kaico.dlx.queue}") private String dlxQueue; /** * 死信路由 */ @Value("${kaico.dlx.routingKey}") private String dlxRoutingKey; /** * 声明死信交换机 * * @return DirectExchange */ @Bean public DirectExchange dlxExchange() { return new DirectExchange(dlxExchange); } /** * 声明死信队列 * * @return Queue */ @Bean public Queue dlxQueue() { return new Queue(dlxQueue); } /** * 声明订单业务交换机 * * @return DirectExchange */ @Bean public DirectExchange orderExchange() { return new DirectExchange(orderExchange); } /** * 绑定死信队列到死信交换机 * * @return Binding */ @Bean public Binding binding() { return BindingBuilder.bind(dlxQueue()) .to(dlxExchange()) .with(dlxRoutingKey); } /** * 声明订单队列,并且绑定死信队列 * * @return Queue */ @Bean public Queue orderQueue() { // 订单队列绑定我们的死信交换机 Map<String, Object> arguments = new HashMap<>(2); arguments.put("x-dead-letter-exchange", dlxExchange); arguments.put("x-dead-letter-routing-key", dlxRoutingKey); return new Queue(orderQueue, true, false, false, arguments); } /** * 绑定订单队列到订单交换机 * * @return Binding */ @Bean public Binding orderBinding() { return BindingBuilder.bind(orderQueue()) .to(orderExchange()) .with(orderRoutingKey); } }
@Component public class OrderDlxConsumer { /** * 死信队列监听队列回调的方法 * @param msg */ @RabbitListener(queues = "kaico_order_dlx_queue") public void orderDlxConsumer(String msg) { System.out.println("死信队列消费订单消息" + msg); } }
@Component public class OrderConsumer { /** * 监听队列回调的方法 * * @param msg */ @RabbitListener(queues = "kaico_order_queue") public void orderConsumer(String msg) { System.out.println("正常订单消费者消息msg:" + msg); } }
acknowledge-mode: Manual第 2 ステップ、コンシューマー Java コード
int result = orderMapper.addOrder(orderEntity); if (result >= 0) { // 开启消息确认机制 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); }
理由: コンシューマが自動再試行をオンにする可能性があり、再試行プロセスによってコンシューマのビジネス ロジック コードが繰り返し実行される可能性があります。現時点では、メッセージは消費されています。ビジネス エラーによりメッセージが再び消費されるため、
解決策が表示されます: メッセージ グローバル ID を使用して、ビジネスに従って決定します。コンシューマはこれを判断できます。ビジネス ID (グローバル固有 ID) に基づくメッセージ。メッセージは消費されました。
コンシューマ コード ロジック:
##RabbitMQ は分散トランザクションの問題を解決します分散トランザクション: 分散システムでは、複数の異なるトランザクションが存在するため、サービスコールインタフェース内で動作し、各トランザクションは相互に影響を与えません。分散トランザクションには問題があります。 分散トランザクションを解決するための中心的な考え方: データの最終的な整合性。 分散フィールドの名詞: 強い一貫性: 同期速度が非常に速いか、ロック メカニズムがダーティ リードを許可しないかのいずれか; 強い一貫性ソリューション: いずれかのデータベースA がデータをデータ B に非常に迅速に同期するか、データベース A の同期が完了する前にデータベース B がデータを読み取ることができません。 弱い整合性: 読み取りが許可されるデータは元のダーティ データであり、読み取られた結果の不整合が許可されます。 最終的な整合性: 分散システムでは、データがネットワークを通じて同期的に通信されるため、短いデータ遅延は許容されますが、最終的なデータは一貫性がなければなりません。 RabbitMQ に基づいて分散トランザクションを解決するというアイデアRabbitMQ に基づいて分散トランザクションを解決するというアイデア: (最終整合性ソリューションの採用)以上がJava RabbitMQメッセージキューの一般的な問題と解決策の分析の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。