Rumah Java javaTutorial sofa-rpc服务端源码的详细分析(附流程图)

sofa-rpc服务端源码的详细分析(附流程图)

Oct 10, 2018 am 11:27 AM

本篇文章给大家带来的内容是关于sofa-rpc服务端源码的详细分析(附流程图),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。

sofa-rpc是阿里开源的一款高性能的rpc框架,这篇文章主要是对sofa-rpc provider启动服务流程的一个代码走读,下面是我简单绘制的一个基本的关系流程图

下面我们根据sofa-rpc代码,对流程进行一个跟踪与走读。我们以BoltServer的为例

    public static void main(String[] args) {
        ApplicationConfig application = new ApplicationConfig().setAppName("test-server");

        ServerConfig serverConfig = new ServerConfig()
            .setPort(22000)
            .setDaemon(false);

        ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
            .setInterfaceId(HelloService.class.getName())
            .setApplication(application)
            .setRef(new HelloServiceImpl())
            .setServer(serverConfig)
            .setRegister(false);

        ProviderConfig<EchoService> providerConfig2 = new ProviderConfig<EchoService>()
            .setInterfaceId(EchoService.class.getName())
            .setApplication(application)
            .setRef(new EchoServiceImpl())
            .setServer(serverConfig)
            .setRegister(false);

        providerConfig.export();
        providerConfig2.export();

        LOGGER.warn("started at pid {}", RpcRuntimeContext.PID);
    }
Salin selepas log masuk

可以看到sofa-rpc通过ProviderConfig类对服务提供方Provider进行了配置信息的初始化,同时也提供了export做为服务启动的入口。

    public synchronized void export() {        
    if (providerBootstrap == null) {
            providerBootstrap = Bootstraps.from(this);
        }
        providerBootstrap.export();
    }
Salin selepas log masuk

<span style="font-size: 14px; font-family: "Microsoft YaHei"">根据ProviderConfig中setBootstrap()配置的Bootstrap类型,我们通过Bootstaps.from(this)</span><span style="font-size: 14px; font-family: "Microsoft YaHei"">可以获取到不同的Bootstrap引导服务,分别是DefaultProviderBootstrap与 DubboProviderBootstrap <br/></span>
Salin selepas log masuk

/**
     * 发布一个服务
     *
     * @param providerConfig 服务发布者配置
     * @param <T>            接口类型
     * @return 发布启动类
     */
    public static <T> ProviderBootstrap<T> from(ProviderConfig<T> providerConfig) {
        String bootstrap = providerConfig.getBootstrap();
        if (StringUtils.isEmpty(bootstrap)) {
            // Use default provider bootstrap 无的话就返回默认DefaultProviderBootstrap
            bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_PROVIDER_BOOTSTRAP);
            providerConfig.setBootstrap(bootstrap);
        }
        ProviderBootstrap providerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ProviderBootstrap.class)
            .getExtension(bootstrap, new Class[] { ProviderConfig.class }, new Object[] { providerConfig });
        return (ProviderBootstrap<T>) providerBootstrap;
    }
Salin selepas log masuk

DefaultProviderBootstrap与 DubboProviderBootstrap 都继承自ProviderBootstrap。

DefaultProviderBootstrap又被BoltProviderBootstrap、Http2ClearTextProviderBootstrap、RestProviderBootstrap三个类所继承,这其实对应了sofa-rpc中的三种server服务方式。

