首页 Java java教程 sofa-rpc服务端源码的详细分析(附流程图)

sofa-rpc服务端源码的详细分析(附流程图)

Oct 10, 2018 am 11:27 AM

本篇文章给大家带来的内容是关于sofa-rpc服务端源码的详细分析(附流程图),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

sofa-rpc是阿里开源的一款高性能的rpc框架,这篇文章主要是对sofa-rpc provider启动服务流程的一个代码走读,下面是我简单绘制的一个基本的关系流程图

下面我们根据sofa-rpc代码,对流程进行一个跟踪与走读。我们以BoltServer的为例

    public static void main(String[] args) {
        ApplicationConfig application = new ApplicationConfig().setAppName("test-server");

        ServerConfig serverConfig = new ServerConfig()
            .setPort(22000)
            .setDaemon(false);

        ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
            .setInterfaceId(HelloService.class.getName())
            .setApplication(application)
            .setRef(new HelloServiceImpl())
            .setServer(serverConfig)
            .setRegister(false);

        ProviderConfig<EchoService> providerConfig2 = new ProviderConfig<EchoService>()
            .setInterfaceId(EchoService.class.getName())
            .setApplication(application)
            .setRef(new EchoServiceImpl())
            .setServer(serverConfig)
            .setRegister(false);

        providerConfig.export();
        providerConfig2.export();

        LOGGER.warn("started at pid {}", RpcRuntimeContext.PID);
    }
登录后复制

可以看到sofa-rpc通过ProviderConfig类对服务提供方Provider进行了配置信息的初始化,同时也提供了export做为服务启动的入口。

    public synchronized void export() {        
    if (providerBootstrap == null) {
            providerBootstrap = Bootstraps.from(this);
        }
        providerBootstrap.export();
    }
登录后复制

<span style="font-size: 14px; font-family: "Microsoft YaHei"">根据ProviderConfig中setBootstrap()配置的Bootstrap类型,我们通过Bootstaps.from(this)</span><span style="font-size: 14px; font-family: "Microsoft YaHei"">可以获取到不同的Bootstrap引导服务,分别是DefaultProviderBootstrap与 DubboProviderBootstrap <br/></span>
登录后复制

/**
     * 发布一个服务
     *
     * @param providerConfig 服务发布者配置
     * @param <T>            接口类型
     * @return 发布启动类
     */
    public static <T> ProviderBootstrap<T> from(ProviderConfig<T> providerConfig) {
        String bootstrap = providerConfig.getBootstrap();
        if (StringUtils.isEmpty(bootstrap)) {
            // Use default provider bootstrap 无的话就返回默认DefaultProviderBootstrap
            bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_PROVIDER_BOOTSTRAP);
            providerConfig.setBootstrap(bootstrap);
        }
        ProviderBootstrap providerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ProviderBootstrap.class)
            .getExtension(bootstrap, new Class[] { ProviderConfig.class }, new Object[] { providerConfig });
        return (ProviderBootstrap<T>) providerBootstrap;
    }
登录后复制

DefaultProviderBootstrap与 DubboProviderBootstrap 都继承自ProviderBootstrap。

DefaultProviderBootstrap又被BoltProviderBootstrap、Http2ClearTextProviderBootstrap、RestProviderBootstrap三个类所继承,这其实对应了sofa-rpc中的三种server服务方式。

