目錄
protostuff简介
protostuff效率
使用
pom依赖
工具类
基于netty的rpc
参考
首頁 web前端 html教學 使用Protostuff序列化_html/css_WEB-ITnose

使用Protostuff序列化_html/css_WEB-ITnose

Jun 21, 2016 am 08:51 AM

rpc调用,有多种序列化的方式,通用如json,mongodb使用的bson;java方面的,比如Java默认的序列化,比如hessian;还有跨语言的,比如thrift、protocolbuf。thrift和pb的好处是序列化后size比较小,但是缺点是得生成java代码,这个挺鸡肋的,所以不管二者运行时效率有多高,开发效率相对比较低的。像hessian,是有一些在用,但是感觉不如pb那样强大。所以也一直在寻找运行效率与开发效率兼得的序列化方式。偶尔在网上看到protostuff,觉得找到了一直在找的这种序列化方式。

protostuff简介

protobuf的一个缺点是需要数据结构的预编译过程,首先要编写.proto格式的配置文件,再通过protobuf提供的工具生成各种语言响应的代码。由于java具有反射和动态代码生成的能力,这个预编译过程不是必须的,可以在代码执行时来实现。有 protostuff已经实现了这个功能。

protostuff效率

  • Ser Time+Deser Time (ns)

  • Size, Compressed size [light] in bytes

使用

pom依赖

        <dependency>            <groupId>com.dyuproject.protostuff</groupId>            <artifactId>protostuff-core</artifactId>            <version>1.0.8</version>        </dependency>        <dependency>            <groupId>com.dyuproject.protostuff</groupId>            <artifactId>protostuff-runtime</artifactId>            <version>1.0.8</version>        </dependency>
登入後複製

工具类

public class SerializationUtil {    private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();    private static Objenesis objenesis = new ObjenesisStd(true);    private static <T> Schema<T> getSchema(Class<T> clazz) {        @SuppressWarnings("unchecked")        Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);        if (schema == null) {            schema = RuntimeSchema.getSchema(clazz);            if (schema != null) {                cachedSchema.put(clazz, schema);            }        }        return schema;    }    /**     * 序列化     *     * @param obj     * @return     */    public static <T> byte[] serializer(T obj) {        @SuppressWarnings("unchecked")        Class<T> clazz = (Class<T>) obj.getClass();        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);        try {            Schema<T> schema = getSchema(clazz);            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);        } catch (Exception e) {            throw new IllegalStateException(e.getMessage(), e);        } finally {            buffer.clear();        }    }    /**     * 反序列化     *     * @param data     * @param clazz     * @return     */    public static <T> T deserializer(byte[] data, Class<T> clazz) {        try {            T obj = objenesis.newInstance(clazz);            Schema<T> schema = getSchema(clazz);            ProtostuffIOUtil.mergeFrom(data, obj, schema);            return obj;        } catch (Exception e) {            throw new IllegalStateException(e.getMessage(), e);        }    }}
登入後複製

基于netty的rpc

  • NettyServer

