利用rabbit mq.模拟dubbo,使MQ异步调用
最近在改造老系统,遇到了需要使用rabbitMq的场景.在以前使用的过程中需要在发送端和消费端各种配置,感觉比较麻烦,然后突然想到了dubbo中@Reference注解的形式,可不可以做一个类似的架子,这样调用MQ的时候就像调用同步接口一样方便简单呢?于是查了相关资料和看了dubbo的源码,之后就有了思路.
总的来说,要实现的目标就是像dubbo一样,消费端暴露接口(甚至可以复用dubbo服务定义的接口,这样写一个dubbo服务即可同步也可MQ异步),发送端通过自定义的注解注入对象调用方法,通过框架内部处理之后转换成异步mq形式发送到消费端.
比如服务端有接口:
public interface MqDemoService { void dealById(Long id); }
有实现:
@Slf4j @Component("mqDemoServiceImpl") @Service(version = "1.0.0") public class MqDemoServiceImpl implements MqDemoService { @Override public void dealById(Long id) { log.info("执行findById方法"); } }
其中:
@Slf4j是lombok注解 @Service是dubbo服务端注解
有兴趣的同学自行查阅
然后是发送端
有自定义注解:
@Target(ElementType.FIELD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface AsyncInvoker { }
于是在调用的controller中:
@Slf4j @Controller public class MqDemoController { @AsyncInvoker private MqDemoService mqDemoService; @RequestMapping(value = "/deal", method = RequestMethod.POST) public void deal() { mqDemoService.dealById(1L); } }
注意Controller中@AsyncInvoker注解的属性mqDemoService,通过这个注解注入的对象调用方法的时候会通过mq发送变为异步调用.
好了,要实现的目标很清晰了.那么要解决的问题就是以下几个方面了:
1,如何确定发送消息的格式,使消费端可以确定调用的方法 2,发送端中如何为注解@AsyncInvoker注释的对象注入实例 3,接收端中如何在接收到消息后调用对应接口的实现方法 4,多个消费服务如何区分mq队列.
1,如何确定发送消息的格式,使接收端可以确定调用的方法
这里我先按照java反射调用需要的参数简单定义了一个传输对象:
@Data public class MqMethodMeta { //调用的接口名称(包括包名,用于反射) private String interfaceName; //调用的方法名 private String methodName; //调用的方法的参数 private Object[] args; //调用的方法的参数类型 private String[] paramTypeNames; }
2,发送端中如何为注解@AsyncInvoker注释的对象注入实例
在这个场景中,发送端是只会引入消费端的接口,不会引入实现的.那么@AsyncInvoker如何注入对象呢?
答案就是动态代理.
那么还有如何让Spring知道@AsyncInvoker注解的对象要注入动态代理呢?
答案就是spring的BeanPostProcessor接口了!这个接口允许spring在处理对象创建的前后插入用户自己定义的逻辑,在这里就不细细展开了,有需要的同学自行google/百度了哈.
那么思路出来了,代码如下:
@Slf4j @Component public class AsyncInvokerBeanProcessor implements BeanPostProcessor { //缓存生成的动态代理对象,用于多个Controller注入同一类型对象时使用. private final ConcurrentMap<String, Object> proxyMap = new ConcurrentHashMap<>(); //注入spring amqp处理mq的对象 @Autowired private RabbitTemplate rabbitTemplate; //BeanPostProcessor接口方法,在spring创建每个实例前插入的用户自定义逻辑.这里我们需要的是在每个Controller对象创建的时候为其中的@AsyncInvoker注解对象注入动态代理. @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { //获取该实例中的有@AsyncInvoker注解的field Field[] fields = bean.getClass().getDeclaredFields(); for (Field field : fields) { try { if (!field.isAccessible()) { field.setAccessible(true); } AsyncInvoker asyncInvoker = field.getAnnotation(AsyncInvoker.class); if (asyncInvoker != null) { //创建代理对象,赋值给该feild Object value = createProxy(field.getType()); if (value != null) { field.set(bean, value); } } } catch (Throwable e) { log.error("Failed to init remote mq service at filed " + field.getName() + " in class " + bean.getClass().getName() + ", cause: " + e.getMessage(), e); } } return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { return bean; } private Object createProxy(Class clz) { String interfaceName; if (clz.isInterface()) { interfaceName = clz.getName(); } else { throw new IllegalStateException("The @MqInvoker property type " + clz.getName() + " is not a interface."); } Object proxy = proxyMap.get(interfaceName); if (proxy == null) { Object newProxy = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clz}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { log.debug("执行动态代理! method:{} ,args: {}", method, args); if (method.getParameters().length != 1 || !method.getParameters()[0].getType().equals(Long.class)) { throw new IllegalAccessException("MQ Service 目前仅支持单参数Long类型方法"); } //动态代理中创建mq传输对象并发送. MqMethodMeta mqMethodMeta = new MqMethodMeta(); mqMethodMeta.setInterfaceName(clz.getName()); mqMethodMeta.setMethodName(method.getName()); mqMethodMeta.setArgs(args); String[] paramTypeNames = new String[args.length]; for (int i = 0; i < args.length; i++) { paramTypeNames[i] = args[i].getClass().getName(); } mqMethodMeta.setParamTypeNames(paramTypeNames); RabbitAdmin admin = new RabbitAdmin(rabbitTemplate.getConnectionFactory()); Exchange exchange = new TopicExchange("exchange.demo.web.adaptor"); admin.declareExchange(exchange); //关注此处clz.getName(),用于处理问题4 rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta); return null; } }); proxyMap.putIfAbsent(interfaceName, newProxy); proxy = proxyMap.get(interfaceName); } return proxy; } }
3,接收端中如何在接收到消息后调用对应接口的实现方法
接收端调用对应接口就很简单了,只需要拿到MqMethodMeta对象进行反射调用就好了,直接上代码:
@Slf4j public class AsyncMethodListener implements ApplicationContextAware { private ApplicationContext applicationContext; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${demo.mq.method.queue}", durable = "true"), exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"), key = "${demo.mq.method.routekey}" )) public void messageHandle(@Payload MqMethodMeta message) { try { log.info("收到message: {}", message); Class clz = Class.forName(message.getInterfaceName()); String methodName = message.getMethodName(); Object[] args = message.getArgs(); Class[] paramTypes = new Class[message.getParamTypeNames().length]; for (int i = 0; i < message.getParamTypeNames().length; i++) { paramTypes[i] = Class.forName(message.getParamTypeNames()[i]); } //由于使用Object[]数组传送参数,所以Jackson2JsonMessageConverter会将id转换为Integer,反射调用时会报错,此处强转一下 for (int i = 0; i < args.length; i++) { Class c = paramTypes[i]; if (args[i] instanceof Integer && c.equals(Long.class)) { args[i] = ((Integer) args[i]).longValue(); } } //拿到spring管理的对应接口的实现 Object invoker = applicationContext.getBean(clz); Method method = clz.getMethod(methodName, paramTypes); method.invoke(invoker, args); } catch (Exception e) { e.printStackTrace(); } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
4,多个消费服务如何区分mq队列.
这里就使用到了rabbit的topic类型exchange.
首先对消费端listener中的queue和routekey进行可配置话管理:
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${demo.mq.method.queue}", durable = "true"), exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"), key = "${demo.mq.method.routekey}" ))
注意这里的
${demo.mq.method.queue} ${demo.mq.method.routekey}
是从配置文件中读取出来的:
比如系统1中是如下配置:
demo.mq.method.queue=com.demo.service.project1.# demo.mq.method.routekey=com.demo.service.project1.#
系统2中是如下配置:
demo.mq.method.queue=com.demo.service.project2.# demo.mq.method.routekey=com.demo.service.project2.#
再看发送端中那段代码:
//关注此处clz.getName(),用于处理问题4 rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta);
这里面的clz.getName(). 由于我们系统是有良好的分包策略,所以系统1的clz.getName()一定是以com.demo.service.project1为开头的.一定会发送到project1中的listener.比如clz.getName()值为com.demo.service.project1.MqDemoService (".#"匹配后面多个标示符,此为rabbitMQ中topic类型exchange的特性).
至此,一开始想要达成的目标已经达成.今后需要用mq做异步调用的时候可以像同步方法一样使用了.
对于mq在spring中的使用在此就不详细列举了,可以参考文档:
http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/htmlsingle/
稍后会提供一套demo代码出来供记录和参考
总结
目前这套方法中还是存在一些问题的.比如:
1,因为目前业务场景,没有考虑异步回调的问题. 需要的话可以考虑和rabbitmq本身的异步回调方式结合. 目前还没有思考. 2,因为对消费端版本更新问题的考虑,目前仅仅支持单参数(整型)方法的调用.
第一个问题等需要用到对应业务后再做考虑吧.或者有思路的通知可以探讨一下.
第二个问题主要考虑的是如果消费端更改了参数类型或者其他之类的情况下,重新发布后,对于可能残留在mq中的老消息的兼容.这个目前确实没有什么好思路,抛出来也是为了集思广益了.
Atas ialah kandungan terperinci 利用rabbit mq.模拟dubbo,使MQ异步调用. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Alat AI Hot

