> Java > java지도 시간 > 본문

Rabbit mq를 사용하여 dubbo를 시뮬레이션하고 MQ 비동기 호출을 수행합니다.

怪我咯
풀어 주다: 2017-06-26 11:22:21
원래의
2967명이 탐색했습니다.

최근에 오래된 시스템을 개조하다가 RabbitMq가 필요한 시나리오를 접하게 되었습니다. 이전 프로세스에서 전송 및 소비 측에 다양한 구성이 필요했는데, 그게 번거로웠던 것 같습니다. 그러다 문득 @Reference 주석 형식이 생각났습니다. dubbo. 가능할까요? MQ 호출이 동기 인터페이스 호출만큼 편리하고 간단하도록 유사한 쉘프를 만드는 방법은 무엇입니까? 그래서 관련 정보를 확인하고 dubbo의 소스 코드를 읽어보니 아이디어가 생겼습니다. 일반적으로 달성하려는 목표는 dubbo와 마찬가지로 소비자 측에서 인터페이스를 노출하고(dubbo 서비스에서 정의한 인터페이스를 재사용할 수도 있으므로 dubbo 서비스 작성이 동기식 또는 MQ 비동기식이 될 수 있음) 송신 측에서 주입합니다. 사용자 정의 주석을 통해 메소드를 호출하고 이를 프레임워크 내에서 내부적으로 처리합니다. 그런 다음 비동기식 mq 형식으로 변환되어 소비자에게 전송됩니다.

예를 들어 서버에는 인터페이스가 있습니다.

public interface MqDemoService {
    void dealById(Long id);
}
로그인 후 복사

구현:

@Slf4j
@Component("mqDemoServiceImpl")
@Service(version = "1.0.0")
public class MqDemoServiceImpl implements MqDemoService {
    @Override
    public void dealById(Long id) {
        log.info("执行findById方法");
    }
}
로그인 후 복사

그 중:

@Slf4j是lombok注解
@Service是dubbo服务端注解
로그인 후 복사

관심 있는 학생이 직접 확인할 수 있습니다

그런 다음 전송 끝

에 사용자 정의가 있습니다. 참고:

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface AsyncInvoker {
}
로그인 후 복사

호출 컨트롤러에서:

@Slf4j
@Controller
public class MqDemoController {
  @AsyncInvoker
  private MqDemoService mqDemoService;

  @RequestMapping(value = "/deal", method = RequestMethod.POST)
  public void deal() {
    mqDemoService.dealById(1L);
  }
}
로그인 후 복사

에서 @AsyncInvoker로 주석이 달린 mqDemoService 속성에 주의하세요. 이 Annotation을 통해 주입된 객체가 메소드를 호출하면 mq를 통해 전송되어 비동기 호출이 됩니다.

자, 달성하려는 목표는 매우 명확합니다. 그러면 해결해야 할 문제는 다음과 같습니다. :

1,如何确定发送消息的格式,使消费端可以确定调用的方法
2,发送端中如何为注解@AsyncInvoker注释的对象注入实例
3,接收端中如何在接收到消息后调用对应接口的实现方法
4,多个消费服务如何区分mq队列.
로그인 후 복사

1. 수신측에서 호출 방법을 결정할 수 있도록 메시지 전송 형식을 결정하는 방법

여기에서는 먼저 필수 Java 반사 호출을 따릅니다. 매개변수는 단순히 전송 개체를 정의합니다.

@Data
public class MqMethodMeta {
  //调用的接口名称(包括包名,用于反射)
  private String interfaceName;
  //调用的方法名
  private String methodName;
  //调用的方法的参数
  private Object[] args;
  //调用的方法的参数类型
  private String[] paramTypeNames;
}
로그인 후 복사

2. 보낸 사람의 @AsyncInvoker 주석이 달린 개체에 인스턴스를 삽입합니다

이 시나리오에서 보낸 사람은 구현이 아닌 소비자 측의 인터페이스만 소개합니다. 그렇다면 @AsyncInvoker는 어떻게 개체를 삽입합니까?

답은 동적 프록시입니다. .

그렇다면 @AsyncInvoker에 의해 주석이 달린 객체가 동적 프록시에 주입되어야 한다는 것을 Spring에게 어떻게 알릴 수 있을까요?

