Rumah > Java > javaTutorial > Bagaimana untuk menyelesaikan masalah yang dihadapi semasa menyepadukan RocketMQ dengan SpringBoot

Bagaimana untuk menyelesaikan masalah yang dihadapi semasa menyepadukan RocketMQ dengan SpringBoot

WBOY
Lepaskan: 2023-05-19 11:25:23
ke hadapan
1805 orang telah melayarinya

Senario Aplikasi

Apabila melaksanakan penggunaan RocketMQ, anotasi @RocketMQMessageListener biasanya digunakan untuk mentakrifkan Kumpulan, Topik dan SelectorExpression (peraturan penapisan dan pemilihan data untuk menyokong penapisan data yang dinamik, ungkapan secara amnya). used , dan kemudian beralih secara dinamik melalui apollo atau konfigurasi awan.

Memperkenalkan kebergantungan

 <!-- RocketMq Spring Boot Starter-->
 <dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.0.4</version>
  </dependency>
Salin selepas log masuk

Kod pengguna

@RocketMQMessageListener(consumerGroup = "${rocketmq.group}",topic ="${rocketmq.topic}",selectorExpression = "${rocketmq.selectorExpression}")
public class Consumer implements RocketMQListener<String> {
    @Override
    public void onMessage(String s) {
        System.out.println("消费到的数据为:"+s);
    }
}
Salin selepas log masuk

Menyelesaikan masalah

Ekspresi pemilih lalai bagi keseluruhan anotasi RocketMQMessageListener ialah *, yang bermaksud semua data menerima Topik semasa. Jika kami ingin mengkonfigurasi teg secara dinamik, kami akan mendapati bahawa semua data ditapis apabila menggunakan ungkapan ${rocketmq.selectorExpression} Menjejaki kod sumber (ListenerContainerConfiguration.java) mendapati bahawa data selectorExpression berada dalam pembolehubah persekitaran. apabila mencipta pendengar Selepas memperoleh data yang sepadan, ia telah ditimpa, menyebabkan keseluruhan keadaan penapis ditukar kepada ungkapan.

@Override
    public void afterSingletonsInstantiated() {
    	// 获取所有所有使用了RocketMQMessageListener注解的bean
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
        if (Objects.nonNull(beans)) {
        	// 循环注册容器
            beans.forEach(this::registerContainer);
        }
    }
    private void registerContainer(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
		// 校验当前bean是否实现了RocketMQListener接口
        if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
            throw new IllegalStateException(clazz + " is not instance of " + RocketMQListener.class.getName());
        }
		// 获取bean上的annotation
        RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
		// 解析group及topic,可支持表达式
        String consumerGroup = this.environment.resolvePlaceholders(annotation.consumerGroup());
        String topic = this.environment.resolvePlaceholders(annotation.topic());
        boolean listenerEnabled =
            (boolean)rocketMQProperties.getConsumer().getListeners().getOrDefault(consumerGroup, Collections.EMPTY_MAP)
                .getOrDefault(topic, true);
        if (!listenerEnabled) {
            log.debug(
                "Consumer Listener (group:{},topic:{}) is not enabled by configuration, will ignore initialization.",
                consumerGroup, topic);
            return;
        }
        validate(annotation);
        String containerBeanName = String.format("%s_%s", DefaultRocketMQListenerContainer.class.getName(),
            counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext)applicationContext;
		// 注册bean的,调用createRocketMQListenerContainer
        genericApplicationContext.registerBean(containerBeanName, DefaultRocketMQListenerContainer.class,
            () -> createRocketMQListenerContainer(containerBeanName, bean, annotation));
        DefaultRocketMQListenerContainer container = genericApplicationContext.getBean(containerBeanName,
            DefaultRocketMQListenerContainer.class);
        if (!container.isRunning()) {
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException(e);
            }
        }
        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }
    private DefaultRocketMQListenerContainer createRocketMQListenerContainer(String name, Object bean,
        RocketMQMessageListener annotation) {
        DefaultRocketMQListenerContainer container = new DefaultRocketMQListenerContainer();        
        container.setRocketMQMessageListener(annotation);        
        String nameServer = environment.resolvePlaceholders(annotation.nameServer());
        nameServer = StringUtils.isEmpty(nameServer) ? rocketMQProperties.getNameServer() : nameServer;
        String accessChannel = environment.resolvePlaceholders(annotation.accessChannel());
        container.setNameServer(nameServer);
        if (!StringUtils.isEmpty(accessChannel)) {
            container.setAccessChannel(AccessChannel.valueOf(accessChannel));
        }
        container.setTopic(environment.resolvePlaceholders(annotation.topic()));
        // 此处已经根据表达式将数据取出
        String tags = environment.resolvePlaceholders(annotation.selectorExpression());
        if (!StringUtils.isEmpty(tags)) {
            container.setSelectorExpression(tags);
        }
        container.setConsumerGroup(environment.resolvePlaceholders(annotation.consumerGroup()));
        // 此处将SelectorExpression的数据覆盖成了表达式
        container.setRocketMQMessageListener(annotation);
        container.setRocketMQListener((RocketMQListener)bean);
        container.setObjectMapper(objectMapper);
        container.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
        container.setName(name);  // REVIEW ME, use the same clientId or multiple?
        return container;
    }
Salin selepas log masuk

Selesai Masalah

Oleh kerana kelas ListenerContainerConfiguration melaksanakan kaedah afterSingletonsInstantiated antara muka SmartInitializingSingleton, kami boleh menghuraikan data selectorExpression melalui refleksi dan menetapkannya semula sebelum memulakan ListenerContainerContainer.

/**
 * 在springboot初始化后,RocketMQ容器初始化前利用反射动态改变数据
**/
@Configuration
public class ChangeSelectorExpressionBeforeMQInit implements InitializingBean {
    @Autowired
    private ApplicationContext applicationContext;
    @Autowired
    private StandardEnvironment environment;
    @Override
    public void afterPropertiesSet() throws Exception {
        Map<String,Object> beans =applicationContext.getBeansWithAnnotation(RocketMQMessageListener.class);
        for (Object bean : beans.values()){
            Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
            if (!RocketMQListener.class.isAssignableFrom(bean.getClass())) {
                continue;
            }
            RocketMQMessageListener annotation = clazz.getAnnotation(RocketMQMessageListener.class);
            InvocationHandler invocationHandler = Proxy.getInvocationHandler(annotation);
            Field field = invocationHandler.getClass().getDeclaredField("memberValues");
            field.setAccessible(true);
            Map<String, Object> memberValues = (Map<String, Object>) field.get(invocationHandler);
            for (Map.Entry<String,Object> entry: memberValues.entrySet()) {
                if(Objects.nonNull(entry)){
                    memberValues.put(entry.getKey(),environment.resolvePlaceholders(String.valueOf(entry.getValue())));
                }
            }
        }
    }
}
Salin selepas log masuk

Kecuali buat kali pertama, pepijat ini telah dibetulkan dalam versi 2.1.0 pakej pergantungan. Adalah disyorkan untuk menggunakan pakej versi 2.1.0 atau lebih tinggi tanpa menyebabkan konflik pergantungan.

Atas ialah kandungan terperinci Bagaimana untuk menyelesaikan masalah yang dihadapi semasa menyepadukan RocketMQ dengan SpringBoot. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Label berkaitan:
sumber:yisu.com
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan