workerpool.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. package fasthttp
  2. import (
  3. "errors"
  4. "net"
  5. "runtime"
  6. "strings"
  7. "sync"
  8. "time"
  9. )
  // workerPool serves incoming connections via a pool of workers
  // in FILO order, i.e. the most recently stopped worker will serve the next
  // incoming connection.
  //
  // Such a scheme keeps CPU caches hot (in theory).
  type workerPool struct {
  	// Function for serving server connections.
  	// It must leave c unclosed: the worker closes c itself after WorkerFunc
  	// returns, unless the returned error is errHijacked.
  	WorkerFunc ServeHandler
  	// MaxWorkersCount caps the number of live workers; getCh spawns a new
  	// worker only while workersCount is below this value.
  	MaxWorkersCount int
  	// LogAllErrors, when true, makes workers log every non-hijack error,
  	// including the common network errors that are otherwise filtered out.
  	LogAllErrors bool
  	// MaxIdleWorkerDuration is how long a worker may sit idle before clean
  	// stops it. Values <= 0 fall back to 10 seconds.
  	MaxIdleWorkerDuration time.Duration
  	// Logger receives the error messages produced while serving connections.
  	Logger Logger
  	// lock guards workersCount, mustStop and ready.
  	lock sync.Mutex
  	// workersCount is incremented when getCh spawns a worker and decremented
  	// when workerFunc returns.
  	workersCount int
  	// mustStop is set by Stop; once set, release refuses to re-pool workers,
  	// so each worker exits after its current connection.
  	mustStop bool
  	// ready holds idle workers in release order: getCh pops from the end,
  	// clean trims expired workers from the front.
  	ready []*workerChan
  	// stopCh is created by Start and closed by Stop to end the cleaner
  	// goroutine. It is nil while the pool is not running.
  	stopCh chan struct{}
  	// workerChanPool recycles workerChan values between worker goroutines.
  	workerChanPool sync.Pool
  	// connState is invoked with StateHijacked or StateClosed after each
  	// connection has been served.
  	connState func(net.Conn, ConnState)
  }
  // workerChan is the pool's handle to a single worker goroutine.
  type workerChan struct {
  	// lastUseTime is stamped by release when the worker goes back to the
  	// ready list; clean compares it against the idle cutoff.
  	lastUseTime time.Time
  	// ch delivers connections to the worker. A nil value tells the worker
  	// to exit.
  	ch chan net.Conn
  }
  35. func (wp *workerPool) Start() {
  36. if wp.stopCh != nil {
  37. return
  38. }
  39. wp.stopCh = make(chan struct{})
  40. stopCh := wp.stopCh
  41. wp.workerChanPool.New = func() interface{} {
  42. return &workerChan{
  43. ch: make(chan net.Conn, workerChanCap),
  44. }
  45. }
  46. go func() {
  47. var scratch []*workerChan
  48. for {
  49. wp.clean(&scratch)
  50. select {
  51. case <-stopCh:
  52. return
  53. default:
  54. time.Sleep(wp.getMaxIdleWorkerDuration())
  55. }
  56. }
  57. }()
  58. }
  59. func (wp *workerPool) Stop() {
  60. if wp.stopCh == nil {
  61. return
  62. }
  63. close(wp.stopCh)
  64. wp.stopCh = nil
  65. // Stop all the workers waiting for incoming connections.
  66. // Do not wait for busy workers - they will stop after
  67. // serving the connection and noticing wp.mustStop = true.
  68. wp.lock.Lock()
  69. ready := wp.ready
  70. for i := range ready {
  71. ready[i].ch <- nil
  72. ready[i] = nil
  73. }
  74. wp.ready = ready[:0]
  75. wp.mustStop = true
  76. wp.lock.Unlock()
  77. }
  78. func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
  79. if wp.MaxIdleWorkerDuration <= 0 {
  80. return 10 * time.Second
  81. }
  82. return wp.MaxIdleWorkerDuration
  83. }
  84. func (wp *workerPool) clean(scratch *[]*workerChan) {
  85. maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
  86. // Clean least recently used workers if they didn't serve connections
  87. // for more than maxIdleWorkerDuration.
  88. criticalTime := time.Now().Add(-maxIdleWorkerDuration)
  89. wp.lock.Lock()
  90. ready := wp.ready
  91. n := len(ready)
  92. // Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
  93. l, r, mid := 0, n-1, 0
  94. for l <= r {
  95. mid = (l + r) / 2
  96. if criticalTime.After(wp.ready[mid].lastUseTime) {
  97. l = mid + 1
  98. } else {
  99. r = mid - 1
  100. }
  101. }
  102. i := r
  103. if i == -1 {
  104. wp.lock.Unlock()
  105. return
  106. }
  107. *scratch = append((*scratch)[:0], ready[:i+1]...)
  108. m := copy(ready, ready[i+1:])
  109. for i = m; i < n; i++ {
  110. ready[i] = nil
  111. }
  112. wp.ready = ready[:m]
  113. wp.lock.Unlock()
  114. // Notify obsolete workers to stop.
  115. // This notification must be outside the wp.lock, since ch.ch
  116. // may be blocking and may consume a lot of time if many workers
  117. // are located on non-local CPUs.
  118. tmp := *scratch
  119. for i := range tmp {
  120. tmp[i].ch <- nil
  121. tmp[i] = nil
  122. }
  123. }
  124. func (wp *workerPool) Serve(c net.Conn) bool {
  125. ch := wp.getCh()
  126. if ch == nil {
  127. return false
  128. }
  129. ch.ch <- c
  130. return true
  131. }
  // workerChanCap is the buffer size of every worker channel, computed once
  // at package initialization from GOMAXPROCS.
  var workerChanCap = func() int {
  	// With GOMAXPROCS>1 a one-slot buffer keeps the sender non-blocking;
  	// otherwise the Serve caller (Acceptor) may lag accepting new
  	// connections if WorkerFunc is CPU-bound.
  	if runtime.GOMAXPROCS(0) > 1 {
  		return 1
  	}
  	// With GOMAXPROCS=1 use an unbuffered (blocking) channel. This
  	// immediately switches Serve to WorkerFunc, which results in higher
  	// performance (under go1.5 at least).
  	return 0
  }()
  144. func (wp *workerPool) getCh() *workerChan {
  145. var ch *workerChan
  146. createWorker := false
  147. wp.lock.Lock()
  148. ready := wp.ready
  149. n := len(ready) - 1
  150. if n < 0 {
  151. if wp.workersCount < wp.MaxWorkersCount {
  152. createWorker = true
  153. wp.workersCount++
  154. }
  155. } else {
  156. ch = ready[n]
  157. ready[n] = nil
  158. wp.ready = ready[:n]
  159. }
  160. wp.lock.Unlock()
  161. if ch == nil {
  162. if !createWorker {
  163. return nil
  164. }
  165. vch := wp.workerChanPool.Get()
  166. ch = vch.(*workerChan)
  167. go func() {
  168. wp.workerFunc(ch)
  169. wp.workerChanPool.Put(vch)
  170. }()
  171. }
  172. return ch
  173. }
  174. func (wp *workerPool) release(ch *workerChan) bool {
  175. ch.lastUseTime = time.Now()
  176. wp.lock.Lock()
  177. if wp.mustStop {
  178. wp.lock.Unlock()
  179. return false
  180. }
  181. wp.ready = append(wp.ready, ch)
  182. wp.lock.Unlock()
  183. return true
  184. }
  185. func (wp *workerPool) workerFunc(ch *workerChan) {
  186. var c net.Conn
  187. var err error
  188. for c = range ch.ch {
  189. if c == nil {
  190. break
  191. }
  192. if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
  193. errStr := err.Error()
  194. if wp.LogAllErrors || !(strings.Contains(errStr, "broken pipe") ||
  195. strings.Contains(errStr, "reset by peer") ||
  196. strings.Contains(errStr, "request headers: small read buffer") ||
  197. strings.Contains(errStr, "unexpected EOF") ||
  198. strings.Contains(errStr, "i/o timeout") ||
  199. errors.Is(err, ErrBadTrailer)) {
  200. wp.Logger.Printf("error when serving connection %q<->%q: %v", c.LocalAddr(), c.RemoteAddr(), err)
  201. }
  202. }
  203. if err == errHijacked {
  204. wp.connState(c, StateHijacked)
  205. } else {
  206. _ = c.Close()
  207. wp.connState(c, StateClosed)
  208. }
  209. c = nil
  210. if !wp.release(ch) {
  211. break
  212. }
  213. }
  214. wp.lock.Lock()
  215. wp.workersCount--
  216. wp.lock.Unlock()
  217. }