> 백엔드 개발 > Golang > Golang 연결 풀에 대한 여러 구현 사례

Golang 연결 풀에 대한 여러 구현 사례

藏色散人
풀어 주다: 2020-12-14 14:42:41
앞으로
2545명이 탐색했습니다.

다음 golang 튜토리얼 칼럼에서는 Golang 연결 풀의 여러 구현 사례를 소개하겠습니다. 도움이 필요한 친구들에게 도움이 되길 바랍니다!

Golang 연결 풀에 대한 여러 구현 사례

TCP의 3방향 핸드셰이크 및 기타 이유로 인해 연결을 설정하는 것은 상대적으로 비용이 많이 드는 작업입니다. 따라서 특정 엔터티와 여러 번 상호 작용해야 하는 프로그램에서는 재사용을 위해 재사용 가능한 연결로 연결 풀을 유지 관리해야 합니다.

연결 풀을 유지하기 위한 가장 기본적인 요구 사항은 스레드 안전입니다. 특히 goroutine 기능이 있는 Golang과 같은 언어에서는 더욱 그렇습니다.

간단한 연결 풀 구현

type Pool struct {
    m sync.Mutex //保证多个goroutine访问时候,closed的线程安全
    res chan io.Closer //连接存储的chan
    factory func() (io.Closer,error) //新建连接的工厂方法
    closed bool //连接池关闭标志
}
로그인 후 복사

이 간단한 연결 풀에서는 chan을 사용하여 연결을 풀에 저장합니다. 새로운 구조를 생성하는 방법은 비교적 간단합니다.

func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
    if size <= 0 {
        return nil, errors.New("size的值太小了。")
    }
    return &Pool{
        factory: fn,
        res:     make(chan io.Closer, size),
    }, nil
}
로그인 후 복사

해당 팩토리 기능과 연결 풀의 크기만 제공하면 됩니다.

연결 확보

그렇다면 어떻게 리소스를 얻을 수 있을까요? 내부 저장소 연결의 구조는 chan이므로 스레드 안전성을 보장하기 위해 간단한 select만 필요합니다.

//从资源池里获取一个资源
func (p *Pool) Acquire() (io.Closer,error) {
    select {
    case r,ok := <-p.res:
        log.Println("Acquire:共享资源")
        if !ok {
            return nil,ErrPoolClosed
        }
        return r,nil
    default:
        log.Println("Acquire:新生成资源")
        return p.factory()
    }
}
로그인 후 복사

먼저 연결 풀의 res chan에서 가져옵니다. 그렇지 않은 경우 이미 가지고 있는 것을 사용합니다. Factory 기능은 구축 연결을 수행합니다. 동시에 res에서 연결을 얻을 때 ok를 사용하여 먼저 연결 풀이 닫혔는지 확인합니다. 닫혔으면 이미 준비된 연결 닫힘 오류를 반환합니다.

Connection Pool 닫기

Connection Pool 닫기를 언급했는데, Connection Pool을 어떻게 닫을까요?

//关闭资源池,释放资源
func (p *Pool) Close() {
    p.m.Lock()
    defer p.m.Unlock()
    if p.closed {
        return
    }
    p.closed = true
    //关闭通道,不让写入了
    close(p.res)
    //关闭通道里的资源
    for r:=range p.res {
        r.Close()
    }
}
로그인 후 복사

여기서 먼저 p.m.Lock()* 잠금 작업을 수행해야 합니다. 이는 구조에서 *closed를 읽고 써야 하기 때문입니다. Acquire 메서드가 더 이상 새 연결을 얻을 수 없도록 이 플래그를 먼저 설정한 다음 res chan을 닫아야 합니다. 그런 다음 res 채널의 연결에 대해 닫기 작업을 수행합니다.

연결 해제

연결을 해제하려면 먼저 전제 조건이 있어야 합니다. 즉, 연결 풀이 닫혀 있지 않아야 합니다. 연결 풀이 닫혀 있고 res에 연결을 보내면 패닉이 발생합니다.