我们看下DefaultProviderBootstrap服务启动源码

  @Override
    public void export() {
        if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒
            Thread thread = factory.newThread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(providerConfig.getDelay());
                    } catch (Throwable ignore) { // NOPMD
                    }
                    doExport();
                }
            });
            thread.start();
        } else {
            doExport();
        }
    }

    private void doExport() {
        if (exported) {
            return;
        }

        // 检查参数
        checkParameters();

        String appName = providerConfig.getAppName();

        //key  is the protocol of server,for concurrent safe
        Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>();
        // 将处理器注册到server
        List<ServerConfig> serverConfigs = providerConfig.getServer();
        for (ServerConfig serverConfig : serverConfigs) {
            String protocol = serverConfig.getProtocol();

            String key = providerConfig.buildKey() + ":" + protocol;

            if (LOGGER.isInfoEnabled(appName)) {
                LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());
            }

            // 注意同一interface,同一uniqleId,不同server情况
            AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
            if (cnt == null) { // 没有发布过
                cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
            }
            int c = cnt.incrementAndGet();
            hasExportedInCurrent.put(serverConfig.getProtocol(), true);
            int maxProxyCount = providerConfig.getRepeatedExportLimit();
            if (maxProxyCount > 0) {
                if (c > maxProxyCount) {
                    decrementCounter(hasExportedInCurrent);
                    // 超过最大数量,直接抛出异常
                    throw new SofaRpcRuntimeException("Duplicate provider config with key " + key
                        + " has been exported more than " + maxProxyCount + " times!"
                        + " Maybe it&#39;s wrong config, please check it."
                        + " Ignore this if you did that on purpose!");
                } else if (c > 1) {
                    if (LOGGER.isInfoEnabled(appName)) {
                        LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!"
                            + " Maybe it&#39;s wrong config, please check it."
                            + " Ignore this if you did that on purpose!", key);
                    }
                }
            }

        }

        try {
            // 构造请求调用器
            providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
            // 初始化注册中心
            if (providerConfig.isRegister()) {
                List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
                if (CommonUtils.isNotEmpty(registryConfigs)) {
                    for (RegistryConfig registryConfig : registryConfigs) {
                        RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
                    }
                }
            }
            // 将处理器注册到server
            for (ServerConfig serverConfig : serverConfigs) {
                try {
                    //构建Server
                    Server server = serverConfig.buildIfAbsent();
                    // 注册序列化接口
                    server.registerProcessor(providerConfig, providerProxyInvoker);
                    if (serverConfig.isAutoStart()) {
                        //启动服务
                        server.start();
                    }

                } catch (SofaRpcRuntimeException e) {
                    throw e;
                } catch (Exception e) {
                    LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "
                        + serverConfig.getId(), e);
                }
            }

            // 注册到注册中心
            providerConfig.setConfigListener(new ProviderAttributeListener());
            register();
        } catch (Exception e) {
            decrementCounter(hasExportedInCurrent);

            if (e instanceof SofaRpcRuntimeException) {
                throw (SofaRpcRuntimeException) e;
            } else {
                throw new SofaRpcRuntimeException("Build provider proxy error!", e);
            }
        }

        // 记录一些缓存数据
        RpcRuntimeContext.cacheProviderConfig(this);
        exported = true;
    }
登录后复制

代码中通过serverConfig.buildIfAbsent()构建Server服务对象,而在buildIfAbsent()函数中我们可以看到,sever是通过SeverFactory工厂获取到的,在SeverFactory的getSever()中根据SeverConfig的配置获取Sever的具体实例,并执行Init()进行初始化。

    /**
     * 启动服务
     *
     * @return the server
     */
    public synchronized Server buildIfAbsent() {
        if (server != null) {
            return server;
        }
        // 提前检查协议+序列化方式
        // ConfigValueHelper.check(ProtocolType.valueOf(getProtocol()),
        //                SerializationType.valueOf(getSerialization()));
        
        //在sever工厂中拿到sever实例
        server = ServerFactory.getServer(this);
        return server;
    }
登录后复制

/**
     * 初始化Server实例
     *
     * @param serverConfig 服务端配置
     * @return Server
     */
    public synchronized static Server getServer(ServerConfig serverConfig) {
        try {
            Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort()));
            if (server == null) {
                // 算下网卡和端口
                resolveServerConfig(serverConfig);

                ExtensionClass<Server> ext = ExtensionLoaderFactory.getExtensionLoader(Server.class)
                    .getExtensionClass(serverConfig.getProtocol());
                if (ext == null) {
                    throw ExceptionUtils.buildRuntime("server.protocol", serverConfig.getProtocol(),
                        "Unsupported protocol of server!");
                }
                server = ext.getExtInstance();
                //服务初始化
                server.init(serverConfig);
                SERVER_MAP.put(serverConfig.getPort() + "", server);
            }
            return server;
        } catch (SofaRpcRuntimeException e) {
            throw e;
        } catch (Throwable e) {
            throw new SofaRpcRuntimeException(e.getMessage(), e);
        }
    }
登录后复制

sofa-rpc提供了三种server类型 BoltServer,RestServer与AbstractHttpServer

BoltServer中通讯底层通过RemotingServer实现的,RemotingServer是基于阿里sofa-bolt通信框架开发的。

  /**
     * Bolt服务端
     */
    protected RemotingServer       remotingServer;

   @Override
    public void start() {
        if (started) {
            return;
        }
        synchronized (this) {
            if (started) {
                return;
            }
            // 生成阿里基于netty的bolt服务Server对象 
            remotingServer = initRemotingServer();
            try {
                if (remotingServer.start(serverConfig.getBoundHost())) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("Bolt server has been bind to {}:{}", serverConfig.getBoundHost(),
                            serverConfig.getPort());
                    }
                } else {
                    throw new SofaRpcRuntimeException("Failed to start bolt server, see more detail from bolt log.");
                }
                started = true;

                if (EventBus.isEnable(ServerStartedEvent.class)) {
                    EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
                }

            } catch (SofaRpcRuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new SofaRpcRuntimeException("Failed to start bolt server!", e);
            }
        }
    }
