数据库连接池的基本原理是在内部对象池中维护一定数量的数据库连接，并对外暴露获取连接和归还连接的方法。（外部使用者可通过 getConnection 方法获取连接，使用完毕后再通过 releaseConnection 方法将连接归还。注意此时连接并没有关闭，而是由连接池管理器回收，并为下一次使用做好准备。在下面的 Go 代码中，这两个操作分别对应 conn 和 putConn。）
- type DB struct { // DB holds the connection pool state: idle connections, pending requests, limits and counters.
- // Atomic access only. At top of struct to prevent mis-alignment
- // on 32-bit platforms. Of type time.Duration.
- waitDuration int64 // Total time callers have spent waiting for a connection (accumulated in conn).
- connector driver.Connector // Used by conn to open new driver connections.
- // numClosed is an atomic counter which represents a total number of
- // closed connections. Stmt.openStmt checks it before cleaning closed
- // connections in Stmt.css.
- numClosed uint64
- mu sync.Mutex // Lock held while operating on the DB fields.
- freeConn []*driverConn // Idle connections available for reuse.
- connRequests map[uint64]chan connRequest // Pending requests from callers blocked in conn, keyed by request key.
- nextRequest uint64 // Next key to use in connRequests.
- numOpen int // Number of opened and pending-open connections.
- // Used to signal the need for new connections
- // a goroutine running connectionOpener() reads on this chan and
- // maybeOpenNewConnections sends on the chan (one send per needed connection)
- // It is closed during db.Close(). The close tells the connectionOpener
- // goroutine to exit.
- openerCh chan struct{}
- resetterCh chan *driverConn // Receives connections whose session should be reset (sent by putConn).
- closed bool
- dep map[finalCloser]depSet
- lastPut map[*driverConn]string // stacktrace of last conn's put; debug only
- maxIdle int // Maximum number of idle connections.
- maxOpen int // Maximum number of open connections; conn enforces it only when > 0.
- maxLifetime time.Duration // Maximum lifetime of a connection; older connections are not reused.
- cleanerCh chan struct{}
- waitCount int64 // Total number of connections waited for.
- maxIdleClosed int64 // Total number of connections closed due to idle.
- maxLifetimeClosed int64 // NOTE(review): presumably the total number of connections closed for exceeding maxLifetime; not updated in the code shown here - confirm.
- stop func() // stop cancels the connection opener and the session resetter.
- }

- func (db *DB) conn(ctx context.Context, strategy connReuseStrategy) (*driverConn, error) { // conn returns a cached idle connection, one handed over by another goroutine, or a newly opened one.
- db.mu.Lock()
- if db.closed {
- db.mu.Unlock()
- return nil, errDBClosed
- }
- // Check if the context is expired.
- select {
- default:
- case <-ctx.Done():
- db.mu.Unlock()
- return nil, ctx.Err()
- }
- lifetime := db.maxLifetime // Copied while db.mu is held; used for the expiry checks below.
- // Take the first idle connection from freeConn (only when the strategy allows cached connections).
- numFree := len(db.freeConn)
- if strategy == cachedOrNewConn && numFree > 0 {
- conn := db.freeConn[0]
- copy(db.freeConn, db.freeConn[1:])
- db.freeConn = db.freeConn[:numFree-1]
- conn.inUse = true
- db.mu.Unlock()
- if conn.expired(lifetime) {
- conn.Close()
- return nil, driver.ErrBadConn // An expired connection is closed and reported as ErrBadConn.
- }
- // Lock around reading lastErr to ensure the session resetter finished.
- conn.Lock()
- err := conn.lastErr
- conn.Unlock()
- if err == driver.ErrBadConn {
- conn.Close()
- return nil, driver.ErrBadConn
- }
- return conn, nil
- }
- // If no idle connection was taken and the open-connection limit has been reached,
- // register in connRequests and block here until another goroutine releases a connection or the connectionOpener creates one.
- if db.maxOpen > 0 && db.numOpen >= db.maxOpen {
- // Make the connRequest channel. It's buffered so that the
- // connectionOpener doesn't block while waiting for the req to be read.
- req := make(chan connRequest, 1)
- reqKey := db.nextRequestKeyLocked()
- db.connRequests[reqKey] = req
- db.waitCount++
- db.mu.Unlock()
- waitStart := time.Now()
- // Timeout the connection request with the context.
- select {
- case <-ctx.Done():
- // Remove the connection request and ensure no value has been sent
- // on it after removing.
- db.mu.Lock()
- delete(db.connRequests, reqKey)
- db.mu.Unlock()
- atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
- select {
- default:
- case ret, ok := <-req:
- if ok && ret.conn != nil {
- db.putConn(ret.conn, ret.err, false) // A connection delivered after cancellation is given back via putConn.
- }
- }
- return nil, ctx.Err()
- case ret, ok := <-req: // Blocks until a connection or error is delivered, or req is closed.
- atomic.AddInt64(&db.waitDuration, int64(time.Since(waitStart)))
- if !ok {
- return nil, errDBClosed
- }
- if ret.err == nil && ret.conn.expired(lifetime) { // The delivered connection has expired.
- ret.conn.Close()
- return nil, driver.ErrBadConn
- }
- if ret.conn == nil {
- return nil, ret.err
- }
- // Lock around reading lastErr to ensure the session resetter finished.
- ret.conn.Lock()
- err := ret.conn.lastErr
- ret.conn.Unlock()
- if err == driver.ErrBadConn {
- ret.conn.Close()
- return nil, driver.ErrBadConn
- }
- return ret.conn, ret.err
- }
- }
- db.numOpen++ // optimistically
- db.mu.Unlock()
- ci, err := db.connector.Connect(ctx)
- if err != nil {
- db.mu.Lock()
- db.numOpen-- // correct for earlier optimism
- db.maybeOpenNewConnections() // Notify the connectionOpener goroutine to try establishing connections again.
- db.mu.Unlock()
- return nil, err
- }
- db.mu.Lock()
- dc := &driverConn{
- db: db,
- createdAt: nowFunc(),
- ci: ci,
- inUse: true,
- }
- db.addDepLocked(dc, dc)
- db.mu.Unlock()
- return dc, nil
- }

