client.go 80 KB


  1. package fasthttp
  2. import (
  3. "bufio"
  4. "crypto/tls"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. )
  14. // Do performs the given http request and fills the given http response.
  15. //
  16. // Request must contain at least non-zero RequestURI with full url (including
  17. // scheme and host) or non-zero Host header + RequestURI.
  18. //
  19. // Client determines the server to be requested in the following order:
  20. //
  21. // - from RequestURI if it contains full url with scheme and host;
  22. // - from Host header otherwise.
  23. //
  24. // The function doesn't follow redirects. Use Get* for following redirects.
  25. //
  26. // Response is ignored if resp is nil.
  27. //
  28. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  29. // to the requested host are busy.
  30. //
  31. // It is recommended obtaining req and resp via AcquireRequest
  32. // and AcquireResponse in performance-critical code.
  33. func Do(req *Request, resp *Response) error {
  34. return defaultClient.Do(req, resp)
  35. }
  36. // DoTimeout performs the given request and waits for response during
  37. // the given timeout duration.
  38. //
  39. // Request must contain at least non-zero RequestURI with full url (including
  40. // scheme and host) or non-zero Host header + RequestURI.
  41. //
  42. // Client determines the server to be requested in the following order:
  43. //
  44. // - from RequestURI if it contains full url with scheme and host;
  45. // - from Host header otherwise.
  46. //
  47. // The function doesn't follow redirects. Use Get* for following redirects.
  48. //
  49. // Response is ignored if resp is nil.
  50. //
  51. // ErrTimeout is returned if the response wasn't returned during
  52. // the given timeout.
  53. //
  54. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  55. // to the requested host are busy.
  56. //
  57. // It is recommended obtaining req and resp via AcquireRequest
  58. // and AcquireResponse in performance-critical code.
  59. func DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  60. return defaultClient.DoTimeout(req, resp, timeout)
  61. }
  62. // DoDeadline performs the given request and waits for response until
  63. // the given deadline.
  64. //
  65. // Request must contain at least non-zero RequestURI with full url (including
  66. // scheme and host) or non-zero Host header + RequestURI.
  67. //
  68. // Client determines the server to be requested in the following order:
  69. //
  70. // - from RequestURI if it contains full url with scheme and host;
  71. // - from Host header otherwise.
  72. //
  73. // The function doesn't follow redirects. Use Get* for following redirects.
  74. //
  75. // Response is ignored if resp is nil.
  76. //
  77. // ErrTimeout is returned if the response wasn't returned until
  78. // the given deadline.
  79. //
  80. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  81. // to the requested host are busy.
  82. //
  83. // It is recommended obtaining req and resp via AcquireRequest
  84. // and AcquireResponse in performance-critical code.
  85. func DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  86. return defaultClient.DoDeadline(req, resp, deadline)
  87. }
  88. // DoRedirects performs the given http request and fills the given http response,
  89. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  90. // maxRedirectsCount, ErrTooManyRedirects is returned.
  91. //
  92. // Request must contain at least non-zero RequestURI with full url (including
  93. // scheme and host) or non-zero Host header + RequestURI.
  94. //
  95. // Client determines the server to be requested in the following order:
  96. //
  97. // - from RequestURI if it contains full url with scheme and host;
  98. // - from Host header otherwise.
  99. //
  100. // Response is ignored if resp is nil.
  101. //
  102. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  103. // to the requested host are busy.
  104. //
  105. // It is recommended obtaining req and resp via AcquireRequest
  106. // and AcquireResponse in performance-critical code.
  107. func DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  108. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, &defaultClient)
  109. return err
  110. }
  111. // Get returns the status code and body of url.
  112. //
  113. // The contents of dst will be replaced by the body and returned, if the dst
  114. // is too small a new slice will be allocated.
  115. //
  116. // The function follows redirects. Use Do* for manually handling redirects.
  117. func Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  118. return defaultClient.Get(dst, url)
  119. }
  120. // GetTimeout returns the status code and body of url.
  121. //
  122. // The contents of dst will be replaced by the body and returned, if the dst
  123. // is too small a new slice will be allocated.
  124. //
  125. // The function follows redirects. Use Do* for manually handling redirects.
  126. //
  127. // ErrTimeout error is returned if url contents couldn't be fetched
  128. // during the given timeout.
  129. func GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  130. return defaultClient.GetTimeout(dst, url, timeout)
  131. }
  132. // GetDeadline returns the status code and body of url.
  133. //
  134. // The contents of dst will be replaced by the body and returned, if the dst
  135. // is too small a new slice will be allocated.
  136. //
  137. // The function follows redirects. Use Do* for manually handling redirects.
  138. //
  139. // ErrTimeout error is returned if url contents couldn't be fetched
  140. // until the given deadline.
  141. func GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  142. return defaultClient.GetDeadline(dst, url, deadline)
  143. }
  144. // Post sends POST request to the given url with the given POST arguments.
  145. //
  146. // The contents of dst will be replaced by the body and returned, if the dst
  147. // is too small a new slice will be allocated.
  148. //
  149. // The function follows redirects. Use Do* for manually handling redirects.
  150. //
  151. // Empty POST body is sent if postArgs is nil.
  152. func Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  153. return defaultClient.Post(dst, url, postArgs)
  154. }
  155. var defaultClient Client
  156. // Client implements http client.
  157. //
  158. // Copying Client by value is prohibited. Create new instance instead.
  159. //
  160. // It is safe calling Client methods from concurrently running goroutines.
  161. //
  162. // The fields of a Client should not be changed while it is in use.
  163. type Client struct {
  164. noCopy noCopy
  165. // Client name. Used in User-Agent request header.
  166. //
  167. // Default client name is used if not set.
  168. Name string
  169. // NoDefaultUserAgentHeader when set to true, causes the default
  170. // User-Agent header to be excluded from the Request.
  171. NoDefaultUserAgentHeader bool
  172. // Callback for establishing new connections to hosts.
  173. //
  174. // Default DialTimeout is used if not set.
  175. DialTimeout DialFuncWithTimeout
  176. // Callback for establishing new connections to hosts.
  177. //
  178. // Note that if Dial is set instead of DialTimeout, Dial will ignore Request timeout.
  179. // If you want the tcp dial process to account for request timeouts, use DialTimeout instead.
  180. //
  181. // If not set, DialTimeout is used.
  182. Dial DialFunc
  183. // Attempt to connect to both ipv4 and ipv6 addresses if set to true.
  184. //
  185. // This option is used only if default TCP dialer is used,
  186. // i.e. if Dial is blank.
  187. //
  188. // By default client connects only to ipv4 addresses,
  189. // since unfortunately ipv6 remains broken in many networks worldwide :)
  190. DialDualStack bool
  191. // TLS config for https connections.
  192. //
  193. // Default TLS config is used if not set.
  194. TLSConfig *tls.Config
  195. // Maximum number of connections per each host which may be established.
  196. //
  197. // DefaultMaxConnsPerHost is used if not set.
  198. MaxConnsPerHost int
  199. // Idle keep-alive connections are closed after this duration.
  200. //
  201. // By default idle connections are closed
  202. // after DefaultMaxIdleConnDuration.
  203. MaxIdleConnDuration time.Duration
  204. // Keep-alive connections are closed after this duration.
  205. //
  206. // By default connection duration is unlimited.
  207. MaxConnDuration time.Duration
  208. // Maximum number of attempts for idempotent calls.
  209. //
  210. // DefaultMaxIdemponentCallAttempts is used if not set.
  211. MaxIdemponentCallAttempts int
  212. // Per-connection buffer size for responses' reading.
  213. // This also limits the maximum header size.
  214. //
  215. // Default buffer size is used if 0.
  216. ReadBufferSize int
  217. // Per-connection buffer size for requests' writing.
  218. //
  219. // Default buffer size is used if 0.
  220. WriteBufferSize int
  221. // Maximum duration for full response reading (including body).
  222. //
  223. // By default response read timeout is unlimited.
  224. ReadTimeout time.Duration
  225. // Maximum duration for full request writing (including body).
  226. //
  227. // By default request write timeout is unlimited.
  228. WriteTimeout time.Duration
  229. // Maximum response body size.
  230. //
  231. // The client returns ErrBodyTooLarge if this limit is greater than 0
  232. // and response body is greater than the limit.
  233. //
  234. // By default response body size is unlimited.
  235. MaxResponseBodySize int
  236. // Header names are passed as-is without normalization
  237. // if this option is set.
  238. //
  239. // Disabled header names' normalization may be useful only for proxying
  240. // responses to other clients expecting case-sensitive
  241. // header names. See https://github.com/valyala/fasthttp/issues/57
  242. // for details.
  243. //
  244. // By default request and response header names are normalized, i.e.
  245. // The first letter and the first letters following dashes
  246. // are uppercased, while all the other letters are lowercased.
  247. // Examples:
  248. //
  249. // * HOST -> Host
  250. // * content-type -> Content-Type
  251. // * cONTENT-lenGTH -> Content-Length
  252. DisableHeaderNamesNormalizing bool
  253. // Path values are sent as-is without normalization.
  254. //
  255. // Disabled path normalization may be useful for proxying incoming requests
  256. // to servers that are expecting paths to be forwarded as-is.
  257. //
  258. // By default path values are normalized, i.e.
  259. // extra slashes are removed, special characters are encoded.
  260. DisablePathNormalizing bool
  261. // Maximum duration for waiting for a free connection.
  262. //
  263. // By default will not waiting, return ErrNoFreeConns immediately.
  264. MaxConnWaitTimeout time.Duration
  265. // RetryIf controls whether a retry should be attempted after an error.
  266. //
  267. // By default will use isIdempotent function.
  268. RetryIf RetryIfFunc
  269. // Connection pool strategy. Can be either LIFO or FIFO (default).
  270. ConnPoolStrategy ConnPoolStrategyType
  271. // StreamResponseBody enables response body streaming.
  272. StreamResponseBody bool
  273. // ConfigureClient configures the fasthttp.HostClient.
  274. ConfigureClient func(hc *HostClient) error
  275. mLock sync.RWMutex
  276. mOnce sync.Once
  277. m map[string]*HostClient
  278. ms map[string]*HostClient
  279. readerPool sync.Pool
  280. writerPool sync.Pool
  281. }
  282. // Get returns the status code and body of url.
  283. //
  284. // The contents of dst will be replaced by the body and returned, if the dst
  285. // is too small a new slice will be allocated.
  286. //
  287. // The function follows redirects. Use Do* for manually handling redirects.
  288. func (c *Client) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  289. return clientGetURL(dst, url, c)
  290. }
  291. // GetTimeout returns the status code and body of url.
  292. //
  293. // The contents of dst will be replaced by the body and returned, if the dst
  294. // is too small a new slice will be allocated.
  295. //
  296. // The function follows redirects. Use Do* for manually handling redirects.
  297. //
  298. // ErrTimeout error is returned if url contents couldn't be fetched
  299. // during the given timeout.
  300. func (c *Client) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  301. return clientGetURLTimeout(dst, url, timeout, c)
  302. }
  303. // GetDeadline returns the status code and body of url.
  304. //
  305. // The contents of dst will be replaced by the body and returned, if the dst
  306. // is too small a new slice will be allocated.
  307. //
  308. // The function follows redirects. Use Do* for manually handling redirects.
  309. //
  310. // ErrTimeout error is returned if url contents couldn't be fetched
  311. // until the given deadline.
  312. func (c *Client) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  313. return clientGetURLDeadline(dst, url, deadline, c)
  314. }
  315. // Post sends POST request to the given url with the given POST arguments.
  316. //
  317. // The contents of dst will be replaced by the body and returned, if the dst
  318. // is too small a new slice will be allocated.
  319. //
  320. // The function follows redirects. Use Do* for manually handling redirects.
  321. //
  322. // Empty POST body is sent if postArgs is nil.
  323. func (c *Client) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  324. return clientPostURL(dst, url, postArgs, c)
  325. }
  326. // DoTimeout performs the given request and waits for response during
  327. // the given timeout duration.
  328. //
  329. // Request must contain at least non-zero RequestURI with full url (including
  330. // scheme and host) or non-zero Host header + RequestURI.
  331. //
  332. // Client determines the server to be requested in the following order:
  333. //
  334. // - from RequestURI if it contains full url with scheme and host;
  335. // - from Host header otherwise.
  336. //
  337. // The function doesn't follow redirects. Use Get* for following redirects.
  338. //
  339. // Response is ignored if resp is nil.
  340. //
  341. // ErrTimeout is returned if the response wasn't returned during
  342. // the given timeout.
  343. // Immediately returns ErrTimeout if timeout value is negative.
  344. //
  345. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  346. // to the requested host are busy.
  347. //
  348. // It is recommended obtaining req and resp via AcquireRequest
  349. // and AcquireResponse in performance-critical code.
  350. func (c *Client) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  351. req.timeout = timeout
  352. if req.timeout <= 0 {
  353. return ErrTimeout
  354. }
  355. return c.Do(req, resp)
  356. }
  357. // DoDeadline performs the given request and waits for response until
  358. // the given deadline.
  359. //
  360. // Request must contain at least non-zero RequestURI with full url (including
  361. // scheme and host) or non-zero Host header + RequestURI.
  362. //
  363. // Client determines the server to be requested in the following order:
  364. //
  365. // - from RequestURI if it contains full url with scheme and host;
  366. // - from Host header otherwise.
  367. //
  368. // The function doesn't follow redirects. Use Get* for following redirects.
  369. //
  370. // Response is ignored if resp is nil.
  371. //
  372. // ErrTimeout is returned if the response wasn't returned until
  373. // the given deadline.
  374. // Immediately returns ErrTimeout if the deadline has already been reached.
  375. //
  376. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  377. // to the requested host are busy.
  378. //
  379. // It is recommended obtaining req and resp via AcquireRequest
  380. // and AcquireResponse in performance-critical code.
  381. func (c *Client) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  382. req.timeout = time.Until(deadline)
  383. if req.timeout <= 0 {
  384. return ErrTimeout
  385. }
  386. return c.Do(req, resp)
  387. }
  388. // DoRedirects performs the given http request and fills the given http response,
  389. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  390. // maxRedirectsCount, ErrTooManyRedirects is returned.
  391. //
  392. // Request must contain at least non-zero RequestURI with full url (including
  393. // scheme and host) or non-zero Host header + RequestURI.
  394. //
  395. // Client determines the server to be requested in the following order:
  396. //
  397. // - from RequestURI if it contains full url with scheme and host;
  398. // - from Host header otherwise.
  399. //
  400. // Response is ignored if resp is nil.
  401. //
  402. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  403. // to the requested host are busy.
  404. //
  405. // It is recommended obtaining req and resp via AcquireRequest
  406. // and AcquireResponse in performance-critical code.
  407. func (c *Client) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  408. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
  409. return err
  410. }
  411. // Do performs the given http request and fills the given http response.
  412. //
  413. // Request must contain at least non-zero RequestURI with full url (including
  414. // scheme and host) or non-zero Host header + RequestURI.
  415. //
  416. // Client determines the server to be requested in the following order:
  417. //
  418. // - from RequestURI if it contains full url with scheme and host;
  419. // - from Host header otherwise.
  420. //
  421. // Response is ignored if resp is nil.
  422. //
  423. // The function doesn't follow redirects. Use Get* for following redirects.
  424. //
  425. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  426. // to the requested host are busy.
  427. //
  428. // It is recommended obtaining req and resp via AcquireRequest
  429. // and AcquireResponse in performance-critical code.
  430. func (c *Client) Do(req *Request, resp *Response) error {
  431. uri := req.URI()
  432. if uri == nil {
  433. return ErrorInvalidURI
  434. }
  435. host := uri.Host()
  436. isTLS := false
  437. if uri.isHTTPS() {
  438. isTLS = true
  439. } else if !uri.isHTTP() {
  440. return fmt.Errorf("unsupported protocol %q. http and https are supported", uri.Scheme())
  441. }
  442. c.mOnce.Do(func() {
  443. c.mLock.Lock()
  444. c.m = make(map[string]*HostClient)
  445. c.ms = make(map[string]*HostClient)
  446. c.mLock.Unlock()
  447. })
  448. startCleaner := false
  449. c.mLock.RLock()
  450. m := c.m
  451. if isTLS {
  452. m = c.ms
  453. }
  454. hc := m[string(host)]
  455. if hc != nil {
  456. atomic.AddInt32(&hc.pendingClientRequests, 1)
  457. defer atomic.AddInt32(&hc.pendingClientRequests, -1)
  458. }
  459. c.mLock.RUnlock()
  460. if hc == nil {
  461. c.mLock.Lock()
  462. hc = m[string(host)]
  463. if hc == nil {
  464. hc = &HostClient{
  465. Addr: AddMissingPort(string(host), isTLS),
  466. Name: c.Name,
  467. NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
  468. Dial: c.Dial,
  469. DialTimeout: c.DialTimeout,
  470. DialDualStack: c.DialDualStack,
  471. IsTLS: isTLS,
  472. TLSConfig: c.TLSConfig,
  473. MaxConns: c.MaxConnsPerHost,
  474. MaxIdleConnDuration: c.MaxIdleConnDuration,
  475. MaxConnDuration: c.MaxConnDuration,
  476. MaxIdemponentCallAttempts: c.MaxIdemponentCallAttempts,
  477. ReadBufferSize: c.ReadBufferSize,
  478. WriteBufferSize: c.WriteBufferSize,
  479. ReadTimeout: c.ReadTimeout,
  480. WriteTimeout: c.WriteTimeout,
  481. MaxResponseBodySize: c.MaxResponseBodySize,
  482. DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
  483. DisablePathNormalizing: c.DisablePathNormalizing,
  484. MaxConnWaitTimeout: c.MaxConnWaitTimeout,
  485. RetryIf: c.RetryIf,
  486. ConnPoolStrategy: c.ConnPoolStrategy,
  487. StreamResponseBody: c.StreamResponseBody,
  488. clientReaderPool: &c.readerPool,
  489. clientWriterPool: &c.writerPool,
  490. }
  491. if c.ConfigureClient != nil {
  492. if err := c.ConfigureClient(hc); err != nil {
  493. c.mLock.Unlock()
  494. return err
  495. }
  496. }
  497. m[string(host)] = hc
  498. if len(m) == 1 {
  499. startCleaner = true
  500. }
  501. }
  502. atomic.AddInt32(&hc.pendingClientRequests, 1)
  503. defer atomic.AddInt32(&hc.pendingClientRequests, -1)
  504. c.mLock.Unlock()
  505. }
  506. if startCleaner {
  507. go c.mCleaner(m)
  508. }
  509. return hc.Do(req, resp)
  510. }
  511. // CloseIdleConnections closes any connections which were previously
  512. // connected from previous requests but are now sitting idle in a
  513. // "keep-alive" state. It does not interrupt any connections currently
  514. // in use.
  515. func (c *Client) CloseIdleConnections() {
  516. c.mLock.RLock()
  517. for _, v := range c.m {
  518. v.CloseIdleConnections()
  519. }
  520. for _, v := range c.ms {
  521. v.CloseIdleConnections()
  522. }
  523. c.mLock.RUnlock()
  524. }
  525. func (c *Client) mCleaner(m map[string]*HostClient) {
  526. mustStop := false
  527. sleep := c.MaxIdleConnDuration
  528. if sleep < time.Second {
  529. sleep = time.Second
  530. } else if sleep > 10*time.Second {
  531. sleep = 10 * time.Second
  532. }
  533. for {
  534. time.Sleep(sleep)
  535. c.mLock.Lock()
  536. for k, v := range m {
  537. v.connsLock.Lock()
  538. /* #nosec G601 */
  539. if v.connsCount == 0 && atomic.LoadInt32(&v.pendingClientRequests) == 0 {
  540. delete(m, k)
  541. }
  542. v.connsLock.Unlock()
  543. }
  544. if len(m) == 0 {
  545. mustStop = true
  546. }
  547. c.mLock.Unlock()
  548. if mustStop {
  549. break
  550. }
  551. }
  552. }
  553. // DefaultMaxConnsPerHost is the maximum number of concurrent connections
  554. // http client may establish per host by default (i.e. if
  555. // Client.MaxConnsPerHost isn't set).
  556. const DefaultMaxConnsPerHost = 512
  557. // DefaultMaxIdleConnDuration is the default duration before idle keep-alive
  558. // connection is closed.
  559. const DefaultMaxIdleConnDuration = 10 * time.Second
  560. // DefaultMaxIdemponentCallAttempts is the default idempotent calls attempts count.
  561. const DefaultMaxIdemponentCallAttempts = 5
  562. // DialFunc must establish connection to addr.
  563. //
  564. // There is no need in establishing TLS (SSL) connection for https.
  565. // The client automatically converts connection to TLS
  566. // if HostClient.IsTLS is set.
  567. //
  568. // TCP address passed to DialFunc always contains host and port.
  569. // Example TCP addr values:
  570. //
  571. // - foobar.com:80
  572. // - foobar.com:443
  573. // - foobar.com:8080
  574. type DialFunc func(addr string) (net.Conn, error)
  575. // DialFuncWithTimeout must establish connection to addr.
  576. // Unlike DialFunc, it also accepts a timeout.
  577. //
  578. // There is no need in establishing TLS (SSL) connection for https.
  579. // The client automatically converts connection to TLS
  580. // if HostClient.IsTLS is set.
  581. //
  582. // TCP address passed to DialFuncWithTimeout always contains host and port.
  583. // Example TCP addr values:
  584. //
  585. // - foobar.com:80
  586. // - foobar.com:443
  587. // - foobar.com:8080
  588. type DialFuncWithTimeout func(addr string, timeout time.Duration) (net.Conn, error)
  589. // RetryIfFunc signature of retry if function.
  590. //
  591. // Request argument passed to RetryIfFunc, if there are any request errors.
  592. type RetryIfFunc func(request *Request) bool
  593. // RoundTripper wraps every request/response.
  594. type RoundTripper interface {
  595. RoundTrip(hc *HostClient, req *Request, resp *Response) (retry bool, err error)
  596. }
  597. // ConnPoolStrategyType define strategy of connection pool enqueue/dequeue.
  598. type ConnPoolStrategyType int
  599. const (
  600. FIFO ConnPoolStrategyType = iota
  601. LIFO
  602. )
  603. // HostClient balances http requests among hosts listed in Addr.
  604. //
  605. // HostClient may be used for balancing load among multiple upstream hosts.
  606. // While multiple addresses passed to HostClient.Addr may be used for balancing
  607. // load among them, it would be better using LBClient instead, since HostClient
  608. // may unevenly balance load among upstream hosts.
  609. //
  610. // It is forbidden copying HostClient instances. Create new instances instead.
  611. //
  612. // It is safe calling HostClient methods from concurrently running goroutines.
  613. type HostClient struct {
  614. noCopy noCopy
  615. // Comma-separated list of upstream HTTP server host addresses,
  616. // which are passed to Dial or DialTimeout in a round-robin manner.
  617. //
  618. // Each address may contain port if default dialer is used.
  619. // For example,
  620. //
  621. // - foobar.com:80
  622. // - foobar.com:443
  623. // - foobar.com:8080
  624. Addr string
  625. // Client name. Used in User-Agent request header.
  626. Name string
  627. // NoDefaultUserAgentHeader when set to true, causes the default
  628. // User-Agent header to be excluded from the Request.
  629. NoDefaultUserAgentHeader bool
  630. // Callback for establishing new connections to hosts.
  631. //
  632. // Default DialTimeout is used if not set.
  633. DialTimeout DialFuncWithTimeout
  634. // Callback for establishing new connections to hosts.
  635. //
  636. // Note that if Dial is set instead of DialTimeout, Dial will ignore Request timeout.
  637. // If you want the tcp dial process to account for request timeouts, use DialTimeout instead.
  638. //
  639. // If not set, DialTimeout is used.
  640. Dial DialFunc
  641. // Attempt to connect to both ipv4 and ipv6 host addresses
  642. // if set to true.
  643. //
  644. // This option is used only if default TCP dialer is used,
  645. // i.e. if Dial and DialTimeout are blank.
  646. //
  647. // By default client connects only to ipv4 addresses,
  648. // since unfortunately ipv6 remains broken in many networks worldwide :)
  649. DialDualStack bool
  650. // Whether to use TLS (aka SSL or HTTPS) for host connections.
  651. IsTLS bool
  652. // Optional TLS config.
  653. TLSConfig *tls.Config
  654. // Maximum number of connections which may be established to all hosts
  655. // listed in Addr.
  656. //
  657. // You can change this value while the HostClient is being used
  658. // with HostClient.SetMaxConns(value)
  659. //
  660. // DefaultMaxConnsPerHost is used if not set.
  661. MaxConns int
  662. // Keep-alive connections are closed after this duration.
  663. //
  664. // By default connection duration is unlimited.
  665. MaxConnDuration time.Duration
  666. // Idle keep-alive connections are closed after this duration.
  667. //
  668. // By default idle connections are closed
  669. // after DefaultMaxIdleConnDuration.
  670. MaxIdleConnDuration time.Duration
  671. // Maximum number of attempts for idempotent calls.
  672. //
  673. // DefaultMaxIdemponentCallAttempts is used if not set.
  674. MaxIdemponentCallAttempts int
  675. // Per-connection buffer size for responses' reading.
  676. // This also limits the maximum header size.
  677. //
  678. // Default buffer size is used if 0.
  679. ReadBufferSize int
  680. // Per-connection buffer size for requests' writing.
  681. //
  682. // Default buffer size is used if 0.
  683. WriteBufferSize int
  684. // Maximum duration for full response reading (including body).
  685. //
  686. // By default response read timeout is unlimited.
  687. ReadTimeout time.Duration
  688. // Maximum duration for full request writing (including body).
  689. //
  690. // By default request write timeout is unlimited.
  691. WriteTimeout time.Duration
  692. // Maximum response body size.
  693. //
  694. // The client returns ErrBodyTooLarge if this limit is greater than 0
  695. // and response body is greater than the limit.
  696. //
  697. // By default response body size is unlimited.
  698. MaxResponseBodySize int
  699. // Header names are passed as-is without normalization
  700. // if this option is set.
  701. //
  702. // Disabled header names' normalization may be useful only for proxying
  703. // responses to other clients expecting case-sensitive
  704. // header names. See https://github.com/valyala/fasthttp/issues/57
  705. // for details.
  706. //
  707. // By default request and response header names are normalized, i.e.
  708. // The first letter and the first letters following dashes
  709. // are uppercased, while all the other letters are lowercased.
  710. // Examples:
  711. //
  712. // * HOST -> Host
  713. // * content-type -> Content-Type
  714. // * cONTENT-lenGTH -> Content-Length
  715. DisableHeaderNamesNormalizing bool
  716. // Path values are sent as-is without normalization.
  717. //
  718. // Disabled path normalization may be useful for proxying incoming requests
  719. // to servers that are expecting paths to be forwarded as-is.
  720. //
  721. // By default path values are normalized, i.e.
  722. // extra slashes are removed, special characters are encoded.
  723. DisablePathNormalizing bool
  724. // Will not log potentially sensitive content in error logs.
  725. //
  726. // This option is useful for servers that handle sensitive data
  727. // in the request/response.
  728. //
  729. // Client logs full errors by default.
  730. SecureErrorLogMessage bool
  731. // Maximum duration for waiting for a free connection.
  732. //
  733. // By default will not waiting, return ErrNoFreeConns immediately
  734. MaxConnWaitTimeout time.Duration
  735. // RetryIf controls whether a retry should be attempted after an error.
  736. //
  737. // By default will use isIdempotent function
  738. RetryIf RetryIfFunc
  739. // Transport defines a transport-like mechanism that wraps every request/response.
  740. Transport RoundTripper
  741. // Connection pool strategy. Can be either LIFO or FIFO (default).
  742. ConnPoolStrategy ConnPoolStrategyType
  743. // StreamResponseBody enables response body streaming.
  744. StreamResponseBody bool
  745. lastUseTime uint32
  746. connsLock sync.Mutex
  747. connsCount int
  748. conns []*clientConn
  749. connsWait *wantConnQueue
  750. addrsLock sync.Mutex
  751. addrs []string
  752. addrIdx uint32
  753. tlsConfigMap map[string]*tls.Config
  754. tlsConfigMapLock sync.Mutex
  755. readerPool sync.Pool
  756. writerPool sync.Pool
  757. clientReaderPool *sync.Pool
  758. clientWriterPool *sync.Pool
  759. pendingRequests int32
  760. // pendingClientRequests counts the number of requests that a Client is currently running using this HostClient.
  761. // It will be incremented earlier than pendingRequests and will be used by Client to see if the HostClient is still in use.
  762. pendingClientRequests int32
  763. connsCleanerRun bool
  764. }
  765. type clientConn struct {
  766. c net.Conn
  767. createdTime time.Time
  768. lastUseTime time.Time
  769. }
  770. var startTimeUnix = time.Now().Unix()
  771. // LastUseTime returns time the client was last used.
  772. func (c *HostClient) LastUseTime() time.Time {
  773. n := atomic.LoadUint32(&c.lastUseTime)
  774. return time.Unix(startTimeUnix+int64(n), 0)
  775. }
  776. // Get returns the status code and body of url.
  777. //
  778. // The contents of dst will be replaced by the body and returned, if the dst
  779. // is too small a new slice will be allocated.
  780. //
  781. // The function follows redirects. Use Do* for manually handling redirects.
  782. func (c *HostClient) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  783. return clientGetURL(dst, url, c)
  784. }
  785. // GetTimeout returns the status code and body of url.
  786. //
  787. // The contents of dst will be replaced by the body and returned, if the dst
  788. // is too small a new slice will be allocated.
  789. //
  790. // The function follows redirects. Use Do* for manually handling redirects.
  791. //
  792. // ErrTimeout error is returned if url contents couldn't be fetched
  793. // during the given timeout.
  794. func (c *HostClient) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  795. return clientGetURLTimeout(dst, url, timeout, c)
  796. }
  797. // GetDeadline returns the status code and body of url.
  798. //
  799. // The contents of dst will be replaced by the body and returned, if the dst
  800. // is too small a new slice will be allocated.
  801. //
  802. // The function follows redirects. Use Do* for manually handling redirects.
  803. //
  804. // ErrTimeout error is returned if url contents couldn't be fetched
  805. // until the given deadline.
  806. func (c *HostClient) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  807. return clientGetURLDeadline(dst, url, deadline, c)
  808. }
  809. // Post sends POST request to the given url with the given POST arguments.
  810. //
  811. // The contents of dst will be replaced by the body and returned, if the dst
  812. // is too small a new slice will be allocated.
  813. //
  814. // The function follows redirects. Use Do* for manually handling redirects.
  815. //
  816. // Empty POST body is sent if postArgs is nil.
  817. func (c *HostClient) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  818. return clientPostURL(dst, url, postArgs, c)
  819. }
  820. type clientDoer interface {
  821. Do(req *Request, resp *Response) error
  822. }
  823. func clientGetURL(dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
  824. req := AcquireRequest()
  825. statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
  826. ReleaseRequest(req)
  827. return statusCode, body, err
  828. }
  829. func clientGetURLTimeout(dst []byte, url string, timeout time.Duration, c clientDoer) (statusCode int, body []byte, err error) {
  830. deadline := time.Now().Add(timeout)
  831. return clientGetURLDeadline(dst, url, deadline, c)
  832. }
  833. type clientURLResponse struct {
  834. statusCode int
  835. body []byte
  836. err error
  837. }
  838. func clientGetURLDeadline(dst []byte, url string, deadline time.Time, c clientDoer) (statusCode int, body []byte, err error) {
  839. timeout := time.Until(deadline)
  840. if timeout <= 0 {
  841. return 0, dst, ErrTimeout
  842. }
  843. var ch chan clientURLResponse
  844. chv := clientURLResponseChPool.Get()
  845. if chv == nil {
  846. chv = make(chan clientURLResponse, 1)
  847. }
  848. ch = chv.(chan clientURLResponse)
  849. // Note that the request continues execution on ErrTimeout until
  850. // client-specific ReadTimeout exceeds. This helps limiting load
  851. // on slow hosts by MaxConns* concurrent requests.
  852. //
  853. // Without this 'hack' the load on slow host could exceed MaxConns*
  854. // concurrent requests, since timed out requests on client side
  855. // usually continue execution on the host.
  856. var mu sync.Mutex
  857. var timedout, responded bool
  858. go func() {
  859. req := AcquireRequest()
  860. statusCodeCopy, bodyCopy, errCopy := doRequestFollowRedirectsBuffer(req, dst, url, c)
  861. mu.Lock()
  862. if !timedout {
  863. ch <- clientURLResponse{
  864. statusCode: statusCodeCopy,
  865. body: bodyCopy,
  866. err: errCopy,
  867. }
  868. responded = true
  869. }
  870. mu.Unlock()
  871. ReleaseRequest(req)
  872. }()
  873. tc := AcquireTimer(timeout)
  874. select {
  875. case resp := <-ch:
  876. statusCode = resp.statusCode
  877. body = resp.body
  878. err = resp.err
  879. case <-tc.C:
  880. mu.Lock()
  881. if responded {
  882. resp := <-ch
  883. statusCode = resp.statusCode
  884. body = resp.body
  885. err = resp.err
  886. } else {
  887. timedout = true
  888. err = ErrTimeout
  889. body = dst
  890. }
  891. mu.Unlock()
  892. }
  893. ReleaseTimer(tc)
  894. clientURLResponseChPool.Put(chv)
  895. return statusCode, body, err
  896. }
  897. var clientURLResponseChPool sync.Pool
  898. func clientPostURL(dst []byte, url string, postArgs *Args, c clientDoer) (statusCode int, body []byte, err error) {
  899. req := AcquireRequest()
  900. defer ReleaseRequest(req)
  901. req.Header.SetMethod(MethodPost)
  902. req.Header.SetContentTypeBytes(strPostArgsContentType)
  903. if postArgs != nil {
  904. if _, err := postArgs.WriteTo(req.BodyWriter()); err != nil {
  905. return 0, nil, err
  906. }
  907. }
  908. statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
  909. return statusCode, body, err
  910. }
  911. var (
  912. // ErrMissingLocation is returned by clients when the Location header is missing on
  913. // an HTTP response with a redirect status code.
  914. ErrMissingLocation = errors.New("missing Location header for http redirect")
  915. // ErrTooManyRedirects is returned by clients when the number of redirects followed
  916. // exceed the max count.
  917. ErrTooManyRedirects = errors.New("too many redirects detected when doing the request")
  918. // HostClients are only able to follow redirects to the same protocol.
  919. ErrHostClientRedirectToDifferentScheme = errors.New("HostClient can't follow redirects to a different protocol," +
  920. " please use Client instead")
  921. )
  922. const defaultMaxRedirectsCount = 16
  923. func doRequestFollowRedirectsBuffer(req *Request, dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
  924. resp := AcquireResponse()
  925. bodyBuf := resp.bodyBuffer()
  926. resp.keepBodyBuffer = true
  927. oldBody := bodyBuf.B
  928. bodyBuf.B = dst
  929. statusCode, _, err = doRequestFollowRedirects(req, resp, url, defaultMaxRedirectsCount, c)
  930. body = bodyBuf.B
  931. bodyBuf.B = oldBody
  932. resp.keepBodyBuffer = false
  933. ReleaseResponse(resp)
  934. return statusCode, body, err
  935. }
  936. func doRequestFollowRedirects(
  937. req *Request, resp *Response, url string, maxRedirectsCount int, c clientDoer,
  938. ) (statusCode int, body []byte, err error) {
  939. redirectsCount := 0
  940. for {
  941. req.SetRequestURI(url)
  942. if err := req.parseURI(); err != nil {
  943. return 0, nil, err
  944. }
  945. if err = c.Do(req, resp); err != nil {
  946. break
  947. }
  948. statusCode = resp.Header.StatusCode()
  949. if !StatusCodeIsRedirect(statusCode) {
  950. break
  951. }
  952. redirectsCount++
  953. if redirectsCount > maxRedirectsCount {
  954. err = ErrTooManyRedirects
  955. break
  956. }
  957. location := resp.Header.peek(strLocation)
  958. if len(location) == 0 {
  959. err = ErrMissingLocation
  960. break
  961. }
  962. url = getRedirectURL(url, location, req.DisableRedirectPathNormalizing)
  963. }
  964. return statusCode, body, err
  965. }
  966. func getRedirectURL(baseURL string, location []byte, disablePathNormalizing bool) string {
  967. u := AcquireURI()
  968. u.Update(baseURL)
  969. u.UpdateBytes(location)
  970. u.DisablePathNormalizing = disablePathNormalizing
  971. redirectURL := u.String()
  972. ReleaseURI(u)
  973. return redirectURL
  974. }
  975. // StatusCodeIsRedirect returns true if the status code indicates a redirect.
  976. func StatusCodeIsRedirect(statusCode int) bool {
  977. return statusCode == StatusMovedPermanently ||
  978. statusCode == StatusFound ||
  979. statusCode == StatusSeeOther ||
  980. statusCode == StatusTemporaryRedirect ||
  981. statusCode == StatusPermanentRedirect
  982. }
  983. var (
  984. requestPool sync.Pool
  985. responsePool sync.Pool
  986. )
  987. // AcquireRequest returns an empty Request instance from request pool.
  988. //
  989. // The returned Request instance may be passed to ReleaseRequest when it is
  990. // no longer needed. This allows Request recycling, reduces GC pressure
  991. // and usually improves performance.
  992. func AcquireRequest() *Request {
  993. v := requestPool.Get()
  994. if v == nil {
  995. return &Request{}
  996. }
  997. return v.(*Request)
  998. }
  999. // ReleaseRequest returns req acquired via AcquireRequest to request pool.
  1000. //
  1001. // It is forbidden accessing req and/or its' members after returning
  1002. // it to request pool.
  1003. func ReleaseRequest(req *Request) {
  1004. req.Reset()
  1005. requestPool.Put(req)
  1006. }
  1007. // AcquireResponse returns an empty Response instance from response pool.
  1008. //
  1009. // The returned Response instance may be passed to ReleaseResponse when it is
  1010. // no longer needed. This allows Response recycling, reduces GC pressure
  1011. // and usually improves performance.
  1012. func AcquireResponse() *Response {
  1013. v := responsePool.Get()
  1014. if v == nil {
  1015. return &Response{}
  1016. }
  1017. return v.(*Response)
  1018. }
  1019. // ReleaseResponse return resp acquired via AcquireResponse to response pool.
  1020. //
  1021. // It is forbidden accessing resp and/or its' members after returning
  1022. // it to response pool.
  1023. func ReleaseResponse(resp *Response) {
  1024. resp.Reset()
  1025. responsePool.Put(resp)
  1026. }
  1027. // DoTimeout performs the given request and waits for response during
  1028. // the given timeout duration.
  1029. //
  1030. // Request must contain at least non-zero RequestURI with full url (including
  1031. // scheme and host) or non-zero Host header + RequestURI.
  1032. //
  1033. // The function doesn't follow redirects. Use Get* for following redirects.
  1034. //
  1035. // Response is ignored if resp is nil.
  1036. //
  1037. // ErrTimeout is returned if the response wasn't returned during
  1038. // the given timeout.
  1039. // Immediately returns ErrTimeout if timeout value is negative.
  1040. //
  1041. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1042. // to the host are busy.
  1043. //
  1044. // It is recommended obtaining req and resp via AcquireRequest
  1045. // and AcquireResponse in performance-critical code.
  1046. func (c *HostClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  1047. req.timeout = timeout
  1048. if req.timeout <= 0 {
  1049. return ErrTimeout
  1050. }
  1051. return c.Do(req, resp)
  1052. }
  1053. // DoDeadline performs the given request and waits for response until
  1054. // the given deadline.
  1055. //
  1056. // Request must contain at least non-zero RequestURI with full url (including
  1057. // scheme and host) or non-zero Host header + RequestURI.
  1058. //
  1059. // The function doesn't follow redirects. Use Get* for following redirects.
  1060. //
  1061. // Response is ignored if resp is nil.
  1062. //
  1063. // ErrTimeout is returned if the response wasn't returned until
  1064. // the given deadline.
  1065. // Immediately returns ErrTimeout if the deadline has already been reached.
  1066. //
  1067. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1068. // to the host are busy.
  1069. //
  1070. // It is recommended obtaining req and resp via AcquireRequest
  1071. // and AcquireResponse in performance-critical code.
  1072. func (c *HostClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  1073. req.timeout = time.Until(deadline)
  1074. if req.timeout <= 0 {
  1075. return ErrTimeout
  1076. }
  1077. return c.Do(req, resp)
  1078. }
  1079. // DoRedirects performs the given http request and fills the given http response,
  1080. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  1081. // maxRedirectsCount, ErrTooManyRedirects is returned.
  1082. //
  1083. // Request must contain at least non-zero RequestURI with full url (including
  1084. // scheme and host) or non-zero Host header + RequestURI.
  1085. //
  1086. // Client determines the server to be requested in the following order:
  1087. //
  1088. // - from RequestURI if it contains full url with scheme and host;
  1089. // - from Host header otherwise.
  1090. //
  1091. // Response is ignored if resp is nil.
  1092. //
  1093. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  1094. // to the requested host are busy.
  1095. //
  1096. // It is recommended obtaining req and resp via AcquireRequest
  1097. // and AcquireResponse in performance-critical code.
  1098. func (c *HostClient) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  1099. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
  1100. return err
  1101. }
  1102. // Do performs the given http request and sets the corresponding response.
  1103. //
  1104. // Request must contain at least non-zero RequestURI with full url (including
  1105. // scheme and host) or non-zero Host header + RequestURI.
  1106. //
  1107. // The function doesn't follow redirects. Use Get* for following redirects.
  1108. //
  1109. // Response is ignored if resp is nil.
  1110. //
  1111. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1112. // to the host are busy.
  1113. //
  1114. // It is recommended obtaining req and resp via AcquireRequest
  1115. // and AcquireResponse in performance-critical code.
  1116. func (c *HostClient) Do(req *Request, resp *Response) error {
  1117. var err error
  1118. var retry bool
  1119. maxAttempts := c.MaxIdemponentCallAttempts
  1120. if maxAttempts <= 0 {
  1121. maxAttempts = DefaultMaxIdemponentCallAttempts
  1122. }
  1123. isRequestRetryable := isIdempotent
  1124. if c.RetryIf != nil {
  1125. isRequestRetryable = c.RetryIf
  1126. }
  1127. attempts := 0
  1128. hasBodyStream := req.IsBodyStream()
  1129. // If a request has a timeout we store the timeout
  1130. // and calculate a deadline so we can keep updating the
  1131. // timeout on each retry.
  1132. deadline := time.Time{}
  1133. timeout := req.timeout
  1134. if timeout > 0 {
  1135. deadline = time.Now().Add(timeout)
  1136. }
  1137. atomic.AddInt32(&c.pendingRequests, 1)
  1138. for {
  1139. // If the original timeout was set, we need to update
  1140. // the one set on the request to reflect the remaining time.
  1141. if timeout > 0 {
  1142. req.timeout = time.Until(deadline)
  1143. if req.timeout <= 0 {
  1144. err = ErrTimeout
  1145. break
  1146. }
  1147. }
  1148. retry, err = c.do(req, resp)
  1149. if err == nil || !retry {
  1150. break
  1151. }
  1152. if hasBodyStream {
  1153. break
  1154. }
  1155. if !isRequestRetryable(req) {
  1156. // Retry non-idempotent requests if the server closes
  1157. // the connection before sending the response.
  1158. //
  1159. // This case is possible if the server closes the idle
  1160. // keep-alive connection on timeout.
  1161. //
  1162. // Apache and nginx usually do this.
  1163. if err != io.EOF {
  1164. break
  1165. }
  1166. }
  1167. attempts++
  1168. if attempts >= maxAttempts {
  1169. break
  1170. }
  1171. }
  1172. atomic.AddInt32(&c.pendingRequests, -1)
  1173. // Restore the original timeout.
  1174. req.timeout = timeout
  1175. if err == io.EOF {
  1176. err = ErrConnectionClosed
  1177. }
  1178. return err
  1179. }
  1180. // PendingRequests returns the current number of requests the client
  1181. // is executing.
  1182. //
  1183. // This function may be used for balancing load among multiple HostClient
  1184. // instances.
  1185. func (c *HostClient) PendingRequests() int {
  1186. return int(atomic.LoadInt32(&c.pendingRequests))
  1187. }
  1188. func isIdempotent(req *Request) bool {
  1189. return req.Header.IsGet() || req.Header.IsHead() || req.Header.IsPut()
  1190. }
  1191. func (c *HostClient) do(req *Request, resp *Response) (bool, error) {
  1192. if resp == nil {
  1193. resp = AcquireResponse()
  1194. defer ReleaseResponse(resp)
  1195. }
  1196. ok, err := c.doNonNilReqResp(req, resp)
  1197. return ok, err
  1198. }
  1199. func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) {
  1200. if req == nil {
  1201. // for debugging purposes
  1202. panic("BUG: req cannot be nil")
  1203. }
  1204. if resp == nil {
  1205. // for debugging purposes
  1206. panic("BUG: resp cannot be nil")
  1207. }
  1208. // Secure header error logs configuration
  1209. resp.secureErrorLogMessage = c.SecureErrorLogMessage
  1210. resp.Header.secureErrorLogMessage = c.SecureErrorLogMessage
  1211. req.secureErrorLogMessage = c.SecureErrorLogMessage
  1212. req.Header.secureErrorLogMessage = c.SecureErrorLogMessage
  1213. if c.IsTLS != req.URI().isHTTPS() {
  1214. return false, ErrHostClientRedirectToDifferentScheme
  1215. }
  1216. atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix))
  1217. // Free up resources occupied by response before sending the request,
  1218. // so the GC may reclaim these resources (e.g. response body).
  1219. // backing up SkipBody in case it was set explicitly
  1220. customSkipBody := resp.SkipBody
  1221. customStreamBody := resp.StreamBody || c.StreamResponseBody
  1222. resp.Reset()
  1223. resp.SkipBody = customSkipBody
  1224. resp.StreamBody = customStreamBody
  1225. req.URI().DisablePathNormalizing = c.DisablePathNormalizing
  1226. userAgentOld := req.Header.UserAgent()
  1227. if len(userAgentOld) == 0 {
  1228. userAgent := c.Name
  1229. if userAgent == "" && !c.NoDefaultUserAgentHeader {
  1230. userAgent = defaultUserAgent
  1231. }
  1232. if userAgent != "" {
  1233. req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
  1234. }
  1235. }
  1236. return c.transport().RoundTrip(c, req, resp)
  1237. }
  1238. func (c *HostClient) transport() RoundTripper {
  1239. if c.Transport == nil {
  1240. return DefaultTransport
  1241. }
  1242. return c.Transport
  1243. }
  1244. var (
  1245. // ErrNoFreeConns is returned when no free connections available
  1246. // to the given host.
  1247. //
  1248. // Increase the allowed number of connections per host if you
  1249. // see this error.
  1250. ErrNoFreeConns = errors.New("no free connections available to host")
  1251. // ErrConnectionClosed may be returned from client methods if the server
  1252. // closes connection before returning the first response byte.
  1253. //
  1254. // If you see this error, then either fix the server by returning
  1255. // 'Connection: close' response header before closing the connection
  1256. // or add 'Connection: close' request header before sending requests
  1257. // to broken server.
  1258. ErrConnectionClosed = errors.New("the server closed connection before returning the first response byte. " +
  1259. "Make sure the server returns 'Connection: close' response header before closing the connection")
  1260. // ErrConnPoolStrategyNotImpl is returned when HostClient.ConnPoolStrategy is not implement yet.
  1261. // If you see this error, then you need to check your HostClient configuration.
  1262. ErrConnPoolStrategyNotImpl = errors.New("connection pool strategy is not implement")
  1263. )
  1264. type timeoutError struct{}
  1265. func (e *timeoutError) Error() string {
  1266. return "timeout"
  1267. }
  1268. // Only implement the Timeout() function of the net.Error interface.
  1269. // This allows for checks like:
  1270. //
  1271. // if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
  1272. func (e *timeoutError) Timeout() bool {
  1273. return true
  1274. }
  1275. // ErrTimeout is returned from timed out calls.
  1276. var ErrTimeout = &timeoutError{}
  1277. // SetMaxConns sets up the maximum number of connections which may be established to all hosts listed in Addr.
  1278. func (c *HostClient) SetMaxConns(newMaxConns int) {
  1279. c.connsLock.Lock()
  1280. c.MaxConns = newMaxConns
  1281. c.connsLock.Unlock()
  1282. }
  1283. func (c *HostClient) acquireConn(reqTimeout time.Duration, connectionClose bool) (cc *clientConn, err error) {
  1284. createConn := false
  1285. startCleaner := false
  1286. var n int
  1287. c.connsLock.Lock()
  1288. n = len(c.conns)
  1289. if n == 0 {
  1290. maxConns := c.MaxConns
  1291. if maxConns <= 0 {
  1292. maxConns = DefaultMaxConnsPerHost
  1293. }
  1294. if c.connsCount < maxConns {
  1295. c.connsCount++
  1296. createConn = true
  1297. if !c.connsCleanerRun && !connectionClose {
  1298. startCleaner = true
  1299. c.connsCleanerRun = true
  1300. }
  1301. }
  1302. } else {
  1303. switch c.ConnPoolStrategy {
  1304. case LIFO:
  1305. n--
  1306. cc = c.conns[n]
  1307. c.conns[n] = nil
  1308. c.conns = c.conns[:n]
  1309. case FIFO:
  1310. cc = c.conns[0]
  1311. copy(c.conns, c.conns[1:])
  1312. c.conns[n-1] = nil
  1313. c.conns = c.conns[:n-1]
  1314. default:
  1315. c.connsLock.Unlock()
  1316. return nil, ErrConnPoolStrategyNotImpl
  1317. }
  1318. }
  1319. c.connsLock.Unlock()
  1320. if cc != nil {
  1321. return cc, nil
  1322. }
  1323. if !createConn {
  1324. if c.MaxConnWaitTimeout <= 0 {
  1325. return nil, ErrNoFreeConns
  1326. }
  1327. //nolint:dupword
  1328. // reqTimeout c.MaxConnWaitTimeout wait duration
  1329. // d1 d2 min(d1, d2)
  1330. // 0(not set) d2 d2
  1331. // d1 0(don't wait) 0(don't wait)
  1332. // 0(not set) d2 d2
  1333. timeout := c.MaxConnWaitTimeout
  1334. timeoutOverridden := false
  1335. // reqTimeout == 0 means not set
  1336. if reqTimeout > 0 && reqTimeout < timeout {
  1337. timeout = reqTimeout
  1338. timeoutOverridden = true
  1339. }
  1340. // wait for a free connection
  1341. tc := AcquireTimer(timeout)
  1342. defer ReleaseTimer(tc)
  1343. w := &wantConn{
  1344. ready: make(chan struct{}, 1),
  1345. }
  1346. defer func() {
  1347. if err != nil {
  1348. w.cancel(c, err)
  1349. }
  1350. }()
  1351. c.queueForIdle(w)
  1352. select {
  1353. case <-w.ready:
  1354. return w.conn, w.err
  1355. case <-tc.C:
  1356. if timeoutOverridden {
  1357. return nil, ErrTimeout
  1358. }
  1359. return nil, ErrNoFreeConns
  1360. }
  1361. }
  1362. if startCleaner {
  1363. go c.connsCleaner()
  1364. }
  1365. conn, err := c.dialHostHard(reqTimeout)
  1366. if err != nil {
  1367. c.decConnsCount()
  1368. return nil, err
  1369. }
  1370. cc = acquireClientConn(conn)
  1371. return cc, nil
  1372. }
  1373. func (c *HostClient) queueForIdle(w *wantConn) {
  1374. c.connsLock.Lock()
  1375. defer c.connsLock.Unlock()
  1376. if c.connsWait == nil {
  1377. c.connsWait = &wantConnQueue{}
  1378. }
  1379. c.connsWait.clearFront()
  1380. c.connsWait.pushBack(w)
  1381. }
  1382. func (c *HostClient) dialConnFor(w *wantConn) {
  1383. conn, err := c.dialHostHard(0)
  1384. if err != nil {
  1385. w.tryDeliver(nil, err)
  1386. c.decConnsCount()
  1387. return
  1388. }
  1389. cc := acquireClientConn(conn)
  1390. if !w.tryDeliver(cc, nil) {
  1391. // not delivered, return idle connection
  1392. c.releaseConn(cc)
  1393. }
  1394. }
  1395. // CloseIdleConnections closes any connections which were previously
  1396. // connected from previous requests but are now sitting idle in a
  1397. // "keep-alive" state. It does not interrupt any connections currently
  1398. // in use.
  1399. func (c *HostClient) CloseIdleConnections() {
  1400. c.connsLock.Lock()
  1401. scratch := append([]*clientConn{}, c.conns...)
  1402. for i := range c.conns {
  1403. c.conns[i] = nil
  1404. }
  1405. c.conns = c.conns[:0]
  1406. c.connsLock.Unlock()
  1407. for _, cc := range scratch {
  1408. c.closeConn(cc)
  1409. }
  1410. }
  1411. func (c *HostClient) connsCleaner() {
  1412. var (
  1413. scratch []*clientConn
  1414. maxIdleConnDuration = c.MaxIdleConnDuration
  1415. )
  1416. if maxIdleConnDuration <= 0 {
  1417. maxIdleConnDuration = DefaultMaxIdleConnDuration
  1418. }
  1419. for {
  1420. currentTime := time.Now()
  1421. // Determine idle connections to be closed.
  1422. c.connsLock.Lock()
  1423. conns := c.conns
  1424. n := len(conns)
  1425. i := 0
  1426. for i < n && currentTime.Sub(conns[i].lastUseTime) > maxIdleConnDuration {
  1427. i++
  1428. }
  1429. sleepFor := maxIdleConnDuration
  1430. if i < n {
  1431. // + 1 so we actually sleep past the expiration time and not up to it.
  1432. // Otherwise the > check above would still fail.
  1433. sleepFor = maxIdleConnDuration - currentTime.Sub(conns[i].lastUseTime) + 1
  1434. }
  1435. scratch = append(scratch[:0], conns[:i]...)
  1436. if i > 0 {
  1437. m := copy(conns, conns[i:])
  1438. for i = m; i < n; i++ {
  1439. conns[i] = nil
  1440. }
  1441. c.conns = conns[:m]
  1442. }
  1443. c.connsLock.Unlock()
  1444. // Close idle connections.
  1445. for i, cc := range scratch {
  1446. c.closeConn(cc)
  1447. scratch[i] = nil
  1448. }
  1449. // Determine whether to stop the connsCleaner.
  1450. c.connsLock.Lock()
  1451. mustStop := c.connsCount == 0
  1452. if mustStop {
  1453. c.connsCleanerRun = false
  1454. }
  1455. c.connsLock.Unlock()
  1456. if mustStop {
  1457. break
  1458. }
  1459. time.Sleep(sleepFor)
  1460. }
  1461. }
  1462. func (c *HostClient) closeConn(cc *clientConn) {
  1463. c.decConnsCount()
  1464. cc.c.Close()
  1465. releaseClientConn(cc)
  1466. }
  1467. func (c *HostClient) decConnsCount() {
  1468. if c.MaxConnWaitTimeout <= 0 {
  1469. c.connsLock.Lock()
  1470. c.connsCount--
  1471. c.connsLock.Unlock()
  1472. return
  1473. }
  1474. c.connsLock.Lock()
  1475. defer c.connsLock.Unlock()
  1476. dialed := false
  1477. if q := c.connsWait; q != nil && q.len() > 0 {
  1478. for q.len() > 0 {
  1479. w := q.popFront()
  1480. if w.waiting() {
  1481. go c.dialConnFor(w)
  1482. dialed = true
  1483. break
  1484. }
  1485. }
  1486. }
  1487. if !dialed {
  1488. c.connsCount--
  1489. }
  1490. }
  1491. // ConnsCount returns connection count of HostClient.
  1492. func (c *HostClient) ConnsCount() int {
  1493. c.connsLock.Lock()
  1494. defer c.connsLock.Unlock()
  1495. return c.connsCount
  1496. }
  1497. func acquireClientConn(conn net.Conn) *clientConn {
  1498. v := clientConnPool.Get()
  1499. if v == nil {
  1500. v = &clientConn{}
  1501. }
  1502. cc := v.(*clientConn)
  1503. cc.c = conn
  1504. cc.createdTime = time.Now()
  1505. return cc
  1506. }
  1507. func releaseClientConn(cc *clientConn) {
  1508. // Reset all fields.
  1509. *cc = clientConn{}
  1510. clientConnPool.Put(cc)
  1511. }
  1512. var clientConnPool sync.Pool
  1513. func (c *HostClient) releaseConn(cc *clientConn) {
  1514. cc.lastUseTime = time.Now()
  1515. if c.MaxConnWaitTimeout <= 0 {
  1516. c.connsLock.Lock()
  1517. c.conns = append(c.conns, cc)
  1518. c.connsLock.Unlock()
  1519. return
  1520. }
  1521. // try to deliver an idle connection to a *wantConn
  1522. c.connsLock.Lock()
  1523. defer c.connsLock.Unlock()
  1524. delivered := false
  1525. if q := c.connsWait; q != nil && q.len() > 0 {
  1526. for q.len() > 0 {
  1527. w := q.popFront()
  1528. if w.waiting() {
  1529. delivered = w.tryDeliver(cc, nil)
  1530. break
  1531. }
  1532. }
  1533. }
  1534. if !delivered {
  1535. c.conns = append(c.conns, cc)
  1536. }
  1537. }
  1538. func (c *HostClient) acquireWriter(conn net.Conn) *bufio.Writer {
  1539. var v any
  1540. if c.clientWriterPool != nil {
  1541. v = c.clientWriterPool.Get()
  1542. if v == nil {
  1543. n := c.WriteBufferSize
  1544. if n <= 0 {
  1545. n = defaultWriteBufferSize
  1546. }
  1547. return bufio.NewWriterSize(conn, n)
  1548. }
  1549. } else {
  1550. v = c.writerPool.Get()
  1551. if v == nil {
  1552. n := c.WriteBufferSize
  1553. if n <= 0 {
  1554. n = defaultWriteBufferSize
  1555. }
  1556. return bufio.NewWriterSize(conn, n)
  1557. }
  1558. }
  1559. bw := v.(*bufio.Writer)
  1560. bw.Reset(conn)
  1561. return bw
  1562. }
  1563. func (c *HostClient) releaseWriter(bw *bufio.Writer) {
  1564. if c.clientWriterPool != nil {
  1565. c.clientWriterPool.Put(bw)
  1566. } else {
  1567. c.writerPool.Put(bw)
  1568. }
  1569. }
  1570. func (c *HostClient) acquireReader(conn net.Conn) *bufio.Reader {
  1571. var v any
  1572. if c.clientReaderPool != nil {
  1573. v = c.clientReaderPool.Get()
  1574. if v == nil {
  1575. n := c.ReadBufferSize
  1576. if n <= 0 {
  1577. n = defaultReadBufferSize
  1578. }
  1579. return bufio.NewReaderSize(conn, n)
  1580. }
  1581. } else {
  1582. v = c.readerPool.Get()
  1583. if v == nil {
  1584. n := c.ReadBufferSize
  1585. if n <= 0 {
  1586. n = defaultReadBufferSize
  1587. }
  1588. return bufio.NewReaderSize(conn, n)
  1589. }
  1590. }
  1591. br := v.(*bufio.Reader)
  1592. br.Reset(conn)
  1593. return br
  1594. }
  1595. func (c *HostClient) releaseReader(br *bufio.Reader) {
  1596. if c.clientReaderPool != nil {
  1597. c.clientReaderPool.Put(br)
  1598. } else {
  1599. c.readerPool.Put(br)
  1600. }
  1601. }
  1602. func newClientTLSConfig(c *tls.Config, addr string) *tls.Config {
  1603. if c == nil {
  1604. c = &tls.Config{}
  1605. } else {
  1606. c = c.Clone()
  1607. }
  1608. if c.ServerName == "" {
  1609. serverName := tlsServerName(addr)
  1610. if serverName == "*" {
  1611. c.InsecureSkipVerify = true
  1612. } else {
  1613. c.ServerName = serverName
  1614. }
  1615. }
  1616. return c
  1617. }
  1618. func tlsServerName(addr string) string {
  1619. if !strings.Contains(addr, ":") {
  1620. return addr
  1621. }
  1622. host, _, err := net.SplitHostPort(addr)
  1623. if err != nil {
  1624. return "*"
  1625. }
  1626. return host
  1627. }
  1628. func (c *HostClient) nextAddr() string {
  1629. c.addrsLock.Lock()
  1630. if c.addrs == nil {
  1631. c.addrs = strings.Split(c.Addr, ",")
  1632. }
  1633. addr := c.addrs[0]
  1634. if len(c.addrs) > 1 {
  1635. addr = c.addrs[c.addrIdx%uint32(len(c.addrs))]
  1636. c.addrIdx++
  1637. }
  1638. c.addrsLock.Unlock()
  1639. return addr
  1640. }
  1641. func (c *HostClient) dialHostHard(dialTimeout time.Duration) (conn net.Conn, err error) {
  1642. // use dialTimeout to control the timeout of each dial. It does not work if dialTimeout is 0 or if
  1643. // c.DialTimeout has not been set and c.Dial has been set.
  1644. // attempt to dial all the available hosts before giving up.
  1645. c.addrsLock.Lock()
  1646. n := len(c.addrs)
  1647. c.addrsLock.Unlock()
  1648. if n == 0 {
  1649. // It looks like c.addrs isn't initialized yet.
  1650. n = 1
  1651. }
  1652. timeout := c.ReadTimeout + c.WriteTimeout
  1653. if timeout <= 0 {
  1654. timeout = DefaultDialTimeout
  1655. }
  1656. deadline := time.Now().Add(timeout)
  1657. for n > 0 {
  1658. addr := c.nextAddr()
  1659. tlsConfig := c.cachedTLSConfig(addr)
  1660. conn, err = dialAddr(addr, c.Dial, c.DialTimeout, c.DialDualStack, c.IsTLS, tlsConfig, dialTimeout, c.WriteTimeout)
  1661. if err == nil {
  1662. return conn, nil
  1663. }
  1664. if time.Since(deadline) >= 0 {
  1665. break
  1666. }
  1667. n--
  1668. }
  1669. return nil, err
  1670. }
  1671. func (c *HostClient) cachedTLSConfig(addr string) *tls.Config {
  1672. if !c.IsTLS {
  1673. return nil
  1674. }
  1675. c.tlsConfigMapLock.Lock()
  1676. if c.tlsConfigMap == nil {
  1677. c.tlsConfigMap = make(map[string]*tls.Config)
  1678. }
  1679. cfg := c.tlsConfigMap[addr]
  1680. if cfg == nil {
  1681. cfg = newClientTLSConfig(c.TLSConfig, addr)
  1682. c.tlsConfigMap[addr] = cfg
  1683. }
  1684. c.tlsConfigMapLock.Unlock()
  1685. return cfg
  1686. }
  1687. // ErrTLSHandshakeTimeout indicates there is a timeout from tls handshake.
  1688. var ErrTLSHandshakeTimeout = errors.New("tls handshake timed out")
  1689. func tlsClientHandshake(rawConn net.Conn, tlsConfig *tls.Config, deadline time.Time) (_ net.Conn, retErr error) {
  1690. defer func() {
  1691. if retErr != nil {
  1692. rawConn.Close()
  1693. }
  1694. }()
  1695. conn := tls.Client(rawConn, tlsConfig)
  1696. err := conn.SetDeadline(deadline)
  1697. if err != nil {
  1698. return nil, err
  1699. }
  1700. err = conn.Handshake()
  1701. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  1702. return nil, ErrTLSHandshakeTimeout
  1703. }
  1704. if err != nil {
  1705. return nil, err
  1706. }
  1707. err = conn.SetDeadline(time.Time{})
  1708. if err != nil {
  1709. return nil, err
  1710. }
  1711. return conn, nil
  1712. }
  1713. func dialAddr(
  1714. addr string, dial DialFunc, dialWithTimeout DialFuncWithTimeout, dialDualStack, isTLS bool,
  1715. tlsConfig *tls.Config, dialTimeout, writeTimeout time.Duration,
  1716. ) (net.Conn, error) {
  1717. deadline := time.Now().Add(writeTimeout)
  1718. conn, err := callDialFunc(addr, dial, dialWithTimeout, dialDualStack, isTLS, dialTimeout)
  1719. if err != nil {
  1720. return nil, err
  1721. }
  1722. if conn == nil {
  1723. return nil, errors.New("dialling unsuccessful. Please report this bug")
  1724. }
  1725. // We assume that any conn that has the Handshake() method is a TLS conn already.
  1726. // This doesn't cover just tls.Conn but also other TLS implementations.
  1727. _, isTLSAlready := conn.(interface{ Handshake() error })
  1728. if isTLS && !isTLSAlready {
  1729. if writeTimeout == 0 {
  1730. return tls.Client(conn, tlsConfig), nil
  1731. }
  1732. return tlsClientHandshake(conn, tlsConfig, deadline)
  1733. }
  1734. return conn, nil
  1735. }
  1736. func callDialFunc(
  1737. addr string, dial DialFunc, dialWithTimeout DialFuncWithTimeout, dialDualStack, isTLS bool, timeout time.Duration,
  1738. ) (net.Conn, error) {
  1739. if dialWithTimeout != nil {
  1740. return dialWithTimeout(addr, timeout)
  1741. }
  1742. if dial != nil {
  1743. return dial(addr)
  1744. }
  1745. addr = AddMissingPort(addr, isTLS)
  1746. if timeout > 0 {
  1747. if dialDualStack {
  1748. return DialDualStackTimeout(addr, timeout)
  1749. }
  1750. return DialTimeout(addr, timeout)
  1751. }
  1752. if dialDualStack {
  1753. return DialDualStack(addr)
  1754. }
  1755. return Dial(addr)
  1756. }
  1757. // AddMissingPort adds a port to a host if it is missing.
  1758. // A literal IPv6 address in hostport must be enclosed in square
  1759. // brackets, as in "[::1]:80", "[::1%lo0]:80".
  1760. func AddMissingPort(addr string, isTLS bool) string {
  1761. addrLen := len(addr)
  1762. if addrLen == 0 {
  1763. return addr
  1764. }
  1765. isIP6 := addr[0] == '['
  1766. if isIP6 {
  1767. // if the IPv6 has opening bracket but closing bracket is the last char then it doesn't have a port
  1768. isIP6WithoutPort := addr[addrLen-1] == ']'
  1769. if !isIP6WithoutPort {
  1770. return addr
  1771. }
  1772. } else { // IPv4
  1773. columnPos := strings.LastIndexByte(addr, ':')
  1774. if columnPos > 0 {
  1775. return addr
  1776. }
  1777. }
  1778. port := ":80"
  1779. if isTLS {
  1780. port = ":443"
  1781. }
  1782. return addr + port
  1783. }
  1784. // A wantConn records state about a wanted connection
  1785. // (that is, an active call to getConn).
  1786. // The conn may be gotten by dialing or by finding an idle connection,
  1787. // or a cancellation may make the conn no longer wanted.
  1788. // These three options are racing against each other and use
  1789. // wantConn to coordinate and agree about the winning outcome.
  1790. //
  1791. // Inspired by net/http/transport.go.
  1792. type wantConn struct {
  1793. ready chan struct{}
  1794. mu sync.Mutex // protects conn, err, close(ready)
  1795. conn *clientConn
  1796. err error
  1797. }
  1798. // waiting reports whether w is still waiting for an answer (connection or error).
  1799. func (w *wantConn) waiting() bool {
  1800. select {
  1801. case <-w.ready:
  1802. return false
  1803. default:
  1804. return true
  1805. }
  1806. }
  1807. // tryDeliver attempts to deliver conn, err to w and reports whether it succeeded.
  1808. func (w *wantConn) tryDeliver(conn *clientConn, err error) bool {
  1809. w.mu.Lock()
  1810. defer w.mu.Unlock()
  1811. if w.conn != nil || w.err != nil {
  1812. return false
  1813. }
  1814. w.conn = conn
  1815. w.err = err
  1816. if w.conn == nil && w.err == nil {
  1817. panic("fasthttp: internal error: misuse of tryDeliver")
  1818. }
  1819. close(w.ready)
  1820. return true
  1821. }
  1822. // cancel marks w as no longer wanting a result (for example, due to cancellation).
  1823. // If a connection has been delivered already, cancel returns it with c.releaseConn.
  1824. func (w *wantConn) cancel(c *HostClient, err error) {
  1825. w.mu.Lock()
  1826. if w.conn == nil && w.err == nil {
  1827. close(w.ready) // catch misbehavior in future delivery
  1828. }
  1829. conn := w.conn
  1830. w.conn = nil
  1831. w.err = err
  1832. w.mu.Unlock()
  1833. if conn != nil {
  1834. c.releaseConn(conn)
  1835. }
  1836. }
  1837. // A wantConnQueue is a queue of wantConns.
  1838. //
  1839. // Inspired by net/http/transport.go.
  1840. type wantConnQueue struct {
  1841. // This is a queue, not a dequeue.
  1842. // It is split into two stages - head[headPos:] and tail.
  1843. // popFront is trivial (headPos++) on the first stage, and
  1844. // pushBack is trivial (append) on the second stage.
  1845. // If the first stage is empty, popFront can swap the
  1846. // first and second stages to remedy the situation.
  1847. //
  1848. // This two-stage split is analogous to the use of two lists
  1849. // in Okasaki's purely functional queue but without the
  1850. // overhead of reversing the list when swapping stages.
  1851. head []*wantConn
  1852. headPos int
  1853. tail []*wantConn
  1854. }
  1855. // len returns the number of items in the queue.
  1856. func (q *wantConnQueue) len() int {
  1857. return len(q.head) - q.headPos + len(q.tail)
  1858. }
  1859. // pushBack adds w to the back of the queue.
  1860. func (q *wantConnQueue) pushBack(w *wantConn) {
  1861. q.tail = append(q.tail, w)
  1862. }
  1863. // popFront removes and returns the wantConn at the front of the queue.
  1864. func (q *wantConnQueue) popFront() *wantConn {
  1865. if q.headPos >= len(q.head) {
  1866. if len(q.tail) == 0 {
  1867. return nil
  1868. }
  1869. // Pick up tail as new head, clear tail.
  1870. q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
  1871. }
  1872. w := q.head[q.headPos]
  1873. q.head[q.headPos] = nil
  1874. q.headPos++
  1875. return w
  1876. }
  1877. // peekFront returns the wantConn at the front of the queue without removing it.
  1878. func (q *wantConnQueue) peekFront() *wantConn {
  1879. if q.headPos < len(q.head) {
  1880. return q.head[q.headPos]
  1881. }
  1882. if len(q.tail) > 0 {
  1883. return q.tail[0]
  1884. }
  1885. return nil
  1886. }
  1887. // clearFront pops any wantConns that are no longer waiting from the head of the
  1888. // queue, reporting whether any were popped.
  1889. func (q *wantConnQueue) clearFront() (cleaned bool) {
  1890. for {
  1891. w := q.peekFront()
  1892. if w == nil || w.waiting() {
  1893. return cleaned
  1894. }
  1895. q.popFront()
  1896. cleaned = true
  1897. }
  1898. }
  1899. // PipelineClient pipelines requests over a limited set of concurrent
  1900. // connections to the given Addr.
  1901. //
  1902. // This client may be used in highly loaded HTTP-based RPC systems for reducing
  1903. // context switches and network level overhead.
  1904. // See https://en.wikipedia.org/wiki/HTTP_pipelining for details.
  1905. //
  1906. // It is forbidden copying PipelineClient instances. Create new instances
  1907. // instead.
  1908. //
  1909. // It is safe calling PipelineClient methods from concurrently running
  1910. // goroutines.
  1911. type PipelineClient struct {
  1912. noCopy noCopy
  1913. // Address of the host to connect to.
  1914. Addr string
  1915. // PipelineClient name. Used in User-Agent request header.
  1916. Name string
  1917. // NoDefaultUserAgentHeader when set to true, causes the default
  1918. // User-Agent header to be excluded from the Request.
  1919. NoDefaultUserAgentHeader bool
  1920. // The maximum number of concurrent connections to the Addr.
  1921. //
  1922. // A single connection is used by default.
  1923. MaxConns int
  1924. // The maximum number of pending pipelined requests over
  1925. // a single connection to Addr.
  1926. //
  1927. // DefaultMaxPendingRequests is used by default.
  1928. MaxPendingRequests int
  1929. // The maximum delay before sending pipelined requests as a batch
  1930. // to the server.
  1931. //
  1932. // By default requests are sent immediately to the server.
  1933. MaxBatchDelay time.Duration
  1934. // Callback for connection establishing to the host.
  1935. //
  1936. // Default Dial is used if not set.
  1937. Dial DialFunc
  1938. // Attempt to connect to both ipv4 and ipv6 host addresses
  1939. // if set to true.
  1940. //
  1941. // This option is used only if default TCP dialer is used,
  1942. // i.e. if Dial is blank.
  1943. //
  1944. // By default client connects only to ipv4 addresses,
  1945. // since unfortunately ipv6 remains broken in many networks worldwide :)
  1946. DialDualStack bool
  1947. // Response header names are passed as-is without normalization
  1948. // if this option is set.
  1949. //
  1950. // Disabled header names' normalization may be useful only for proxying
  1951. // responses to other clients expecting case-sensitive
  1952. // header names. See https://github.com/valyala/fasthttp/issues/57
  1953. // for details.
  1954. //
  1955. // By default request and response header names are normalized, i.e.
  1956. // The first letter and the first letters following dashes
  1957. // are uppercased, while all the other letters are lowercased.
  1958. // Examples:
  1959. //
  1960. // * HOST -> Host
  1961. // * content-type -> Content-Type
  1962. // * cONTENT-lenGTH -> Content-Length
  1963. DisableHeaderNamesNormalizing bool
  1964. // Path values are sent as-is without normalization
  1965. //
  1966. // Disabled path normalization may be useful for proxying incoming requests
  1967. // to servers that are expecting paths to be forwarded as-is.
  1968. //
  1969. // By default path values are normalized, i.e.
  1970. // extra slashes are removed, special characters are encoded.
  1971. DisablePathNormalizing bool
  1972. // Whether to use TLS (aka SSL or HTTPS) for host connections.
  1973. IsTLS bool
  1974. // Optional TLS config.
  1975. TLSConfig *tls.Config
  1976. // Idle connection to the host is closed after this duration.
  1977. //
  1978. // By default idle connection is closed after
  1979. // DefaultMaxIdleConnDuration.
  1980. MaxIdleConnDuration time.Duration
  1981. // Buffer size for responses' reading.
  1982. // This also limits the maximum header size.
  1983. //
  1984. // Default buffer size is used if 0.
  1985. ReadBufferSize int
  1986. // Buffer size for requests' writing.
  1987. //
  1988. // Default buffer size is used if 0.
  1989. WriteBufferSize int
  1990. // Maximum duration for full response reading (including body).
  1991. //
  1992. // By default response read timeout is unlimited.
  1993. ReadTimeout time.Duration
  1994. // Maximum duration for full request writing (including body).
  1995. //
  1996. // By default request write timeout is unlimited.
  1997. WriteTimeout time.Duration
  1998. // Logger for logging client errors.
  1999. //
  2000. // By default standard logger from log package is used.
  2001. Logger Logger
  2002. connClients []*pipelineConnClient
  2003. connClientsLock sync.Mutex
  2004. }
  2005. type pipelineConnClient struct {
  2006. noCopy noCopy
  2007. Addr string
  2008. Name string
  2009. NoDefaultUserAgentHeader bool
  2010. MaxPendingRequests int
  2011. MaxBatchDelay time.Duration
  2012. Dial DialFunc
  2013. DialDualStack bool
  2014. DisableHeaderNamesNormalizing bool
  2015. DisablePathNormalizing bool
  2016. IsTLS bool
  2017. TLSConfig *tls.Config
  2018. MaxIdleConnDuration time.Duration
  2019. ReadBufferSize int
  2020. WriteBufferSize int
  2021. ReadTimeout time.Duration
  2022. WriteTimeout time.Duration
  2023. Logger Logger
  2024. workPool sync.Pool
  2025. chLock sync.Mutex
  2026. chW chan *pipelineWork
  2027. chR chan *pipelineWork
  2028. tlsConfigLock sync.Mutex
  2029. tlsConfig *tls.Config
  2030. }
  2031. type pipelineWork struct {
  2032. reqCopy Request
  2033. respCopy Response
  2034. req *Request
  2035. resp *Response
  2036. t *time.Timer
  2037. deadline time.Time
  2038. err error
  2039. done chan struct{}
  2040. }
  2041. // DoTimeout performs the given request and waits for response during
  2042. // the given timeout duration.
  2043. //
  2044. // Request must contain at least non-zero RequestURI with full url (including
  2045. // scheme and host) or non-zero Host header + RequestURI.
  2046. //
  2047. // The function doesn't follow redirects.
  2048. //
  2049. // Response is ignored if resp is nil.
  2050. //
  2051. // ErrTimeout is returned if the response wasn't returned during
  2052. // the given timeout.
  2053. //
  2054. // It is recommended obtaining req and resp via AcquireRequest
  2055. // and AcquireResponse in performance-critical code.
  2056. func (c *PipelineClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  2057. return c.DoDeadline(req, resp, time.Now().Add(timeout))
  2058. }
  2059. // DoDeadline performs the given request and waits for response until
  2060. // the given deadline.
  2061. //
  2062. // Request must contain at least non-zero RequestURI with full url (including
  2063. // scheme and host) or non-zero Host header + RequestURI.
  2064. //
  2065. // The function doesn't follow redirects.
  2066. //
  2067. // Response is ignored if resp is nil.
  2068. //
  2069. // ErrTimeout is returned if the response wasn't returned until
  2070. // the given deadline.
  2071. //
  2072. // It is recommended obtaining req and resp via AcquireRequest
  2073. // and AcquireResponse in performance-critical code.
  2074. func (c *PipelineClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  2075. return c.getConnClient().DoDeadline(req, resp, deadline)
  2076. }
  2077. func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  2078. c.init()
  2079. timeout := time.Until(deadline)
  2080. if timeout <= 0 {
  2081. return ErrTimeout
  2082. }
  2083. if c.DisablePathNormalizing {
  2084. req.URI().DisablePathNormalizing = true
  2085. }
  2086. userAgentOld := req.Header.UserAgent()
  2087. if len(userAgentOld) == 0 {
  2088. userAgent := c.Name
  2089. if userAgent == "" && !c.NoDefaultUserAgentHeader {
  2090. userAgent = defaultUserAgent
  2091. }
  2092. if userAgent != "" {
  2093. req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
  2094. }
  2095. }
  2096. w := c.acquirePipelineWork(timeout)
  2097. w.respCopy.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
  2098. w.req = &w.reqCopy
  2099. w.resp = &w.respCopy
  2100. // Make a copy of the request in order to avoid data races on timeouts
  2101. req.copyToSkipBody(&w.reqCopy)
  2102. swapRequestBody(req, &w.reqCopy)
  2103. // Put the request to outgoing queue
  2104. select {
  2105. case c.chW <- w:
  2106. // Fast path: len(c.ch) < cap(c.ch)
  2107. default:
  2108. // Slow path
  2109. select {
  2110. case c.chW <- w:
  2111. case <-w.t.C:
  2112. c.releasePipelineWork(w)
  2113. return ErrTimeout
  2114. }
  2115. }
  2116. // Wait for the response
  2117. var err error
  2118. select {
  2119. case <-w.done:
  2120. if resp != nil {
  2121. w.respCopy.copyToSkipBody(resp)
  2122. swapResponseBody(resp, &w.respCopy)
  2123. }
  2124. err = w.err
  2125. c.releasePipelineWork(w)
  2126. case <-w.t.C:
  2127. err = ErrTimeout
  2128. }
  2129. return err
  2130. }
  2131. func (c *pipelineConnClient) acquirePipelineWork(timeout time.Duration) (w *pipelineWork) {
  2132. v := c.workPool.Get()
  2133. if v != nil {
  2134. w = v.(*pipelineWork)
  2135. } else {
  2136. w = &pipelineWork{
  2137. done: make(chan struct{}, 1),
  2138. }
  2139. }
  2140. if timeout > 0 {
  2141. if w.t == nil {
  2142. w.t = time.NewTimer(timeout)
  2143. } else {
  2144. w.t.Reset(timeout)
  2145. }
  2146. w.deadline = time.Now().Add(timeout)
  2147. } else {
  2148. w.deadline = zeroTime
  2149. }
  2150. return w
  2151. }
  2152. func (c *pipelineConnClient) releasePipelineWork(w *pipelineWork) {
  2153. if w.t != nil {
  2154. w.t.Stop()
  2155. }
  2156. w.reqCopy.Reset()
  2157. w.respCopy.Reset()
  2158. w.req = nil
  2159. w.resp = nil
  2160. w.err = nil
  2161. c.workPool.Put(w)
  2162. }
  2163. // Do performs the given http request and sets the corresponding response.
  2164. //
  2165. // Request must contain at least non-zero RequestURI with full url (including
  2166. // scheme and host) or non-zero Host header + RequestURI.
  2167. //
  2168. // The function doesn't follow redirects. Use Get* for following redirects.
  2169. //
  2170. // Response is ignored if resp is nil.
  2171. //
  2172. // It is recommended obtaining req and resp via AcquireRequest
  2173. // and AcquireResponse in performance-critical code.
  2174. func (c *PipelineClient) Do(req *Request, resp *Response) error {
  2175. return c.getConnClient().Do(req, resp)
  2176. }
  2177. func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
  2178. c.init()
  2179. if c.DisablePathNormalizing {
  2180. req.URI().DisablePathNormalizing = true
  2181. }
  2182. userAgentOld := req.Header.UserAgent()
  2183. if len(userAgentOld) == 0 {
  2184. userAgent := c.Name
  2185. if userAgent == "" && !c.NoDefaultUserAgentHeader {
  2186. userAgent = defaultUserAgent
  2187. }
  2188. if userAgent != "" {
  2189. req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
  2190. }
  2191. }
  2192. w := c.acquirePipelineWork(0)
  2193. w.req = req
  2194. if resp != nil {
  2195. resp.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
  2196. w.resp = resp
  2197. } else {
  2198. w.resp = &w.respCopy
  2199. }
  2200. // Put the request to outgoing queue
  2201. select {
  2202. case c.chW <- w:
  2203. default:
  2204. // Try substituting the oldest w with the current one.
  2205. select {
  2206. case wOld := <-c.chW:
  2207. wOld.err = ErrPipelineOverflow
  2208. wOld.done <- struct{}{}
  2209. default:
  2210. }
  2211. select {
  2212. case c.chW <- w:
  2213. default:
  2214. c.releasePipelineWork(w)
  2215. return ErrPipelineOverflow
  2216. }
  2217. }
  2218. // Wait for the response
  2219. <-w.done
  2220. err := w.err
  2221. c.releasePipelineWork(w)
  2222. return err
  2223. }
  2224. func (c *PipelineClient) getConnClient() *pipelineConnClient {
  2225. c.connClientsLock.Lock()
  2226. cc := c.getConnClientUnlocked()
  2227. c.connClientsLock.Unlock()
  2228. return cc
  2229. }
  2230. func (c *PipelineClient) getConnClientUnlocked() *pipelineConnClient {
  2231. if len(c.connClients) == 0 {
  2232. return c.newConnClient()
  2233. }
  2234. // Return the client with the minimum number of pending requests.
  2235. minCC := c.connClients[0]
  2236. minReqs := minCC.PendingRequests()
  2237. if minReqs == 0 {
  2238. return minCC
  2239. }
  2240. for i := 1; i < len(c.connClients); i++ {
  2241. cc := c.connClients[i]
  2242. reqs := cc.PendingRequests()
  2243. if reqs == 0 {
  2244. return cc
  2245. }
  2246. if reqs < minReqs {
  2247. minCC = cc
  2248. minReqs = reqs
  2249. }
  2250. }
  2251. maxConns := c.MaxConns
  2252. if maxConns <= 0 {
  2253. maxConns = 1
  2254. }
  2255. if len(c.connClients) < maxConns {
  2256. return c.newConnClient()
  2257. }
  2258. return minCC
  2259. }
  2260. func (c *PipelineClient) newConnClient() *pipelineConnClient {
  2261. cc := &pipelineConnClient{
  2262. Addr: c.Addr,
  2263. Name: c.Name,
  2264. NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
  2265. MaxPendingRequests: c.MaxPendingRequests,
  2266. MaxBatchDelay: c.MaxBatchDelay,
  2267. Dial: c.Dial,
  2268. DialDualStack: c.DialDualStack,
  2269. DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
  2270. DisablePathNormalizing: c.DisablePathNormalizing,
  2271. IsTLS: c.IsTLS,
  2272. TLSConfig: c.TLSConfig,
  2273. MaxIdleConnDuration: c.MaxIdleConnDuration,
  2274. ReadBufferSize: c.ReadBufferSize,
  2275. WriteBufferSize: c.WriteBufferSize,
  2276. ReadTimeout: c.ReadTimeout,
  2277. WriteTimeout: c.WriteTimeout,
  2278. Logger: c.Logger,
  2279. }
  2280. c.connClients = append(c.connClients, cc)
  2281. return cc
  2282. }
  2283. // ErrPipelineOverflow may be returned from PipelineClient.Do*
  2284. // if the requests' queue is overflowed.
  2285. var ErrPipelineOverflow = errors.New("pipelined requests' queue has been overflowed. Increase MaxConns and/or MaxPendingRequests")
  2286. // DefaultMaxPendingRequests is the default value
  2287. // for PipelineClient.MaxPendingRequests.
  2288. const DefaultMaxPendingRequests = 1024
  2289. func (c *pipelineConnClient) init() {
  2290. c.chLock.Lock()
  2291. if c.chR == nil {
  2292. maxPendingRequests := c.MaxPendingRequests
  2293. if maxPendingRequests <= 0 {
  2294. maxPendingRequests = DefaultMaxPendingRequests
  2295. }
  2296. c.chR = make(chan *pipelineWork, maxPendingRequests)
  2297. if c.chW == nil {
  2298. c.chW = make(chan *pipelineWork, maxPendingRequests)
  2299. }
  2300. go func() {
  2301. // Keep restarting the worker if it fails (connection errors for example).
  2302. for {
  2303. if err := c.worker(); err != nil {
  2304. c.logger().Printf("error in PipelineClient(%q): %v", c.Addr, err)
  2305. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  2306. // Throttle client reconnections on timeout errors
  2307. time.Sleep(time.Second)
  2308. }
  2309. } else {
  2310. c.chLock.Lock()
  2311. stop := len(c.chR) == 0 && len(c.chW) == 0
  2312. if !stop {
  2313. c.chR = nil
  2314. c.chW = nil
  2315. }
  2316. c.chLock.Unlock()
  2317. if stop {
  2318. break
  2319. }
  2320. }
  2321. }
  2322. }()
  2323. }
  2324. c.chLock.Unlock()
  2325. }
  2326. func (c *pipelineConnClient) worker() error {
  2327. tlsConfig := c.cachedTLSConfig()
  2328. conn, err := dialAddr(c.Addr, c.Dial, nil, c.DialDualStack, c.IsTLS, tlsConfig, 0, c.WriteTimeout)
  2329. if err != nil {
  2330. return err
  2331. }
  2332. // Start reader and writer
  2333. stopW := make(chan struct{})
  2334. doneW := make(chan error)
  2335. go func() {
  2336. doneW <- c.writer(conn, stopW)
  2337. }()
  2338. stopR := make(chan struct{})
  2339. doneR := make(chan error)
  2340. go func() {
  2341. doneR <- c.reader(conn, stopR)
  2342. }()
  2343. // Wait until reader and writer are stopped
  2344. select {
  2345. case err = <-doneW:
  2346. conn.Close()
  2347. close(stopR)
  2348. <-doneR
  2349. case err = <-doneR:
  2350. conn.Close()
  2351. close(stopW)
  2352. <-doneW
  2353. }
  2354. // Notify pending readers
  2355. for len(c.chR) > 0 {
  2356. w := <-c.chR
  2357. w.err = errPipelineConnStopped
  2358. w.done <- struct{}{}
  2359. }
  2360. return err
  2361. }
  2362. func (c *pipelineConnClient) cachedTLSConfig() *tls.Config {
  2363. if !c.IsTLS {
  2364. return nil
  2365. }
  2366. c.tlsConfigLock.Lock()
  2367. cfg := c.tlsConfig
  2368. if cfg == nil {
  2369. cfg = newClientTLSConfig(c.TLSConfig, c.Addr)
  2370. c.tlsConfig = cfg
  2371. }
  2372. c.tlsConfigLock.Unlock()
  2373. return cfg
  2374. }
  2375. func (c *pipelineConnClient) writer(conn net.Conn, stopCh <-chan struct{}) error {
  2376. writeBufferSize := c.WriteBufferSize
  2377. if writeBufferSize <= 0 {
  2378. writeBufferSize = defaultWriteBufferSize
  2379. }
  2380. bw := bufio.NewWriterSize(conn, writeBufferSize)
  2381. defer bw.Flush()
  2382. chR := c.chR
  2383. chW := c.chW
  2384. writeTimeout := c.WriteTimeout
  2385. maxIdleConnDuration := c.MaxIdleConnDuration
  2386. if maxIdleConnDuration <= 0 {
  2387. maxIdleConnDuration = DefaultMaxIdleConnDuration
  2388. }
  2389. maxBatchDelay := c.MaxBatchDelay
  2390. var (
  2391. stopTimer = time.NewTimer(time.Hour)
  2392. flushTimer = time.NewTimer(time.Hour)
  2393. flushTimerCh <-chan time.Time
  2394. instantTimerCh = make(chan time.Time)
  2395. w *pipelineWork
  2396. err error
  2397. )
  2398. close(instantTimerCh)
  2399. for {
  2400. againChW:
  2401. select {
  2402. case w = <-chW:
  2403. // Fast path: len(chW) > 0
  2404. default:
  2405. // Slow path
  2406. stopTimer.Reset(maxIdleConnDuration)
  2407. select {
  2408. case w = <-chW:
  2409. case <-stopTimer.C:
  2410. return nil
  2411. case <-stopCh:
  2412. return nil
  2413. case <-flushTimerCh:
  2414. if err = bw.Flush(); err != nil {
  2415. return err
  2416. }
  2417. flushTimerCh = nil
  2418. goto againChW
  2419. }
  2420. }
  2421. if !w.deadline.IsZero() && time.Since(w.deadline) >= 0 {
  2422. w.err = ErrTimeout
  2423. w.done <- struct{}{}
  2424. continue
  2425. }
  2426. w.resp.parseNetConn(conn)
  2427. if writeTimeout > 0 {
  2428. // Set Deadline every time, since golang has fixed the performance issue
  2429. // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
  2430. currentTime := time.Now()
  2431. if err = conn.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil {
  2432. w.err = err
  2433. w.done <- struct{}{}
  2434. return err
  2435. }
  2436. }
  2437. if err = w.req.Write(bw); err != nil {
  2438. w.err = err
  2439. w.done <- struct{}{}
  2440. return err
  2441. }
  2442. if flushTimerCh == nil && (len(chW) == 0 || len(chR) == cap(chR)) {
  2443. if maxBatchDelay > 0 {
  2444. flushTimer.Reset(maxBatchDelay)
  2445. flushTimerCh = flushTimer.C
  2446. } else {
  2447. flushTimerCh = instantTimerCh
  2448. }
  2449. }
  2450. againChR:
  2451. select {
  2452. case chR <- w:
  2453. // Fast path: len(chR) < cap(chR)
  2454. default:
  2455. // Slow path
  2456. select {
  2457. case chR <- w:
  2458. case <-stopCh:
  2459. w.err = errPipelineConnStopped
  2460. w.done <- struct{}{}
  2461. return nil
  2462. case <-flushTimerCh:
  2463. if err = bw.Flush(); err != nil {
  2464. w.err = err
  2465. w.done <- struct{}{}
  2466. return err
  2467. }
  2468. flushTimerCh = nil
  2469. goto againChR
  2470. }
  2471. }
  2472. }
  2473. }
  2474. func (c *pipelineConnClient) reader(conn net.Conn, stopCh <-chan struct{}) error {
  2475. readBufferSize := c.ReadBufferSize
  2476. if readBufferSize <= 0 {
  2477. readBufferSize = defaultReadBufferSize
  2478. }
  2479. br := bufio.NewReaderSize(conn, readBufferSize)
  2480. chR := c.chR
  2481. readTimeout := c.ReadTimeout
  2482. var (
  2483. w *pipelineWork
  2484. err error
  2485. )
  2486. for {
  2487. select {
  2488. case w = <-chR:
  2489. // Fast path: len(chR) > 0
  2490. default:
  2491. // Slow path
  2492. select {
  2493. case w = <-chR:
  2494. case <-stopCh:
  2495. return nil
  2496. }
  2497. }
  2498. if readTimeout > 0 {
  2499. // Set Deadline every time, since golang has fixed the performance issue
  2500. // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
  2501. currentTime := time.Now()
  2502. if err = conn.SetReadDeadline(currentTime.Add(readTimeout)); err != nil {
  2503. w.err = err
  2504. w.done <- struct{}{}
  2505. return err
  2506. }
  2507. }
  2508. if err = w.resp.Read(br); err != nil {
  2509. w.err = err
  2510. w.done <- struct{}{}
  2511. return err
  2512. }
  2513. w.done <- struct{}{}
  2514. }
  2515. }
  2516. func (c *pipelineConnClient) logger() Logger {
  2517. if c.Logger != nil {
  2518. return c.Logger
  2519. }
  2520. return defaultLogger
  2521. }
  2522. // PendingRequests returns the current number of pending requests pipelined
  2523. // to the server.
  2524. //
  2525. // This number may exceed MaxPendingRequests*MaxConns by up to two times, since
  2526. // each connection to the server may keep up to MaxPendingRequests requests
  2527. // in the queue before sending them to the server.
  2528. //
  2529. // This function may be used for balancing load among multiple PipelineClient
  2530. // instances.
  2531. func (c *PipelineClient) PendingRequests() int {
  2532. c.connClientsLock.Lock()
  2533. n := 0
  2534. for _, cc := range c.connClients {
  2535. n += cc.PendingRequests()
  2536. }
  2537. c.connClientsLock.Unlock()
  2538. return n
  2539. }
  2540. func (c *pipelineConnClient) PendingRequests() int {
  2541. c.init()
  2542. c.chLock.Lock()
  2543. n := len(c.chR) + len(c.chW)
  2544. c.chLock.Unlock()
  2545. return n
  2546. }
  2547. var errPipelineConnStopped = errors.New("pipeline connection has been stopped")
  2548. var DefaultTransport RoundTripper = &transport{}
  2549. type transport struct{}
  2550. func (t *transport) RoundTrip(hc *HostClient, req *Request, resp *Response) (retry bool, err error) {
  2551. customSkipBody := resp.SkipBody
  2552. customStreamBody := resp.StreamBody
  2553. var deadline time.Time
  2554. if req.timeout > 0 {
  2555. deadline = time.Now().Add(req.timeout)
  2556. }
  2557. cc, err := hc.acquireConn(req.timeout, req.ConnectionClose())
  2558. if err != nil {
  2559. return false, err
  2560. }
  2561. conn := cc.c
  2562. resp.parseNetConn(conn)
  2563. writeDeadline := deadline
  2564. if hc.WriteTimeout > 0 {
  2565. tmpWriteDeadline := time.Now().Add(hc.WriteTimeout)
  2566. if writeDeadline.IsZero() || tmpWriteDeadline.Before(writeDeadline) {
  2567. writeDeadline = tmpWriteDeadline
  2568. }
  2569. }
  2570. if err = conn.SetWriteDeadline(writeDeadline); err != nil {
  2571. hc.closeConn(cc)
  2572. return true, err
  2573. }
  2574. resetConnection := false
  2575. if hc.MaxConnDuration > 0 && time.Since(cc.createdTime) > hc.MaxConnDuration && !req.ConnectionClose() {
  2576. req.SetConnectionClose()
  2577. resetConnection = true
  2578. }
  2579. bw := hc.acquireWriter(conn)
  2580. err = req.Write(bw)
  2581. if resetConnection {
  2582. req.Header.ResetConnectionClose()
  2583. }
  2584. if err == nil {
  2585. err = bw.Flush()
  2586. }
  2587. hc.releaseWriter(bw)
  2588. // Return ErrTimeout on any timeout.
  2589. if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
  2590. err = ErrTimeout
  2591. }
  2592. isConnRST := isConnectionReset(err)
  2593. if err != nil && !isConnRST {
  2594. hc.closeConn(cc)
  2595. return true, err
  2596. }
  2597. readDeadline := deadline
  2598. if hc.ReadTimeout > 0 {
  2599. tmpReadDeadline := time.Now().Add(hc.ReadTimeout)
  2600. if readDeadline.IsZero() || tmpReadDeadline.Before(readDeadline) {
  2601. readDeadline = tmpReadDeadline
  2602. }
  2603. }
  2604. if err = conn.SetReadDeadline(readDeadline); err != nil {
  2605. hc.closeConn(cc)
  2606. return true, err
  2607. }
  2608. if customSkipBody || req.Header.IsHead() {
  2609. resp.SkipBody = true
  2610. }
  2611. if hc.DisableHeaderNamesNormalizing {
  2612. resp.Header.DisableNormalizing()
  2613. }
  2614. br := hc.acquireReader(conn)
  2615. err = resp.ReadLimitBody(br, hc.MaxResponseBodySize)
  2616. if err != nil {
  2617. hc.releaseReader(br)
  2618. hc.closeConn(cc)
  2619. // Don't retry in case of ErrBodyTooLarge since we will just get the same again.
  2620. needRetry := err != ErrBodyTooLarge
  2621. return needRetry, err
  2622. }
  2623. closeConn := resetConnection || req.ConnectionClose() || resp.ConnectionClose() || isConnRST
  2624. if customStreamBody && resp.bodyStream != nil {
  2625. rbs := resp.bodyStream
  2626. resp.bodyStream = newCloseReader(rbs, func() error {
  2627. hc.releaseReader(br)
  2628. if r, ok := rbs.(*requestStream); ok {
  2629. releaseRequestStream(r)
  2630. }
  2631. if closeConn || resp.ConnectionClose() {
  2632. hc.closeConn(cc)
  2633. } else {
  2634. hc.releaseConn(cc)
  2635. }
  2636. return nil
  2637. })
  2638. return false, nil
  2639. } else {
  2640. hc.releaseReader(br)
  2641. }
  2642. if closeConn {
  2643. hc.closeConn(cc)
  2644. } else {
  2645. hc.releaseConn(cc)
  2646. }
  2647. return false, nil
  2648. }