Home Java javaTutorial Detailed analysis of sofa-rpc server source code (with flow chart)

Detailed analysis of sofa-rpc server source code (with flow chart)

Oct 10, 2018 am 11:27 AM

This article brings you a detailed analysis of the sofa-rpc server source code (with flow chart attached). It has certain reference value. Friends in need can refer to it. I hope it will be helpful to you.

sofa-rpc is a high-performance RPC framework open sourced by Alibaba. This article is mainly a code reading of the sofa-rpc provider startup service process. The following is a basic relationship process that I simply drew. Figure

Below we will track and read the process based on the sofa-rpc code. We take BoltServer as an example

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

public static void main(String[] args) {

    ApplicationConfig application = new ApplicationConfig().setAppName("test-server");

 

    ServerConfig serverConfig = new ServerConfig()

        .setPort(22000)

        .setDaemon(false);

 

    ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()

        .setInterfaceId(HelloService.class.getName())

        .setApplication(application)

        .setRef(new HelloServiceImpl())

        .setServer(serverConfig)

        .setRegister(false);

 

    ProviderConfig<EchoService> providerConfig2 = new ProviderConfig<EchoService>()

        .setInterfaceId(EchoService.class.getName())

        .setApplication(application)

        .setRef(new EchoServiceImpl())

        .setServer(serverConfig)

        .setRegister(false);

 

    providerConfig.export();

    providerConfig2.export();

 

    LOGGER.warn("started at pid {}", RpcRuntimeContext.PID);

}

Copy after login

We can see that sofa-rpc initializes the configuration information of the service provider Provider through the ProviderConfig class, and also provides export serves as the entry point for service startup.

1

2

3

4

5

6

public synchronized void export() {       

if (providerBootstrap == null) {

        providerBootstrap = Bootstraps.from(this);

    }

    providerBootstrap.export();

}

Copy after login

1

<span style="font-size: 14px; font-family: "Microsoft YaHei"">根据ProviderConfig中setBootstrap()配置的Bootstrap类型,我们通过Bootstaps.from(this)</span><span style="font-size: 14px; font-family: "Microsoft YaHei"">可以获取到不同的Bootstrap引导服务,分别是DefaultProviderBootstrap与 DubboProviderBootstrap <br/></span>

Copy after login

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

/**

     * 发布一个服务

     *

     * @param providerConfig 服务发布者配置

     * @param <T>            接口类型

     * @return 发布启动类

     */

    public static <T> ProviderBootstrap<T> from(ProviderConfig<T> providerConfig) {

        String bootstrap = providerConfig.getBootstrap();

        if (StringUtils.isEmpty(bootstrap)) {

            // Use default provider bootstrap 无的话就返回默认DefaultProviderBootstrap

            bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_PROVIDER_BOOTSTRAP);

            providerConfig.setBootstrap(bootstrap);

        }

        ProviderBootstrap providerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ProviderBootstrap.class)

            .getExtension(bootstrap, new Class[] { ProviderConfig.class }, new Object[] { providerConfig });

        return (ProviderBootstrap<T>) providerBootstrap;

    }

Copy after login

DefaultProviderBootstrap and DubboProviderBootstrap both inherit from ProviderBootstrap.

DefaultProviderBootstrap is inherited by three classes: BoltProviderBootstrap, Http2ClearTextProviderBootstrap, and RestProviderBootstrap. This actually corresponds to the three server service methods in sofa-rpc.

Let’s take a look at the DefaultProviderBootstrap service startup source code

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

89

90

91

92

93

94

95

96

97

98

99

100

101

102

103

104

105

106

107

108

109

110

111

112

113

114

115

116

117

118

