下面由golang教學欄位介紹給大家Golang連結池的幾個實作案例,希望對需要的朋友有幫助!

因為TCP的三次握手等等原因,建立一個連線是一件成本比較高的行為。所以在一個需要多次與特定實體互動的程式中,就需要維持一個連結池,裡面有可以重複使用的連結可供重複使用。
而維持一個連接池,最基本的要求就是要做到:thread safe(線程安全),尤其是在Golang這種特性是goroutine的語言中。
實作簡單的連線池
1 2 3 4 5 6 | type Pool struct {
m sync.Mutex
res chan io.Closer
factory func() (io.Closer,error)
closed bool
}
|
登入後複製
這個簡單的連線池,我們利用chan來儲存池裡的連線。而新建結構體的方法也比較簡單:
1 2 3 4 5 6 7 8 9 | func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
if size <= 0 {
return nil, errors.New( "size的值太小了。" )
}
return &Pool{
factory: fn,
res: make(chan io.Closer, size),
}, nil
}
|
登入後複製
只需要提供對應的工廠函數和連接池的大小就可以了。
取得連線
那我們要怎麼從中取得資源呢?因為我們內部儲存連接的結構是chan,所以只需要簡單的select就可以保證線程安全:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | func (p *Pool) Acquire() (io.Closer,error) {
select {
case r,ok := <-p.res:
log.Println( "Acquire:共享资源" )
if !ok {
return nil,ErrPoolClosed
}
return r,nil
default :
log.Println( "Acquire:新生成资源" )
return p.factory()
}
}
|
登入後複製
我們先從連接池的res這個chan裡面獲取,如果沒有的話我們就利用我們早已經準備好的工廠函數進行構造連接。同時我們在從res取得連線的時候利用ok先確定了這個連線池是否已經關閉。如果已經關閉的話我們就回傳早已經準備好的連線已關閉錯誤。
關閉連線池
那麼既然提到關閉連線池,我們是怎麼樣關閉連線池的呢?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | func (p *Pool) Close() {
p.m.Lock()
defer p.m.Unlock()
if p.closed {
return
}
p.closed = true
close(p.res)
for r:=range p.res {
r.Close()
}
}
|
登入後複製
這邊我們需要先進行p.m.Lock()*上鎖操作,這麼做是因為我們需要對結構體裡面的*closed進行讀寫。需要先把這個標誌位元設定好後,關掉res這個chan,讓Acquire方法無法再取得新的連線。我們再對res這個chan裡面的連接進行Close操作。
釋放連線
釋放連線首先得有個前提,就是連線池還沒關閉。如果連接池已經關閉再往res裡面送連接的話就好觸發panic。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | func (p *Pool) Release(r io.Closer){
p.m.Lock()
defer p.m.Unlock()
if p.closed {
r.Close()
return
}
select {
case p.res <- r:
log.Println( "资源释放到池子里了" )
default :
log.Println( "资源池满了,释放这个资源吧" )
r.Close()
}
}
|
登入後複製
以上就是一個簡單且執行緒安全的連線池實作方式了。我們可以看到的是,現在連接池雖然已經實現了,但是還有幾個小缺點:
#我們對連接最大的數量沒有限制,如果線程池空的話都我們預設就直接新建一個連線回傳了。一旦並發量高的話將會不斷新建連接,很容易(尤其是MySQL)造成 too many connections
的報錯發生。
既然我們需要保證最大可取得連線數量,那麼我們就不希望數量定的太死。希望空閒的時候可以維護一定的空閒連線數量idleNum,但是又希望我們能限制最大可取得連線數量maxNum。
第一種情況是併發過多的情況,那麼如果並發量過少呢?現在我們在新建一個連線並且歸還後,我們很長一段時間不再使用這個連線。那麼這個連結很有可能在幾個小時甚至更長之前就已經建立的了。長時間閒置的連結我們並沒有辦法保證它的可用性。便有可能我們下次取得的連線是已經失效的連線。
那麼我們可以從已經成熟使用的MySQL連線池庫和Redis連線池庫中看看,它們是怎麼解決這些問題的。
Golang標準函式庫的Sql連線池
Golang的連線池實作在標準函式庫 database/sql/sql.go
下。當我們運行:
1 | db, err := sql.Open( "mysql" , "xxxx" )
|
登入後複製
的時候,就會開啟一個連線池。我們可以看看返回的db的結構體:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | type DB struct {
waitDuration int64
mu sync.Mutex
freeConn []*driverConn
connRequests map[uint64]chan connRequest
nextRequest uint64
numOpen int
openerCh chan struct{}
closed bool
maxIdle int
maxOpen int
maxLifetime time.Duration
cleanerCh chan struct{}
waitCount int64
maxIdleClosed int64
maxLifetimeClosed int64
}
|
登入後複製
上面省去了一些暫時不需要關注的field。我們可以看的,DB這個連接池內部儲存連接的結構freeConn,並不是我們之前使用的chan,而是[]*driverConn,一個連接切片。同時我們也可以看到,裡面有maxIdle等相關變數來控制空閒連線數量。值得注意的是,DB的初始化函數Open函數並沒有新資料庫連線。而新建連接在哪個函數呢?我們可以在Query方法一路往回找,我們可以看到這個函數: func(db*DB)conn(ctx context.Context,strategy connReuseStrategy)(*driverConn,error)
。而我們從連接池取得連線的方法,就從這裡開始:
获取连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 | func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) {
db.mu.Lock()
if db.closed {
db.mu.Unlock()
return nil, errDBClosed
}
select {
default :
case <-ctx.Done():
db.mu.Unlock()
return nil, ctx.Err()
}
lifetime := db.maxLifetime
numFree := len(db.freeConn)
if strategy == cachedOrNewConn && numFree > 0 {
conn := db.freeConn[0]
copy (db.freeConn, db.freeConn[1:])
db.freeConn = db.freeConn[:numFree-1]
conn.inUse = true
db.mu.Unlock()
if conn.expired(lifetime) {
conn.Close()
return nil, driver.ErrBadConn
}
conn.Lock()
err := conn.lastErr
conn.Unlock()
if err == driver.ErrBadConn {
conn.Close()
return nil, driver.ErrBadConn
}
return conn, nil
}
if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
req := make(chan connRequest, 1)
reqKey := db.nextRequestKeyLocked()
db.connRequests[reqKey] = req
db.waitCount++
db.mu.Unlock()
waitStart := time.Now()
select {
case <-ctx.Done():
db.mu.Lock()
delete (db.connRequests, reqKey)
db.mu.Unlock()
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
select {
default :
case ret, ok := <-req:
if ok && ret.conn != nil {
db.putConn(ret.conn, ret.err, false)
}
}
return nil, ctx.Err()
case ret, ok := <-req:
atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
if !ok {
return nil, errDBClosed
}
if ret.err == nil && ret.conn.expired(lifetime) {
ret.conn.Close()
return nil, driver.ErrBadConn
}
if ret.conn == nil {
return nil, ret.err
}
ret.conn.Lock()
err := ret.conn.lastErr
ret.conn.Unlock()
if err == driver.ErrBadConn {
ret.conn.Close()
return nil, driver.ErrBadConn
}
return ret.conn, ret.err
}
}
db.numOpen++
db.mu.Unlock()
ci, err := db.connector.Connect(ctx)
if err != nil {
db.mu.Lock()
db.numOpen--
db.maybeOpenNewConnections()
db.mu.Unlock()
return nil, err
}
db.mu.Lock()
dc := &driverConn{
db: db,
createdAt: nowFunc(),
ci: ci,
inUse: true,
}
db.addDepLocked(dc, dc)
db.mu.Unlock()
return dc, nil
}
复制代码
|
登入後複製
简单来说,DB结构体除了用的是slice来存储连接,还加了一个类似排队机制的connRequests来解决获取等待连接的过程。同时在判断连接健康性都有很好的兼顾。那么既然有了排队机制,归还连接的时候是怎么做的呢?
释放连接
我们可以直接找到 func(db*DB)putConnDBLocked(dc*driverConn,err error)bool
这个方法。就像注释说的,这个方法主要的目的是:
Satisfy a connRequest or put the driverConn in the idle pool and return true or return false.
我们主要来看看里面重点那几行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 | ...
if db.maxOpen > 0 && db.numOpen > db.maxOpen {
return false
}
if c := len(db.connRequests); c > 0 {
var req chan connRequest
var reqKey uint64
for reqKey, req = range db.connRequests {
break
}
delete (db.connRequests, reqKey)
if err == nil {
dc.inUse = true
}
req <- connRequest{
conn: dc,
err: err,
}
return true
} else if err == nil && !db.closed {
if db.maxIdleConnsLocked() > len(db.freeConn) {
db.freeConn = append(db.freeConn, dc)
db.startCleanerLocked()
return true
}
db.maxIdleClosed++
}
...
|
登入後複製
我们可以看到,当归还连接时候,如果有在排队轮候的请求就不归还给池子直接发给在轮候的人了。
现在基本就解决前面说的小问题了。不会出现连接太多导致无法控制too many connections的情况。也很好了维持了连接池的最小数量。同时也做了相关对于连接健康性的检查操作。
值得注意的是,作为标准库的代码,相关注释和代码都非常完美,真的可以看的神清气爽。
redis Golang实现的Redis客户端
这个Golang实现的Redis客户端,是怎么实现连接池的。这边的思路非常奇妙,还是能学习到不少好思路。当然了,由于代码注释比较少,啃起来第一下还是有点迷糊的。相关代码地址在https://github.com/go-redis/redis/blob/master/internal/pool/pool.go 可以看到。
而它的连接池结构如下
1 2 3 4 5 6 7 8 9 10 11 12 | type ConnPool struct {
...
queue chan struct{}
connsMu sync.Mutex
conns []*Conn
idleConns []*Conn
poolSize int
idleConnsLen int
stats Stats
_closed uint32
closedCh chan struct{}
}
|
登入後複製
我们可以看到里面存储连接的结构还是slice。但是我们可以重点看看 queue
, conns
, idleConns
这几个变量,后面会提及到。但是值得注意的是!我们可以看到,这里有两个[]*Conn结构:conns
、 idleConns
,那么问题来了:
到底连接存在哪里?
新建连接池连接
我们先从新建连接池连接开始看:
1 2 3 4 5 6 7 8 | func NewConnPool(opt *Options) *ConnPool {
....
p.checkMinIdleConns()
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency)
}
....
}
|
登入後複製
初始化连接池的函数有个和前面两个不同的地方。
checkMinIdleConns
方法,在连接池初始化的时候就会往连接池填满空闲的连接。
go p.reaper(opt.IdleCheckFrequency)
则会在初始化连接池的时候就会起一个go程,周期性的淘汰连接池里面要被淘汰的连接。
获取连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 | func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
if p.closed() {
return nil, ErrClosed
}
err := p.waitTurn(ctx)
if err != nil {
return nil, err
}
for {
p.connsMu.Lock()
cn := p.popIdle()
p.connsMu.Unlock()
if cn == nil {
break
}
if p.isStaleConn(cn) {
_ = p.CloseConn(cn)
continue
}
atomic.AddUint32(&p.stats.Hits, 1)
return cn, nil
}
atomic.AddUint32(&p.stats.Misses, 1)
newcn, err := p.newConn(ctx, true)
if err != nil {
p.freeTurn()
return nil, err
}
return newcn, nil
}
|
登入後複製
我们可以试着回答开头那个问题:连接到底存在哪里?答案是从 cn:=p.popIdle()
这句话可以看出,获取连接这个动作,是从 idleConns
里面获取的,而里面的函数也证明了这一点。同时我的理解是:
sql的排队意味着我对连接池申请连接后,把自己的编号告诉连接池。连接那边一看到有空闲了,就叫我的号。我答应了一声,然后连接池就直接给个连接给我。我如果不归还,连接池就一直不叫下一个号。
redis这边的意思是,我去和连接池申请的不是连接而是令牌。我就一直排队等着,连接池给我令牌了,我才去仓库里面找空闲连接或者自己新建一个连接。用完了连接除了归还连接外,还得归还令牌。当然了,如果我自己新建连接出错了,我哪怕拿不到连接回家,我也得把令牌给回连接池,不然连接池的令牌数少了,最大连接数也会变小。
而:
1 2 3 4 5 6 7 8 9 | func (p *ConnPool) freeTurn() {
<-p.queue
}
func (p *ConnPool) waitTurn(ctx context.Context) error {
...
case p.queue <- struct{}{}:
return nil
...
}
|
登入後複製
就是在靠queue这个chan来维持令牌数量。
那么 conns
的作用是什么呢?我们可以来看看新建连接这个函数:
新建连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
cn, err := p.dialConn(ctx, pooled)
if err != nil {
return nil, err
}
p.connsMu.Lock()
p.conns = append(p.conns, cn)
if pooled {
if p.poolSize >= p.opt.PoolSize {
cn.pooled = false
} else {
p.poolSize++
}
}
p.connsMu.Unlock()
return cn, nil
}
|
登入後複製
基本逻辑出来了。就是如果新建连接的话,我并不会直接放在 idleConns
里面,而是先放 conns
里面。同时先看池子满了没有。满的话后面归还的时候会标记,后面会删除。那么这个后面会删除,指的是什么时候呢?那就是下面说的归还连接的时候了。
归还连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 | func (p *ConnPool) Put(cn *Conn) {
if cn.rd.Buffered() > 0 {
internal.Logger.Printf( "Conn has unread data" )
p.Remove(cn, BadConnError{})
return
}
if !cn.pooled {
p.Remove(cn, nil)
return
}
p.connsMu.Lock()
p.idleConns = append(p.idleConns, cn)
p.idleConnsLen++
p.connsMu.Unlock()
p.freeTurn()
}
|
登入後複製
其实归还的过程,就是从 conns
转移到 idleConns
的过程。当然了,如果新建这个连接时候发现已经 超卖
了,后面归还时候就不转移,直接删除了。
等等,上面的逻辑似乎有点不对?我们来理一下获取连接流程:
先 waitTurn
,拿到令牌。而令牌数量是根据pool里面的 queue
决定的。
拿到令牌了,去库房 idleConns
里面拿空闲的连接。没有的话就自己 newConn
一个,并且把他记录到 conns
里面。
用完了,就叫 put
歸還:也就是從 conns
轉移到 idleConns
。歸還的時候就檢查在 newConn
時候是不是已經做了超賣標記了。是的話就不轉移到 idleConns
。
我當時已經疑惑了好久,既然始終需要獲得令牌才能連接,那麼令牌數量是定的。為什麼還會超賣呢?翻了一下原始碼,我的答案是:
雖然Get
方法取得連線是newConn
這個私人方法,受到令牌管制導致不會出現超賣。但這個方法接受傳參:pooledbool
。所以我猜是擔心其他人呼叫這個方法時候,不管三七二十一就傳了true,導致poolSize越來越大。
總的來說,redis這個連接池的連接數控制,還是在 queue
這個我稱為令牌的chan進行操作。
總結
上面可以看到,連接池的最基本的保證,就是取得連線時候的執行緒安全性。但在實現許多額外特性時候卻又從不同角度來實現。還是非常有趣的。但不管儲存結構是用chan還是還是slice,都可以很好的實現這一點。如果像sql或redis那樣用slice來儲存連接,就得維護一個結構來表示排隊等候的效果。
本文作者:Xiao淩求個好運氣日期:2020-02-28 原文鏈接:https://juejin.im/post/5e58e3b7f265da57537eb7ed
以上是關於Golang連線池的幾個實作案例的詳細內容。更多資訊請關注PHP中文網其他相關文章!