면접관은 다음 질문을 좋아합니다.
오늘은 "RPC 구현 원칙"에 대해 이야기해보겠습니다. (XX 시리즈를 디자인하는 방법에 대한 다른 관련 주제는 Knowledge Planet에 게시되어 있습니다.)그러므로 먼저 명확히 해두겠습니다 질문: RPC는 무엇인가요?
RPC는 Remote Procedure Call의 약어, 즉 Remote Procedure Call입니다.
RPC는 컴퓨터 통신 프로토콜입니다. 이 프로토콜을 사용하면 개발자가 이 상호 작용을 추가로 프로그래밍할 필요 없이 한 컴퓨터에서 실행되는 프로그램이 다른 컴퓨터의 서브루틴을 호출할 수 있습니다.
두 개 이상의 애플리케이션이 서로 다른 서버에 배포될 때 이들 간의 호출은 로컬 메서드 호출과 유사하다는 점에 주목할 가치가 있습니다. 다음으로 RPC 호출에서 어떤 일이 발생하는지 분석해 보겠습니다.
Dubbo와 같이 업계에서 더 널리 사용되는 RPC 프레임워크 중 일부는 인터페이스 기반 원격 메서드 호출을 제공합니다. 즉, 클라이언트는 다음의 정의만 알면 됩니다. 원격 메서드를 호출하는 인터페이스입니다. Java에서는 인터페이스가 인스턴스 메소드를 직접 호출할 수 없습니다. 이는 클라이언트가 이러한 인터페이스에 대한 프록시 객체를 생성해야 함을 의미합니다. 이를 위해 Java는 동적 프록시 생성을 지원합니다. 프록시 개체가 있는 경우 각 특정 메서드는 어떻게 호출됩니까? JDK 동적 프록시에 의해 생성된 프록시 객체는 지정된 메소드를 호출할 때 실제로 InvocationHandler에 정의된 #invoke 메소드를 실행하게 되며, 원격 메소드 호출이 완료되고 결과를 얻는다.
클라이언트를 제쳐두고 돌이켜 보면 RPC는 두 컴퓨터 사이의 호출이며 본질적으로 두 호스트 사이의 네트워크 통신입니다. 네트워크 통신에는 직렬화, 역직렬화, 인코딩 및 디코딩 등이 있어야 합니다. ; 동시에 대부분의 시스템은 클러스터에 배포됩니다. 여러 호스트/컨테이너가 외부 세계에 동일한 서비스를 제공합니다. 클러스터의 노드 수가 많으면 서비스 주소 관리도 매우 번거로운 문제입니다. . , 일반적인 관행은 각 서비스 노드가 자신의 주소와 제공된 서비스 목록을 등록 센터에 등록하고 등록 센터가 서비스 목록을 균일하게 관리하는 것입니다. 이 접근 방식은 몇 가지 문제를 해결하고 클라이언트에 새로운 작업을 추가합니다. 서비스 검색이란 일반 용어로 등록 센터에서 원격 방식에 해당하는 서비스 목록을 찾아 특정 전략을 통해 서비스 주소를 선택하여 네트워크 통신을 완료하는 것을 의미합니다.
클라이언트와 등록 센터에 대해 이야기한 후, 서버의 또 다른 중요한 역할은 서비스 인터페이스의 실제 구현을 제공하고 특정 포트에서 네트워크 요청을 모니터링하는 것입니다. 요청은 네트워크에서 전송됩니다. 요청에서 해당 매개변수(예: 서비스 인터페이스, 메소드, 요청 매개변수 등)를 얻은 다음 이러한 매개변수를 기반으로 리플렉션을 통해 인터페이스의 실제 구현을 호출합니다. 결과를 얻고 해당 응답 스트림에 기록됩니다.
요약하면 기본 RPC 호출 프로세스는 대략 다음과 같습니다.
서버(생산자)
서비스 인터페이스:
RPC에서는 생산자와 소비자는 공통 서비스 인터페이스 API를 갖습니다. 다음과 같이 HelloService 인터페이스를 정의합니다.
/** * @Descrption 服务接口 ***/ public interface HelloService { String sayHello(String somebody); }
서비스 구현:
생산자는 서비스 인터페이스 구현을 제공하고 HelloServiceImpl 구현 클래스를 생성해야 합니다.
/** * @Descrption 服务实现 ***/ public class HelloServiceImpl implements HelloService { @Override public String sayHello(String somebody) { return "hello " + somebody + "!"; } }
서비스 등록:
이 예제에서는 Spring을 사용하여 Bean을 관리하고 사용자 정의 XML 및 파서를 사용하여 서비스 구현 클래스를 컨테이너에 로드합니다(물론 사용자 정의 주석도 사용할 수 있지만 자세히 설명하지는 않겠습니다) 자세한 내용은 여기를 참조하세요.) 그리고 서비스 인터페이스 정보를 등록 센터에 등록하세요.
먼저 XSD를 사용자 정의하세요.
<xsd:element name="service"> <xsd:complexType> <xsd:complexContent> <xsd:extension base="beans:identifiedType"> <xsd:attribute name="interface" type="xsd:string" use="required"/> <xsd:attribute name="timeout" type="xsd:int" use="required"/> <xsd:attribute name="serverPort" type="xsd:int" use="required"/> <xsd:attribute name="ref" type="xsd:string" use="required"/> <xsd:attribute name="weight" type="xsd:int" use="optional"/> <xsd:attribute name="workerThreads" type="xsd:int" use="optional"/> <xsd:attribute name="appKey" type="xsd:string" use="required"/> <xsd:attribute name="groupName" type="xsd:string" use="optional"/> </xsd:extension> </xsd:complexContent> </xsd:complexType> </xsd:element>
스키마와 XSD를 각각 지정하고, 스키마와 해당 핸들러의 매핑을 지정하세요.
Schema:
http\://www.storm.com/schema/storm-service.xsd=META-INF/storm-service.xsd http\://www.storm.com/schema/storm-reference.xsd=META-INF/storm-reference.xsd
Handler:
http\://www.storm.com/schema/storm-service=com.hsunfkqm.storm.framework.spring.StormServiceNamespaceHandler http\://www.storm.com/schema/storm-reference=com.hsunfkqm.storm.framework.spring.StormRemoteReferenceNamespaceHandler
작성된 파일을 Classpath 아래 META-INF 디렉터리에 넣습니다.
在 Spring 配置文件中配置服务类:
<!-- 发布远程服务 --> <bean id="helloService" class="com.hsunfkqm.storm.framework.test.HelloServiceImpl"/> <storm:service id="helloServiceRegister" interface="com.hsunfkqm.storm.framework.test.HelloService" ref="helloService" groupName="default" weight="2" appKey="ares" workerThreads="100" serverPort="8081" timeout="600"/>
编写对应的 Handler 和 Parser:
StormServiceNamespaceHandler:
import org.springframework.beans.factory.xml.NamespaceHandlerSupport; /** * @author 孙浩 * @Descrption 服务发布自定义标签 ***/ public class StormServiceNamespaceHandler extends NamespaceHandlerSupport { @Override public void init() { registerBeanDefinitionParser("service", new ProviderFactoryBeanDefinitionParser()); } }
ProviderFactoryBeanDefinitionParser:
protected Class getBeanClass(Element element) { return ProviderFactoryBean.class; } protected void doParse(Element element, BeanDefinitionBuilder bean) { try { String serviceItf = element.getAttribute("interface"); String serverPort = element.getAttribute("serverPort"); String ref = element.getAttribute("ref"); // .... bean.addPropertyValue("serverPort", Integer.parseInt(serverPort)); bean.addPropertyValue("serviceItf", Class.forName(serviceItf)); bean.addPropertyReference("serviceObject", ref); //... if (NumberUtils.isNumber(weight)) { bean.addPropertyValue("weight", Integer.parseInt(weight)); } //... } catch (Exception e) { // ... } }
ProviderFactoryBean:
/** * @Descrption 服务发布 ***/ public class ProviderFactoryBean implements FactoryBean, InitializingBean { //服务接口 private Class<?> serviceItf; //服务实现 private Object serviceObject; //服务端口 private String serverPort; //服务超时时间 private long timeout; //服务代理对象,暂时没有用到 private Object serviceProxyObject; //服务提供者唯一标识 private String appKey; //服务分组组名 private String groupName = "default"; //服务提供者权重,默认为 1 , 范围为 [1-100] private int weight = 1; //服务端线程数,默认 10 个线程 private int workerThreads = 10; @Override public Object getObject() throws Exception { return serviceProxyObject; } @Override public Class<?> getObjectType() { return serviceItf; } @Override public void afterPropertiesSet() throws Exception { //启动 Netty 服务端 NettyServer.singleton().start(Integer.parseInt(serverPort)); //注册到 zk, 元数据注册中心 List<ProviderService> providerServiceList = buildProviderServiceInfos(); IRegisterCenter4Provider registerCenter4Provider = RegisterCenter.singleton(); registerCenter4Provider.registerProvider(providerServiceList); } } //================RegisterCenter#registerProvider====================== @Override public void registerProvider(final List<ProviderService> serviceMetaData) { if (CollectionUtils.isEmpty(serviceMetaData)) { return; } //连接 zk, 注册服务 synchronized (RegisterCenter.class) { for (ProviderService provider : serviceMetaData) { String serviceItfKey = provider.getServiceItf().getName(); List<ProviderService> providers = providerServiceMap.get(serviceItfKey); if (providers == null) { providers = Lists.newArrayList(); } providers.add(provider); providerServiceMap.put(serviceItfKey, providers); } if (zkClient == null) { zkClient = new ZkClient(ZK_SERVICE, ZK_SESSION_TIME_OUT, ZK_CONNECTION_TIME_OUT, new SerializableSerializer()); } //创建 ZK 命名空间/当前部署应用 APP 命名空间/ String APP_KEY = serviceMetaData.get(0).getAppKey(); String ZK_PATH = ROOT_PATH + "/" + APP_KEY; boolean exist = zkClient.exists(ZK_PATH); if (!exist) { zkClient.createPersistent(ZK_PATH, true); } for (Map.Entry<String, List<ProviderService>> entry : providerServiceMap.entrySet()) { //服务分组 String groupName = entry.getValue().get(0).getGroupName(); //创建服务提供者 String serviceNode = entry.getKey(); String servicePath = ZK_PATH + "/" + groupName + "/" + serviceNode + "/" + PROVIDER_TYPE; exist = zkClient.exists(servicePath); if (!exist) { zkClient.createPersistent(servicePath, true); } //创建当前服务器节点 int serverPort = entry.getValue().get(0).getServerPort();//服务端口 int weight = entry.getValue().get(0).getWeight();//服务权重 int workerThreads = entry.getValue().get(0).getWorkerThreads();//服务工作线程 String localIp = IPHelper.localIp(); String currentServiceIpNode = servicePath + "/" + localIp + "|" + serverPort + "|" + weight + "|" + workerThreads + "|" + groupName; exist = zkClient.exists(currentServiceIpNode); if (!exist) { //注意,这里创建的是临时节点 zkClient.createEphemeral(currentServiceIpNode); } //监听注册服务的变化,同时更新数据到本地缓存 zkClient.subscribeChildChanges(servicePath, new IZkChildListener() { @Override public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { if (currentChilds == null) { currentChilds = Lists.newArrayList(); } //存活的服务 IP 列表 List<String> activityServiceIpList = Lists.newArrayList(Lists.transform(currentChilds, new Function<String, String>() { @Override public String apply(String input) { return StringUtils.split(input, "|")[0]; } })); refreshActivityService(activityServiceIpList); } }); } } }
至此服务实现类已被载入 Spring 容器中,且服务接口信息也注册到了注册中心。
网络通信:
作为生产者对外提供 RPC 服务,必须有一个网络程序来来监听请求和做出响应。在 Java 领域 Netty 是一款高性能的 NIO 通信框架,很多的框架的通信都是采用 Netty 来实现的,本例中也采用它当做通信服务器。
构建并启动 Netty 服务监听指定端口:
public void start(final int port) { synchronized (NettyServer.class) { if (bossGroup != null || workerGroup != null) { return; } bossGroup = new NioEventLoopGroup(); workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { //注册解码器 NettyDecoderHandler ch.pipeline().addLast(new NettyDecoderHandler(StormRequest.class, serializeType)); //注册编码器 NettyEncoderHandler ch.pipeline().addLast(new NettyEncoderHandler(serializeType)); //注册服务端业务逻辑处理器 NettyServerInvokeHandler ch.pipeline().addLast(new NettyServerInvokeHandler()); } }); try { channel = serverBootstrap.bind(port).sync().channel(); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
上面的代码中向 Netty 服务的 Pipeline 中添加了编解码和业务处理器,当接收到请求时,经过编解码后,真正处理业务的是业务处理器,即 NettyServerInvokeHandler,该处理器继承自 SimpleChannelInboundHandler,当数据读取完成将触发一个事件,并调用 NettyServerInvokeHandler#channelRead0 方法来处理请求。
@Override protected void channelRead0(ChannelHandlerContext ctx, StormRequest request) throws Exception { if (ctx.channel().isWritable()) { //从服务调用对象里获取服务提供者信息 ProviderService metaDataModel = request.getProviderService(); long consumeTimeOut = request.getInvokeTimeout(); final String methodName = request.getInvokedMethodName(); //根据方法名称定位到具体某一个服务提供者 String serviceKey = metaDataModel.getServiceItf().getName(); //获取限流工具类 int workerThread = metaDataModel.getWorkerThreads(); Semaphore semaphore = serviceKeySemaphoreMap.get(serviceKey); if (semaphore == null) { synchronized (serviceKeySemaphoreMap) { semaphore = serviceKeySemaphoreMap.get(serviceKey); if (semaphore == null) { semaphore = new Semaphore(workerThread); serviceKeySemaphoreMap.put(serviceKey, semaphore); } } } //获取注册中心服务 IRegisterCenter4Provider registerCenter4Provider = RegisterCenter.singleton(); List<ProviderService> localProviderCaches = registerCenter4Provider.getProviderServiceMap().get(serviceKey); Object result = null; boolean acquire = false; try { ProviderService localProviderCache = Collections2.filter(localProviderCaches, new Predicate<ProviderService>() { @Override public boolean apply(ProviderService input) { return StringUtils.equals(input.getServiceMethod().getName(), methodName); } }).iterator().next(); Object serviceObject = localProviderCache.getServiceObject(); //利用反射发起服务调用 Method method = localProviderCache.getServiceMethod(); //利用 semaphore 实现限流 acquire = semaphore.tryAcquire(consumeTimeOut, TimeUnit.MILLISECONDS); if (acquire) { result = method.invoke(serviceObject, request.getArgs()); //System.out.println("---------------"+result); } } catch (Exception e) { System.out.println(JSON.toJSONString(localProviderCaches) + " " + methodName+" "+e.getMessage()); result = e; } finally { if (acquire) { semaphore.release(); } } //根据服务调用结果组装调用返回对象 StormResponse response = new StormResponse(); response.setInvokeTimeout(consumeTimeOut); response.setUniqueKey(request.getUniqueKey()); response.setResult(result); //将服务调用返回对象回写到消费端 ctx.writeAndFlush(response); } else { logger.error("------------channel closed!---------------"); } }
此处还有部分细节如自定义的编解码器等,篇幅所限不在此详述,继承 MessageToByteEncoder 和 ByteToMessageDecoder 覆写对应的 encode 和 decode 方法即可自定义编解码器,使用到的序列化工具如 Hessian/Proto 等可参考对应的官方文档。
请求和响应包装:
为便于封装请求和响应,定义两个 bean 来表示请求和响应。
请求:
/** * @author 孙浩 * @Descrption ***/ public class StormRequest implements Serializable { private static final long serialVersionUID = -5196465012408804755L; //UUID,唯一标识一次返回值 private String uniqueKey; //服务提供者信息 private ProviderService providerService; //调用的方法名称 private String invokedMethodName; //传递参数 private Object[] args; //消费端应用名 private String appName; //消费请求超时时长 private long invokeTimeout; // getter/setter }
响应:
/** * @Descrption ***/ public class StormResponse implements Serializable { private static final long serialVersionUID = 5785265307118147202L; //UUID, 唯一标识一次返回值 private String uniqueKey; //客户端指定的服务超时时间 private long invokeTimeout; //接口调用返回的结果对象 private Object result; //getter/setter }
客户端(消费者)
客户端(消费者)在 RPC 调用中主要是生成服务接口的代理对象,并从注册中心获取对应的服务列表发起网络请求。
客户端和服务端一样采用 Spring 来管理 bean 解析 XML 配置等不再赘述,重点看下以下几点:
1、通过 JDK 动态代理来生成引入服务接口的代理对象
public Object getProxy() { return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{targetInterface}, this); }
2、从注册中心获取服务列表并依据某种策略选取其中一个服务节点
//服务接口名称 String serviceKey = targetInterface.getName(); //获取某个接口的服务提供者列表 IRegisterCenter4Invoker registerCenter4Consumer = RegisterCenter.singleton(); List<ProviderService> providerServices = registerCenter4Consumer.getServiceMetaDataMap4Consume().get(serviceKey); //根据软负载策略,从服务提供者列表选取本次调用的服务提供者 ClusterStrategy clusterStrategyService = ClusterEngine.queryClusterStrategy(clusterStrategy); ProviderService providerService = clusterStrategyService.select(providerServices);
3、通过 Netty 建立连接,发起网络请求
/** * @author 孙浩 * @Descrption Netty 消费端 bean 代理工厂 ***/ public class RevokerProxyBeanFactory implements InvocationHandler { private ExecutorService fixedThreadPool = null; //服务接口 private Class<?> targetInterface; //超时时间 private int consumeTimeout; //调用者线程数 private static int threadWorkerNumber = 10; //负载均衡策略 private String clusterStrategy; @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { ... //复制一份服务提供者信息 ProviderService newProvider = providerService.copy(); //设置本次调用服务的方法以及接口 newProvider.setServiceMethod(method); newProvider.setServiceItf(targetInterface); //声明调用 AresRequest 对象,AresRequest 表示发起一次调用所包含的信息 final StormRequest request = new StormRequest(); //设置本次调用的唯一标识 request.setUniqueKey(UUID.randomUUID().toString() + "-" + Thread.currentThread().getId()); //设置本次调用的服务提供者信息 request.setProviderService(newProvider); //设置本次调用的方法名称 request.setInvokedMethodName(method.getName()); //设置本次调用的方法参数信息 request.setArgs(args); try { //构建用来发起调用的线程池 if (fixedThreadPool == null) { synchronized (RevokerProxyBeanFactory.class) { if (null == fixedThreadPool) { fixedThreadPool = Executors.newFixedThreadPool(threadWorkerNumber); } } } //根据服务提供者的 ip,port, 构建 InetSocketAddress 对象,标识服务提供者地址 String serverIp = request.getProviderService().getServerIp(); int serverPort = request.getProviderService().getServerPort(); InetSocketAddress inetSocketAddress = new InetSocketAddress(serverIp, serverPort); //提交本次调用信息到线程池 fixedThreadPool, 发起调用 Future<StormResponse> responseFuture = fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress, request)); //获取调用的返回结果 StormResponse response = responseFuture.get(request.getInvokeTimeout(), TimeUnit.MILLISECONDS); if (response != null) { return response.getResult(); } } catch (Exception e) { throw new RuntimeException(e); } return null; } // ... }
Netty 的响应是异步的,为了在方法调用返回前获取到响应结果,需要将异步的结果同步化。
4、Netty 异步返回的结果存入阻塞队列
@Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, StormResponse response) throws Exception { //将 Netty 异步返回的结果存入阻塞队列,以便调用端同步获取 RevokerResponseHolder.putResultValue(response); }
5、请求发出后同步获取结果
//提交本次调用信息到线程池 fixedThreadPool, 发起调用 Future<StormResponse> responseFuture = fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress, request)); //获取调用的返回结果 StormResponse response = responseFuture.get(request.getInvokeTimeout(), TimeUnit.MILLISECONDS); if (response != null) { return response.getResult(); } //=================================================== //从返回结果容器中获取返回结果,同时设置等待超时时间为 invokeTimeout long invokeTimeout = request.getInvokeTimeout(); StormResponse response = RevokerResponseHolder.getValue(request.getUniqueKey(), invokeTimeout);
Server:
/** * @Descrption ***/ public class MainServer { public static void main(String[] args) throws Exception { //发布服务 final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("storm-server.xml"); System.out.println(" 服务发布完成"); } }
Client:
public class Client { private static final Logger logger = LoggerFactory.getLogger(Client.class); public static void main(String[] args) throws Exception { final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("storm-client.xml"); final HelloService helloService = (HelloService) context.getBean("helloService"); String result = helloService.sayHello("World"); System.out.println(result); for (;;) { } } }
结果
生产者:
消费者:
注册中心:
本文简单介绍了 RPC 的整个流程,并实现了一个简单的 RPC 调用。希望阅读完本文之后,能加深你对 RPC 的一些认识。
生产者端流程:
소비자 측 프로세스:
위 내용은 Alibaba 면접관: RPC 프레임워크를 직접 작성해 주세요.의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!