Maison > développement back-end > Golang > Plusieurs cas d'implémentation sur le pool de connexions Golang

Plusieurs cas d'implémentation sur le pool de connexions Golang

藏色散人
Libérer: 2020-12-14 14:42:41
avant
2545 Les gens l'ont consulté

Ce qui suit est la colonne du tutoriel Golang pour vous présenter plusieurs cas d'implémentation du pool de connexions Golang. J'espère que cela vous sera utile. amis dans le besoin !

Plusieurs cas d'implémentation sur le pool de connexions Golang

En raison de la prise de contact à trois de TCP et d'autres raisons, l'établissement d'une connexion est un acte relativement coûteux. Par conséquent, dans un programme qui doit interagir plusieurs fois avec une entité spécifique, il est nécessaire de maintenir un pool de connexions avec des connexions réutilisables pour la réutilisation.

Pour maintenir un pool de connexions, l'exigence la plus fondamentale est d'atteindre : thread safe (thread safety) , en particulier dans un langage comme Golang qui comporte goroutine.

Implémentation d'un pool de connexions simple

type Pool struct {
    m sync.Mutex //保证多个goroutine访问时候,closed的线程安全
    res chan io.Closer //连接存储的chan
    factory func() (io.Closer,error) //新建连接的工厂方法
    closed bool //连接池关闭标志
}
Copier après la connexion

Pour ce pool de connexions simple, nous utilisons chan pour stocker les connexions dans le pool. La méthode de création d'une nouvelle structure est également relativement simple :

func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    if size <= 0 {
        return nil, errors.New("size的值太小了。")
    }
    return &Pool{
        factory: fn,
        res:     make(chan io.Closer, size),
    }, nil
}
Copier après la connexion

il suffit de fournir la fonction d'usine correspondante et la taille du pool de connexions.

Établissez la connexion

Alors, comment pouvons-nous en tirer des ressources ? Parce que la structure de notre connexion de stockage interne est chan, nous n'avons besoin que d'un simple select pour garantir la sécurité des threads :

//从资源池里获取一个资源
func (p *Pool) Acquire() (io.Closer,error) {
    select {
    case r,ok := <-p.res:
        log.Println("Acquire:共享资源")
        if !ok {
            return nil,ErrPoolClosed
        }
        return r,nil
    default:
        log.Println("Acquire:新生成资源")
        return p.factory()
    }
}
Copier après la connexion

Nous l'obtenons d'abord à partir du res chan du pool de connexions, sinon Nous utiliserons la fonction d'usine que nous avons déjà préparée pour construire la connexion. En même temps, lorsque nous obtenons la connexion de res, nous utilisons ok pour déterminer d'abord si le pool de connexions a été fermé. S'il a été fermé, nous renverrons l'erreur de connexion fermée déjà préparée.

Fermer le pool de connexions

Donc, puisque nous avons mentionné la fermeture du pool de connexions, comment fermer le pool de connexions ?

//关闭资源池,释放资源
func (p *Pool) Close() {
    p.m.Lock()
    defer p.m.Unlock()
    if p.closed {
        return
    }
    p.closed = true
    //关闭通道,不让写入了
    close(p.res)
    //关闭通道里的资源
    for r:=range p.res {
        r.Close()
    }
}
Copier après la connexion

Ici, nous devons d'abord effectuer l'opération de verrouillage p.m.Lock()* C'est parce que nous devons lire et écrire le *closed dans la structure. Vous devez d'abord définir cet indicateur, puis fermer le res chan afin que la méthode Acquire ne puisse plus obtenir de nouvelles connexions. On effectue ensuite l'opération Close sur la connexion dans le chan de res.

Libérer la connexion

Il doit y avoir une condition préalable pour libérer la connexion, c'est-à-dire que le pool de connexions n'a pas été fermé. Si le pool de connexions a été fermé et que vous envoyez une connexion à res, la panique se déclenchera.

func (p *Pool) Release(r io.Closer){
    //保证该操作和Close方法的操作是安全的
    p.m.Lock()
    defer p.m.Unlock()
    //资源池都关闭了,就省这一个没有释放的资源了,释放即可
    if p.closed {
        r.Close()
        return
    }
    select {
    case p.res <- r:
        log.Println("资源释放到池子里了")
    default:
        log.Println("资源池满了,释放这个资源吧")
        r.Close()
    }
}
Copier après la connexion

