目录
1,如何确定发送消息的格式,使接收端可以确定调用的方法
2,发送端中如何为注解@AsyncInvoker注释的对象注入实例
在这个场景中,发送端是只会引入消费端的接口,不会引入实现的.那么@AsyncInvoker如何注入对象呢?
那么还有如何让Spring知道@AsyncInvoker注解的对象要注入动态代理呢?
3,接收端中如何在接收到消息后调用对应接口的实现方法
4,多个消费服务如何区分mq队列.
总结
首页 Java java教程 利用rabbit mq.模拟dubbo,使MQ异步调用

利用rabbit mq.模拟dubbo,使MQ异步调用

Jun 26, 2017 am 11:22 AM
dubbo Rabbit 使用 异步 模拟

最近在改造老系统,遇到了需要使用rabbitMq的场景.在以前使用的过程中需要在发送端和消费端各种配置,感觉比较麻烦,然后突然想到了dubbo中@Reference注解的形式,可不可以做一个类似的架子,这样调用MQ的时候就像调用同步接口一样方便简单呢?于是查了相关资料和看了dubbo的源码,之后就有了思路.

总的来说,要实现的目标就是像dubbo一样,消费端暴露接口(甚至可以复用dubbo服务定义的接口,这样写一个dubbo服务即可同步也可MQ异步),发送端通过自定义的注解注入对象调用方法,通过框架内部处理之后转换成异步mq形式发送到消费端.

比如服务端有接口:

public interface MqDemoService {
    void dealById(Long id);
}
登录后复制

有实现:

@Slf4j
@Component("mqDemoServiceImpl")
@Service(version = "1.0.0")
public class MqDemoServiceImpl implements MqDemoService {
    @Override
    public void dealById(Long id) {
        log.info("执行findById方法");
    }
}
登录后复制

其中:

@Slf4j是lombok注解
@Service是dubbo服务端注解
登录后复制

有兴趣的同学自行查阅

然后是发送端

有自定义注解:

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface AsyncInvoker {
}
登录后复制

于是在调用的controller中:

@Slf4j
@Controller
public class MqDemoController {
  @AsyncInvoker
  private MqDemoService mqDemoService;

  @RequestMapping(value = "/deal", method = RequestMethod.POST)
  public void deal() {
    mqDemoService.dealById(1L);
  }
}
登录后复制

注意Controller中@AsyncInvoker注解的属性mqDemoService,通过这个注解注入的对象调用方法的时候会通过mq发送变为异步调用.

好了,要实现的目标很清晰了.那么要解决的问题就是以下几个方面了:

1,如何确定发送消息的格式,使消费端可以确定调用的方法
2,发送端中如何为注解@AsyncInvoker注释的对象注入实例
3,接收端中如何在接收到消息后调用对应接口的实现方法
4,多个消费服务如何区分mq队列.
登录后复制

1,如何确定发送消息的格式,使接收端可以确定调用的方法

这里我先按照java反射调用需要的参数简单定义了一个传输对象:

@Data
public class MqMethodMeta {
  //调用的接口名称(包括包名,用于反射)
  private String interfaceName;
  //调用的方法名
  private String methodName;
  //调用的方法的参数
  private Object[] args;
  //调用的方法的参数类型
  private String[] paramTypeNames;
}
登录后复制

2,发送端中如何为注解@AsyncInvoker注释的对象注入实例

在这个场景中,发送端是只会引入消费端的接口,不会引入实现的.那么@AsyncInvoker如何注入对象呢?

答案就是动态代理.

那么还有如何让Spring知道@AsyncInvoker注解的对象要注入动态代理呢?

答案就是spring的BeanPostProcessor接口了!这个接口允许spring在处理对象创建的前后插入用户自己定义的逻辑,在这里就不细细展开了,有需要的同学自行google/百度了哈.

那么思路出来了,代码如下:

@Slf4j
@Component
public class AsyncInvokerBeanProcessor implements BeanPostProcessor {
  //缓存生成的动态代理对象,用于多个Controller注入同一类型对象时使用.
  private final ConcurrentMap<String, Object> proxyMap = new ConcurrentHashMap<>();

  //注入spring amqp处理mq的对象
  @Autowired
  private RabbitTemplate rabbitTemplate;

  //BeanPostProcessor接口方法,在spring创建每个实例前插入的用户自定义逻辑.这里我们需要的是在每个Controller对象创建的时候为其中的@AsyncInvoker注解对象注入动态代理.
  @Override
  public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
    //获取该实例中的有@AsyncInvoker注解的field
    Field[] fields = bean.getClass().getDeclaredFields();
    for (Field field : fields) {
      try {
        if (!field.isAccessible()) {
          field.setAccessible(true);
        }
        AsyncInvoker asyncInvoker = field.getAnnotation(AsyncInvoker.class);
        if (asyncInvoker != null) {
          //创建代理对象,赋值给该feild
          Object value = createProxy(field.getType());
          if (value != null) {
            field.set(bean, value);
          }
        }
      } catch (Throwable e) {
        log.error("Failed to init remote mq service at filed " + field.getName() + " in class " + bean.getClass().getName() + ", cause: " + e.getMessage(), e);
      }
    }
    return bean;
  }

  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    return bean;
  }

  private Object createProxy(Class clz) {
    String interfaceName;
    if (clz.isInterface()) {
      interfaceName = clz.getName();
    } else {
      throw new IllegalStateException("The @MqInvoker property type " + clz.getName() + " is not a interface.");
    }

    Object proxy = proxyMap.get(interfaceName);
    if (proxy == null) {
      Object newProxy = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clz}, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
          log.debug("执行动态代理! method:{} ,args: {}", method, args);
          if (method.getParameters().length != 1 || !method.getParameters()[0].getType().equals(Long.class)) {
            throw new IllegalAccessException("MQ Service 目前仅支持单参数Long类型方法");
          }
          //动态代理中创建mq传输对象并发送.
          MqMethodMeta mqMethodMeta = new MqMethodMeta();
          mqMethodMeta.setInterfaceName(clz.getName());
          mqMethodMeta.setMethodName(method.getName());
          mqMethodMeta.setArgs(args);
          String[] paramTypeNames = new String[args.length];
          for (int i = 0; i < args.length; i++) {
            paramTypeNames[i] = args[i].getClass().getName();
          }
          mqMethodMeta.setParamTypeNames(paramTypeNames);
          RabbitAdmin admin = new RabbitAdmin(rabbitTemplate.getConnectionFactory());
          Exchange exchange = new TopicExchange("exchange.demo.web.adaptor");
          admin.declareExchange(exchange);
          //关注此处clz.getName(),用于处理问题4
          rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta);
          return null;
        }
      });
      proxyMap.putIfAbsent(interfaceName, newProxy);
      proxy = proxyMap.get(interfaceName);
    }
    return proxy;
  }
}
登录后复制

3,接收端中如何在接收到消息后调用对应接口的实现方法

接收端调用对应接口就很简单了,只需要拿到MqMethodMeta对象进行反射调用就好了,直接上代码:

@Slf4j
public class AsyncMethodListener implements ApplicationContextAware {
  private ApplicationContext applicationContext;

  @RabbitListener(bindings = @QueueBinding(
      value = @Queue(value = "${demo.mq.method.queue}", durable = "true"),
      exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"),
      key = "${demo.mq.method.routekey}"
  ))
  public void messageHandle(@Payload MqMethodMeta message) {
    try {
      log.info("收到message: {}", message);
      Class clz = Class.forName(message.getInterfaceName());
      String methodName = message.getMethodName();
      Object[] args = message.getArgs();
      Class[] paramTypes = new Class[message.getParamTypeNames().length];
      for (int i = 0; i < message.getParamTypeNames().length; i++) {
        paramTypes[i] = Class.forName(message.getParamTypeNames()[i]);
      }

      //由于使用Object[]数组传送参数,所以Jackson2JsonMessageConverter会将id转换为Integer,反射调用时会报错,此处强转一下
      for (int i = 0; i < args.length; i++) {
        Class c = paramTypes[i];
        if (args[i] instanceof Integer && c.equals(Long.class)) {
          args[i] = ((Integer) args[i]).longValue();
        }
      }
      //拿到spring管理的对应接口的实现
      Object invoker = applicationContext.getBean(clz);
      Method method = clz.getMethod(methodName, paramTypes);
      method.invoke(invoker, args);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
  }
}
登录后复制

4,多个消费服务如何区分mq队列.

这里就使用到了rabbit的topic类型exchange.
首先对消费端listener中的queue和routekey进行可配置话管理:

@RabbitListener(bindings = @QueueBinding(
      value = @Queue(value = "${demo.mq.method.queue}", durable = "true"),
      exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"),
      key = "${demo.mq.method.routekey}"
  ))
登录后复制

注意这里的

${demo.mq.method.queue}
${demo.mq.method.routekey}
登录后复制

是从配置文件中读取出来的:

比如系统1中是如下配置:

demo.mq.method.queue=com.demo.service.project1.#
demo.mq.method.routekey=com.demo.service.project1.#
登录后复制

系统2中是如下配置:

demo.mq.method.queue=com.demo.service.project2.#
demo.mq.method.routekey=com.demo.service.project2.#
登录后复制

再看发送端中那段代码:

//关注此处clz.getName(),用于处理问题4
rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta);
登录后复制

这里面的clz.getName(). 由于我们系统是有良好的分包策略,所以系统1的clz.getName()一定是以com.demo.service.project1为开头的.一定会发送到project1中的listener.比如clz.getName()值为com.demo.service.project1.MqDemoService (".#"匹配后面多个标示符,此为rabbitMQ中topic类型exchange的特性).

至此,一开始想要达成的目标已经达成.今后需要用mq做异步调用的时候可以像同步方法一样使用了.

对于mq在spring中的使用在此就不详细列举了,可以参考文档:

http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/htmlsingle/
登录后复制

稍后会提供一套demo代码出来供记录和参考

总结

目前这套方法中还是存在一些问题的.比如:

1,因为目前业务场景,没有考虑异步回调的问题. 需要的话可以考虑和rabbitmq本身的异步回调方式结合. 目前还没有思考.
2,因为对消费端版本更新问题的考虑,目前仅仅支持单参数(整型)方法的调用.
登录后复制

第一个问题等需要用到对应业务后再做考虑吧.或者有思路的通知可以探讨一下.

第二个问题主要考虑的是如果消费端更改了参数类型或者其他之类的情况下,重新发布后,对于可能残留在mq中的老消息的兼容.这个目前确实没有什么好思路,抛出来也是为了集思广益了.

以上是利用rabbit mq.模拟dubbo,使MQ异步调用的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

如何使用mdf和mds文件 如何使用mdf和mds文件 Feb 19, 2024 pm 05:36 PM

mdf文件和mds文件怎么用随着计算机技术的不断进步,我们可以通过多种方式来存储和共享数据。在数字媒体领域,我们经常会遇到一些特殊的文件格式。在这篇文章中,我们将讨论一种常见的文件格式——mdf和mds文件,并介绍它们的使用方法。首先,我们需要了解mdf文件和mds文件的含义。mdf是CD/DVD镜像文件的扩展名,而mds文件则是mdf文件的元数据文件。

crystaldiskmark是什么软件?-crystaldiskmark如何使用? crystaldiskmark是什么软件?-crystaldiskmark如何使用? Mar 18, 2024 pm 02:58 PM

CrystalDiskMark是一款适用于硬盘的小型HDD基准测试工具,可以快速测量顺序和随机读/写速度。接下来就让小编为大家介绍一下CrystalDiskMark,以及crystaldiskmark如何使用吧~一、CrystalDiskMark介绍CrystalDiskMark是一款广泛使用的磁盘性能测试工具,用于评估机械硬盘和固态硬盘(SSD)的读写速度和随机I/O性能。它是一款免费的Windows应用程序,并提供用户友好的界面和各种测试模式来评估硬盘驱动器性能的不同方面,并被广泛用于硬件评

foobar2000怎么下载?-foobar2000怎么使用 foobar2000怎么下载?-foobar2000怎么使用 Mar 18, 2024 am 10:58 AM

foobar2000是一款能随时收听音乐资源的软件,各种音乐无损音质带给你,增强版本的音乐播放器,让你得到更全更舒适的音乐体验,它的设计理念是将电脑端的高级音频播放器移植到手机上,提供更加便捷高效的音乐播放体验,界面设计简洁明了易于使用它采用了极简的设计风格,没有过多的装饰和繁琐的操作能够快速上手,同时还支持多种皮肤和主题,根据自己的喜好进行个性化设置,打造专属的音乐播放器支持多种音频格式的播放,它还支持音频增益功能根据自己的听力情况调整音量大小,避免过大的音量对听力造成损害。接下来就让小编为大

Rabbit R1经过改装可以运行游戏,性能比预期更好 Rabbit R1经过改装可以运行游戏,性能比预期更好 Jun 29, 2024 am 07:50 AM

本月早些时候,一位名为 HowToMen 的 YouTuber 展示了运行 Android 的 Rabbit R1。有了这个模组,原本比手机更分散注意力的设备就可以像手机一样发挥作用了。不过,这并不是一件坏事,因为兔子没有

网易邮箱大师怎么用 网易邮箱大师怎么用 Mar 27, 2024 pm 05:32 PM

网易邮箱,作为中国网民广泛使用的一种电子邮箱,一直以来以其稳定、高效的服务赢得了用户的信赖。而网易邮箱大师,则是专为手机用户打造的邮箱软件,它极大地简化了邮件的收发流程,让我们的邮件处理变得更加便捷。那么网易邮箱大师该如何使用,具体又有哪些功能呢,下文中本站小编将为大家带来详细的内容介绍,希望能帮助到大家!首先,您可以在手机应用商店搜索并下载网易邮箱大师应用。在应用宝或百度手机助手中搜索“网易邮箱大师”,然后按照提示进行安装即可。下载安装完成后,我们打开网易邮箱账号并进行登录,登录界面如下图所示

百度网盘app怎么用 百度网盘app怎么用 Mar 27, 2024 pm 06:46 PM

在如今云存储已经成为我们日常生活和工作中不可或缺的一部分。百度网盘作为国内领先的云存储服务之一,凭借其强大的存储功能、高效的传输速度以及便捷的操作体验,赢得了广大用户的青睐。而且无论你是想要备份重要文件、分享资料,还是在线观看视频、听取音乐,百度网盘都能满足你的需求。但是很多用户们可能对百度网盘app的具体使用方法还不了解,那么这篇教程就将为大家详细介绍百度网盘app如何使用,还有疑惑的用户们就快来跟着本文详细了解一下吧!百度云网盘怎么用:一、安装首先,下载并安装百度云软件时,请选择自定义安装选

BTCC教学:如何在BTCC交易所绑定使用MetaMask钱包? BTCC教学:如何在BTCC交易所绑定使用MetaMask钱包? Apr 26, 2024 am 09:40 AM

MetaMask(中文也叫小狐狸钱包)是一款免费的、广受好评的加密钱包软件。目前,BTCC已支持绑定MetaMask钱包,绑定后可使用MetaMask钱包进行快速登入,储值、买币等,且首次绑定还可获得20USDT体验金。在BTCCMetaMask钱包教学中,我们将详细介绍如何注册和使用MetaMask,以及如何在BTCC绑定并使用小狐狸钱包。MetaMask钱包是什么?MetaMask小狐狸钱包拥有超过3,000万用户,是当今最受欢迎的加密货币钱包之一。它可免费​​使用,可作为扩充功能安装在网络

小爱音箱怎么使用 小爱音箱怎么连接手机 小爱音箱怎么使用 小爱音箱怎么连接手机 Feb 22, 2024 pm 05:19 PM

长按音箱的播放键后,在软件中连接wifi即可使用。教程适用型号:小米12系统:EMUI11.0版本:小爱同学2.4.21解析1首先找到音箱的播放键,长按进入配网模式。2在手机上的小爱音箱软件中登录小米账号,点击添加新的小爱音箱。3输入wifi的名称和密码后,即可呼唤小爱同学进行使用了。补充:小爱音箱有什么功能1小爱音箱有系统功能、社交功能、娱乐功能、知识功能、生活功能、智能家庭、训练计划。总结/注意事项手机要提前安装好小爱同学APP,方便连接和使用。

See all articles