這篇文章帶給大家的內容是關於SpringBoot整合Netty並使用Protobuf進行資料傳輸的實現過程,有一定的參考價值,有需要的朋友可以參考一下,希望對你有所幫助。
本篇文章主要介紹的是SpringBoot整合Netty以及使用Protobuf進行資料傳輸的相關內容。 Protobuf會簡單的介紹下用法,至於Netty在之前的文章中已經簡單的介紹過了,這裡就不再過多細說了。
protocolbuffer(以下簡稱PB)是google 的一種資料交換的格式,它獨立於語言,獨立於平台。 google 提供了多種語言的實作:java、c#、c 、go 和python,每一種實作都包含了對應語言的編譯器以及函式庫檔案。由於它是一種二進制的格式,比使用 xml進行資料交換快許多。可以把它用於分散式應用之間的資料通訊或異質環境下的資料交換。作為一種效率和相容性都很優秀的二進位資料傳輸格式,可以用於諸如網路傳輸、設定檔、資料儲存等許多領域。
官方網址:https://github.com/google/protobuf
這裡的使用就只介紹Java相關的使用。
首先我們需要建立一個proto文件,在該文件定義我們需要傳輸的文件。
例如我們需要定義一個使用者的信息,包含的欄位主要有編號、名稱、年齡。
那麼該protobuf檔案的格式如下:
註:這裡使用的是proto3,相關的註解我已經寫了,這裡便不再過多講述了。需要注意一點的是proto檔案和產生的Java檔案名稱不能一致!
syntax = "proto3"; // 生成的包名 option java_package="com.pancm.protobuf"; //生成的java名 option java_outer_classname = "UserInfo"; message UserMsg { // ID int32 id = 1; // 姓名 string name = 2; // 年龄 int32 age = 3; // 状态 int32 state = 4; }
建立好該檔案之後,我們把該檔案和protoc .exe(產生Java檔案的軟體)放到E碟目錄下的protobuf資料夾下,然後到該目錄的dos介面下輸入:protoc.exe --java_out=檔案絕對路徑名稱
。
例如:
protoc.exe --java_out=E:\protobuf User.proto
輸入完後,回車即可在同級目錄看到已經產生好的Java文件,然後將該文件放到專案中該文件指定的路徑下即可。
註:產生protobuf的檔案軟體和測試的protobuf檔案我也整合到該專案中了,可以直接取得的。
Java檔案產生好之後,我們再來看怎麼使用。
這裡我就直接貼程式碼了,並且將註解寫在程式碼中,應該更容易理解些。 。 。
程式碼範例:
// 按照定义的数据结构,创建一个对象 UserInfo.UserMsg.Builder userInfo = UserInfo.UserMsg.newBuilder(); userInfo.setId(1); userInfo.setName("xuwujing"); userInfo.setAge(18); UserInfo.UserMsg userMsg = userInfo.build(); // 将数据写到输出流 ByteArrayOutputStream output = new ByteArrayOutputStream(); userMsg.writeTo(output); // 将数据序列化后发送 byte[] byteArray = output.toByteArray(); // 接收到流并读取 ByteArrayInputStream input = new ByteArrayInputStream(byteArray); // 反序列化 UserInfo.UserMsg userInfo2 = UserInfo.UserMsg.parseFrom(input); System.out.println("id:" + userInfo2.getId()); System.out.println("name:" + userInfo2.getName()); System.out.println("age:" + userInfo2.getAge());
附註:這裡說明一點,因為protobuf是透過二進位傳輸,所以需要注意下對應的編碼。還有使用protobuf也需要注意一次傳輸的最大位元組長度。
輸出結果:
id:1 name:xuwujing age:18
說明:如果想直接取得工程那麼可以直接跳到底部,透過鏈接下載工程代碼。
環境需求
#JDK::1.8
Netty:: 4.0或以上(不含5)
Protobuf:3.0以上
如果對Netty不熟的話,可以看看我之前寫的一些文章。大神請無視~。 ~
位址:https://blog.csdn.net/column/details/17640.html
首先還是Maven的相關依賴:
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <java.version>1.8</java.version> <netty.version>4.1.22.Final</netty.version> <protobuf.version>3.5.1</protobuf.version> <springboot>1.5.9.RELEASE</springboot> <fastjson>1.2.41</fastjson> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <version>${springboot}</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <version>${springboot}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <version>${springboot}</version> <optional>true</optional> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>${netty.version}</version> </dependency> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>${protobuf.version}</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>${fastjson}</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> </dependencies>
新增了對應的maven依賴之後,設定檔這塊暫時沒有什麼可以加的,因為暫時就一個監聽的埠而已。
程式碼模組主要分為服務端和客戶端。
主要實現的業務邏輯:
服務端啟動成功之後,客戶端也啟動成功,這時服務端會發送一條protobuf格式的訊息給客戶端,然後客戶端給予相應的應答。在客戶端與服務端連線成功之後,客戶端每個一段時間會發送心跳指令給服務端,告訴服務端該客戶端還存過中,如果客戶端沒有在指定的時間發送訊息,服務端會關閉與該客戶端的連線。當客戶端無法連接到服務端之後,會每隔一段時間去嘗試重連,只到重連成功!
首先是編寫服務端的啟動類,相應的註釋在程式碼中寫得很詳細了,這裡也不再多講了。不過要注意的是,在之前的我寫的Netty文章中,是透過main方法直接啟動服務端,因此是直接new一個物件的。而在和SpringBoot整合之後,我們需要將Netty交給springBoot去管理,所以這裡就用了相應的註解。
程式碼如下:
@Service("nettyServer") public class NettyServer { private static final int port = 9876; // 设置服务端端口 private static EventLoopGroup boss = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接 private static EventLoopGroup work = new NioEventLoopGroup(); // 通过nio方式来接收连接和处理连接 private static ServerBootstrap b = new ServerBootstrap(); @Autowired private NettyServerFilter nettyServerFilter; public void run() { try { b.group(boss, work); b.channel(NioServerSocketChannel.class); b.childHandler(nettyServerFilter); // 设置过滤器 // 服务器绑定端口监听 ChannelFuture f = b.bind(port).sync(); System.out.println("服务端启动成功,端口是:" + port); // 监听服务器关闭监听 f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { // 关闭EventLoopGroup,释放掉所有资源包括创建的线程 work.shutdownGracefully(); boss.shutdownGracefully(); } } }
服務端主類別寫完畢之後,我們再來設定下對應的篩選條件。
這裡需要繼承Netty中ChannelInitializer類,然後重寫initChannel該方法,進行添加相應的設置,如心跳超時設置,傳輸協議設置,以及相應的業務實現類。
程式碼如下:
@Component public class NettyServerFilter extends ChannelInitializer<SocketChannel> { @Autowired private NettyServerHandler nettyServerHandler; @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline ph = ch.pipeline(); //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式 ph.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS)); // 解码和编码,应和客户端一致 //传输的协议 Protobuf ph.addLast(new ProtobufVarint32FrameDecoder()); ph.addLast(new ProtobufDecoder(UserMsg.getDefaultInstance())); ph.addLast(new ProtobufVarint32LengthFieldPrepender()); ph.addLast(new ProtobufEncoder()); //业务逻辑实现类 ph.addLast("nettyServerHandler", nettyServerHandler); } }
服务相关的设置的代码写完之后,我们再来编写主要的业务代码。
使用Netty编写业务层的代码,我们需要继承ChannelInboundHandlerAdapter 或SimpleChannelInboundHandler类,在这里顺便说下它们两的区别吧。
继承SimpleChannelInboundHandler类之后,会在接收到数据后会自动release掉数据占用的Bytebuffer资源。并且继承该类需要指定数据格式。
而继承ChannelInboundHandlerAdapter则不会自动释放,需要手动调用ReferenceCountUtil.release()等方法进行释放。继承该类不需要指定数据格式。
所以在这里,个人推荐服务端继承ChannelInboundHandlerAdapter,手动进行释放,防止数据未处理完就自动释放了。而且服务端可能有多个客户端进行连接,并且每一个客户端请求的数据格式都不一致,这时便可以进行相应的处理。
客户端根据情况可以继承SimpleChannelInboundHandler类。好处是直接指定好传输的数据格式,就不需要再进行格式的转换了。
代码如下:
@Service("nettyServerHandler") public class NettyServerHandler extends ChannelInboundHandlerAdapter { /** 空闲次数 */ private int idle_count = 1; /** 发送次数 */ private int count = 1; /** * 建立连接时,发送一条消息 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress()); UserInfo.UserMsg userMsg = UserInfo.UserMsg.newBuilder().setId(1).setAge(18).setName("xuwujing").setState(0) .build(); ctx.writeAndFlush(userMsg); super.channelActive(ctx); } /** * 超时处理 如果5秒没有接受客户端的心跳,就触发; 如果超过两次,则直接关闭; */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception { if (obj instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) obj; if (IdleState.READER_IDLE.equals(event.state())) { // 如果读通道处于空闲状态,说明没有接收到心跳命令 System.out.println("已经5秒没有接收到客户端的信息了"); if (idle_count > 1) { System.out.println("关闭这个不活跃的channel"); ctx.channel().close(); } idle_count++; } } else { super.userEventTriggered(ctx, obj); } } /** * 业务逻辑处理 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("第" + count + "次" + ",服务端接受的消息:" + msg); try { // 如果是protobuf类型的数据 if (msg instanceof UserMsg) { UserInfo.UserMsg userState = (UserInfo.UserMsg) msg; if (userState.getState() == 1) { System.out.println("客户端业务处理成功!"); } else if(userState.getState() == 2){ System.out.println("接受到客户端发送的心跳!"); }else{ System.out.println("未知命令!"); } } else { System.out.println("未知数据!" + msg); return; } } catch (Exception e) { e.printStackTrace(); } finally { ReferenceCountUtil.release(msg); } count++; } /** * 异常处理 */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
还有个服务端的启动类,之前是通过main方法直接启动, 不过这里改成了通过springBoot进行启动,差别不大。
代码如下:
@SpringBootApplication public class NettyServerApp { public static void main(String[] args) { // 启动嵌入式的 Tomcat 并初始化 Spring 环境及其各 Spring 组件 ApplicationContext context = SpringApplication.run(NettyServerApp.class, args); NettyServer nettyServer = context.getBean(NettyServer.class); nettyServer.run(); } }
到这里服务端相应的代码就编写完毕了。
客户端这边的代码和服务端的很多地方都类似,我就不再过多细说了,主要将一些不同的代码拿出来简单的讲述下。
首先是客户端的主类,基本和服务端的差不多,也就是多了监听的端口和一个监听器(用来监听是否和服务端断开连接,用于重连)。
主要实现的代码逻辑如下:
public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) { ChannelFuture f = null; try { if (bootstrap != null) { bootstrap.group(eventLoopGroup); bootstrap.channel(NioSocketChannel.class); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.handler(nettyClientFilter); bootstrap.remoteAddress(host, port); f = bootstrap.connect().addListener((ChannelFuture futureListener) -> { final EventLoop eventLoop = futureListener.channel().eventLoop(); if (!futureListener.isSuccess()) { System.out.println("与服务端断开连接!在10s之后准备尝试重连!"); eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS); } }); if(initFalg){ System.out.println("Netty客户端启动成功!"); initFalg=false; } // 阻塞 f.channel().closeFuture().sync(); } } catch (Exception e) { System.out.println("客户端连接失败!"+e.getMessage()); } }
注:监听器这块的实现用的是JDK1.8的写法。
客户端过滤其这块基本和服务端一直。不过需要注意的是,传输协议、编码和解码应该一致,还有心跳的读写时间应该小于服务端所设置的时间。
改动的代码如下:
ChannelPipeline ph = ch.pipeline(); /* * 解码和编码,应和服务端一致 * */ //入参说明: 读超时时间、写超时时间、所有类型的超时时间、时间格式 ph.addLast(new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS));
客户端的业务代码逻辑。
主要实现的几点逻辑是心跳按时发送以及解析服务发送的protobuf格式的数据。
这里比服务端多个个注解, 该注解Sharable主要是为了多个handler可以被多个channel安全地共享,也就是保证线程安全。
废话就不多说了,代码如下:
@Service("nettyClientHandler") @ChannelHandler.Sharable public class NettyClientHandler extends ChannelInboundHandlerAdapter { @Autowired private NettyClient nettyClient; /** 循环次数 */ private int fcount = 1; /** * 建立连接时 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("建立连接时:" + new Date()); ctx.fireChannelActive(); } /** * 关闭连接时 */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("关闭连接时:" + new Date()); final EventLoop eventLoop = ctx.channel().eventLoop(); nettyClient.doConnect(new Bootstrap(), eventLoop); super.channelInactive(ctx); } /** * 心跳请求处理 每4秒发送一次心跳请求; * */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception { System.out.println("循环请求的时间:" + new Date() + ",次数" + fcount); if (obj instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) obj; if (IdleState.WRITER_IDLE.equals(event.state())) { // 如果写通道处于空闲状态,就发送心跳命令 UserMsg.Builder userState = UserMsg.newBuilder().setState(2); ctx.channel().writeAndFlush(userState); fcount++; } } } /** * 业务逻辑处理 */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // 如果不是protobuf类型的数据 if (!(msg instanceof UserMsg)) { System.out.println("未知数据!" + msg); return; } try { // 得到protobuf的数据 UserInfo.UserMsg userMsg = (UserInfo.UserMsg) msg; // 进行相应的业务处理。。。 // 这里就从简了,只是打印而已 System.out.println( "客户端接受到的用户信息。编号:" + userMsg.getId() + ",姓名:" + userMsg.getName() + ",年龄:" + userMsg.getAge()); // 这里返回一个已经接受到数据的状态 UserMsg.Builder userState = UserMsg.newBuilder().setState(1); ctx.writeAndFlush(userState); System.out.println("成功发送给服务端!"); } catch (Exception e) { e.printStackTrace(); } finally { ReferenceCountUtil.release(msg); } } }
那么到这里客户端的代码也编写完毕了。
首先启动服务端,然后再启动客户端。
我们来看看结果是否如上述所说。
服务端输出结果:
服务端启动成功,端口是:9876 连接的客户端地址:/127.0.0.1:53319 第1次,服务端接受的消息:state: 1 客户端业务处理成功! 第2次,服务端接受的消息:state: 2 接受到客户端发送的心跳! 第3次,服务端接受的消息:state: 2 接受到客户端发送的心跳! 第4次,服务端接受的消息:state: 2 接受到客户端发送的心跳!
客户端输入结果:
Netty客户端启动成功! 建立连接时:Mon Jul 16 23:31:58 CST 2018 客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18 成功发送给服务端! 循环请求的时间:Mon Jul 16 23:32:02 CST 2018,次数1 循环请求的时间:Mon Jul 16 23:32:06 CST 2018,次数2 循环请求的时间:Mon Jul 16 23:32:10 CST 2018,次数3 循环请求的时间:Mon Jul 16 23:32:14 CST 2018,次数4
通过打印信息可以看出如上述所说。
接下来我们再来看看客户端是否能够实现重连。
先启动客户端,再启动服务端。
客户端输入结果:
Netty客户端启动成功! 与服务端断开连接!在10s之后准备尝试重连! 客户端连接失败!AbstractChannel$CloseFuture@1fbaa3ac(incomplete) 建立连接时:Mon Jul 16 23:41:33 CST 2018 客户端接受到的用户信息。编号:1,姓名:xuwujing,年龄:18 成功发送给服务端! 循环请求的时间:Mon Jul 16 23:41:38 CST 2018,次数1 循环请求的时间:Mon Jul 16 23:41:42 CST 2018,次数2 循环请求的时间:Mon Jul 16 23:41:46 CST 2018,次数3
服务端输出结果:
服务端启动成功,端口是:9876 连接的客户端地址:/127.0.0.1:53492 第1次,服务端接受的消息:state: 1 客户端业务处理成功! 第2次,服务端接受的消息:state: 2 接受到客户端发送的心跳! 第3次,服务端接受的消息:state: 2 接受到客户端发送的心跳! 第4次,服务端接受的消息:state: 2
结果也如上述所说!
关于SpringBoot整合Netty使用Protobuf进行数据传输到这里就结束了。
SpringBoot整合Netty使用Protobuf进行数据传输的项目工程地址:
https://github.com/xuwujing/springBoot-study/tree/master/springboot-netty-protobuf
对了,也有不使用springBoot整合的Netty项目工程地址:
https://github.com/xuwujing/Netty-study/tree/master/Netty-protobuf
相关推荐:
使用Spring Cloud Netflix Zuul代理网关访问后台REST服务的实现(代码)
以上是SpringBoot整合Netty並使用Protobuf進行資料傳輸的實作過程的詳細內容。更多資訊請關注PHP中文網其他相關文章!