Netty’s threading model is mainly based on React, and has evolved into multiple versions due to different application scenarios.
That is, receiving service requests and performing IO operations are all completed by one thread. Since non-blocking IO operations such as IO multiplexing are used, the amount of requests increases. In small cases, single-threaded mode can also solve some scene problems.
When the number of requests increases, the original one thread processing all IO operations becomes increasingly unsupportable Corresponding performance indicators, so the concept of a working thread pool is mentioned. At this time, receiving the service request is still a thread. After receiving the request, the thread receiving the request will be entrusted to the subsequent working thread pool and obtain a thread from the thread pool for execution. User request.
When the request volume further increases, a single thread that receives service requests cannot handle all client connections, so The thread pool that receives service requests is also expanded, and multiple threads are responsible for receiving client connections at the same time.
The above mentioned are Netty’s own threading models, optimization strategies that have been continuously developed with the increase in request volume. For RPC requests, the most important thing for application systems is the processing of business logic, and this type of business may be computationally intensive or IO-intensive. For example, most applications are accompanied by database operations, redis or other connections. Network services, etc. If there are such time-consuming IO operations in the business request, it is recommended to allocate the task of processing the business request to an independent thread pool, otherwise netty's own threads may be blocked.
Division of work between the receiving request thread and the working thread
The receiving request thread is mainly responsible for creating the link, and then Delegate the request to the worker thread
The worker thread is responsible for encoding, decoding, reading IO and other operations
Currently I The implemented RPC adopts the multi-receiving and multi-worker thread mode. The port is bound on the server side like this:
public void bind(ServiceConfig serviceConfig) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(this.rpcServerInitializer) .childOption(ChannelOption.SO_KEEPALIVE,true) ;try {ChannelFuture channelFuture = bootstrap.bind(serviceConfig.getHost(),serviceConfig.getPort()).sync();//...channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) {throw new RpcException(e); } }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
To add business threads, you only need to further delegate the handle operations to the thread pool. For expansion, an interface needs to be defined here: Define thread pool interfaceboosGroup is a group of ## used to receive service requests. #workerGroup is a group of people specifically responsible for IO operations.
public interface RpcThreadPool {Executor getExecutor(int threadSize,int queues); }
Refer to dubbo thread pool
@Qualifier("fixedRpcThreadPool")@Componentpublic class FixedRpcThreadPool implements RpcThreadPool {private Executor executor;@Overridepublic Executor getExecutor(int threadSize,int queues) {if(null==executor) {synchronized (this) {if(null==executor) { executor= new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //...} }); } } }return executor; } }
Interlude:Thread Pool FactoryWhen there are multiple thread pool implementations, the thread pool is dynamically selected through the thread pool name.I remember once a friend suddenly asked what the coreSize in the java thread pool meant? I was suddenly short-circuited, because I don't usually write multi-threading. When I think of the database thread pool that I usually use a lot, I am quite impressed by the parameters in it, but I just can't remember coreSize. Later, I took a closer look at some parameters of the thread pool. Now I can take this opportunity to take a closer look to avoid short-circuiting again.
@Componentpublic class RpcThreadPoolFactory {@Autowiredprivate Map<String,RpcThreadPool> rpcThreadPoolMap;public RpcThreadPool getThreadPool(String threadPoolName){return this.rpcThreadPoolMap.get(threadPoolName); } }
@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) {this.executor.execute(new Runnable() {@Overridepublic void run() {RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation(rpcRequest)); channelHandlerContext.writeAndFlush(response); } }); }
The above is the detailed content of Detailed examples of Netty threading model. For more information, please follow other related articles on the PHP Chinese website!