public class TimeServer {
public void bind(int port) {
try {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel arg0) throws Exception {
System.out.println("初始化");
arg0.pipeline().addLast(new TimeHandler());
}
});
ChannelFuture future = b.bind(port).sync();
System.out.println("执行这里");
future.channel().closeFuture().sync();
System.out.println("执行这里");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
new TimeServer().bind(10000);
}
}
public class TimeHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
ByteBuf buf=(ByteBuf) msg;
//// byte[] butfs = buf.array();//报错
System.out.println(buf.readableBytes());
byte[] butfs = new byte[buf.readableBytes()];
buf.readBytes(butfs);
System.out.println(new String(butfs,"UTF-8"));
System.out.println(msg);
}
}
客戶端使用的是BIO的模型:
public static void main(String[] args) throws Exception {
final int port = 10000;
// NioServer server = new NioServer(port);
// server.init();
/// ========================================================
// 接下来模拟3个Client并发访问服务器
int poolsize = 1;
ExecutorService pool = Executors.newFixedThreadPool(poolsize);
Collection<Callable> tasks = new ArrayList<Callable>(10);
final String clientname = "clientThread";
for (int i = 0; i < poolsize; i++) {
final int n = i;
// 若每一个Client都保持使用BIO方式发送数据到Server,并读取数据。
tasks.add(new Callable() {
@Override
public Object call() throws Exception {
Socket socket = new Socket("127.0.0.1", port);
final InputStream input = socket.getInputStream();
final OutputStream out = socket.getOutputStream();
final String clientname_n = clientname + "_" + n;
// BIO读取数据线程
new Thread(clientname_n + "_read") {
@Override
public void run() {
byte[] bs = new byte[1024];
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
int len = 0;
try {
while ((len = input.read(bs)) != -1) {
System.out.println("Clinet thread " + Thread.currentThread().getName()
+ " read: " + new String(bs, 0, len));
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}.start();
// BIO写数据线程
new Thread(clientname_n + "_write") {
@Override
public void run() {
int a = 0;
while (true) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
String str = Thread.currentThread().getName() + " hello, " + a;
try {
out.write(str.getBytes());
out.flush();
a++;
} catch (IOException e) {
e.printStackTrace();
}
}
}
}.start();
return null;
}
});
}
pool.invokeAll((Collection<? extends Callable<Object>>) tasks);
// server.go();
}
結果運行的時候出現了以下錯誤:
月 13, 2017 5:52:56 下午 io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
警告: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1408)
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1394)
at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1383)
at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:850)
at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:858)
at test.netty.TimeHandler.channelRead(TimeHandler.java:17)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:624)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:559)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:476)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:438)
at io.netty.util.concurrent.SingleThreadEventExecutor.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
請問這是為什麼呢?
在netty4中,物件的生命週期由引用計數器控制,ByteBuf就是如此,每個物件的初始化引用計數為1,呼叫一次release方法,引用計數器會減1,當嘗試存取計數器為0的,物件時,會拋出IllegalReferenceCountException,正如ensureAccessible的實現,更加詳細的解釋可以參考官方文件
AbstractByteBuf.java
注意TZ的TimeHandler類別中的 super.channelRead(ctx, msg);這行程式碼。追蹤呼叫路徑,
最後呼叫的程式碼是:ReferenceCountUtil.release(msg)
也就是每次super.channelRead(ctx, msg);後,ByteBuf就會呼叫release()方法,計數器減一,然後在buf.readBytes(butfs);這行程式碼就會校驗ensureAccessible(),計數器為0,netty認為ByteBuf物件已經釋放,就拋出異常。
解決方案:
去掉TimeHandler中這行程式碼 super.channelRead(ctx, msg);
ByteBuf物件誰處理誰釋放。
異常捕獲()事件被觸發,並且到達管道的尾部。通常意味著管道中的最後一個處理程序沒有處理異常。
缺少ExceptionCaught(),在伺服器端最後一個Handler中增加了ExceptionCaught()