Récemment, je rénovais un ancien système et j'ai rencontré un scénario où RabbitMq était requis dans le processus précédent, diverses configurations étaient requises aux extrémités d'envoi et de consommation, ce qui semblait gênant. Puis j'ai soudainement pensé au formulaire de @Reference. annotation dans dubbo. Pouvez-vous créer une étagère similaire pour que l'appel de MQ soit aussi pratique et simple que d'appeler l'interface synchrone ? J'ai donc vérifié les informations pertinentes et lu le code source de dubbo, puis j'ai eu une idée
<.> En résumé Dit, le but à atteindre est comme dubbo, le côté consommateur expose l'interface (vous pouvez même réutiliser l'interface définie par le service dubbo, pour que l'écriture d'un service dubbo puisse être synchrone ou asynchrone MQ), et l'envoi side injecte des appels d'objet via des annotations personnalisées. La méthode est traitée en interne par le framework puis convertie en formulaire mq asynchrone et envoyée au consommateur Par exemple, le serveur a une interface : <.> et a une implémentation :public interface MqDemoService { void dealById(Long id); }
@Slf4j @Component("mqDemoServiceImpl") @Service(version = "1.0.0") public class MqDemoServiceImpl implements MqDemoService { @Override public void dealById(Long id) { log.info("执行findById方法"); } }
@Slf4j是lombok注解 @Service是dubbo服务端注解
par eux-mêmes, puis la fin de l'envoi
a des annotations personnalisées :
donc lors de l'appel dans le contrôleur :@Target(ElementType.FIELD) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited public @interface AsyncInvoker { }
@Slf4j @Controller public class MqDemoController { @AsyncInvoker private MqDemoService mqDemoService; @RequestMapping(value = "/deal", method = RequestMethod.POST) public void deal() { mqDemoService.dealById(1L); } }
D'accord, le but à atteindre est très clair Ensuite les problèmes à résoudre sont les aspects suivants :
1. Comment déterminer le format du message envoyé afin que le destinataire puisse déterminer la méthode à appeler1,如何确定发送消息的格式,使消费端可以确定调用的方法 2,发送端中如何为注解@AsyncInvoker注释的对象注入实例 3,接收端中如何在接收到消息后调用对应接口的实现方法 4,多个消费服务如何区分mq队列.
@Data public class MqMethodMeta { //调用的接口名称(包括包名,用于反射) private String interfaceName; //调用的方法名 private String methodName; //调用的方法的参数 private Object[] args; //调用的方法的参数类型 private String[] paramTypeNames; }
Et alors ? Faites savoir à Spring que l'objet annoté avec @AsyncInvoker doit être injecté avec un proxy dynamique ? 🎜>
La réponse est l'interface BeanPostProcessor de Spring ! Cette interface permet à Spring d'insérer une logique définie par l'utilisateur avant et après le traitement de la création d'objets, qui ne sera pas détaillée ici, les étudiants dans le besoin peuvent eux-mêmes rechercher Google/Baidu. 🎜>Il est très simple pour le destinataire d'appeler l'interface correspondante. Il lui suffit d'obtenir l'objet MqMethodMeta et d'effectuer un appel de réflexion. Entrez directement le code :
@Slf4j @Component public class AsyncInvokerBeanProcessor implements BeanPostProcessor { //缓存生成的动态代理对象,用于多个Controller注入同一类型对象时使用. private final ConcurrentMap<String, Object> proxyMap = new ConcurrentHashMap<>(); //注入spring amqp处理mq的对象 @Autowired private RabbitTemplate rabbitTemplate; //BeanPostProcessor接口方法,在spring创建每个实例前插入的用户自定义逻辑.这里我们需要的是在每个Controller对象创建的时候为其中的@AsyncInvoker注解对象注入动态代理. @Override public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException { //获取该实例中的有@AsyncInvoker注解的field Field[] fields = bean.getClass().getDeclaredFields(); for (Field field : fields) { try { if (!field.isAccessible()) { field.setAccessible(true); } AsyncInvoker asyncInvoker = field.getAnnotation(AsyncInvoker.class); if (asyncInvoker != null) { //创建代理对象,赋值给该feild Object value = createProxy(field.getType()); if (value != null) { field.set(bean, value); } } } catch (Throwable e) { log.error("Failed to init remote mq service at filed " + field.getName() + " in class " + bean.getClass().getName() + ", cause: " + e.getMessage(), e); } } return bean; } @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { return bean; } private Object createProxy(Class clz) { String interfaceName; if (clz.isInterface()) { interfaceName = clz.getName(); } else { throw new IllegalStateException("The @MqInvoker property type " + clz.getName() + " is not a interface."); } Object proxy = proxyMap.get(interfaceName); if (proxy == null) { Object newProxy = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clz}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { log.debug("执行动态代理! method:{} ,args: {}", method, args); if (method.getParameters().length != 1 || !method.getParameters()[0].getType().equals(Long.class)) { throw new IllegalAccessException("MQ Service 目前仅支持单参数Long类型方法"); } //动态代理中创建mq传输对象并发送. MqMethodMeta mqMethodMeta = new MqMethodMeta(); mqMethodMeta.setInterfaceName(clz.getName()); mqMethodMeta.setMethodName(method.getName()); mqMethodMeta.setArgs(args); String[] paramTypeNames = new String[args.length]; for (int i = 0; i < args.length; i++) { paramTypeNames[i] = args[i].getClass().getName(); } mqMethodMeta.setParamTypeNames(paramTypeNames); RabbitAdmin admin = new RabbitAdmin(rabbitTemplate.getConnectionFactory()); Exchange exchange = new TopicExchange("exchange.demo.web.adaptor"); admin.declareExchange(exchange); //关注此处clz.getName(),用于处理问题4 rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta); return null; } }); proxyMap.putIfAbsent(interfaceName, newProxy); proxy = proxyMap.get(interfaceName); } return proxy; } }
L'échange de type de sujet de Rabbit est utilisé ici.
Tout d'abord, configurez la file d'attente et la clé de route dans l'écouteur côté consommateur pour une gestion configurable :@Slf4j public class AsyncMethodListener implements ApplicationContextAware { private ApplicationContext applicationContext; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${demo.mq.method.queue}", durable = "true"), exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"), key = "${demo.mq.method.routekey}" )) public void messageHandle(@Payload MqMethodMeta message) { try { log.info("收到message: {}", message); Class clz = Class.forName(message.getInterfaceName()); String methodName = message.getMethodName(); Object[] args = message.getArgs(); Class[] paramTypes = new Class[message.getParamTypeNames().length]; for (int i = 0; i < message.getParamTypeNames().length; i++) { paramTypes[i] = Class.forName(message.getParamTypeNames()[i]); } //由于使用Object[]数组传送参数,所以Jackson2JsonMessageConverter会将id转换为Integer,反射调用时会报错,此处强转一下 for (int i = 0; i < args.length; i++) { Class c = paramTypes[i]; if (args[i] instanceof Integer && c.equals(Long.class)) { args[i] = ((Integer) args[i]).longValue(); } } //拿到spring管理的对应接口的实现 Object invoker = applicationContext.getBean(clz); Method method = clz.getMethod(methodName, paramTypes); method.invoke(invoker, args); } catch (Exception e) { e.printStackTrace(); } } @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } }
@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${demo.mq.method.queue}", durable = "true"), exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"), key = "${demo.mq.method.routekey}" ))
Par exemple, la configuration dans le système 1 est la suivante :
${demo.mq.method.queue} ${demo.mq.method.routekey}
Dans le système 2, la configuration est la suivante suit :
Regardez le code dans l'expéditeur :demo.mq.method.queue=com.demo.service.project1.# demo.mq.method.routekey=com.demo.service.project1.#
demo.mq.method.queue=com.demo.service.project2.# demo.mq.method.routekey=com.demo.service.project2.#
À ce stade, l'objectif que vous vouliez atteindre au début a été atteint. À l'avenir, lorsque vous devez utiliser mq pour effectuer des appels asynchrones. Il peut être utilisé comme une méthode synchrone
//关注此处clz.getName(),用于处理问题4 rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta);
http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/htmlsingle/
Le premier problème doit utiliser l'entreprise correspondante. Réfléchissons-y plus tard. Ou vous pouvez en discuter si vous avez des idées
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!