public class NettyServer {    private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);    private int ioThreadNum;    //内核为此套接口排队的最大连接个数,对于给定的监听套接口,内核要维护两个队列,未链接队列和已连接队列大小总和最大值    private int backlog;    private int port;    private Channel channel;    private EventLoopGroup bossGroup;    private EventLoopGroup workerGroup;    public NettyServer(int ioThreadNum, int backlog, int port) {        this.ioThreadNum = ioThreadNum;        this.backlog = backlog;        this.port = port;    }    public void start() throws InterruptedException {        bossGroup = new NioEventLoopGroup();        workerGroup = new NioEventLoopGroup(this.ioThreadNum);        final Map<String,Object> demoService = new HashMap<String, Object>();        demoService.put("com.patterncat.service.HelloService", new HelloServiceImpl());        ServerBootstrap serverBootstrap = new ServerBootstrap();        serverBootstrap.group(bossGroup, workerGroup)                .channel(NioServerSocketChannel.class)                .option(ChannelOption.SO_BACKLOG, backlog)                //注意是childOption                .childOption(ChannelOption.SO_KEEPALIVE, true)                .childOption(ChannelOption.TCP_NODELAY, true)                .childHandler(new ChannelInitializer<SocketChannel>() {                    @Override                    protected void initChannel(SocketChannel socketChannel) throws Exception {                        socketChannel.pipeline()                                .addLast(new RpcDecoder(RpcRequest.class))                                .addLast(new RpcEncoder(RpcResponse.class))                                .addLast(new ServerRpcHandler(demoService));                    }                });        channel = serverBootstrap.bind("127.0.0.1",port).sync().channel();        logger.info("NettyRPC server listening on port "+ port + " and ready for connections...");        Runtime.getRuntime().addShutdownHook(new Thread(){            @Override            public void run(){                //do shutdown staff            }        });    }    public void stop() {        if (null == channel) {            throw new ServerStopException();        }        bossGroup.shutdownGracefully();        workerGroup.shutdownGracefully();        channel.closeFuture().syncUninterruptibly();        bossGroup = null;        workerGroup = null;        channel = null;    }}
登入後複製
  • ServerRpcHandler

public class ServerRpcHandler extends SimpleChannelInboundHandler<RpcRequest> {    private static final Logger logger = LoggerFactory.getLogger(ServerRpcHandler.class);    private final Map<String, Object> serviceMapping;    public ServerRpcHandler(Map<String, Object> serviceMapping) {        this.serviceMapping = serviceMapping;    }    @Override    protected void channelRead0(ChannelHandlerContext channelHandlerContext, RpcRequest rpcRequest) throws Exception {        RpcResponse response = new RpcResponse();        response.setTraceId(rpcRequest.getTraceId());        try {            logger.info("server handle request:{}",rpcRequest);            Object result = handle(rpcRequest);            response.setResult(result);        } catch (Throwable t) {            response.setError(t);        }        channelHandlerContext.writeAndFlush(response);    }    private Object handle(RpcRequest request) throws Throwable {        String className = request.getClassName();        Object serviceBean = serviceMapping.get(className);        Class<?> serviceClass = serviceBean.getClass();        String methodName = request.getMethodName();        Class<?>[] parameterTypes = request.getParameterTypes();        Object[] parameters = request.getParameters();        FastClass serviceFastClass = FastClass.create(serviceClass);        FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);        return serviceFastMethod.invoke(serviceBean, parameters);    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {        logger.error(cause.getMessage(), cause);        RpcResponse response = new RpcResponse();        if(cause instanceof ServerException){            response.setTraceId(((ServerException) cause).getTraceId());        }        response.setError(cause);        ctx.writeAndFlush(response);    }}
登入後複製
  • NettyClient

public class NettyClient implements IClient {    private EventLoopGroup workerGroup;    private Channel channel;    private int workerGroupThreads;    private ClientRpcHandler clientRpcHandler;    private final Optional<Pair<Long,TimeUnit>> NO_TIMEOUT = Optional.<Pair<Long,TimeUnit>>absent();    public NettyClient(int workerGroupThreads) {        this.workerGroupThreads = workerGroupThreads;    }    public void connect(InetSocketAddress socketAddress) {        workerGroup = new NioEventLoopGroup(workerGroupThreads);        clientRpcHandler = new ClientRpcHandler();        Bootstrap bootstrap = new Bootstrap();        bootstrap                .group(workerGroup)                .channel(NioSocketChannel.class)                .option(ChannelOption.SO_KEEPALIVE, true)                .option(ChannelOption.TCP_NODELAY, true)                .handler(new ChannelInitializer<SocketChannel>() {                    @Override                    protected void initChannel(SocketChannel ch) throws Exception {                        ch.pipeline()                                .addLast(new RpcDecoder(RpcResponse.class))                                .addLast(new RpcEncoder(RpcRequest.class))                                .addLast(clientRpcHandler);                    }                });        channel = bootstrap.connect(socketAddress.getAddress().getHostAddress(), socketAddress.getPort())                .syncUninterruptibly()                .channel();    }    public RpcResponse syncSend(RpcRequest request) throws InterruptedException {        System.out.println("send request:"+request);        channel.writeAndFlush(request).sync();        return clientRpcHandler.send(request,NO_TIMEOUT);    }    public RpcResponse asyncSend(RpcRequest request,TimeUnit timeUnit,long timeout) throws InterruptedException {        channel.writeAndFlush(request);        return clientRpcHandler.send(request, Optional.of(Pair.of(timeout,timeUnit)));    }    public InetSocketAddress getRemoteAddress() {        SocketAddress remoteAddress = channel.remoteAddress();        if (!(remoteAddress instanceof InetSocketAddress)) {            throw new RuntimeException("Get remote address error, should be InetSocketAddress");        }        return (InetSocketAddress) remoteAddress;    }    public void close() {        if (null == channel) {            throw new ClientCloseException();        }        workerGroup.shutdownGracefully();        channel.closeFuture().syncUninterruptibly();        workerGroup = null;        channel = null;    }}
登入後複製
  • ClientRpcHandler

