RabbitMQ サービスが停止しても、メッセージ プロデューサーによって送信されたメッセージは失われません。デフォルトでは、RabbitMQ が終了するかクラッシュすると、キューとメッセージは無視されます。メッセージが失われないようにするには、キューとメッセージの両方を永続的としてマークする必要があります。
1. キューの永続性: キューの作成時に、channel.queueDeclare();
の 2 番目のパラメーターを true に変更します。
2. メッセージの永続性: チャネルを使用してメッセージを送信する場合 channel.basicPublish();
3 番目のパラメーターを次のように変更します: MessageProperties.PERSISTENT_TEXT_PLAIN
は永続性情報を示します。
/** * @Description 持久化MQ * @date 2022/3/7 9:14 */ public class Producer3 { private static final String LONG_QUEUE = "long_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 持久化队列 channel.queueDeclare(LONG_QUEUE,true,false,false,null); Scanner scanner = new Scanner(System.in); int i = 0; while (scanner.hasNext()){ i++; String msg = scanner.next() + i; // 持久化消息 channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("发送消息:'" + msg + "'成功"); } } }
ただし、メッセージを保存するためのキャッシュ間隔があります。ディスクへの実際の書き込みはありません。耐久性の保証は十分強力ではありませんが、単純なキューには十分です。
ポーリング配信方法は、消費者ごとに処理効率が異なる場合には適しません。したがって、真の公平性は、より多くの仕事ができる人がより多くの仕事をすべきであるという前提に従うべきです。
コンシューマで変更するchannel.basicQos(1);
不公平な配布をオンにすることを示します
/** * @Description 不公平分发消费者 * @date 2022/3/7 9:27 */ public class Consumer2 { private static final String LONG_QUEUE = "long_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { // 模拟并发沉睡三十秒 try { Thread.sleep(30000); System.out.println("线程B接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicAck(message.getEnvelope().getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } }; // 设置不公平分发 channel.basicQos(1); channel.basicConsume(LONG_QUEUE,false,deliverCallback, consumerTag -> { System.out.println(consumerTag + "消费者取消消费"); }); } }
テスト目的: できる人ほど仕事ができるということが実感できるかどうか。
テスト方法: 2 人のコンシューマーが異なるイベントをスリープさせて、異なる処理イベントをシミュレートし、処理時間 (スリープ時間) が短く、複数のメッセージを処理できれば目的は達成されます。
最初にプロデューサーを起動してキューを作成し、次に 2 つのコンシューマーをそれぞれ起動します。
#プロデューサは 4 つのメッセージを順番に送信します:
スリープ時間が短いスレッド A は 3 つのメッセージを受信しましたスレッド B は長時間スリープ状態ですが、2 番目のメッセージのみを受信します: スレッド B は消費するため、時間がかかります, したがって、他のメッセージはスレッド A に割り当てられます。 実験は成功しました! 1.4 プリフェッチ値メッセージの送信と手動確認は非同期で完了するため、未確認メッセージのバッファーが存在します。開発者は、バッファ サイズを制限することを望んでいます。バッファ内に無制限の未確認メッセージが存在する問題。ここで期待される値は、上記のメソッド
channel.basicQos(); のパラメータです。現在のチャネルにパラメータと等しいメッセージがある場合、メッセージを消費するチャネル。
テスト方法:
1. 2 つの異なるコンシューマを作成し、それぞれ期待値 5 と 2 を与えます。 2.スリープ時間が長い場合は 5 を指定し、スリープ時間が短い場合は 2 を指定します。 3. 指定した期待値通りにメッセージが取得できればテストは成功ですが、5や2に従って分配されるわけではありません。 コードは上記のコードに従って期待値を変更できます。 2. リリース確認リリース確認は、プロデューサーがメッセージをキューにリリースした後、キューが永続性を確認してプロデューサーに通知するプロセスです。これにより、メッセージが失われることがなくなります。確認済み公開を使用するには、キューの永続性をオンにする必要があることに注意してください。
開始メソッド:
channel.confirmSelect();
は同期公開メソッドです。つまり、メッセージの送信後です。その後、公開が確認された場合に限り、後続のメッセージは引き続き公開されます。指定された時間内に確認がなかった場合は、例外がスローされます。欠点は非常に遅いことです。2.2 バッチ確認リリース/** * @Description 确认发布——单个确认 * @date 2022/3/7 14:49 */ public class SoloProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_solo"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 记录开始时间 long beginTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = ""+i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 单个发布确认 boolean flag = channel.waitForConfirms(); if (flag){ System.out.println("发送消息:" + i); } } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }ログイン後にコピー
バッチ確認リリースにより、システムのスループットを向上させることができます。ただし、パブリッシュ中に障害が発生して問題が発生した場合、バッチ全体をメモリに保存し、後で再パブリッシュする必要があるという欠点があります。明らかに、その効率は 1 回の確認リリースよりもはるかに高くなります。 2.3 非同期確認リリース/** * @Description 确认发布——批量确认 * @date 2022/3/7 14:49 */ public class BatchProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_batch"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 设置一个多少一批确认一次。 int batchSize = MESSAGE_COUNT / 10; // 记录开始时间 long beginTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = ""+i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 批量发布确认 if (i % batchSize == 0){ if (channel.waitForConfirms()){ System.out.println("发送消息:" + i); } } } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }ログイン後にコピー
は、上記の 2 つよりもプログラミングが複雑ですが、費用対効果が非常に高く、信頼性と効率のいずれにおいても、はるかに優れています。信頼性の高いメッセージ配信を実現するためのコールバック関数。2.4 未確認メッセージの処理/** * @Description 确认发布——异步确认 * @date 2022/3/7 14:49 */ public class AsyncProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_async"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 记录开始时间 long beginTime = System.currentTimeMillis(); // 确认成功回调 ConfirmCallback ackCallback = (deliveryTab,multiple) ->{ System.out.println("确认成功消息:" + deliveryTab); }; // 确认失败回调 ConfirmCallback nackCallback = (deliveryTab,multiple) ->{ System.out.println("未确认的消息:" + deliveryTab); }; // 消息监听器 /** * addConfirmListener: * 1. 确认成功的消息; * 2. 确认失败的消息。 */ channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "" + i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }ログイン後にコピー
未確認メッセージを処理する最良の方法は、公開スレッドからアクセスできるメモリベースのキューに未確認メッセージを入れることです。例:
ConcurrentLinkedQueue確認キュー
confirm callbacks と公開スレッドの間でメッセージを転送できます。
処理方法:
1. 送信するすべてのメッセージを記録します; 2. リリースが成功したことを確認したら削除します; 3. 未確認のメッセージを出力します。 ハッシュ テーブルを使用してメッセージを保存することの利点:可以将需要和消息进行关联;轻松批量删除条目;支持高并发。
ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
/** * @Description 异步发布确认,处理未发布成功的消息 * @date 2022/3/7 18:09 */ public class AsyncProducerRemember { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_async_remember"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 线程安全有序的一个hash表,适用与高并发 ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>(); // 记录开始时间 long beginTime = System.currentTimeMillis(); // 确认成功回调 ConfirmCallback ackCallback = (deliveryTab, multiple) ->{ //2. 在发布成功确认处删除; // 批量删除 if (multiple){ ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab); confirmMap.clear(); }else { // 单独删除 map.remove(deliveryTab); } System.out.println("确认成功消息:" + deliveryTab); }; // 确认失败回调 ConfirmCallback nackCallback = (deliveryTab,multiple) ->{ // 3. 打印未确认的消息。 System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab); }; // 消息监听器 /** * addConfirmListener: * 1. 确认成功的消息; * 2. 确认失败的消息。 */ channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "" + i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 1. 记录要发送的全部消息; map.put(channel.getNextPublishSeqNo(),msg); } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }
以上がJava RabbitMQにおける永続化確認とリリース確認の実装メソッドの詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。