springboot+rockermq는 간단한 메시지 송수신을 실현합니다
일반 메시지를 보내는 방법에는 단방향 전송, 동기 전송, 비동기 전송의 세 가지가 있습니다.
다음은 일반 메시지의 송수신을 실현하기 위한 springboot+rockermq의 통합을 소개합니다
Springboot 프로젝트 생성 및 rockermq 종속성 추가
<!--rocketMq依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.1</version> </dependency>
rocketmq 구성
# 포트
server :
포트: 8083# 로켓mq 구성
rocketmq:
name-server: 127.0.0.1:9876
#Producer
producer:
#Producer 그룹 이름은 애플리케이션에서 고유해야 합니다
그룹: group1
#Timeout for 메시지 보내기 기본 시간은 3000ms
send-message-timeout: 3000
# 메시지가 4096바이트에 도달하면 메시지가 압축됩니다. 기본값은 4096
압축 메시지 본문 임계값: 4096
#최대 메시지 제한, 기본값은 128K
최대 메시지 크기: 4194304
# 실패한 동기화 메시지 전송에 대한 재시도 횟수
retry-times-when-send-failed : 3
#내부 전송 실패 시 다른 에이전트를 재시도할지 여부, 이 매개변수는 브로커가 여러 개 있는 경우에만 적용됩니다.
retry-next-server: true
#비동기 메시지 전송 실패 시 재시도 횟수 retry-times-when-send -async-failed: 3
package com.example.springbootrocketdemo.controller; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * 普通信息的三种方式:同步、异步、单向 * @author qzz */ @RestController public class RocketMQCOntroller { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送普通消息 * convertAndSend(String destination, Object payload) 发送字符串比较方便 */ @RequestMapping("/send") public void send(){ rocketMQTemplate.convertAndSend("test-topic","test-message"); } /** * 发送同步消息 */ @RequestMapping("/testSyncSend") public void testSyncSend(){ //参数一:topic 如果想添加tag,可以使用"topic:tag"的写法 //参数二:消息内容 SendResult sendResult = rocketMQTemplate.syncSend("test-topic","同步消息测试"); System.out.println(sendResult); } /** * 发送异步消息 */ @RequestMapping("/testASyncSend") public void testASyncSend(){ //参数一:topic 如果想添加tag,可以使用"topic:tag"的写法 //参数二:消息内容 //参数三:回调 rocketMQTemplate.asyncSend("test-topic", "异步消息测试", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } @Override public void onException(Throwable throwable) { System.out.println("消息发送异常"); throwable.printStackTrace(); } }); } /** * 发送单向消息 */ @RequestMapping("/testOneWay") public void testOneWay(){ //参数一:topic 如果想添加tag,可以使用"topic:tag"的写法 //参数二:消息内容 rocketMQTemplate.sendOneWay("test-topic","单向消息测试"); } }
package com.example.springbootrocketdemo.config; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * 消费消息 * 配置RocketMQ监听 * @author qzz */ @Service @RocketMQMessageListener(consumerGroup = "test",topic = "test-topic") public class RocketMQConsumerListener implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("消费消息:"+s); } }
RocketMQListener
인터페이스를 구현하고 메시지 유형 문자열을 동적으로 지정해야 합니다. RocketMQListener
接口,以及动态指定消息类型String。
类上要加上@RocketMQMessageListener注解
클래스에 @RocketMQMessageListener 주석
을 추가하고 토픽 토픽 test-topic을 지정하고 소비자 그룹 테스트
간단한 메시지 송수신이 완료됩니다!
서비스 시작 및 메시지 소비 테스트
동기 메시지 테스트:
비동기 메시지 테스트:
단방향 메시지 테스트:
위 내용은 RocketMQ는 Springboot에서 메시지 전송 및 수신을 어떻게 구현합니까?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!