我们看下DefaultProviderBootstrap服务启动源码

  @Override
    public void export() {
        if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒
            Thread thread = factory.newThread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(providerConfig.getDelay());
                    } catch (Throwable ignore) { // NOPMD
                    }
                    doExport();
                }
            });
            thread.start();
        } else {
            doExport();
        }
    }

    private void doExport() {
        if (exported) {
            return;
        }

        // 检查参数
        checkParameters();

        String appName = providerConfig.getAppName();

        //key  is the protocol of server,for concurrent safe
        Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>();
        // 将处理器注册到server
        List<ServerConfig> serverConfigs = providerConfig.getServer();
        for (ServerConfig serverConfig : serverConfigs) {
            String protocol = serverConfig.getProtocol();

            String key = providerConfig.buildKey() + ":" + protocol;

            if (LOGGER.isInfoEnabled(appName)) {
                LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());
            }

            // 注意同一interface,同一uniqleId,不同server情况
            AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
            if (cnt == null) { // 没有发布过
                cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
            }
            int c = cnt.incrementAndGet();
            hasExportedInCurrent.put(serverConfig.getProtocol(), true);
            int maxProxyCount = providerConfig.getRepeatedExportLimit();
            if (maxProxyCount > 0) {
                if (c > maxProxyCount) {
                    decrementCounter(hasExportedInCurrent);
                    // 超过最大数量,直接抛出异常
                    throw new SofaRpcRuntimeException("Duplicate provider config with key " + key
                        + " has been exported more than " + maxProxyCount + " times!"
                        + " Maybe it&#39;s wrong config, please check it."
                        + " Ignore this if you did that on purpose!");
                } else if (c > 1) {
                    if (LOGGER.isInfoEnabled(appName)) {
                        LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!"
                            + " Maybe it&#39;s wrong config, please check it."
                            + " Ignore this if you did that on purpose!", key);
                    }
                }
            }

        }

        try {
            // 构造请求调用器
            providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
            // 初始化注册中心
            if (providerConfig.isRegister()) {
                List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
                if (CommonUtils.isNotEmpty(registryConfigs)) {
                    for (RegistryConfig registryConfig : registryConfigs) {
                        RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
                    }
                }
            }
            // 将处理器注册到server
            for (ServerConfig serverConfig : serverConfigs) {
                try {
                    //构建Server
                    Server server = serverConfig.buildIfAbsent();
                    // 注册序列化接口
                    server.registerProcessor(providerConfig, providerProxyInvoker);
                    if (serverConfig.isAutoStart()) {
                        //启动服务
                        server.start();
                    }

                } catch (SofaRpcRuntimeException e) {
                    throw e;
                } catch (Exception e) {
                    LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "
                        + serverConfig.getId(), e);
                }
            }

            // 注册到注册中心
            providerConfig.setConfigListener(new ProviderAttributeListener());
            register();
        } catch (Exception e) {
            decrementCounter(hasExportedInCurrent);

            if (e instanceof SofaRpcRuntimeException) {
                throw (SofaRpcRuntimeException) e;
            } else {
                throw new SofaRpcRuntimeException("Build provider proxy error!", e);
            }
        }

        // 记录一些缓存数据
        RpcRuntimeContext.cacheProviderConfig(this);
        exported = true;
    }
Salin selepas log masuk

代码中通过serverConfig.buildIfAbsent()构建Server服务对象,而在buildIfAbsent()函数中我们可以看到,sever是通过SeverFactory工厂获取到的,在SeverFactory的getSever()中根据SeverConfig的配置获取Sever的具体实例,并执行Init()进行初始化。

    /**
     * 启动服务
     *
     * @return the server
     */
    public synchronized Server buildIfAbsent() {
        if (server != null) {
            return server;
        }
        // 提前检查协议+序列化方式
        // ConfigValueHelper.check(ProtocolType.valueOf(getProtocol()),
        //                SerializationType.valueOf(getSerialization()));
        
        //在sever工厂中拿到sever实例
        server = ServerFactory.getServer(this);
        return server;
    }
Salin selepas log masuk

/**
     * 初始化Server实例
     *
     * @param serverConfig 服务端配置
     * @return Server
     */
    public synchronized static Server getServer(ServerConfig serverConfig) {
        try {
            Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort()));
            if (server == null) {
                // 算下网卡和端口
                resolveServerConfig(serverConfig);

                ExtensionClass<Server> ext = ExtensionLoaderFactory.getExtensionLoader(Server.class)
                    .getExtensionClass(serverConfig.getProtocol());
                if (ext == null) {
                    throw ExceptionUtils.buildRuntime("server.protocol", serverConfig.getProtocol(),
                        "Unsupported protocol of server!");
                }
                server = ext.getExtInstance();
                //服务初始化
                server.init(serverConfig);
                SERVER_MAP.put(serverConfig.getPort() + "", server);
            }
            return server;
        } catch (SofaRpcRuntimeException e) {
            throw e;
        } catch (Throwable e) {
            throw new SofaRpcRuntimeException(e.getMessage(), e);
        }
    }