登录后复制

AbstractHttpServer 提供http服务,底层通信通过ServerTransport类实现的

 /**
     * 服务端通讯层
     */
    private ServerTransport         serverTransport;

    @Override
    public void init(ServerConfig serverConfig) {
        this.serverConfig = serverConfig;
        this.serverTransportConfig = convertConfig(serverConfig);
        // 启动线程池
        this.bizThreadPool = initThreadPool(serverConfig);
        // 服务端处理器
        this.serverHandler = new HttpServerHandler();

        // set default transport config
        this.serverTransportConfig.setContainer(container);
        this.serverTransportConfig.setServerHandler(serverHandler);
    }

    @Override
    public void start() {
        if (started) {
            return;
        }
        synchronized (this) {
            if (started) {
                return;
            }
            try {
                // 启动线程池
                this.bizThreadPool = initThreadPool(serverConfig);
                this.serverHandler.setBizThreadPool(bizThreadPool);
                //实例化服务,具体代码见
                serverTransport = ServerTransportFactory.getServerTransport(serverTransportConfig);
                started = serverTransport.start();

                if (started) {
                    if (EventBus.isEnable(ServerStartedEvent.class)) {
                        EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
                    }
                }
            } catch (SofaRpcRuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new SofaRpcRuntimeException("Failed to start HTTP/2 server!", e);
            }
        }
    }
登录后复制

ServerTransport是个抽象类,具体实现为transport包下AbstractHttp2ServerTransport

  /**
     * 构造函数
     *
     * @param transportConfig 服务端配置
     */
    protected AbstractHttp2ServerTransport(ServerTransportConfig transportConfig) {
        super(transportConfig);
    }

 @Override
    public boolean start() {
        if (serverBootstrap != null) {
            return true;
        }
        synchronized (this) {
            if (serverBootstrap != null) {
                return true;
            }
            boolean flag = false;
            SslContext sslCtx = SslContextBuilder.build();

            // Configure the server.
            EventLoopGroup bossGroup = NettyHelper.getServerBossEventLoopGroup(transportConfig);

            //可以看到然是基于Netty
            HttpServerHandler httpServerHandler = (HttpServerHandler) transportConfig.getServerHandler();
            bizGroup = NettyHelper.getServerBizEventLoopGroup(transportConfig, httpServerHandler.getBizThreadPool());

            serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, bizGroup)
                .channel(transportConfig.isUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, transportConfig.getBacklog())
                .option(ChannelOption.SO_REUSEADDR, transportConfig.isReuseAddr())
                .option(ChannelOption.RCVBUF_ALLOCATOR, NettyHelper.getRecvByteBufAllocator())
                .option(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
                .childOption(ChannelOption.SO_KEEPALIVE, transportConfig.isKeepAlive())
                .childOption(ChannelOption.TCP_NODELAY, transportConfig.isTcpNoDelay())
                .childOption(ChannelOption.SO_RCVBUF, 8192 * 128)
                .childOption(ChannelOption.SO_SNDBUF, 8192 * 128)
                .handler(new LoggingHandler(LogLevel.DEBUG))
                .childOption(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                    transportConfig.getBufferMin(), transportConfig.getBufferMax()))
                .childHandler(new Http2ServerChannelInitializer(bizGroup, sslCtx,
                    httpServerHandler, transportConfig.getPayload()));

            // 绑定到全部网卡 或者 指定网卡
            ChannelFuture future = serverBootstrap.bind(
                new InetSocketAddress(transportConfig.getHost(), transportConfig.getPort()));
            ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("HTTP/2 Server bind to {}:{} success!",
                                transportConfig.getHost(), transportConfig.getPort());
                        }
                    } else {
                        LOGGER.error("HTTP/2 Server bind to {}:{} failed!",
                            transportConfig.getHost(), transportConfig.getPort());
                        stop();
                    }
                }
            });

            try {
                channelFuture.await();
                if (channelFuture.isSuccess()) {
                    flag = Boolean.TRUE;
                } else {
                    throw new SofaRpcRuntimeException("Server start fail!", future.cause());
                }
            } catch (InterruptedException e) {
                LOGGER.error(e.getMessage(), e);
            }
            return flag;
        }
    }
登录后复制

