首頁 > 後端開發 > Golang > 主體

程式碼詳解使用Go基於WebSocket建構視訊直播彈幕系統

coldplay.xixi
發布: 2020-07-21 17:27:03
轉載
4499 人瀏覽過

程式碼詳解使用Go基於WebSocket建構視訊直播彈幕系統

(1)業務複雜度介紹

開門見山,假設一個直播間同時500W人在線,那麼1秒鐘1000條彈幕,那麼彈幕系統的推播頻率就是: 500W * 1000條/秒=50億條/秒 ,想想B站2019跨年晚會那次彈幕系統得是多麼的NB,況且一個大型網站不可能只有一個直播間!

使用Go做WebSocket開發無非就是三種情況:

  • 使用Go原生自帶的函式庫,也就是golang. org/x/net ,但這個官方函式庫真是出了奇Bug多
  • 使用GitHub大佬gorilla/websocket 函式庫,可以結合到某些Web開發框架,例如Gin、iris等,只要使用的框架式是基於golang.org/net 的,那麼這個函式庫就可以與這個框架結合
  • 手擼一個WebSocket框架

#相關學習推薦:Go語言教學

根據估算結果,彈幕推送量很大的時候,Linux核心將會出現瓶頸,因為Linux核心發送TCP包的時候極限包發送頻率是100W。因此可以將同一秒內的彈幕訊息合併為1條推送,減少網路小資料包的發送,從而降低推送頻率。

彈幕系統需要維護線上的用戶長連線來實現定向推送到線上的用戶,通常是使用Hash字典結構,通常推播訊息就是遍歷線上用的Hash字典。在彈幕推送期間用戶在不斷的上下線,為了維護上線用戶,那麼就得不斷的修改Hash字典,不斷地進行鎖定操作,用戶量過大導致鎖瓶頸。因此可以將整個Hash結構拆分為多個Hash結構,分別對多個Hash結構加不同的鎖,並且使用讀寫鎖來取代互斥鎖。

通常伺服器與客戶端互動使用JSON結構,那麼需要不斷的編碼解碼JSON數據,這將會導致CPU瓶頸。將訊息先合併,然後進行編碼,最後輪詢Hash結構進行推播。

以上是單體架構存在的問題,為了支援更多的用戶負載,通常彈幕系統採用分散式架構,進行彈性擴容縮容。

(2)推送還是拉取?

如果是客戶端拉取伺服器端數據,那麼將會存在以下幾個問題:

  • 直播線上人數多就表示訊息資料更新頻率高,拉取訊息意味著彈幕無法滿足時效性
  • 如果很多客戶端同時拉取,那麼伺服器端的壓力無異於DDOS
  • 一個彈幕系統應該是通用的,因此對於直播間彈幕較少的場景,意味著訊息資料拉取請求都是無效的

因此我們考慮推送模式:當資料發生更新的時候伺服器端主動推送到客戶端,這樣可以有效減少客戶端的請求次數。如果需要實現訊息推送,那麼就意味著伺服器端維護大量的長連線。

(3)為什麼要使用WebSocket?

實現彈幕訊息的即時更新一定是使用Socket的方式,那麼為啥要使用WebSocket呢?現在大部分直播應用程式的開發都是跨平台的,然而跨平台的開發框架本質就是Web開發,那麼一定離不開WebSocket,而且一部分用戶會選擇在Web端看視頻,比如Bilibili,現如今也有一些桌面應用是用Electron等跨平台框架開發的,例如Lark飛書等,因此實現訊息推送的最佳方案就是使用WebSocket。

使用WebSocket可以輕鬆的維持伺服器端長連接,其次WebSocket是架構在HTTP協定之上的,並且也可以使用HTTPS方式,因此WebSocket是可靠傳輸,並且不需要開發者關注底層細節。

為啥要使用Go搞WebSocket呢?首先說到WebSocket你可能會想到Node.js,但是Node.js是單線程模型,如果實現高並發,不得不創建多個Node.js進程,但是這又不容易服務端遍歷整個連接集合;如果使用Java就會顯得比較笨重,Java專案的部署,編寫Dockerfile都不如Go的目標二進位更加簡潔,並且Go協程很容易實現高並發,上一章說到Go語言目前也有成熟的WebSocket輪子。

(4)服務端基本Demo

首先建立好一個框架:

package main

import (
  "fmt"
  "net/http"
)

func main() {
 fmt.Println("Listen localhost:8080")
   // 注册一个用于WebSocket的路由,实际业务中不可能只有一个路由
  http.HandleFunc("/messages", messageHandler)
  // 监听8080端口,没有实现服务异常处理器,因此第二个参数是nil
  http.ListenAndServe("localhost:8080", nil)
}

func messageHandler(response http.ResponseWriter, request *http.Request) {
  // TODO: 实现消息处理
  response.Write([]byte("HelloWorld"))
}
登入後複製

然後完善messageHandler函數:

func messageHandler(response http.ResponseWriter, request *http.Request) {
  var upgrader = websocket.Upgrader{
    // 允许跨域
    CheckOrigin: func(resquest *http.Request) bool {
      return true
    },
  }

  // 建立连接
  conn, err := upgrader.Upgrade(response, request, nil)
  if err != nil {
    return
  }

  // 收发消息
  for {
    // 读取消息
    _, bytes, err := conn.ReadMessage()
    if err != nil {
      _ = conn.Close()
    }
    // 写入消息
    err = conn.WriteMessage(websocket.TextMessage, bytes)
    if err != nil {
      _ = conn.Close()
    }
  }
}
登入後複製

