> Java > java지도 시간 > 본문

Java RabbitMQ 고급 기능 예시 분석

WBOY
풀어 주다: 2023-04-29 20:25:05
앞으로
848명이 탐색했습니다.

    신뢰할 수 있는 메시지 전달

    RabbitMQ를 메시지 발신자로 사용하면 메시지 손실이나 전달 실패 시나리오를 방지할 수 있습니다. RabbitMQ는 메시지의 전달 안정성 모드를 제어하는 ​​두 가지 방법을 제공합니다.

    • confirm 확인 모드

    • return return 모드

    rabbitmq 전체 메시지 전달 경로는 다음과 같습니다. 소비자

      Producer에서 exchange로 메시지가 전송되면 verifyCallback이 반환됩니다
    • 메시지가 exchange 대기열에서 전달되지 않으면 returnCallback이 반환됩니다
    • 다음을 사용할 수 있습니다. 메시지의 안정적인 전달을 제어하는 ​​두 개의 콜백

    확인 모드

    메시지는 생산자에서 교환기로 확인 콜백을 반환합니다

    스프링 통합 Rabbitmq를 예로 들어, Rabbitmq 구성 파일을 수정하고 게시자 확인 속성을 추가합니다. ConnectionFactory에서 값을 true로 설정합니다

    <!--
    * 确认模式:
    * 步骤:
    * 1. 确认模式开启:ConnectionFactory中开启publisher-confirms="true"
    -->
    <!-- 定义rabbitmq connectionFactory -->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"
                                   publisher-confirms="true"/>
    로그인 후 복사
    /*
     * 确认模式:
     * 步骤:
     * 2. 在rabbitTemplate定义ConfirmCallBack回调函数
     */
    @Test
        public void queueTest(){
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                /**
                 *
                 * @param correlationData 相关配置信息
                 * @param ack exchange交换机 是否成功收到了消息。true 成功,false代表失败
                 * @param cause 失败原因
                 */
                    System.out.println("confirm方法被执行了....");
                    if (ack) {
                        //接收成功
                        System.out.println("接收成功消息" + cause);
                    } else {
                        //接收失败
                        System.out.println("接收失败消息" + cause);
                        //做一些处理,让消息再次发送。
                    }
                }
            });
            //路由键与队列同名
            rabbitTemplate.convertAndSend("spring_queue", "message confirm....");
        }
    로그인 후 복사

    Java RabbitMQ 고급 기능 예시 분석큐에 정상적으로 메시지가 전송되므로 반환된 원인 값은 비어 있습니다. 예외가 발생하면 원인이 예외의 이유

    반환 모드

    입니다.

    교환 대기열에서 메시지가 전달되지 않으면 returnCallback이 반환됩니다

    1. 대체 모드를 켭니다. 게시자-returns=“true”

        <!-- 定义rabbitmq connectionFactory -->
        <rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}"
                                   port="${rabbitmq.port}"
                                   username="${rabbitmq.username}"
                                   password="${rabbitmq.password}"
                                   virtual-host="${rabbitmq.virtual-host}"
                                   publisher-returns="true"/>
    로그인 후 복사

    2. : setMandatory, then set ReturnCallBack

        @Test
        public void queueTest(){
            //1.设置交换机处理失败消息的模式
            rabbitTemplate.setMandatory(true);
            //2.设置ReturnCallBack
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                /**
                 * @param message    消息对象
                 * @param replyCode  错误码
                 * @param replyText  错误信息
                 * @param exchange   交换机
                 * @param routingKey 路由键
                 */
                @Override
                public void returnedMessage(Message message, int replyCode, String
                        replyText, String exchange, String routingKey) {
                    System.out.println("return 执行了....");
                    System.out.println(message);
                    System.out.println(replyCode);
                    System.out.println(replyText);
                    System.out.println(exchange);
                    System.out.println(routingKey);
                    //处理
                }
            });
            //手动添加错误路由模拟错误发生
            rabbitTemplate.convertAndSend("spring_topic_exchange", "return123", "return message...");
        }
    로그인 후 복사

    A 메시지는 오류가 발생한 경우에만 반환되므로 수동으로 오류를 추가하고 보낸 메시지에 라우팅 값 return123을 추가하면 실제로 그러한 경로가 없으며 다음을 실행하여 메시지가 반환됩니다. 다음과 같다.

    Java RabbitMQ 고급 기능 예시 분석Consumer Ack

    ack는 승인, 확인을 의미합니다. 소비자가 메시지를 받은 후 확인 방법을 나타냅니다.

    확인하는 방법은 세 가지가 있습니다.

      자동 확인: recognition="none"
    • 수동 확인: recognition="manual"
    • 이상 상황에 따라 확인: recognition="auto", ( 이렇게 하면 사용하기 번거롭고 학습할 필요가 없습니다)
    • 자동 확인은 소비자가 메시지를 받으면 자동으로 수신을 확인하고 RabbitMQ의 메시지 캐시에서 해당 메시지를 제거한다는 의미입니다. 그러나 실제 업무 처리에서는 메시지 수신 후 업무 처리 과정에서 예외가 발생하면 메시지가 유실될 가능성이 매우 높다. 수동 확인 방법이 설정된 경우에는 비즈니스 처리가 성공한 후 채널.basicAck()를 호출하고 수동으로 로그인해야 하며, 예외가 발생하면 채널.basicNack() 메서드를 호출하여 자동으로 메시지를 다시 보낼 수 있습니다.

    Spring의 Rabbitmq 통합을 예로 들어보겠습니다. 확인 방법은 Rabbitmq 구성 파일에 설정되어 있습니다.

    <rabbit:listener-container connection-factory="connectionFactory"
    acknowledge="manual">
    .....
    로그인 후 복사

    모니터링 클래스 코드는 다음과 같습니다.

    public class AckListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                //1.接收转换消息
                System.out.println(new String(message.getBody()));
                //2. 处理业务逻辑
                System.out.println("处理业务逻辑...");
                int i = 3/0;//出现错误
                // 3. 手动签收
                channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
                //4.拒绝签收
                /*
                 *第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会
                 *重新发送该消息给消费端
                 */
                channel.basicNack(deliveryTag,true,true);
                //channel.basicReject(deliveryTag,true);
            }
        }
    }
    로그인 후 복사

    예외가 발생하므로 Channel.basicNack() 메서드가 호출됩니다. 자동으로 메시지를 재전송할 수 있도록 하기 위해 무한 루프 출력 Content

    Java RabbitMQ 고급 기능 예시 분석소비자 측 현재 제한

    Java RabbitMQ 고급 기능 예시 분석Rabbitmq 서버에 처리되지 않은 수만 개의 메시지 백로그가 있으면 다음에서 소비자 클라이언트를 엽니다. 그러면 다음과 같은 일이 일어날 것입니다. 엄청난 양의 메시지가 모두 즉시 푸시되지만 단일 클라이언트는 동시에 많은 양의 데이터를 처리할 수 없습니다! 때로는 동시성이 매우 크기 때문에 생산 종료 흐름을 제한할 수 없습니다. 따라서 소비자 측의 흐름을 제한해야 합니다. Rabbitmq는 qos(서비스 품질) 기능을 제공합니다. 즉, 채널에 특정 수의 메시지(Qos 값이 설정되어 있음)가 있는 경우 메시지의 비자동 확인을 전제로 합니다. 또는 소비자) 이전에 확인되지 않았으므로 새 메시지를 소비하지 마십시오.

    1. 확인 메커니즘이 수동 확인인지 확인하세요.

    2. 리스너-컨테이너 구성 속성 perfetch = 1은 소비자가 매번 소비를 위해 메시지를 가져오며 소비가 완료될 때까지 계속 가져오지 않음을 의미합니다. 메시지를 수동으로 확인했습니다.

    <rabbit:listener-container connection-factory="connectionFactory" auto-declare="true" acknowledge="manual" prefetch="1">
            <rabbit:listener ref="topicListenerACK" queue-names="spring_topic_queue_well2"/>
    </rabbit:listener-container>
    로그인 후 복사

    Producer, 5개의 메시지를 보내세요

        @Test
        public void topicTest(){
    /**
     * 参数1:交换机名称
     * 参数2:路由键名
     * 参数3:发送的消息内容
     */
            for (int i=0;i<5;i++){
                rabbitTemplate.convertAndSend("spring_topic_exchange", "xzk.a", "发送到spring_topic_exchange交换机xzk.cn的消息"+i);
            }
        }
    }
    로그인 후 복사

    Producer가 Channel.basicAck(deliveryTag,true)를 주석 처리하고 메시지 수신을 확인하지 않습니다.

    public class AckListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                //1.接收转换消息
                System.out.println(new String(message.getBody()));
                //2. 处理业务逻辑
                System.out.println("处理业务逻辑...");
                // 3. 手动签收
                //channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
                //4.拒绝签收
                /*
                 *第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会
                 *重新发送该消息给消费端
                 */
                channel.basicNack(deliveryTag,true,true);
            }
        }
    }
    로그인 후 복사

    Consumer를 시작하고 다시 생성자를 실행한 후 Consumer가 5개의 메시지 메시지를 보냈는데, 실제로 생산자는 1개의 메시지만 받았는데, 이는 현재 제한 효과에 도달했습니다

    Java RabbitMQ 고급 기능 예시 분석rabbitmq 콘솔을 관찰한 결과 1개의 unack 메시지가 있는 것으로 나타났습니다. 4개의 준비 메시지가 아직 소비자에게 도달하지 않았습니다. 이는 우리가 설정한 prefetchCount=1의 현재 제한 상황과 일치합니다.

    Java RabbitMQ 고급 기능 예시 분석

    把channel.basicAck(deliveryTag,true)的注释取消掉,即可以自动确认收到消息,重新运行消费者,接收到了另外的四条消息

    Java RabbitMQ 고급 기능 예시 분석

    Java RabbitMQ 고급 기능 예시 분석

    TTL(Time To Live)

    Time To Live,消息过期时间设置

    设置某个队列为过期队列

    设置交换机,队列以及队列过期时间为10000ms

     <!--ttl-->
        <rabbit:queue name="test_queue_ttl" id="test_queue_ttl">
            <rabbit:queue-arguments>
                <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
            </rabbit:queue-arguments>
        </rabbit:queue>
        <rabbit:topic-exchange name="test_exchange_ttl">
            <rabbit:bindings>
                <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"/>
            </rabbit:bindings>
        </rabbit:topic-exchange>
    로그인 후 복사

    生产者发送10条消息

        @Test
        public void testTtl() {
            for (int i = 0; i < 10; i++) {
                rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl...");
            }
    로그인 후 복사

    Java RabbitMQ 고급 기능 예시 분석

    十秒钟后,过期消息消失

    Java RabbitMQ 고급 기능 예시 분석

    设置单独某个消息过期

    设置交换机和队列

    <rabbit:queue name="test_queue_ttl" id="test_queue_ttl"/>
    <rabbit:topic-exchange name="test_exchange_ttl">
        <rabbit:bindings>
            <rabbit:binding pattern="ttl.#" queue="test_queue_ttl"/>     
        </rabbit:bindings>
    </rabbit:topic-exchange>
    로그인 후 복사

    生产者发送特定过期消息,用到了MessagePostProcessor这个api

     @Test
        public void testTtl() {
            MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //1.设置message信息
                    message.getMessageProperties().setExpiration("5000");//消息的过期时间
                    //2.返回该消息
                    return message;
                }
            };
            //消息单独过期
            rabbitTemplate.convertAndSend("test_exchange_ttl","ttl.hehe","message ttl...",messagePostProcessor);
        }
    로그인 후 복사

    Java RabbitMQ 고급 기능 예시 분석

    5s之后

    Java RabbitMQ 고급 기능 예시 분석

    注:

    1.如果同时设置队列过期和消息过期,系统会根据哪个过期的时间短而选用哪儿个。

    2.设置单独消息过期时,如果该消息不为第一个接受的消息,则不过期。

    死信队列

    死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Deadmessage后,可以被重新发送到另一个交换机,这个交换机就是DLX。

    Java RabbitMQ 고급 기능 예시 분석

    消息成为死信的三种情况:

    • 队列消息长度到达限制;

    • 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;

    • 原队列存在消息过期设置,消息到达超时时间未被消费;

    队列绑定死信交换机:

    给队列设置参数: x-dead-letter-exchange 和 x-dead-letter-routing-key

    Java RabbitMQ 고급 기능 예시 분석

    实现

    1.声明正常的队列(test_queue_dlx)和交换机(test_exchange_dlx)

    <rabbit:queue name="test_queue_dlx" id="test_queue_dlx">
        <!--正常队列绑定死信交换机-->
        <rabbit:queue-arguments>
            <!--x-dead-letter-exchange:死信交换机名称-->
            <entry key="x-dead-letter-exchange" value="exchange_dlx" />
            <!--3.2 x-dead-letter-routing-key:发送给死信交换机的routingkey-->
            <entry key="x-dead-letter-routing-key" value="dlx.hehe" />
            <!--4.1 设置队列的过期时间 ttl-->
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
            <!--4.2 设置队列的长度限制 max-length -->
            <entry key="x-max-length" value="10" value-type="java.lang.Integer" />
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="test_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="test.dlx.#" queue="test_queue_dlx">
            </rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    로그인 후 복사

    2.声明死信队列(queue_dlx)和死信交换机(exchange_dlx)

    <rabbit:queue name="queue_dlx" id="queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.#" queue="queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    로그인 후 복사

    3.生产端测试

    /**
    * 发送测试死信消息:
    * 1. 过期时间
    * 2. 长度限制
    * 3. 消息拒收
    */
    @Test
    public void testDlx(){
        //1. 测试过期时间,死信消息
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
        //2. 测试长度限制后,消息死信
        /* for (int i = 0; i < 20; i++) {
        rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
        }*/
        //3. 测试消息拒收
        //rabbitTemplate.convertAndSend("test_exchange_dlx","test.dlx.haha","我是一条消息,我会死吗?");
    }
    로그인 후 복사

    4.消费端监听

    public class DlxListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
            long deliveryTag = message.getMessageProperties().getDeliveryTag();
            try {
                //1.接收转换消息
                System.out.println(new String(message.getBody()));
                //2. 处理业务逻辑
                System.out.println("处理业务逻辑...");
                int i = 3/0;//出现错误
                //3. 手动签收
                channel.basicAck(deliveryTag,true);
            } catch (Exception e) {
                //e.printStackTrace();
                System.out.println("出现异常,拒绝接受");
                //4.拒绝签收,不重回队列 requeue=false
                channel.basicNack(deliveryTag,true,false);
            }
        }
    }
    로그인 후 복사
    <rabbit:listener ref="dlxListener" queue-names="test_queue_dlx">
    </rabbit:listener>
    로그인 후 복사

    延迟队列

    延迟队列,即消息进入队列后不会立即被消费,只有到达指定时间后,才会被消费。c

    需求:

    1.下单后,30分钟未支付,取消订单,回滚库存。

    2.新用户注册成功7天后,发送短信问候。

    实现方式:

    • 定时器

    • 延迟队列

    定时器的实现方式不够优雅,我们采取延迟队列的方式

    Java RabbitMQ 고급 기능 예시 분석

    不过很可惜,在RabbitMQ中并未提供延迟队列功能。

    但是可以使用:TTL+死信队列 组合实现延迟队列的效果。

    Java RabbitMQ 고급 기능 예시 분석

    配置

    <!--
    延迟队列:
            1. 定义正常交换机(order_exchange)和队列(order_queue)
            2. 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)
            3. 绑定,设置正常队列过期时间为30分钟
    -->
    <!-- 定义正常交换机(order_exchange)和队列(order_queue)-->
    <rabbit:queue id="order_queue" name="order_queue">
    <!-- 绑定,设置正常队列过期时间为30分钟-->
        <rabbit:queue-arguments>
            <entry key="x-dead-letter-exchange" value="order_exchange_dlx" />
            <entry key="x-dead-letter-routing-key" value="dlx.order.cancel" />
            <entry key="x-message-ttl" value="10000" value-type="java.lang.Integer"/>
        </rabbit:queue-arguments>
    </rabbit:queue>
    <rabbit:topic-exchange name="order_exchange">
        <rabbit:bindings>
            <rabbit:binding pattern="order.#" queue="order_queue"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    <!-- 定义死信交换机(order_exchange_dlx)和队列(order_queue_dlx)-->
    <rabbit:queue id="order_queue_dlx" name="order_queue_dlx"></rabbit:queue>
    <rabbit:topic-exchange name="order_exchange_dlx">
        <rabbit:bindings>
            <rabbit:binding pattern="dlx.order.#" queue="order_queue_dlx"></rabbit:binding>
        </rabbit:bindings>
    </rabbit:topic-exchange>
    로그인 후 복사

    生产端测试

    @Test
    public void testDelay() throws InterruptedException {
        //1.发送订单消息。 将来是在订单系统中,下单成功后,发送消息
        rabbitTemplate.convertAndSend("order_exchange","order.msg","订单信息:id=1,time=2019年8月17日16:41:47");
        /*//2.打印倒计时10秒
        for (int i = 10; i > 0 ; i--) {
            System.out.println(i+"...");
            Thread.sleep(1000);
        }*/
    }
    로그인 후 복사

    消费端监听

    public class OrderListener implements ChannelAwareMessageListener {
        @Override
        public void onMessage(Message message, Channel channel) throws Exception {
    		long deliveryTag = message.getMessageProperties().getDeliveryTag();
    		try {
    			//1.接收转换消息
    			System.out.println(new String(message.getBody()));
    			//2. 处理业务逻辑
    			System.out.println("处理业务逻辑...");
    			System.out.println("根据订单id查询其状态...");
    			System.out.println("判断状态是否为支付成功");
    			System.out.println("取消订单,回滚库存....");
    			//3. 手动签收
    			channel.basicAck(deliveryTag,true);
    		} catch (Exception e) {
    			//e.printStackTrace();
    			System.out.println("出现异常,拒绝接受");
    			//4.拒绝签收,不重回队列 requeue=false
    			channel.basicNack(deliveryTag,true,false);
    		}
    	}
    }
    로그인 후 복사
    <rabbit:listener ref="orderListener" queue-names="order_queue_dlx">
    </rabbit:listener>
    로그인 후 복사

    위 내용은 Java RabbitMQ 고급 기능 예시 분석의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

    관련 라벨:
    원천:yisu.com
    본 웹사이트의 성명
    본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
    인기 튜토리얼
    더>
    최신 다운로드
    더>
    웹 효과
    웹사이트 소스 코드
    웹사이트 자료
    프론트엔드 템플릿