Netty의 스레딩 모델은 주로 React를 기반으로 하며 다양한 애플리케이션 시나리오로 인해 여러 버전으로 발전했습니다.
즉, 서비스 요청 수신과 IO 작업 수행이 모두 하나의 스레드로 완료됩니다. IO 다중화와 같은 비차단 IO 작업을 사용하므로 요청 볼륨이 없을 때 단일 스레드 모드가 사용됩니다. 또한 일부 장면 문제를 해결할 수도 있습니다.
요청 수가 증가하면 모든 IO 작업을 처리하는 원래 하나의 스레드가 점점 해당 성능 지표를 지원할 수 없게 되므로 작업자 스레드 풀의 개념이 언급됩니다. 이때 서비스 요청을 받는 것은 여전히 스레드입니다. 요청을 받은 후 요청을 받는 스레드는 후속 작업자 스레드 풀에 위임되고 스레드 풀에서 스레드를 가져와 사용자 요청을 실행합니다.
요청량이 더 많아지면 서비스 요청을 받는 단일 스레드가 모든 클라이언트 연결을 처리할 수 없으므로 서비스 요청을 받는 스레드도 스레드 풀로 확장됩니다. , 여러 개로 구성됨 스레드는 클라이언트 연결 수신도 담당합니다.
위에 언급된 내용은 요청량 증가에 따라 지속적으로 개발되어 온 Netty 자체 스레딩 모델, 최적화 전략입니다. 애플리케이션 시스템의 경우 RPC 요청은 주로 비즈니스 로직 처리에 관한 것이며 이러한 유형의 비즈니스는 계산 집약적이거나 IO 집약적일 수 있습니다. 예를 들어 대부분의 애플리케이션에는 데이터베이스 작업, Redis 또는 기타 네트워크 서비스 등이 수반됩니다. 비즈니스 요청에 이렇게 시간이 많이 걸리는 IO 작업이 있는 경우 비즈니스 요청을 처리하는 작업을 독립적인 스레드 풀에 할당하는 것이 좋습니다. 그렇지 않으면 netty 자체 스레드가 차단될 수 있습니다.
요청-수신 스레드와 작업 스레드 간의 작업 분담
수신-요청 스레드는 주로 링크를 생성한 다음 요청을 작업 스레드에 위임하는 역할을 담당합니다
작업 스레드는 인코딩, 디코딩, IO 읽기 및 기타 작업을 담당합니다.
제가 현재 구현하고 있는 RPC는 서버 측에서 다중 수신자 다중 작업자 스레드 모드를 채택하고 있습니다. :
public void bind(ServiceConfig serviceConfig) {EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(this.rpcServerInitializer) .childOption(ChannelOption.SO_KEEPALIVE,true) ;try {ChannelFuture channelFuture = bootstrap.bind(serviceConfig.getHost(),serviceConfig.getPort()).sync();//...channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) {throw new RpcException(e); } }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }
boosGroup은 서비스 요청을 받는 데 사용되는 그룹
workerGroup은 IO 작업을 특별히 담당하는 그룹입니다.
비즈니스 스레드를 추가하려면 스레드 풀에 핸들 작업을 추가로 위임하면 됩니다. . 확장하려면 여기에서 인터페이스를 정의해야 합니다.
public interface RpcThreadPool {Executor getExecutor(int threadSize,int queues); }
dubbo 스레드 풀 참조
@Qualifier("fixedRpcThreadPool")@Componentpublic class FixedRpcThreadPool implements RpcThreadPool {private Executor executor;@Overridepublic Executor getExecutor(int threadSize,int queues) {if(null==executor) {synchronized (this) {if(null==executor) { executor= new ThreadPoolExecutor(threadSize, threadSize, 0L, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<Runnable>() :(queues < 0 ? new LinkedBlockingQueue<Runnable>(): new LinkedBlockingQueue<Runnable>(queues)),new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //...} }); } } }return executor; } }
간간:
기억합니다. 친구가 갑자기 Java 스레드 풀의 coreSize가 무엇을 의미하는지 물었을 때? 평소에 멀티스레딩을 잘 쓰지 않아서 갑자기 쇼트가 났습니다. 평소에 많이 사용하는 데이터베이스 스레드 풀을 생각하면 그 안에 들어있는 매개변수가 꽤 인상적인데 기억이 나지 않을 뿐입니다. 코어 크기. 나중에 스레드 풀의 일부 매개변수를 자세히 살펴보았습니다. 이제 이 기회를 통해 다시 단락되는 것을 방지하기 위해 자세히 살펴볼 수 있습니다.
여러 스레드 풀 구현이 있는 경우 스레드 풀 이름을 통해 스레드 풀이 동적으로 선택됩니다.
@Componentpublic class RpcThreadPoolFactory {@Autowiredprivate Map<String,RpcThreadPool> rpcThreadPoolMap;public RpcThreadPool getThreadPool(String threadPoolName){return this.rpcThreadPoolMap.get(threadPoolName); } }
메서드 본문을 Task로 래핑하고 실행을 위해 스레드 풀에 넘겨줍니다.
@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) {this.executor.execute(new Runnable() {@Overridepublic void run() {RpcInvoker rpcInvoker=RpcServerInvoker.this.buildInvokerChain(RpcServerInvoker.this);RpcResponse response=(RpcResponse) rpcInvoker.invoke(RpcServerInvoker.this.buildRpcInvocation(rpcRequest)); channelHandlerContext.writeAndFlush(response); } }); }
현재 스트레스 테스트가 부족하여 아직 명확한 데이터 비교가 없습니다.
위 내용은 Netty 스레딩 모델의 자세한 예의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!