답은 Spring의 BeanPostProcessor 인터페이스입니다! 이 인터페이스는 Spring이 처리할 수 있도록 해줍니다. 사용자 정의 로직은 전후에 삽입됩니다. 여기서는 자세한 내용을 다루지 않겠습니다. 필요하신 분들은 직접 google/Baidu를 이용하시면 됩니다.

그러면 코드는 다음과 같습니다.

@Slf4j
@Component
public class AsyncInvokerBeanProcessor implements BeanPostProcessor {
  //缓存生成的动态代理对象,用于多个Controller注入同一类型对象时使用.
  private final ConcurrentMap<String, Object> proxyMap = new ConcurrentHashMap<>();

  //注入spring amqp处理mq的对象
  @Autowired
  private RabbitTemplate rabbitTemplate;

  //BeanPostProcessor接口方法,在spring创建每个实例前插入的用户自定义逻辑.这里我们需要的是在每个Controller对象创建的时候为其中的@AsyncInvoker注解对象注入动态代理.
  @Override
  public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
    //获取该实例中的有@AsyncInvoker注解的field
    Field[] fields = bean.getClass().getDeclaredFields();
    for (Field field : fields) {
      try {
        if (!field.isAccessible()) {
          field.setAccessible(true);
        }
        AsyncInvoker asyncInvoker = field.getAnnotation(AsyncInvoker.class);
        if (asyncInvoker != null) {
          //创建代理对象,赋值给该feild
          Object value = createProxy(field.getType());
          if (value != null) {
            field.set(bean, value);
          }
        }
      } catch (Throwable e) {
        log.error("Failed to init remote mq service at filed " + field.getName() + " in class " + bean.getClass().getName() + ", cause: " + e.getMessage(), e);
      }
    }
    return bean;
  }

  @Override
  public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
    return bean;
  }

  private Object createProxy(Class clz) {
    String interfaceName;
    if (clz.isInterface()) {
      interfaceName = clz.getName();
    } else {
      throw new IllegalStateException("The @MqInvoker property type " + clz.getName() + " is not a interface.");
    }

    Object proxy = proxyMap.get(interfaceName);
    if (proxy == null) {
      Object newProxy = Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{clz}, new InvocationHandler() {
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
          log.debug("执行动态代理! method:{} ,args: {}", method, args);
          if (method.getParameters().length != 1 || !method.getParameters()[0].getType().equals(Long.class)) {
            throw new IllegalAccessException("MQ Service 目前仅支持单参数Long类型方法");
          }
          //动态代理中创建mq传输对象并发送.
          MqMethodMeta mqMethodMeta = new MqMethodMeta();
          mqMethodMeta.setInterfaceName(clz.getName());
          mqMethodMeta.setMethodName(method.getName());
          mqMethodMeta.setArgs(args);
          String[] paramTypeNames = new String[args.length];
          for (int i = 0; i < args.length; i++) {
            paramTypeNames[i] = args[i].getClass().getName();
          }
          mqMethodMeta.setParamTypeNames(paramTypeNames);
          RabbitAdmin admin = new RabbitAdmin(rabbitTemplate.getConnectionFactory());
          Exchange exchange = new TopicExchange("exchange.demo.web.adaptor");
          admin.declareExchange(exchange);
          //关注此处clz.getName(),用于处理问题4
          rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta);
          return null;
        }
      });
      proxyMap.putIfAbsent(interfaceName, newProxy);
      proxy = proxyMap.get(interfaceName);
    }
    return proxy;
  }
}
로그인 후 복사

3. , 해당 인터페이스의 구현 메소드를 호출하세요

수신 측에서 해당 인터페이스를 호출하는 것은 매우 간단합니다. 리플렉션 호출을 수행하려면 MqMethodMeta 객체만 가져오면 됩니다.

@Slf4j
public class AsyncMethodListener implements ApplicationContextAware {
  private ApplicationContext applicationContext;

