Rumah > Java > JavaSoalan temu bual > Penemuduga Alibaba: Sila tulis tangan rangka kerja RPC untuk saya.

Penemuduga Alibaba: Sila tulis tangan rangka kerja RPC untuk saya.

Lepaskan: 2023-08-17 16:24:22
ke hadapan
1126 orang telah melayarinya

Penemuduga suka bertanya:

  • Bagaimana untuk mereka bentuk pusat pendaftaran?
  • Bagaimana untuk mereka bentuk baris gilir mesej?
  • Bagaimana untuk mereka bentuk rangka kerja kegigihan?
  • Bagaimana untuk mereka bentuk rangka kerja RPC?
  • ......

Hari ini, mari kita bercakap tentang "Prinsip Pelaksanaan RPC" (topik lain yang berkaitan tentang cara mereka bentuk siri XX telah diterbitkan di Planet Pengetahuan)Jadi pertama sekali, mari kita jelas soalan: apakah itu Bagaimana pula dengan RPC?

RPC ialah singkatan dari Remote Procedure Call iaitu remote procedure call.

RPC ialah protokol komunikasi komputer. Protokol ini membenarkan program yang dijalankan pada satu komputer untuk memanggil subrutin pada komputer lain tanpa pembangun perlu memprogramkan interaksi ini secara tambahan.

Perlu diingat bahawa apabila dua atau lebih aplikasi diedarkan pada pelayan yang berbeza, panggilan antara mereka adalah seperti panggilan kaedah tempatan. Seterusnya, mari analisa apa yang berlaku dalam panggilan RPC?

Proses asas panggilan RPC

Beberapa rangka kerja RPC yang lebih popular dalam industri sekarang, seperti Dubbo, menyediakan invokasi kaedah jauh berasaskan antara muka, iaitu pelanggan hanya perlu mengetahui definisi antara muka untuk memanggil kaedah jauh Serve. Dalam Java, antara muka tidak boleh memanggil kaedah contoh secara langsung Operasi ini mesti diselesaikan melalui objek kelas pelaksanaannya, yang bermaksud bahawa klien mesti menjana objek proksi untuk antara muka ini, Java menyediakan sokongan untuk Proxy dan InvocationHandler untuk menjana proksi dinamik Jika anda mempunyai objek proksi, bagaimanakah setiap kaedah tertentu dipanggil? Apabila objek proksi yang dijana oleh proksi dinamik JDK memanggil kaedah yang ditentukan, ia sebenarnya akan melaksanakan kaedah #invoke yang ditakrifkan dalam InvocationHandler, di mana panggilan kaedah jauh selesai dan hasilnya diperoleh.

Mengetepikan pelanggan, melihat ke belakang, RPC ialah panggilan antara dua komputer Ia pada asasnya adalah komunikasi rangkaian antara dua hos Apabila ia berkaitan dengan komunikasi rangkaian, mesti ada pensirilan, pengekodan dan penyahkodan, dsb. Isu yang perlu dipertimbangkan. ; pada masa yang sama, sebenarnya, kebanyakan sistem digunakan dalam kluster Berbilang hos/bekas menyediakan perkhidmatan yang sama kepada dunia luar Jika bilangan nod dalam kluster adalah besar, mengurus alamat perkhidmatan juga akan menjadi perkara yang sangat rumit . , pendekatan biasa ialah setiap nod perkhidmatan mendaftarkan alamatnya dan menyediakan senarai perkhidmatan kepada pusat pendaftaran, dan pusat pendaftaran menguruskan senarai perkhidmatan dengan cara yang bersatu ini menyelesaikan beberapa masalah dan menambah tugas baharu kepada pelanggan. —Itu adalah penemuan perkhidmatan Dalam istilah orang awam, ini bermakna mencari senarai perkhidmatan yang sepadan dengan kaedah jauh dari pusat pendaftaran dan memilih alamat perkhidmatan daripadanya melalui strategi tertentu untuk melengkapkan komunikasi rangkaian.

Selepas berbual tentang pelanggan dan pusat pendaftaran, satu lagi peranan penting secara semulajadi adalah tugas pelayan adalah untuk menyediakan pelaksanaan sebenar antara muka perkhidmatan dan memantau permintaan rangkaian pada port tertentu permintaan, ia akan dihantar dari rangkaian Parameter yang sepadan (seperti antara muka perkhidmatan, kaedah, parameter permintaan, dll.) diperoleh daripada permintaan, dan kemudian berdasarkan parameter ini, pelaksanaan sebenar antara muka dipanggil melalui refleksi untuk mendapatkan keputusan dan ditulis ke dalam aliran respons yang sepadan.