Ce qui précède est une implémentation de pool de connexions simple et sécurisée pour les threads. Ce que nous pouvons constater, c'est que même si le pool de connexions a été mis en place maintenant, il existe encore plusieurs petites lacunes :

  1. Nous n'avons aucune limite sur le nombre maximum de connexions. pool de threads S'il est vide, par défaut nous créerons directement une nouvelle connexion et la renverrons. Une fois que le niveau de concurrence est élevé, de nouvelles connexions continueront à être créées, ce qui peut facilement provoquer des erreurs too many connections (en particulier MySQL).

  2. Puisque nous devons garantir le nombre maximum de connexions pouvant être obtenues, nous ne voulons pas que le nombre soit trop rigide. Nous espérons pouvoir maintenir un certain nombre de connexions inactives IdleNum lorsque nous sommes inactifs, mais nous espérons également pouvoir limiter le nombre maximum de connexions pouvant être obtenues maxNum.

  3. La première situation est celle d'une concurrence trop importante. Et si la quantité de concurrence est trop faible ? Maintenant, après avoir créé une nouvelle connexion et l'avoir renvoyée, nous n'utilisons plus cette connexion pendant longtemps. Il est alors probable que cette connexion ait été établie il y a quelques heures, voire plus. Nous n'avons aucun moyen de garantir la disponibilité d'une connexion restée inactive pendant une longue période. Il est possible que la connexion que nous obtiendrons la prochaine fois soit une connexion expirée.

Ensuite, nous pouvons examiner la bibliothèque de pool de connexions MySQL et la bibliothèque de pool de connexions Redis qui ont été utilisées avec maturité pour voir comment elles résolvent ces problèmes.

Pool de connexions SQL de la bibliothèque standard de Golang

Le pool de connexions de Golang est implémenté sous la bibliothèque standard database/sql/sql.go. Lorsque nous exécutons :

db, err := sql.Open("mysql", "xxxx")
Copier après la connexion

, un pool de connexions sera ouvert. Nous pouvons jeter un œil à la structure du db renvoyé :

type DB struct {
    waitDuration int64 // Total time waited for new connections.
    mu           sync.Mutex // protects following fields
    freeConn     []*driverConn
    connRequests map[uint64]chan connRequest
    nextRequest  uint64 // Next key to use in connRequests.
    numOpen      int    // number of opened and pending open connections
    // Used to signal the need for new connections
    // a goroutine running connectionOpener() reads on this chan and
    // maybeOpenNewConnections sends on the chan (one send per needed connection)
    // It is closed during db.Close(). The close tells the connectionOpener
    // goroutine to exit.
    openerCh          chan struct{}
    closed            bool
    maxIdle           int                    // zero means defaultMaxIdleConns; negative means 0
    maxOpen           int                    // <= 0 means unlimited
    maxLifetime       time.Duration          // maximum amount of time a connection may be reused
    cleanerCh         chan struct{}
    waitCount         int64 // Total number of connections waited for.
    maxIdleClosed     int64 // Total number of connections closed due to idle.
    maxLifetimeClosed int64 // Total number of connections closed due to max free limit.
}
Copier après la connexion

Ce qui précède omet certains champs qui ne nécessitent pas d'attention pour le moment. Nous pouvons voir que la structure freeConn qui stocke les connexions à l'intérieur du pool de connexions DB n'est pas le chan que nous avons utilisé auparavant, mais []*driverConn, une tranche de connexion. Dans le même temps, nous pouvons également voir qu'il existe des variables associées telles que maxIdle pour contrôler le nombre de connexions inactives. Il est à noter que la fonction d'initialisation Open de DB ne crée pas de nouvelle connexion à la base de données. Quelle fonction est utilisée pour créer une nouvelle connexion ? On peut remonter jusqu'en arrière dans la méthode Query, et on peut voir cette fonction : func(db*DB)conn(ctx context.Context,strategy connReuseStrategy)(*driverConn,error). Et notre méthode pour obtenir une connexion à partir du pool de connexions commence ici :

获取连接

// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    // 先判断db是否已经关闭。
    db.mu.Lock()
    if db.closed {
        db.mu.Unlock()
        return nil, errDBClosed
    }
    // 注意检测context是否已经被超时等原因被取消。
    select {
    default:
    case <-ctx.Done():
        db.mu.Unlock()
        return nil, ctx.Err()
    }
    lifetime := db.maxLifetime
    // 这边如果在freeConn这个切片有空闲连接的话,就left pop一个出列。注意的是,这边因为是切片操作,所以需要前面需要加锁且获取后进行解锁操作。同时判断返回的连接是否已经过期。
    numFree := len(db.freeConn)
    if strategy == cachedOrNewConn && numFree > 0 {
        conn := db.freeConn[0]
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        conn.inUse = true
        db.mu.Unlock()
        if conn.expired(lifetime) {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        // Lock around reading lastErr to ensure the session resetter finished.
        conn.Lock()
        err := conn.lastErr
        conn.Unlock()
        if err == driver.ErrBadConn {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        return conn, nil
    }
    // 这边就是等候获取连接的重点了。当空闲的连接为空的时候,这边将会新建一个request(的等待连接 的请求)并且开始等待
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        // 下面的动作相当于往connRequests这个map插入自己的号码牌。
        // 插入号码牌之后这边就不需要阻塞等待继续往下走逻辑。
        req := make(chan connRequest, 1)
        reqKey := db.nextRequestKeyLocked()
        db.connRequests[reqKey] = req
        db.waitCount++
        db.mu.Unlock()
        waitStart := time.Now()
        // Timeout the connection request with the context.
        select {
        case <-ctx.Done():
            // context取消操作的时候,记得从connRequests这个map取走自己的号码牌。
            db.mu.Lock()
            delete(db.connRequests, reqKey)
            db.mu.Unlock()
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
            select {
            default:
            case ret, ok := <-req:
                // 这边值得注意了,因为现在已经被context取消了。但是刚刚放了自己的号码牌进去排队里面。意思是说不定已经发了连接了,所以得注意归还!
                if ok && ret.conn != nil {
                    db.putConn(ret.conn, ret.err, false)
                }
            }
            return nil, ctx.Err()
        case ret, ok := <-req:
            // 下面是已经获得连接后的操作了。检测一下获得连接的状况。因为有可能已经过期了等等。
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
            if !ok {
                return nil, errDBClosed
            }
            if ret.err == nil && ret.conn.expired(lifetime) {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            if ret.conn == nil {
                return nil, ret.err
            }
            ret.conn.Lock()
            err := ret.conn.lastErr
            ret.conn.Unlock()
            if err == driver.ErrBadConn {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            return ret.conn, ret.err
        }
    }
    // 下面就是如果上面说的限制情况不存在,可以创建先连接时候,要做的创建连接操作了。
    db.numOpen++ // optimistically
    db.mu.Unlock()
    ci, err := db.connector.Connect(ctx)
    if err != nil {
        db.mu.Lock()
        db.numOpen-- // correct for earlier optimism
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
        inUse:     true,
    }
    db.addDepLocked(dc, dc)
    db.mu.Unlock()
    return dc, nil
}
复制代码
Copier après la connexion

简单来说,DB结构体除了用的是slice来存储连接,还加了一个类似排队机制的connRequests来解决获取等待连接的过程。同时在判断连接健康性都有很好的兼顾。那么既然有了排队机制,归还连接的时候是怎么做的呢?

释放连接

我们可以直接找到 func(db*DB)putConnDBLocked(dc*driverConn,err error)bool这个方法。就像注释说的,这个方法主要的目的是:

Satisfy a connRequest or put the driverConn in the idle pool and return true or return false.

我们主要来看看里面重点那几行:

...
    // 如果已经超过最大打开数量了,就不需要在回归pool了
    if db.maxOpen > 0 && db.numOpen > db.maxOpen {
        return false
    }
    // 这边是重点了,基本来说就是从connRequest这个map里面随机抽一个在排队等着的请求。取出来后发给他。就不用归还池子了。
    if c := len(db.connRequests); c > 0 {
        var req chan connRequest
        var reqKey uint64
        for reqKey, req = range db.connRequests {
            break
        }
        delete(db.connRequests, reqKey) // 删除这个在排队的请求。
        if err == nil {
            dc.inUse = true
        }
        // 把连接给这个正在排队的连接。
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    } else if err == nil && !db.closed {
        // 既然没人排队,就看看到了最大连接数目没有。没到就归还给freeConn。
        if db.maxIdleConnsLocked() > len(db.freeConn) {
            db.freeConn = append(db.freeConn, dc)
            db.startCleanerLocked()
            return true
        }
        db.maxIdleClosed++
    }
...
Copier après la connexion

我们可以看到,当归还连接时候,如果有在排队轮候的请求就不归还给池子直接发给在轮候的人了。

现在基本就解决前面说的小问题了。不会出现连接太多导致无法控制too many connections的情况。也很好了维持了连接池的最小数量。同时也做了相关对于连接健康性的检查操作。

值得注意的是,作为标准库的代码,相关注释和代码都非常完美,真的可以看的神清气爽。

redis Golang实现的Redis客户端

这个Golang实现的Redis客户端,是怎么实现连接池的。这边的思路非常奇妙,还是能学习到不少好思路。当然了,由于代码注释比较少,啃起来第一下还是有点迷糊的。相关代码地址在https://github.com/go-redis/redis/blob/master/internal/pool/pool.go 可以看到。

而它的连接池结构如下

type ConnPool struct {
    ...
    queue chan struct{}
    connsMu      sync.Mutex
    conns        []*Conn
    idleConns    []*Conn
    poolSize     int
    idleConnsLen int
    stats Stats
    _closed  uint32 // atomic
    closedCh chan struct{}
}
Copier après la connexion

我们可以看到里面存储连接的结构还是slice。但是我们可以重点看看 queueconnsidleConns这几个变量,后面会提及到。但是值得注意的是!我们可以看到,这里有两个[]*Conn结构:connsidleConns,那么问题来了:

到底连接存在哪里?

新建连接池连接

我们先从新建连接池连接开始看:

func NewConnPool(opt *Options) *ConnPool {
    ....
    p.checkMinIdleConns()
    if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
        go p.reaper(opt.IdleCheckFrequency)
    }
    ....
}
Copier après la connexion

初始化连接池的函数有个和前面两个不同的地方。

  1. checkMinIdleConns方法,在连接池初始化的时候就会往连接池填满空闲的连接。

  2. go p.reaper(opt.IdleCheckFrequency)则会在初始化连接池的时候就会起一个go程,周期性的淘汰连接池里面要被淘汰的连接。

获取连接

func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
    if p.closed() {
        return nil, ErrClosed
    }
    //这边和前面sql获取连接函数的流程不同。sql是先看看连接池有没有空闲连接,有的话先获取不到再排队。这边是直接先排队获取令牌,排队函数后面会分析。
    err := p.waitTurn(ctx)
    if err != nil {
        return nil, err
    }
    //前面没出error的话,就已经排队轮候到了。接下来就是获取的流程。
    for {
        p.connsMu.Lock()
        //从空闲连接里面先获取一个空闲连接。
        cn := p.popIdle()
        p.connsMu.Unlock()
        if cn == nil {
            // 没有空闲连接时候直接跳出循环。
            break
        }
        // 判断是否已经过时,是的话close掉了然后继续取出。
        if p.isStaleConn(cn) {
            _ = p.CloseConn(cn)
            continue
        }
        atomic.AddUint32(&p.stats.Hits, 1)
        return cn, nil
    }
    atomic.AddUint32(&p.stats.Misses, 1)
    // 如果没有空闲连接的话,这边就直接新建连接了。
    newcn, err := p.newConn(ctx, true)
    if err != nil {
        // 归还令牌。
        p.freeTurn()
        return nil, err
    }
    return newcn, nil
}
Copier après la connexion