Salin selepas log masuk

sofa-rpc提供了三种server类型 BoltServer,RestServer与AbstractHttpServer

BoltServer中通讯底层通过RemotingServer实现的,RemotingServer是基于阿里sofa-bolt通信框架开发的。

  /**
     * Bolt服务端
     */
    protected RemotingServer       remotingServer;

   @Override
    public void start() {
        if (started) {
            return;
        }
        synchronized (this) {
            if (started) {
                return;
            }
            // 生成阿里基于netty的bolt服务Server对象 
            remotingServer = initRemotingServer();
            try {
                if (remotingServer.start(serverConfig.getBoundHost())) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("Bolt server has been bind to {}:{}", serverConfig.getBoundHost(),
                            serverConfig.getPort());
                    }
                } else {
                    throw new SofaRpcRuntimeException("Failed to start bolt server, see more detail from bolt log.");
                }
                started = true;

                if (EventBus.isEnable(ServerStartedEvent.class)) {
                    EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
                }

            } catch (SofaRpcRuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new SofaRpcRuntimeException("Failed to start bolt server!", e);
            }
        }
    }
Salin selepas log masuk

AbstractHttpServer 提供http服务,底层通信通过ServerTransport类实现的

 /**
     * 服务端通讯层
     */
    private ServerTransport         serverTransport;

    @Override
    public void init(ServerConfig serverConfig) {
        this.serverConfig = serverConfig;
        this.serverTransportConfig = convertConfig(serverConfig);
        // 启动线程池
        this.bizThreadPool = initThreadPool(serverConfig);
        // 服务端处理器
        this.serverHandler = new HttpServerHandler();

        // set default transport config
        this.serverTransportConfig.setContainer(container);
        this.serverTransportConfig.setServerHandler(serverHandler);
    }

    @Override
    public void start() {
        if (started) {
            return;
        }
        synchronized (this) {
            if (started) {
                return;
            }
            try {
                // 启动线程池
                this.bizThreadPool = initThreadPool(serverConfig);
                this.serverHandler.setBizThreadPool(bizThreadPool);
                //实例化服务,具体代码见
                serverTransport = ServerTransportFactory.getServerTransport(serverTransportConfig);
                started = serverTransport.start();

                if (started) {
                    if (EventBus.isEnable(ServerStartedEvent.class)) {
                        EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
                    }
                }
            } catch (SofaRpcRuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new SofaRpcRuntimeException("Failed to start HTTP/2 server!", e);
            }
        }
    }
Salin selepas log masuk

ServerTransport是个抽象类,具体实现为transport包下AbstractHttp2ServerTransport

  /**
     * 构造函数
     *
     * @param transportConfig 服务端配置
     */
    protected AbstractHttp2ServerTransport(ServerTransportConfig transportConfig) {
        super(transportConfig);
    }

 @Override
    public boolean start() {
        if (serverBootstrap != null) {
            return true;
        }
        synchronized (this) {
            if (serverBootstrap != null) {
                return true;
            }
            boolean flag = false;
            SslContext sslCtx = SslContextBuilder.build();

            // Configure the server.
            EventLoopGroup bossGroup = NettyHelper.getServerBossEventLoopGroup(transportConfig);

            //可以看到然是基于Netty
            HttpServerHandler httpServerHandler = (HttpServerHandler) transportConfig.getServerHandler();
            bizGroup = NettyHelper.getServerBizEventLoopGroup(transportConfig, httpServerHandler.getBizThreadPool());

            serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, bizGroup)
                .channel(transportConfig.isUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, transportConfig.getBacklog())
                .option(ChannelOption.SO_REUSEADDR, transportConfig.isReuseAddr())
                .option(ChannelOption.RCVBUF_ALLOCATOR, NettyHelper.getRecvByteBufAllocator())
                .option(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
                .childOption(ChannelOption.SO_KEEPALIVE, transportConfig.isKeepAlive())
                .childOption(ChannelOption.TCP_NODELAY, transportConfig.isTcpNoDelay())
                .childOption(ChannelOption.SO_RCVBUF, 8192 * 128)
                .childOption(ChannelOption.SO_SNDBUF, 8192 * 128)
                .handler(new LoggingHandler(LogLevel.DEBUG))
                .childOption(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                    transportConfig.getBufferMin(), transportConfig.getBufferMax()))
                .childHandler(new Http2ServerChannelInitializer(bizGroup, sslCtx,
                    httpServerHandler, transportConfig.getPayload()));

            // 绑定到全部网卡 或者 指定网卡
            ChannelFuture future = serverBootstrap.bind(
                new InetSocketAddress(transportConfig.getHost(), transportConfig.getPort()));
            ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("HTTP/2 Server bind to {}:{} success!",
                                transportConfig.getHost(), transportConfig.getPort());
                        }
                    } else {
                        LOGGER.error("HTTP/2 Server bind to {}:{} failed!",
                            transportConfig.getHost(), transportConfig.getPort());
                        stop();
                    }
                }
            });

            try {
                channelFuture.await();
                if (channelFuture.isSuccess()) {
                    flag = Boolean.TRUE;
                } else {
                    throw new SofaRpcRuntimeException("Server start fail!", future.cause());
                }
            } catch (InterruptedException e) {
                LOGGER.error(e.getMessage(), e);
            }
            return flag;
        }
    }