  @RabbitListener(bindings = @QueueBinding(
      value = @Queue(value = "${demo.mq.method.queue}", durable = "true"),
      exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"),
      key = "${demo.mq.method.routekey}"
  ))
  public void messageHandle(@Payload MqMethodMeta message) {
    try {
      log.info("收到message: {}", message);
      Class clz = Class.forName(message.getInterfaceName());
      String methodName = message.getMethodName();
      Object[] args = message.getArgs();
      Class[] paramTypes = new Class[message.getParamTypeNames().length];
      for (int i = 0; i < message.getParamTypeNames().length; i++) {
        paramTypes[i] = Class.forName(message.getParamTypeNames()[i]);
      }

      //由于使用Object[]数组传送参数,所以Jackson2JsonMessageConverter会将id转换为Integer,反射调用时会报错,此处强转一下
      for (int i = 0; i < args.length; i++) {
        Class c = paramTypes[i];
        if (args[i] instanceof Integer && c.equals(Long.class)) {
          args[i] = ((Integer) args[i]).longValue();
        }
      }
      //拿到spring管理的对应接口的实现
      Object invoker = applicationContext.getBean(clz);
      Method method = clz.getMethod(methodName, paramTypes);
      method.invoke(invoker, args);
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
    this.applicationContext = applicationContext;
  }
}
로그인 후 복사

4. 다중 소비자 서비스는 어떻습니까?

여기서는 토끼의 주제 유형 교환이 사용됩니다.

먼저 소비자 리스너에서 대기열과 경로 키를 구성합니다.

@RabbitListener(bindings = @QueueBinding(
      value = @Queue(value = "${demo.mq.method.queue}", durable = "true"),
      exchange = @Exchange(value = "exchange.demo.web.adaptor", type = ExchangeTypes.TOPIC, durable = "true"),
      key = "${demo.mq.method.routekey}"
  ))
로그인 후 복사

여기에서
${demo.mq.method.queue}
${demo.mq.method.routekey}
로그인 후 복사

를 읽습니다. 구성 파일:

예를 들어 시스템 1의 구성은 다음과 같습니다.

demo.mq.method.queue=com.demo.service.project1.#
demo.mq.method.routekey=com.demo.service.project1.#
로그인 후 복사

시스템 2의 구성은 다음과 같습니다.

demo.mq.method.queue=com.demo.service.project2.#
demo.mq.method.routekey=com.demo.service.project2.#
로그인 후 복사

발신자의 코드를 살펴보세요.

//关注此处clz.getName(),用于处理问题4
rabbitTemplate.convertAndSend("exchange.demo.web.adaptor", clz.getName(), mqMethodMeta);
로그인 후 복사

여기의 clz.getName() 우리 시스템에는 좋은 하도급 전략이 있으므로 시스템 1의 clz.getName()은 반드시 com.demo.service.project1로 시작해야 합니다. 예를 들어, 값은 다음과 같습니다. clz.getName()은 com.demo .service.project1.MqDemoService입니다(".#"은 다음 다중 식별자와 일치하며 이는 RabbitMQ의 주제 유형 교환 기능입니다).

이 시점에서 우리가 달성하고자 했던 목표는 앞으로는 비동기식 호출을 위해 mq를 사용해야 합니다. 동기식 방법처럼 사용할 수 있습니다.

스프링에서 mq의 사용은 여기에 자세히 나열되지 않습니다. 문서를 참조하세요. :

http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/htmlsingle/
로그인 후 복사

녹음 및 참고용으로 데모 코드 세트가 제공됩니다

요약

이 방법에는 여전히 몇 가지 문제가 있습니다. 예:

1,因为目前业务场景,没有考虑异步回调的问题. 需要的话可以考虑和rabbitmq本身的异步回调方式结合. 目前还没有思考.
2,因为对消费端版本更新问题的考虑,目前仅仅支持单参数(整型)方法的调用.
로그인 후 복사

첫 번째 질문은 해당 비즈니스를 사용한 후에 고려해야 합니다. 아니면 아이디어가 있으면 토론해도 됩니다.

두 번째 질문은 주로 소비자가 매개변수 유형이나 다른 상황을 변경하면 다시 게시한 후 남아 있을 수 있는 이전 메시지와 호환된다는 점을 고려합니다. mq. 현재로서는 이에 대한 좋은 아이디어가 없으며 단지 브레인스토밍용일 뿐입니다.

위 내용은 Rabbit mq를 사용하여 dubbo를 시뮬레이션하고 MQ 비동기 호출을 수행합니다.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:php.cn
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿
회사 소개 부인 성명 Sitemap
PHP 중국어 웹사이트:공공복지 온라인 PHP 교육,PHP 학습자의 빠른 성장을 도와주세요!