@ChannelHandler.Sharablepublic class ClientRpcHandler extends SimpleChannelInboundHandler<RpcResponse> {    //用blocking queue主要是用阻塞的功能,省的自己加锁    private final ConcurrentHashMap<String, BlockingQueue<RpcResponse>> responseMap = new ConcurrentHashMap<String, BlockingQueue<RpcResponse>>();    //messageReceived    @Override    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse rpcResponse) throws Exception {        System.out.println("receive response:"+rpcResponse);        BlockingQueue<RpcResponse> queue = responseMap.get(rpcResponse.getTraceId());        queue.add(rpcResponse);    }    @Override    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {        super.exceptionCaught(ctx, cause);        cause.printStackTrace();    }    public RpcResponse send(RpcRequest request,Optional<Pair<Long,TimeUnit>> timeout) throws InterruptedException {        responseMap.putIfAbsent(request.getTraceId(), new LinkedBlockingQueue<RpcResponse>(1));        RpcResponse response = null;        try {            BlockingQueue<RpcResponse> queue = responseMap.get(request.getTraceId());            if(timeout == null || !timeout.isPresent()){                response = queue.take();            }else{                response = queue.poll(timeout.get().getKey(),timeout.get().getValue());            }        } finally {            responseMap.remove(request.getTraceId());        }        return response;    }}
登入後複製
  • decoder

public class RpcDecoder extends ByteToMessageDecoder {    private Class<?> genericClass;    public RpcDecoder(Class<?> genericClass) {        this.genericClass = genericClass;    }    @Override    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {        if (byteBuf.readableBytes() < 4) {            return;        }        byteBuf.markReaderIndex();        int dataLength = byteBuf.readInt();        if (dataLength < 0) {            channelHandlerContext.close();        }        if (byteBuf.readableBytes() < dataLength) {            byteBuf.resetReaderIndex();        }        byte[] data = new byte[dataLength];        byteBuf.readBytes(data);        Object obj = SerializationUtil.deserializer(data, genericClass);        list.add(obj);    }}
登入後複製
  • encoder

public class RpcEncoder extends MessageToByteEncoder {    private Class<?> genericClass;    public RpcEncoder(Class<?> genericClass) {        this.genericClass = genericClass;    }    @Override    protected void encode(ChannelHandlerContext channelHandlerContext, Object obj, ByteBuf byteBuf) throws Exception {        if (genericClass.isInstance(obj)) {            byte[] data = SerializationUtil.serializer(obj);            byteBuf.writeInt(data.length);            byteBuf.writeBytes(data);        }    }}
登入後複製

工程源码 protocol-demo

参考

  • jvm-serializers

  • protostuff

  • java序列化/反序列化之xstream、protobuf、protostuff 的比较与使用例子

  • Protostuff序列化

  • protostuff介绍

  • Protostuff详解

  • 序列化框架 kryo VS hessian VS Protostuff VS java

  • Protostuff序列化和反序列化

  • eishay/jvm-serializers

  • Protostuff 序列化

  • 使用Netty实现多路复用的client

本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn

熱AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智慧驅動的應用程序,用於創建逼真的裸體照片

AI Clothes Remover

AI Clothes Remover

用於從照片中去除衣服的線上人工智慧工具。

Undress AI Tool

Undress AI Tool

免費脫衣圖片

Clothoff.io

Clothoff.io

AI脫衣器

AI Hentai Generator

AI Hentai Generator

免費產生 AI 無盡。

熱門文章

R.E.P.O.能量晶體解釋及其做什麼(黃色晶體)
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳圖形設置
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌
威爾R.E.P.O.有交叉遊戲嗎?
1 個月前 By 尊渡假赌尊渡假赌尊渡假赌

熱工具

記事本++7.3.1

記事本++7.3.1

好用且免費的程式碼編輯器

SublimeText3漢化版

SublimeText3漢化版

中文版,非常好用

禪工作室 13.0.1

禪工作室 13.0.1

強大的PHP整合開發環境

Dreamweaver CS6

Dreamweaver CS6

視覺化網頁開發工具

SublimeText3 Mac版

SublimeText3 Mac版

神級程式碼編輯軟體(SublimeText3)

&gt; gt;的目的是什麼 元素? &gt; gt;的目的是什麼 元素? Mar 21, 2025 pm 12:34 PM

本文討論了HTML&lt; Progress&gt;元素,其目的,樣式和與&lt; meter&gt;元素。主要重點是使用&lt; progress&gt;為了完成任務和LT;儀表&gt;對於stati

&lt; datalist&gt;的目的是什麼。 元素? &lt; datalist&gt;的目的是什麼。 元素? Mar 21, 2025 pm 12:33 PM

本文討論了html&lt; datalist&gt;元素,通過提供自動完整建議,改善用戶體驗並減少錯誤來增強表格。Character計數:159

&lt; meter&gt;的目的是什麼。 元素? &lt; meter&gt;的目的是什麼。 元素? Mar 21, 2025 pm 12:35 PM

本文討論了HTML&lt; meter&gt;元素,用於在一個範圍內顯示標量或分數值及其在Web開發中的常見應用。它區分了&lt; meter&gt;從&lt; progress&gt;和前

視口元標籤是什麼?為什麼對響應式設計很重要? 視口元標籤是什麼?為什麼對響應式設計很重要? Mar 20, 2025 pm 05:56 PM

本文討論了視口元標籤,這對於移動設備上的響應式Web設計至關重要。它解釋瞭如何正確使用確保最佳的內容縮放和用戶交互,而濫用可能會導致設計和可訪問性問題。

HTML容易為初學者學習嗎? HTML容易為初學者學習嗎? Apr 07, 2025 am 12:11 AM

HTML適合初學者學習,因為它簡單易學且能快速看到成果。 1)HTML的學習曲線平緩,易於上手。 2)只需掌握基本標籤即可開始創建網頁。 3)靈活性高,可與CSS和JavaScript結合使用。 4)豐富的學習資源和現代工具支持學習過程。

&lt; iframe&gt;的目的是什麼。 標籤?使用時的安全考慮是什麼? &lt; iframe&gt;的目的是什麼。 標籤?使用時的安全考慮是什麼? Mar 20, 2025 pm 06:05 PM

本文討論了&lt; iframe&gt;將外部內容嵌入網頁,其常見用途,安全風險以及諸如對象標籤和API等替代方案的目的。

HTML,CSS和JavaScript的角色:核心職責 HTML,CSS和JavaScript的角色:核心職責 Apr 08, 2025 pm 07:05 PM

HTML定義網頁結構,CSS負責樣式和佈局,JavaScript賦予動態交互。三者在網頁開發中各司其職,共同構建豐富多彩的網站。

HTML中起始標籤的示例是什麼? HTML中起始標籤的示例是什麼? Apr 06, 2025 am 12:04 AM

AnexampleOfAstartingTaginHtmlis,beginSaparagraph.startingTagSareEssentialInhtmlastheyInitiateEllements,defiteTheeTheErtypes,andarecrucialforsstructuringwebpages wepages webpages andConstructingthedom。

See all articles