Ringkasnya, proses panggilan RPC asas adalah kira-kira seperti berikut:

Penemuduga Alibaba: Sila tulis tangan rangka kerja RPC untuk saya.
Gambar

Pelaksanaan asas

Antara muka perkhidmatan:

Dalam RPC, pengeluar dan pengguna mempunyai API antara muka perkhidmatan yang sama. Seperti berikut, tentukan antara muka HelloService.

/**
 * @Descrption  服务接口
 ***/
public interface HelloService {
    String sayHello(String somebody);
}
Salin selepas log masuk

Pelaksanaan perkhidmatan:

Pengeluar perlu menyediakan pelaksanaan antara muka perkhidmatan dan mencipta kelas pelaksanaan HelloServiceImpl.

/**
 * @Descrption 服务实现
 ***/
public class HelloServiceImpl implements HelloService {
    @Override
    public String sayHello(String somebody) {
        return "hello " + somebody + "!";
    }
}
Salin selepas log masuk

Pendaftaran perkhidmatan:

Contoh ini menggunakan Spring untuk mengurus kacang, dan menggunakan XML tersuai dan penghurai untuk memuatkan kelas pelaksanaan perkhidmatan ke dalam bekas (sudah tentu, anotasi tersuai juga boleh digunakan, tetapi saya tidak akan pergi ke dalamnya juga banyak perincian di sini) Dan daftarkan maklumat antara muka perkhidmatan ke pusat pendaftaran.

Suaikan XSD dahulu:

<xsd:element name="service">
    <xsd:complexType>
        <xsd:complexContent>
            <xsd:extension base="beans:identifiedType">
                <xsd:attribute name="interface" type="xsd:string" use="required"/>
                <xsd:attribute name="timeout" type="xsd:int" use="required"/>
                <xsd:attribute name="serverPort" type="xsd:int" use="required"/>
                <xsd:attribute name="ref" type="xsd:string" use="required"/>
                <xsd:attribute name="weight" type="xsd:int" use="optional"/>
                <xsd:attribute name="workerThreads" type="xsd:int" use="optional"/>
                <xsd:attribute name="appKey" type="xsd:string" use="required"/>
                <xsd:attribute name="groupName" type="xsd:string" use="optional"/>
            </xsd:extension>
        </xsd:complexContent>
    </xsd:complexType>
</xsd:element>
Salin selepas log masuk

Nyatakan Skema dan XSD masing-masing, Skema dan pemetaan Pengendali yang sepadan.

Skema:

http\://www.storm.com/schema/storm-service.xsd=META-INF/storm-service.xsd
http\://www.storm.com/schema/storm-reference.xsd=META-INF/storm-reference.xsd
Salin selepas log masuk

Pengendali:

http\://www.storm.com/schema/storm-service=com.hsunfkqm.storm.framework.spring.StormServiceNamespaceHandler
http\://www.storm.com/schema/storm-reference=com.hsunfkqm.storm.framework.spring.StormRemoteReferenceNamespaceHandler
Salin selepas log masuk

Letakkan fail bertulis ke dalam direktori META-INF di bawah Classpath: 🎜
Penemuduga Alibaba: Sila tulis tangan rangka kerja RPC untuk saya.
图片

在 Spring 配置文件中配置服务类:

<!-- 发布远程服务 -->
 <bean id="helloService" class="com.hsunfkqm.storm.framework.test.HelloServiceImpl"/>
 <storm:service id="helloServiceRegister"
                     interface="com.hsunfkqm.storm.framework.test.HelloService"
                     ref="helloService"
                     groupName="default"
                     weight="2"
                     appKey="ares"
                     workerThreads="100"
                     serverPort="8081"
                     timeout="600"/>
Salin selepas log masuk

编写对应的 Handler 和 Parser:

StormServiceNamespaceHandler:

import org.springframework.beans.factory.xml.NamespaceHandlerSupport;

/**
 * @author 孙浩
 * @Descrption 服务发布自定义标签
 ***/
public class StormServiceNamespaceHandler extends NamespaceHandlerSupport {
    @Override
    public void init() {
        registerBeanDefinitionParser("service", new ProviderFactoryBeanDefinitionParser());
    }
}
Salin selepas log masuk

