Apabila perkhidmatan RabbitMQ dihentikan, mesej yang dihantar oleh pengeluar mesej tidak akan hilang. Secara lalai, baris gilir dan mesej diabaikan apabila RabbitMQ keluar atau ranap. Untuk memastikan mesej tidak hilang, kedua-dua baris gilir dan mesej perlu ditandakan sebagai berterusan.
1 Kegigihan baris gilir: Tukar parameter kedua channel.queueDeclare();
kepada benar apabila membuat baris gilir.
2. Ketekunan mesej: Apabila menggunakan saluran untuk menghantar mesej, channel.basicPublish();
tukar parameter ketiga kepada: MessageProperties.PERSISTENT_TEXT_PLAIN
untuk menunjukkan kegigihan mesej.
/** * @Description 持久化MQ * @date 2022/3/7 9:14 */ public class Producer3 { private static final String LONG_QUEUE = "long_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 持久化队列 channel.queueDeclare(LONG_QUEUE,true,false,false,null); Scanner scanner = new Scanner(System.in); int i = 0; while (scanner.hasNext()){ i++; String msg = scanner.next() + i; // 持久化消息 channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("发送消息:'" + msg + "'成功"); } } }
Walau bagaimanapun, terdapat selang cache untuk menyimpan mesej Tiada tulisan sebenar pada cakera, dan jaminan ketahanan tidak cukup kuat, tetapi ia lebih daripada cukup untuk baris gilir yang mudah. .
Kaedah pengagihan pengundian tidak sesuai apabila pengguna mempunyai kecekapan pemprosesan yang berbeza. Oleh itu, keadilan sebenar harus mengikut premis bahawa mereka yang boleh melakukan lebih banyak kerja harus berbuat demikian.
Ubah suai channel.basicQos(1);
di pihak pengguna untuk menunjukkan menghidupkan pengedaran tidak adil
/** * @Description 不公平分发消费者 * @date 2022/3/7 9:27 */ public class Consumer2 { private static final String LONG_QUEUE = "long_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { // 模拟并发沉睡三十秒 try { Thread.sleep(30000); System.out.println("线程B接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicAck(message.getEnvelope().getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } }; // 设置不公平分发 channel.basicQos(1); channel.basicConsume(LONG_QUEUE,false,deliverCallback, consumerTag -> { System.out.println(consumerTag + "消费者取消消费"); }); } }
Tujuan ujian : Adakah mungkin untuk menyedari bahawa mereka yang boleh melakukan lebih banyak kerja.
Kaedah ujian: Dua pengguna tidur peristiwa berbeza untuk mensimulasikan peristiwa pemprosesan berbeza Jika masa pemprosesan (masa tidur) cukup singkat untuk memproses berbilang mesej, tujuannya tercapai.
Mula-mula mulakan pengeluar untuk membuat baris gilir, dan kemudian mulakan kedua-dua pengguna masing-masing.
Pengeluar menghantar empat mesej mengikut urutan:
Thread A dengan masa tidur yang singkat menerima tiga mesej
Benang B, yang tidur lama, hanya menerima mesej kedua:
Sebab benang B memakan Ia mengambil masa yang lama , jadi mesej lain diberikan kepada urutan A.
Percubaan berjaya!
Penghantaran dan pengesahan manual mesej diselesaikan secara tidak segerak, jadi terdapat penimbal bagi mesej yang tidak disahkan, dan pembangun berharap untuk mengehadkan Saiz penimbal, digunakan untuk mengelakkan masalah mesej tidak diketahui tanpa had dalam penimbal.
Nilai yang dijangkakan di sini ialah parameter dalam kaedah di atas channel.basicQos();
Jika terdapat mesej yang sama dengan parameter pada saluran semasa, saluran semasa tidak akan diatur untuk menggunakan mesej.
Kaedah ujian:
1 Cipta dua pengguna berbeza dan berikan nilai jangkaan 5 dan 2 masing-masing.
2. Tetapkan masa tidur yang panjang sebagai 5, dan masa tidur yang singkat sebagai 2.
3 Jika mesej diperoleh mengikut nilai jangkaan yang ditetapkan, ini bermakna ujian itu berjaya, tetapi ia tidak bermakna ia akan diedarkan mengikut 5 dan 2. Ini sama dengan pertimbangan berat. .
Kod boleh mengubah suai nilai yang dijangkakan mengikut kod di atas.
Pengesahan keluaran ialah proses di mana selepas pengeluar menerbitkan mesej ke baris gilir, pengesahan baris gilir diteruskan dan kemudian dimaklumkan kepada pengeluar. Ini memastikan bahawa mesej tidak akan hilang.
Perlu diambil perhatian bahawa kegigihan baris gilir perlu dihidupkan untuk menggunakan penerbitan yang disahkan.
Kaedah pembukaan: channel.confirmSelect();
ialah kaedah penerbitan segerak, iaitu selepas menghantar mesej, hanya selepas ia disahkan dan diterbitkan, susulan Mesej akan terus diterbitkan Jika tiada pengesahan dalam masa yang ditetapkan, pengecualian akan dilemparkan. Kelemahannya ialah ia sangat perlahan.
/** * @Description 确认发布——单个确认 * @date 2022/3/7 14:49 */ public class SoloProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_solo"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 记录开始时间 long beginTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = ""+i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 单个发布确认 boolean flag = channel.waitForConfirms(); if (flag){ System.out.println("发送消息:" + i); } } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }
Keluaran pengesahan kelompok demi kelompok boleh meningkatkan daya pengeluaran sistem. Walau bagaimanapun, kelemahannya ialah apabila kegagalan berlaku dan terdapat masalah dengan penerbitan, keseluruhan kumpulan perlu disimpan dalam ingatan dan diterbitkan semula kemudian.
/** * @Description 确认发布——批量确认 * @date 2022/3/7 14:49 */ public class BatchProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_batch"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 设置一个多少一批确认一次。 int batchSize = MESSAGE_COUNT / 10; // 记录开始时间 long beginTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = ""+i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 批量发布确认 if (i % batchSize == 0){ if (channel.waitForConfirms()){ System.out.println("发送消息:" + i); } } } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }
Jelas sekali kecekapannya jauh lebih tinggi daripada keluaran pengesahan tunggal.
adalah lebih rumit daripada dua di atas dari segi pengaturcaraan, tetapi ia sangat menjimatkan kos sama ada kebolehpercayaan atau kecekapan, ia jauh lebih baik . Gunakan fungsi panggil balik untuk Untuk mencapai penghantaran mesej yang boleh dipercayai.
/** * @Description 确认发布——异步确认 * @date 2022/3/7 14:49 */ public class AsyncProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_async"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 记录开始时间 long beginTime = System.currentTimeMillis(); // 确认成功回调 ConfirmCallback ackCallback = (deliveryTab,multiple) ->{ System.out.println("确认成功消息:" + deliveryTab); }; // 确认失败回调 ConfirmCallback nackCallback = (deliveryTab,multiple) ->{ System.out.println("未确认的消息:" + deliveryTab); }; // 消息监听器 /** * addConfirmListener: * 1. 确认成功的消息; * 2. 确认失败的消息。 */ channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "" + i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }
Cara terbaik untuk mengendalikan mesej yang tidak diketahui ialah meletakkan mesej yang tidak diketahui dalam baris gilir berasaskan memori yang boleh diakses oleh penerbitan benang.
Contohnya: ConcurrentLinkedQueue
boleh memindahkan mesej antara baris gilir pengesahan confirm callbacks
dan urutan penerbitan.
Kaedah pemprosesan:
1 Rekod semua mesej yang akan dihantar
2
3.Cetak mesej yang belum disahkan. Gunakan jadual cincang untuk menyimpan mesej, kelebihannya:可以将需要和消息进行关联;轻松批量删除条目;支持高并发。
ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
/** * @Description 异步发布确认,处理未发布成功的消息 * @date 2022/3/7 18:09 */ public class AsyncProducerRemember { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_async_remember"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 线程安全有序的一个hash表,适用与高并发 ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>(); // 记录开始时间 long beginTime = System.currentTimeMillis(); // 确认成功回调 ConfirmCallback ackCallback = (deliveryTab, multiple) ->{ //2. 在发布成功确认处删除; // 批量删除 if (multiple){ ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab); confirmMap.clear(); }else { // 单独删除 map.remove(deliveryTab); } System.out.println("确认成功消息:" + deliveryTab); }; // 确认失败回调 ConfirmCallback nackCallback = (deliveryTab,multiple) ->{ // 3. 打印未确认的消息。 System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab); }; // 消息监听器 /** * addConfirmListener: * 1. 确认成功的消息; * 2. 确认失败的消息。 */ channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "" + i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 1. 记录要发送的全部消息; map.put(channel.getNextPublishSeqNo(),msg); } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }
Atas ialah kandungan terperinci Kaedah pelaksanaan pengesahan kegigihan dan pelepasan dalam Java RabbitMQ. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!