func (p *Pool) Release(r io.Closer){
    //保证该操作和Close方法的操作是安全的
    p.m.Lock()
    defer p.m.Unlock()
    //资源池都关闭了,就省这一个没有释放的资源了,释放即可
    if p.closed {
        r.Close()
        return
    }
    select {
    case p.res <- r:
        log.Println("资源释放到池子里了")
    default:
        log.Println("资源池满了,释放这个资源吧")
        r.Close()
    }
}
로그인 후 복사

위는 간단하고 스레드로부터 안전한 연결 풀 구현입니다. 우리가 볼 수 있는 것은 연결 풀이 지금 구현되었지만 여전히 몇 가지 작은 단점이 있다는 것입니다.

  1. 최대 연결 수에는 제한이 없습니다. 스레드 풀이 비어 있으면 직접 생성합니다. 기본적으로 새 연결이 반환됩니다. 동시성 양이 높아지면 새로운 연결이 계속 생성되어 너무 많은 연결 오류(특히 MySQL)가 쉽게 발생할 수 있습니다. too many connections的报错发生。

  2. 既然我们需要保证最大可获取连接数量,那么我们就不希望数量定的太死。希望空闲的时候可以维护一定的空闲连接数量idleNum,但是又希望我们能限制最大可获取连接数量maxNum。

  3. 第一种情况是并发过多的情况,那么如果并发量过少呢?现在我们在新建一个连接并且归还后,我们很长一段时间不再使用这个连接。那么这个连接很有可能在几个小时甚至更长时间之前就已经建立的了。长时间闲置的连接我们并没有办法保证它的可用性。便有可能我们下次获取的连接是已经失效的连接。

那么我们可以从已经成熟使用的MySQL连接池库和Redis连接池库中看看,它们是怎么解决这些问题的。

Golang标准库的Sql连接池

Golang的连接池实现在标准库 database/sql/sql.go下。当我们运行:

db, err := sql.Open("mysql", "xxxx")
로그인 후 복사

的时候,就会打开一个连接池。我们可以看看返回的db的结构体:

type DB struct {
    waitDuration int64 // Total time waited for new connections.
    mu           sync.Mutex // protects following fields
    freeConn     []*driverConn
    connRequests map[uint64]chan connRequest
    nextRequest  uint64 // Next key to use in connRequests.
    numOpen      int    // number of opened and pending open connections
    // Used to signal the need for new connections
    // a goroutine running connectionOpener() reads on this chan and
    // maybeOpenNewConnections sends on the chan (one send per needed connection)
    // It is closed during db.Close(). The close tells the connectionOpener
    // goroutine to exit.
    openerCh          chan struct{}
    closed            bool
    maxIdle           int                    // zero means defaultMaxIdleConns; negative means 0
    maxOpen           int                    // <= 0 means unlimited
    maxLifetime       time.Duration          // maximum amount of time a connection may be reused
    cleanerCh         chan struct{}
    waitCount         int64 // Total number of connections waited for.
    maxIdleClosed     int64 // Total number of connections closed due to idle.
    maxLifetimeClosed int64 // Total number of connections closed due to max free limit.
}
로그인 후 복사

上面省去了一些暂时不需要关注的field。我们可以看的,DB这个连接池内部存储连接的结构freeConn,并不是我们之前使用的chan,而是[]*driverConn,一个连接切片。同时我们还可以看到,里面有maxIdle等相关变量来控制空闲连接数量。值得注意的是,DB的初始化函数Open函数并没有新建数据库连接。而新建连接在哪个函数呢?我们可以在Query方法一路往回找,我们可以看到这个函数: func(db*DB)conn(ctx context.Context,strategy connReuseStrategy)(*driverConn,error)

