이 기사는 소파-RPC 서버 소스 코드에 대한 자세한 분석을 제공합니다(플로우 차트 포함). 도움이 필요한 친구들이 참고할 수 있기를 바랍니다.
sofa-rpc는 Alibaba에서 오픈소스로 제공하는 고성능 RPC 프레임워크입니다. 이 글은 주로 소파-rpc 제공업체 시작 서비스 프로세스의 코드 읽기입니다. 다음은 제가 간단히 그린 기본 다이어그램입니다. 관계 흐름도
아래에서는 소파-rpc 코드를 기반으로 프로세스를 추적하고 읽어보겠습니다. BoltServer를 예로 들어보겠습니다
public static void main(String[] args) { ApplicationConfig application = new ApplicationConfig().setAppName("test-server"); ServerConfig serverConfig = new ServerConfig() .setPort(22000) .setDaemon(false); ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) .setApplication(application) .setRef(new HelloServiceImpl()) .setServer(serverConfig) .setRegister(false); ProviderConfig<EchoService> providerConfig2 = new ProviderConfig<EchoService>() .setInterfaceId(EchoService.class.getName()) .setApplication(application) .setRef(new EchoServiceImpl()) .setServer(serverConfig) .setRegister(false); providerConfig.export(); providerConfig2.export(); LOGGER.warn("started at pid {}", RpcRuntimeContext.PID); }
sofa-rpc는 ProviderConfig 클래스를 통해 서비스 제공자 Provider를 구성하는 것을 볼 수 있습니다. 서비스 시작을 위한 진입점으로 내보내기를 제공합니다. #🎜🎜 ##### 🎜🎜#r#🎜🎜 ## 🎜🎜
🎜🎜🎜🎜
#DEFAULTPROVIDEROTSTRAP and DubBOPROVIDEROTSTRAP are inherited from ProviderBootstrap. DefaultProviderBootstrap은 BoltProviderBootstrap, Http2ClearTextProviderBootstrap 및 RestProviderBootstrap의 세 가지 클래스에 의해 상속됩니다. 이는 실제로 소파-rpc의 세 가지 서버 서비스 메서드에 해당합니다. DefaultProviderBootstrap 서비스 시작 소스코드를 살펴보겠습니다public synchronized void export() { if (providerBootstrap == null) { providerBootstrap = Bootstraps.from(this); } providerBootstrap.export(); }
<span style="font-size: 14px; font-family: "Microsoft YaHei"">根据ProviderConfig中setBootstrap()配置的Bootstrap类型,我们通过Bootstaps.from(this)</span><span style="font-size: 14px; font-family: "Microsoft YaHei"">可以获取到不同的Bootstrap引导服务,分别是DefaultProviderBootstrap与 DubboProviderBootstrap <br/></span>
/** * 发布一个服务 * * @param providerConfig 服务发布者配置 * @param <T> 接口类型 * @return 发布启动类 */ public static <T> ProviderBootstrap<T> from(ProviderConfig<T> providerConfig) { String bootstrap = providerConfig.getBootstrap(); if (StringUtils.isEmpty(bootstrap)) { // Use default provider bootstrap 无的话就返回默认DefaultProviderBootstrap bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_PROVIDER_BOOTSTRAP); providerConfig.setBootstrap(bootstrap); } ProviderBootstrap providerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ProviderBootstrap.class) .getExtension(bootstrap, new Class[] { ProviderConfig.class }, new Object[] { providerConfig }); return (ProviderBootstrap<T>) providerBootstrap; }
@Override public void export() { if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒 Thread thread = factory.newThread(new Runnable() { @Override public void run() { try { Thread.sleep(providerConfig.getDelay()); } catch (Throwable ignore) { // NOPMD } doExport(); } }); thread.start(); } else { doExport(); } } private void doExport() { if (exported) { return; } // 检查参数 checkParameters(); String appName = providerConfig.getAppName(); //key is the protocol of server,for concurrent safe Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>(); // 将处理器注册到server List<ServerConfig> serverConfigs = providerConfig.getServer(); for (ServerConfig serverConfig : serverConfigs) { String protocol = serverConfig.getProtocol(); String key = providerConfig.buildKey() + ":" + protocol; if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId()); } // 注意同一interface,同一uniqleId,不同server情况 AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器 if (cnt == null) { // 没有发布过 cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0)); } int c = cnt.incrementAndGet(); hasExportedInCurrent.put(serverConfig.getProtocol(), true); int maxProxyCount = providerConfig.getRepeatedExportLimit(); if (maxProxyCount > 0) { if (c > maxProxyCount) { decrementCounter(hasExportedInCurrent); // 超过最大数量,直接抛出异常 throw new SofaRpcRuntimeException("Duplicate provider config with key " + key + " has been exported more than " + maxProxyCount + " times!" + " Maybe it's wrong config, please check it." + " Ignore this if you did that on purpose!"); } else if (c > 1) { if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!" + " Maybe it's wrong config, please check it." + " Ignore this if you did that on purpose!", key); } } } } try { // 构造请求调用器 providerProxyInvoker = new ProviderProxyInvoker(providerConfig); // 初始化注册中心 if (providerConfig.isRegister()) { List<RegistryConfig> registryConfigs = providerConfig.getRegistry(); if (CommonUtils.isNotEmpty(registryConfigs)) { for (RegistryConfig registryConfig : registryConfigs) { RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry } } } // 将处理器注册到server for (ServerConfig serverConfig : serverConfigs) { try { //构建Server Server server = serverConfig.buildIfAbsent(); // 注册序列化接口 server.registerProcessor(providerConfig, providerProxyInvoker); if (serverConfig.isAutoStart()) { //启动服务 server.start(); } } catch (SofaRpcRuntimeException e) { throw e; } catch (Exception e) { LOGGER.errorWithApp(appName, "Catch exception when register processor to server: " + serverConfig.getId(), e); } } // 注册到注册中心 providerConfig.setConfigListener(new ProviderAttributeListener()); register(); } catch (Exception e) { decrementCounter(hasExportedInCurrent); if (e instanceof SofaRpcRuntimeException) { throw (SofaRpcRuntimeException) e; } else { throw new SofaRpcRuntimeException("Build provider proxy error!", e); } } // 记录一些缓存数据 RpcRuntimeContext.cacheProviderConfig(this); exported = true; }
/** * 启动服务 * * @return the server */ public synchronized Server buildIfAbsent() { if (server != null) { return server; } // 提前检查协议+序列化方式 // ConfigValueHelper.check(ProtocolType.valueOf(getProtocol()), // SerializationType.valueOf(getSerialization())); //在sever工厂中拿到sever实例 server = ServerFactory.getServer(this); return server; }
ServerTransport는 추상 클래스이며 구체적인 구현은 전송 패키지 아래의 AbstractHttp2ServerTransport입니다
/** * 初始化Server实例 * * @param serverConfig 服务端配置 * @return Server */ public synchronized static Server getServer(ServerConfig serverConfig) { try { Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort())); if (server == null) { // 算下网卡和端口 resolveServerConfig(serverConfig); ExtensionClass<Server> ext = ExtensionLoaderFactory.getExtensionLoader(Server.class) .getExtensionClass(serverConfig.getProtocol()); if (ext == null) { throw ExceptionUtils.buildRuntime("server.protocol", serverConfig.getProtocol(), "Unsupported protocol of server!"); } server = ext.getExtInstance(); //服务初始化 server.init(serverConfig); SERVER_MAP.put(serverConfig.getPort() + "", server); } return server; } catch (SofaRpcRuntimeException e) { throw e; } catch (Throwable e) { throw new SofaRpcRuntimeException(e.getMessage(), e); } }
#🎜🎜 #
R estServer Rest 서비스를 제공하며, 기본 통신 구현은 SofaNettyJaxrsServer에서 찾을 수 있습니다./** * Bolt服务端 */ protected RemotingServer remotingServer; @Override public void start() { if (started) { return; } synchronized (this) { if (started) { return; } // 生成阿里基于netty的bolt服务Server对象 remotingServer = initRemotingServer(); try { if (remotingServer.start(serverConfig.getBoundHost())) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Bolt server has been bind to {}:{}", serverConfig.getBoundHost(), serverConfig.getPort()); } } else { throw new SofaRpcRuntimeException("Failed to start bolt server, see more detail from bolt log."); } started = true; if (EventBus.isEnable(ServerStartedEvent.class)) { EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool)); } } catch (SofaRpcRuntimeException e) { throw e; } catch (Exception e) { throw new SofaRpcRuntimeException("Failed to start bolt server!", e); } } }
/** * 服务端通讯层 */ private ServerTransport serverTransport; @Override public void init(ServerConfig serverConfig) { this.serverConfig = serverConfig; this.serverTransportConfig = convertConfig(serverConfig); // 启动线程池 this.bizThreadPool = initThreadPool(serverConfig); // 服务端处理器 this.serverHandler = new HttpServerHandler(); // set default transport config this.serverTransportConfig.setContainer(container); this.serverTransportConfig.setServerHandler(serverHandler); } @Override public void start() { if (started) { return; } synchronized (this) { if (started) { return; } try { // 启动线程池 this.bizThreadPool = initThreadPool(serverConfig); this.serverHandler.setBizThreadPool(bizThreadPool); //实例化服务,具体代码见 serverTransport = ServerTransportFactory.getServerTransport(serverTransportConfig); started = serverTransport.start(); if (started) { if (EventBus.isEnable(ServerStartedEvent.class)) { EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool)); } } } catch (SofaRpcRuntimeException e) { throw e; } catch (Exception e) { throw new SofaRpcRuntimeException("Failed to start HTTP/2 server!", e); } } }
이상은 소파-RPC 서버 시작을 위한 기본 프로세스입니다. 여기서 우리가 중점을 두는 것은 코드 기능에 대한 심층적인 분석 없이 단순한 서비스 시작 프로세스입니다. 코드별 구현을 더 자세히 살펴볼 수 있습니다.
위 내용은 소파-RPC 서버 소스 코드 상세 분석(흐름도 포함)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!