Undresser.AI Undress
Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover
Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool
Gambar buka pakaian secara percuma

Clothoff.io
Penyingkiran pakaian AI

AI Hentai Generator
Menjana ai hentai secara percuma.

Artikel Panas

Alat panas

Notepad++7.3.1
Editor kod yang mudah digunakan dan percuma

SublimeText3 versi Cina
Versi Cina, sangat mudah digunakan

Hantar Studio 13.0.1
Persekitaran pembangunan bersepadu PHP yang berkuasa

Dreamweaver CS6
Alat pembangunan web visual

SublimeText3 versi Mac
Perisian penyuntingan kod peringkat Tuhan (SublimeText3)

Topik panas



Cara menggunakan fail mdf dan fail mds Dengan kemajuan teknologi komputer yang berterusan, kami boleh menyimpan dan berkongsi data dalam pelbagai cara. Dalam bidang media digital, kita sering menghadapi beberapa format fail khas. Dalam artikel ini, kami akan membincangkan format fail biasa - fail mdf dan mds, dan memperkenalkan cara menggunakannya. Pertama, kita perlu memahami maksud fail mdf dan fail mds. mdf ialah lanjutan fail imej CD/DVD, dan fail mds ialah fail metadata bagi fail mdf.

CrystalDiskMark ialah alat penanda aras HDD kecil untuk pemacu keras yang cepat mengukur kelajuan baca/tulis berurutan dan rawak. Seterusnya, biarkan editor memperkenalkan CrystalDiskMark kepada anda dan cara menggunakan crystaldiskmark~ 1. Pengenalan kepada CrystalDiskMark CrystalDiskMark ialah alat ujian prestasi cakera yang digunakan secara meluas yang digunakan untuk menilai kelajuan baca dan tulis serta prestasi pemacu keras mekanikal dan pemacu keadaan pepejal (SSD Prestasi I/O rawak. Ia adalah aplikasi Windows percuma dan menyediakan antara muka mesra pengguna dan pelbagai mod ujian untuk menilai aspek prestasi cakera keras yang berbeza dan digunakan secara meluas dalam ulasan perkakasan

foobar2000 ialah perisian yang boleh mendengar sumber muzik pada bila-bila masa Ia membawakan anda semua jenis muzik dengan kualiti bunyi tanpa kehilangan Versi pemain muzik yang dipertingkatkan membolehkan anda mendapatkan pengalaman muzik yang lebih komprehensif dan selesa mainkan audio lanjutan pada komputer Peranti dipindahkan ke telefon mudah alih untuk memberikan pengalaman main balik muzik yang lebih mudah dan cekap Reka bentuk antara muka adalah ringkas, jelas dan mudah digunakan Ia menggunakan gaya reka bentuk minimalis tanpa terlalu banyak hiasan dan operasi yang menyusahkan untuk bermula dengan cepat. Ia juga menyokong pelbagai kulit dan Tema, memperibadikan tetapan mengikut pilihan anda sendiri, dan mencipta pemain muzik eksklusif yang menyokong main balik berbilang format audio. Ia juga menyokong fungsi perolehan audio untuk melaraskan kelantangan kepada keadaan pendengaran anda sendiri untuk mengelakkan kerosakan pendengaran yang disebabkan oleh kelantangan yang berlebihan. Seterusnya, izinkan saya membantu anda

Awal bulan ini, YouTuber bernama HowToMen mempamerkan Rabbit R1 yang menjalankan Android. Dengan mod ini, peranti yang pada asalnya dimaksudkan untuk menjadi kurang mengganggu daripada telefon dapat berfungsi seperti satu. Itu bukan perkara yang buruk, walaupun, kerana Arnab tidak

Storan awan telah menjadi bahagian yang amat diperlukan dalam kehidupan dan kerja harian kita pada masa kini. Sebagai salah satu perkhidmatan storan awan terkemuka di China, Baidu Netdisk telah memenangi hati sebilangan besar pengguna dengan fungsi storan yang berkuasa, kelajuan penghantaran yang cekap dan pengalaman operasi yang mudah. Dan sama ada anda ingin menyandarkan fail penting, berkongsi maklumat, menonton video dalam talian atau mendengar muzik, Baidu Cloud Disk boleh memenuhi keperluan anda. Walau bagaimanapun, ramai pengguna mungkin tidak memahami penggunaan khusus aplikasi Baidu Netdisk, jadi tutorial ini akan memperkenalkan anda tentang cara menggunakan aplikasi Baidu Netdisk secara terperinci Jika anda masih keliru, sila ikuti artikel ini untuk mengetahui lebih lanjut! Cara menggunakan Cakera Rangkaian Awan Baidu: 1. Pemasangan Mula-mula, semasa memuat turun dan memasang perisian Baidu Cloud, sila pilih pilihan pemasangan tersuai.

NetEase Mailbox, sebagai alamat e-mel yang digunakan secara meluas oleh netizen Cina, sentiasa memenangi kepercayaan pengguna dengan perkhidmatannya yang stabil dan cekap. NetEase Mailbox Master ialah perisian e-mel yang dicipta khas untuk pengguna telefon mudah alih. Ia sangat memudahkan proses menghantar dan menerima e-mel dan menjadikan pemprosesan e-mel kami lebih mudah. Jadi bagaimana untuk menggunakan NetEase Mailbox Master, dan apakah fungsi khusus yang ada di bawah, editor tapak ini akan memberi anda pengenalan terperinci, dengan harapan dapat membantu anda. Mula-mula, anda boleh mencari dan memuat turun aplikasi NetEase Mailbox Master di gedung aplikasi mudah alih. Cari "Induk Peti Mel NetEase" dalam App Store atau Baidu Mobile Assistant, dan kemudian ikut gesaan untuk memasangnya. Selepas muat turun dan pemasangan selesai, kami membuka akaun e-mel NetEase dan log masuk. Antara muka log masuk adalah seperti yang ditunjukkan di bawah

MetaMask (juga dipanggil Little Fox Wallet dalam bahasa Cina) ialah perisian dompet penyulitan percuma dan diterima baik. Pada masa ini, BTCC menyokong pengikatan pada dompet MetaMask Selepas mengikat, anda boleh menggunakan dompet MetaMask untuk log masuk dengan cepat, menyimpan nilai, membeli syiling, dsb., dan anda juga boleh mendapatkan bonus percubaan 20 USDT untuk pengikatan pertama. Dalam tutorial dompet BTCCMetaMask, kami akan memperkenalkan secara terperinci cara mendaftar dan menggunakan MetaMask, dan cara mengikat dan menggunakan dompet Little Fox dalam BTCC. Apakah dompet MetaMask? Dengan lebih 30 juta pengguna, MetaMask Little Fox Wallet ialah salah satu dompet mata wang kripto yang paling popular hari ini. Ia percuma untuk digunakan dan boleh dipasang pada rangkaian sebagai sambungan

Selepas lama menekan butang main pembesar suara, sambungkan ke wifi dalam perisian dan anda boleh menggunakannya. Tutorial Model Berkenaan: Xiaomi 12 Sistem: EMUI11.0 Versi: Xiaoai Classmate 2.4.21 Analisis 1 Mula-mula cari butang main pembesar suara, dan tekan dan tahan untuk memasuki mod pengedaran rangkaian. 2 Log masuk ke akaun Xiaomi anda dalam perisian Xiaoai Speaker pada telefon anda dan klik untuk menambah Speaker Xiaoai baharu. 3. Selepas memasukkan nama dan kata laluan wifi, anda boleh menghubungi Xiao Ai untuk menggunakannya. Tambahan: Apakah fungsi yang ada pada Xiaoai Speaker 1 Xiaoai Speaker mempunyai fungsi sistem, fungsi sosial, fungsi hiburan, fungsi pengetahuan, fungsi kehidupan, rumah pintar dan rancangan latihan. Ringkasan/Nota: Apl Xiao Ai mesti dipasang pada telefon mudah alih anda terlebih dahulu untuk sambungan dan penggunaan yang mudah.