🎜얻을 수 있는 최대 연결 수를 보장해야 하므로 그 수가 너무 엄격하지 않기를 바랍니다. 유휴 상태일 때 특정 수의 유휴 연결(idleNum)을 유지할 수 있기를 바라지만, 얻을 수 있는 최대 연결 수(maxNum)도 제한할 수 있기를 바랍니다. 🎜🎜🎜첫 번째 상황은 동시성이 너무 많은 경우입니다. 그렇다면 동시성의 양이 너무 적다면 어떨까요? 이제 새 연결을 만들고 반환한 후에는 더 이상 이 연결을 오랫동안 사용하지 않습니다. 그렇다면 이 연결은 몇 시간 또는 그 이상 전에 설정되었을 가능성이 높습니다. 오랫동안 유휴 상태인 연결의 가용성을 보장할 수 있는 방법은 없습니다. 다음에 얻는 연결이 만료된 연결일 가능성이 있습니다. 🎜🎜그런 다음 이러한 문제를 어떻게 해결하는지 알아보기 위해 성숙하게 사용된 MySQL 연결 풀 라이브러리와 Redis 연결 풀 라이브러리를 살펴보겠습니다. 🎜🎜Golang 표준 라이브러리의 SQL 연결 풀🎜🎜Golang의 연결 풀은 표준 라이브러리 database/sql/sql.go에 구현되어 있습니다. 🎜
// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    // 先判断db是否已经关闭。
    db.mu.Lock()
    if db.closed {
        db.mu.Unlock()
        return nil, errDBClosed
    }
    // 注意检测context是否已经被超时等原因被取消。
    select {
    default:
    case <-ctx.Done():
        db.mu.Unlock()
        return nil, ctx.Err()
    }
    lifetime := db.maxLifetime
    // 这边如果在freeConn这个切片有空闲连接的话,就left pop一个出列。注意的是,这边因为是切片操作,所以需要前面需要加锁且获取后进行解锁操作。同时判断返回的连接是否已经过期。
    numFree := len(db.freeConn)
    if strategy == cachedOrNewConn && numFree > 0 {
        conn := db.freeConn[0]
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        conn.inUse = true
        db.mu.Unlock()
        if conn.expired(lifetime) {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        // Lock around reading lastErr to ensure the session resetter finished.
        conn.Lock()
        err := conn.lastErr
        conn.Unlock()
        if err == driver.ErrBadConn {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        return conn, nil
    }
    // 这边就是等候获取连接的重点了。当空闲的连接为空的时候,这边将会新建一个request(的等待连接 的请求)并且开始等待
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        // 下面的动作相当于往connRequests这个map插入自己的号码牌。
        // 插入号码牌之后这边就不需要阻塞等待继续往下走逻辑。
        req := make(chan connRequest, 1)
        reqKey := db.nextRequestKeyLocked()
        db.connRequests[reqKey] = req
        db.waitCount++
        db.mu.Unlock()
        waitStart := time.Now()
        // Timeout the connection request with the context.
        select {
        case <-ctx.Done():
            // context取消操作的时候,记得从connRequests这个map取走自己的号码牌。
            db.mu.Lock()
            delete(db.connRequests, reqKey)
            db.mu.Unlock()
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
            select {
            default:
            case ret, ok := <-req:
                // 这边值得注意了,因为现在已经被context取消了。但是刚刚放了自己的号码牌进去排队里面。意思是说不定已经发了连接了,所以得注意归还!
                if ok && ret.conn != nil {
                    db.putConn(ret.conn, ret.err, false)
                }
            }
            return nil, ctx.Err()
        case ret, ok := <-req:
            // 下面是已经获得连接后的操作了。检测一下获得连接的状况。因为有可能已经过期了等等。
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
            if !ok {
                return nil, errDBClosed
            }
            if ret.err == nil && ret.conn.expired(lifetime) {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            if ret.conn == nil {
                return nil, ret.err
            }
            ret.conn.Lock()
            err := ret.conn.lastErr
            ret.conn.Unlock()
            if err == driver.ErrBadConn {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            return ret.conn, ret.err
        }
    }
    // 下面就是如果上面说的限制情况不存在,可以创建先连接时候,要做的创建连接操作了。
    db.numOpen++ // optimistically
    db.mu.Unlock()
    ci, err := db.connector.Connect(ctx)
    if err != nil {
        db.mu.Lock()
        db.numOpen-- // correct for earlier optimism
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
        inUse:     true,
    }
    db.addDepLocked(dc, dc)
    db.mu.Unlock()
    return dc, nil
}
复制代码
로그인 후 복사
로그인 후 복사
🎜를 실행하면 연결 풀이 열립니다. 반환된 🎜db🎜의 구조를 살펴보면 🎜
...
    // 如果已经超过最大打开数量了,就不需要在回归pool了
    if db.maxOpen > 0 && db.numOpen > db.maxOpen {
        return false
    }
    // 这边是重点了,基本来说就是从connRequest这个map里面随机抽一个在排队等着的请求。取出来后发给他。就不用归还池子了。
    if c := len(db.connRequests); c > 0 {
        var req chan connRequest
        var reqKey uint64
        for reqKey, req = range db.connRequests {
            break
        }
        delete(db.connRequests, reqKey) // 删除这个在排队的请求。
        if err == nil {
            dc.inUse = true
        }
        // 把连接给这个正在排队的连接。
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    } else if err == nil && !db.closed {
        // 既然没人排队,就看看到了最大连接数目没有。没到就归还给freeConn。
        if db.maxIdleConnsLocked() > len(db.freeConn) {
            db.freeConn = append(db.freeConn, dc)
            db.startCleanerLocked()
            return true
        }
        db.maxIdleClosed++
    }
...
로그인 후 복사
로그인 후 복사
🎜 당분간 주의가 필요하지 않은 일부 필드는 생략됩니다. Connection을 저장하기 위한 DB Connection Pool 내부의 🎜freeConn🎜 구조가 이전에 사용했던 chan이 아닌, Connection Slice인 🎜[]*driverConn🎜 구조임을 알 수 있습니다. 동시에 유휴 연결 수를 제어하는 ​​🎜maxIdle🎜과 같은 관련 변수가 있음을 알 수 있습니다. 🎜DB🎜의 초기화 함수 🎜Open🎜이 새 데이터베이스 연결을 생성하지 않는다는 점은 주목할 가치가 있습니다. 새로운 연결을 생성하는 데 사용되는 기능은 무엇입니까? 🎜Query🎜 메소드로 다시 돌아가서 다음 함수를 볼 수 있습니다: func(db*DB)conn(ctx context.Context,strategy connReuseStrategy)(*driverConn,error) . 연결 풀에서 연결을 얻는 방법은 여기에서 시작됩니다. 🎜

获取连接

// conn returns a newly-opened or cached *driverConn.
func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
    // 先判断db是否已经关闭。
    db.mu.Lock()
    if db.closed {
        db.mu.Unlock()
        return nil, errDBClosed
    }
    // 注意检测context是否已经被超时等原因被取消。
    select {
    default:
    case <-ctx.Done():
        db.mu.Unlock()
        return nil, ctx.Err()
    }
    lifetime := db.maxLifetime
    // 这边如果在freeConn这个切片有空闲连接的话,就left pop一个出列。注意的是,这边因为是切片操作,所以需要前面需要加锁且获取后进行解锁操作。同时判断返回的连接是否已经过期。
    numFree := len(db.freeConn)
    if strategy == cachedOrNewConn && numFree > 0 {
        conn := db.freeConn[0]
        copy(db.freeConn, db.freeConn[1:])
        db.freeConn = db.freeConn[:numFree-1]
        conn.inUse = true
        db.mu.Unlock()
        if conn.expired(lifetime) {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        // Lock around reading lastErr to ensure the session resetter finished.
        conn.Lock()
        err := conn.lastErr
        conn.Unlock()
        if err == driver.ErrBadConn {
            conn.Close()
            return nil, driver.ErrBadConn
        }
        return conn, nil
    }
    // 这边就是等候获取连接的重点了。当空闲的连接为空的时候,这边将会新建一个request(的等待连接 的请求)并且开始等待
    if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
        // 下面的动作相当于往connRequests这个map插入自己的号码牌。
        // 插入号码牌之后这边就不需要阻塞等待继续往下走逻辑。
        req := make(chan connRequest, 1)
        reqKey := db.nextRequestKeyLocked()
        db.connRequests[reqKey] = req
        db.waitCount++
        db.mu.Unlock()
        waitStart := time.Now()
        // Timeout the connection request with the context.
        select {
        case <-ctx.Done():
            // context取消操作的时候,记得从connRequests这个map取走自己的号码牌。
            db.mu.Lock()
            delete(db.connRequests, reqKey)
            db.mu.Unlock()
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
            select {
            default:
            case ret, ok := <-req:
                // 这边值得注意了,因为现在已经被context取消了。但是刚刚放了自己的号码牌进去排队里面。意思是说不定已经发了连接了,所以得注意归还!
                if ok && ret.conn != nil {
                    db.putConn(ret.conn, ret.err, false)
                }
            }
            return nil, ctx.Err()
        case ret, ok := <-req:
            // 下面是已经获得连接后的操作了。检测一下获得连接的状况。因为有可能已经过期了等等。
            atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
            if !ok {
                return nil, errDBClosed
            }
            if ret.err == nil && ret.conn.expired(lifetime) {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            if ret.conn == nil {
                return nil, ret.err
            }
            ret.conn.Lock()
            err := ret.conn.lastErr
            ret.conn.Unlock()
            if err == driver.ErrBadConn {
                ret.conn.Close()
                return nil, driver.ErrBadConn
            }
            return ret.conn, ret.err
        }
    }
    // 下面就是如果上面说的限制情况不存在,可以创建先连接时候,要做的创建连接操作了。
    db.numOpen++ // optimistically
    db.mu.Unlock()
    ci, err := db.connector.Connect(ctx)
    if err != nil {
        db.mu.Lock()
        db.numOpen-- // correct for earlier optimism
        db.maybeOpenNewConnections()
        db.mu.Unlock()
        return nil, err
    }
    db.mu.Lock()
    dc := &driverConn{
        db:        db,
        createdAt: nowFunc(),
        ci:        ci,
        inUse:     true,
    }
    db.addDepLocked(dc, dc)
    db.mu.Unlock()
    return dc, nil
}
复制代码
로그인 후 복사
로그인 후 복사

简单来说,DB结构体除了用的是slice来存储连接,还加了一个类似排队机制的connRequests来解决获取等待连接的过程。同时在判断连接健康性都有很好的兼顾。那么既然有了排队机制,归还连接的时候是怎么做的呢?

释放连接

我们可以直接找到 func(db*DB)putConnDBLocked(dc*driverConn,err error)bool这个方法。就像注释说的,这个方法主要的目的是:

Satisfy a connRequest or put the driverConn in the idle pool and return true or return false.

我们主要来看看里面重点那几行:

...
    // 如果已经超过最大打开数量了,就不需要在回归pool了
    if db.maxOpen > 0 && db.numOpen > db.maxOpen {
        return false
    }
    // 这边是重点了,基本来说就是从connRequest这个map里面随机抽一个在排队等着的请求。取出来后发给他。就不用归还池子了。
    if c := len(db.connRequests); c > 0 {
        var req chan connRequest
        var reqKey uint64
        for reqKey, req = range db.connRequests {
            break
        }
        delete(db.connRequests, reqKey) // 删除这个在排队的请求。
        if err == nil {
            dc.inUse = true
        }
        // 把连接给这个正在排队的连接。
        req <- connRequest{
            conn: dc,
            err:  err,
        }
        return true
    } else if err == nil && !db.closed {
        // 既然没人排队,就看看到了最大连接数目没有。没到就归还给freeConn。
        if db.maxIdleConnsLocked() > len(db.freeConn) {
            db.freeConn = append(db.freeConn, dc)
            db.startCleanerLocked()
            return true
        }
        db.maxIdleClosed++
    }
...
로그인 후 복사
로그인 후 복사

我们可以看到,当归还连接时候,如果有在排队轮候的请求就不归还给池子直接发给在轮候的人了。

现在基本就解决前面说的小问题了。不会出现连接太多导致无法控制too many connections的情况。也很好了维持了连接池的最小数量。同时也做了相关对于连接健康性的检查操作。

值得注意的是,作为标准库的代码,相关注释和代码都非常完美,真的可以看的神清气爽。

redis Golang实现的Redis客户端

这个Golang实现的Redis客户端,是怎么实现连接池的。这边的思路非常奇妙,还是能学习到不少好思路。当然了,由于代码注释比较少,啃起来第一下还是有点迷糊的。相关代码地址在https://github.com/go-redis/redis/blob/master/internal/pool/pool.go 可以看到。

而它的连接池结构如下

type ConnPool struct {
    ...
    queue chan struct{}
    connsMu      sync.Mutex
    conns        []*Conn
    idleConns    []*Conn
    poolSize     int
    idleConnsLen int
    stats Stats
    _closed  uint32 // atomic
    closedCh chan struct{}
}
로그인 후 복사

我们可以看到里面存储连接的结构还是slice。但是我们可以重点看看 queueconnsidleConns这几个变量,后面会提及到。但是值得注意的是!我们可以看到,这里有两个[]*Conn结构:connsidleConns,那么问题来了:

到底连接存在哪里?

新建连接池连接

我们先从新建连接池连接开始看:

func NewConnPool(opt *Options) *ConnPool {
    ....
    p.checkMinIdleConns()
    if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
        go p.reaper(opt.IdleCheckFrequency)
    }
    ....
}
로그인 후 복사

初始化连接池的函数有个和前面两个不同的地方。

  1. checkMinIdleConns方法,在连接池初始化的时候就会往连接池填满空闲的连接。

  2. go p.reaper(opt.IdleCheckFrequency)则会在初始化连接池的时候就会起一个go程,周期性的淘汰连接池里面要被淘汰的连接。

获取连接

func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
    if p.closed() {
        return nil, ErrClosed
    }
    //这边和前面sql获取连接函数的流程不同。sql是先看看连接池有没有空闲连接,有的话先获取不到再排队。这边是直接先排队获取令牌,排队函数后面会分析。
    err := p.waitTurn(ctx)
    if err != nil {
        return nil, err
    }
    //前面没出error的话,就已经排队轮候到了。接下来就是获取的流程。
    for {
        p.connsMu.Lock()
        //从空闲连接里面先获取一个空闲连接。
        cn := p.popIdle()
        p.connsMu.Unlock()
        if cn == nil {
            // 没有空闲连接时候直接跳出循环。
            break
        }
        // 判断是否已经过时,是的话close掉了然后继续取出。
        if p.isStaleConn(cn) {
            _ = p.CloseConn(cn)
            continue
        }
        atomic.AddUint32(&p.stats.Hits, 1)
        return cn, nil
    }
    atomic.AddUint32(&p.stats.Misses, 1)
    // 如果没有空闲连接的话,这边就直接新建连接了。
    newcn, err := p.newConn(ctx, true)
    if err != nil {
        // 归还令牌。
        p.freeTurn()
        return nil, err
    }
    return newcn, nil
}
로그인 후 복사