- func (db *DB) putConn(dc *driverConn, err error, resetSession bool) { // putConn gives dc back to the pool, or closes it when it is bad or the pool does not accept it.
- db.mu.Lock()
- if !dc.inUse {
- if debugGetPut {
- fmt.Printf("putConn(%v) DUPLICATE was: %s\n\nPREVIOUS was: %s", dc, stack(), db.lastPut[dc])
- }
- panic("sql: connection returned that was never out")
- }
- if debugGetPut {
- db.lastPut[dc] = stack()
- }
- dc.inUse = false
- for _, fn := range dc.onPut { // Run the callbacks registered on dc, then clear them.
- fn()
- }
- dc.onPut = nil
- // A bad connection is not put back into the pool.
- if err == driver.ErrBadConn {
- // Don't reuse bad connections.
- // Since the conn is considered bad and is being discarded, treat it
- // as closed. Don't decrement the open count here, finalClose will
- // take care of that.
- db.maybeOpenNewConnections()
- db.mu.Unlock()
- dc.Close()
- return
- }
- if putConnHook != nil {
- putConnHook(db, dc)
- }
- if db.closed {
- // Connections do not need to be reset if they will be closed.
- // Prevents writing to resetterCh after the DB has closed.
- resetSession = false
- }
- if resetSession {
- if _, resetSession = dc.ci.(driver.SessionResetter); resetSession {
- // Lock the driverConn here so it isn't released until
- // the connection is reset.
- // The lock must be taken before the connection is put into
- // the pool to prevent it from being taken out before it is reset.
- dc.Lock()
- }
- }
- // Return the connection: putConnDBLocked hands it to a waiting request or stores it as idle.
- added := db.putConnDBLocked(dc, nil)
- db.mu.Unlock()
- if !added { // The pool did not accept the connection, so it is closed here.
- if resetSession {
- dc.Unlock()
- }
- dc.Close()
- return
- }
- if !resetSession {
- return
- }
- select {
- default:
- // If the resetterCh is blocking then mark the connection
- // as bad and continue on.
- dc.lastErr = driver.ErrBadConn
- dc.Unlock()
- case db.resetterCh <- dc: // dc stays locked here; presumably the resetter goroutine unlocks it - confirm.
- }
- }

- func (db *DB) putConnDBLocked(dc *driverConn, err error) bool { // putConnDBLocked hands dc (or err) to a waiting request or stores dc as idle; it reports whether dc was accepted. putConn calls it with db.mu held.
- if db.closed {
- return false
- }
- if db.maxOpen > 0 && db.numOpen > db.maxOpen { // Over the open-connection limit: reject so the caller can close dc.
- return false
- }
- // If there are waiting requests, hand the connection directly to one of them; otherwise put it into the idle list.
- if c := len(db.connRequests); c > 0 {
- var req chan connRequest
- var reqKey uint64
- for reqKey, req = range db.connRequests { // Pick one pending request; map iteration order is unspecified.
- break
- }
- delete(db.connRequests, reqKey) // Remove from pending requests.
- if err == nil {
- dc.inUse = true
- }
- req <- connRequest{
- conn: dc,
- err: err,
- }
- return true
- } else if err == nil && !db.closed {
- if db.maxIdleConnsLocked() > len(db.freeConn) {
- db.freeConn = append(db.freeConn, dc)
- db.startCleanerLocked()
- return true
- }
- db.maxIdleClosed++ // The idle list is full; dc is rejected and counted as closed due to the idle limit.
- }
- return false
- }

