Maison > Java > javaDidacticiel > Analyse détaillée du code source du serveur sofa-rpc (avec organigramme)

Analyse détaillée du code source du serveur sofa-rpc (avec organigramme)

不言
Libérer: 2018-10-10 11:27:34
avant
3853 Les gens l'ont consulté

Ce que cet article vous apporte est une analyse détaillée du code source du serveur sofa-rpc (avec organigramme ci-joint). Il a une certaine valeur de référence. Les amis dans le besoin peuvent s'y référer. .

sofa-rpc est un framework RPC open source hautes performances fourni par Alibaba. Cet article est principalement une revue de code du processus de service de démarrage du fournisseur sofa-rpc. Ce qui suit est un processus relationnel de base que j'ai simplement dessiné. Figure

Ci-dessous, nous suivrons et lirons le processus en fonction du code du canapé-rpc. En prenant BoltServer comme exemple

    public static void main(String[] args) {
        ApplicationConfig application = new ApplicationConfig().setAppName("test-server");

        ServerConfig serverConfig = new ServerConfig()
            .setPort(22000)
            .setDaemon(false);

        ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
            .setInterfaceId(HelloService.class.getName())
            .setApplication(application)
            .setRef(new HelloServiceImpl())
            .setServer(serverConfig)
            .setRegister(false);

        ProviderConfig<EchoService> providerConfig2 = new ProviderConfig<EchoService>()
            .setInterfaceId(EchoService.class.getName())
            .setApplication(application)
            .setRef(new EchoServiceImpl())
            .setServer(serverConfig)
            .setRegister(false);

        providerConfig.export();
        providerConfig2.export();

        LOGGER.warn("started at pid {}", RpcRuntimeContext.PID);
    }
Copier après la connexion

nous pouvons voir que sofa-rpc initialise les informations de configuration du fournisseur de services Provider via la classe ProviderConfig, et également Export est fourni comme point d’entrée pour le démarrage du service.

    public synchronized void export() {        
    if (providerBootstrap == null) {
            providerBootstrap = Bootstraps.from(this);
        }
        providerBootstrap.export();
    }
Copier après la connexion

<span style="font-size: 14px; font-family: "Microsoft YaHei"">根据ProviderConfig中setBootstrap()配置的Bootstrap类型,我们通过Bootstaps.from(this)</span><span style="font-size: 14px; font-family: "Microsoft YaHei"">可以获取到不同的Bootstrap引导服务,分别是DefaultProviderBootstrap与 DubboProviderBootstrap <br/></span>
Copier après la connexion

/**
     * 发布一个服务
     *
     * @param providerConfig 服务发布者配置
     * @param <T>            接口类型
     * @return 发布启动类
     */
    public static <T> ProviderBootstrap<T> from(ProviderConfig<T> providerConfig) {
        String bootstrap = providerConfig.getBootstrap();
        if (StringUtils.isEmpty(bootstrap)) {
            // Use default provider bootstrap 无的话就返回默认DefaultProviderBootstrap
            bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_PROVIDER_BOOTSTRAP);
            providerConfig.setBootstrap(bootstrap);
        }
        ProviderBootstrap providerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ProviderBootstrap.class)
            .getExtension(bootstrap, new Class[] { ProviderConfig.class }, new Object[] { providerConfig });
        return (ProviderBootstrap<T>) providerBootstrap;
    }
Copier après la connexion

DefaultProviderBootstrap et DubboProviderBootstrap héritent tous deux de ProviderBootstrap.

DefaultProviderBootstrap est hérité par trois classes : BoltProviderBootstrap, Http2ClearTextProviderBootstrap et RestProviderBootstrap. Cela correspond en fait aux trois méthodes de service serveur dans sofa-rpc.