我们可以试着回答开头那个问题:连接到底存在哪里?答案是从 cn:=p.popIdle()这句话可以看出,获取连接这个动作,是从 idleConns里面获取的,而里面的函数也证明了这一点。同时我的理解是:

  1. sql的排队意味着我对连接池申请连接后,把自己的编号告诉连接池。连接那边一看到有空闲了,就叫我的号。我答应了一声,然后连接池就直接给个连接给我。我如果不归还,连接池就一直不叫下一个号。

  2. redis这边的意思是,我去和连接池申请的不是连接而是令牌。我就一直排队等着,连接池给我令牌了,我才去仓库里面找空闲连接或者自己新建一个连接。用完了连接除了归还连接外,还得归还令牌。当然了,如果我自己新建连接出错了,我哪怕拿不到连接回家,我也得把令牌给回连接池,不然连接池的令牌数少了,最大连接数也会变小。

而:

func (p *ConnPool) freeTurn() {
    <-p.queue
}
func (p *ConnPool) waitTurn(ctx context.Context) error {
...
    case p.queue <- struct{}{}:
        return nil
...
}
로그인 후 복사

就是在靠queue这个chan来维持令牌数量。

那么 conns的作用是什么呢?我们可以来看看新建连接这个函数:

新建连接

func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
    cn, err := p.dialConn(ctx, pooled)
    if err != nil {
        return nil, err
    }
    p.connsMu.Lock()
    p.conns = append(p.conns, cn)
    if pooled {
        // 如果连接池满了,会在后面移除。
        if p.poolSize >= p.opt.PoolSize {
            cn.pooled = false
        } else {
            p.poolSize++
        }
    }
    p.connsMu.Unlock()
    return cn, nil
}
로그인 후 복사