現在基本上實作了WebSocket功能,但是websocket的原生API不是線程安全的(Close方法是線程安全的,並且是可重入的),並且其他模組無法復用業務邏輯,因此進行封裝:

    ##封裝Connection物件描述一個WebSocket連線
  • 為Connection物件提供線程安全的關閉、接收、發送API
  • // main.go
    package main
    
    import (
      "bluemiaomiao.github.io/websocket-go/service"
      "fmt"
      "net/http"
    
      "github.com/gorilla/websocket"
    )
    
    func main() {
      fmt.Println("Listen localhost:8080")
      http.HandleFunc("/messages", messageHandler)
      _ = http.ListenAndServe("localhost:8080", nil)
    }
    
    func messageHandler(response http.ResponseWriter, request *http.Request) {
      var upgrader = websocket.Upgrader{
        // 允许跨域
        CheckOrigin: func(resquest *http.Request) bool {
          return true
        },
      }
    
      // 建立连接
      conn, err := upgrader.Upgrade(response, request, nil)
      wsConn, err := service.Create(conn)
      if err != nil {
        return
      }
    
      // 收发消息
      for {
        // 读取消息
        msg, err := wsConn.ReadOne()
        if err != nil {
          wsConn.Close()
        }
        // 写入消息
        err = wsConn.WriteOne(msg)
        if err != nil {
          _ = conn.Close()
        }
      }
    }
    登入後複製
    // service/messsage_service.go
    package service
    
    import (
      "errors"
      "github.com/gorilla/websocket"
      "sync"
    )
    
    // 封装的连接对象
    // 
    // 由于websocket的Close()方法是可重入的,所以可以多次调用,但是关闭Channel的close()
    // 方法不是可重入的,因此通过isClosed进行判断
    // isClosed可能发生资源竞争,因此通过互斥锁避免
    // 关闭websocket连接后,也要自动关闭输入输出消息流,因此通过signalCloseLoopChan实现
    type Connection struct {
      conn                  *websocket.Conn  // 具体的连接对象
      inputStream             chan []byte       // 输入流,使用Channel模拟
      outputStream           chan []byte       // 输出流,使用chaneel模拟
      signalCloseLoopChan     chan byte       // 关闭信号
      isClosed               bool            // 是否调用过close()方法
      lock                   sync.Mutex      // 简单的锁
    }
    
    // 用于初始化一个连接对象
    func Create(conn *websocket.Conn) (connection *Connection, err error) {
      connection = &Connection{
        conn:              conn,
        inputStream:        make(chan []byte, 1000),
        outputStream:       make(chan []byte, 1000),
        signalCloseLoopChan: make(chan byte, 1),
        isClosed:            false,
      }
    
      // 启动读写循环
      go connection.readLoop()
      go connection.writeLoop()
      return
    }
    
    // 读取一条消息
    func (c *Connection) ReadOne() (msg []byte, err error) {
      select {
      case msg = <-(*c).inputStream:
      case <-(*c).signalCloseLoopChan:
        err = errors.New("connection is closed")
      }
      return
    }
    
    // 写入一条消息
    func (c *Connection) WriteOne(msg []byte) (err error) {
      select {
      case (*c).outputStream <- msg:
      case <-(*c).signalCloseLoopChan:
        err = errors.New("connection is closed")
      }
      return
    }
    
    // 关闭连接对象
    func (c *Connection) Close() {
      _ = (*c).conn.Close()
      (*c).lock.Lock()
      if !(*c).isClosed {
        close((*c).signalCloseLoopChan)
      }
      (*c).lock.Unlock()
    
    }
    
    // 读取循环
    func (c *Connection) readLoop() {
      // 不停的读取长连接中的消息,只要存在消息就将其放到队列中
      for {
        _, bytes, err := (*c).conn.ReadMessage()
        if err != nil {
          (*c).Close()
        }
        select {
        case <-(*c).signalCloseLoopChan:
          (*c).Close()
        case (*c).inputStream <- bytes:
        }
      }
    }
    
    // 写入循环
    func (c *Connection) writeLoop() {
      // 只要队列中存在消息,就将其写入
      var data []byte
      for {
        select {
        case data = <-(*c).outputStream:
        case <-(*c).signalCloseLoopChan:
          (*c).Close()
        }
        err := (*c).conn.WriteMessage(websocket.TextMessage, data)
        if err != nil {
          _ = (*c).conn.Close()
        }
      }
    }
    登入後複製
    至此,你已經學會如何使用Go構建WebSocket服務。

    以上是程式碼詳解使用Go基於WebSocket建構視訊直播彈幕系統的詳細內容。更多資訊請關注PHP中文網其他相關文章!

相關標籤:
來源:jb51.net
本網站聲明
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn
作者最新文章
熱門教學
更多>
最新下載
更多>
網站特效
網站源碼
網站素材
前端模板
關於我們 免責聲明 Sitemap
PHP中文網:公益線上PHP培訓,幫助PHP學習者快速成長!