123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239 |
- // Copyright (c) 2014, Suryandaru Triandana <syndtr@gmail.com>
- // All rights reserved.
- //
- // Use of this source code is governed by a BSD-style license that can be
- // found in the LICENSE file.
- package util
- import (
- "fmt"
- "sync"
- "sync/atomic"
- "time"
- )
// buffer pairs a byte slice with an integer miss counter.
//
// NOTE(review): nothing in the visible part of this file references this
// type; confirm whether it is still needed before relying on it.
type buffer struct {
	// b is the underlying byte slice.
	b []byte
	// miss is a counter associated with b; its exact meaning is not
	// established by the visible code.
	miss int
}
// BufferPool recycles byte slices through a small set of bounded channels.
//
// Buffers are bucketed by capacity relative to a configured baseline size
// (see poolNum). The statistics counters and the per-bucket size hints are
// updated with sync/atomic; mu guards the closed flag so that Get and Put
// never touch the channels once Close has run.
//
// A BufferPool must not be copied after first use because it embeds a
// sync.RWMutex.
type BufferPool struct {
	// pool holds one channel of idle buffers per bucket; index 0 is the
	// bucket for sizes close to baseline0.
	pool [6]chan []byte

	// Per-bucket bookkeeping for buckets 1..5 (stored at index bucket-1):
	// size is the capacity hint used when allocating a fresh buffer,
	// sizeMiss counts requests larger than the hint, and sizeHalf counts
	// pooled buffers found to be at least twice the requested length.
	size, sizeMiss, sizeHalf [5]uint32

	// baseline holds the upper bounds of buckets 1..4, derived from
	// baseline0 by NewBufferPool.
	baseline [4]int
	// baseline0 is the baseline size the pool was created with.
	baseline0 int

	// mu protects closed. Get and Put take it for reading, Close for writing.
	mu     sync.RWMutex
	closed bool
	// closeC signals the drain goroutine that the pool has been closed.
	closeC chan struct{}

	// Statistics, updated atomically by Get and Put: number of Get and Put
	// calls, and how each Get was satisfied (see Get for the exact meaning
	// of half, less, equal, greater and miss).
	get, put, half, less, equal, greater, miss uint32
}
- func (p *BufferPool) poolNum(n int) int {
- if n <= p.baseline0 && n > p.baseline0/2 {
- return 0
- }
- for i, x := range p.baseline {
- if n <= x {
- return i + 1
- }
- }
- return len(p.baseline) + 1
- }
- // Get returns buffer with length of n.
- func (p *BufferPool) Get(n int) []byte {
- if p == nil {
- return make([]byte, n)
- }
- p.mu.RLock()
- defer p.mu.RUnlock()
- if p.closed {
- return make([]byte, n)
- }
- atomic.AddUint32(&p.get, 1)
- poolNum := p.poolNum(n)
- pool := p.pool[poolNum]
- if poolNum == 0 {
- // Fast path.
- select {
- case b := <-pool:
- switch {
- case cap(b) > n:
- if cap(b)-n >= n {
- atomic.AddUint32(&p.half, 1)
- select {
- case pool <- b:
- default:
- }
- return make([]byte, n)
- } else {
- atomic.AddUint32(&p.less, 1)
- return b[:n]
- }
- case cap(b) == n:
- atomic.AddUint32(&p.equal, 1)
- return b[:n]
- default:
- atomic.AddUint32(&p.greater, 1)
- }
- default:
- atomic.AddUint32(&p.miss, 1)
- }
- return make([]byte, n, p.baseline0)
- } else {
- sizePtr := &p.size[poolNum-1]
- select {
- case b := <-pool:
- switch {
- case cap(b) > n:
- if cap(b)-n >= n {
- atomic.AddUint32(&p.half, 1)
- sizeHalfPtr := &p.sizeHalf[poolNum-1]
- if atomic.AddUint32(sizeHalfPtr, 1) == 20 {
- atomic.StoreUint32(sizePtr, uint32(cap(b)/2))
- atomic.StoreUint32(sizeHalfPtr, 0)
- } else {
- select {
- case pool <- b:
- default:
- }
- }
- return make([]byte, n)
- } else {
- atomic.AddUint32(&p.less, 1)
- return b[:n]
- }
- case cap(b) == n:
- atomic.AddUint32(&p.equal, 1)
- return b[:n]
- default:
- atomic.AddUint32(&p.greater, 1)
- if uint32(cap(b)) >= atomic.LoadUint32(sizePtr) {
- select {
- case pool <- b:
- default:
- }
- }
- }
- default:
- atomic.AddUint32(&p.miss, 1)
- }
- if size := atomic.LoadUint32(sizePtr); uint32(n) > size {
- if size == 0 {
- atomic.CompareAndSwapUint32(sizePtr, 0, uint32(n))
- } else {
- sizeMissPtr := &p.sizeMiss[poolNum-1]
- if atomic.AddUint32(sizeMissPtr, 1) == 20 {
- atomic.StoreUint32(sizePtr, uint32(n))
- atomic.StoreUint32(sizeMissPtr, 0)
- }
- }
- return make([]byte, n)
- } else {
- return make([]byte, n, size)
- }
- }
- }
- // Put adds given buffer to the pool.
- func (p *BufferPool) Put(b []byte) {
- if p == nil {
- return
- }
- p.mu.RLock()
- defer p.mu.RUnlock()
- if p.closed {
- return
- }
- atomic.AddUint32(&p.put, 1)
- pool := p.pool[p.poolNum(cap(b))]
- select {
- case pool <- b:
- default:
- }
- }
- func (p *BufferPool) Close() {
- if p == nil {
- return
- }
- p.mu.Lock()
- if !p.closed {
- p.closed = true
- p.closeC <- struct{}{}
- }
- p.mu.Unlock()
- }
- func (p *BufferPool) String() string {
- if p == nil {
- return "<nil>"
- }
- return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v Zh·%v G·%d P·%d H·%d <·%d =·%d >·%d M·%d}",
- p.baseline0, p.size, p.sizeMiss, p.sizeHalf, p.get, p.put, p.half, p.less, p.equal, p.greater, p.miss)
- }
- func (p *BufferPool) drain() {
- ticker := time.NewTicker(2 * time.Second)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- for _, ch := range p.pool {
- select {
- case <-ch:
- default:
- }
- }
- case <-p.closeC:
- close(p.closeC)
- for _, ch := range p.pool {
- close(ch)
- }
- return
- }
- }
- }
- // NewBufferPool creates a new initialized 'buffer pool'.
- func NewBufferPool(baseline int) *BufferPool {
- if baseline <= 0 {
- panic("baseline can't be <= 0")
- }
- p := &BufferPool{
- baseline0: baseline,
- baseline: [...]int{baseline / 4, baseline / 2, baseline * 2, baseline * 4},
- closeC: make(chan struct{}, 1),
- }
- for i, cap := range []int{2, 2, 4, 4, 2, 1} {
- p.pool[i] = make(chan []byte, cap)
- }
- go p.drain()
- return p
- }
|