基本逻辑出来了。就是如果新建连接的话,我并不会直接放在 idleConns里面,而是先放 conns里面。同时先看池子满了没有。满的话后面归还的时候会标记,后面会删除。那么这个后面会删除,指的是什么时候呢?那就是下面说的归还连接的时候了。

归还连接

func (p *ConnPool) Put(cn *Conn) {
    if cn.rd.Buffered() > 0 {
        internal.Logger.Printf("Conn has unread data")
        p.Remove(cn, BadConnError{})
        return
    }
    //这就是我们刚刚说的后面了,前面标记过不要入池的,这边就删除了。当然了,里面也会进行freeTurn操作。
    if !cn.pooled {
        p.Remove(cn, nil)
        return
    }
    p.connsMu.Lock()
    p.idleConns = append(p.idleConns, cn)
    p.idleConnsLen++
    p.connsMu.Unlock()
    //我们可以看到很明显的这个归还号码牌的动作。
    p.freeTurn()
}
로그인 후 복사

其实归还的过程,就是从 conns转移到 idleConns的过程。当然了,如果新建这个连接时候发现已经 超卖了,后面归还时候就不转移,直接删除了。

等等,上面的逻辑似乎有点不对?我们来理一下获取连接流程:

  1. 先 waitTurn,拿到令牌。而令牌数量是根据pool里面的 queue决定的。

  2. 拿到令牌了,去库房 idleConns里面拿空闲的连接。没有的话就自己 newConn一个,并且把他记录到 conns里面。

  3. 사용이 끝나면 put을 호출하여 반환합니다. 즉, conns에서 idleConns로 전송합니다. 반품시 newConn에서 과매도 표시가 되어 있는지 확인해주세요. 그렇다면 idleConns로 전송되지 않습니다. put归还:也就是从 conns转移到 idleConns。归还的时候就检查在 newConn时候是不是已经做了超卖标记了。是的话就不转移到 idleConns