RestServer 提供Rest服务,底层通信实现具体可见SofaNettyJaxrsServer。

  /**
     * Rest服务端
     */
    protected SofaNettyJaxrsServer httpServer;

    @Override
    public void init(ServerConfig serverConfig) {
        this.serverConfig = serverConfig;
        httpServer = buildServer();
    }
登录后复制

SofaNettyJaxrsServer中服务启动的具体代码

@Override
    public void start() {
        // CHANGE: 增加线程名字
        boolean daemon = serverConfig.isDaemon();
        boolean isEpoll = serverConfig.isEpoll();
        NamedThreadFactory ioFactory = new NamedThreadFactory("SEV-REST-IO-" + port, daemon);
        NamedThreadFactory bizFactory = new NamedThreadFactory("SEV-REST-BIZ-" + port, daemon);
        eventLoopGroup = isEpoll ? new EpollEventLoopGroup(ioWorkerCount, ioFactory)
            : new NioEventLoopGroup(ioWorkerCount, ioFactory);
        eventExecutor = isEpoll ? new EpollEventLoopGroup(executorThreadCount, bizFactory)
            : new NioEventLoopGroup(executorThreadCount, bizFactory);
        // Configure the server.
        bootstrap = new ServerBootstrap()
            .group(eventLoopGroup)
            .channel(isEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .childHandler(createChannelInitializer())
            .option(ChannelOption.SO_BACKLOG, backlog)
            .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isKeepAlive()); // CHANGE: setKeepAlive

        for (Map.Entry<ChannelOption, Object> entry : channelOptions.entrySet()) {
            bootstrap.option(entry.getKey(), entry.getValue());
        }

        for (Map.Entry<ChannelOption, Object> entry : childChannelOptions.entrySet()) {
            bootstrap.childOption(entry.getKey(), entry.getValue());
        }

        final InetSocketAddress socketAddress;
        if (null == hostname || hostname.isEmpty()) {
            socketAddress = new InetSocketAddress(port);
        } else {
            socketAddress = new InetSocketAddress(hostname, port);
        }

        bootstrap.bind(socketAddress).syncUninterruptibly();
    }
登录后复制

 OK,以上就是sofa-rpc服务端启动的一个基本的流程,这里关注的只是简单的服务启动流程,没有深入代码功能进行分析,在此基础上,我们可以进一步探究代码的具体实现。

以上是sofa-rpc服务端源码的详细分析(附流程图)的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
4 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
4 周前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
4 周前 By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解锁Myrise中的所有内容
1 个月前 By 尊渡假赌尊渡假赌尊渡假赌

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

Java的类负载机制如何起作用,包括不同的类载荷及其委托模型? Java的类负载机制如何起作用,包括不同的类载荷及其委托模型? Mar 17, 2025 pm 05:35 PM

Java的类上载涉及使用带有引导,扩展程序和应用程序类负载器的分层系统加载,链接和初始化类。父代授权模型确保首先加载核心类别,从而影响自定义类LOA

如何使用咖啡因或Guava Cache等库在Java应用程序中实现多层缓存? 如何使用咖啡因或Guava Cache等库在Java应用程序中实现多层缓存? Mar 17, 2025 pm 05:44 PM

本文讨论了使用咖啡因和Guava缓存在Java中实施多层缓存以提高应用程序性能。它涵盖设置,集成和绩效优势,以及配置和驱逐政策管理最佳PRA

如何将JPA(Java持久性API)用于具有高级功能(例如缓存和懒惰加载)的对象相关映射? 如何将JPA(Java持久性API)用于具有高级功能(例如缓存和懒惰加载)的对象相关映射? Mar 17, 2025 pm 05:43 PM

本文讨论了使用JPA进行对象相关映射,并具有高级功能,例如缓存和懒惰加载。它涵盖了设置,实体映射和优化性能的最佳实践,同时突出潜在的陷阱。[159个字符]

如何将Maven或Gradle用于高级Java项目管理,构建自动化和依赖性解决方案? 如何将Maven或Gradle用于高级Java项目管理,构建自动化和依赖性解决方案? Mar 17, 2025 pm 05:46 PM

本文讨论了使用Maven和Gradle进行Java项目管理,构建自动化和依赖性解决方案,以比较其方法和优化策略。

如何使用适当的版本控制和依赖项管理创建和使用自定义Java库(JAR文件)? 如何使用适当的版本控制和依赖项管理创建和使用自定义Java库(JAR文件)? Mar 17, 2025 pm 05:45 PM

本文使用Maven和Gradle之类的工具讨论了具有适当的版本控制和依赖关系管理的自定义Java库(JAR文件)的创建和使用。

See all articles