public class TimeServer {
public void bind(int port) {
try {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
System.out.println("初始化");
arg0.pipeline().addLast(new TimeHandler());
}
});
ChannelFuture future = b.bind(port).sync();
System.out.println("执行这里");
future.channel().closeFuture().sync();
System.out.println("执行这里");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new TimeServer().bind(10000);
}
}
public class TimeHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
ByteBuf buf=(ByteBuf) msg;
//// byte[] butfs = buf.array();//报错
System.out.println(buf.readableBytes());
byte[] butfs = new byte[buf.readableBytes()];
buf.readBytes(butfs);
System.out.println(new String(butfs,"UTF-8"));
System.out.println(msg);
}
}
客户端使用的是BIO的模型:
public static void main(String[] args) throws Exception {
final int port = 10000;
// NioServer server = new NioServer(port);
// server.init();
/// ========================================================
// 接下来模拟3个Client并发访问服务器
int poolsize = 1;
ExecutorService pool = Executors.newFixedThreadPool(poolsize);
Collection<Callable> tasks = new ArrayList<Callable>(10);
final String clientname = "clientThread";
for (int i = 0; i < poolsize; i++) {
final int n = i;
// 若每一个Client都保持使用BIO方式发送数据到Server,并读取数据。
tasks.add(new Callable() {
@Override
public Object call() throws Exception {
Socket socket = new Socket("127.0.0.1", port);
final InputStream input = socket.getInputStream();
final OutputStream out = socket.getOutputStream();
final String clientname_n = clientname + "_" + n;
// BIO读取数据线程
new Thread(clientname_n + "_read") {
@Override
public void run() {
byte[] bs = new byte[1024];
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int len = 0;
try {
while ((len = input.read(bs)) != -1) {
System.out.println("Clinet thread " + Thread.currentThread().getName()
+ " read: " + new String(bs, 0, len));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}.start();
// BIO写数据线程
new Thread(clientname_n + "_write") {
@Override
public void run() {
int a = 0;
while (true) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
String str = Thread.currentThread().getName() + " hello, " + a;
try {
out.write(str.getBytes());
out.flush();
a++;
} catch (IOException e) {
e.printStackTrace();
}
}
}
}.start();
return null;
}
});
}
pool.invokeAll((Collection<? extends Callable<Object>>) tasks);
// server.go();
}
结果运行的时候出现了以下错误:
月 13, 2017 5:52:56 下午 io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1408)
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1394)
at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1383)
at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:850)
at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:858)
at test.netty.TimeHandler.channelRead(TimeHandler.java:17)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:624)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:559)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:476)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:438)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
请问这是为什么呢?
Dans netty4, le cycle de vie d'un objet est contrôlé par un compteur de références. C'est le cas de ByteBuf. Le nombre de références initial de chaque objet est de 1. Lorsque la méthode release est appelée une fois, le compteur de références sera décrémenté. par 1. Lorsque vous essayez d'accéder à un objet avec un compteur de 0, When, IllegalReferenceCountException sera levée, tout comme l'implémentation de EnsureAccessible. Pour une explication plus détaillée, veuillez vous référer à la documentation officielle
.AbstractByteBuf.java
Faites attention à la ligne de code super.channelRead(ctx, msg); dans la classe TimeHandler de TZ. Tracez le chemin de l'appel,
Le code final appelé est : ReferenceCountUtil.release(msg)
C'est-à-dire qu'à chaque fois que super.channelRead(ctx, msg);, ByteBuf appellera la méthode release(), le compteur sera décrémenté de un, puis dans buf.readBytes(butfs); vérifiez EnsureAccessible() , le compteur est 0, netty pense que l'objet ByteBuf a été libéré et lève une exception.
Solution :
Supprimez cette ligne de code dans TimeHandler super.channelRead(ctx, msg);
Celui qui traite l'objet ByteBuf le libère.
Un événement exceptionCaught() a été déclenché et a atteint la queue du pipeline. Cela signifie généralement que le dernier gestionnaire du pipeline n'a pas géré l'exception.