ProviderFactoryBeanDefinitionParser:

protected Class getBeanClass(Element element) {
        return ProviderFactoryBean.class;
    }

    protected void doParse(Element element, BeanDefinitionBuilder bean) {

        try {
            String serviceItf = element.getAttribute("interface");
            String serverPort = element.getAttribute("serverPort");
            String ref = element.getAttribute("ref");
            // ....
            bean.addPropertyValue("serverPort", Integer.parseInt(serverPort));
            bean.addPropertyValue("serviceItf", Class.forName(serviceItf));
            bean.addPropertyReference("serviceObject", ref);
            //...
            if (NumberUtils.isNumber(weight)) {
                bean.addPropertyValue("weight", Integer.parseInt(weight));
            }
            //...
       } catch (Exception e) {
            // ...        
      }
    }
Salin selepas log masuk

ProviderFactoryBean:

/**
 * @Descrption 服务发布
 ***/
public class ProviderFactoryBean implements FactoryBean, InitializingBean {

    //服务接口
    private Class<?> serviceItf;
    //服务实现
    private Object serviceObject;
    //服务端口
    private String serverPort;
    //服务超时时间
    private long timeout;
    //服务代理对象,暂时没有用到
    private Object serviceProxyObject;
    //服务提供者唯一标识
    private String appKey;
    //服务分组组名
    private String groupName = "default";
    //服务提供者权重,默认为 1 , 范围为 [1-100]
    private int weight = 1;
    //服务端线程数,默认 10 个线程
    private int workerThreads = 10;

    @Override
    public Object getObject() throws Exception {
        return serviceProxyObject;
    }

    @Override
    public Class<?> getObjectType() {
        return serviceItf;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        //启动 Netty 服务端
        NettyServer.singleton().start(Integer.parseInt(serverPort));
        //注册到 zk, 元数据注册中心
        List<ProviderService> providerServiceList = buildProviderServiceInfos();
        IRegisterCenter4Provider registerCenter4Provider = RegisterCenter.singleton();
        registerCenter4Provider.registerProvider(providerServiceList);
    }
}

//================RegisterCenter#registerProvider======================
@Override
public void registerProvider(final List<ProviderService> serviceMetaData) {
    if (CollectionUtils.isEmpty(serviceMetaData)) {
        return;
    }

    //连接 zk, 注册服务
    synchronized (RegisterCenter.class) {
        for (ProviderService provider : serviceMetaData) {
            String serviceItfKey = provider.getServiceItf().getName();

            List<ProviderService> providers = providerServiceMap.get(serviceItfKey);
            if (providers == null) {
                providers = Lists.newArrayList();
            }
            providers.add(provider);
            providerServiceMap.put(serviceItfKey, providers);
        }

        if (zkClient == null) {
            zkClient = new ZkClient(ZK_SERVICE, ZK_SESSION_TIME_OUT, ZK_CONNECTION_TIME_OUT, new SerializableSerializer());
        }

        //创建 ZK 命名空间/当前部署应用 APP 命名空间/
        String APP_KEY = serviceMetaData.get(0).getAppKey();
        String ZK_PATH = ROOT_PATH + "/" + APP_KEY;
        boolean exist = zkClient.exists(ZK_PATH);
        if (!exist) {
            zkClient.createPersistent(ZK_PATH, true);
        }

        for (Map.Entry<String, List<ProviderService>> entry : providerServiceMap.entrySet()) {
            //服务分组
            String groupName = entry.getValue().get(0).getGroupName();
            //创建服务提供者
            String serviceNode = entry.getKey();
            String servicePath = ZK_PATH + "/" + groupName + "/" + serviceNode + "/" + PROVIDER_TYPE;
            exist = zkClient.exists(servicePath);
            if (!exist) {
                zkClient.createPersistent(servicePath, true);
            }

            //创建当前服务器节点
            int serverPort = entry.getValue().get(0).getServerPort();//服务端口
            int weight = entry.getValue().get(0).getWeight();//服务权重
            int workerThreads = entry.getValue().get(0).getWorkerThreads();//服务工作线程
            String localIp = IPHelper.localIp();
            String currentServiceIpNode = servicePath + "/" + localIp + "|" + serverPort + "|" + weight + "|" + workerThreads + "|" + groupName;
            exist = zkClient.exists(currentServiceIpNode);
            if (!exist) {
                //注意,这里创建的是临时节点
                zkClient.createEphemeral(currentServiceIpNode);
            }
            //监听注册服务的变化,同时更新数据到本地缓存
            zkClient.subscribeChildChanges(servicePath, new IZkChildListener() {
                @Override
                public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
                    if (currentChilds == null) {
                        currentChilds = Lists.newArrayList();
                    }
                    //存活的服务 IP 列表
                    List<String> activityServiceIpList = Lists.newArrayList(Lists.transform(currentChilds, new Function<String, String>() {
                        @Override
                        public String apply(String input) {
                            return StringUtils.split(input, "|")[0];
                        }
                    }));
                    refreshActivityService(activityServiceIpList);
                }
            });

        }
    }
}
Salin selepas log masuk