Jetons un coup d'œil au code source de démarrage du service DefaultProviderBootstrap

  @Override
    public void export() {
        if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒
            Thread thread = factory.newThread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(providerConfig.getDelay());
                    } catch (Throwable ignore) { // NOPMD
                    }
                    doExport();
                }
            });
            thread.start();
        } else {
            doExport();
        }
    }

    private void doExport() {
        if (exported) {
            return;
        }

        // 检查参数
        checkParameters();

        String appName = providerConfig.getAppName();

        //key  is the protocol of server,for concurrent safe
        Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>();
        // 将处理器注册到server
        List<ServerConfig> serverConfigs = providerConfig.getServer();
        for (ServerConfig serverConfig : serverConfigs) {
            String protocol = serverConfig.getProtocol();

            String key = providerConfig.buildKey() + ":" + protocol;

            if (LOGGER.isInfoEnabled(appName)) {
                LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());
            }

            // 注意同一interface,同一uniqleId,不同server情况
            AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
            if (cnt == null) { // 没有发布过
                cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
            }
            int c = cnt.incrementAndGet();
            hasExportedInCurrent.put(serverConfig.getProtocol(), true);
            int maxProxyCount = providerConfig.getRepeatedExportLimit();
            if (maxProxyCount > 0) {
                if (c > maxProxyCount) {
                    decrementCounter(hasExportedInCurrent);
                    // 超过最大数量,直接抛出异常
                    throw new SofaRpcRuntimeException("Duplicate provider config with key " + key
                        + " has been exported more than " + maxProxyCount + " times!"
                        + " Maybe it&#39;s wrong config, please check it."
                        + " Ignore this if you did that on purpose!");
                } else if (c > 1) {
                    if (LOGGER.isInfoEnabled(appName)) {
                        LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!"
                            + " Maybe it&#39;s wrong config, please check it."
                            + " Ignore this if you did that on purpose!", key);
                    }
                }
            }

        }

        try {
            // 构造请求调用器
            providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
            // 初始化注册中心
            if (providerConfig.isRegister()) {
                List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
                if (CommonUtils.isNotEmpty(registryConfigs)) {
                    for (RegistryConfig registryConfig : registryConfigs) {
                        RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
                    }
                }
            }
            // 将处理器注册到server
            for (ServerConfig serverConfig : serverConfigs) {
                try {
                    //构建Server
                    Server server = serverConfig.buildIfAbsent();
                    // 注册序列化接口
                    server.registerProcessor(providerConfig, providerProxyInvoker);
                    if (serverConfig.isAutoStart()) {
                        //启动服务
                        server.start();
                    }

                } catch (SofaRpcRuntimeException e) {
                    throw e;
                } catch (Exception e) {
                    LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "
                        + serverConfig.getId(), e);
                }
            }

            // 注册到注册中心
            providerConfig.setConfigListener(new ProviderAttributeListener());
            register();
        } catch (Exception e) {
            decrementCounter(hasExportedInCurrent);

            if (e instanceof SofaRpcRuntimeException) {
                throw (SofaRpcRuntimeException) e;
            } else {
                throw new SofaRpcRuntimeException("Build provider proxy error!", e);
            }
        }

        // 记录一些缓存数据
        RpcRuntimeContext.cacheProviderConfig(this);
        exported = true;
    }
Copier après la connexion

Dans le code, l'objet de service Server est construit via serverConfig .buildIfAbsent(), et dans la fonction buildIfAbsent(), nous pouvons voir que le serveur est obtenu via la fabrique SeverFactory. Dans le getSever() de SeverFactory, l'instance spécifique de Sever est obtenue selon la configuration de SeverConfig, et Init(). est exécuté pour l’initialisation.

    /**
     * 启动服务
     *
     * @return the server
     */
    public synchronized Server buildIfAbsent() {
        if (server != null) {
            return server;
        }
        // 提前检查协议+序列化方式
        // ConfigValueHelper.check(ProtocolType.valueOf(getProtocol()),
        //                SerializationType.valueOf(getSerialization()));
        
        //在sever工厂中拿到sever实例
        server = ServerFactory.getServer(this);
        return server;
    }
Copier après la connexion

/**
     * 初始化Server实例
     *
     * @param serverConfig 服务端配置
     * @return Server
     */
    public synchronized static Server getServer(ServerConfig serverConfig) {
        try {
            Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort()));
            if (server == null) {
                // 算下网卡和端口
                resolveServerConfig(serverConfig);

                ExtensionClass<Server> ext = ExtensionLoaderFactory.getExtensionLoader(Server.class)
                    .getExtensionClass(serverConfig.getProtocol());
                if (ext == null) {
                    throw ExceptionUtils.buildRuntime("server.protocol", serverConfig.getProtocol(),
                        "Unsupported protocol of server!");
                }
                server = ext.getExtInstance();
                //服务初始化
                server.init(serverConfig);
                SERVER_MAP.put(serverConfig.getPort() + "", server);
            }
            return server;
        } catch (SofaRpcRuntimeException e) {
            throw e;
        } catch (Throwable e) {
            throw new SofaRpcRuntimeException(e.getMessage(), e);
        }
    }
