When implementing RocketMQ consumption, the @RocketMQMessageListener annotation is generally used to define Group, Topic and selectorExpression (data filtering and selection rules). In order to support dynamic filtering of data, expressions are generally used. , and then dynamically switch through apollo or cloud config.
<!-- RocketMq Spring Boot Starter--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.0.4</version> </dependency>
@RocketMQMessageListener(consumerGroup = "${rocketmq.group}",topic ="${rocketmq.topic}",selectorExpression = "${rocketmq.selectorExpression}") public class Consumer implements RocketMQListener<String> { @Override public void onMessage(String s) { System.out.println("消费到的数据为:"+s); } }
The default selectorExpression of the entire annotation of RocketMQMessageListener is *, which means receiving all data under the current Topic. If we want to dynamically configure tags, we will find that all data is filtered when using the ${rocketmq.selectorExpression} expression. Tracking the source code (ListenerContainerConfiguration.java) found that the selectorExpression data is in the environment environment variable when creating the listener. After obtaining the corresponding data, it was overwritten, causing the entire filter condition to be changed to an expression.
@Override public void afterSingletonsInstantiated() { // 获取所有所有使用了RocketMQMessageListener注解的bean Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); if (Objects.nonNull(beans)) { // 循环注册容器 beans.forEach(this::registerContainer); } } private void registerContainer(String beanName, Object bean) { Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); // 校验当前bean是否实现了RocketMQListener接口 if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName()); } // 获取bean上的annotation RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); // 解析group及topic,可支持表达式 String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup()); String topic = this.environment.resolvePlaceholders(annotation.topic()); boolean listenerEnabled = (boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP) .getOrDefault(topic, true); if (!listenerEnabled) { log.debug( "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.", consumerGroup, topic); return; } validate(annotation); String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(), counter.incrementAndGet()); GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext; // 注册bean的,调用createRocketMQListenerContainer genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class, () -> createRocketMQListenerContainer(containerBeanName, bean, annotation)); DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName, DefaultRocketMQListenerContainer.class); if (!container.isRunning()) { try { container.start(); } catch (Exception e) { log.error("Started container failed. {}", container, e); throw new RuntimeException(e); } } log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName); } private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) { DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer(); container.setRocketMQMessageListener(annotation); String nameServer = environment.resolvePlaceholders(annotation.nameServer()); nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer; String accessChannel = environment.resolvePlaceholders(annotation.accessChannel()); container.setNameServer(nameServer); if (!StringUtils.isEmpty(accessChannel)) { container.setAccessChannel(AccessChannel.valueOf(accessChannel)); } container.setTopic(environment.resolvePlaceholders(annotation.topic())); // 此处已经根据表达式将数据取出 String tags = environment.resolvePlaceholders(annotation.selectorExpression()); if (!StringUtils.isEmpty(tags)) { container.setSelectorExpression(tags); } container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup())); // 此处将SelectorExpression的数据覆盖成了表达式 container.setRocketMQMessageListener(annotation); container.setRocketMQListener((RocketMQListener)bean); container.setObjectMapper(objectMapper); container.setMessageConverter(rocketMQMessageConverter.getMessageConverter()); container.setName(name); // REVIEW ME, use the same clientId or multiple? return container; }
Because the ListenerContainerConfiguration class implements the afterSingletonsInstantiated method of the SmartInitializingSingleton interface, we can parse the selectorExpression data through reflection and assign it back before initializing the ListenerContainerConfiguration.
/** * 在springboot初始化后,RocketMQ容器初始化前利用反射动态改变数据 **/ @Configuration public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean { @Autowired private ApplicationContext applicationContext; @Autowired private StandardEnvironment environment; @Override public void afterPropertiesSet() throws Exception { Map<String,Object> beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class); for (Object bean : beans.values()){ Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean); if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) { continue; } RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class); InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation); Field field = invocationHandler.getClass().getDeclaredField("memberValues"); field.setAccessible(true); Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler); for (Map.Entry<String,Object> entry: memberValues.entrySet()) { if(Objects.nonNull(entry)){ memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue()))); } } } } }
Except for the first time, this bug has been fixed in the 2.1.0 version of the dependency package. On the premise of not causing dependency conflicts, it is recommended to use the 2.1.0 or above version package.
The above is the detailed content of How to solve the pitfalls encountered when integrating RocketMQ with SpringBoot. For more information, please follow other related articles on the PHP Chinese website!