我们可以试着回答开头那个问题:连接到底存在哪里?答案是从 cn:=p.popIdle()这句话可以看出,获取连接这个动作,是从 idleConns里面获取的,而里面的函数也证明了这一点。同时我的理解是:

  1. sql的排队意味着我对连接池申请连接后,把自己的编号告诉连接池。连接那边一看到有空闲了,就叫我的号。我答应了一声,然后连接池就直接给个连接给我。我如果不归还,连接池就一直不叫下一个号。

  2. redis这边的意思是,我去和连接池申请的不是连接而是令牌。我就一直排队等着,连接池给我令牌了,我才去仓库里面找空闲连接或者自己新建一个连接。用完了连接除了归还连接外,还得归还令牌。当然了,如果我自己新建连接出错了,我哪怕拿不到连接回家,我也得把令牌给回连接池,不然连接池的令牌数少了,最大连接数也会变小。

而:

func (p *ConnPool) freeTurn() {
    <-p.queue
}
func (p *ConnPool) waitTurn(ctx context.Context) error {
...
    case p.queue <- struct{}{}:
        return nil
...
}
Copier après la connexion

就是在靠queue这个chan来维持令牌数量。

那么 conns的作用是什么呢?我们可以来看看新建连接这个函数:

新建连接

func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
    cn, err := p.dialConn(ctx, pooled)
    if err != nil {
        return nil, err
    }
    p.connsMu.Lock()
    p.conns = append(p.conns, cn)
    if pooled {
        // 如果连接池满了,会在后面移除。
        if p.poolSize >= p.opt.PoolSize {
            cn.pooled = false
        } else {
            p.poolSize++
        }
    }
    p.connsMu.Unlock()
    return cn, nil
}
Copier après la connexion