@Override

  public void export() {

      if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒

          Thread thread = factory.newThread(new Runnable() {

              @Override

              public void run() {

                  try {

                      Thread.sleep(providerConfig.getDelay());

                  } catch (Throwable ignore) { // NOPMD

                  }

                  doExport();

              }

          });

          thread.start();

      } else {

          doExport();

      }

  }

 

  private void doExport() {

      if (exported) {

          return;

      }

 

      // 检查参数

      checkParameters();

 

      String appName = providerConfig.getAppName();

 

      //key  is the protocol of server,for concurrent safe

      Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>();

      // 将处理器注册到server

      List<ServerConfig> serverConfigs = providerConfig.getServer();

      for (ServerConfig serverConfig : serverConfigs) {

          String protocol = serverConfig.getProtocol();

 

          String key = providerConfig.buildKey() + ":" + protocol;

 

          if (LOGGER.isInfoEnabled(appName)) {

              LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());

          }

 

          // 注意同一interface,同一uniqleId,不同server情况

          AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器

          if (cnt == null) { // 没有发布过

              cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));

          }

          int c = cnt.incrementAndGet();

          hasExportedInCurrent.put(serverConfig.getProtocol(), true);

          int maxProxyCount = providerConfig.getRepeatedExportLimit();

          if (maxProxyCount > 0) {

              if (c > maxProxyCount) {

                  decrementCounter(hasExportedInCurrent);

                  // 超过最大数量,直接抛出异常

                  throw new SofaRpcRuntimeException("Duplicate provider config with key " + key

                      + " has been exported more than " + maxProxyCount + " times!"

                      + " Maybe it&#39;s wrong config, please check it."

                      + " Ignore this if you did that on purpose!");

              } else if (c > 1) {

                  if (LOGGER.isInfoEnabled(appName)) {

                      LOGGER.infoWithApp(appName, "Duplicate provider config with key {} has been exported!"

                          + " Maybe it&#39;s wrong config, please check it."

                          + " Ignore this if you did that on purpose!", key);

                  }

              }

          }

 

      }

 

      try {

          // 构造请求调用器

          providerProxyInvoker = new ProviderProxyInvoker(providerConfig);

          // 初始化注册中心

          if (providerConfig.isRegister()) {

              List<RegistryConfig> registryConfigs = providerConfig.getRegistry();

              if (CommonUtils.isNotEmpty(registryConfigs)) {

                  for (RegistryConfig registryConfig : registryConfigs) {

                      RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry

                  }

              }

          }

          // 将处理器注册到server

          for (ServerConfig serverConfig : serverConfigs) {

              try {

                  //构建Server

                  Server server = serverConfig.buildIfAbsent();

                  // 注册序列化接口

                  server.registerProcessor(providerConfig, providerProxyInvoker);

                  if (serverConfig.isAutoStart()) {

                      //启动服务

                      server.start();

                  }

 

              } catch (SofaRpcRuntimeException e) {

                  throw e;

              } catch (Exception e) {

                  LOGGER.errorWithApp(appName, "Catch exception when register processor to server: "

                      + serverConfig.getId(), e);

              }

          }

 

          // 注册到注册中心

          providerConfig.setConfigListener(new ProviderAttributeListener());

          register();

      } catch (Exception e) {

          decrementCounter(hasExportedInCurrent);

 

          if (e instanceof SofaRpcRuntimeException) {

              throw (SofaRpcRuntimeException) e;

          } else {

              throw new SofaRpcRuntimeException("Build provider proxy error!", e);

          }

      }

 

      // 记录一些缓存数据

      RpcRuntimeContext.cacheProviderConfig(this);

      exported = true;

  }

Copy after login

In the code, the Server service object is built through serverConfig.buildIfAbsent(), and in buildIfAbsent() In the function, we can see that sever is obtained through the SeverFactory factory. In the getSever() of SeverFactory, the specific instance of Sever is obtained according to the configuration of SeverConfig, and Init() is executed for initialization.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

/**

 * 启动服务

 *

 * @return the server

 */

public synchronized Server buildIfAbsent() {

    if (server != null) {

        return server;

    }

    // 提前检查协议+序列化方式

    // ConfigValueHelper.check(ProtocolType.valueOf(getProtocol()),

    //                SerializationType.valueOf(getSerialization()));

     

    //在sever工厂中拿到sever实例

    server = ServerFactory.getServer(this);

    return server;

}

Copy after login

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

