123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250 |
- package fasthttp
- import (
- "errors"
- "net"
- "runtime"
- "strings"
- "sync"
- "time"
- )
- // workerPool serves incoming connections via a pool of workers
- // in FILO order, i.e. the most recently stopped worker will serve the next
- // incoming connection.
- //
- // Such a scheme keeps CPU caches hot (in theory).
- type workerPool struct {
- // Function for serving server connections.
- // It must leave c unclosed.
- WorkerFunc ServeHandler
- MaxWorkersCount int
- LogAllErrors bool
- MaxIdleWorkerDuration time.Duration
- Logger Logger
- lock sync.Mutex
- workersCount int
- mustStop bool
- ready []*workerChan
- stopCh chan struct{}
- workerChanPool sync.Pool
- connState func(net.Conn, ConnState)
- }
- type workerChan struct {
- lastUseTime time.Time
- ch chan net.Conn
- }
- func (wp *workerPool) Start() {
- if wp.stopCh != nil {
- return
- }
- wp.stopCh = make(chan struct{})
- stopCh := wp.stopCh
- wp.workerChanPool.New = func() any {
- return &workerChan{
- ch: make(chan net.Conn, workerChanCap),
- }
- }
- go func() {
- var scratch []*workerChan
- for {
- wp.clean(&scratch)
- select {
- case <-stopCh:
- return
- default:
- time.Sleep(wp.getMaxIdleWorkerDuration())
- }
- }
- }()
- }
- func (wp *workerPool) Stop() {
- if wp.stopCh == nil {
- return
- }
- close(wp.stopCh)
- wp.stopCh = nil
- // Stop all the workers waiting for incoming connections.
- // Do not wait for busy workers - they will stop after
- // serving the connection and noticing wp.mustStop = true.
- wp.lock.Lock()
- ready := wp.ready
- for i := range ready {
- ready[i].ch <- nil
- ready[i] = nil
- }
- wp.ready = ready[:0]
- wp.mustStop = true
- wp.lock.Unlock()
- }
- func (wp *workerPool) getMaxIdleWorkerDuration() time.Duration {
- if wp.MaxIdleWorkerDuration <= 0 {
- return 10 * time.Second
- }
- return wp.MaxIdleWorkerDuration
- }
- func (wp *workerPool) clean(scratch *[]*workerChan) {
- maxIdleWorkerDuration := wp.getMaxIdleWorkerDuration()
- // Clean least recently used workers if they didn't serve connections
- // for more than maxIdleWorkerDuration.
- criticalTime := time.Now().Add(-maxIdleWorkerDuration)
- wp.lock.Lock()
- ready := wp.ready
- n := len(ready)
- // Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
- l, r := 0, n-1
- for l <= r {
- mid := (l + r) / 2
- if criticalTime.After(wp.ready[mid].lastUseTime) {
- l = mid + 1
- } else {
- r = mid - 1
- }
- }
- i := r
- if i == -1 {
- wp.lock.Unlock()
- return
- }
- *scratch = append((*scratch)[:0], ready[:i+1]...)
- m := copy(ready, ready[i+1:])
- for i = m; i < n; i++ {
- ready[i] = nil
- }
- wp.ready = ready[:m]
- wp.lock.Unlock()
- // Notify obsolete workers to stop.
- // This notification must be outside the wp.lock, since ch.ch
- // may be blocking and may consume a lot of time if many workers
- // are located on non-local CPUs.
- tmp := *scratch
- for i := range tmp {
- tmp[i].ch <- nil
- tmp[i] = nil
- }
- }
- func (wp *workerPool) Serve(c net.Conn) bool {
- ch := wp.getCh()
- if ch == nil {
- return false
- }
- ch.ch <- c
- return true
- }
- var workerChanCap = func() int {
- // Use blocking workerChan if GOMAXPROCS=1.
- // This immediately switches Serve to WorkerFunc, which results
- // in higher performance (under go1.5 at least).
- if runtime.GOMAXPROCS(0) == 1 {
- return 0
- }
- // Use non-blocking workerChan if GOMAXPROCS>1,
- // since otherwise the Serve caller (Acceptor) may lag accepting
- // new connections if WorkerFunc is CPU-bound.
- return 1
- }()
- func (wp *workerPool) getCh() *workerChan {
- var ch *workerChan
- createWorker := false
- wp.lock.Lock()
- ready := wp.ready
- n := len(ready) - 1
- if n < 0 {
- if wp.workersCount < wp.MaxWorkersCount {
- createWorker = true
- wp.workersCount++
- }
- } else {
- ch = ready[n]
- ready[n] = nil
- wp.ready = ready[:n]
- }
- wp.lock.Unlock()
- if ch == nil {
- if !createWorker {
- return nil
- }
- vch := wp.workerChanPool.Get()
- ch = vch.(*workerChan)
- go func() {
- wp.workerFunc(ch)
- wp.workerChanPool.Put(vch)
- }()
- }
- return ch
- }
- func (wp *workerPool) release(ch *workerChan) bool {
- ch.lastUseTime = time.Now()
- wp.lock.Lock()
- if wp.mustStop {
- wp.lock.Unlock()
- return false
- }
- wp.ready = append(wp.ready, ch)
- wp.lock.Unlock()
- return true
- }
- func (wp *workerPool) workerFunc(ch *workerChan) {
- var c net.Conn
- var err error
- for c = range ch.ch {
- if c == nil {
- break
- }
- if err = wp.WorkerFunc(c); err != nil && err != errHijacked {
- errStr := err.Error()
- if wp.LogAllErrors || !(strings.Contains(errStr, "broken pipe") ||
- strings.Contains(errStr, "reset by peer") ||
- strings.Contains(errStr, "request headers: small read buffer") ||
- strings.Contains(errStr, "unexpected EOF") ||
- strings.Contains(errStr, "i/o timeout") ||
- errors.Is(err, ErrBadTrailer)) {
- wp.Logger.Printf("error when serving connection %q<->%q: %v", c.LocalAddr(), c.RemoteAddr(), err)
- }
- }
- if err == errHijacked {
- wp.connState(c, StateHijacked)
- } else {
- _ = c.Close()
- wp.connState(c, StateClosed)
- }
- if !wp.release(ch) {
- break
- }
- }
- wp.lock.Lock()
- wp.workersCount--
- wp.lock.Unlock()
- }
|