The following golang tutorial column will introduce to you several implementation cases of Golang connection pool. I hope it will be helpful to friends in need!
Due to TCP's three-way handshake and other reasons, establishing a connection is a relatively expensive act. Therefore, in a program that needs to interact with a specific entity multiple times, it is necessary to maintain a connection pool with reusable connections for reuse.
To maintain a connection pool, the most basic requirement is to achieve: thread safe (thread safety), especially in a language like Golang whose feature is goroutine.
type Pool struct { m sync.Mutex //保证多个goroutine访问时候,closed的线程安全 res chan io.Closer //连接存储的chan factory func() (io.Closer,error) //新建连接的工厂方法 closed bool //连接池关闭标志 }
For this simple connection pool, we use chan to store the connections in the pool. The method of creating a new structure is relatively simple:
func New(fn func() (io.Closer, error), size uint) (*Pool, error) { if size <= 0 { return nil, errors.New("size的值太小了。") } return &Pool{ factory: fn, res: make(chan io.Closer, size), }, nil }
You only need to provide the corresponding factory function and the size of the connection pool.
So how do we get resources from it? Because the structure of our internal storage connection is chan, we only need a simple select to ensure thread safety:
//从资源池里获取一个资源 func (p *Pool) Acquire() (io.Closer,error) { select { case r,ok := <-p.res: log.Println("Acquire:共享资源") if !ok { return nil,ErrPoolClosed } return r,nil default: log.Println("Acquire:新生成资源") return p.factory() } }
We first get it from the res chan of the connection pool, if not We will use the factory function we have already prepared to construct the connection. At the same time, when we obtain the connection from res, we use ok to first determine whether the connection pool has been closed. If it has been closed, we will return the already prepared connection closed error.
So since we mentioned closing the connection pool, how do we close the connection pool?
//关闭资源池,释放资源 func (p *Pool) Close() { p.m.Lock() defer p.m.Unlock() if p.closed { return } p.closed = true //关闭通道,不让写入了 close(p.res) //关闭通道里的资源 for r:=range p.res { r.Close() } }
Here we need to perform the p.m.Lock()* locking operation first. This is because we need to read and write the *closed in the structure. You need to set this flag first and then close the res chan so that the Acquire method can no longer obtain new connections. We then perform the Close operation on the connection in the res chan.
There must be a prerequisite for releasing the connection, that is, the connection pool has not been closed. If the connection pool has been closed and a connection is sent to res, panic will be triggered.
func (p *Pool) Release(r io.Closer){ //保证该操作和Close方法的操作是安全的 p.m.Lock() defer p.m.Unlock() //资源池都关闭了,就省这一个没有释放的资源了,释放即可 if p.closed { r.Close() return } select { case p.res <- r: log.Println("资源释放到池子里了") default: log.Println("资源池满了,释放这个资源吧") r.Close() } }
The above is a simple and thread-safe connection pool implementation. What we can see is that although the connection pool has been implemented now, there are still several small shortcomings:
We have no limit on the maximum number of connections. If the thread pool If it is empty, by default we will directly create a new connection and return it. Once the amount of concurrency is high, new connections will continue to be created, which can easily cause
too many connections
errors (especially MySQL).Since we need to ensure the maximum number of obtainable connections, we don’t want the number to be too rigid. We hope that we can maintain a certain number of idle connections idleNum when we are idle, but we also hope that we can limit the maximum number of obtainable connections maxNum.
The first situation is too much concurrency. So what if the amount of concurrency is too little? Now after we create a new connection and return it, we no longer use this connection for a long time. Then this connection is likely to have been established a few hours or more ago. We have no way to guarantee the availability of a connection that has been idle for a long time. It is possible that the connection we obtain next time is an expired connection.
Then we can look at how they solve these problems from the MySQL connection pool library and Redis connection pool library that have been maturely used.
Golang’s connection pool is implemented under the standard library database/sql/sql.go
. When we run:
db, err := sql.Open("mysql", "xxxx")
, a connection pool will be opened. We can take a look at the structure of the returned db:
type DB struct { waitDuration int64 // Total time waited for new connections. mu sync.Mutex // protects following fields freeConn []*driverConn connRequests map[uint64]chan connRequest nextRequest uint64 // Next key to use in connRequests. numOpen int // number of opened and pending open connections // Used to signal the need for new connections // a goroutine running connectionOpener() reads on this chan and // maybeOpenNewConnections sends on the chan (one send per needed connection) // It is closed during db.Close(). The close tells the connectionOpener // goroutine to exit. openerCh chan struct{} closed bool maxIdle int // zero means defaultMaxIdleConns; negative means 0 maxOpen int // <= 0 means unlimited maxLifetime time.Duration // maximum amount of time a connection may be reused cleanerCh chan struct{} waitCount int64 // Total number of connections waited for. maxIdleClosed int64 // Total number of connections closed due to idle. maxLifetimeClosed int64 // Total number of connections closed due to max free limit. }
Some fields that do not require attention for the time being are omitted. We can see that the structure freeConn used to store connections inside the DB connection pool is not the chan we used before, but []*driverConn, a connection slice. At the same time, we can also see that there are related variables such as maxIdle to control the number of idle connections. It is worth noting that the DB initialization function Open function does not create a new database connection. Which function is used to create a new connection? We can go all the way back in the Query method, and we can see this function: func(db*DB)conn(ctx context.Context,strategy connReuseStrategy)(*driverConn,error)
. Our method of obtaining connections from the connection pool starts here:
// conn returns a newly-opened or cached *driverConn. func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) { // 先判断db是否已经关闭。 db.mu.Lock() if db.closed { db.mu.Unlock() return nil, errDBClosed } // 注意检测context是否已经被超时等原因被取消。 select { default: case <-ctx.Done(): db.mu.Unlock() return nil, ctx.Err() } lifetime := db.maxLifetime // 这边如果在freeConn这个切片有空闲连接的话,就left pop一个出列。注意的是,这边因为是切片操作,所以需要前面需要加锁且获取后进行解锁操作。同时判断返回的连接是否已经过期。 numFree := len(db.freeConn) if strategy == cachedOrNewConn && numFree > 0 { conn := db.freeConn[0] copy(db.freeConn, db.freeConn[1:]) db.freeConn = db.freeConn[:numFree-1] conn.inUse = true db.mu.Unlock() if conn.expired(lifetime) { conn.Close() return nil, driver.ErrBadConn } // Lock around reading lastErr to ensure the session resetter finished. conn.Lock() err := conn.lastErr conn.Unlock() if err == driver.ErrBadConn { conn.Close() return nil, driver.ErrBadConn } return conn, nil } // 这边就是等候获取连接的重点了。当空闲的连接为空的时候,这边将会新建一个request(的等待连接 的请求)并且开始等待 if db.maxOpen > 0 && db.numOpen >= db.maxOpen { // 下面的动作相当于往connRequests这个map插入自己的号码牌。 // 插入号码牌之后这边就不需要阻塞等待继续往下走逻辑。 req := make(chan connRequest, 1) reqKey := db.nextRequestKeyLocked() db.connRequests[reqKey] = req db.waitCount++ db.mu.Unlock() waitStart := time.Now() // Timeout the connection request with the context. select { case <-ctx.Done(): // context取消操作的时候,记得从connRequests这个map取走自己的号码牌。 db.mu.Lock() delete(db.connRequests, reqKey) db.mu.Unlock() atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) select { default: case ret, ok := <-req: // 这边值得注意了,因为现在已经被context取消了。但是刚刚放了自己的号码牌进去排队里面。意思是说不定已经发了连接了,所以得注意归还! if ok && ret.conn != nil { db.putConn(ret.conn, ret.err, false) } } return nil, ctx.Err() case ret, ok := <-req: // 下面是已经获得连接后的操作了。检测一下获得连接的状况。因为有可能已经过期了等等。 atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart))) if !ok { return nil, errDBClosed } if ret.err == nil && ret.conn.expired(lifetime) { ret.conn.Close() return nil, driver.ErrBadConn } if ret.conn == nil { return nil, ret.err } ret.conn.Lock() err := ret.conn.lastErr ret.conn.Unlock() if err == driver.ErrBadConn { ret.conn.Close() return nil, driver.ErrBadConn } return ret.conn, ret.err } } // 下面就是如果上面说的限制情况不存在,可以创建先连接时候,要做的创建连接操作了。 db.numOpen++ // optimistically db.mu.Unlock() ci, err := db.connector.Connect(ctx) if err != nil { db.mu.Lock() db.numOpen-- // correct for earlier optimism db.maybeOpenNewConnections() db.mu.Unlock() return nil, err } db.mu.Lock() dc := &driverConn{ db: db, createdAt: nowFunc(), ci: ci, inUse: true, } db.addDepLocked(dc, dc) db.mu.Unlock() return dc, nil } 复制代码
简单来说,DB结构体除了用的是slice来存储连接,还加了一个类似排队机制的connRequests来解决获取等待连接的过程。同时在判断连接健康性都有很好的兼顾。那么既然有了排队机制,归还连接的时候是怎么做的呢?
我们可以直接找到 func(db*DB)putConnDBLocked(dc*driverConn,err error)bool
这个方法。就像注释说的,这个方法主要的目的是:
Satisfy a connRequest or put the driverConn in the idle pool and return true or return false.
我们主要来看看里面重点那几行:
... // 如果已经超过最大打开数量了,就不需要在回归pool了 if db.maxOpen > 0 && db.numOpen > db.maxOpen { return false } // 这边是重点了,基本来说就是从connRequest这个map里面随机抽一个在排队等着的请求。取出来后发给他。就不用归还池子了。 if c := len(db.connRequests); c > 0 { var req chan connRequest var reqKey uint64 for reqKey, req = range db.connRequests { break } delete(db.connRequests, reqKey) // 删除这个在排队的请求。 if err == nil { dc.inUse = true } // 把连接给这个正在排队的连接。 req <- connRequest{ conn: dc, err: err, } return true } else if err == nil && !db.closed { // 既然没人排队,就看看到了最大连接数目没有。没到就归还给freeConn。 if db.maxIdleConnsLocked() > len(db.freeConn) { db.freeConn = append(db.freeConn, dc) db.startCleanerLocked() return true } db.maxIdleClosed++ } ...
我们可以看到,当归还连接时候,如果有在排队轮候的请求就不归还给池子直接发给在轮候的人了。
现在基本就解决前面说的小问题了。不会出现连接太多导致无法控制too many connections的情况。也很好了维持了连接池的最小数量。同时也做了相关对于连接健康性的检查操作。
值得注意的是,作为标准库的代码,相关注释和代码都非常完美,真的可以看的神清气爽。
这个Golang实现的Redis客户端,是怎么实现连接池的。这边的思路非常奇妙,还是能学习到不少好思路。当然了,由于代码注释比较少,啃起来第一下还是有点迷糊的。相关代码地址在https://github.com/go-redis/redis/blob/master/internal/pool/pool.go 可以看到。
而它的连接池结构如下
type ConnPool struct { ... queue chan struct{} connsMu sync.Mutex conns []*Conn idleConns []*Conn poolSize int idleConnsLen int stats Stats _closed uint32 // atomic closedCh chan struct{} }
我们可以看到里面存储连接的结构还是slice。但是我们可以重点看看 queue
, conns
, idleConns
这几个变量,后面会提及到。但是值得注意的是!我们可以看到,这里有两个[]*Conn结构:conns
、 idleConns
,那么问题来了:
到底连接存在哪里?
我们先从新建连接池连接开始看:
func NewConnPool(opt *Options) *ConnPool { .... p.checkMinIdleConns() if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 { go p.reaper(opt.IdleCheckFrequency) } .... }
初始化连接池的函数有个和前面两个不同的地方。
checkMinIdleConns
方法,在连接池初始化的时候就会往连接池填满空闲的连接。
go p.reaper(opt.IdleCheckFrequency)
则会在初始化连接池的时候就会起一个go程,周期性的淘汰连接池里面要被淘汰的连接。
func (p *ConnPool) Get(ctx context.Context) (*Conn, error) { if p.closed() { return nil, ErrClosed } //这边和前面sql获取连接函数的流程不同。sql是先看看连接池有没有空闲连接,有的话先获取不到再排队。这边是直接先排队获取令牌,排队函数后面会分析。 err := p.waitTurn(ctx) if err != nil { return nil, err } //前面没出error的话,就已经排队轮候到了。接下来就是获取的流程。 for { p.connsMu.Lock() //从空闲连接里面先获取一个空闲连接。 cn := p.popIdle() p.connsMu.Unlock() if cn == nil { // 没有空闲连接时候直接跳出循环。 break } // 判断是否已经过时,是的话close掉了然后继续取出。 if p.isStaleConn(cn) { _ = p.CloseConn(cn) continue } atomic.AddUint32(&p.stats.Hits, 1) return cn, nil } atomic.AddUint32(&p.stats.Misses, 1) // 如果没有空闲连接的话,这边就直接新建连接了。 newcn, err := p.newConn(ctx, true) if err != nil { // 归还令牌。 p.freeTurn() return nil, err } return newcn, nil }
我们可以试着回答开头那个问题:连接到底存在哪里?答案是从 cn:=p.popIdle()
这句话可以看出,获取连接这个动作,是从 idleConns
里面获取的,而里面的函数也证明了这一点。同时我的理解是:
sql的排队意味着我对连接池申请连接后,把自己的编号告诉连接池。连接那边一看到有空闲了,就叫我的号。我答应了一声,然后连接池就直接给个连接给我。我如果不归还,连接池就一直不叫下一个号。
redis这边的意思是,我去和连接池申请的不是连接而是令牌。我就一直排队等着,连接池给我令牌了,我才去仓库里面找空闲连接或者自己新建一个连接。用完了连接除了归还连接外,还得归还令牌。当然了,如果我自己新建连接出错了,我哪怕拿不到连接回家,我也得把令牌给回连接池,不然连接池的令牌数少了,最大连接数也会变小。
而:
func (p *ConnPool) freeTurn() { <-p.queue } func (p *ConnPool) waitTurn(ctx context.Context) error { ... case p.queue <- struct{}{}: return nil ... }
就是在靠queue这个chan来维持令牌数量。
那么 conns
的作用是什么呢?我们可以来看看新建连接这个函数:
func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) { cn, err := p.dialConn(ctx, pooled) if err != nil { return nil, err } p.connsMu.Lock() p.conns = append(p.conns, cn) if pooled { // 如果连接池满了,会在后面移除。 if p.poolSize >= p.opt.PoolSize { cn.pooled = false } else { p.poolSize++ } } p.connsMu.Unlock() return cn, nil }
基本逻辑出来了。就是如果新建连接的话,我并不会直接放在 idleConns
里面,而是先放 conns
里面。同时先看池子满了没有。满的话后面归还的时候会标记,后面会删除。那么这个后面会删除,指的是什么时候呢?那就是下面说的归还连接的时候了。
func (p *ConnPool) Put(cn *Conn) { if cn.rd.Buffered() > 0 { internal.Logger.Printf("Conn has unread data") p.Remove(cn, BadConnError{}) return } //这就是我们刚刚说的后面了,前面标记过不要入池的,这边就删除了。当然了,里面也会进行freeTurn操作。 if !cn.pooled { p.Remove(cn, nil) return } p.connsMu.Lock() p.idleConns = append(p.idleConns, cn) p.idleConnsLen++ p.connsMu.Unlock() //我们可以看到很明显的这个归还号码牌的动作。 p.freeTurn() }
其实归还的过程,就是从 conns
转移到 idleConns
的过程。当然了,如果新建这个连接时候发现已经 超卖
了,后面归还时候就不转移,直接删除了。
等等,上面的逻辑似乎有点不对?我们来理一下获取连接流程:
先 waitTurn
,拿到令牌。而令牌数量是根据pool里面的 queue
决定的。
拿到令牌了,去库房 idleConns
里面拿空闲的连接。没有的话就自己 newConn
一个,并且把他记录到 conns
里面。
is used up, call put
to return: that is, transfer from conns
to idleConns
. When returning it, check whether it was marked oversold at newConn
. If so, it will not be transferred to idleConns
.
I was puzzled for a long time. Since you always need to obtain a token to get a connection, the number of tokens is fixed. Why is it still oversold? After looking through the source code, my answer is:
Although the Get
method to obtain the connection is newConn
, this private method is subject to token control and will not cause oversold. . But this method accepts parameters: pooledbool
. So I guess I am worried that when other people call this method, they will pass true regardless of the situation, causing the poolSize to become larger and larger.
In general, the number of connections in the redis connection pool is controlled by the
queue
chan which I call the token.
As you can see from the above, the most basic guarantee of the connection pool is thread safety when obtaining the connection. But when implementing many additional features, they are implemented from different angles. Still very interesting. But regardless of whether the storage structure is chan or slice, this can be achieved very well. If you use slice to store connections like SQL or redis, you have to maintain a structure to represent the effect of queuing.
Author of this article: Xiao Ling seeks good luck Date: 2020-02-28 Original link: https://juejin.im/post/5e58e3b7f265da57537eb7ed
The above is the detailed content of Several implementation cases about Golang connection pool. For more information, please follow other related articles on the PHP Chinese website!