首页 > 后端开发 > Golang > 正文

Golang 服务器向每个用户发送的事件

PHPz
发布: 2024-02-06 11:39:14
转载
1005 人浏览过

Golang 服务器向每个用户发送的事件

问题内容

我使用 Go 已经有一段时间了,但之前从未使用过 SSE。我遇到了一个问题,有人可以提供一个仅发送给特定用户(连接)的服务器发送事件的工作示例吗?

我正在使用 gorilla - 会话进行身份验证,并且我想使用 UserID 来分隔连接。

或者我应该通过 Ajax 使用 5 秒轮询?

非常感谢

这是我发现并尝试过的:

  1. https://gist.github.com/ismasan/3fb75381cd2deb6bfa9c 它不会发送给个人用户,并且如果连接关闭,go func 也不会停止

  2. https://github.com/striversity/gotr/blob/master/010-server-sent-event-part-2/main.go 这正是我所需要的,但一旦连接被删除。所以现在,一旦您在私人窗口中关闭并打开浏览器,它就根本无法工作。另外,如上所述,Go 例程继续进行。


正确答案


创建一个“代理”来将消息分发给连接的用户:

type broker struct {
    // users is a map where the key is the user id
    // and the value is a slice of channels to connections
    // for that user id
    users map[string][]chan []byte

    // actions is a channel of functions to call
    // in the broker's goroutine. the broker executes
    // everything in that single goroutine to avoid
    // data races.
    actions chan func()
}

// run executes in a goroutine. it simply gets and 
// calls functions.
func (b *broker) run() {
    for a := range b.actions {
        a()
    }
}

func newbroker() *broker {
    b := &broker{
        users:   make(map[string][]chan []byte),
        actions: make(chan func()),
    }
    go b.run()
    return b
}

// adduserchan adds a channel for user with given id.
func (b *broker) adduserchan(id string, ch chan []byte) {
    b.actions <- func() {
        b.users[id] = append(b.users[id], ch)
    }
}

// removeuserchan removes a channel for a user with the given id.
func (b *broker) removeuserchan(id string, ch chan []byte) {
    // the broker may be trying to send to 
    // ch, but nothing is receiving. pump ch
    // to prevent broker from getting stuck.
    go func() { for range ch {} }()

    b.actions <- func() {
        chs := b.users[id]
        i := 0
        for _, c := range chs {
            if c != ch {
                chs[i] = c
                i = i + 1
            }
        }
        if i == 0 {
            delete(b.users, id)
        } else {
            b.users[id] = chs[:i]
        }
        // close channel to break loop at beginning
        // of removeuserchan.
        // this must be done in broker goroutine
        // to ensure that broker does not send to
        // closed goroutine.
        close(ch)
    }
}

// sendtouser sends a message to all channels for the given user id.
func (b *broker) sendtouser(id string, data []byte) {
    b.actions <- func() {
        for _, ch := range b.users[id] {
            ch <- data
        }
    }
}
登录后复制

在包级别使用代理声明变量:

var broker = newbroker()
登录后复制

使用代理编写 sse 端点:

func sseEndpoint(w http.ResponseWriter, r *http.Request) {
    // I assume that user id is in query string for this example,
    // You should use your authentication code to get the id.
    id := r.FormValue("id")

    // Do the usual SSE setup.
    flusher := w.(http.Flusher)
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")
    w.Header().Set("Connection", "keep-alive")

    // Create channel to receive messages for this connection.  
    // Register that channel with the broker.
    // On return from the function, remove the channel
    // from the broker.
    ch := make(chan []byte)
    broker.addUserChan(id, ch)
    defer broker.removeUserChan(id, ch)
    for {
        select {
        case <-r.Context().Done():
            // User closed the connection. We are out of here.
            return
        case m := <-ch:
            // We got a message. Do the usual SSE stuff.
            fmt.Fprintf(w, "data: %s\n\n", m)
            flusher.Flush()
        }
    }
}
登录后复制

向您的应用程序添加代码以调用 broker.sendtouser。

以上是Golang 服务器向每个用户发送的事件的详细内容。更多信息请关注PHP中文网其他相关文章!

来源:stackoverflow.com
本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责声明 Sitemap
PHP中文网:公益在线PHP培训,帮助PHP学习者快速成长!