sofa-rpc服务端源码的详细分析(附流程图)
本篇文章给大家带来的内容是关于sofa-rpc服务端源码的详细分析(附流程图),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。
sofa-rpc是阿里开源的一款高性能的rpc框架,这篇文章主要是对sofa-rpc provider启动服务流程的一个代码走读,下面是我简单绘制的一个基本的关系流程图
下面我们根据sofa-rpc代码,对流程进行一个跟踪与走读。我们以BoltServer的为例
public static void main(String[] args) { ApplicationConfig application = new ApplicationConfig().setAppName("test-server"); ServerConfig serverConfig = new ServerConfig() .setPort(22000) .setDaemon(false); ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) .setApplication(application) .setRef(new HelloServiceImpl()) .setServer(serverConfig) .setRegister(false); ProviderConfig<EchoService> providerConfig2 = new ProviderConfig<EchoService>() .setInterfaceId(EchoService.class.getName()) .setApplication(application) .setRef(new EchoServiceImpl()) .setServer(serverConfig) .setRegister(false); providerConfig.export(); providerConfig2.export(); LOGGER.warn("started at pid {}", RpcRuntimeContext.PID); }
可以看到sofa-rpc通过ProviderConfig类对服务提供方Provider进行了配置信息的初始化,同时也提供了export做为服务启动的入口。
public synchronized void export() { if (providerBootstrap == null) { providerBootstrap = Bootstraps.from(this); } providerBootstrap.export(); }
<span style="font-size: 14px; font-family: "Microsoft YaHei"">根据ProviderConfig中setBootstrap()配置的Bootstrap类型,我们通过Bootstaps.from(this)</span><span style="font-size: 14px; font-family: "Microsoft YaHei"">可以获取到不同的Bootstrap引导服务,分别是DefaultProviderBootstrap与 DubboProviderBootstrap <br/></span>
/** * 发布一个服务 * * @param providerConfig 服务发布者配置 * @param <T> 接口类型 * @return 发布启动类 */ public static <T> ProviderBootstrap<T> from(ProviderConfig<T> providerConfig) { String bootstrap = providerConfig.getBootstrap(); if (StringUtils.isEmpty(bootstrap)) { // Use default provider bootstrap 无的话就返回默认DefaultProviderBootstrap bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_PROVIDER_BOOTSTRAP); providerConfig.setBootstrap(bootstrap); } ProviderBootstrap providerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ProviderBootstrap.class) .getExtension(bootstrap, new Class[] { ProviderConfig.class }, new Object[] { providerConfig }); return (ProviderBootstrap<T>) providerBootstrap; }
DefaultProviderBootstrap与 DubboProviderBootstrap 都继承自ProviderBootstrap。
DefaultProviderBootstrap又被BoltProviderBootstrap、Http2ClearTextProviderBootstrap、RestProviderBootstrap三个类所继承,这其实对应了sofa-rpc中的三种server服务方式。
我们看下DefaultProviderBootstrap服务启动源码
@Override public void export() { if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒 Thread thread = factory.newThread(new Runnable() { @Override public void run() { try { Thread.sleep(providerConfig.getDelay()); } catch (Throwable ignore) { // NOPMD } doExport(); } }); thread.start(); } else { doExport(); } } private void doExport() { if (exported) { return; } // 检查参数 checkParameters(); String appName = providerConfig.getAppName(); //key is the protocol of server,for concurrent safe Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>(); // 将处理器注册到server List<ServerConfig> serverConfigs = providerConfig.getServer(); for (ServerConfig serverConfig : serverConfigs) { String protocol = serverConfig.getProtocol(); String key = providerConfig.buildKey() + ":" + protocol; if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId()); } // 注意同一interface,同一uniqleId,不同server情况 AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器 if (cnt == null) { // 没有发布过 cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0)); } int c = cnt.incrementAndGet(); hasExportedInCurrent.put(serverConfig.getProtocol(), true); int maxProxyCount = providerConfig.getRepeatedExportLimit(); if (maxProxyCount > 0) { if (c > maxProxyCount) { decrementCounter(hasExportedInCurrent); // 超过最大数量,直接抛出异常 throw new SofaRpcRuntimeException("Duplicate provider config with key " + key + " has been exported more than " + maxProxyCount + " times!" + " Maybe it's wrong config, please check it." + " Ignore this if you did that on purpose!"); } else if (c > 1) { if (LOGGER.isInfoEnabled(appName)) { LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!" + " Maybe it's wrong config, please check it." + " Ignore this if you did that on purpose!", key); } } } } try { // 构造请求调用器 providerProxyInvoker = new ProviderProxyInvoker(providerConfig); // 初始化注册中心 if (providerConfig.isRegister()) { List<RegistryConfig> registryConfigs = providerConfig.getRegistry(); if (CommonUtils.isNotEmpty(registryConfigs)) { for (RegistryConfig registryConfig : registryConfigs) { RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry } } } // 将处理器注册到server for (ServerConfig serverConfig : serverConfigs) { try { //构建Server Server server = serverConfig.buildIfAbsent(); // 注册序列化接口 server.registerProcessor(providerConfig, providerProxyInvoker); if (serverConfig.isAutoStart()) { //启动服务 server.start(); } } catch (SofaRpcRuntimeException e) { throw e; } catch (Exception e) { LOGGER.errorWithApp(appName, "Catch exception when register processor to server: " + serverConfig.getId(), e); } } // 注册到注册中心 providerConfig.setConfigListener(new ProviderAttributeListener()); register(); } catch (Exception e) { decrementCounter(hasExportedInCurrent); if (e instanceof SofaRpcRuntimeException) { throw (SofaRpcRuntimeException) e; } else { throw new SofaRpcRuntimeException("Build provider proxy error!", e); } } // 记录一些缓存数据 RpcRuntimeContext.cacheProviderConfig(this); exported = true; }
代码中通过serverConfig.buildIfAbsent()构建Server服务对象,而在buildIfAbsent()函数中我们可以看到,sever是通过SeverFactory工厂获取到的,在SeverFactory的getSever()中根据SeverConfig的配置获取Sever的具体实例,并执行Init()进行初始化。
/** * 启动服务 * * @return the server */ public synchronized Server buildIfAbsent() { if (server != null) { return server; } // 提前检查协议+序列化方式 // ConfigValueHelper.check(ProtocolType.valueOf(getProtocol()), // SerializationType.valueOf(getSerialization())); //在sever工厂中拿到sever实例 server = ServerFactory.getServer(this); return server; }
/** * 初始化Server实例 * * @param serverConfig 服务端配置 * @return Server */ public synchronized static Server getServer(ServerConfig serverConfig) { try { Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort())); if (server == null) { // 算下网卡和端口 resolveServerConfig(serverConfig); ExtensionClass<Server> ext = ExtensionLoaderFactory.getExtensionLoader(Server.class) .getExtensionClass(serverConfig.getProtocol()); if (ext == null) { throw ExceptionUtils.buildRuntime("server.protocol", serverConfig.getProtocol(), "Unsupported protocol of server!"); } server = ext.getExtInstance(); //服务初始化 server.init(serverConfig); SERVER_MAP.put(serverConfig.getPort() + "", server); } return server; } catch (SofaRpcRuntimeException e) { throw e; } catch (Throwable e) { throw new SofaRpcRuntimeException(e.getMessage(), e); } }
sofa-rpc提供了三种server类型 BoltServer,RestServer与AbstractHttpServer
BoltServer中通讯底层通过RemotingServer实现的,RemotingServer是基于阿里sofa-bolt通信框架开发的。
/** * Bolt服务端 */ protected RemotingServer remotingServer; @Override public void start() { if (started) { return; } synchronized (this) { if (started) { return; } // 生成阿里基于netty的bolt服务Server对象 remotingServer = initRemotingServer(); try { if (remotingServer.start(serverConfig.getBoundHost())) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Bolt server has been bind to {}:{}", serverConfig.getBoundHost(), serverConfig.getPort()); } } else { throw new SofaRpcRuntimeException("Failed to start bolt server, see more detail from bolt log."); } started = true; if (EventBus.isEnable(ServerStartedEvent.class)) { EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool)); } } catch (SofaRpcRuntimeException e) { throw e; } catch (Exception e) { throw new SofaRpcRuntimeException("Failed to start bolt server!", e); } } }
AbstractHttpServer 提供http服务,底层通信通过ServerTransport类实现的
/** * 服务端通讯层 */ private ServerTransport serverTransport; @Override public void init(ServerConfig serverConfig) { this.serverConfig = serverConfig; this.serverTransportConfig = convertConfig(serverConfig); // 启动线程池 this.bizThreadPool = initThreadPool(serverConfig); // 服务端处理器 this.serverHandler = new HttpServerHandler(); // set default transport config this.serverTransportConfig.setContainer(container); this.serverTransportConfig.setServerHandler(serverHandler); } @Override public void start() { if (started) { return; } synchronized (this) { if (started) { return; } try { // 启动线程池 this.bizThreadPool = initThreadPool(serverConfig); this.serverHandler.setBizThreadPool(bizThreadPool); //实例化服务,具体代码见 serverTransport = ServerTransportFactory.getServerTransport(serverTransportConfig); started = serverTransport.start(); if (started) { if (EventBus.isEnable(ServerStartedEvent.class)) { EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool)); } } } catch (SofaRpcRuntimeException e) { throw e; } catch (Exception e) { throw new SofaRpcRuntimeException("Failed to start HTTP/2 server!", e); } } }
ServerTransport是个抽象类,具体实现为transport包下AbstractHttp2ServerTransport
/** * 构造函数 * * @param transportConfig 服务端配置 */ protected AbstractHttp2ServerTransport(ServerTransportConfig transportConfig) { super(transportConfig); } @Override public boolean start() { if (serverBootstrap != null) { return true; } synchronized (this) { if (serverBootstrap != null) { return true; } boolean flag = false; SslContext sslCtx = SslContextBuilder.build(); // Configure the server. EventLoopGroup bossGroup = NettyHelper.getServerBossEventLoopGroup(transportConfig); //可以看到然是基于Netty HttpServerHandler httpServerHandler = (HttpServerHandler) transportConfig.getServerHandler(); bizGroup = NettyHelper.getServerBizEventLoopGroup(transportConfig, httpServerHandler.getBizThreadPool()); serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, bizGroup) .channel(transportConfig.isUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, transportConfig.getBacklog()) .option(ChannelOption.SO_REUSEADDR, transportConfig.isReuseAddr()) .option(ChannelOption.RCVBUF_ALLOCATOR, NettyHelper.getRecvByteBufAllocator()) .option(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator()) .childOption(ChannelOption.SO_KEEPALIVE, transportConfig.isKeepAlive()) .childOption(ChannelOption.TCP_NODELAY, transportConfig.isTcpNoDelay()) .childOption(ChannelOption.SO_RCVBUF, 8192 * 128) .childOption(ChannelOption.SO_SNDBUF, 8192 * 128) .handler(new LoggingHandler(LogLevel.DEBUG)) .childOption(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator()) .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark( transportConfig.getBufferMin(), transportConfig.getBufferMax())) .childHandler(new Http2ServerChannelInitializer(bizGroup, sslCtx, httpServerHandler, transportConfig.getPayload())); // 绑定到全部网卡 或者 指定网卡 ChannelFuture future = serverBootstrap.bind( new InetSocketAddress(transportConfig.getHost(), transportConfig.getPort())); ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { if (LOGGER.isInfoEnabled()) { LOGGER.info("HTTP/2 Server bind to {}:{} success!", transportConfig.getHost(), transportConfig.getPort()); } } else { LOGGER.error("HTTP/2 Server bind to {}:{} failed!", transportConfig.getHost(), transportConfig.getPort()); stop(); } } }); try { channelFuture.await(); if (channelFuture.isSuccess()) { flag = Boolean.TRUE; } else { throw new SofaRpcRuntimeException("Server start fail!", future.cause()); } } catch (InterruptedException e) { LOGGER.error(e.getMessage(), e); } return flag; } }
RestServer 提供Rest服务,底层通信实现具体可见SofaNettyJaxrsServer。
/** * Rest服务端 */ protected SofaNettyJaxrsServer httpServer; @Override public void init(ServerConfig serverConfig) { this.serverConfig = serverConfig; httpServer = buildServer(); }
SofaNettyJaxrsServer中服务启动的具体代码
@Override public void start() { // CHANGE: 增加线程名字 boolean daemon = serverConfig.isDaemon(); boolean isEpoll = serverConfig.isEpoll(); NamedThreadFactory ioFactory = new NamedThreadFactory("SEV-REST-IO-" + port, daemon); NamedThreadFactory bizFactory = new NamedThreadFactory("SEV-REST-BIZ-" + port, daemon); eventLoopGroup = isEpoll ? new EpollEventLoopGroup(ioWorkerCount, ioFactory) : new NioEventLoopGroup(ioWorkerCount, ioFactory); eventExecutor = isEpoll ? new EpollEventLoopGroup(executorThreadCount, bizFactory) : new NioEventLoopGroup(executorThreadCount, bizFactory); // Configure the server. bootstrap = new ServerBootstrap() .group(eventLoopGroup) .channel(isEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class) .childHandler(createChannelInitializer()) .option(ChannelOption.SO_BACKLOG, backlog) .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isKeepAlive()); // CHANGE: setKeepAlive for (Map.Entry<ChannelOption, Object> entry : channelOptions.entrySet()) { bootstrap.option(entry.getKey(), entry.getValue()); } for (Map.Entry<ChannelOption, Object> entry : childChannelOptions.entrySet()) { bootstrap.childOption(entry.getKey(), entry.getValue()); } final InetSocketAddress socketAddress; if (null == hostname || hostname.isEmpty()) { socketAddress = new InetSocketAddress(port); } else { socketAddress = new InetSocketAddress(hostname, port); } bootstrap.bind(socketAddress).syncUninterruptibly(); }
OK,以上就是sofa-rpc服务端启动的一个基本的流程,这里关注的只是简单的服务启动流程,没有深入代码功能进行分析,在此基础上,我们可以进一步探究代码的具体实现。
Atas ialah kandungan terperinci sofa-rpc服务端源码的详细分析(附流程图). Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Alat AI Hot

Undresser.AI Undress
Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover
Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool
Gambar buka pakaian secara percuma

Clothoff.io
Penyingkiran pakaian AI

AI Hentai Generator
Menjana ai hentai secara percuma.

Artikel Panas

Alat panas

Notepad++7.3.1
Editor kod yang mudah digunakan dan percuma

SublimeText3 versi Cina
Versi Cina, sangat mudah digunakan

Hantar Studio 13.0.1
Persekitaran pembangunan bersepadu PHP yang berkuasa

Dreamweaver CS6
Alat pembangunan web visual

SublimeText3 versi Mac
Perisian penyuntingan kod peringkat Tuhan (SublimeText3)

Topik panas



Kelas kelas Java melibatkan pemuatan, menghubungkan, dan memulakan kelas menggunakan sistem hierarki dengan bootstrap, lanjutan, dan pemuat kelas aplikasi. Model delegasi induk memastikan kelas teras dimuatkan dahulu, yang mempengaruhi LOA kelas tersuai

Artikel ini membincangkan pelaksanaan caching pelbagai peringkat di Java menggunakan kafein dan cache jambu untuk meningkatkan prestasi aplikasi. Ia meliputi persediaan, integrasi, dan faedah prestasi, bersama -sama dengan Pengurusan Dasar Konfigurasi dan Pengusiran PRA Terbaik

Artikel ini membincangkan menggunakan JPA untuk pemetaan objek-relasi dengan ciri-ciri canggih seperti caching dan pemuatan malas. Ia meliputi persediaan, pemetaan entiti, dan amalan terbaik untuk mengoptimumkan prestasi sambil menonjolkan potensi perangkap. [159 aksara]

Artikel ini membincangkan menggunakan Maven dan Gradle untuk Pengurusan Projek Java, membina automasi, dan resolusi pergantungan, membandingkan pendekatan dan strategi pengoptimuman mereka.

Artikel ini membincangkan membuat dan menggunakan perpustakaan Java tersuai (fail balang) dengan pengurusan versi dan pergantungan yang betul, menggunakan alat seperti Maven dan Gradle.