基本逻辑出来了。就是如果新建连接的话,我并不会直接放在 idleConns里面,而是先放 conns里面。同时先看池子满了没有。满的话后面归还的时候会标记,后面会删除。那么这个后面会删除,指的是什么时候呢?那就是下面说的归还连接的时候了。

归还连接

func (p *ConnPool) Put(cn *Conn) {
    if cn.rd.Buffered() > 0 {
        internal.Logger.Printf("Conn has unread data")
        p.Remove(cn, BadConnError{})
        return
    }
    //这就是我们刚刚说的后面了,前面标记过不要入池的,这边就删除了。当然了,里面也会进行freeTurn操作。
    if !cn.pooled {
        p.Remove(cn, nil)
        return
    }
    p.connsMu.Lock()
    p.idleConns = append(p.idleConns, cn)
    p.idleConnsLen++
    p.connsMu.Unlock()
    //我们可以看到很明显的这个归还号码牌的动作。
    p.freeTurn()
}
Copier après la connexion

其实归还的过程,就是从 conns转移到 idleConns的过程。当然了,如果新建这个连接时候发现已经 超卖了,后面归还时候就不转移,直接删除了。

等等,上面的逻辑似乎有点不对?我们来理一下获取连接流程:

  1. 先 waitTurn,拿到令牌。而令牌数量是根据pool里面的 queue决定的。

  2. 拿到令牌了,去库房 idleConns里面拿空闲的连接。没有的话就自己 newConn一个,并且把他记录到 conns里面。

    Lorsque
  3. est épuisé, appelez putRetour : c'est-à-dire transférez de conns à idleConns. Lorsque vous le retournerez, vérifiez s'il a été marqué comme survendu chez newConn. Si tel est le cas, il ne sera pas transféré à idleConns.

J'ai longtemps été perplexe. Puisqu'il faut toujours obtenir un token pour se connecter, le nombre de tokens est fixe. Pourquoi est-il encore survendu ? Après avoir parcouru le code source, ma réponse est :

Bien que la Get méthode pour obtenir la connexion soit newConn cette méthode privée, elle est soumise au contrôle des jetons et n'entraînera pas de survente. Mais cette méthode accepte les paramètres : pooledbool. Je suppose donc que je crains que lorsque d'autres personnes appellent cette méthode, elles passent vrai quelle que soit la situation, ce qui entraînera une augmentation de plus en plus grande de poolSize.

En général, le nombre de connexions dans le pool de connexions Redis est contrôlé par le queuechan que j'appelle le jeton.

Résumé

Comme vous pouvez le voir ci-dessus, la garantie la plus fondamentale du pool de connexions est la sécurité des threads lors de l'obtention de la connexion. Mais lors de la mise en œuvre de nombreuses fonctionnalités supplémentaires, elles sont implémentées sous différents angles. Toujours très intéressant. Mais que la structure de stockage soit chan ou slice, cela peut être très bien réalisé. Si vous utilisez slice pour stocker des connexions telles que SQL ou Redis, vous devez conserver une structure pour représenter l'effet de la file d'attente.

Auteur de cet article : Xiao Ling, s'il vous plaît, bonne chance Date : 2020-02-28 Lien original : https://juejin.im/post/5e58e3b7f265da57537eb7ed


Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Étiquettes associées:
source:csdn.net
Déclaration de ce site Web
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn
Tutoriels populaires
Plus>
Derniers téléchargements
Plus>
effets Web
Code source du site Web
Matériel du site Web
Modèle frontal