Salin selepas log masuk

RestServer 提供Rest服务,底层通信实现具体可见SofaNettyJaxrsServer。

  /**
     * Rest服务端
     */
    protected SofaNettyJaxrsServer httpServer;

    @Override
    public void init(ServerConfig serverConfig) {
        this.serverConfig = serverConfig;
        httpServer = buildServer();
    }
Salin selepas log masuk

SofaNettyJaxrsServer中服务启动的具体代码

@Override
    public void start() {
        // CHANGE: 增加线程名字
        boolean daemon = serverConfig.isDaemon();
        boolean isEpoll = serverConfig.isEpoll();
        NamedThreadFactory ioFactory = new NamedThreadFactory("SEV-REST-IO-" + port, daemon);
        NamedThreadFactory bizFactory = new NamedThreadFactory("SEV-REST-BIZ-" + port, daemon);
        eventLoopGroup = isEpoll ? new EpollEventLoopGroup(ioWorkerCount, ioFactory)
            : new NioEventLoopGroup(ioWorkerCount, ioFactory);
        eventExecutor = isEpoll ? new EpollEventLoopGroup(executorThreadCount, bizFactory)
            : new NioEventLoopGroup(executorThreadCount, bizFactory);
        // Configure the server.
        bootstrap = new ServerBootstrap()
            .group(eventLoopGroup)
            .channel(isEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .childHandler(createChannelInitializer())
            .option(ChannelOption.SO_BACKLOG, backlog)
            .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isKeepAlive()); // CHANGE: setKeepAlive

        for (Map.Entry<ChannelOption, Object> entry : channelOptions.entrySet()) {
            bootstrap.option(entry.getKey(), entry.getValue());
        }

        for (Map.Entry<ChannelOption, Object> entry : childChannelOptions.entrySet()) {
            bootstrap.childOption(entry.getKey(), entry.getValue());
        }

        final InetSocketAddress socketAddress;
        if (null == hostname || hostname.isEmpty()) {
            socketAddress = new InetSocketAddress(port);
        } else {
            socketAddress = new InetSocketAddress(hostname, port);
        }

        bootstrap.bind(socketAddress).syncUninterruptibly();
    }
Salin selepas log masuk

 OK,以上就是sofa-rpc服务端启动的一个基本的流程,这里关注的只是简单的服务启动流程,没有深入代码功能进行分析,在此基础上,我们可以进一步探究代码的具体实现。

Atas ialah kandungan terperinci sofa-rpc服务端源码的详细分析(附流程图). Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn

Alat AI Hot

Undresser.AI Undress

Undresser.AI Undress

Apl berkuasa AI untuk mencipta foto bogel yang realistik

AI Clothes Remover

AI Clothes Remover

Alat AI dalam talian untuk mengeluarkan pakaian daripada foto.

Undress AI Tool

Undress AI Tool

Gambar buka pakaian secara percuma

Clothoff.io

Clothoff.io

Penyingkiran pakaian AI

AI Hentai Generator

AI Hentai Generator

Menjana ai hentai secara percuma.

Artikel Panas

R.E.P.O. Kristal tenaga dijelaskan dan apa yang mereka lakukan (kristal kuning)
4 minggu yang lalu By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Tetapan grafik terbaik
4 minggu yang lalu By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Cara Memperbaiki Audio Jika anda tidak dapat mendengar sesiapa
4 minggu yang lalu By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25: Cara Membuka Segala -galanya Di Myrise
1 bulan yang lalu By 尊渡假赌尊渡假赌尊渡假赌

Alat panas

Notepad++7.3.1

Notepad++7.3.1

Editor kod yang mudah digunakan dan percuma

SublimeText3 versi Cina

SublimeText3 versi Cina

Versi Cina, sangat mudah digunakan

Hantar Studio 13.0.1

Hantar Studio 13.0.1

Persekitaran pembangunan bersepadu PHP yang berkuasa

Dreamweaver CS6

Dreamweaver CS6

Alat pembangunan web visual

SublimeText3 versi Mac

SublimeText3 versi Mac

Perisian penyuntingan kod peringkat Tuhan (SublimeText3)

Bagaimanakah mekanisme kelas muatan Java berfungsi, termasuk kelas yang berbeza dan model delegasi mereka? Bagaimanakah mekanisme kelas muatan Java berfungsi, termasuk kelas yang berbeza dan model delegasi mereka? Mar 17, 2025 pm 05:35 PM

Kelas kelas Java melibatkan pemuatan, menghubungkan, dan memulakan kelas menggunakan sistem hierarki dengan bootstrap, lanjutan, dan pemuat kelas aplikasi. Model delegasi induk memastikan kelas teras dimuatkan dahulu, yang mempengaruhi LOA kelas tersuai

Bagaimanakah saya melaksanakan caching pelbagai peringkat dalam aplikasi java menggunakan perpustakaan seperti kafein atau cache jambu? Bagaimanakah saya melaksanakan caching pelbagai peringkat dalam aplikasi java menggunakan perpustakaan seperti kafein atau cache jambu? Mar 17, 2025 pm 05:44 PM

Artikel ini membincangkan pelaksanaan caching pelbagai peringkat di Java menggunakan kafein dan cache jambu untuk meningkatkan prestasi aplikasi. Ia meliputi persediaan, integrasi, dan faedah prestasi, bersama -sama dengan Pengurusan Dasar Konfigurasi dan Pengusiran PRA Terbaik

Bagaimanakah saya boleh menggunakan JPA (Java Constence API) untuk pemetaan objek-objek dengan ciri-ciri canggih seperti caching dan malas malas? Bagaimanakah saya boleh menggunakan JPA (Java Constence API) untuk pemetaan objek-objek dengan ciri-ciri canggih seperti caching dan malas malas? Mar 17, 2025 pm 05:43 PM

Artikel ini membincangkan menggunakan JPA untuk pemetaan objek-relasi dengan ciri-ciri canggih seperti caching dan pemuatan malas. Ia meliputi persediaan, pemetaan entiti, dan amalan terbaik untuk mengoptimumkan prestasi sambil menonjolkan potensi perangkap. [159 aksara]

Bagaimanakah saya menggunakan Maven atau Gradle untuk Pengurusan Projek Java Lanjutan, Membina Automasi, dan Resolusi Ketergantungan? Bagaimanakah saya menggunakan Maven atau Gradle untuk Pengurusan Projek Java Lanjutan, Membina Automasi, dan Resolusi Ketergantungan? Mar 17, 2025 pm 05:46 PM

Artikel ini membincangkan menggunakan Maven dan Gradle untuk Pengurusan Projek Java, membina automasi, dan resolusi pergantungan, membandingkan pendekatan dan strategi pengoptimuman mereka.

Bagaimanakah saya membuat dan menggunakan perpustakaan Java Custom (fail JAR) dengan pengurusan versi dan pergantungan yang betul? Bagaimanakah saya membuat dan menggunakan perpustakaan Java Custom (fail JAR) dengan pengurusan versi dan pergantungan yang betul? Mar 17, 2025 pm 05:45 PM

Artikel ini membincangkan membuat dan menggunakan perpustakaan Java tersuai (fail balang) dengan pengurusan versi dan pergantungan yang betul, menggunakan alat seperti Maven dan Gradle.

See all articles