至此服务实现类已被载入 Spring 容器中,且服务接口信息也注册到了注册中心。

网络通信:

作为生产者对外提供 RPC 服务,必须有一个网络程序来来监听请求和做出响应。在 Java 领域 Netty 是一款高性能的 NIO 通信框架,很多的框架的通信都是采用 Netty 来实现的,本例中也采用它当做通信服务器。

构建并启动 Netty 服务监听指定端口:

public void start(final int port) {
        synchronized (NettyServer.class) {
            if (bossGroup != null || workerGroup != null) {
                return;
            }

            bossGroup = new NioEventLoopGroup();
            workerGroup = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap
                    .group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            //注册解码器 NettyDecoderHandler
                            ch.pipeline().addLast(new NettyDecoderHandler(StormRequest.class, serializeType));
                            //注册编码器 NettyEncoderHandler
                            ch.pipeline().addLast(new NettyEncoderHandler(serializeType));
                            //注册服务端业务逻辑处理器 NettyServerInvokeHandler
                            ch.pipeline().addLast(new NettyServerInvokeHandler());
                        }
                    });
            try {
                channel = serverBootstrap.bind(port).sync().channel();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }
Salin selepas log masuk

上面的代码中向 Netty 服务的 Pipeline 中添加了编解码和业务处理器,当接收到请求时,经过编解码后,真正处理业务的是业务处理器,即 NettyServerInvokeHandler,该处理器继承自 SimpleChannelInboundHandler,当数据读取完成将触发一个事件,并调用 NettyServerInvokeHandler#channelRead0 方法来处理请求。

@Override
protected void channelRead0(ChannelHandlerContext ctx, StormRequest request) throws Exception {
    if (ctx.channel().isWritable()) {
        //从服务调用对象里获取服务提供者信息
        ProviderService metaDataModel = request.getProviderService();
        long consumeTimeOut = request.getInvokeTimeout();
        final String methodName = request.getInvokedMethodName();

        //根据方法名称定位到具体某一个服务提供者
        String serviceKey = metaDataModel.getServiceItf().getName();
        //获取限流工具类
        int workerThread = metaDataModel.getWorkerThreads();
        Semaphore semaphore = serviceKeySemaphoreMap.get(serviceKey);
        if (semaphore == null) {
            synchronized (serviceKeySemaphoreMap) {
                semaphore = serviceKeySemaphoreMap.get(serviceKey);
                if (semaphore == null) {
                    semaphore = new Semaphore(workerThread);
                    serviceKeySemaphoreMap.put(serviceKey, semaphore);
                }
            }
        }

        //获取注册中心服务
        IRegisterCenter4Provider registerCenter4Provider = RegisterCenter.singleton();
        List<ProviderService> localProviderCaches = registerCenter4Provider.getProviderServiceMap().get(serviceKey);

        Object result = null;
        boolean acquire = false;

        try {
            ProviderService localProviderCache = Collections2.filter(localProviderCaches, new Predicate<ProviderService>() {
                @Override
                public boolean apply(ProviderService input) {
                    return StringUtils.equals(input.getServiceMethod().getName(), methodName);
                }
            }).iterator().next();
            Object serviceObject = localProviderCache.getServiceObject();

            //利用反射发起服务调用
            Method method = localProviderCache.getServiceMethod();
            //利用 semaphore 实现限流
            acquire = semaphore.tryAcquire(consumeTimeOut, TimeUnit.MILLISECONDS);
            if (acquire) {
                result = method.invoke(serviceObject, request.getArgs());
                //System.out.println("---------------"+result);
            }
        } catch (Exception e) {
            System.out.println(JSON.toJSONString(localProviderCaches) + "  " + methodName+" "+e.getMessage());
            result = e;
        } finally {
            if (acquire) {
                semaphore.release();
            }
        }
        //根据服务调用结果组装调用返回对象
        StormResponse response = new StormResponse();
        response.setInvokeTimeout(consumeTimeOut);
        response.setUniqueKey(request.getUniqueKey());
        response.setResult(result);
        //将服务调用返回对象回写到消费端
        ctx.writeAndFlush(response);
    } else {
        logger.error("------------channel closed!---------------");
    }
}
Salin selepas log masuk

此处还有部分细节如自定义的编解码器等,篇幅所限不在此详述,继承 MessageToByteEncoder 和 ByteToMessageDecoder 覆写对应的 encode 和 decode 方法即可自定义编解码器,使用到的序列化工具如 Hessian/Proto 等可参考对应的官方文档。

请求和响应包装:

为便于封装请求和响应,定义两个 bean 来表示请求和响应。

请求:

/**
 * @author 孙浩
 * @Descrption
 ***/
public class StormRequest implements Serializable {

    private static final long serialVersionUID = -5196465012408804755L;
    //UUID,唯一标识一次返回值
    private String uniqueKey;
    //服务提供者信息
    private ProviderService providerService;
    //调用的方法名称
    private String invokedMethodName;
    //传递参数
    private Object[] args;
    //消费端应用名
    private String appName;
    //消费请求超时时长
    private long invokeTimeout;
    // getter/setter
}
Salin selepas log masuk

响应:

/**
 * @Descrption
 ***/
public class StormResponse implements Serializable {
    private static final long serialVersionUID = 5785265307118147202L;
    //UUID, 唯一标识一次返回值
    private String uniqueKey;
    //客户端指定的服务超时时间
    private long invokeTimeout;
    //接口调用返回的结果对象
    private Object result;
    //getter/setter
}
Salin selepas log masuk

客户端(消费者)

客户端(消费者)在 RPC 调用中主要是生成服务接口的代理对象,并从注册中心获取对应的服务列表发起网络请求。

客户端和服务端一样采用 Spring 来管理 bean 解析 XML 配置等不再赘述,重点看下以下几点:

1、通过 JDK 动态代理来生成引入服务接口的代理对象

public Object getProxy() {
    return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[]{targetInterface}, this);
}
Salin selepas log masuk

2、从注册中心获取服务列表并依据某种策略选取其中一个服务节点

//服务接口名称
String serviceKey = targetInterface.getName();
//获取某个接口的服务提供者列表
IRegisterCenter4Invoker registerCenter4Consumer = RegisterCenter.singleton();
List<ProviderService> providerServices = registerCenter4Consumer.getServiceMetaDataMap4Consume().get(serviceKey);
//根据软负载策略,从服务提供者列表选取本次调用的服务提供者
ClusterStrategy clusterStrategyService = ClusterEngine.queryClusterStrategy(clusterStrategy);
ProviderService providerService = clusterStrategyService.select(providerServices);
Salin selepas log masuk

3、通过 Netty 建立连接,发起网络请求

/**
 * @author 孙浩
 * @Descrption Netty 消费端 bean 代理工厂
 ***/
public class RevokerProxyBeanFactory implements InvocationHandler {
    private ExecutorService fixedThreadPool = null;
    //服务接口
    private Class<?> targetInterface;
    //超时时间
    private int consumeTimeout;
    //调用者线程数
    private static int threadWorkerNumber = 10;
    //负载均衡策略
    private String clusterStrategy;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        ...

        //复制一份服务提供者信息
        ProviderService newProvider = providerService.copy();
        //设置本次调用服务的方法以及接口
        newProvider.setServiceMethod(method);
        newProvider.setServiceItf(targetInterface);

        //声明调用 AresRequest 对象,AresRequest 表示发起一次调用所包含的信息
        final StormRequest request = new StormRequest();
        //设置本次调用的唯一标识
        request.setUniqueKey(UUID.randomUUID().toString() + "-" + Thread.currentThread().getId());
        //设置本次调用的服务提供者信息
        request.setProviderService(newProvider);
        //设置本次调用的方法名称
        request.setInvokedMethodName(method.getName());
        //设置本次调用的方法参数信息
        request.setArgs(args);

        try {
            //构建用来发起调用的线程池
            if (fixedThreadPool == null) {
                synchronized (RevokerProxyBeanFactory.class) {
                    if (null == fixedThreadPool) {
                        fixedThreadPool = Executors.newFixedThreadPool(threadWorkerNumber);
                    }
                }
            }
            //根据服务提供者的 ip,port, 构建 InetSocketAddress 对象,标识服务提供者地址
            String serverIp = request.getProviderService().getServerIp();
            int serverPort = request.getProviderService().getServerPort();
            InetSocketAddress inetSocketAddress = new InetSocketAddress(serverIp, serverPort);
            //提交本次调用信息到线程池 fixedThreadPool, 发起调用
            Future<StormResponse> responseFuture = fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress, request));
            //获取调用的返回结果
            StormResponse response = responseFuture.get(request.getInvokeTimeout(), TimeUnit.MILLISECONDS);
            if (response != null) {
                return response.getResult();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return null;
    }
    //  ...
}
Salin selepas log masuk

Netty 的响应是异步的,为了在方法调用返回前获取到响应结果,需要将异步的结果同步化。

4、Netty 异步返回的结果存入阻塞队列

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, StormResponse response) throws Exception {
    //将 Netty 异步返回的结果存入阻塞队列,以便调用端同步获取
    RevokerResponseHolder.putResultValue(response);
}
Salin selepas log masuk

5、请求发出后同步获取结果

//提交本次调用信息到线程池 fixedThreadPool, 发起调用
Future<StormResponse> responseFuture = fixedThreadPool.submit(RevokerServiceCallable.of(inetSocketAddress, request));
//获取调用的返回结果
StormResponse response = responseFuture.get(request.getInvokeTimeout(), TimeUnit.MILLISECONDS);
if (response != null) {
    return response.getResult();
}

//===================================================
//从返回结果容器中获取返回结果,同时设置等待超时时间为 invokeTimeout
long invokeTimeout = request.getInvokeTimeout();
StormResponse response = RevokerResponseHolder.getValue(request.getUniqueKey(), invokeTimeout);
Salin selepas log masuk

测试

Server:

/**
 * @Descrption
 ***/
public class MainServer {
    public static void main(String[] args) throws Exception {
        //发布服务
        final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("storm-server.xml");
        System.out.println(" 服务发布完成");
    }
}
Salin selepas log masuk

Client:

public class Client {

    private static final Logger logger = LoggerFactory.getLogger(Client.class);

    public static void main(String[] args) throws Exception {

        final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("storm-client.xml");
        final HelloService helloService = (HelloService) context.getBean("helloService");
        String result = helloService.sayHello("World");
        System.out.println(result);
        for (;;) {

        }
    }
}
Salin selepas log masuk

结果

生产者:

Penemuduga Alibaba: Sila tulis tangan rangka kerja RPC untuk saya.
图片

消费者:

Penemuduga Alibaba: Sila tulis tangan rangka kerja RPC untuk saya.
图片

注册中心:

Penemuduga Alibaba: Sila tulis tangan rangka kerja RPC untuk saya.
图片

总结

本文简单介绍了 RPC 的整个流程,并实现了一个简单的 RPC 调用。希望阅读完本文之后,能加深你对 RPC 的一些认识。

生产者端流程:

  • Muat antara muka perkhidmatan, cache
  • pendaftaran perkhidmatan, tulis antara muka perkhidmatan dan maklumat hos perkhidmatan ke dalam pusat pendaftaran (contoh ini menggunakan ZooKeeper)
  • Mulakan pelayan rangkaian dan dengar
  • refleksi, Panggilan tempatan
  • Proses pihak pengguna:

    Antara muka perkhidmatan proksi menjana objek proksi
  • Penemuan perkhidmatan (sambung ke ZooKeeper, dapatkan senarai alamat perkhidmatan, dan dapatkan alamat perkhidmatan yang sesuai melalui pelanggan polisi)
  • Panggilan kaedah jauh (dalam contoh ini, hantar mesej melalui Netty dan dapatkan hasil respons)

Atas ialah kandungan terperinci Penemuduga Alibaba: Sila tulis tangan rangka kerja RPC untuk saya.. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Label berkaitan:
sumber:Java后端技术全栈
Kenyataan Laman Web ini
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn
Tutorial Popular
Lagi>
Muat turun terkini
Lagi>
kesan web
Kod sumber laman web
Bahan laman web
Templat hujung hadapan