Netty のスレッド モデルは主に React に基づいており、さまざまなアプリケーション シナリオに応じて複数のバージョンに進化しました。
つまり、サービスリクエストの受信とIO操作の実行がすべて1つのスレッドで完了します。IO多重化などのノンブロッキングIO操作が使用されるため、リクエスト量が少ない場合にシングルスレッドモードが使用されます。いくつかのシーンの問題も解決できます。
リクエストの数が増加すると、すべてのIO操作を処理する元の1つのスレッドが対応するパフォーマンス指標をサポートできなくなるため、ワーカースレッドプールの概念が言及されます。この時点では、サービスリクエストを受信したスレッドはまだスレッドです。リクエストを受信した後、リクエストを受信したスレッドは後続のワーカースレッドプールに割り当てられ、スレッドプールからスレッドを取得してユーザーリクエストを実行します。
リクエスト量がさらに増加すると、サービスリクエストを受信する単一のスレッドではすべてのクライアント接続を処理できなくなるため、サービスリクエストを受信するスレッドもスレッドプールに拡張されます、複数のスレッドで構成されます。スレッドはクライアント接続の受信も担当します。
上記は、リクエスト量の増加に伴い継続的に開発されてきた Netty 独自のスレッド モデル、最適化戦略です。アプリケーション システムの場合、RPC リクエストは主にビジネス ロジックの処理に関するものであり、この種のビジネスは、たとえば、ほとんどのアプリケーションでデータベース操作、Redis、またはその他のネットワーク サービスを伴う場合があります。ビジネス リクエストにこのような時間のかかる IO 操作がある場合は、ビジネス リクエストを処理するタスクを独立したスレッド プールに割り当てることをお勧めします。そうしないと、netty 自体のスレッドがブロックされる可能性があります。
リクエスト受信スレッドと作業スレッドの分業
リクエスト受信スレッドは主にリンクの作成と、その後のリクエストの作業スレッドへの委任を担当します
作業スレッドは、エンコード、デコード、IO の読み取り、その他の操作を担当します
現在、私が実装している RPC は、サーバー上でこのようにマルチレシーバー マルチワーカー スレッド モードを使用しています。 Side:
public void bind(ServiceConfig serviceConfig) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(this.rpcServerInitializer) .childOption(ChannelOption.SO_KEEPALIVE,true) ;try {ChannelFuture channelFuture = bootstrap.bind(serviceConfig.getHost(),serviceConfig.getPort()).sync();//...channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) {throw new RpcException(e); } }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
boosGroupはサービスリクエストを受信するために使用されるグループです
workerGroupはIO操作を特に担当するグループです
ビジネススレッドを追加するには、ハンドルの操作をスレッドにさらに委任するだけです拡張するには、ここでインターフェイスを定義する必要があります:
public interface RpcThreadPool {Executor getExecutor(int threadSize,int queues); }
ダボ スレッド プールを参照しました
@Qualifier("fixedRpcThreadPool")@Componentpublic class FixedRpcThreadPool implements RpcThreadPool {private Executor executor;@Overridepublic Executor getExecutor(int threadSize,int queues) {if(null==executor) {synchronized (this) {if(null==executor) { executor= new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //...} }); } } }return executor; } }
幕間:
かつて友人が突然、Java スレッド プールの coreSize は何を意味するのかと尋ねたのを覚えています。私は普段マルチスレッドを書かないので、突然ショートしました。私が普段よく使用するデータベーススレッドプールのことを考えると、そのパラメータに非常に感銘を受けましたが、まったく思い出せません。コアサイズ。その後、スレッド プールのいくつかのパラメータを詳しく調べました。この機会にもう一度ショートを避けるために詳しく見てみることができます。
複数のスレッド プール実装がある場合、スレッド プールはスレッド プール名を通じて動的に選択されます。
@Componentpublic class RpcThreadPoolFactory {@Autowiredprivate Map<String,RpcThreadPool> rpcThreadPoolMap;public RpcThreadPool getThreadPool(String threadPoolName){return this.rpcThreadPoolMap.get(threadPoolName); } }
メソッド本体をタスクにラップし、実行のためにスレッド プールに渡します。
@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) {this.executor.execute(new Runnable() {@Overridepublic void run() {RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation(rpcRequest)); channelHandlerContext.writeAndFlush(response); } }); }
現在、ストレステストが不足しているため、当面は明確なデータ比較がありません。
以上がNetty スレッド モデルの詳細な例の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。