Home > Java > javaTutorial > body text

Detailed examples of Netty threading model

零下一度
Release: 2017-07-03 11:33:00
Original
3682 people have browsed it

Netty Threading Model

Netty’s threading model is mainly based on React, and has evolved into multiple versions due to different application scenarios.

Single-threaded mode

That is, receiving service requests and performing IO operations are all completed by one thread. Since non-blocking IO operations such as IO multiplexing are used, the amount of requests increases. In small cases, single-threaded mode can also solve some scene problems.

Single receiving multi-worker thread mode

When the number of requests increases, the original one thread processing all IO operations becomes increasingly unsupportable Corresponding performance indicators, so the concept of a working thread pool is mentioned. At this time, receiving the service request is still a thread. After receiving the request, the thread receiving the request will be entrusted to the subsequent working thread pool and obtain a thread from the thread pool for execution. User request.

Multiple receiving and multi-worker thread mode

When the request volume further increases, a single thread that receives service requests cannot handle all client connections, so The thread pool that receives service requests is also expanded, and multiple threads are responsible for receiving client connections at the same time.

RPC Business Thread

The above mentioned are Netty’s own threading models, optimization strategies that have been continuously developed with the increase in request volume. For RPC requests, the most important thing for application systems is the processing of business logic, and this type of business may be computationally intensive or IO-intensive. For example, most applications are accompanied by database operations, redis or other connections. Network services, etc. If there are such time-consuming IO operations in the business request, it is recommended to allocate the task of processing the business request to an independent thread pool, otherwise netty's own threads may be blocked.

Division of work between the receiving request thread and the working thread

  • The receiving request thread is mainly responsible for creating the link, and then Delegate the request to the worker thread

  • The worker thread is responsible for encoding, decoding, reading IO and other operations

Solution implementation

Currently I The implemented RPC adopts the multi-receiving and multi-worker thread mode. The port is bound on the server side like this:

public void bind(ServiceConfig serviceConfig) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(this.rpcServerInitializer)
                    .childOption(ChannelOption.SO_KEEPALIVE,true)
            ;try {ChannelFuture channelFuture = bootstrap.bind(serviceConfig.getHost(),serviceConfig.getPort()).sync();//...channelFuture.channel().closeFuture().sync();


            } catch (InterruptedException e) {throw new RpcException(e);
            }
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
Copy after login

boosGroup is a group of ## used to receive service requests. #workerGroup is a group of people specifically responsible for IO operations.

To add business threads, you only need to further delegate the handle operations to the thread pool. For expansion, an interface needs to be defined here:

Define thread pool interface

public interface RpcThreadPool {Executor getExecutor(int threadSize,int queues);
}
Copy after login
Implement fixed size thread pool

Refer to dubbo thread pool

@Qualifier("fixedRpcThreadPool")@Componentpublic class FixedRpcThreadPool implements RpcThreadPool {private Executor executor;@Overridepublic Executor getExecutor(int threadSize,int queues) {if(null==executor) {synchronized (this) {if(null==executor) {
                    executor= new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS,
                            queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {                                   //...}
                            });
                }
            }
        }return executor;
    }
}
Copy after login
Interlude:

I remember once a friend suddenly asked what the coreSize in the java thread pool meant? I was suddenly short-circuited, because I don't usually write multi-threading. When I think of the database thread pool that I usually use a lot, I am quite impressed by the parameters in it, but I just can't remember coreSize. Later, I took a closer look at some parameters of the thread pool. Now I can take this opportunity to take a closer look to avoid short-circuiting again.

Thread Pool Factory

When there are multiple thread pool implementations, the thread pool is dynamically selected through the thread pool name.

@Componentpublic class RpcThreadPoolFactory {@Autowiredprivate Map<String,RpcThreadPool> rpcThreadPoolMap;public RpcThreadPool getThreadPool(String threadPoolName){return this.rpcThreadPoolMap.get(threadPoolName);
    }
}
Copy after login
Modify the channelRead0 method of ChannelHandle

Wrap the method body into a Task and hand it over to the thread pool for execution.

@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) {this.executor.execute(new Runnable() {@Overridepublic void run() {RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation(rpcRequest));
            channelHandlerContext.writeAndFlush(response);
        }
    });
}
Copy after login
Question

Currently there is a lack of stress testing, so there is no clear data comparison for the time being.

The above is the detailed content of Detailed examples of Netty threading model. For more information, please follow other related articles on the PHP Chinese website!

Related labels:
source:php.cn
Statement of this Website
The content of this article is voluntarily contributed by netizens, and the copyright belongs to the original author. This site does not assume corresponding legal responsibility. If you find any content suspected of plagiarism or infringement, please contact admin@php.cn
Popular Tutorials
More>
Latest Downloads
More>
Web Effects
Website Source Code
Website Materials
Front End Template