RabbitMQ 서비스가 중지되더라도 메시지 생성자가 보낸 메시지는 손실되지 않습니다. 기본적으로 RabbitMQ가 종료되거나 충돌할 때 대기열과 메시지는 무시됩니다. 메시지가 손실되지 않도록 하려면 큐와 메시지를 모두 지속성으로 표시해야 합니다.
1. 대기열 지속성: 대기열을 생성할 때 channel.queueDeclare();
의 두 번째 매개변수를 true로 변경합니다. channel.queueDeclare();
第二个参数改为true。
2.消息持久化:在使用信道发送消息时channel.basicPublish();
将第三个参数改为:MessageProperties.PERSISTENT_TEXT_PLAIN
表示持久化消息。
/** * @Description 持久化MQ * @date 2022/3/7 9:14 */ public class Producer3 { private static final String LONG_QUEUE = "long_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 持久化队列 channel.queueDeclare(LONG_QUEUE,true,false,false,null); Scanner scanner = new Scanner(System.in); int i = 0; while (scanner.hasNext()){ i++; String msg = scanner.next() + i; // 持久化消息 channel.basicPublish("",LONG_QUEUE, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); System.out.println("发送消息:'" + msg + "'成功"); } } }
但是存储消息还有存在一个缓存的间隔点,没有真正的写入磁盘,持久性保证不够强,但是对于简单队列而言也绰绰有余。
轮询分发的方式在消费者处理效率不同的情况下并不适用。所以真正的公平应该是遵循能者多劳的前提。
在消费者处修改channel.basicQos(1);
表示开启不公平分发
/** * @Description 不公平分发消费者 * @date 2022/3/7 9:27 */ public class Consumer2 { private static final String LONG_QUEUE = "long_queue"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> { // 模拟并发沉睡三十秒 try { Thread.sleep(30000); System.out.println("线程B接收消息:"+ new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicAck(message.getEnvelope().getDeliveryTag(),false); } catch (InterruptedException e) { e.printStackTrace(); } }; // 设置不公平分发 channel.basicQos(1); channel.basicConsume(LONG_QUEUE,false,deliverCallback, consumerTag -> { System.out.println(consumerTag + "消费者取消消费"); }); } }
测试目的:是否能实现能者多劳。
测试方法:两个消费者睡眠不同的事件来模拟处理事件不同,如果处理时间(睡眠时间)短的能够处理多个消息就代表目的达成。
先启动生产者创建队列,再分别启动两个消费者。
生产者按照顺序发四条消息:
睡眠时间短的线程A接收到了三条消息
而睡眠时间长的线程B只接收到的第二条消息:
因为线程B在处理消息时消耗的时间较长,所以就将其他消息分配给了线程A。
实验成功!
消息的发送和手动确认都是异步完成的,因此就存在一个未确认消息的缓冲区,开发人员希望能够限制缓冲区的大小,用来避免缓冲区里面无限制的未确认消息问题。
这里的预期值就值得是上述方法channel.basicQos();
里面的参数,如果在当前信道上存在等于参数的消息就不会在安排当前信道进行消费消息。
测试方法:
1.新建两个不同的消费者分别给定预期值5个2。
2.给睡眠时间长的指定为5,时间短的指定为2。
3.假如按照指定的预期值获取消息则表示测试成功,但并不是代表一定会按照5和2分配,这个类似于权重的判别。
代码根据上述代码修改预期值即可。
发布确认就是生产者发布消息到队列之后,队列确认进行持久化完毕再通知给生产者的过程。这样才能保证消息不会丢失。
需要注意的是需要开启队列持久化才能使用确认发布。
开启方法:channel.confirmSelect();
是一种同步发布的方式,即发送完一个消息之后只有确认它确认发布后,后续的消息才会继续发布,在指定的时间内没有确认就会抛出异常。缺点就是特别慢。
/** * @Description 确认发布——单个确认 * @date 2022/3/7 14:49 */ public class SoloProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_solo"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 记录开始时间 long beginTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = ""+i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 单个发布确认 boolean flag = channel.waitForConfirms(); if (flag){ System.out.println("发送消息:" + i); } } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }
一批一批的确认发布可以提高系统的吞吐量。但是缺点是发生故障导致发布出现问题时,需要将整个批处理保存在内存中,后面再重新发布。
/** * @Description 确认发布——批量确认 * @date 2022/3/7 14:49 */ public class BatchProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_batch"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 设置一个多少一批确认一次。 int batchSize = MESSAGE_COUNT / 10; // 记录开始时间 long beginTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = ""+i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 批量发布确认 if (i % batchSize == 0){ if (channel.waitForConfirms()){ System.out.println("发送消息:" + i); } } } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }
显然效率要比单个确认发布的高很多。
在编程上比上述两个要复杂,但是性价比很高,无论是可靠性还行效率的都好很多,利用回调函数来达到消息可靠性传递的。
/** * @Description 确认发布——异步确认 * @date 2022/3/7 14:49 */ public class AsyncProducer { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_async"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 记录开始时间 long beginTime = System.currentTimeMillis(); // 确认成功回调 ConfirmCallback ackCallback = (deliveryTab,multiple) ->{ System.out.println("确认成功消息:" + deliveryTab); }; // 确认失败回调 ConfirmCallback nackCallback = (deliveryTab,multiple) ->{ System.out.println("未确认的消息:" + deliveryTab); }; // 消息监听器 /** * addConfirmListener: * 1. 确认成功的消息; * 2. 确认失败的消息。 */ channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "" + i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }
最好的处理方式把未确认的消息放到一个基于内存的能被发布线程访问的队列。
例如:ConcurrentLinkedQueue
可以在确认队列confirm callbacks
channel.basicPublish();
는 세 번째 매개변수를 MessageProperties.PERSISTENT_TEXT_PLAIN
으로 변경하여 지속성 메시지를 나타냅니다. ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
폴링 배분 방식은 소비자마다 처리 효율이 다를 경우 적합하지 않습니다. 그러므로 진정한 공정성은 더 많은 일을 할 수 있는 사람이 더 많은 일을 해야 한다는 전제를 따라야 한다.
불공정 분배를 활성화하도록 소비자에서channel.basicQos(1);
를 수정🎜/** * @Description 异步发布确认,处理未发布成功的消息 * @date 2022/3/7 18:09 */ public class AsyncProducerRemember { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_async_remember"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 线程安全有序的一个hash表,适用与高并发 ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>(); // 记录开始时间 long beginTime = System.currentTimeMillis(); // 确认成功回调 ConfirmCallback ackCallback = (deliveryTab, multiple) ->{ //2. 在发布成功确认处删除; // 批量删除 if (multiple){ ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab); confirmMap.clear(); }else { // 单独删除 map.remove(deliveryTab); } System.out.println("确认成功消息:" + deliveryTab); }; // 确认失败回调 ConfirmCallback nackCallback = (deliveryTab,multiple) ->{ // 3. 打印未确认的消息。 System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab); }; // 消息监听器 /** * addConfirmListener: * 1. 确认成功的消息; * 2. 确认失败的消息。 */ channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "" + i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 1. 记录要发送的全部消息; map.put(channel.getNextPublishSeqNo(),msg); } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }
channel.basicQos();
의 매개변수입니다. 현재 채널에 매개변수와 동일한 메시지가 있는 경우 현재 채널은 소비하도록 정렬되지 않습니다. 메시지. 🎜channel.confirmSelect();
🎜🎜2.1 단일 확인 게시🎜🎜🎜는 동기 게시 방법입니다. 즉, 메시지를 보낸 후 확인 및 게시된 후에만 입니다. , 후속 메시지는 계속 게시되며 지정된 시간 내에 확인이 없으면 예외가 발생합니다. 단점은 속도가 매우 느리다는 것입니다. 🎜🎜rrreee🎜2.2 일괄 확인 릴리스🎜🎜🎜 일괄 확인 릴리스는 시스템의 처리량을 향상시킬 수 있습니다. 하지만 게시에 실패하고 문제가 발생하면 전체 배치를 메모리에 저장했다가 나중에 다시 게시해야 한다는 단점이 있습니다. 🎜🎜rrreee🎜분명히 단일 확인 릴리스보다 효율성이 훨씬 높습니다. 🎜🎜2.3 비동기 확인 릴리스🎜🎜🎜는 위의 두 가지보다 프로그래밍이 더 복잡하지만 비용 효율적이며 신뢰성이 있든 효율적이든 콜백 기능을 사용하여 안정적인 메시지 전달을 달성합니다. 🎜🎜rrreee🎜2.4 확인되지 않은 메시지 처리 🎜🎜🎜확인되지 않은 메시지를 처리하는 가장 좋은 방법은 확인되지 않은 메시지를 게시 스레드에서 액세스할 수 있는 메모리 기반 대기열에 넣는 것입니다. 🎜🎜🎜예: ConcurrentLinkedQueue
는 확인 대기열 콜백 확인
과 게시 스레드 간에 메시지를 전송할 수 있습니다. 🎜🎜🎜처리 방법: 🎜🎜🎜1. 보낼 메시지를 모두 녹음하세요. 🎜🎜2. 확인되지 않은 메시지를 인쇄하세요. 🎜🎜해시 테이블을 사용하여 메시지를 저장하면 다음과 같은 장점이 있습니다. 🎜可以将需要和消息进行关联;轻松批量删除条目;支持高并发。
ConcurrentSkipListMap<Long,String > map = new ConcurrentSkipListMap<>();
/** * @Description 异步发布确认,处理未发布成功的消息 * @date 2022/3/7 18:09 */ public class AsyncProducerRemember { private static final int MESSAGE_COUNT = 100; private static final String QUEUE_NAME = "confirm_async_remember"; public static void main(String[] args) throws Exception { Channel channel = RabbitMQUtils.getChannel(); // 产生队列 channel.queueDeclare(QUEUE_NAME,true,false,false,null); // 开启确认发布 channel.confirmSelect(); // 线程安全有序的一个hash表,适用与高并发 ConcurrentSkipListMap< Long, String > map = new ConcurrentSkipListMap<>(); // 记录开始时间 long beginTime = System.currentTimeMillis(); // 确认成功回调 ConfirmCallback ackCallback = (deliveryTab, multiple) ->{ //2. 在发布成功确认处删除; // 批量删除 if (multiple){ ConcurrentNavigableMap<Long, String> confirmMap = map.headMap(deliveryTab); confirmMap.clear(); }else { // 单独删除 map.remove(deliveryTab); } System.out.println("确认成功消息:" + deliveryTab); }; // 确认失败回调 ConfirmCallback nackCallback = (deliveryTab,multiple) ->{ // 3. 打印未确认的消息。 System.out.println("未确认的消息:" + map.get(deliveryTab) + ",标记:" + deliveryTab); }; // 消息监听器 /** * addConfirmListener: * 1. 确认成功的消息; * 2. 确认失败的消息。 */ channel.addConfirmListener(ackCallback,nackCallback); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = "" + i; channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes(StandardCharsets.UTF_8)); // 1. 记录要发送的全部消息; map.put(channel.getNextPublishSeqNo(),msg); } // 记录结束时间 long endTime = System.currentTimeMillis(); System.out.println("发送" + MESSAGE_COUNT + "条消息消耗:"+(endTime - beginTime) + "毫秒"); } }
위 내용은 Java RabbitMQ의 지속성 및 릴리스 확인 구현 방법의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!