/**

     * 初始化Server实例

     *

     * @param serverConfig 服务端配置

     * @return Server

     */

    public synchronized static Server getServer(ServerConfig serverConfig) {

        try {

            Server server = SERVER_MAP.get(Integer.toString(serverConfig.getPort()));

            if (server == null) {

                // 算下网卡和端口

                resolveServerConfig(serverConfig);

 

                ExtensionClass<Server> ext = ExtensionLoaderFactory.getExtensionLoader(Server.class)

                    .getExtensionClass(serverConfig.getProtocol());

                if (ext == null) {

                    throw ExceptionUtils.buildRuntime("server.protocol", serverConfig.getProtocol(),

                        "Unsupported protocol of server!");

                }

                server = ext.getExtInstance();

                //服务初始化

                server.init(serverConfig);

                SERVER_MAP.put(serverConfig.getPort() + "", server);

            }

            return server;

        } catch (SofaRpcRuntimeException e) {

            throw e;

        } catch (Throwable e) {

            throw new SofaRpcRuntimeException(e.getMessage(), e);

        }

    }

Copy after login

sofa-rpc provides three server types: BoltServer, RestServer and AbstractHttpServer

The bottom layer of communication in BoltServer is implemented through RemotingServer, which is developed based on Alibaba's sofa-bolt communication framework.

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

/**

   * Bolt服务端

   */

  protected RemotingServer       remotingServer;

 

 @Override

  public void start() {

      if (started) {

          return;

      }

      synchronized (this) {

          if (started) {

              return;

          }

          // 生成阿里基于netty的bolt服务Server对象

          remotingServer = initRemotingServer();

          try {

              if (remotingServer.start(serverConfig.getBoundHost())) {

                  if (LOGGER.isInfoEnabled()) {

                      LOGGER.info("Bolt server has been bind to {}:{}", serverConfig.getBoundHost(),

                          serverConfig.getPort());

                  }

              } else {

                  throw new SofaRpcRuntimeException("Failed to start bolt server, see more detail from bolt log.");

              }

              started = true;

 

              if (EventBus.isEnable(ServerStartedEvent.class)) {

                  EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));

              }

 

          } catch (SofaRpcRuntimeException e) {

              throw e;

          } catch (Exception e) {

              throw new SofaRpcRuntimeException("Failed to start bolt server!", e);

          }

      }

  }

Copy after login

AbstractHttpServer provides http service, and the underlying communication is implemented through the ServerTransport class

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

/**

    * 服务端通讯层

    */

   private ServerTransport         serverTransport;

 

   @Override

   public void init(ServerConfig serverConfig) {

       this.serverConfig = serverConfig;

       this.serverTransportConfig = convertConfig(serverConfig);

       // 启动线程池

       this.bizThreadPool = initThreadPool(serverConfig);

       // 服务端处理器

       this.serverHandler = new HttpServerHandler();

 

       // set default transport config

       this.serverTransportConfig.setContainer(container);

       this.serverTransportConfig.setServerHandler(serverHandler);

   }

 

   @Override

   public void start() {

       if (started) {

           return;

       }

       synchronized (this) {

           if (started) {

               return;

           }

           try {

               // 启动线程池

               this.bizThreadPool = initThreadPool(serverConfig);

               this.serverHandler.setBizThreadPool(bizThreadPool);

               //实例化服务,具体代码见

               serverTransport = ServerTransportFactory.getServerTransport(serverTransportConfig);

               started = serverTransport.start();

 

               if (started) {

                   if (EventBus.isEnable(ServerStartedEvent.class)) {

                       EventBus.post(new ServerStartedEvent(serverConfig, bizThreadPool));

                   }

               }

           } catch (SofaRpcRuntimeException e) {

               throw e;

           } catch (Exception e) {

               throw new SofaRpcRuntimeException("Failed to start HTTP/2 server!", e);

           }

       }

   }

Copy after login

ServerTransport is an abstract class. The specific implementation is AbstractHttp2ServerTransport

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

/**

    * 构造函数

    *

    * @param transportConfig 服务端配置

    */

   protected AbstractHttp2ServerTransport(ServerTransportConfig transportConfig) {

       super(transportConfig);

   }

 

