<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> </dependency>
rocketmq: consumer: group: springboot_consumer_group # 一次拉取消息最大值,注意是拉取消息的最大值而非消费最大值 pull-batch-size: 10 name-server: 10.5.103.6:9876 producer: # 发送同一类消息的设置为同一个group,保证唯一 group: springboot_producer_group # 发送消息超时时间,默认3000 sendMessageTimeout: 10000 # 发送消息失败重试次数,默认2 retryTimesWhenSendFailed: 2 # 异步消息重试此处,默认2 retryTimesWhenSendAsyncFailed: 2 # 消息最大长度,默认1024 * 1024 * 4(默认4M) maxMessageSize: 4096 # 压缩消息阈值,默认4k(1024 * 4) compressMessageBodyThreshold: 4096 # 是否在内部发送失败时重试另一个broker,默认false retryNextServer: false
@RestController public class NormalProduceController { @Setter(onMethod_ = @Autowired) private RocketMQTemplate rocketmqTemplate; @GetMapping("/test") public SendResult test() { Message<String> msg = MessageBuilder.withPayload("Hello,RocketMQ").build(); SendResult sendResult = rocketmqTemplate.send(topic, msg); } }
RocketMQLog: AMARAN Tiada penambah boleh ditemui untuk logger (io.netty.util.internal.InternalThreadLocalMap).
RocketMQLog:WARN Sila mulakan sistem logger dengan betul.Pada masa ini kita hanya perlu menetapkan pembolehubah persekitaran dalam kelas permulaan
Untuk kebenaran, nyatakan secara eksplisit rangka kerja pengelogan RocketMQlogam:import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @Component @RocketMQMessageListener(topic = "your_topic_name", consumerGroup = "your_consumer_group_name") public class MyConsumer implements RocketMQListener<String> { @Override public void onMessage(String message) { // 处理消息的逻辑 System.out.println("Received message: " + message); } }Salin selepas log masukdan juga laraskan tahap log dalam fail konfigurasi, jika tidak, anda akan sentiasa melihat maklumat log broker dalam konsol
tahap:rocketmq.client.logUseSlf4j
RocketmqClient: ERROR
netty: ERROR
2.2 Tidak menyokong LocalDate dan LocalDateTime
Sebagai contoh, kod pengeluar adalah seperti berikut:
yang sering digunakan selepas menggunakan Java8
Kedua-dua medan jenis masa ini, bagaimanapun, konfigurasi asal RocketMQ tidak menyokong jenis masa Java Apabila mesej entiti yang kami hantar mengandungi dua medan di atas, pengguna akan mengalami ralat berikut semasa menggunakan.
@SpringBootApplication public class RocketDemoApplication { public static void main(String[] args) { /* * 指定使用的日志框架,否则将会告警 * RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.InternalThreadLocalMap). * RocketMQLog:WARN Please initialize the logger system properly. */ System.setProperty("rocketmq.client.logUseSlf4j", "true"); SpringApplication.run(RocketDemoApplication.class, args); } }
@GetMapping("/test") public void test(){ //普通消息无返回值,只负责发送消息⽽不等待服务器回应且没有回调函数触发。 RocketMessage rocketMessage = RocketMessage.builder(). id(1111L). message("hello,world") .localDate(LocalDate.now()) .localDateTime(LocalDateTime.now()) .build(); rocketmqTemplate.convertAndSend(destination,rocketMessage); }
Ralat pengecualian penukaran jenis akan berlaku apabila pengguna mula menggunakan LocalDate/LocalDateTime
, butiran ralat Seperti berikut:
Cannot construct instance of java.time.LocalDate
Penyelesaian: Anda perlu menyesuaikan penukar mesej, menggantikan MappingJackson2MessageConverter dan menambah modul masa sokongan
@Component @RocketMQMessageListener(consumerGroup = "springboot_consumer_group",topic = "consumer_topic") public class RocketMQConsumer implements RocketMQListener<RocketMessage> { @Override public void onMessage(RocketMessage message) { System.out.println("消费消息-" + message); } }
2.3 Pengasingan persekitaran RockeMQ
Apabila menggunakan RocketMQ, biasanya Tentukan topik mesej secara langsung dalam kod, dan persekitaran pembangunan dan persekitaran ujian mungkin berkongsi persekitaran RocketMQ. Jika tidak diproses, mesej yang dihantar dalam persekitaran pembangunan boleh digunakan oleh pengguna dalam persekitaran ujian, dan mesej yang dihantar dalam persekitaran ujian juga boleh digunakan oleh pengguna dalam persekitaran pembangunan, mengakibatkan kekeliruan data. Untuk menyelesaikan masalah ini, kami boleh melaksanakan pengasingan automatik mengikut persekitaran yang berbeza. Dengan hanya mengkonfigurasi pilihan untuk persekitaran yang berbeza seperti dev, test, prod, dll., semua mesej akan diasingkan secara automatik. Contohnya, apabila topik mesej yang dihantar ialah, anda boleh menambah akhiran persekitaran secara automatik selepas topik, seperti
. Jadi, bagaimana kita hendak mencapainya? Anda boleh menulis kelas konfigurasi untuk melaksanakan BeanPostProcessor dan mengatasi kaedah postProcessBeforeInitialization untuk mengubah suai topik yang sepadan sebelum tika pendengar dimulakan.consumer_topic
consumer_topic_dev
@Configuration public class RocketMQEnhanceConfig { /** * 解决RocketMQ Jackson不支持Java时间类型配置 * 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration} */ @Bean @Primary public RocketMQMessageConverter enhanceRocketMQMessageConverter(){ RocketMQMessageConverter converter = new RocketMQMessageConverter(); CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter(); List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters(); for (MessageConverter messageConverter : messageConverterList) { if(messageConverter instanceof MappingJackson2MessageConverter){ MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter; ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper(); objectMapper.registerModules(new JavaTimeModule()); } } return converter; } }
rocketmq: enhance: # 启动隔离,用于激活配置类EnvironmentIsolationConfig # 启动后会自动在topic上拼接激活的配置文件,达到自动隔离的效果 enabledIsolation: true # 隔离环境名称,拼接到topic后,topic_dev,默认空字符串 environment: dev
2023-03-23 17:04:59.726 [main] INFO o.a.r.s.support.DefaultRocketMQListenerContainer:290 - running container: DefaultRocketMQListenerContainer{consumerGroup='springboot_consumer_group', nameServer='10.5.103.6:9876', topic='consumer_topic_dev', consumeMode=CONCURRENTLY, selectorType=TAG, selectorExpression='*', messageModel=CLUSTERING}
在解释为什么要二次封装之前先来看看RocketMQ官方文档中推荐的最佳实践
消息发送成功或者失败要打印消息日志,用于业务排查问题。
如果消息量较少,建议在消费入口方法打印消息,消费耗时等,方便后续排查问题。
RocketMQ 无法避免消息重复(Exactly-Once),所以如果业务对消费重复非常敏感,务必要在业务层面进行去重处理。可以借助关系数据库进行去重。首先需要确定消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等。
上面三个步骤基本每次发送消息或者消费消息都要实现,属于重复动作。
接下来讨论的是在RocketMQ中发送消息时选择何种消息类型最为合适。
在RocketMQ中有四种可选格式:
发送Json对象
发送转Json后的String对象
根据业务封装对应实体类
直接使用原生MessageExt接收。
对于如何选择消息类型,需要考虑到消费者在不查看消息发送者的情况下,如何获取消息的含义。因此,在这种情况下,使用第三种方式即根据业务封装对应实体类的方式最为合适,也是大多数开发者在发送消息时的常用方式。
有了上面两点结论以后我们来看看为什么要对RocketMQ二次封装。
按照上述最佳实践,一个完整的消息传递链路从生产到消费应包括 准备消息、发送消息、记录消息日志、处理发送失败、记录接收消息日志、处理业务逻辑、异常处理和异常重试 等步骤。
虽然使用原生RocketMQ可以完成这些动作,但每个生产者和消费者都需要编写大量重复的代码来完成相同的任务,这就是需要进行二次封装的原因。我们希望通过二次封装,**生产者只需准备好消息实体并调用封装后的工具类发送,而消费者只需处理核心业务逻辑,其他公共逻辑会得到统一处理。 **
在二次封装中,关键是找出框架在日常使用中所涵盖的许多操作,以及区分哪些操作是可变的,哪些是不变的。以上述例子为例,实际上只有生产者的消息准备和消费者的业务处理是可变的操作,需要根据需求进行处理,而其他步骤可以固定下来形成一个模板。
当然,本文提到的二次封装不是指对源代码进行封装,而是针对工具的原始使用方式进行的封装。可以将其与Mybatis和Mybatis-plus区分开来。这两者都能完成任务,只不过Mybatis-plus更为简单便捷。
实现二次封装需要创建一个自定义的starter,这样其他项目只需要依赖此starter即可使用封装功能。同时,在自定义starter中还需要解决文章第二部分中提到的一些问题。
代码结构如下所示:
/** * 消息实体,所有消息都需要继承此类 * 公众号:JAVA日知录 */ @Data public abstract class BaseMessage { /** * 业务键,用于RocketMQ控制台查看消费情况 */ protected String key; /** * 发送消息来源,用于排查问题 */ protected String source = ""; /** * 发送时间 */ protected LocalDateTime sendTime = LocalDateTime.now(); /** * 重试次数,用于判断重试次数,超过重试次数发送异常警告 */ protected Integer retryTimes = 0; }
后面所有发送的消息实体都需要继承此实体类。
@Slf4j @RequiredArgsConstructor(onConstructor = @__(@Autowired)) public class RocketMQEnhanceTemplate { private final RocketMQTemplate template; @Resource private RocketEnhanceProperties rocketEnhanceProperties; public RocketMQTemplate getTemplate() { return template; } /** * 根据系统上下文自动构建隔离后的topic * 构建目的地 */ public String buildDestination(String topic, String tag) { topic = reBuildTopic(topic); return topic + ":" + tag; } /** * 根据环境重新隔离topic * @param topic 原始topic */ private String reBuildTopic(String topic) { if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){ return topic +"_" + rocketEnhanceProperties.getEnvironment(); } return topic; } /** * 发送同步消息 */ public <T extends BaseMessage> SendResult send(String topic, String tag, T message) { // 注意分隔符 return send(buildDestination(topic,tag), message); } public <T extends BaseMessage> SendResult send(String destination, T message) { // 设置业务键,此处根据公共的参数进行处理 // 更多的其它基础业务处理... Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage); // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集 log.info("[{}]同步消息[{}]发送结果[{}]", destination, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); return sendResult; } /** * 发送延迟消息 */ public <T extends BaseMessage> SendResult send(String topic, String tag, T message, int delayLevel) { return send(buildDestination(topic,tag), message, delayLevel); } public <T extends BaseMessage> SendResult send(String destination, T message, int delayLevel) { Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build(); SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel); log.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JSONObject.toJSON(message), JSONObject.toJSON(sendResult)); return sendResult; } }
这里封装了一个消息发送类,实现了日志记录以及自动重建topic的功能(即生产者实现环境隔离),后面项目中只需要注入RocketMQEnhanceTemplate来实现消息的发送。
@Slf4j public abstract class EnhanceMessageHandler<T extends BaseMessage> { /** * 默认重试次数 */ private static final int MAX_RETRY_TIMES = 3; /** * 延时等级 */ private static final int DELAY_LEVEL = EnhanceMessageConstant.FIVE_SECOND; @Resource private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; /** * 消息处理 * * @param message 待处理消息 * @throws Exception 消费异常 */ protected abstract void handleMessage(T message) throws Exception; /** * 超过重试次数消息,需要启用isRetry * * @param message 待处理消息 */ protected abstract void handleMaxRetriesExceeded(T message); /** * 是否需要根据业务规则过滤消息,去重逻辑可以在此处处理 * @param message 待处理消息 * @return true: 本次消息被过滤,false:不过滤 */ protected boolean filter(T message) { return false; } /** * 是否异常时重复发送 * * @return true: 消息重试,false:不重试 */ protected abstract boolean isRetry(); /** * 消费异常时是否抛出异常 * 返回true,则由rocketmq机制自动重试 * false:消费异常(如果没有开启重试则消息会被自动ack) */ protected abstract boolean throwException(); /** * 最大重试次数 * * @return 最大重试次数,默认5次 */ protected int getMaxRetryTimes() { return MAX_RETRY_TIMES; } /** * isRetry开启时,重新入队延迟时间 * @return -1:立即入队重试 */ protected int getDelayLevel() { return DELAY_LEVEL; } /** * 使用模板模式构建消息消费框架,可自由扩展或删减 */ public void dispatchMessage(T message) { // 基础日志记录被父类处理了 log.info("消费者收到消息[{}]", JSONObject.toJSON(message)); if (filter(message)) { log.info("消息id{}不满足消费条件,已过滤。",message.getKey()); return; } // 超过最大重试次数时调用子类方法处理 if (message.getRetryTimes() > getMaxRetryTimes()) { handleMaxRetriesExceeded(message); return; } try { long now = System.currentTimeMillis(); handleMessage(message); long costTime = System.currentTimeMillis() - now; log.info("消息{}消费成功,耗时[{}ms]", message.getKey(),costTime); } catch (Exception e) { log.error("消息{}消费异常", message.getKey(),e); // 是捕获异常还是抛出,由子类决定 if (throwException()) { //抛出异常,由DefaultMessageListenerConcurrently类处理 throw new RuntimeException(e); } //此时如果不开启重试机制,则默认ACK了 if (isRetry()) { handleRetry(message); } } } protected void handleRetry(T message) { // 获取子类RocketMQMessageListener注解拿到topic和tag RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class); if (annotation == null) { return; } //重新构建消息体 String messageSource = message.getSource(); if(!messageSource.startsWith(EnhanceMessageConstant.RETRY_PREFIX)){ message.setSource(EnhanceMessageConstant.RETRY_PREFIX + messageSource); } message.setRetryTimes(message.getRetryTimes() + 1); SendResult sendResult; try { // 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息) sendResult = rocketMQEnhanceTemplate.send(annotation.topic(), annotation.selectorExpression(), message, getDelayLevel()); } catch (Exception ex) { // 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息 //由生产者直接发送 throw new RuntimeException(ex); } // 发送失败的处理就是不进行ACK,由RocketMQ重试 if (sendResult.getSendStatus() != SendStatus.SEND_OK) { throw new RuntimeException("重试消息发送失败"); } } }
使用模版设计模式定义了消息消费的骨架,实现了日志打印,异常处理,异常重试等公共逻辑,消息过滤(查重)、业务处理则交由子类实现。
@Configuration @EnableConfigurationProperties(RocketEnhanceProperties.class) public class RocketMQEnhanceAutoConfiguration { /** * 注入增强的RocketMQEnhanceTemplate */ @Bean public RocketMQEnhanceTemplate rocketMQEnhanceTemplate(RocketMQTemplate rocketMQTemplate){ return new RocketMQEnhanceTemplate(rocketMQTemplate); } /** * 解决RocketMQ Jackson不支持Java时间类型配置 * 源码参考:{@link org.apache.rocketmq.spring.autoconfigure.MessageConverterConfiguration} */ @Bean @Primary public RocketMQMessageConverter enhanceRocketMQMessageConverter(){ RocketMQMessageConverter converter = new RocketMQMessageConverter(); CompositeMessageConverter compositeMessageConverter = (CompositeMessageConverter) converter.getMessageConverter(); List<MessageConverter> messageConverterList = compositeMessageConverter.getConverters(); for (MessageConverter messageConverter : messageConverterList) { if(messageConverter instanceof MappingJackson2MessageConverter){ MappingJackson2MessageConverter jackson2MessageConverter = (MappingJackson2MessageConverter) messageConverter; ObjectMapper objectMapper = jackson2MessageConverter.getObjectMapper(); objectMapper.registerModules(new JavaTimeModule()); } } return converter; } /** * 环境隔离配置 */ @Bean @ConditionalOnProperty(name="rocketmq.enhance.enabledIsolation", havingValue="true") public EnvironmentIsolationConfig environmentSetup(RocketEnhanceProperties rocketEnhanceProperties){ return new EnvironmentIsolationConfig(rocketEnhanceProperties); } }
public class EnvironmentIsolationConfig implements BeanPostProcessor { private RocketEnhanceProperties rocketEnhanceProperties; public EnvironmentIsolationConfig(RocketEnhanceProperties rocketEnhanceProperties) { this.rocketEnhanceProperties = rocketEnhanceProperties; } /** * 在装载Bean之前实现参数修改 */ @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { if(bean instanceof DefaultRocketMQListenerContainer){ DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean; if(rocketEnhanceProperties.isEnabledIsolation() && StringUtils.hasText(rocketEnhanceProperties.getEnvironment())){ container.setTopic(String.join("_", container.getTopic(),rocketEnhanceProperties.getEnvironment())); } return container; } return bean; } }
@ConfigurationProperties(prefix = "rocketmq.enhance") @Data public class RocketEnhanceProperties { private boolean enabledIsolation; private String environment; }
<dependency> <groupId>com.jianzh6</groupId> <artifactId>cloud-rocket-starter</artifactId> </dependency>
rocketmq: ... enhance: # 启动隔离,用于激活配置类EnvironmentIsolationConfig # 启动后会自动在topic上拼接激活的配置文件,达到自动隔离的效果 enabledIsolation: true # 隔离环境名称,拼接到topic后,topic_dev,默认空字符串 environment: dev
@RestController @RequestMapping("enhance") @Slf4j public class EnhanceProduceController { //注入增强后的模板,可以自动实现环境隔离,日志记录 @Setter(onMethod_ = @Autowired) private RocketMQEnhanceTemplate rocketMQEnhanceTemplate; private static final String topic = "rocket_enhance"; private static final String tag = "member"; /** * 发送实体消息 */ @GetMapping("/member") public SendResult member() { String key = UUID.randomUUID().toString(); MemberMessage message = new MemberMessage(); // 设置业务key message.setKey(key); // 设置消息来源,便于查询 message.setSource("MEMBER"); // 业务消息内容 message.setUserName("Java日知录"); message.setBirthday(LocalDate.now()); return rocketMQEnhanceTemplate.send(topic, tag, message); } }
注意这里使用的是封装后的模板工具类,一旦在配置文件中启动环境隔离,则生产者的消息也自动发送到隔离后的topic中。
@Slf4j @Component @RocketMQMessageListener( consumerGroup = "enhance_consumer_group", topic = "rocket_enhance", selectorExpression = "*", consumeThreadMax = 5 //默认是64个线程并发消息,配置 consumeThreadMax 参数指定并发消费线程数,避免太大导致资源不够 ) public class EnhanceMemberMessageListener extends EnhanceMessageHandler<MemberMessage> implements RocketMQListener<MemberMessage> { @Override protected void handleMessage(MemberMessage message) throws Exception { // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试 System.out.println("业务消息处理:"+message.getUserName()); } @Override protected void handleMaxRetriesExceeded(MemberMessage message) { // 当超过指定重试次数消息时此处方法会被调用 // 生产中可以进行回退或其他业务操作 log.error("消息消费失败,请执行后续处理"); } /** * 是否执行重试机制 */ @Override protected boolean isRetry() { return true; } @Override protected boolean throwException() { // 是否抛出异常,false搭配retry自行处理异常 return false; } @Override protected boolean filter() { // 消息过滤 return false; } /** * 监听消费消息,不需要执行业务处理,委派给父类做基础操作,父类做完基础操作后会调用子类的实际处理类型 */ @Override public void onMessage(MemberMessage memberMessage) { super.dispatchMessage(memberMessage); } }
为了方便消费者对RocketMQ中的消息进行处理,我们可以使用EnhanceMessageHandler来进行消息的处理和逻辑的处理。
Walaupun pengguna melaksanakan RocketMQListener, mereka boleh mewarisi EnhanceMessageHandler untuk memproses logik awam, manakala logik perniagaan teras perlu melaksanakan kaedah handleMessage
dengan sendirinya. Jika anda perlu menapis atau mengalih keluar mesej pendua, anda boleh mengatasi kaedah penapis kelas induk untuk melaksanakannya. Ini menjadikannya lebih mudah untuk memproses mesej dan mengurangkan beban kerja pembangun.
Atas ialah kandungan terperinci Bagaimanakah SpringBoot menyepadukan RocketMQ?. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!