Implémentation du service Push
Principe de base
Après le démarrage du serveur, il enregistrera deux gestionnaires.
websocketHandler est utilisé pour permettre au navigateur d'envoyer une demande de mise à niveau et de passer à une connexion WebSocket.
pushHandler est utilisé pour fournir des requêtes aux terminaux push externes pour envoyer des données push.
Le navigateur se connecte d'abord au websocketHandler (l'adresse par défaut est ws://ip:port/ws) et la demande de mise à niveau est une connexion WebSocket. Lorsque la connexion est établie, les informations d'enregistrement doivent être envoyées. inscription. Les informations d'enregistrement ici contiennent des informations de jeton.
Le serveur vérifiera le jeton fourni et obtiendra l'ID utilisateur correspondant (d'une manière générale, un ID utilisateur peut être associé à plusieurs jetons en même temps), et enregistrera et maintiendra la relation entre le jeton, l'ID utilisateur et la connexion (connexion ) .
L'extrémité push envoie une demande pour transmettre des données à pushHandler (l'adresse par défaut est ws://ip:port/push). La demande contient le champ userId et le champ message. Le serveur obtiendra toutes les connexions connectées au serveur à ce moment-là en fonction de l'ID utilisateur, puis enverra les messages un par un.
En raison de la nature en temps réel du service push, les données poussées n'ont pas et n'ont pas besoin d'être mises en cache.
Explication détaillée du code
Je vais décrire brièvement la structure de base du code ici, et parler également de certaines méthodes et modèles d'écriture couramment utilisés dans le langage Go (Je viens aussi d'autres languestournez-vous vers la langue Go. Après tout, la langue Go est également assez jeune, donc si vous avez des suggestions, n'hésitez pas à les donner.
Étant donné que la plupart des inventeurs et certains principaux mainteneurs du langage Go sont issus du langage C/C++, le code du langage Go est également plus orienté vers le système C/C++.
Jetez d'abord un œil à la structure du serveur :
// Server defines parameters for running websocket server. type Server struct { // Address for server to listen on Addr string // Path for websocket request, default "/ws". WSPath string // Path for push message, default "/push". PushPath string // Upgrader is for upgrade connection to websocket connection using // "github.com/gorilla/websocket". // // If Upgrader is nil, default upgrader will be used. Default upgrader is // set ReadBufferSize and WriteBufferSize to 1024, and CheckOrigin always // returns true. Upgrader *websocket.Upgrader // Check token if it's valid and return userID. If token is valid, userID // must be returned and ok should be true. Otherwise ok should be false. AuthToken func(token string) (userID string, ok bool) // Authorize push request. Message will be sent if it returns true, // otherwise the request will be discarded. Default nil and push request // will always be accepted. PushAuth func(r *http.Request) bool wh *websocketHandler ph *pushHandler }
Ici, nous parlons de Upgrader *websocket.Upgrader, qui est l'objet du package gorilla/websocket, qui est utilisé pour mettre à niveau HTTP demandes.
Si une structure a trop de paramètres, il n'est généralement pas recommandé de l'initialiser directement, mais d'utiliser la méthode New qu'elle propose. Le voici :
// NewServer creates a new Server.func NewServer(addr string) *Server { return &Server{ Addr: addr, WSPath: serverDefaultWSPath, PushPath: serverDefaultPushPath, } }
Il s'agit également d'une utilisation courante du langage Go pour fournir des méthodes d'initialisation au monde extérieur.
Ensuite, le serveur utilise la méthode ListenAndServe pour démarrer et écouter le port, similaire à l'utilisation du package http :
// ListenAndServe listens on the TCP network address and handle websocket // request. func (s *Server) ListenAndServe() error { b := &binder{ userID2EventConnMap: make(map[string]*[]eventConn), connID2UserIDMap: make(map[string]string), } // websocket request handler wh := websocketHandler{ upgrader: defaultUpgrader, binder: b, } if s.Upgrader != nil { wh.upgrader = s.Upgrader } if s.AuthToken != nil { wh.calcUserIDFunc = s.AuthToken } s.wh = &wh http.Handle(s.WSPath, s.wh) // push request handler ph := pushHandler{ binder: b, } if s.PushAuth != nil { ph.authFunc = s.PushAuth } s.ph = &ph http.Handle(s.PushPath, s.ph) return http.ListenAndServe(s.Addr, nil) }
Ici, nous avons généré deux gestionnaires, à savoir websocketHandler et pushHandler. websocketHandler est chargé d'établir une connexion avec le navigateur et de transmettre les données, tandis que pushHandler gère les requêtes push-side.
Comme vous pouvez le voir, les deux gestionnaires encapsulent ici un objet classeur. Ce classeur est utilisé pour maintenir la relation entre le jeton <-> userID <-> Conn :
// binder is defined to store the relation of userID and eventConn type binder struct { mu sync.RWMutex // map stores key: userID and value of related slice of eventConn userID2EventConnMap map[string]*[]eventConn // map stores key: connID and value: userID connID2UserIDMap map[string]string }
websocketHandler
Examinons en détail l'implémentation de websocketHandler.
// websocketHandler defines to handle websocket upgrade request. type websocketHandler struct { // upgrader is used to upgrade request. upgrader *websocket.Upgrader // binder stores relations about websocket connection and userID. binder *binder // calcUserIDFunc defines to calculate userID by token. The userID will // be equal to token if this function is nil. calcUserIDFunc func(token string) (userID string, ok bool) }
Structure très simple. websocketHandler implémente l'interface http.Handler :
// First try to upgrade connection to websocket. If success, connection will // be kept until client send close message or server drop them. func (wh *websocketHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { wsConn, err := wh.upgrader.Upgrade(w, r, nil) if err != nil { return } defer wsConn.Close() // handle Websocket request conn := NewConn(wsConn) conn.AfterReadFunc = func(messageType int, r io.Reader) { var rm RegisterMessage decoder := json.NewDecoder(r) if err := decoder.Decode(&rm); err != nil { return } // calculate userID by token userID := rm.Token if wh.calcUserIDFunc != nil { uID, ok := wh.calcUserIDFunc(rm.Token) if !ok { return } userID = uID } // bind wh.binder.Bind(userID, rm.Event, conn) } conn.BeforeCloseFunc = func() { // unbind wh.binder.Unbind(conn) } conn.Listen() }
Convertit d'abord la requête http.Request entrante en websocket.Conn, puis la conditionne dans notre wserver.Conn personnalisé (l'encapsulation, ou combinaison, est une utilisation typique du langage Go . N'oubliez pas que la langue Go n'a pas d'héritage, seulement une composition).
Ensuite, les méthodes AfterReadFunc et BeforeCloseFunc de Conn sont définies, puis conn.Listen() est démarré. AfterReadFunc signifie qu'après que Conn ait lu les données, il essaie de vérifier et de calculer l'ID utilisateur en fonction du jeton, puis la liaison enregistre la liaison. BeforeCloseFunc effectue l'opération de dissociation avant la fermeture de Conn.
pushHandler
pushHandler est facile à comprendre. Il analyse la requête puis pousse les données :
// Authorize if needed. Then decode the request and push message to each // realted websocket connection. func (s *pushHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } // authorize if s.authFunc != nil { if ok := s.authFunc(r); !ok { w.WriteHeader(http.StatusUnauthorized) return } } // read request var pm PushMessage decoder := json.NewDecoder(r.Body) if err := decoder.Decode(&pm); err != nil { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(ErrRequestIllegal.Error())) return } // validate the data if pm.UserID == "" || pm.Event == "" || pm.Message == "" { w.WriteHeader(http.StatusBadRequest) w.Write([]byte(ErrRequestIllegal.Error())) return } cnt, err := s.push(pm.UserID, pm.Event, pm.Message) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error())) return } result := strings.NewReader(fmt.Sprintf("message sent to %d clients", cnt)) io.Copy(w, result) } Conn Conn (此处指 wserver.Conn) 为 websocket.Conn 的包装。 // Conn wraps websocket.Conn with Conn. It defines to listen and read // data from Conn. type Conn struct { Conn *websocket.Conn AfterReadFunc func(messageType int, r io.Reader) BeforeCloseFunc func() once sync.Once id string stopCh chan struct{} }
La méthode principale est Listen() :
// Listen listens for receive data from websocket connection. It blocks // until websocket connection is closed. func (c *Conn) Listen() { c.Conn.SetCloseHandler(func(code int, text string) error { if c.BeforeCloseFunc != nil { c.BeforeCloseFunc() } if err := c.Close(); err != nil { log.Println(err) } message := websocket.FormatCloseMessage(code, "") c.Conn.WriteControl(websocket.CloseMessage, message, time.Now().Add(time.Second)) return nil }) // Keeps reading from Conn util get error. ReadLoop: for { select { case <-c.stopCh: break ReadLoop default: messageType, r, err := c.Conn.NextReader() if err != nil { // TODO: handle read error maybe break ReadLoop } if c.AfterReadFunc != nil { c.AfterReadFunc(messageType, r) } } } }
Configure principalement le traitement et la lecture continue des données lorsque la connexion websocket est fermée.
Recommandé : gtutoriel olang
Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!