@Override

   public boolean start() {

       if (serverBootstrap != null) {

           return true;

       }

       synchronized (this) {

           if (serverBootstrap != null) {

               return true;

           }

           boolean flag = false;

           SslContext sslCtx = SslContextBuilder.build();

 

           // Configure the server.

           EventLoopGroup bossGroup = NettyHelper.getServerBossEventLoopGroup(transportConfig);

 

           //可以看到然是基于Netty

           HttpServerHandler httpServerHandler = (HttpServerHandler) transportConfig.getServerHandler();

           bizGroup = NettyHelper.getServerBizEventLoopGroup(transportConfig, httpServerHandler.getBizThreadPool());

 

           serverBootstrap = new ServerBootstrap();

 

           serverBootstrap.group(bossGroup, bizGroup)

               .channel(transportConfig.isUseEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)

               .option(ChannelOption.SO_BACKLOG, transportConfig.getBacklog())

               .option(ChannelOption.SO_REUSEADDR, transportConfig.isReuseAddr())

               .option(ChannelOption.RCVBUF_ALLOCATOR, NettyHelper.getRecvByteBufAllocator())

               .option(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())

               .childOption(ChannelOption.SO_KEEPALIVE, transportConfig.isKeepAlive())

               .childOption(ChannelOption.TCP_NODELAY, transportConfig.isTcpNoDelay())

               .childOption(ChannelOption.SO_RCVBUF, 8192 * 128)

               .childOption(ChannelOption.SO_SNDBUF, 8192 * 128)

               .handler(new LoggingHandler(LogLevel.DEBUG))

               .childOption(ChannelOption.ALLOCATOR, NettyHelper.getByteBufAllocator())

               .childOption(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(

                   transportConfig.getBufferMin(), transportConfig.getBufferMax()))

               .childHandler(new Http2ServerChannelInitializer(bizGroup, sslCtx,

                   httpServerHandler, transportConfig.getPayload()));

 

           // 绑定到全部网卡 或者 指定网卡

           ChannelFuture future = serverBootstrap.bind(

               new InetSocketAddress(transportConfig.getHost(), transportConfig.getPort()));

           ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() {

               @Override

               public void operationComplete(ChannelFuture future) throws Exception {

                   if (future.isSuccess()) {

                       if (LOGGER.isInfoEnabled()) {

                           LOGGER.info("HTTP/2 Server bind to {}:{} success!",

                               transportConfig.getHost(), transportConfig.getPort());

                       }

                   } else {

                       LOGGER.error("HTTP/2 Server bind to {}:{} failed!",

                           transportConfig.getHost(), transportConfig.getPort());

                       stop();

                   }

               }

           });

 

           try {

               channelFuture.await();

               if (channelFuture.isSuccess()) {

                   flag = Boolean.TRUE;

               } else {

                   throw new SofaRpcRuntimeException("Server start fail!", future.cause());

               }

           } catch (InterruptedException e) {

               LOGGER.error(e.getMessage(), e);

           }

           return flag;

       }

   }

Copy after login

RestServer under the transport package to provide Rest services. The underlying communication implementation can be seen in SofaNettyJaxrsServer.

1

2

3

4

5

6

7

8

9

10

/**

   * Rest服务端

   */

  protected SofaNettyJaxrsServer httpServer;

 

  @Override

  public void init(ServerConfig serverConfig) {

      this.serverConfig = serverConfig;

      httpServer = buildServer();

  }

Copy after login

The specific code for service startup in SofaNettyJaxrsServer

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

@Override

    public void start() {

        // CHANGE: 增加线程名字

        boolean daemon = serverConfig.isDaemon();

        boolean isEpoll = serverConfig.isEpoll();

        NamedThreadFactory ioFactory = new NamedThreadFactory("SEV-REST-IO-" + port, daemon);

        NamedThreadFactory bizFactory = new NamedThreadFactory("SEV-REST-BIZ-" + port, daemon);

        eventLoopGroup = isEpoll ? new EpollEventLoopGroup(ioWorkerCount, ioFactory)

            : new NioEventLoopGroup(ioWorkerCount, ioFactory);

        eventExecutor = isEpoll ? new EpollEventLoopGroup(executorThreadCount, bizFactory)

            : new NioEventLoopGroup(executorThreadCount, bizFactory);

        // Configure the server.

        bootstrap = new ServerBootstrap()

            .group(eventLoopGroup)

            .channel(isEpoll ? EpollServerSocketChannel.class : NioServerSocketChannel.class)

            .childHandler(createChannelInitializer())

            .option(ChannelOption.SO_BACKLOG, backlog)

            .childOption(ChannelOption.SO_KEEPALIVE, serverConfig.isKeepAlive()); // CHANGE: setKeepAlive

 

        for (Map.Entry<ChannelOption, Object> entry : channelOptions.entrySet()) {

            bootstrap.option(entry.getKey(), entry.getValue());

        }

 

        for (Map.Entry<ChannelOption, Object> entry : childChannelOptions.entrySet()) {

            bootstrap.childOption(entry.getKey(), entry.getValue());

        }

 

        final InetSocketAddress socketAddress;

        if (null == hostname || hostname.isEmpty()) {

            socketAddress = new InetSocketAddress(port);

        } else {

            socketAddress = new InetSocketAddress(hostname, port);

        }

 

        bootstrap.bind(socketAddress).syncUninterruptibly();

    }