我当时疑惑了好久,既然始终都需要获得令牌才能得到连接,令牌数量是定的。为什么还会超卖呢?翻了一下源码,我的答案是:

虽然 Get方法获取连接是 newConn这个私用方法,受到令牌管制导致不会出现超卖。但是这个方法接受传参:pooledbool。所以我猜是担心其他人调用这个方法时候,不管三七二十一就传了true,导致poolSize越来越大。

总的来说,redis这个连接池的连接数控制,还是在 queue

오랫동안 헷갈렸어요. 연결하려면 항상 토큰을 얻어야 하기 때문에 토큰 개수는 정해져 있어요. 왜 아직도 과매도 상태인가요? 소스 코드를 살펴본 후 내 대답은 다음과 같습니다.

연결을 얻기 위한 Get 메서드는 newConn의 비공개 메서드이지만 토큰 제어의 대상이 되며 과매도를 일으키지 않습니다. 하지만 이 메소드는 pooledbool 매개변수를 허용합니다. 그래서 다른 사람들이 이 메소드를 호출하면 상황에 관계없이 true를 전달해서 poolSize가 점점 커지는 게 아닐까 걱정이 되는 것 같아요.

일반적으로 Redis 연결 풀의 연결 수는 제가 토큰이라고 부르는 queue chan에 의해 제어됩니다.

요약


위에서 볼 수 있듯이 연결 풀의 가장 기본적인 보장은 연결을 얻을 때 스레드 안전성입니다. 그러나 많은 추가 기능을 구현할 때는 다양한 각도에서 구현됩니다. 여전히 매우 흥미롭습니다. 그러나 저장 구조가 chan이든 슬라이스이든 관계없이 이는 매우 잘 달성될 수 있습니다. 슬라이스를 사용하여 SQL 또는 Redis와 같은 연결을 저장하는 경우 대기열 효과를 나타내는 구조를 유지해야 합니다.

🎜🎜글 작성자: 행운을 빌어주는 샤오링 날짜: 2020-02-28 원본 링크: https://juejin.im/post/5e58e3b7f265da57537eb7ed🎜🎜🎜🎜🎜

위 내용은 Golang 연결 풀에 대한 여러 구현 사례의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

관련 라벨:
원천:csdn.net
본 웹사이트의 성명
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.
인기 튜토리얼
더>
최신 다운로드
더>
웹 효과
웹사이트 소스 코드
웹사이트 자료
프론트엔드 템플릿