Copier après la connexion

sofa-rpc propose trois types de serveurs : BoltServer, RestServer et AbstractHttpServer

La couche inférieure de communication dans BoltServer est implémentée via RemotingServer, qui est développé sur la base du cadre de communication canapé-bolt d'Alibaba.

  /**
     * Bolt服务端
     */
    protected RemotingServer       remotingServer;

   @Override
    public void start() {
        if (started) {
            return;
        }
        synchronized (this) {
            if (started) {
                return;
            }
            // 生成阿里基于netty的bolt服务Server对象 
            remotingServer = initRemotingServer();
            try {
                if (remotingServer.start(serverConfig.getBoundHost())) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("Bolt server has been bind to {}:{}", serverConfig.getBoundHost(),
                            serverConfig.getPort());
                    }
                } else {
                    throw new SofaRpcRuntimeException("Failed to start bolt server, see more detail from bolt log.");
                }
                started = true;

                if (EventBus.isEnable(ServerStartedEvent.class)) {
                    EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
                }

            } catch (SofaRpcRuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new SofaRpcRuntimeException("Failed to start bolt server!", e);
            }
        }
    }
Copier après la connexion

AbstractHttpServer fournit un service http et la communication sous-jacente est implémentée via la classe ServerTransport

 /**
     * 服务端通讯层
     */
    private ServerTransport         serverTransport;

    @Override
    public void init(ServerConfig serverConfig) {
        this.serverConfig = serverConfig;
        this.serverTransportConfig = convertConfig(serverConfig);
        // 启动线程池
        this.bizThreadPool = initThreadPool(serverConfig);
        // 服务端处理器
        this.serverHandler = new HttpServerHandler();

        // set default transport config
        this.serverTransportConfig.setContainer(container);
        this.serverTransportConfig.setServerHandler(serverHandler);
    }

    @Override
    public void start() {
        if (started) {
            return;
        }
        synchronized (this) {
            if (started) {
                return;
            }
            try {
                // 启动线程池
                this.bizThreadPool = initThreadPool(serverConfig);
                this.serverHandler.setBizThreadPool(bizThreadPool);
                //实例化服务,具体代码见
                serverTransport = ServerTransportFactory.getServerTransport(serverTransportConfig);
                started = serverTransport.start();

                if (started) {
                    if (EventBus.isEnable(ServerStartedEvent.class)) {
                        EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));
                    }
                }
            } catch (SofaRpcRuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw new SofaRpcRuntimeException("Failed to start HTTP/2 server!", e);
            }
        }
    }
Copier après la connexion