Copy after login

OK, the above is sofa -A basic process for rpc server startup. What we focus on here is only the simple service startup process, without in-depth analysis of code functions. On this basis, we can further explore the specific implementation of the code.

The above is the detailed content of Detailed analysis of sofa-rpc server source code (with flow chart). For more information, please follow other related articles on the PHP Chinese website!

Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn

Hot AI Tools

Undresser.AI Undress

Undresser.AI Undress

AI-powered app for creating realistic nude photos

AI Clothes Remover

AI Clothes Remover

Online AI tool for removing clothes from photos.

Undress AI Tool

Undress AI Tool

Undress images for free

Clothoff.io

Clothoff.io

AI clothes remover

AI Hentai Generator

AI Hentai Generator

Generate AI Hentai for free.

Hot Article

R.E.P.O. Energy Crystals Explained and What They Do (Yellow Crystal)
3 weeks ago By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. Best Graphic Settings
3 weeks ago By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O. How to Fix Audio if You Can't Hear Anyone
3 weeks ago By 尊渡假赌尊渡假赌尊渡假赌
WWE 2K25: How To Unlock Everything In MyRise
4 weeks ago By 尊渡假赌尊渡假赌尊渡假赌

Hot Tools

Notepad++7.3.1

Notepad++7.3.1

Easy-to-use and free code editor

SublimeText3 Chinese version

SublimeText3 Chinese version

Chinese version, very easy to use

Zend Studio 13.0.1

Zend Studio 13.0.1

Powerful PHP integrated development environment

Dreamweaver CS6

Dreamweaver CS6

Visual web development tools

SublimeText3 Mac version

SublimeText3 Mac version

God-level code editing software (SublimeText3)

How does Java's classloading mechanism work, including different classloaders and their delegation models? How does Java's classloading mechanism work, including different classloaders and their delegation models? Mar 17, 2025 pm 05:35 PM

Java's classloading involves loading, linking, and initializing classes using a hierarchical system with Bootstrap, Extension, and Application classloaders. The parent delegation model ensures core classes are loaded first, affecting custom class loa

How do I implement multi-level caching in Java applications using libraries like Caffeine or Guava Cache? How do I implement multi-level caching in Java applications using libraries like Caffeine or Guava Cache? Mar 17, 2025 pm 05:44 PM

The article discusses implementing multi-level caching in Java using Caffeine and Guava Cache to enhance application performance. It covers setup, integration, and performance benefits, along with configuration and eviction policy management best pra

How can I use JPA (Java Persistence API) for object-relational mapping with advanced features like caching and lazy loading? How can I use JPA (Java Persistence API) for object-relational mapping with advanced features like caching and lazy loading? Mar 17, 2025 pm 05:43 PM

The article discusses using JPA for object-relational mapping with advanced features like caching and lazy loading. It covers setup, entity mapping, and best practices for optimizing performance while highlighting potential pitfalls.[159 characters]

How do I use Maven or Gradle for advanced Java project management, build automation, and dependency resolution? How do I use Maven or Gradle for advanced Java project management, build automation, and dependency resolution? Mar 17, 2025 pm 05:46 PM

The article discusses using Maven and Gradle for Java project management, build automation, and dependency resolution, comparing their approaches and optimization strategies.

How do I create and use custom Java libraries (JAR files) with proper versioning and dependency management? How do I create and use custom Java libraries (JAR files) with proper versioning and dependency management? Mar 17, 2025 pm 05:45 PM

The article discusses creating and using custom Java libraries (JAR files) with proper versioning and dependency management, using tools like Maven and Gradle.

See all articles