最近在改造老系统,遇到了需要使用rabbitMq的场景.在以前使用的过程中需要在发送端和消费端各种配置,感觉比较麻烦,然后突然想到了dubbo中@Reference注解的形式,可不可以做一个类似的架子,这样调用MQ的时候就像调用同步接口一样方便简单呢?于是查了相关资料和看了dubbo的源码,之后就有了思路.
总的来说,要实现的目标就是像dubbo一样,消费端暴露接口(甚至可以复用dubbo服务定义的接口,这样写一个dubbo服务即可同步也可MQ异步),发送端通过自定义的注解注入对象调用方法,通过框架内部处理之后转换成异步mq形式发送到消费端.
比如服务端有接口:
public interface MqDemoService { void dealById(Long id); }
有实现:
@Slf4j @Component("mqDemoServiceImpl") @Service(version = "1.0.0") public class MqDemoServiceImpl implements MqDemoService { @Override public void dealById(Long id) { log.info("执行findById方法"); } }
其中:
@Slf4j是lombok注解 @Service是dubbo服务端注解
有兴趣的同学自行查阅
然后是发送端
有自定义注解:
@Target(ElementType.FIELD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface AsyncInvoker { }
于是在调用的controller中:
@Slf4j @Controller public class MqDemoController { @AsyncInvoker private MqDemoService mqDemoService; @RequestMapping(value = "/deal", method = RequestMethod.POST) public void deal() { mqDemoService.dealById(1L); } }
注意Controller中@AsyncInvoker注解的属性mqDemoService,通过这个注解注入的对象调用方法的时候会通过mq发送变为异步调用.
好了,要实现的目标很清晰了.那么要解决的问题就是以下几个方面了:
1,如何确定发送消息的格式,使消费端可以确定调用的方法 2,发送端中如何为注解@AsyncInvoker注释的对象注入实例 3,接收端中如何在接收到消息后调用对应接口的实现方法 4,多个消费服务如何区分mq队列.
这里我先按照java反射调用需要的参数简单定义了一个传输对象:
@Data public class MqMethodMeta { //调用的接口名称(包括包名,用于反射) private String interfaceName; //调用的方法名 private String methodName; //调用的方法的参数 private Object[] args; //调用的方法的参数类型 private String[] paramTypeNames; }
答案就是动态代理.
答案就是spring的BeanPostProcessor接口了!这个接口允许spring在处理对象创建的前后插入用户自己定义的逻辑,在这里就不细细展开了,有需要的同学自行google/百度了哈.
那么思路出来了,代码如下:
@Slf4j @Component public class AsyncInvokerBeanProcessor implements BeanPostProcessor { //缓存生成的动态代理对象,用于多个Controller注入同一类型对象时使用. private final ConcurrentMap<String, Object> proxyMap = new ConcurrentHashMap<>(); //注入spring amqp处理mq的对象 @Autowired private RabbitTemplate rabbitTemplate; //BeanPostProcessor接口方法,在spring创建每个实例前插入的用户自定义逻辑.这里我们需要的是在每个Controller对象创建的时候为其中的@AsyncInvoker注解对象注入动态代理. @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { //获取该实例中的有@AsyncInvoker注解的field Field[] fields = bean.getClass().getDeclaredFields(); for (Field field : fields) { try { if (!field.isAccessible()) { field.setAccessible(true); } AsyncInvoker asyncInvoker = field.getAnnotation(AsyncInvoker.class); if (asyncInvoker != null) { //创建代理对象,赋值给该feild Object value = createProxy(field.getType()); if (value != null) { field.set(bean, value); } } } catch (Throwable e) { log.error("Failed to init remote mq service at filed " + field.getName() + " in class " + bean.getClass().getName() + ", cause: " + e.getMessage(), e); } } return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { return bean; } private Object createProxy(Class clz) { String interfaceName; if (clz.isInterface()) { interfaceName = clz.getName(); } else { throw new IllegalStateException("The @MqInvoker property type " + clz.getName() + " is not a interface."); } Object proxy = proxyMap.get(interfaceName); if (proxy == null) { Object newProxy = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clz}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { log.debug("执行动态代理! method:{} ,args: {}", method, args); if (method.getParameters().length != 1 || !method.getParameters()[0].getType().equals(Long.class)) { throw new IllegalAccessException("MQ Service 目前仅支持单参数Long类型方法"); } //动态代理中创建mq传输对象并发送. MqMethodMeta mqMethodMeta = new MqMethodMeta(); mqMethodMeta.setInterfaceName(clz.getName()); mqMethodMeta.setMethodName(method.getName()); mqMethodMeta.setArgs(args); String[] paramTypeNames = new String[args.length]; for (int i = 0; i < args.length; i++) { paramTypeNames[i] = args[i].getClass().getName(); } mqMethodMeta.setParamTypeNames(paramTypeNames); RabbitAdmin admin = new RabbitAdmin(rabbitTemplate.getConnectionFactory()); Exchange exchange = new TopicExchange("exchange.demo.web.adaptor"); admin.declareExchange(exchange); //关注此处clz.getName(),用于处理问题4 rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta); return null; } }); proxyMap.putIfAbsent(interfaceName, newProxy); proxy = proxyMap.get(interfaceName); } return proxy; } }
接收端调用对应接口就很简单了,只需要拿到MqMethodMeta对象进行反射调用就好了,直接上代码:
@Slf4j public class AsyncMethodListener implements ApplicationContextAware { private ApplicationContext applicationContext; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${demo.mq.method.queue}", durable = "true"), exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"), key = "${demo.mq.method.routekey}" )) public void messageHandle(@Payload MqMethodMeta message) { try { log.info("收到message: {}", message); Class clz = Class.forName(message.getInterfaceName()); String methodName = message.getMethodName(); Object[] args = message.getArgs(); Class[] paramTypes = new Class[message.getParamTypeNames().length]; for (int i = 0; i < message.getParamTypeNames().length; i++) { paramTypes[i] = Class.forName(message.getParamTypeNames()[i]); } //由于使用Object[]数组传送参数,所以Jackson2JsonMessageConverter会将id转换为Integer,反射调用时会报错,此处强转一下 for (int i = 0; i < args.length; i++) { Class c = paramTypes[i]; if (args[i] instanceof Integer && c.equals(Long.class)) { args[i] = ((Integer) args[i]).longValue(); } } //拿到spring管理的对应接口的实现 Object invoker = applicationContext.getBean(clz); Method method = clz.getMethod(methodName, paramTypes); method.invoke(invoker, args); } catch (Exception e) { e.printStackTrace(); } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
这里就使用到了rabbit的topic类型exchange.
首先对消费端listener中的queue和routekey进行可配置话管理:
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${demo.mq.method.queue}", durable = "true"), exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"), key = "${demo.mq.method.routekey}" ))
注意这里的
${demo.mq.method.queue} ${demo.mq.method.routekey}
是从配置文件中读取出来的:
比如系统1中是如下配置:
demo.mq.method.queue=com.demo.service.project1.# demo.mq.method.routekey=com.demo.service.project1.#
系统2中是如下配置:
demo.mq.method.queue=com.demo.service.project2.# demo.mq.method.routekey=com.demo.service.project2.#
再看发送端中那段代码:
//关注此处clz.getName(),用于处理问题4 rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta);
这里面的clz.getName(). 由于我们系统是有良好的分包策略,所以系统1的clz.getName()一定是以com.demo.service.project1为开头的.一定会发送到project1中的listener.比如clz.getName()值为com.demo.service.project1.MqDemoService (".#"匹配后面多个标示符,此为rabbitMQ中topic类型exchange的特性).
至此,一开始想要达成的目标已经达成.今后需要用mq做异步调用的时候可以像同步方法一样使用了.
对于mq在spring中的使用在此就不详细列举了,可以参考文档:
http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/htmlsingle/
稍后会提供一套demo代码出来供记录和参考
目前这套方法中还是存在一些问题的.比如:
1,因为目前业务场景,没有考虑异步回调的问题. 需要的话可以考虑和rabbitmq本身的异步回调方式结合. 目前还没有思考. 2,因为对消费端版本更新问题的考虑,目前仅仅支持单参数(整型)方法的调用.
第一个问题等需要用到对应业务后再做考虑吧.或者有思路的通知可以探讨一下.
第二个问题主要考虑的是如果消费端更改了参数类型或者其他之类的情况下,重新发布后,对于可能残留在mq中的老消息的兼容.这个目前确实没有什么好思路,抛出来也是为了集思广益了.
以上是利用rabbit mq.模拟dubbo,使MQ异步调用的详细内容。更多信息请关注PHP中文网其他相关文章!