ServerTransport est une classe abstraite. L'implémentation spécifique est AbstractHttp2ServerTransport

  /**
     * 构造函数
     *
     * @param transportConfig 服务端配置
     */
    protected AbstractHttp2ServerTransport(ServerTransportConfig transportConfig) {
        super(transportConfig);
    }

 @Override
    public boolean start() {
        if (serverBootstrap != null) {
            return true;
        }
        synchronized (this) {
            if (serverBootstrap != null) {
                return true;
            }
            boolean flag = false;
            SslContext sslCtx = SslContextBuilder.build();

            // Configure the server.
            EventLoopGroup bossGroup = NettyHelper.getServerBossEventLoopGroup(transportConfig);

            //可以看到然是基于Netty
            HttpServerHandler httpServerHandler = (HttpServerHandler) transportConfig.getServerHandler();
            bizGroup = NettyHelper.getServerBizEventLoopGroup(transportConfig, httpServerHandler.getBizThreadPool());

            serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup, bizGroup)
                .channel(transportConfig.isUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, transportConfig.getBacklog())
                .option(ChannelOption.SO_REUSEADDR, transportConfig.isReuseAddr())
                .option(ChannelOption.RCVBUF_ALLOCATOR, NettyHelper.getRecvByteBufAllocator())
                .option(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
                .childOption(ChannelOption.SO_KEEPALIVE, transportConfig.isKeepAlive())
                .childOption(ChannelOption.TCP_NODELAY, transportConfig.isTcpNoDelay())
                .childOption(ChannelOption.SO_RCVBUF, 8192 * 128)
                .childOption(ChannelOption.SO_SNDBUF, 8192 * 128)
                .handler(new LoggingHandler(LogLevel.DEBUG))
                .childOption(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())
                .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(
                    transportConfig.getBufferMin(), transportConfig.getBufferMax()))
                .childHandler(new Http2ServerChannelInitializer(bizGroup, sslCtx,
                    httpServerHandler, transportConfig.getPayload()));

            // 绑定到全部网卡 或者 指定网卡
            ChannelFuture future = serverBootstrap.bind(
                new InetSocketAddress(transportConfig.getHost(), transportConfig.getPort()));
            ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        if (LOGGER.isInfoEnabled()) {
                            LOGGER.info("HTTP/2 Server bind to {}:{} success!",
                                transportConfig.getHost(), transportConfig.getPort());
                        }
                    } else {
                        LOGGER.error("HTTP/2 Server bind to {}:{} failed!",
                            transportConfig.getHost(), transportConfig.getPort());
                        stop();
                    }
                }
            });

            try {
                channelFuture.await();
                if (channelFuture.isSuccess()) {
                    flag = Boolean.TRUE;
                } else {
                    throw new SofaRpcRuntimeException("Server start fail!", future.cause());
                }
            } catch (InterruptedException e) {
                LOGGER.error(e.getMessage(), e);
            }
            return flag;
        }
    }
Copier après la connexion

RestServer fournit le service Rest sous le package de transport sous-jacent. la mise en œuvre de la communication peut être vue dans SofaNettyJaxrsServer.

  /**
     * Rest服务端
     */
    protected SofaNettyJaxrsServer httpServer;

    @Override
    public void init(ServerConfig serverConfig) {
        this.serverConfig = serverConfig;
        httpServer = buildServer();
    }
Copier après la connexion

Le code spécifique pour le démarrage du service dans SofaNettyJaxrsServer

@Override
    public void start() {
        // CHANGE: 增加线程名字
        boolean daemon = serverConfig.isDaemon();
        boolean isEpoll = serverConfig.isEpoll();
        NamedThreadFactory ioFactory = new NamedThreadFactory("SEV-REST-IO-" + port, daemon);
        NamedThreadFactory bizFactory = new NamedThreadFactory("SEV-REST-BIZ-" + port, daemon);
        eventLoopGroup = isEpoll ? new EpollEventLoopGroup(ioWorkerCount, ioFactory)
            : new NioEventLoopGroup(ioWorkerCount, ioFactory);
        eventExecutor = isEpoll ? new EpollEventLoopGroup(executorThreadCount, bizFactory)
            : new NioEventLoopGroup(executorThreadCount, bizFactory);
        // Configure the server.
        bootstrap = new ServerBootstrap()
            .group(eventLoopGroup)
            .channel(isEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            .childHandler(createChannelInitializer())
            .option(ChannelOption.SO_BACKLOG, backlog)
            .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isKeepAlive()); // CHANGE: setKeepAlive

        for (Map.Entry<ChannelOption, Object> entry : channelOptions.entrySet()) {
            bootstrap.option(entry.getKey(), entry.getValue());
        }

        for (Map.Entry<ChannelOption, Object> entry : childChannelOptions.entrySet()) {
            bootstrap.childOption(entry.getKey(), entry.getValue());
        }

        final InetSocketAddress socketAddress;
        if (null == hostname || hostname.isEmpty()) {
            socketAddress = new InetSocketAddress(port);
        } else {
            socketAddress = new InetSocketAddress(hostname, port);
        }

        bootstrap.bind(socketAddress).syncUninterruptibly();
    }
Copier après la connexion

OK, ce qui précède est un processus de base pour le démarrage du serveur sofa-rpc. Nous nous concentrons ici uniquement sur le simple processus de démarrage du service, sans analyse approfondie des fonctions du code. Sur cette base, nous pouvons explorer davantage l'implémentation spécifique du. code.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:cnblogs.com
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal