lbclient.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. package fasthttp
  2. import (
  3. "sync"
  4. "sync/atomic"
  5. "time"
  6. )
  7. // BalancingClient is the interface for clients, which may be passed
  8. // to LBClient.Clients.
  9. type BalancingClient interface {
  10. DoDeadline(req *Request, resp *Response, deadline time.Time) error
  11. PendingRequests() int
  12. }
  13. // LBClient balances requests among available LBClient.Clients.
  14. //
  15. // It has the following features:
  16. //
  17. // - Balances load among available clients using 'least loaded' + 'least total'
  18. // hybrid technique.
  19. // - Dynamically decreases load on unhealthy clients.
  20. //
  21. // It is forbidden copying LBClient instances. Create new instances instead.
  22. //
  23. // It is safe calling LBClient methods from concurrently running goroutines.
  24. type LBClient struct {
  25. noCopy noCopy
  26. // Clients must contain non-zero clients list.
  27. // Incoming requests are balanced among these clients.
  28. Clients []BalancingClient
  29. // HealthCheck is a callback called after each request.
  30. //
  31. // The request, response and the error returned by the client
  32. // is passed to HealthCheck, so the callback may determine whether
  33. // the client is healthy.
  34. //
  35. // Load on the current client is decreased if HealthCheck returns false.
  36. //
  37. // By default HealthCheck returns false if err != nil.
  38. HealthCheck func(req *Request, resp *Response, err error) bool
  39. // Timeout is the request timeout used when calling LBClient.Do.
  40. //
  41. // DefaultLBClientTimeout is used by default.
  42. Timeout time.Duration
  43. cs []*lbClient
  44. once sync.Once
  45. mu sync.RWMutex
  46. }
  47. // DefaultLBClientTimeout is the default request timeout used by LBClient
  48. // when calling LBClient.Do.
  49. //
  50. // The timeout may be overridden via LBClient.Timeout.
  51. const DefaultLBClientTimeout = time.Second
  52. // DoDeadline calls DoDeadline on the least loaded client.
  53. func (cc *LBClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  54. return cc.get().DoDeadline(req, resp, deadline)
  55. }
  56. // DoTimeout calculates deadline and calls DoDeadline on the least loaded client.
  57. func (cc *LBClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  58. deadline := time.Now().Add(timeout)
  59. return cc.get().DoDeadline(req, resp, deadline)
  60. }
  61. // Do calculates timeout using LBClient.Timeout and calls DoTimeout
  62. // on the least loaded client.
  63. func (cc *LBClient) Do(req *Request, resp *Response) error {
  64. timeout := cc.Timeout
  65. if timeout <= 0 {
  66. timeout = DefaultLBClientTimeout
  67. }
  68. return cc.DoTimeout(req, resp, timeout)
  69. }
  70. func (cc *LBClient) init() {
  71. cc.mu.Lock()
  72. defer cc.mu.Unlock()
  73. if len(cc.Clients) == 0 {
  74. // developer sanity-check
  75. panic("BUG: LBClient.Clients cannot be empty")
  76. }
  77. for _, c := range cc.Clients {
  78. cc.cs = append(cc.cs, &lbClient{
  79. c: c,
  80. healthCheck: cc.HealthCheck,
  81. })
  82. }
  83. }
  84. // AddClient adds a new client to the balanced clients and
  85. // returns the new total number of clients.
  86. func (cc *LBClient) AddClient(c BalancingClient) int {
  87. cc.mu.Lock()
  88. cc.cs = append(cc.cs, &lbClient{
  89. c: c,
  90. healthCheck: cc.HealthCheck,
  91. })
  92. cc.mu.Unlock()
  93. return len(cc.cs)
  94. }
  95. // RemoveClients removes clients using the provided callback.
  96. // If rc returns true, the passed client will be removed.
  97. // Returns the new total number of clients.
  98. func (cc *LBClient) RemoveClients(rc func(BalancingClient) bool) int {
  99. cc.mu.Lock()
  100. n := 0
  101. for idx, cs := range cc.cs {
  102. cc.cs[idx] = nil
  103. if rc(cs.c) {
  104. continue
  105. }
  106. cc.cs[n] = cs
  107. n++
  108. }
  109. cc.cs = cc.cs[:n]
  110. cc.mu.Unlock()
  111. return len(cc.cs)
  112. }
  113. func (cc *LBClient) get() *lbClient {
  114. cc.once.Do(cc.init)
  115. cc.mu.RLock()
  116. cs := cc.cs
  117. minC := cs[0]
  118. minN := minC.PendingRequests()
  119. minT := atomic.LoadUint64(&minC.total)
  120. for _, c := range cs[1:] {
  121. n := c.PendingRequests()
  122. t := atomic.LoadUint64(&c.total) /* #nosec G601 */
  123. if n < minN || (n == minN && t < minT) {
  124. minC = c
  125. minN = n
  126. minT = t
  127. }
  128. }
  129. cc.mu.RUnlock()
  130. return minC
  131. }
  132. type lbClient struct {
  133. c BalancingClient
  134. healthCheck func(req *Request, resp *Response, err error) bool
  135. penalty uint32
  136. // total amount of requests handled.
  137. total uint64
  138. }
  139. func (c *lbClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  140. err := c.c.DoDeadline(req, resp, deadline)
  141. if !c.isHealthy(req, resp, err) && c.incPenalty() {
  142. // Penalize the client returning error, so the next requests
  143. // are routed to another clients.
  144. time.AfterFunc(penaltyDuration, c.decPenalty)
  145. } else {
  146. atomic.AddUint64(&c.total, 1)
  147. }
  148. return err
  149. }
  150. func (c *lbClient) PendingRequests() int {
  151. n := c.c.PendingRequests()
  152. m := atomic.LoadUint32(&c.penalty)
  153. return n + int(m)
  154. }
  155. func (c *lbClient) isHealthy(req *Request, resp *Response, err error) bool {
  156. if c.healthCheck == nil {
  157. return err == nil
  158. }
  159. return c.healthCheck(req, resp, err)
  160. }
  161. func (c *lbClient) incPenalty() bool {
  162. m := atomic.AddUint32(&c.penalty, 1)
  163. if m > maxPenalty {
  164. c.decPenalty()
  165. return false
  166. }
  167. return true
  168. }
  169. func (c *lbClient) decPenalty() {
  170. atomic.AddUint32(&c.penalty, ^uint32(0))
  171. }
  172. const (
  173. maxPenalty = 300
  174. penaltyDuration = 3 * time.Second
  175. )