buffer_pool.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. // Copyright (c) 2014, Suryandaru Triandana <syndtr@gmail.com>
  2. // All rights reserved.
  3. //
  4. // Use of this source code is governed by a BSD-style license that can be
  5. // found in the LICENSE file.
  6. package util
  7. import (
  8. "fmt"
  9. "sync"
  10. "sync/atomic"
  11. "time"
  12. )
  13. type buffer struct {
  14. b []byte
  15. miss int
  16. }
  17. // BufferPool is a 'buffer pool'.
  18. type BufferPool struct {
  19. pool [6]chan []byte
  20. size [5]uint32
  21. sizeMiss [5]uint32
  22. sizeHalf [5]uint32
  23. baseline [4]int
  24. baseline0 int
  25. mu sync.RWMutex
  26. closed bool
  27. closeC chan struct{}
  28. get uint32
  29. put uint32
  30. half uint32
  31. less uint32
  32. equal uint32
  33. greater uint32
  34. miss uint32
  35. }
  36. func (p *BufferPool) poolNum(n int) int {
  37. if n <= p.baseline0 && n > p.baseline0/2 {
  38. return 0
  39. }
  40. for i, x := range p.baseline {
  41. if n <= x {
  42. return i + 1
  43. }
  44. }
  45. return len(p.baseline) + 1
  46. }
  47. // Get returns buffer with length of n.
  48. func (p *BufferPool) Get(n int) []byte {
  49. if p == nil {
  50. return make([]byte, n)
  51. }
  52. p.mu.RLock()
  53. defer p.mu.RUnlock()
  54. if p.closed {
  55. return make([]byte, n)
  56. }
  57. atomic.AddUint32(&p.get, 1)
  58. poolNum := p.poolNum(n)
  59. pool := p.pool[poolNum]
  60. if poolNum == 0 {
  61. // Fast path.
  62. select {
  63. case b := <-pool:
  64. switch {
  65. case cap(b) > n:
  66. if cap(b)-n >= n {
  67. atomic.AddUint32(&p.half, 1)
  68. select {
  69. case pool <- b:
  70. default:
  71. }
  72. return make([]byte, n)
  73. } else {
  74. atomic.AddUint32(&p.less, 1)
  75. return b[:n]
  76. }
  77. case cap(b) == n:
  78. atomic.AddUint32(&p.equal, 1)
  79. return b[:n]
  80. default:
  81. atomic.AddUint32(&p.greater, 1)
  82. }
  83. default:
  84. atomic.AddUint32(&p.miss, 1)
  85. }
  86. return make([]byte, n, p.baseline0)
  87. } else {
  88. sizePtr := &p.size[poolNum-1]
  89. select {
  90. case b := <-pool:
  91. switch {
  92. case cap(b) > n:
  93. if cap(b)-n >= n {
  94. atomic.AddUint32(&p.half, 1)
  95. sizeHalfPtr := &p.sizeHalf[poolNum-1]
  96. if atomic.AddUint32(sizeHalfPtr, 1) == 20 {
  97. atomic.StoreUint32(sizePtr, uint32(cap(b)/2))
  98. atomic.StoreUint32(sizeHalfPtr, 0)
  99. } else {
  100. select {
  101. case pool <- b:
  102. default:
  103. }
  104. }
  105. return make([]byte, n)
  106. } else {
  107. atomic.AddUint32(&p.less, 1)
  108. return b[:n]
  109. }
  110. case cap(b) == n:
  111. atomic.AddUint32(&p.equal, 1)
  112. return b[:n]
  113. default:
  114. atomic.AddUint32(&p.greater, 1)
  115. if uint32(cap(b)) >= atomic.LoadUint32(sizePtr) {
  116. select {
  117. case pool <- b:
  118. default:
  119. }
  120. }
  121. }
  122. default:
  123. atomic.AddUint32(&p.miss, 1)
  124. }
  125. if size := atomic.LoadUint32(sizePtr); uint32(n) > size {
  126. if size == 0 {
  127. atomic.CompareAndSwapUint32(sizePtr, 0, uint32(n))
  128. } else {
  129. sizeMissPtr := &p.sizeMiss[poolNum-1]
  130. if atomic.AddUint32(sizeMissPtr, 1) == 20 {
  131. atomic.StoreUint32(sizePtr, uint32(n))
  132. atomic.StoreUint32(sizeMissPtr, 0)
  133. }
  134. }
  135. return make([]byte, n)
  136. } else {
  137. return make([]byte, n, size)
  138. }
  139. }
  140. }
  141. // Put adds given buffer to the pool.
  142. func (p *BufferPool) Put(b []byte) {
  143. if p == nil {
  144. return
  145. }
  146. p.mu.RLock()
  147. defer p.mu.RUnlock()
  148. if p.closed {
  149. return
  150. }
  151. atomic.AddUint32(&p.put, 1)
  152. pool := p.pool[p.poolNum(cap(b))]
  153. select {
  154. case pool <- b:
  155. default:
  156. }
  157. }
  158. func (p *BufferPool) Close() {
  159. if p == nil {
  160. return
  161. }
  162. p.mu.Lock()
  163. if !p.closed {
  164. p.closed = true
  165. p.closeC <- struct{}{}
  166. }
  167. p.mu.Unlock()
  168. }
  169. func (p *BufferPool) String() string {
  170. if p == nil {
  171. return "<nil>"
  172. }
  173. return fmt.Sprintf("BufferPool{B·%d Z·%v Zm·%v Zh·%v G·%d P·%d H·%d <·%d =·%d >·%d M·%d}",
  174. p.baseline0, p.size, p.sizeMiss, p.sizeHalf, p.get, p.put, p.half, p.less, p.equal, p.greater, p.miss)
  175. }
  176. func (p *BufferPool) drain() {
  177. ticker := time.NewTicker(2 * time.Second)
  178. defer ticker.Stop()
  179. for {
  180. select {
  181. case <-ticker.C:
  182. for _, ch := range p.pool {
  183. select {
  184. case <-ch:
  185. default:
  186. }
  187. }
  188. case <-p.closeC:
  189. close(p.closeC)
  190. for _, ch := range p.pool {
  191. close(ch)
  192. }
  193. return
  194. }
  195. }
  196. }
  197. // NewBufferPool creates a new initialized 'buffer pool'.
  198. func NewBufferPool(baseline int) *BufferPool {
  199. if baseline <= 0 {
  200. panic("baseline can't be <= 0")
  201. }
  202. p := &BufferPool{
  203. baseline0: baseline,
  204. baseline: [...]int{baseline / 4, baseline / 2, baseline * 2, baseline * 4},
  205. closeC: make(chan struct{}, 1),
  206. }
  207. for i, cap := range []int{2, 2, 4, 4, 2, 1} {
  208. p.pool[i] = make(chan []byte, cap)
  209. }
  210. go p.drain()
  211. return p
  212. }