java - netty如何实现向客户端主动发送消息
ringa_lee
ringa_lee 2017-04-18 09:03:28
0
4
881

需求场景:
智能家居网关(以下简称gateway),需要和netty服务器通讯(以下简称netty),netty和gateway之间需要保持长连接(换句话说,netty和gateway之间都会主动给对方发送消息)

碰到的问题:
netty作为服务器端如何主动的向gateway发送消息,我尝试当每个gateway连接到netty(TCP/IP)时使用一个map把该channelSocket的id和该channelSocket绑定在一起

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        String uuid = ctx.channel().id().asLongText();
        GatewayService.addGatewayChannel(uuid, (SocketChannel)ctx.channel());
        System.out.println("a new connect come in: " + uuid);
    }

GatewayService其实就是一个ConcurrentHashMap

public class GatewayService {
    
    private static Map<String, SocketChannel> map = new ConcurrentHashMap<>();
    
    public static void addGatewayChannel(String id, SocketChannel gateway_channel){
        map.put(id, gateway_channel);
    }
    
    public static Map<String, SocketChannel> getChannels(){
        return map;
    }

    public static SocketChannel getGatewayChannel(String id){
        return map.get(id);
    }
    
    public static void removeGatewayChannel(String id){
        map.remove(id);
    }
}

我在服务器端尝试每间隔一段时间loop这个ConcurrentHashMap如果里面已经有绑定的channelSocket,就使用write方法向客户端发送消息

Runnable sendTask = new Runnable() {
            @Override
            public void run() {
                sendTaskLoop:
                    for(;;){
                        System.out.println("task is beginning...");
                        try{
                            Map<String, SocketChannel> map = GatewayService.getChannels();
                            Iterator<String> it = map.keySet().iterator();
                            while (it.hasNext()) {
                                String key = it.next();
                                SocketChannel obj = map.get(key);
                                System.out.println("channel id is: " + key);
                                System.out.println("channel: " + obj.isActive());
                                obj.writeAndFlush("hello, it is Server test header ping");
                            }
                        }catch(Exception e){break sendTaskLoop;}
                        try {
                            Thread.sleep(5000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
            }
        };
        new Thread(sendTask).start();

理论上客户端应该是可以接受到我发送的消息,但是我观察了一下源代码,发现writeAndFlush这个方法最终会被handler触发,于是我又在handler中覆写了write方法

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("write handler");
        ctx.writeAndFlush(msg);
    }

可是最终结果客户端并没有收到任何消息,请问netty如何主动向客户端发送消息?

ringa_lee
ringa_lee

ringa_lee

membalas semua(4)
阿神

Secara kasar boleh diagak bahawa pelayan tidak mempunyai pengekod, jadi ia tidak boleh dihantar sama sekali
Daripada kod yang dipos, anda dapat melihat bahawa terdapat dua cara untuk menghantar mesej kepada klien:

obj.writeAndFlush("hello, it is Server test header ping");

dan

ctx.writeAndFlush(msg);//Object

Dalam netty, semua entri dan keluar adalah ByteBuf Pengarang harus mengesahkan sama ada pelayan mempunyai pengekod yang sepadan dan menukar rentetan kepada ByteBuf.

巴扎黑

GatewayService tidak perlu dipaksa untuk menukar kepada jenis SocketChannel apabila menyimpan saluran
Tiada masalah untuk mendapatkan saluran dan melakukan writeAndFlush.
Untuk menyelesaikan masalah selanjutnya, anda boleh melakukan ini,
1 Pantau hasil Janji ctx.writeAndFlush(msg);, seperti

ctx.writeAndFlush(msg).addListener(new ChannelFutureListener() {

    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        if (future.isSuccess()) {
            // do sth
        } else {
            // do sth
        }
    }
});

2. Perhatikan sama ada pengekod dan penyahkod pada klien dan pelayan adalah sama

Peter_Zhu

Adalah disyorkan untuk menggunakan ChannelGroup untuk mengurus pelanggan. Sebab mengapa pelayan pemimpin tidak boleh menolak data mungkin kerana tiada pengekod ctx.writeAndFlush tidak boleh menulis jenis rentetan secara langsung.

PHPzhong

Helo, kod anda boleh diubah suai seperti ini:
Kod Pengendali diubah suai seperti berikut:
@Override

public void channelActive(ChannelHandlerContext ctx) throws Exception {
    String uuid = ctx.channel().id().asLongText();
    GatewayService.addGatewayChannel(uuid, ctx);
    logger.info("a new connect come in: " + uuid);
    logger.info("a new connect come in remote ip: " + ctx.channel().remoteAddress());
}

Perkhidmatan Gerbang:

Perkhidmatan Gateway kelas awam {

private static Map<String, Object> map = new ConcurrentHashMap();
public static void addGatewayChannel(String id, ChannelHandlerContext gateway_ctx){
    map.put(id, gateway_ctx);
}
public static Map<String, Object> getChannels(){
    return map;
}

public static Object getGatewayChannel(String id){
    return map.get(id);
}

public static void removeGatewayChannel(String id){
    map.remove(id);
}

}

Kod untuk melintasi klien adalah seperti berikut:
kelas awam TimeTask melaksanakan Runnable{

private static final Log logger = LogFactory.getLog(TcpCMDHandler.class);
public static final String CHARSET = "UTF-8";
@Override
public void run() {
    sendTaskLoop:
    for(;;){
        System.out.println("task is beginning...");
        try{
            Map<String, Object> map = GatewayService.getChannels();
            Iterator<String> it = map.keySet().iterator();
            while (it.hasNext()) {
                String key = it.next();
                ChannelHandlerContext ctx = (ChannelHandlerContext)map.get(key);
                logger.info("channel id is: " + key);
                logger.info("channel: " + ctx.channel().isActive());

                String bak = "hello, every one!";
                byte[] responB = bak.getBytes(CHARSET);
    //            ByteBuf response = Unpooled.buffer(responB.length);

                final ByteBuf time = ctx.alloc().buffer(responB.length);
                time.writeBytes(responB);

                ctx.writeAndFlush(time); // (3)

// obj.writeAndFlush("hello, ia adalah ping pengepala ujian Pelayan");

            }
        }catch(Exception e){
            break
            sendTaskLoop;
        }
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

}

Itu sahaja. Saya sendiri telah mengujinya dan ia berfungsi

Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan