首頁 > web前端 > js教程 > 主體

什麼是RPC?聊聊node中怎麼實現 RPC 通信

青灯夜游
發布: 2022-11-03 20:39:20
轉載
1942 人瀏覽過

什麼是RPC?聊聊node中怎麼實現 RPC 通信

【相關教學推薦:nodejs影片教學

什麼是RPC?

RPC:Remote Procedure Call(遠端過程調用)是指遠端過程調用,也就是說兩台伺服器A,B,一個應用程式部署在A伺服器上,想要調用B伺服器上應用提供的函數/方法,由於不在一個記憶體空間,不能直接調用,需要透過網路來表達調用的語義和傳達調用的資料。

伺服器與伺服器之間的通訊

RPC vs HTTP

相同點

  • #都是兩台電腦之間的網路通訊。 ajax是瀏覽器與伺服器之間的通行,RPC是伺服器與伺服器之間的通行
  • 需要雙方約定一個資料格式

不同點

  • 尋址伺服器不同

ajax 是使用DNS作為定址服務取得網域所對應的ip位址,瀏覽器拿到ip位址之後發送請求取得資料。

RPC一般是在內網路裡面互相請求,所以它一般不用DNS做尋址服務。因為在內網,所以可以使用規定的id或一個虛擬vip,例如v5:8001,然後到尋址伺服器取得v5所對應的ip位址。

  • 應用程式層協定不同

ajax使用http協議,它是一個文字協議,我們互動資料的時候檔案格式要不是html,要嘛是json對象,使用json的時候就是key-value的形式。

RPC採用二進位協定。採用二進位傳輸,它傳輸的包是這樣子的[0001 0001 0111 0110 0010],裡面都是二進制,一般採用那幾位表示一個字段,比如前6位是一個字段,依次類推。

這樣就不需要http傳送json物件裡面的key,所以有更小的資料體積。

因為傳輸的是二進制,更適合於電腦來理解,文字協定更適合人類理解,所以電腦去解讀各個欄位的耗時是比文字協定少很多的。

RPC採用二進位有更小的資料體積,及更快的解讀速度。

  • TCP通訊方式
  • 單工通訊:只能客戶端向服務端發送訊息,或只能服務端給客戶端發送訊息

  • 半雙工通訊:在某個時間段內只能客戶端給服務端發送訊息,過了這個時間段服務端可以傳送訊息給客戶端。如果把時間分成很多時間片,在一個時間片內就屬於單工通訊

  • 全雙工通訊:客戶端與服務端能相互通訊

#選擇這三種通訊方式的哪一種主要考慮的因素是:實現難度和成本。全雙工通訊是要比半雙工通訊的成本要高的,在某些場景下還是可以考慮使用半雙工通訊。

ajax是一種半雙工通訊。 http是文字協議,但它底層是tcp協議,http文字在tcp這一層會經歷從二進位資料流到文字的轉換過程。

理解RPC只是在更深入地理解前端技術。

buffer編解碼二進位封包

建立buffer

buffer.from: 從已有的資料建立二進位

const buffer1 = Buffer.from('geekbang')
const buffer2 = Buffer.from([0, 1, 2, 3, 4])


<Buffer 67 65 65 6b 62 61 6e 67>
<Buffer 00 01 02 03 04>
登入後複製

buffer.alloc: 建立一個空的二進位

const buffer3 = Buffer.alloc(20)

<Buffer 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00 00>
登入後複製

#在buffer裡面寫東西

  • buffer.write(string, offset): 寫入字串
  • #buffer.writeInt8(value, offset): int8表示二進位8位元( 8位元表示一個位元組)所能表示的整數,offset開始寫入之前要跳過的位元組數。
  • buffer.writeInt16BE(value, offset): int16(兩個位元組數),表示16個二進位位元所能表示的整數,即32767。超過這個數程序會報錯。
const buffer = Buffer.from([1, 2, 3, 4]) // <Buffer 01 02 03 04>

// 往第二个字节里面写入12
buffer.writeInt8(12, 1) // <Buffer 01 0c 03 04>
登入後複製

大端BE與小端LE:主要是對於2個以上位元組的資料排列方式不同(writeInt8因為只有一個位元組,所以沒有大端和小端),大端的話就是低位位址放高位,小端就是低位元位址放低位。如下:

const buffer = Buffer.from([1, 2, 3, 4])

buffer.writeInt16BE(512, 2) // <Buffer 01 02 02 00>
buffer.writeInt16LE(512, 2) // <Buffer 01 02 00 02>
登入後複製

RPC傳輸的二進位如何表示傳遞的欄位

PC傳輸的二進位是如何表示欄位的呢?現在有二進位套件[00, 00, 00, 00, 00, 00, 00],我們假定前三個位元組表示一個欄位值,後面兩個表示一個欄位的值,最後兩個也表示一個欄位的值。那寫法如下:

writeInt16BE(value, 0)
writeInt16BE(value, 2)
writeInt16BE(value, 4)
登入後複製

发现像这样写,不仅要知道写入的值,还要知道值的数据类型,这样就很麻烦。不如json格式那么方便。针对这种情况业界也有解决方案。npm有个库protocol-buffers,把我们写的参数转化为buffer

// test.proto 定义的协议文件
message Column {
  required float num  = 1;
  required string payload = 2;
}
// index.js
const fs = require(&#39;fs&#39;)
var protobuf = require(&#39;protocol-buffers&#39;)
var messages = protobuf(fs.readFileSync(&#39;test.proto&#39;))

var buf = messages.Column.encode({
	num: 42,
	payload: &#39;hello world&#39;
})
console.log(buf)
// <Buffer 0d 00 00 28 42 12 0b 68 65 6c 6c 6f 20 77 6f 72 6c 64>

var obj = messages.Column.decode(buf)
console.log(obj)
// { num: 42, payload: &#39;hello world&#39; }
登入後複製

net建立RPC通道

半双工通信

服务端代码:

const net = require(&#39;net&#39;)

const LESSON_DATA = {
  136797: &#39;01 | 课程介绍&#39;,
  136798: &#39;02 | 内容综述&#39;,
  136799: &#39;03 | Node.js是什么?&#39;,
  136800: &#39;04 | Node.js可以用来做什么?&#39;,
  136801: &#39;05 | 课程实战项目介绍&#39;,
  136803: &#39;06 | 什么是技术预研?&#39;,
  136804: &#39;07 | Node.js开发环境安装&#39;,
  136806: &#39;08 | 第一个Node.js程序:石头剪刀布游戏&#39;,
  136807: &#39;09 | 模块:CommonJS规范&#39;,
  136808: &#39;10 | 模块:使用模块规范改造石头剪刀布游戏&#39;,
  136809: &#39;11 | 模块:npm&#39;,
  141994: &#39;12 | 模块:Node.js内置模块&#39;,
  143517: &#39;13 | 异步:非阻塞I/O&#39;,
  143557: &#39;14 | 异步:异步编程之callback&#39;,
  143564: &#39;15 | 异步:事件循环&#39;,
  143644: &#39;16 | 异步:异步编程之Promise&#39;,
  146470: &#39;17 | 异步:异步编程之async/await&#39;,
  146569: &#39;18 | HTTP:什么是HTTP服务器?&#39;,
  146582: &#39;19 | HTTP:简单实现一个HTTP服务器&#39;
}

const server = net.createServer(socket => {
  // 监听客户端发送的消息
  socket.on(&#39;data&#39;, buffer => {
    const lessonId = buffer.readInt32BE()
    setTimeout(() => {
      // 往客户端发送消息
      socket.write(LESSON_DATA[lessonId])
    }, 1000)
  })
})

server.listen(4000)
登入後複製

客户端代码:

const net = require(&#39;net&#39;)

const socket = new net.Socket({})

const LESSON_IDS = [
  &#39;136797&#39;,
  &#39;136798&#39;,
  &#39;136799&#39;,
  &#39;136800&#39;,
  &#39;136801&#39;,
  &#39;136803&#39;,
  &#39;136804&#39;,
  &#39;136806&#39;,
  &#39;136807&#39;,
  &#39;136808&#39;,
  &#39;136809&#39;,
  &#39;141994&#39;,
  &#39;143517&#39;,
  &#39;143557&#39;,
  &#39;143564&#39;,
  &#39;143644&#39;,
  &#39;146470&#39;,
  &#39;146569&#39;,
  &#39;146582&#39;
]

socket.connect({
  host: &#39;127.0.0.1&#39;,
  port: 4000
})

let buffer = Buffer.alloc(4)
buffer.writeInt32BE(LESSON_IDS[Math.floor(Math.random() * LESSON_IDS.length)])

// 往服务端发送消息
socket.write(buffer)

// 监听从服务端传回的消息
socket.on(&#39;data&#39;, buffer => {
  console.log(buffer.toString())

  // 获取到数据之后再次发送消息
  buffer = Buffer.alloc(4)
  buffer.writeInt32BE(LESSON_IDS[Math.floor(Math.random() * LESSON_IDS.length)])

  socket.write(buffer)
})
登入後複製

以上半双工通信步骤如下:

  • 客户端发送消息 socket.write(buffer)
  • 服务端接受消息后往客户端发送消息 socket.write(buffer)
  • 客户端接受消息后再次发送消息

这样在一个时间端之内,只有一个端往另一个端发送消息,这样就实现了半双工通信。那如何实现全双工通信呢,也就是在客户端往服务端发送消息的同时,服务端还没有消息返回给客户端之前,客户端又发送了一个消息给服务端。

全双工通信

先来看一个场景:

什麼是RPC?聊聊node中怎麼實現 RPC 通信

客户端发送了一个id1的请求,但是服务端还来不及返回,接着客户端又发送了一个id2的请求。

等了一个之后,服务端先把id2的结果返回了,然后再把id1的结果返回。

那如何结果匹配到对应的请求上呢?

如果按照时间顺序,那么id1的请求对应了id2的结果,因为id2是先返回的;id2的请求对应了id1的结果,这样就导致请求包和返回包错位的情况。

怎么办呢?

我们可以给请求包和返回包都带上序号,这样就能对应上。

错位处理

客户端代码:

socket.on(&#39;data&#39;, buffer => {
  // 包序号
  const seqBuffer = buffer.slice(0, 2)
  // 服务端返回的内容
  const titleBuffer = buffer.slice(2)
    
  console.log(seqBuffer.readInt16BE(), titleBuffer.toString())
})

// 包序号
let seq = 0
function encode(index) {
  // 请求包的长度现在是6 = 2(包序号) + 4(课程id)
  buffer = Buffer.alloc(6)
  buffer.writeInt16BE(seq)
  buffer.writeInt32BE(LESSON_IDS[index], 2)

  seq++
  return buffer
}

// 每50ms发送一次请求
setInterval(() => {
  id = Math.floor(Math.random() * LESSON_IDS.length)
  socket.write(encode(id))
}, 50)
登入後複製

服务端代码:

const server = net.createServer(socket => {
  socket.on(&#39;data&#39;, buffer => {
    // 把包序号取出
    const seqBuffer = buffer.slice(0, 2)
    // 从第2个字节开始读取
    const lessonId = buffer.readInt32BE(2)
    setTimeout(() => {
      const buffer = Buffer.concat([
        seqBuffer,
        Buffer.from(LESSON_DATA[lessonId])
      ])
      socket.write(buffer)
      // 这里返回时间采用随机的,这样就不会按顺序返回,就可以测试错位的情况
    }, 10 + Math.random() * 1000)
  })
})
登入後複製
  • 客户端把包序号和对应的id给服务端
  • 服务端取出包序号和对应的id,然后把包序号和id对应的内容返回给客户端,同时设置返回的时间是随机的,这样就不会按照顺序返回。

粘包处理

如果我们这样发送请求:

for (let i = 0; i < 100; i++) {
  id = Math.floor(Math.random() * LESSON_IDS.length)
  socket.write(encode(id))
}
登入後複製

我们发现服务端接收到的信息如下:

<Buffer 00 00 00 02 16 64 00 01 00 02 16 68 00 02 00 02 31 1c 00 03 00 02 3c 96 00 04 00 02 16 68 00 05 00 02 16 5e 00 06 00 02 16 66 00 07 00 02 16 67 00 08 ... 550 more bytes>
登入後複製

这是因为TCP自己做的一个优化,它会把所有的请求包拼接在一起,这样就会产生粘包的现象。

服务端需要把包进行拆分,拆分成100个小包。

那如何拆分呢?

首先客户端发送的数据包包括两部分:定长的包头和不定长的包体

包头又分为两部分:包序号及包体的长度。只有知道包体的长度,才能知道从哪里进行分割。

let seq = 0
function encode(data) {
    // 正常情况下,这里应该是使用 protocol-buffers 来encode一段代表业务数据的数据包
    // 为了不要混淆重点,这个例子比较简单,就直接把课程id转buffer发送
    const body = Buffer.alloc(4);
    body.writeInt32BE(LESSON_IDS[data.id]);

    // 一般来说,一个rpc调用的数据包会分为定长的包头和不定长的包体两部分
    // 包头的作用就是用来记载包的序号和包的长度,以实现全双工通信
    const header = Buffer.alloc(6); // 包序号占2个字节,包体长度占4个字节,共6个字节
    header.writeInt16BE(seq)
    header.writeInt32BE(body.length, 2);

    // 包头和包体拼起来发送
    const buffer = Buffer.concat([header, body])

    console.log(`包${seq}传输的课程id为${LESSON_IDS[data.id]}`);
    seq++;
    return buffer;
}

// 并发
for (let i = 0; i < 100; i++) {
    id = Math.floor(Math.random() * LESSON_IDS.length)
    socket.write(encode({ id }))
}
登入後複製

服务端进行拆包

const server = net.createServer(socket => {
  let oldBuffer = null
  socket.on(&#39;data&#39;, buffer => {
    // 把上一次data事件使用残余的buffer接上来
    if (oldBuffer) {
      buffer = Buffer.concat([oldBuffer, buffer])
    }
    let packageLength = 0
    // 只要还存在可以解成完整包的包长
    while ((packageLength = checkComplete(buffer))) {
      // 确定包的长度后进行slice分割
      const package = buffer.slice(0, packageLength)
      // 剩余的包利用循环继续分割
      buffer = buffer.slice(packageLength)

      // 把这个包解成数据和seq
      const result = decode(package)

      // 计算得到要返回的结果,并write返回
      socket.write(encode(LESSON_DATA[result.data], result.seq))
    }

    // 把残余的buffer记下来
    oldBuffer = buffer
  })
})
登入後複製

checkComplete 函数的作用来确定一个数据包的长度,然后进行分割:

function checkComplete(buffer) {
  // 如果包的长度小于6个字节说明只有包头,没有包体,那么直接返回0
  if (buffer.length <= 6) {
    return 0
  }
  // 读取包头的第二个字节,取出包体的长度
  const bodyLength = buffer.readInt32BE(2)
  // 请求包包括包头(6个字节)和包体body
  return 6 + bodyLength
}
登入後複製

decode对包进行解密:

function decode(buffer) {
  // 读取包头
  const header = buffer.slice(0, 6)
  const seq = header.readInt16BE()
    
  // 读取包体  
  // 正常情况下,这里应该是使用 protobuf 来decode一段代表业务数据的数据包
  // 为了不要混淆重点,这个例子比较简单,就直接读一个Int32即可
  const body = buffer.slice(6).readInt32BE()

  // 这里把seq和数据返回出去
  return {
    seq,
    data: body
  }
}
登入後複製

encode把客户端想要的数据转化为二进制返回,这个包同样包括包头和包体,包头又包括包需要包序号和包体的长度。

function encode(data, seq) {
  // 正常情况下,这里应该是使用 protobuf 来encode一段代表业务数据的数据包
  // 为了不要混淆重点,这个例子比较简单,就直接把课程标题转buffer返回
  const body = Buffer.from(data)

  // 一般来说,一个rpc调用的数据包会分为定长的包头和不定长的包体两部分
  // 包头的作用就是用来记载包的序号和包的长度,以实现全双工通信
  const header = Buffer.alloc(6)
  header.writeInt16BE(seq)
  header.writeInt32BE(body.length, 2)

  const buffer = Buffer.concat([header, body])

  return buffer
}
登入後複製

当客户端收到服务端发送的包之后,同样也要进行拆包,因为所有的包同样都粘在一起了:

 <Buffer 00 00 00 00 00 1d 30 36 20 7c 20 e4 bb 80 e4 b9 88 e6 98 af e6 8a 80 e6 9c af e9 a2 84 e7 a0 94 ef bc 9f 00 01 00 00 00 1d 30 36 20 7c 20 e4 bb 80 e4 ... 539 more bytes>
登入後複製

因此,客户端也需要拆包,拆包策略与服务端的拆包策略是一致的:

let oldBuffer = null
socket.on(&#39;data&#39;, buffer => {
  // 把上一次data事件使用残余的buffer接上来
  if (oldBuffer) {
    buffer = Buffer.concat([oldBuffer, buffer])
  }
  let completeLength = 0

  // 只要还存在可以解成完整包的包长
  while ((completeLength = checkComplete(buffer))) {
    const package = buffer.slice(0, completeLength)
    buffer = buffer.slice(completeLength)

    // 把这个包解成数据和seq
    const result = decode(package)
    console.log(`包${result.seq},返回值是${result.data}`)
  }

  // 把残余的buffer记下来
  oldBuffer = buffer
})
登入後複製

到这里就实现了双全工通行,这样客户端和服务端随时都可以往对方发小消息了。

更多node相关知识,请访问:nodejs 教程

以上是什麼是RPC?聊聊node中怎麼實現 RPC 通信的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:juejin.cn
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
最新問題
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板
關於我們 免責聲明 Sitemap
PHP中文網:公益線上PHP培訓,幫助PHP學習者快速成長!