最近在改造舊系統,遇到了需要使用rabbitMq的場景.在以前使用的過程中需要在發送端和消費端各種配置,感覺比較麻煩,然後突然想到了dubbo中@Reference註解的形式,可不可以做一個類似的架子,這樣調用MQ的時候就像調用同步接口一樣方便簡單呢?於是查了相關資料和看了dubbo的源碼,之後就有了思路.
總的來說,要實現的目標就是像dubbo一樣,消費端暴露接口(甚至可以復用dubbo服務定義的接口,這樣寫一個dubbo服務即可同步也可MQ異步),發送端通過自定義的註解注入對象調用方法,透過框架內部處理之後轉換成非同步mq形式傳送到消費端.
例如服務端有介面:
public interface MqDemoService { void dealById(Long id); }
有實作:
@Slf4j @Component("mqDemoServiceImpl") @Service(version = "1.0.0") public class MqDemoServiceImpl implements MqDemoService { @Override public void dealById(Long id) { log.info("执行findById方法"); } }
其中:
@Slf4j是lombok注解 @Service是dubbo服务端注解
有興趣的同學自行查閱
然後是發送端
有自訂註解:
@Target(ElementType.FIELD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface AsyncInvoker { }
於是在呼叫的controller中:
@Slf4j @Controller public class MqDemoController { @AsyncInvoker private MqDemoService mqDemoService; @RequestMapping(value = "/deal", method = RequestMethod.POST) public void deal() { mqDemoService.dealById(1L); } }
注意Controller中@AsyncInvoker註解的屬性mqDemoService,透過這個註解注入的物件呼叫方法的時候會透過mq發送變為非同步呼叫.
好了,要實現的目標很清晰了.那麼要解決的問題就是以下幾個方面了:
1,如何确定发送消息的格式,使消费端可以确定调用的方法 2,发送端中如何为注解@AsyncInvoker注释的对象注入实例 3,接收端中如何在接收到消息后调用对应接口的实现方法 4,多个消费服务如何区分mq队列.
這裡我先按照java反射調用需要的參數簡單定義了一個傳輸物件:
@Data public class MqMethodMeta { //调用的接口名称(包括包名,用于反射) private String interfaceName; //调用的方法名 private String methodName; //调用的方法的参数 private Object[] args; //调用的方法的参数类型 private String[] paramTypeNames; }
答案就是動態代理.
答案就是spring的BeanPostProcessor接口了!這個接口允許spring在處理對象創建的前後插入用戶自己定義的邏輯,在這裡就不細細展開了,有需要的同學自行google/百度了哈.
那麼思路出來了,程式碼如下:
@Slf4j @Component public class AsyncInvokerBeanProcessor implements BeanPostProcessor { //缓存生成的动态代理对象,用于多个Controller注入同一类型对象时使用. private final ConcurrentMap<String, Object> proxyMap = new ConcurrentHashMap<>(); //注入spring amqp处理mq的对象 @Autowired private RabbitTemplate rabbitTemplate; //BeanPostProcessor接口方法,在spring创建每个实例前插入的用户自定义逻辑.这里我们需要的是在每个Controller对象创建的时候为其中的@AsyncInvoker注解对象注入动态代理. @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { //获取该实例中的有@AsyncInvoker注解的field Field[] fields = bean.getClass().getDeclaredFields(); for (Field field : fields) { try { if (!field.isAccessible()) { field.setAccessible(true); } AsyncInvoker asyncInvoker = field.getAnnotation(AsyncInvoker.class); if (asyncInvoker != null) { //创建代理对象,赋值给该feild Object value = createProxy(field.getType()); if (value != null) { field.set(bean, value); } } } catch (Throwable e) { log.error("Failed to init remote mq service at filed " + field.getName() + " in class " + bean.getClass().getName() + ", cause: " + e.getMessage(), e); } } return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { return bean; } private Object createProxy(Class clz) { String interfaceName; if (clz.isInterface()) { interfaceName = clz.getName(); } else { throw new IllegalStateException("The @MqInvoker property type " + clz.getName() + " is not a interface."); } Object proxy = proxyMap.get(interfaceName); if (proxy == null) { Object newProxy = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clz}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { log.debug("执行动态代理! method:{} ,args: {}", method, args); if (method.getParameters().length != 1 || !method.getParameters()[0].getType().equals(Long.class)) { throw new IllegalAccessException("MQ Service 目前仅支持单参数Long类型方法"); } //动态代理中创建mq传输对象并发送. MqMethodMeta mqMethodMeta = new MqMethodMeta(); mqMethodMeta.setInterfaceName(clz.getName()); mqMethodMeta.setMethodName(method.getName()); mqMethodMeta.setArgs(args); String[] paramTypeNames = new String[args.length]; for (int i = 0; i < args.length; i++) { paramTypeNames[i] = args[i].getClass().getName(); } mqMethodMeta.setParamTypeNames(paramTypeNames); RabbitAdmin admin = new RabbitAdmin(rabbitTemplate.getConnectionFactory()); Exchange exchange = new TopicExchange("exchange.demo.web.adaptor"); admin.declareExchange(exchange); //关注此处clz.getName(),用于处理问题4 rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta); return null; } }); proxyMap.putIfAbsent(interfaceName, newProxy); proxy = proxyMap.get(interfaceName); } return proxy; } }
接收端呼叫對應接口就很簡單了,只需要拿到MqMethodMeta物件進行反射呼叫就好了,直接上程式碼:
@Slf4j public class AsyncMethodListener implements ApplicationContextAware { private ApplicationContext applicationContext; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${demo.mq.method.queue}", durable = "true"), exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"), key = "${demo.mq.method.routekey}" )) public void messageHandle(@Payload MqMethodMeta message) { try { log.info("收到message: {}", message); Class clz = Class.forName(message.getInterfaceName()); String methodName = message.getMethodName(); Object[] args = message.getArgs(); Class[] paramTypes = new Class[message.getParamTypeNames().length]; for (int i = 0; i < message.getParamTypeNames().length; i++) { paramTypes[i] = Class.forName(message.getParamTypeNames()[i]); } //由于使用Object[]数组传送参数,所以Jackson2JsonMessageConverter会将id转换为Integer,反射调用时会报错,此处强转一下 for (int i = 0; i < args.length; i++) { Class c = paramTypes[i]; if (args[i] instanceof Integer && c.equals(Long.class)) { args[i] = ((Integer) args[i]).longValue(); } } //拿到spring管理的对应接口的实现 Object invoker = applicationContext.getBean(clz); Method method = clz.getMethod(methodName, paramTypes); method.invoke(invoker, args); } catch (Exception e) { e.printStackTrace(); } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
這裡就使用到了rabbit的topic類型exchange.
首先對消費端listener中的queue和routekey進行可設定話管理:
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${demo.mq.method.queue}", durable = "true"), exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"), key = "${demo.mq.method.routekey}" ))
注意這裡的
${demo.mq.method.queue} ${demo.mq.method.routekey}
是從設定檔讀取出來的:
例如係統1中是如下配置:
demo.mq.method.queue=com.demo.service.project1.# demo.mq.method.routekey=com.demo.service.project1.#
系統2中是如下配置:
demo.mq.method.queue=com.demo.service.project2.# demo.mq.method.routekey=com.demo.service.project2.#
再看發送端中那段程式碼:
//关注此处clz.getName(),用于处理问题4 rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta);
這裡面的clz.getName(). 由於我們系統是有良好的分包策略,所以系統1的clz.getName()一定是以com.demo.service.project1為開頭的.一定會發送到project1中的listener.例如clz.getName()值為com.demo.service.project1.MqDemoService (".#"匹配後面多個標示符,此為rabbitMQ中topic類型exchange的特性).
至此,一開始想要達成的目標已經達成.今後需要用mq做異步調用的時候可以像同步方法一樣使用了.
對於mq在spring中的使用在此就不詳細列舉了,可以參考文件:
http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/htmlsingle/
稍後會提供一套demo程式碼出來供記錄和參考
#目前這套方法中還是存在一些問題的.例如:
1,因为目前业务场景,没有考虑异步回调的问题. 需要的话可以考虑和rabbitmq本身的异步回调方式结合. 目前还没有思考. 2,因为对消费端版本更新问题的考虑,目前仅仅支持单参数(整型)方法的调用.
第一個問題等需要用到對應業務後再做考慮吧.或者有思路的通知可以探討一下.
第二個問題主要考慮的是如果消費端更改了參數類型或者其他之類的情況下,重新發布後,對於可能殘留在mq中的老消息的兼容.這個目前確實沒有什麼好思路,拋出也是為了集思廣益了.
以上是利用rabbit mq.模擬dubbo,使MQ非同步調用的詳細內容。更多資訊請關注PHP中文網其他相關文章!