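import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class TimeServer {
    public void bind(int port) {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel arg0) throws Exception {
                            System.out.println("Initializing");
                            arg0.pipeline().addLast(new TimeHandler());
                        }
                    });
            // Bind the port and wait until the bind completes
            ChannelFuture future = b.bind(port).sync();
            System.out.println("Executing here");
            // Block until the server channel is closed
            future.channel().closeFuture().sync();
            System.out.println("Executing here");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // Release the event loop threads when the server stops
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) {
        new TimeServer().bind(10000);
    }
}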
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class TimeHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);
        ByteBuf buf = (ByteBuf) msg;
        // byte[] butfs = buf.array(); // throws an error
        System.out.println(buf.readableBytes());
        byte[] butfs = new byte[buf.readableBytes()];
        buf.readBytes(butfs);
        System.out.println(new String(butfs, "UTF-8"));
        System.out.println(msg);
    }
}
The client uses the BIO (blocking I/O) model:
public static void main(String[] args) throws Exception {
    final int port = 10000;
    // NioServer server = new NioServer(port);
    // server.init();
    /// ========================================================
    // Next, simulate clients accessing the server concurrently (poolsize sets how many)
    int poolsize = 1;
    ExecutorService pool = Executors.newFixedThreadPool(poolsize);
    Collection<Callable<Object>> tasks = new ArrayList<Callable<Object>>(10);
    final String clientname = "clientThread";
    for (int i = 0; i < poolsize; i++) {
        final int n = i;
        // Each client keeps using BIO to send data to the server and read data back.
        tasks.add(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                Socket socket = new Socket("127.0.0.1", port);
                final InputStream input = socket.getInputStream();
                final OutputStream out = socket.getOutputStream();
                final String clientname_n = clientname + "_" + n;
                // BIO reader thread
                new Thread(clientname_n + "_read") {
                    @Override
                    public void run() {
                        byte[] bs = new byte[1024];
                        while (true) {
                            try {
                                Thread.sleep(1000);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            int len = 0;
                            try {
                                while ((len = input.read(bs)) != -1) {
                                    System.out.println("Client thread " + Thread.currentThread().getName()
                                            + " read: " + new String(bs, 0, len));
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }.start();
                // BIO writer thread
                new Thread(clientname_n + "_write") {
                    @Override
                    public void run() {
                        int a = 0;
                        while (true) {
                            try {
                                Thread.sleep(100);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            String str = Thread.currentThread().getName() + " hello, " + a;
                            try {
                                out.write(str.getBytes());
                                out.flush();
                                a++;
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }.start();
                return null;
            }
        });
    }
    pool.invokeAll(tasks);
    // server.go();
}
Running it produced the following error:
月 13, 2017 5:52:56 下午 io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.util.IllegalReferenceCountException: refCnt: 0
at io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1408)
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1394)
at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1383)
at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:850)
at io.netty.buffer.AbstractByteBuf.readBytes(AbstractByteBuf.java:858)
at test.netty.TimeHandler.channelRead(TimeHandler.java:17)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:134)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:624)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:559)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:476)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:438)
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:144)
at java.lang.Thread.run(Thread.java:745)
Why does this happen?
In Netty 4, the lifecycle of certain objects is managed by reference counting, and ByteBuf is one of them. Every such object starts with a reference count of 1. Each call to release() decrements the count by 1. Accessing an object whose count is 0 throws an IllegalReferenceCountException, which is the check that ensureAccessible() performs. For a more detailed explanation, see the official documentation.
AbstractByteBuf.java
Look at the line super.channelRead(ctx, msg); in the original poster's TimeHandler class. It forwards the message to the next handler, and because TimeHandler is the only handler, the message reaches the tail of the pipeline, which releases unhandled messages. Following the call path,
the last code it reaches is: ReferenceCountUtil.release(msg)
In other words, each call to super.channelRead(ctx, msg); ends with release() being called on the ByteBuf, which drops its count by one, to 0. When buf.readBytes(butfs); then runs ensureAccessible(), the count is 0, so Netty treats the ByteBuf as already released and throws the exception.
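The sequence described above can be reproduced without Netty. The following is only a simplified model under stated assumptions: `RefCountModel`, `CountedBuffer` and `tail` are hypothetical names invented for illustration, and the model throws a plain `IllegalStateException` where Netty throws `IllegalReferenceCountException`. It is not Netty's implementation.

```java
import java.nio.charset.StandardCharsets;

public class RefCountModel {
    /** Hypothetical stand-in for a reference-counted buffer; not Netty's ByteBuf. */
    static final class CountedBuffer {
        private final byte[] data;
        private int refCnt = 1; // a new object starts with a count of 1

        CountedBuffer(String text) {
            this.data = text.getBytes(StandardCharsets.UTF_8);
        }

        int refCnt() {
            return refCnt;
        }

        /** Decrements the count; returns true once it reaches 0. */
        boolean release() {
            if (refCnt == 0) {
                throw new IllegalStateException("refCnt: 0");
            }
            return --refCnt == 0;
        }

        /** Same idea as ensureAccessible(): refuse access once the count is 0. */
        String read() {
            if (refCnt == 0) {
                throw new IllegalStateException("refCnt: 0");
            }
            return new String(data, StandardCharsets.UTF_8);
        }
    }

    /** Models the end of the pipeline, which releases whatever reaches it. */
    static void tail(CountedBuffer msg) {
        msg.release();
    }

    /** Order used in the question: forward first, then read. */
    static String forwardThenRead(CountedBuffer msg) {
        tail(msg);         // count drops from 1 to 0
        return msg.read(); // access check fails here
    }

    /** Read first, then release exactly once. */
    static String readThenRelease(CountedBuffer msg) {
        try {
            return msg.read();
        } finally {
            msg.release();
        }
    }

    public static void main(String[] args) {
        System.out.println(readThenRelease(new CountedBuffer("hello"))); // prints "hello"
        try {
            forwardThenRead(new CountedBuffer("hello"));
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage()); // prints "failed: refCnt: 0"
        }
    }
}
```

The only difference between the two methods is the order of reading and releasing, which is also the only thing that has to change in the handler.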
Solution:
Remove the line super.channelRead(ctx, msg); from TimeHandler.
Whoever finishes processing a ByteBuf is responsible for releasing it.
An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
The handler has no exceptionCaught() method. Add an exceptionCaught() override to the last handler on the server side.
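Both suggestions can be combined in one handler. This is a minimal sketch rather than the only way to write it: the class name `FixedTimeHandler` is hypothetical, and it assumes this handler is the last one in the pipeline, so it releases the buffer itself instead of forwarding it.

```java
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

// Hypothetical name; assumed to be the last handler in the pipeline
public class FixedTimeHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf buf = (ByteBuf) msg;
        try {
            // Read while the buffer is still alive; super.channelRead(ctx, msg) is not called
            System.out.println(buf.readableBytes());
            System.out.println(buf.toString(CharsetUtil.UTF_8));
        } finally {
            // This handler consumes the message, so it releases it exactly once
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Handle the exception here so it does not reach the tail of the pipeline
        cause.printStackTrace();
        ctx.close();
    }
}
```

To try it, the server's initChannel would register it with arg0.pipeline().addLast(new FixedTimeHandler()); in place of TimeHandler. If the message had to be passed on to a later handler, the release would belong to that handler instead.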