client.go 78 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958
  1. package fasthttp
  2. import (
  3. "bufio"
  4. "crypto/tls"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "net"
  9. "strings"
  10. "sync"
  11. "sync/atomic"
  12. "time"
  13. )
  14. // Do performs the given http request and fills the given http response.
  15. //
  16. // Request must contain at least non-zero RequestURI with full url (including
  17. // scheme and host) or non-zero Host header + RequestURI.
  18. //
  19. // Client determines the server to be requested in the following order:
  20. //
  21. // - from RequestURI if it contains full url with scheme and host;
  22. // - from Host header otherwise.
  23. //
  24. // The function doesn't follow redirects. Use Get* for following redirects.
  25. //
  26. // Response is ignored if resp is nil.
  27. //
  28. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  29. // to the requested host are busy.
  30. //
  31. // It is recommended obtaining req and resp via AcquireRequest
  32. // and AcquireResponse in performance-critical code.
  33. func Do(req *Request, resp *Response) error {
  34. return defaultClient.Do(req, resp)
  35. }
  36. // DoTimeout performs the given request and waits for response during
  37. // the given timeout duration.
  38. //
  39. // Request must contain at least non-zero RequestURI with full url (including
  40. // scheme and host) or non-zero Host header + RequestURI.
  41. //
  42. // Client determines the server to be requested in the following order:
  43. //
  44. // - from RequestURI if it contains full url with scheme and host;
  45. // - from Host header otherwise.
  46. //
  47. // The function doesn't follow redirects. Use Get* for following redirects.
  48. //
  49. // Response is ignored if resp is nil.
  50. //
  51. // ErrTimeout is returned if the response wasn't returned during
  52. // the given timeout.
  53. //
  54. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  55. // to the requested host are busy.
  56. //
  57. // It is recommended obtaining req and resp via AcquireRequest
  58. // and AcquireResponse in performance-critical code.
  59. func DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  60. return defaultClient.DoTimeout(req, resp, timeout)
  61. }
  62. // DoDeadline performs the given request and waits for response until
  63. // the given deadline.
  64. //
  65. // Request must contain at least non-zero RequestURI with full url (including
  66. // scheme and host) or non-zero Host header + RequestURI.
  67. //
  68. // Client determines the server to be requested in the following order:
  69. //
  70. // - from RequestURI if it contains full url with scheme and host;
  71. // - from Host header otherwise.
  72. //
  73. // The function doesn't follow redirects. Use Get* for following redirects.
  74. //
  75. // Response is ignored if resp is nil.
  76. //
  77. // ErrTimeout is returned if the response wasn't returned until
  78. // the given deadline.
  79. //
  80. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  81. // to the requested host are busy.
  82. //
  83. // It is recommended obtaining req and resp via AcquireRequest
  84. // and AcquireResponse in performance-critical code.
  85. func DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  86. return defaultClient.DoDeadline(req, resp, deadline)
  87. }
  88. // DoRedirects performs the given http request and fills the given http response,
  89. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  90. // maxRedirectsCount, ErrTooManyRedirects is returned.
  91. //
  92. // Request must contain at least non-zero RequestURI with full url (including
  93. // scheme and host) or non-zero Host header + RequestURI.
  94. //
  95. // Client determines the server to be requested in the following order:
  96. //
  97. // - from RequestURI if it contains full url with scheme and host;
  98. // - from Host header otherwise.
  99. //
  100. // Response is ignored if resp is nil.
  101. //
  102. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  103. // to the requested host are busy.
  104. //
  105. // It is recommended obtaining req and resp via AcquireRequest
  106. // and AcquireResponse in performance-critical code.
  107. func DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  108. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, &defaultClient)
  109. return err
  110. }
  111. // Get returns the status code and body of url.
  112. //
  113. // The contents of dst will be replaced by the body and returned, if the dst
  114. // is too small a new slice will be allocated.
  115. //
  116. // The function follows redirects. Use Do* for manually handling redirects.
  117. func Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  118. return defaultClient.Get(dst, url)
  119. }
  120. // GetTimeout returns the status code and body of url.
  121. //
  122. // The contents of dst will be replaced by the body and returned, if the dst
  123. // is too small a new slice will be allocated.
  124. //
  125. // The function follows redirects. Use Do* for manually handling redirects.
  126. //
  127. // ErrTimeout error is returned if url contents couldn't be fetched
  128. // during the given timeout.
  129. func GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  130. return defaultClient.GetTimeout(dst, url, timeout)
  131. }
  132. // GetDeadline returns the status code and body of url.
  133. //
  134. // The contents of dst will be replaced by the body and returned, if the dst
  135. // is too small a new slice will be allocated.
  136. //
  137. // The function follows redirects. Use Do* for manually handling redirects.
  138. //
  139. // ErrTimeout error is returned if url contents couldn't be fetched
  140. // until the given deadline.
  141. func GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  142. return defaultClient.GetDeadline(dst, url, deadline)
  143. }
  144. // Post sends POST request to the given url with the given POST arguments.
  145. //
  146. // The contents of dst will be replaced by the body and returned, if the dst
  147. // is too small a new slice will be allocated.
  148. //
  149. // The function follows redirects. Use Do* for manually handling redirects.
  150. //
  151. // Empty POST body is sent if postArgs is nil.
  152. func Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  153. return defaultClient.Post(dst, url, postArgs)
  154. }
  155. var defaultClient Client
  156. // Client implements http client.
  157. //
  158. // Copying Client by value is prohibited. Create new instance instead.
  159. //
  160. // It is safe calling Client methods from concurrently running goroutines.
  161. //
  162. // The fields of a Client should not be changed while it is in use.
  163. type Client struct {
  164. noCopy noCopy
  165. // Client name. Used in User-Agent request header.
  166. //
  167. // Default client name is used if not set.
  168. Name string
  169. // NoDefaultUserAgentHeader when set to true, causes the default
  170. // User-Agent header to be excluded from the Request.
  171. NoDefaultUserAgentHeader bool
  172. // Callback for establishing new connections to hosts.
  173. //
  174. // Default Dial is used if not set.
  175. Dial DialFunc
  176. // Attempt to connect to both ipv4 and ipv6 addresses if set to true.
  177. //
  178. // This option is used only if default TCP dialer is used,
  179. // i.e. if Dial is blank.
  180. //
  181. // By default client connects only to ipv4 addresses,
  182. // since unfortunately ipv6 remains broken in many networks worldwide :)
  183. DialDualStack bool
  184. // TLS config for https connections.
  185. //
  186. // Default TLS config is used if not set.
  187. TLSConfig *tls.Config
  188. // Maximum number of connections per each host which may be established.
  189. //
  190. // DefaultMaxConnsPerHost is used if not set.
  191. MaxConnsPerHost int
  192. // Idle keep-alive connections are closed after this duration.
  193. //
  194. // By default idle connections are closed
  195. // after DefaultMaxIdleConnDuration.
  196. MaxIdleConnDuration time.Duration
  197. // Keep-alive connections are closed after this duration.
  198. //
  199. // By default connection duration is unlimited.
  200. MaxConnDuration time.Duration
  201. // Maximum number of attempts for idempotent calls
  202. //
  203. // DefaultMaxIdemponentCallAttempts is used if not set.
  204. MaxIdemponentCallAttempts int
  205. // Per-connection buffer size for responses' reading.
  206. // This also limits the maximum header size.
  207. //
  208. // Default buffer size is used if 0.
  209. ReadBufferSize int
  210. // Per-connection buffer size for requests' writing.
  211. //
  212. // Default buffer size is used if 0.
  213. WriteBufferSize int
  214. // Maximum duration for full response reading (including body).
  215. //
  216. // By default response read timeout is unlimited.
  217. ReadTimeout time.Duration
  218. // Maximum duration for full request writing (including body).
  219. //
  220. // By default request write timeout is unlimited.
  221. WriteTimeout time.Duration
  222. // Maximum response body size.
  223. //
  224. // The client returns ErrBodyTooLarge if this limit is greater than 0
  225. // and response body is greater than the limit.
  226. //
  227. // By default response body size is unlimited.
  228. MaxResponseBodySize int
  229. // Header names are passed as-is without normalization
  230. // if this option is set.
  231. //
  232. // Disabled header names' normalization may be useful only for proxying
  233. // responses to other clients expecting case-sensitive
  234. // header names. See https://github.com/valyala/fasthttp/issues/57
  235. // for details.
  236. //
  237. // By default request and response header names are normalized, i.e.
  238. // The first letter and the first letters following dashes
  239. // are uppercased, while all the other letters are lowercased.
  240. // Examples:
  241. //
  242. // * HOST -> Host
  243. // * content-type -> Content-Type
  244. // * cONTENT-lenGTH -> Content-Length
  245. DisableHeaderNamesNormalizing bool
  246. // Path values are sent as-is without normalization
  247. //
  248. // Disabled path normalization may be useful for proxying incoming requests
  249. // to servers that are expecting paths to be forwarded as-is.
  250. //
  251. // By default path values are normalized, i.e.
  252. // extra slashes are removed, special characters are encoded.
  253. DisablePathNormalizing bool
  254. // Maximum duration for waiting for a free connection.
  255. //
  256. // By default will not waiting, return ErrNoFreeConns immediately
  257. MaxConnWaitTimeout time.Duration
  258. // RetryIf controls whether a retry should be attempted after an error.
  259. //
  260. // By default will use isIdempotent function
  261. RetryIf RetryIfFunc
  262. // Connection pool strategy. Can be either LIFO or FIFO (default).
  263. ConnPoolStrategy ConnPoolStrategyType
  264. // StreamResponseBody enables response body streaming
  265. StreamResponseBody bool
  266. // ConfigureClient configures the fasthttp.HostClient.
  267. ConfigureClient func(hc *HostClient) error
  268. mLock sync.RWMutex
  269. mOnce sync.Once
  270. m map[string]*HostClient
  271. ms map[string]*HostClient
  272. readerPool sync.Pool
  273. writerPool sync.Pool
  274. }
  275. // Get returns the status code and body of url.
  276. //
  277. // The contents of dst will be replaced by the body and returned, if the dst
  278. // is too small a new slice will be allocated.
  279. //
  280. // The function follows redirects. Use Do* for manually handling redirects.
  281. func (c *Client) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  282. return clientGetURL(dst, url, c)
  283. }
  284. // GetTimeout returns the status code and body of url.
  285. //
  286. // The contents of dst will be replaced by the body and returned, if the dst
  287. // is too small a new slice will be allocated.
  288. //
  289. // The function follows redirects. Use Do* for manually handling redirects.
  290. //
  291. // ErrTimeout error is returned if url contents couldn't be fetched
  292. // during the given timeout.
  293. func (c *Client) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  294. return clientGetURLTimeout(dst, url, timeout, c)
  295. }
  296. // GetDeadline returns the status code and body of url.
  297. //
  298. // The contents of dst will be replaced by the body and returned, if the dst
  299. // is too small a new slice will be allocated.
  300. //
  301. // The function follows redirects. Use Do* for manually handling redirects.
  302. //
  303. // ErrTimeout error is returned if url contents couldn't be fetched
  304. // until the given deadline.
  305. func (c *Client) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  306. return clientGetURLDeadline(dst, url, deadline, c)
  307. }
  308. // Post sends POST request to the given url with the given POST arguments.
  309. //
  310. // The contents of dst will be replaced by the body and returned, if the dst
  311. // is too small a new slice will be allocated.
  312. //
  313. // The function follows redirects. Use Do* for manually handling redirects.
  314. //
  315. // Empty POST body is sent if postArgs is nil.
  316. func (c *Client) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  317. return clientPostURL(dst, url, postArgs, c)
  318. }
  319. // DoTimeout performs the given request and waits for response during
  320. // the given timeout duration.
  321. //
  322. // Request must contain at least non-zero RequestURI with full url (including
  323. // scheme and host) or non-zero Host header + RequestURI.
  324. //
  325. // Client determines the server to be requested in the following order:
  326. //
  327. // - from RequestURI if it contains full url with scheme and host;
  328. // - from Host header otherwise.
  329. //
  330. // The function doesn't follow redirects. Use Get* for following redirects.
  331. //
  332. // Response is ignored if resp is nil.
  333. //
  334. // ErrTimeout is returned if the response wasn't returned during
  335. // the given timeout.
  336. // Immediately returns ErrTimeout if timeout value is negative.
  337. //
  338. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  339. // to the requested host are busy.
  340. //
  341. // It is recommended obtaining req and resp via AcquireRequest
  342. // and AcquireResponse in performance-critical code.
  343. func (c *Client) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  344. req.timeout = timeout
  345. if req.timeout <= 0 {
  346. return ErrTimeout
  347. }
  348. return c.Do(req, resp)
  349. }
  350. // DoDeadline performs the given request and waits for response until
  351. // the given deadline.
  352. //
  353. // Request must contain at least non-zero RequestURI with full url (including
  354. // scheme and host) or non-zero Host header + RequestURI.
  355. //
  356. // Client determines the server to be requested in the following order:
  357. //
  358. // - from RequestURI if it contains full url with scheme and host;
  359. // - from Host header otherwise.
  360. //
  361. // The function doesn't follow redirects. Use Get* for following redirects.
  362. //
  363. // Response is ignored if resp is nil.
  364. //
  365. // ErrTimeout is returned if the response wasn't returned until
  366. // the given deadline.
  367. // Immediately returns ErrTimeout if the deadline has already been reached.
  368. //
  369. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  370. // to the requested host are busy.
  371. //
  372. // It is recommended obtaining req and resp via AcquireRequest
  373. // and AcquireResponse in performance-critical code.
  374. func (c *Client) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  375. req.timeout = time.Until(deadline)
  376. if req.timeout <= 0 {
  377. return ErrTimeout
  378. }
  379. return c.Do(req, resp)
  380. }
  381. // DoRedirects performs the given http request and fills the given http response,
  382. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  383. // maxRedirectsCount, ErrTooManyRedirects is returned.
  384. //
  385. // Request must contain at least non-zero RequestURI with full url (including
  386. // scheme and host) or non-zero Host header + RequestURI.
  387. //
  388. // Client determines the server to be requested in the following order:
  389. //
  390. // - from RequestURI if it contains full url with scheme and host;
  391. // - from Host header otherwise.
  392. //
  393. // Response is ignored if resp is nil.
  394. //
  395. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  396. // to the requested host are busy.
  397. //
  398. // It is recommended obtaining req and resp via AcquireRequest
  399. // and AcquireResponse in performance-critical code.
  400. func (c *Client) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  401. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
  402. return err
  403. }
  404. // Do performs the given http request and fills the given http response.
  405. //
  406. // Request must contain at least non-zero RequestURI with full url (including
  407. // scheme and host) or non-zero Host header + RequestURI.
  408. //
  409. // Client determines the server to be requested in the following order:
  410. //
  411. // - from RequestURI if it contains full url with scheme and host;
  412. // - from Host header otherwise.
  413. //
  414. // Response is ignored if resp is nil.
  415. //
  416. // The function doesn't follow redirects. Use Get* for following redirects.
  417. //
  418. // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
  419. // to the requested host are busy.
  420. //
  421. // It is recommended obtaining req and resp via AcquireRequest
  422. // and AcquireResponse in performance-critical code.
  423. func (c *Client) Do(req *Request, resp *Response) error {
  424. uri := req.URI()
  425. if uri == nil {
  426. return ErrorInvalidURI
  427. }
  428. host := uri.Host()
  429. isTLS := false
  430. if uri.isHTTPS() {
  431. isTLS = true
  432. } else if !uri.isHTTP() {
  433. return fmt.Errorf("unsupported protocol %q. http and https are supported", uri.Scheme())
  434. }
  435. c.mOnce.Do(func() {
  436. c.mLock.Lock()
  437. c.m = make(map[string]*HostClient)
  438. c.ms = make(map[string]*HostClient)
  439. c.mLock.Unlock()
  440. })
  441. startCleaner := false
  442. c.mLock.RLock()
  443. m := c.m
  444. if isTLS {
  445. m = c.ms
  446. }
  447. hc := m[string(host)]
  448. if hc != nil {
  449. atomic.AddInt32(&hc.pendingClientRequests, 1)
  450. defer atomic.AddInt32(&hc.pendingClientRequests, -1)
  451. }
  452. c.mLock.RUnlock()
  453. if hc == nil {
  454. c.mLock.Lock()
  455. hc = m[string(host)]
  456. if hc == nil {
  457. hc = &HostClient{
  458. Addr: AddMissingPort(string(host), isTLS),
  459. Name: c.Name,
  460. NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
  461. Dial: c.Dial,
  462. DialDualStack: c.DialDualStack,
  463. IsTLS: isTLS,
  464. TLSConfig: c.TLSConfig,
  465. MaxConns: c.MaxConnsPerHost,
  466. MaxIdleConnDuration: c.MaxIdleConnDuration,
  467. MaxConnDuration: c.MaxConnDuration,
  468. MaxIdemponentCallAttempts: c.MaxIdemponentCallAttempts,
  469. ReadBufferSize: c.ReadBufferSize,
  470. WriteBufferSize: c.WriteBufferSize,
  471. ReadTimeout: c.ReadTimeout,
  472. WriteTimeout: c.WriteTimeout,
  473. MaxResponseBodySize: c.MaxResponseBodySize,
  474. DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
  475. DisablePathNormalizing: c.DisablePathNormalizing,
  476. MaxConnWaitTimeout: c.MaxConnWaitTimeout,
  477. RetryIf: c.RetryIf,
  478. ConnPoolStrategy: c.ConnPoolStrategy,
  479. StreamResponseBody: c.StreamResponseBody,
  480. clientReaderPool: &c.readerPool,
  481. clientWriterPool: &c.writerPool,
  482. }
  483. if c.ConfigureClient != nil {
  484. if err := c.ConfigureClient(hc); err != nil {
  485. c.mLock.Unlock()
  486. return err
  487. }
  488. }
  489. m[string(host)] = hc
  490. if len(m) == 1 {
  491. startCleaner = true
  492. }
  493. }
  494. atomic.AddInt32(&hc.pendingClientRequests, 1)
  495. defer atomic.AddInt32(&hc.pendingClientRequests, -1)
  496. c.mLock.Unlock()
  497. }
  498. if startCleaner {
  499. go c.mCleaner(m)
  500. }
  501. return hc.Do(req, resp)
  502. }
  503. // CloseIdleConnections closes any connections which were previously
  504. // connected from previous requests but are now sitting idle in a
  505. // "keep-alive" state. It does not interrupt any connections currently
  506. // in use.
  507. func (c *Client) CloseIdleConnections() {
  508. c.mLock.RLock()
  509. for _, v := range c.m {
  510. v.CloseIdleConnections()
  511. }
  512. for _, v := range c.ms {
  513. v.CloseIdleConnections()
  514. }
  515. c.mLock.RUnlock()
  516. }
  517. func (c *Client) mCleaner(m map[string]*HostClient) {
  518. mustStop := false
  519. sleep := c.MaxIdleConnDuration
  520. if sleep < time.Second {
  521. sleep = time.Second
  522. } else if sleep > 10*time.Second {
  523. sleep = 10 * time.Second
  524. }
  525. for {
  526. time.Sleep(sleep)
  527. c.mLock.Lock()
  528. for k, v := range m {
  529. v.connsLock.Lock()
  530. /* #nosec G601 */
  531. if v.connsCount == 0 && atomic.LoadInt32(&v.pendingClientRequests) == 0 {
  532. delete(m, k)
  533. }
  534. v.connsLock.Unlock()
  535. }
  536. if len(m) == 0 {
  537. mustStop = true
  538. }
  539. c.mLock.Unlock()
  540. if mustStop {
  541. break
  542. }
  543. }
  544. }
  545. // DefaultMaxConnsPerHost is the maximum number of concurrent connections
  546. // http client may establish per host by default (i.e. if
  547. // Client.MaxConnsPerHost isn't set).
  548. const DefaultMaxConnsPerHost = 512
  549. // DefaultMaxIdleConnDuration is the default duration before idle keep-alive
  550. // connection is closed.
  551. const DefaultMaxIdleConnDuration = 10 * time.Second
  552. // DefaultMaxIdemponentCallAttempts is the default idempotent calls attempts count.
  553. const DefaultMaxIdemponentCallAttempts = 5
  554. // DialFunc must establish connection to addr.
  555. //
  556. // There is no need in establishing TLS (SSL) connection for https.
  557. // The client automatically converts connection to TLS
  558. // if HostClient.IsTLS is set.
  559. //
  560. // TCP address passed to DialFunc always contains host and port.
  561. // Example TCP addr values:
  562. //
  563. // - foobar.com:80
  564. // - foobar.com:443
  565. // - foobar.com:8080
  566. type DialFunc func(addr string) (net.Conn, error)
  567. // RetryIfFunc signature of retry if function
  568. //
  569. // Request argument passed to RetryIfFunc, if there are any request errors.
  570. type RetryIfFunc func(request *Request) bool
  571. // RoundTripper wraps every request/response.
  572. type RoundTripper interface {
  573. RoundTrip(hc *HostClient, req *Request, resp *Response) (retry bool, err error)
  574. }
  575. // ConnPoolStrategyType define strategy of connection pool enqueue/dequeue
  576. type ConnPoolStrategyType int
  577. const (
  578. FIFO ConnPoolStrategyType = iota
  579. LIFO
  580. )
  581. // HostClient balances http requests among hosts listed in Addr.
  582. //
  583. // HostClient may be used for balancing load among multiple upstream hosts.
  584. // While multiple addresses passed to HostClient.Addr may be used for balancing
  585. // load among them, it would be better using LBClient instead, since HostClient
  586. // may unevenly balance load among upstream hosts.
  587. //
  588. // It is forbidden copying HostClient instances. Create new instances instead.
  589. //
  590. // It is safe calling HostClient methods from concurrently running goroutines.
  591. type HostClient struct {
  592. noCopy noCopy
  593. // Comma-separated list of upstream HTTP server host addresses,
  594. // which are passed to Dial in a round-robin manner.
  595. //
  596. // Each address may contain port if default dialer is used.
  597. // For example,
  598. //
  599. // - foobar.com:80
  600. // - foobar.com:443
  601. // - foobar.com:8080
  602. Addr string
  603. // Client name. Used in User-Agent request header.
  604. Name string
  605. // NoDefaultUserAgentHeader when set to true, causes the default
  606. // User-Agent header to be excluded from the Request.
  607. NoDefaultUserAgentHeader bool
  608. // Callback for establishing new connection to the host.
  609. //
  610. // Default Dial is used if not set.
  611. Dial DialFunc
  612. // Attempt to connect to both ipv4 and ipv6 host addresses
  613. // if set to true.
  614. //
  615. // This option is used only if default TCP dialer is used,
  616. // i.e. if Dial is blank.
  617. //
  618. // By default client connects only to ipv4 addresses,
  619. // since unfortunately ipv6 remains broken in many networks worldwide :)
  620. DialDualStack bool
  621. // Whether to use TLS (aka SSL or HTTPS) for host connections.
  622. IsTLS bool
  623. // Optional TLS config.
  624. TLSConfig *tls.Config
  625. // Maximum number of connections which may be established to all hosts
  626. // listed in Addr.
  627. //
  628. // You can change this value while the HostClient is being used
  629. // with HostClient.SetMaxConns(value)
  630. //
  631. // DefaultMaxConnsPerHost is used if not set.
  632. MaxConns int
  633. // Keep-alive connections are closed after this duration.
  634. //
  635. // By default connection duration is unlimited.
  636. MaxConnDuration time.Duration
  637. // Idle keep-alive connections are closed after this duration.
  638. //
  639. // By default idle connections are closed
  640. // after DefaultMaxIdleConnDuration.
  641. MaxIdleConnDuration time.Duration
  642. // Maximum number of attempts for idempotent calls
  643. //
  644. // DefaultMaxIdemponentCallAttempts is used if not set.
  645. MaxIdemponentCallAttempts int
  646. // Per-connection buffer size for responses' reading.
  647. // This also limits the maximum header size.
  648. //
  649. // Default buffer size is used if 0.
  650. ReadBufferSize int
  651. // Per-connection buffer size for requests' writing.
  652. //
  653. // Default buffer size is used if 0.
  654. WriteBufferSize int
  655. // Maximum duration for full response reading (including body).
  656. //
  657. // By default response read timeout is unlimited.
  658. ReadTimeout time.Duration
  659. // Maximum duration for full request writing (including body).
  660. //
  661. // By default request write timeout is unlimited.
  662. WriteTimeout time.Duration
  663. // Maximum response body size.
  664. //
  665. // The client returns ErrBodyTooLarge if this limit is greater than 0
  666. // and response body is greater than the limit.
  667. //
  668. // By default response body size is unlimited.
  669. MaxResponseBodySize int
  670. // Header names are passed as-is without normalization
  671. // if this option is set.
  672. //
  673. // Disabled header names' normalization may be useful only for proxying
  674. // responses to other clients expecting case-sensitive
  675. // header names. See https://github.com/valyala/fasthttp/issues/57
  676. // for details.
  677. //
  678. // By default request and response header names are normalized, i.e.
  679. // The first letter and the first letters following dashes
  680. // are uppercased, while all the other letters are lowercased.
  681. // Examples:
  682. //
  683. // * HOST -> Host
  684. // * content-type -> Content-Type
  685. // * cONTENT-lenGTH -> Content-Length
  686. DisableHeaderNamesNormalizing bool
  687. // Path values are sent as-is without normalization
  688. //
  689. // Disabled path normalization may be useful for proxying incoming requests
  690. // to servers that are expecting paths to be forwarded as-is.
  691. //
  692. // By default path values are normalized, i.e.
  693. // extra slashes are removed, special characters are encoded.
  694. DisablePathNormalizing bool
  695. // Will not log potentially sensitive content in error logs
  696. //
  697. // This option is useful for servers that handle sensitive data
  698. // in the request/response.
  699. //
  700. // Client logs full errors by default.
  701. SecureErrorLogMessage bool
  702. // Maximum duration for waiting for a free connection.
  703. //
  704. // By default will not waiting, return ErrNoFreeConns immediately
  705. MaxConnWaitTimeout time.Duration
  706. // RetryIf controls whether a retry should be attempted after an error.
  707. //
  708. // By default will use isIdempotent function
  709. RetryIf RetryIfFunc
  710. // Transport defines a transport-like mechanism that wraps every request/response.
  711. Transport RoundTripper
  712. // Connection pool strategy. Can be either LIFO or FIFO (default).
  713. ConnPoolStrategy ConnPoolStrategyType
  714. // StreamResponseBody enables response body streaming
  715. StreamResponseBody bool
  716. lastUseTime uint32
  717. connsLock sync.Mutex
  718. connsCount int
  719. conns []*clientConn
  720. connsWait *wantConnQueue
  721. addrsLock sync.Mutex
  722. addrs []string
  723. addrIdx uint32
  724. tlsConfigMap map[string]*tls.Config
  725. tlsConfigMapLock sync.Mutex
  726. readerPool sync.Pool
  727. writerPool sync.Pool
  728. clientReaderPool *sync.Pool
  729. clientWriterPool *sync.Pool
  730. pendingRequests int32
  731. // pendingClientRequests counts the number of requests that a Client is currently running using this HostClient.
  732. // It will be incremented earlier than pendingRequests and will be used by Client to see if the HostClient is still in use.
  733. pendingClientRequests int32
  734. connsCleanerRun bool
  735. }
  736. type clientConn struct {
  737. c net.Conn
  738. createdTime time.Time
  739. lastUseTime time.Time
  740. }
  741. var startTimeUnix = time.Now().Unix()
  742. // LastUseTime returns time the client was last used
  743. func (c *HostClient) LastUseTime() time.Time {
  744. n := atomic.LoadUint32(&c.lastUseTime)
  745. return time.Unix(startTimeUnix+int64(n), 0)
  746. }
  747. // Get returns the status code and body of url.
  748. //
  749. // The contents of dst will be replaced by the body and returned, if the dst
  750. // is too small a new slice will be allocated.
  751. //
  752. // The function follows redirects. Use Do* for manually handling redirects.
  753. func (c *HostClient) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
  754. return clientGetURL(dst, url, c)
  755. }
  756. // GetTimeout returns the status code and body of url.
  757. //
  758. // The contents of dst will be replaced by the body and returned, if the dst
  759. // is too small a new slice will be allocated.
  760. //
  761. // The function follows redirects. Use Do* for manually handling redirects.
  762. //
  763. // ErrTimeout error is returned if url contents couldn't be fetched
  764. // during the given timeout.
  765. func (c *HostClient) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
  766. return clientGetURLTimeout(dst, url, timeout, c)
  767. }
  768. // GetDeadline returns the status code and body of url.
  769. //
  770. // The contents of dst will be replaced by the body and returned, if the dst
  771. // is too small a new slice will be allocated.
  772. //
  773. // The function follows redirects. Use Do* for manually handling redirects.
  774. //
  775. // ErrTimeout error is returned if url contents couldn't be fetched
  776. // until the given deadline.
  777. func (c *HostClient) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
  778. return clientGetURLDeadline(dst, url, deadline, c)
  779. }
  780. // Post sends POST request to the given url with the given POST arguments.
  781. //
  782. // The contents of dst will be replaced by the body and returned, if the dst
  783. // is too small a new slice will be allocated.
  784. //
  785. // The function follows redirects. Use Do* for manually handling redirects.
  786. //
  787. // Empty POST body is sent if postArgs is nil.
  788. func (c *HostClient) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
  789. return clientPostURL(dst, url, postArgs, c)
  790. }
  791. type clientDoer interface {
  792. Do(req *Request, resp *Response) error
  793. }
  794. func clientGetURL(dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
  795. req := AcquireRequest()
  796. statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
  797. ReleaseRequest(req)
  798. return statusCode, body, err
  799. }
  800. func clientGetURLTimeout(dst []byte, url string, timeout time.Duration, c clientDoer) (statusCode int, body []byte, err error) {
  801. deadline := time.Now().Add(timeout)
  802. return clientGetURLDeadline(dst, url, deadline, c)
  803. }
  804. type clientURLResponse struct {
  805. statusCode int
  806. body []byte
  807. err error
  808. }
  809. func clientGetURLDeadline(dst []byte, url string, deadline time.Time, c clientDoer) (statusCode int, body []byte, err error) {
  810. timeout := time.Until(deadline)
  811. if timeout <= 0 {
  812. return 0, dst, ErrTimeout
  813. }
  814. var ch chan clientURLResponse
  815. chv := clientURLResponseChPool.Get()
  816. if chv == nil {
  817. chv = make(chan clientURLResponse, 1)
  818. }
  819. ch = chv.(chan clientURLResponse)
  820. // Note that the request continues execution on ErrTimeout until
  821. // client-specific ReadTimeout exceeds. This helps limiting load
  822. // on slow hosts by MaxConns* concurrent requests.
  823. //
  824. // Without this 'hack' the load on slow host could exceed MaxConns*
  825. // concurrent requests, since timed out requests on client side
  826. // usually continue execution on the host.
  827. var mu sync.Mutex
  828. var timedout, responded bool
  829. go func() {
  830. req := AcquireRequest()
  831. statusCodeCopy, bodyCopy, errCopy := doRequestFollowRedirectsBuffer(req, dst, url, c)
  832. mu.Lock()
  833. if !timedout {
  834. ch <- clientURLResponse{
  835. statusCode: statusCodeCopy,
  836. body: bodyCopy,
  837. err: errCopy,
  838. }
  839. responded = true
  840. }
  841. mu.Unlock()
  842. ReleaseRequest(req)
  843. }()
  844. tc := AcquireTimer(timeout)
  845. select {
  846. case resp := <-ch:
  847. statusCode = resp.statusCode
  848. body = resp.body
  849. err = resp.err
  850. case <-tc.C:
  851. mu.Lock()
  852. if responded {
  853. resp := <-ch
  854. statusCode = resp.statusCode
  855. body = resp.body
  856. err = resp.err
  857. } else {
  858. timedout = true
  859. err = ErrTimeout
  860. body = dst
  861. }
  862. mu.Unlock()
  863. }
  864. ReleaseTimer(tc)
  865. clientURLResponseChPool.Put(chv)
  866. return statusCode, body, err
  867. }
  868. var clientURLResponseChPool sync.Pool
  869. func clientPostURL(dst []byte, url string, postArgs *Args, c clientDoer) (statusCode int, body []byte, err error) {
  870. req := AcquireRequest()
  871. defer ReleaseRequest(req)
  872. req.Header.SetMethod(MethodPost)
  873. req.Header.SetContentTypeBytes(strPostArgsContentType)
  874. if postArgs != nil {
  875. if _, err := postArgs.WriteTo(req.BodyWriter()); err != nil {
  876. return 0, nil, err
  877. }
  878. }
  879. statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
  880. return statusCode, body, err
  881. }
  882. var (
  883. // ErrMissingLocation is returned by clients when the Location header is missing on
  884. // an HTTP response with a redirect status code.
  885. ErrMissingLocation = errors.New("missing Location header for http redirect")
  886. // ErrTooManyRedirects is returned by clients when the number of redirects followed
  887. // exceed the max count.
  888. ErrTooManyRedirects = errors.New("too many redirects detected when doing the request")
  889. // HostClients are only able to follow redirects to the same protocol.
  890. ErrHostClientRedirectToDifferentScheme = errors.New("HostClient can't follow redirects to a different protocol, please use Client instead")
  891. )
  892. const defaultMaxRedirectsCount = 16
  893. func doRequestFollowRedirectsBuffer(req *Request, dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
  894. resp := AcquireResponse()
  895. bodyBuf := resp.bodyBuffer()
  896. resp.keepBodyBuffer = true
  897. oldBody := bodyBuf.B
  898. bodyBuf.B = dst
  899. statusCode, _, err = doRequestFollowRedirects(req, resp, url, defaultMaxRedirectsCount, c)
  900. body = bodyBuf.B
  901. bodyBuf.B = oldBody
  902. resp.keepBodyBuffer = false
  903. ReleaseResponse(resp)
  904. return statusCode, body, err
  905. }
  906. func doRequestFollowRedirects(req *Request, resp *Response, url string, maxRedirectsCount int, c clientDoer) (statusCode int, body []byte, err error) {
  907. redirectsCount := 0
  908. for {
  909. req.SetRequestURI(url)
  910. if err := req.parseURI(); err != nil {
  911. return 0, nil, err
  912. }
  913. if err = c.Do(req, resp); err != nil {
  914. break
  915. }
  916. statusCode = resp.Header.StatusCode()
  917. if !StatusCodeIsRedirect(statusCode) {
  918. break
  919. }
  920. redirectsCount++
  921. if redirectsCount > maxRedirectsCount {
  922. err = ErrTooManyRedirects
  923. break
  924. }
  925. location := resp.Header.peek(strLocation)
  926. if len(location) == 0 {
  927. err = ErrMissingLocation
  928. break
  929. }
  930. url = getRedirectURL(url, location, req.DisableRedirectPathNormalizing)
  931. }
  932. return statusCode, body, err
  933. }
  934. func getRedirectURL(baseURL string, location []byte, disablePathNormalizing bool) string {
  935. u := AcquireURI()
  936. u.Update(baseURL)
  937. u.UpdateBytes(location)
  938. u.DisablePathNormalizing = disablePathNormalizing
  939. redirectURL := u.String()
  940. ReleaseURI(u)
  941. return redirectURL
  942. }
  943. // StatusCodeIsRedirect returns true if the status code indicates a redirect.
  944. func StatusCodeIsRedirect(statusCode int) bool {
  945. return statusCode == StatusMovedPermanently ||
  946. statusCode == StatusFound ||
  947. statusCode == StatusSeeOther ||
  948. statusCode == StatusTemporaryRedirect ||
  949. statusCode == StatusPermanentRedirect
  950. }
  951. var (
  952. requestPool sync.Pool
  953. responsePool sync.Pool
  954. )
  955. // AcquireRequest returns an empty Request instance from request pool.
  956. //
  957. // The returned Request instance may be passed to ReleaseRequest when it is
  958. // no longer needed. This allows Request recycling, reduces GC pressure
  959. // and usually improves performance.
  960. func AcquireRequest() *Request {
  961. v := requestPool.Get()
  962. if v == nil {
  963. return &Request{}
  964. }
  965. return v.(*Request)
  966. }
  967. // ReleaseRequest returns req acquired via AcquireRequest to request pool.
  968. //
  969. // It is forbidden accessing req and/or its' members after returning
  970. // it to request pool.
  971. func ReleaseRequest(req *Request) {
  972. req.Reset()
  973. requestPool.Put(req)
  974. }
  975. // AcquireResponse returns an empty Response instance from response pool.
  976. //
  977. // The returned Response instance may be passed to ReleaseResponse when it is
  978. // no longer needed. This allows Response recycling, reduces GC pressure
  979. // and usually improves performance.
  980. func AcquireResponse() *Response {
  981. v := responsePool.Get()
  982. if v == nil {
  983. return &Response{}
  984. }
  985. return v.(*Response)
  986. }
  987. // ReleaseResponse return resp acquired via AcquireResponse to response pool.
  988. //
  989. // It is forbidden accessing resp and/or its' members after returning
  990. // it to response pool.
  991. func ReleaseResponse(resp *Response) {
  992. resp.Reset()
  993. responsePool.Put(resp)
  994. }
  995. // DoTimeout performs the given request and waits for response during
  996. // the given timeout duration.
  997. //
  998. // Request must contain at least non-zero RequestURI with full url (including
  999. // scheme and host) or non-zero Host header + RequestURI.
  1000. //
  1001. // The function doesn't follow redirects. Use Get* for following redirects.
  1002. //
  1003. // Response is ignored if resp is nil.
  1004. //
  1005. // ErrTimeout is returned if the response wasn't returned during
  1006. // the given timeout.
  1007. // Immediately returns ErrTimeout if timeout value is negative.
  1008. //
  1009. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1010. // to the host are busy.
  1011. //
  1012. // It is recommended obtaining req and resp via AcquireRequest
  1013. // and AcquireResponse in performance-critical code.
  1014. func (c *HostClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  1015. req.timeout = timeout
  1016. if req.timeout <= 0 {
  1017. return ErrTimeout
  1018. }
  1019. return c.Do(req, resp)
  1020. }
  1021. // DoDeadline performs the given request and waits for response until
  1022. // the given deadline.
  1023. //
  1024. // Request must contain at least non-zero RequestURI with full url (including
  1025. // scheme and host) or non-zero Host header + RequestURI.
  1026. //
  1027. // The function doesn't follow redirects. Use Get* for following redirects.
  1028. //
  1029. // Response is ignored if resp is nil.
  1030. //
  1031. // ErrTimeout is returned if the response wasn't returned until
  1032. // the given deadline.
  1033. // Immediately returns ErrTimeout if the deadline has already been reached.
  1034. //
  1035. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1036. // to the host are busy.
  1037. //
  1038. // It is recommended obtaining req and resp via AcquireRequest
  1039. // and AcquireResponse in performance-critical code.
  1040. func (c *HostClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  1041. req.timeout = time.Until(deadline)
  1042. if req.timeout <= 0 {
  1043. return ErrTimeout
  1044. }
  1045. return c.Do(req, resp)
  1046. }
  1047. // DoRedirects performs the given http request and fills the given http response,
  1048. // following up to maxRedirectsCount redirects. When the redirect count exceeds
  1049. // maxRedirectsCount, ErrTooManyRedirects is returned.
  1050. //
  1051. // Request must contain at least non-zero RequestURI with full url (including
  1052. // scheme and host) or non-zero Host header + RequestURI.
  1053. //
  1054. // Client determines the server to be requested in the following order:
  1055. //
  1056. // - from RequestURI if it contains full url with scheme and host;
  1057. // - from Host header otherwise.
  1058. //
  1059. // Response is ignored if resp is nil.
  1060. //
  1061. // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
  1062. // to the requested host are busy.
  1063. //
  1064. // It is recommended obtaining req and resp via AcquireRequest
  1065. // and AcquireResponse in performance-critical code.
  1066. func (c *HostClient) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
  1067. _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
  1068. return err
  1069. }
  1070. // Do performs the given http request and sets the corresponding response.
  1071. //
  1072. // Request must contain at least non-zero RequestURI with full url (including
  1073. // scheme and host) or non-zero Host header + RequestURI.
  1074. //
  1075. // The function doesn't follow redirects. Use Get* for following redirects.
  1076. //
  1077. // Response is ignored if resp is nil.
  1078. //
  1079. // ErrNoFreeConns is returned if all HostClient.MaxConns connections
  1080. // to the host are busy.
  1081. //
  1082. // It is recommended obtaining req and resp via AcquireRequest
  1083. // and AcquireResponse in performance-critical code.
  1084. func (c *HostClient) Do(req *Request, resp *Response) error {
  1085. var err error
  1086. var retry bool
  1087. maxAttempts := c.MaxIdemponentCallAttempts
  1088. if maxAttempts <= 0 {
  1089. maxAttempts = DefaultMaxIdemponentCallAttempts
  1090. }
  1091. isRequestRetryable := isIdempotent
  1092. if c.RetryIf != nil {
  1093. isRequestRetryable = c.RetryIf
  1094. }
  1095. attempts := 0
  1096. hasBodyStream := req.IsBodyStream()
  1097. // If a request has a timeout we store the timeout
  1098. // and calculate a deadline so we can keep updating the
  1099. // timeout on each retry.
  1100. deadline := time.Time{}
  1101. timeout := req.timeout
  1102. if timeout > 0 {
  1103. deadline = time.Now().Add(timeout)
  1104. }
  1105. atomic.AddInt32(&c.pendingRequests, 1)
  1106. for {
  1107. // If the original timeout was set, we need to update
  1108. // the one set on the request to reflect the remaining time.
  1109. if timeout > 0 {
  1110. req.timeout = time.Until(deadline)
  1111. if req.timeout <= 0 {
  1112. err = ErrTimeout
  1113. break
  1114. }
  1115. }
  1116. retry, err = c.do(req, resp)
  1117. if err == nil || !retry {
  1118. break
  1119. }
  1120. if hasBodyStream {
  1121. break
  1122. }
  1123. if !isRequestRetryable(req) {
  1124. // Retry non-idempotent requests if the server closes
  1125. // the connection before sending the response.
  1126. //
  1127. // This case is possible if the server closes the idle
  1128. // keep-alive connection on timeout.
  1129. //
  1130. // Apache and nginx usually do this.
  1131. if err != io.EOF {
  1132. break
  1133. }
  1134. }
  1135. attempts++
  1136. if attempts >= maxAttempts {
  1137. break
  1138. }
  1139. }
  1140. atomic.AddInt32(&c.pendingRequests, -1)
  1141. // Restore the original timeout.
  1142. req.timeout = timeout
  1143. if err == io.EOF {
  1144. err = ErrConnectionClosed
  1145. }
  1146. return err
  1147. }
  1148. // PendingRequests returns the current number of requests the client
  1149. // is executing.
  1150. //
  1151. // This function may be used for balancing load among multiple HostClient
  1152. // instances.
  1153. func (c *HostClient) PendingRequests() int {
  1154. return int(atomic.LoadInt32(&c.pendingRequests))
  1155. }
  1156. func isIdempotent(req *Request) bool {
  1157. return req.Header.IsGet() || req.Header.IsHead() || req.Header.IsPut()
  1158. }
  1159. func (c *HostClient) do(req *Request, resp *Response) (bool, error) {
  1160. if resp == nil {
  1161. resp = AcquireResponse()
  1162. defer ReleaseResponse(resp)
  1163. }
  1164. ok, err := c.doNonNilReqResp(req, resp)
  1165. return ok, err
  1166. }
  1167. func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) {
  1168. if req == nil {
  1169. // for debugging purposes
  1170. panic("BUG: req cannot be nil")
  1171. }
  1172. if resp == nil {
  1173. // for debugging purposes
  1174. panic("BUG: resp cannot be nil")
  1175. }
  1176. // Secure header error logs configuration
  1177. resp.secureErrorLogMessage = c.SecureErrorLogMessage
  1178. resp.Header.secureErrorLogMessage = c.SecureErrorLogMessage
  1179. req.secureErrorLogMessage = c.SecureErrorLogMessage
  1180. req.Header.secureErrorLogMessage = c.SecureErrorLogMessage
  1181. if c.IsTLS != req.URI().isHTTPS() {
  1182. return false, ErrHostClientRedirectToDifferentScheme
  1183. }
  1184. atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix))
  1185. // Free up resources occupied by response before sending the request,
  1186. // so the GC may reclaim these resources (e.g. response body).
  1187. // backing up SkipBody in case it was set explicitly
  1188. customSkipBody := resp.SkipBody
  1189. customStreamBody := resp.StreamBody || c.StreamResponseBody
  1190. resp.Reset()
  1191. resp.SkipBody = customSkipBody
  1192. resp.StreamBody = customStreamBody
  1193. req.URI().DisablePathNormalizing = c.DisablePathNormalizing
  1194. userAgentOld := req.Header.UserAgent()
  1195. if len(userAgentOld) == 0 {
  1196. userAgent := c.Name
  1197. if userAgent == "" && !c.NoDefaultUserAgentHeader {
  1198. userAgent = defaultUserAgent
  1199. }
  1200. if userAgent != "" {
  1201. req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
  1202. }
  1203. }
  1204. return c.transport().RoundTrip(c, req, resp)
  1205. }
  1206. func (c *HostClient) transport() RoundTripper {
  1207. if c.Transport == nil {
  1208. return DefaultTransport
  1209. }
  1210. return c.Transport
  1211. }
  1212. var (
  1213. // ErrNoFreeConns is returned when no free connections available
  1214. // to the given host.
  1215. //
  1216. // Increase the allowed number of connections per host if you
  1217. // see this error.
  1218. ErrNoFreeConns = errors.New("no free connections available to host")
  1219. // ErrConnectionClosed may be returned from client methods if the server
  1220. // closes connection before returning the first response byte.
  1221. //
  1222. // If you see this error, then either fix the server by returning
  1223. // 'Connection: close' response header before closing the connection
  1224. // or add 'Connection: close' request header before sending requests
  1225. // to broken server.
  1226. ErrConnectionClosed = errors.New("the server closed connection before returning the first response byte. " +
  1227. "Make sure the server returns 'Connection: close' response header before closing the connection")
  1228. // ErrConnPoolStrategyNotImpl is returned when HostClient.ConnPoolStrategy is not implement yet.
  1229. // If you see this error, then you need to check your HostClient configuration.
  1230. ErrConnPoolStrategyNotImpl = errors.New("connection pool strategy is not implement")
  1231. )
  1232. type timeoutError struct{}
  1233. func (e *timeoutError) Error() string {
  1234. return "timeout"
  1235. }
  1236. // Only implement the Timeout() function of the net.Error interface.
  1237. // This allows for checks like:
  1238. //
  1239. // if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
  1240. func (e *timeoutError) Timeout() bool {
  1241. return true
  1242. }
  1243. // ErrTimeout is returned from timed out calls.
  1244. var ErrTimeout = &timeoutError{}
  1245. // SetMaxConns sets up the maximum number of connections which may be established to all hosts listed in Addr.
  1246. func (c *HostClient) SetMaxConns(newMaxConns int) {
  1247. c.connsLock.Lock()
  1248. c.MaxConns = newMaxConns
  1249. c.connsLock.Unlock()
  1250. }
// acquireConn returns a connection for a request.
//
// Under connsLock it first tries to take an idle connection from c.conns,
// from the end for LIFO and from the front for FIFO; any other
// ConnPoolStrategy yields ErrConnPoolStrategyNotImpl. If no idle connection
// exists and connsCount is below the limit (MaxConns, or
// DefaultMaxConnsPerHost when MaxConns <= 0), connsCount is incremented and
// a new connection is dialed with dialHostHard(reqTimeout). Otherwise the
// call either fails immediately with ErrNoFreeConns (MaxConnWaitTimeout <= 0)
// or queues a wantConn and waits for a connection to be delivered.
//
// connectionClose suppresses starting the connsCleaner goroutine for a
// connection that is being created.
func (c *HostClient) acquireConn(reqTimeout time.Duration, connectionClose bool) (cc *clientConn, err error) {
	createConn := false
	startCleaner := false
	var n int
	c.connsLock.Lock()
	n = len(c.conns)
	if n == 0 {
		// No idle connection: check whether a new one may be created.
		maxConns := c.MaxConns
		if maxConns <= 0 {
			maxConns = DefaultMaxConnsPerHost
		}
		if c.connsCount < maxConns {
			// Reserve the slot now, while the lock is held; it is given
			// back via decConnsCount if the dial below fails.
			c.connsCount++
			createConn = true
			if !c.connsCleanerRun && !connectionClose {
				startCleaner = true
				c.connsCleanerRun = true
			}
		}
	} else {
		switch c.ConnPoolStrategy {
		case LIFO:
			// Take the last connection in the pool.
			n--
			cc = c.conns[n]
			c.conns[n] = nil
			c.conns = c.conns[:n]
		case FIFO:
			// Take the first connection in the pool and shift the rest down.
			cc = c.conns[0]
			copy(c.conns, c.conns[1:])
			c.conns[n-1] = nil
			c.conns = c.conns[:n-1]
		default:
			c.connsLock.Unlock()
			return nil, ErrConnPoolStrategyNotImpl
		}
	}
	c.connsLock.Unlock()
	if cc != nil {
		return cc, nil
	}
	if !createConn {
		// The connection limit is reached.
		if c.MaxConnWaitTimeout <= 0 {
			return nil, ErrNoFreeConns
		}
		//nolint:dupword
		// reqTimeout    c.MaxConnWaitTimeout    wait duration
		//     d1                 d2              min(d1, d2)
		//  0(not set)            d2              d2
		//     d1            0(don't wait)        0(don't wait)
		//  0(not set)       0(don't wait)        0(don't wait)
		timeout := c.MaxConnWaitTimeout
		timeoutOverridden := false
		// reqTimeout == 0 means not set
		if reqTimeout > 0 && reqTimeout < timeout {
			timeout = reqTimeout
			timeoutOverridden = true
		}
		// wait for a free connection
		tc := AcquireTimer(timeout)
		defer ReleaseTimer(tc)
		w := &wantConn{
			ready: make(chan struct{}, 1),
		}
		// On any error return, cancel the wantConn so that a connection
		// delivered concurrently is handed back through releaseConn.
		defer func() {
			if err != nil {
				w.cancel(c, err)
			}
		}()
		c.queueForIdle(w)
		select {
		case <-w.ready:
			return w.conn, w.err
		case <-tc.C:
			// Report ErrTimeout only when the request timeout, not
			// MaxConnWaitTimeout, was the effective limit.
			if timeoutOverridden {
				return nil, ErrTimeout
			}
			return nil, ErrNoFreeConns
		}
	}
	if startCleaner {
		go c.connsCleaner()
	}
	conn, err := c.dialHostHard(reqTimeout)
	if err != nil {
		// Give back the slot reserved above.
		c.decConnsCount()
		return nil, err
	}
	cc = acquireClientConn(conn)
	return cc, nil
}
  1341. func (c *HostClient) queueForIdle(w *wantConn) {
  1342. c.connsLock.Lock()
  1343. defer c.connsLock.Unlock()
  1344. if c.connsWait == nil {
  1345. c.connsWait = &wantConnQueue{}
  1346. }
  1347. c.connsWait.clearFront()
  1348. c.connsWait.pushBack(w)
  1349. }
  1350. func (c *HostClient) dialConnFor(w *wantConn) {
  1351. conn, err := c.dialHostHard(0)
  1352. if err != nil {
  1353. w.tryDeliver(nil, err)
  1354. c.decConnsCount()
  1355. return
  1356. }
  1357. cc := acquireClientConn(conn)
  1358. if !w.tryDeliver(cc, nil) {
  1359. // not delivered, return idle connection
  1360. c.releaseConn(cc)
  1361. }
  1362. }
  1363. // CloseIdleConnections closes any connections which were previously
  1364. // connected from previous requests but are now sitting idle in a
  1365. // "keep-alive" state. It does not interrupt any connections currently
  1366. // in use.
  1367. func (c *HostClient) CloseIdleConnections() {
  1368. c.connsLock.Lock()
  1369. scratch := append([]*clientConn{}, c.conns...)
  1370. for i := range c.conns {
  1371. c.conns[i] = nil
  1372. }
  1373. c.conns = c.conns[:0]
  1374. c.connsLock.Unlock()
  1375. for _, cc := range scratch {
  1376. c.closeConn(cc)
  1377. }
  1378. }
// connsCleaner runs as a goroutine (started by acquireConn) and
// periodically closes pooled connections that have been idle for longer
// than MaxIdleConnDuration (DefaultMaxIdleConnDuration when the field is
// <= 0; the value is read once, when the goroutine starts).
//
// The scan stops at the first connection that has not expired, so it relies
// on c.conns being ordered with the least recently used connection first.
// The goroutine exits, clearing connsCleanerRun, once connsCount drops to 0.
func (c *HostClient) connsCleaner() {
	var (
		// scratch is reused between iterations to hold the connections
		// selected for closing.
		scratch             []*clientConn
		maxIdleConnDuration = c.MaxIdleConnDuration
	)
	if maxIdleConnDuration <= 0 {
		maxIdleConnDuration = DefaultMaxIdleConnDuration
	}
	for {
		currentTime := time.Now()

		// Determine idle connections to be closed.
		c.connsLock.Lock()
		conns := c.conns
		n := len(conns)
		i := 0
		for i < n && currentTime.Sub(conns[i].lastUseTime) > maxIdleConnDuration {
			i++
		}
		sleepFor := maxIdleConnDuration
		if i < n {
			// + 1 so we actually sleep past the expiration time and not up to it.
			// Otherwise the > check above would still fail.
			sleepFor = maxIdleConnDuration - currentTime.Sub(conns[i].lastUseTime) + 1
		}
		scratch = append(scratch[:0], conns[:i]...)
		if i > 0 {
			// Shift the surviving connections to the front and clear the
			// vacated tail so the closed connections are not retained.
			m := copy(conns, conns[i:])
			for i = m; i < n; i++ {
				conns[i] = nil
			}
			c.conns = conns[:m]
		}
		c.connsLock.Unlock()

		// Close idle connections.
		for i, cc := range scratch {
			c.closeConn(cc)
			scratch[i] = nil
		}

		// Determine whether to stop the connsCleaner.
		c.connsLock.Lock()
		mustStop := c.connsCount == 0
		if mustStop {
			c.connsCleanerRun = false
		}
		c.connsLock.Unlock()
		if mustStop {
			break
		}
		time.Sleep(sleepFor)
	}
}
  1430. func (c *HostClient) closeConn(cc *clientConn) {
  1431. c.decConnsCount()
  1432. cc.c.Close()
  1433. releaseClientConn(cc)
  1434. }
  1435. func (c *HostClient) decConnsCount() {
  1436. if c.MaxConnWaitTimeout <= 0 {
  1437. c.connsLock.Lock()
  1438. c.connsCount--
  1439. c.connsLock.Unlock()
  1440. return
  1441. }
  1442. c.connsLock.Lock()
  1443. defer c.connsLock.Unlock()
  1444. dialed := false
  1445. if q := c.connsWait; q != nil && q.len() > 0 {
  1446. for q.len() > 0 {
  1447. w := q.popFront()
  1448. if w.waiting() {
  1449. go c.dialConnFor(w)
  1450. dialed = true
  1451. break
  1452. }
  1453. }
  1454. }
  1455. if !dialed {
  1456. c.connsCount--
  1457. }
  1458. }
  1459. // ConnsCount returns connection count of HostClient
  1460. func (c *HostClient) ConnsCount() int {
  1461. c.connsLock.Lock()
  1462. defer c.connsLock.Unlock()
  1463. return c.connsCount
  1464. }
  1465. func acquireClientConn(conn net.Conn) *clientConn {
  1466. v := clientConnPool.Get()
  1467. if v == nil {
  1468. v = &clientConn{}
  1469. }
  1470. cc := v.(*clientConn)
  1471. cc.c = conn
  1472. cc.createdTime = time.Now()
  1473. return cc
  1474. }
  1475. func releaseClientConn(cc *clientConn) {
  1476. // Reset all fields.
  1477. *cc = clientConn{}
  1478. clientConnPool.Put(cc)
  1479. }
// clientConnPool recycles *clientConn values: acquireClientConn takes
// them from the pool and releaseClientConn puts them back after resetting.
var clientConnPool sync.Pool
  1481. func (c *HostClient) releaseConn(cc *clientConn) {
  1482. cc.lastUseTime = time.Now()
  1483. if c.MaxConnWaitTimeout <= 0 {
  1484. c.connsLock.Lock()
  1485. c.conns = append(c.conns, cc)
  1486. c.connsLock.Unlock()
  1487. return
  1488. }
  1489. // try to deliver an idle connection to a *wantConn
  1490. c.connsLock.Lock()
  1491. defer c.connsLock.Unlock()
  1492. delivered := false
  1493. if q := c.connsWait; q != nil && q.len() > 0 {
  1494. for q.len() > 0 {
  1495. w := q.popFront()
  1496. if w.waiting() {
  1497. delivered = w.tryDeliver(cc, nil)
  1498. break
  1499. }
  1500. }
  1501. }
  1502. if !delivered {
  1503. c.conns = append(c.conns, cc)
  1504. }
  1505. }
  1506. func (c *HostClient) acquireWriter(conn net.Conn) *bufio.Writer {
  1507. var v interface{}
  1508. if c.clientWriterPool != nil {
  1509. v = c.clientWriterPool.Get()
  1510. if v == nil {
  1511. n := c.WriteBufferSize
  1512. if n <= 0 {
  1513. n = defaultWriteBufferSize
  1514. }
  1515. return bufio.NewWriterSize(conn, n)
  1516. }
  1517. } else {
  1518. v = c.writerPool.Get()
  1519. if v == nil {
  1520. n := c.WriteBufferSize
  1521. if n <= 0 {
  1522. n = defaultWriteBufferSize
  1523. }
  1524. return bufio.NewWriterSize(conn, n)
  1525. }
  1526. }
  1527. bw := v.(*bufio.Writer)
  1528. bw.Reset(conn)
  1529. return bw
  1530. }
  1531. func (c *HostClient) releaseWriter(bw *bufio.Writer) {
  1532. if c.clientWriterPool != nil {
  1533. c.clientWriterPool.Put(bw)
  1534. } else {
  1535. c.writerPool.Put(bw)
  1536. }
  1537. }
  1538. func (c *HostClient) acquireReader(conn net.Conn) *bufio.Reader {
  1539. var v interface{}
  1540. if c.clientReaderPool != nil {
  1541. v = c.clientReaderPool.Get()
  1542. if v == nil {
  1543. n := c.ReadBufferSize
  1544. if n <= 0 {
  1545. n = defaultReadBufferSize
  1546. }
  1547. return bufio.NewReaderSize(conn, n)
  1548. }
  1549. } else {
  1550. v = c.readerPool.Get()
  1551. if v == nil {
  1552. n := c.ReadBufferSize
  1553. if n <= 0 {
  1554. n = defaultReadBufferSize
  1555. }
  1556. return bufio.NewReaderSize(conn, n)
  1557. }
  1558. }
  1559. br := v.(*bufio.Reader)
  1560. br.Reset(conn)
  1561. return br
  1562. }
  1563. func (c *HostClient) releaseReader(br *bufio.Reader) {
  1564. if c.clientReaderPool != nil {
  1565. c.clientReaderPool.Put(br)
  1566. } else {
  1567. c.readerPool.Put(br)
  1568. }
  1569. }
  1570. func newClientTLSConfig(c *tls.Config, addr string) *tls.Config {
  1571. if c == nil {
  1572. c = &tls.Config{}
  1573. } else {
  1574. c = c.Clone()
  1575. }
  1576. if len(c.ServerName) == 0 {
  1577. serverName := tlsServerName(addr)
  1578. if serverName == "*" {
  1579. c.InsecureSkipVerify = true
  1580. } else {
  1581. c.ServerName = serverName
  1582. }
  1583. }
  1584. return c
  1585. }
  1586. func tlsServerName(addr string) string {
  1587. if !strings.Contains(addr, ":") {
  1588. return addr
  1589. }
  1590. host, _, err := net.SplitHostPort(addr)
  1591. if err != nil {
  1592. return "*"
  1593. }
  1594. return host
  1595. }
  1596. func (c *HostClient) nextAddr() string {
  1597. c.addrsLock.Lock()
  1598. if c.addrs == nil {
  1599. c.addrs = strings.Split(c.Addr, ",")
  1600. }
  1601. addr := c.addrs[0]
  1602. if len(c.addrs) > 1 {
  1603. addr = c.addrs[c.addrIdx%uint32(len(c.addrs))]
  1604. c.addrIdx++
  1605. }
  1606. c.addrsLock.Unlock()
  1607. return addr
  1608. }
  1609. func (c *HostClient) dialHostHard(dialTimeout time.Duration) (conn net.Conn, err error) {
  1610. // use dialTimeout to control the timeout of each dial. It does not work if dialTimeout is 0 or dial has been set.
  1611. // attempt to dial all the available hosts before giving up.
  1612. c.addrsLock.Lock()
  1613. n := len(c.addrs)
  1614. c.addrsLock.Unlock()
  1615. if n == 0 {
  1616. // It looks like c.addrs isn't initialized yet.
  1617. n = 1
  1618. }
  1619. dial := c.Dial
  1620. if dialTimeout != 0 && dial == nil {
  1621. dial = func(addr string) (net.Conn, error) {
  1622. if c.DialDualStack {
  1623. return DialDualStackTimeout(addr, dialTimeout)
  1624. }
  1625. return DialTimeout(addr, dialTimeout)
  1626. }
  1627. }
  1628. timeout := c.ReadTimeout + c.WriteTimeout
  1629. if timeout <= 0 {
  1630. timeout = DefaultDialTimeout
  1631. }
  1632. deadline := time.Now().Add(timeout)
  1633. for n > 0 {
  1634. addr := c.nextAddr()
  1635. tlsConfig := c.cachedTLSConfig(addr)
  1636. conn, err = dialAddr(addr, dial, c.DialDualStack, c.IsTLS, tlsConfig, c.WriteTimeout)
  1637. if err == nil {
  1638. return conn, nil
  1639. }
  1640. if time.Since(deadline) >= 0 {
  1641. break
  1642. }
  1643. n--
  1644. }
  1645. return nil, err
  1646. }
  1647. func (c *HostClient) cachedTLSConfig(addr string) *tls.Config {
  1648. if !c.IsTLS {
  1649. return nil
  1650. }
  1651. c.tlsConfigMapLock.Lock()
  1652. if c.tlsConfigMap == nil {
  1653. c.tlsConfigMap = make(map[string]*tls.Config)
  1654. }
  1655. cfg := c.tlsConfigMap[addr]
  1656. if cfg == nil {
  1657. cfg = newClientTLSConfig(c.TLSConfig, addr)
  1658. c.tlsConfigMap[addr] = cfg
  1659. }
  1660. c.tlsConfigMapLock.Unlock()
  1661. return cfg
  1662. }
  1663. // ErrTLSHandshakeTimeout indicates there is a timeout from tls handshake.
  1664. var ErrTLSHandshakeTimeout = errors.New("tls handshake timed out")
  1665. func tlsClientHandshake(rawConn net.Conn, tlsConfig *tls.Config, deadline time.Time) (_ net.Conn, retErr error) {
  1666. defer func() {
  1667. if retErr != nil {
  1668. rawConn.Close()
  1669. }
  1670. }()
  1671. conn := tls.Client(rawConn, tlsConfig)
  1672. err := conn.SetDeadline(deadline)
  1673. if err != nil {
  1674. return nil, err
  1675. }
  1676. err = conn.Handshake()
  1677. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  1678. return nil, ErrTLSHandshakeTimeout
  1679. }
  1680. if err != nil {
  1681. return nil, err
  1682. }
  1683. err = conn.SetDeadline(time.Time{})
  1684. if err != nil {
  1685. return nil, err
  1686. }
  1687. return conn, nil
  1688. }
  1689. func dialAddr(addr string, dial DialFunc, dialDualStack, isTLS bool, tlsConfig *tls.Config, timeout time.Duration) (net.Conn, error) {
  1690. deadline := time.Now().Add(timeout)
  1691. if dial == nil {
  1692. if dialDualStack {
  1693. dial = DialDualStack
  1694. } else {
  1695. dial = Dial
  1696. }
  1697. addr = AddMissingPort(addr, isTLS)
  1698. }
  1699. conn, err := dial(addr)
  1700. if err != nil {
  1701. return nil, err
  1702. }
  1703. if conn == nil {
  1704. return nil, errors.New("dialling unsuccessful. Please report this bug!")
  1705. }
  1706. // We assume that any conn that has the Handshake() method is a TLS conn already.
  1707. // This doesn't cover just tls.Conn but also other TLS implementations.
  1708. _, isTLSAlready := conn.(interface{ Handshake() error })
  1709. if isTLS && !isTLSAlready {
  1710. if timeout == 0 {
  1711. return tls.Client(conn, tlsConfig), nil
  1712. }
  1713. return tlsClientHandshake(conn, tlsConfig, deadline)
  1714. }
  1715. return conn, nil
  1716. }
  1717. // AddMissingPort adds a port to a host if it is missing.
  1718. // A literal IPv6 address in hostport must be enclosed in square
  1719. // brackets, as in "[::1]:80", "[::1%lo0]:80".
  1720. func AddMissingPort(addr string, isTLS bool) string {
  1721. addrLen := len(addr)
  1722. if addrLen == 0 {
  1723. return addr
  1724. }
  1725. isIP6 := addr[0] == '['
  1726. if isIP6 {
  1727. // if the IPv6 has opening bracket but closing bracket is the last char then it doesn't have a port
  1728. isIP6WithoutPort := addr[addrLen-1] == ']'
  1729. if !isIP6WithoutPort {
  1730. return addr
  1731. }
  1732. } else { // IPv4
  1733. columnPos := strings.LastIndexByte(addr, ':')
  1734. if columnPos > 0 {
  1735. return addr
  1736. }
  1737. }
  1738. port := ":80"
  1739. if isTLS {
  1740. port = ":443"
  1741. }
  1742. return addr + port
  1743. }
// A wantConn records state about a wanted connection
// (that is, an active call to acquireConn that is waiting for one).
// The conn may be gotten by dialing or by finding an idle connection,
// or a cancellation may make the conn no longer wanted.
// These three options are racing against each other and use
// wantConn to coordinate and agree about the winning outcome.
//
// inspired by net/http/transport.go
type wantConn struct {
	// ready is closed once an outcome has been decided: by tryDeliver on
	// delivery, or by cancel when nothing had been delivered yet.
	ready chan struct{}
	mu    sync.Mutex // protects conn, err, close(ready)
	// conn is the delivered connection, if any.
	conn *clientConn
	// err is the delivered error or the cancellation reason, if any.
	err error
}
  1758. // waiting reports whether w is still waiting for an answer (connection or error).
  1759. func (w *wantConn) waiting() bool {
  1760. select {
  1761. case <-w.ready:
  1762. return false
  1763. default:
  1764. return true
  1765. }
  1766. }
  1767. // tryDeliver attempts to deliver conn, err to w and reports whether it succeeded.
  1768. func (w *wantConn) tryDeliver(conn *clientConn, err error) bool {
  1769. w.mu.Lock()
  1770. defer w.mu.Unlock()
  1771. if w.conn != nil || w.err != nil {
  1772. return false
  1773. }
  1774. w.conn = conn
  1775. w.err = err
  1776. if w.conn == nil && w.err == nil {
  1777. panic("fasthttp: internal error: misuse of tryDeliver")
  1778. }
  1779. close(w.ready)
  1780. return true
  1781. }
  1782. // cancel marks w as no longer wanting a result (for example, due to cancellation).
  1783. // If a connection has been delivered already, cancel returns it with c.releaseConn.
  1784. func (w *wantConn) cancel(c *HostClient, err error) {
  1785. w.mu.Lock()
  1786. if w.conn == nil && w.err == nil {
  1787. close(w.ready) // catch misbehavior in future delivery
  1788. }
  1789. conn := w.conn
  1790. w.conn = nil
  1791. w.err = err
  1792. w.mu.Unlock()
  1793. if conn != nil {
  1794. c.releaseConn(conn)
  1795. }
  1796. }
// A wantConnQueue is a queue of wantConns.
//
// inspired by net/http/transport.go
type wantConnQueue struct {
	// This is a queue, not a deque.
	// It is split into two stages - head[headPos:] and tail.
	// popFront is trivial (headPos++) on the first stage, and
	// pushBack is trivial (append) on the second stage.
	// If the first stage is empty, popFront can swap the
	// first and second stages to remedy the situation.
	//
	// This two-stage split is analogous to the use of two lists
	// in Okasaki's purely functional queue but without the
	// overhead of reversing the list when swapping stages.
	head    []*wantConn
	headPos int
	tail    []*wantConn
}
  1815. // len returns the number of items in the queue.
  1816. func (q *wantConnQueue) len() int {
  1817. return len(q.head) - q.headPos + len(q.tail)
  1818. }
// pushBack adds w to the back of the queue by appending it to the tail
// stage.
func (q *wantConnQueue) pushBack(w *wantConn) {
	q.tail = append(q.tail, w)
}
  1823. // popFront removes and returns the wantConn at the front of the queue.
  1824. func (q *wantConnQueue) popFront() *wantConn {
  1825. if q.headPos >= len(q.head) {
  1826. if len(q.tail) == 0 {
  1827. return nil
  1828. }
  1829. // Pick up tail as new head, clear tail.
  1830. q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
  1831. }
  1832. w := q.head[q.headPos]
  1833. q.head[q.headPos] = nil
  1834. q.headPos++
  1835. return w
  1836. }
  1837. // peekFront returns the wantConn at the front of the queue without removing it.
  1838. func (q *wantConnQueue) peekFront() *wantConn {
  1839. if q.headPos < len(q.head) {
  1840. return q.head[q.headPos]
  1841. }
  1842. if len(q.tail) > 0 {
  1843. return q.tail[0]
  1844. }
  1845. return nil
  1846. }
  1847. // clearFront pops any wantConns that are no longer waiting from the head of the
  1848. // queue, reporting whether any were popped.
  1849. func (q *wantConnQueue) clearFront() (cleaned bool) {
  1850. for {
  1851. w := q.peekFront()
  1852. if w == nil || w.waiting() {
  1853. return cleaned
  1854. }
  1855. q.popFront()
  1856. cleaned = true
  1857. }
  1858. }
// PipelineClient pipelines requests over a limited set of concurrent
// connections to the given Addr.
//
// This client may be used in highly loaded HTTP-based RPC systems for reducing
// context switches and network level overhead.
// See https://en.wikipedia.org/wiki/HTTP_pipelining for details.
//
// Copying PipelineClient instances is forbidden. Create new instances
// instead.
//
// It is safe to call PipelineClient methods from concurrently running
// goroutines.
type PipelineClient struct {
	noCopy noCopy

	// Address of the host to connect to.
	Addr string

	// PipelineClient name. Used in User-Agent request header.
	Name string

	// NoDefaultUserAgentHeader when set to true, causes the default
	// User-Agent header to be excluded from the Request.
	NoDefaultUserAgentHeader bool

	// The maximum number of concurrent connections to the Addr.
	//
	// A single connection is used by default.
	MaxConns int

	// The maximum number of pending pipelined requests over
	// a single connection to Addr.
	//
	// DefaultMaxPendingRequests is used by default.
	MaxPendingRequests int

	// The maximum delay before sending pipelined requests as a batch
	// to the server.
	//
	// By default requests are sent immediately to the server.
	MaxBatchDelay time.Duration

	// Callback for establishing a connection to the host.
	//
	// Default Dial is used if not set.
	Dial DialFunc

	// Attempt to connect to both ipv4 and ipv6 host addresses
	// if set to true.
	//
	// This option is used only if default TCP dialer is used,
	// i.e. if Dial is blank.
	//
	// By default client connects only to ipv4 addresses,
	// since unfortunately ipv6 remains broken in many networks worldwide :)
	DialDualStack bool

	// Response header names are passed as-is without normalization
	// if this option is set.
	//
	// Disabled header names' normalization may be useful only for proxying
	// responses to other clients expecting case-sensitive
	// header names. See https://github.com/valyala/fasthttp/issues/57
	// for details.
	//
	// By default request and response header names are normalized, i.e.
	// the first letter and the first letters following dashes
	// are uppercased, while all the other letters are lowercased.
	// Examples:
	//
	//   * HOST -> Host
	//   * content-type -> Content-Type
	//   * cONTENT-lenGTH -> Content-Length
	DisableHeaderNamesNormalizing bool

	// Path values are sent as-is without normalization.
	//
	// Disabled path normalization may be useful for proxying incoming requests
	// to servers that are expecting paths to be forwarded as-is.
	//
	// By default path values are normalized, i.e.
	// extra slashes are removed, special characters are encoded.
	DisablePathNormalizing bool

	// Whether to use TLS (aka SSL or HTTPS) for host connections.
	IsTLS bool

	// Optional TLS config.
	TLSConfig *tls.Config

	// Idle connection to the host is closed after this duration.
	//
	// By default idle connection is closed after
	// DefaultMaxIdleConnDuration.
	MaxIdleConnDuration time.Duration

	// Buffer size for responses' reading.
	// This also limits the maximum header size.
	//
	// Default buffer size is used if 0.
	ReadBufferSize int

	// Buffer size for requests' writing.
	//
	// Default buffer size is used if 0.
	WriteBufferSize int

	// Maximum duration for full response reading (including body).
	//
	// By default response read timeout is unlimited.
	ReadTimeout time.Duration

	// Maximum duration for full request writing (including body).
	//
	// By default request write timeout is unlimited.
	WriteTimeout time.Duration

	// Logger for logging client errors.
	//
	// By default standard logger from log package is used.
	Logger Logger

	// connClients holds the per-connection clients created so far by
	// newConnClient; getConnClient accesses it under connClientsLock.
	connClients     []*pipelineConnClient
	connClientsLock sync.Mutex
}
// pipelineConnClient is the per-connection client behind PipelineClient.
// Its exported fields are copies of the PipelineClient settings of the
// same name, filled in by PipelineClient.newConnClient.
type pipelineConnClient struct {
	noCopy noCopy

	Addr                          string
	Name                          string
	NoDefaultUserAgentHeader      bool
	MaxPendingRequests            int
	MaxBatchDelay                 time.Duration
	Dial                          DialFunc
	DialDualStack                 bool
	DisableHeaderNamesNormalizing bool
	DisablePathNormalizing        bool
	IsTLS                         bool
	TLSConfig                     *tls.Config
	MaxIdleConnDuration           time.Duration
	ReadBufferSize                int
	WriteBufferSize               int
	ReadTimeout                   time.Duration
	WriteTimeout                  time.Duration
	Logger                        Logger

	// workPool recycles *pipelineWork values (see acquirePipelineWork and
	// releasePipelineWork).
	workPool sync.Pool

	// chLock guards creation and reset of chW and chR in init.
	chLock sync.Mutex
	// chW is the outgoing queue: Do and DoDeadline put work here.
	chW chan *pipelineWork
	// chR holds work whose callers are notified with
	// errPipelineConnStopped when the worker stops — presumably requests
	// already written and awaiting their response; confirm against
	// writer/reader.
	chR chan *pipelineWork

	// tlsConfig caches the result of cachedTLSConfig; guarded by
	// tlsConfigLock.
	tlsConfigLock sync.Mutex
	tlsConfig     *tls.Config
}
// pipelineWork is a single request/response exchange queued on a
// pipelineConnClient.
type pipelineWork struct {
	// reqCopy and respCopy are private copies used by DoDeadline so that
	// the caller's request and response are not touched after a timeout.
	reqCopy  Request
	respCopy Response
	// req and resp point at the request to send and the response to fill:
	// either the copies above or the caller's own objects (see Do).
	req  *Request
	resp *Response
	// t is the timeout timer, set up by acquirePipelineWork when a
	// positive timeout is given.
	t *time.Timer
	// deadline is the absolute time matching t, or zeroTime when no
	// timeout applies.
	deadline time.Time
	// err is the outcome of the exchange, read by the caller after done
	// has been signalled.
	err error
	// done receives a value when the work has finished; it is created
	// with a buffer of one.
	done chan struct{}
}
  2001. // DoTimeout performs the given request and waits for response during
  2002. // the given timeout duration.
  2003. //
  2004. // Request must contain at least non-zero RequestURI with full url (including
  2005. // scheme and host) or non-zero Host header + RequestURI.
  2006. //
  2007. // The function doesn't follow redirects.
  2008. //
  2009. // Response is ignored if resp is nil.
  2010. //
  2011. // ErrTimeout is returned if the response wasn't returned during
  2012. // the given timeout.
  2013. //
  2014. // It is recommended obtaining req and resp via AcquireRequest
  2015. // and AcquireResponse in performance-critical code.
  2016. func (c *PipelineClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
  2017. return c.DoDeadline(req, resp, time.Now().Add(timeout))
  2018. }
  2019. // DoDeadline performs the given request and waits for response until
  2020. // the given deadline.
  2021. //
  2022. // Request must contain at least non-zero RequestURI with full url (including
  2023. // scheme and host) or non-zero Host header + RequestURI.
  2024. //
  2025. // The function doesn't follow redirects.
  2026. //
  2027. // Response is ignored if resp is nil.
  2028. //
  2029. // ErrTimeout is returned if the response wasn't returned until
  2030. // the given deadline.
  2031. //
  2032. // It is recommended obtaining req and resp via AcquireRequest
  2033. // and AcquireResponse in performance-critical code.
  2034. func (c *PipelineClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
  2035. return c.getConnClient().DoDeadline(req, resp, deadline)
  2036. }
// DoDeadline queues req on this connection and waits for the response
// until deadline.
//
// It returns ErrTimeout if the deadline has already passed, if the request
// cannot be queued before the deadline, or if no response arrives before
// it. The request is copied into a pooled pipelineWork (its body is swapped
// rather than copied), so the worker never touches the caller's req or resp.
// If resp is nil the response is discarded.
func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
	c.init()
	timeout := time.Until(deadline)
	if timeout <= 0 {
		return ErrTimeout
	}
	if c.DisablePathNormalizing {
		req.URI().DisablePathNormalizing = true
	}
	// Fill in the User-Agent header only when the caller did not set one:
	// the client name if present, else the default unless disabled.
	userAgentOld := req.Header.UserAgent()
	if len(userAgentOld) == 0 {
		userAgent := c.Name
		if userAgent == "" && !c.NoDefaultUserAgentHeader {
			userAgent = defaultUserAgent
		}
		if userAgent != "" {
			req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
		}
	}
	w := c.acquirePipelineWork(timeout)
	w.respCopy.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
	w.req = &w.reqCopy
	w.resp = &w.respCopy
	// Make a copy of the request in order to avoid data races on timeouts
	req.copyToSkipBody(&w.reqCopy)
	swapRequestBody(req, &w.reqCopy)
	// Put the request to outgoing queue
	select {
	case c.chW <- w:
		// Fast path: len(c.chW) < cap(c.chW)
	default:
		// Slow path: the queue is full, wait for room until the timer fires.
		select {
		case c.chW <- w:
		case <-w.t.C:
			c.releasePipelineWork(w)
			return ErrTimeout
		}
	}
	// Wait for the response
	var err error
	select {
	case <-w.done:
		if resp != nil {
			w.respCopy.copyToSkipBody(resp)
			swapResponseBody(resp, &w.respCopy)
		}
		err = w.err
		c.releasePipelineWork(w)
	case <-w.t.C:
		// NOTE(review): w is not returned to the pool on this path —
		// presumably because it is still queued and may yet be used by the
		// worker; confirm against writer/reader.
		err = ErrTimeout
	}
	return err
}
  2091. func (c *pipelineConnClient) acquirePipelineWork(timeout time.Duration) (w *pipelineWork) {
  2092. v := c.workPool.Get()
  2093. if v != nil {
  2094. w = v.(*pipelineWork)
  2095. } else {
  2096. w = &pipelineWork{
  2097. done: make(chan struct{}, 1),
  2098. }
  2099. }
  2100. if timeout > 0 {
  2101. if w.t == nil {
  2102. w.t = time.NewTimer(timeout)
  2103. } else {
  2104. w.t.Reset(timeout)
  2105. }
  2106. w.deadline = time.Now().Add(timeout)
  2107. } else {
  2108. w.deadline = zeroTime
  2109. }
  2110. return w
  2111. }
  2112. func (c *pipelineConnClient) releasePipelineWork(w *pipelineWork) {
  2113. if w.t != nil {
  2114. w.t.Stop()
  2115. }
  2116. w.reqCopy.Reset()
  2117. w.respCopy.Reset()
  2118. w.req = nil
  2119. w.resp = nil
  2120. w.err = nil
  2121. c.workPool.Put(w)
  2122. }
  2123. // Do performs the given http request and sets the corresponding response.
  2124. //
  2125. // Request must contain at least non-zero RequestURI with full url (including
  2126. // scheme and host) or non-zero Host header + RequestURI.
  2127. //
  2128. // The function doesn't follow redirects. Use Get* for following redirects.
  2129. //
  2130. // Response is ignored if resp is nil.
  2131. //
  2132. // It is recommended obtaining req and resp via AcquireRequest
  2133. // and AcquireResponse in performance-critical code.
  2134. func (c *PipelineClient) Do(req *Request, resp *Response) error {
  2135. return c.getConnClient().Do(req, resp)
  2136. }
// Do queues req on this connection and blocks until the work is done.
//
// Unlike DoDeadline, no timer is armed and the worker operates directly on
// the caller's req and resp (a scratch response is used when resp is nil).
// If the outgoing queue is full, the oldest queued work is evicted and
// failed with ErrPipelineOverflow to make room; if the queue is still full
// afterwards, Do itself returns ErrPipelineOverflow.
func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
	c.init()
	if c.DisablePathNormalizing {
		req.URI().DisablePathNormalizing = true
	}
	// Fill in the User-Agent header only when the caller did not set one:
	// the client name if present, else the default unless disabled.
	userAgentOld := req.Header.UserAgent()
	if len(userAgentOld) == 0 {
		userAgent := c.Name
		if userAgent == "" && !c.NoDefaultUserAgentHeader {
			userAgent = defaultUserAgent
		}
		if userAgent != "" {
			req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
		}
	}
	w := c.acquirePipelineWork(0)
	w.req = req
	if resp != nil {
		resp.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
		w.resp = resp
	} else {
		w.resp = &w.respCopy
	}
	// Put the request to outgoing queue
	select {
	case c.chW <- w:
	default:
		// Try substituting the oldest w with the current one.
		select {
		case wOld := <-c.chW:
			wOld.err = ErrPipelineOverflow
			wOld.done <- struct{}{}
		default:
		}
		// Second and last attempt; never blocks.
		select {
		case c.chW <- w:
		default:
			c.releasePipelineWork(w)
			return ErrPipelineOverflow
		}
	}
	// Wait for the response
	<-w.done
	err := w.err
	c.releasePipelineWork(w)
	return err
}
  2184. func (c *PipelineClient) getConnClient() *pipelineConnClient {
  2185. c.connClientsLock.Lock()
  2186. cc := c.getConnClientUnlocked()
  2187. c.connClientsLock.Unlock()
  2188. return cc
  2189. }
  2190. func (c *PipelineClient) getConnClientUnlocked() *pipelineConnClient {
  2191. if len(c.connClients) == 0 {
  2192. return c.newConnClient()
  2193. }
  2194. // Return the client with the minimum number of pending requests.
  2195. minCC := c.connClients[0]
  2196. minReqs := minCC.PendingRequests()
  2197. if minReqs == 0 {
  2198. return minCC
  2199. }
  2200. for i := 1; i < len(c.connClients); i++ {
  2201. cc := c.connClients[i]
  2202. reqs := cc.PendingRequests()
  2203. if reqs == 0 {
  2204. return cc
  2205. }
  2206. if reqs < minReqs {
  2207. minCC = cc
  2208. minReqs = reqs
  2209. }
  2210. }
  2211. maxConns := c.MaxConns
  2212. if maxConns <= 0 {
  2213. maxConns = 1
  2214. }
  2215. if len(c.connClients) < maxConns {
  2216. return c.newConnClient()
  2217. }
  2218. return minCC
  2219. }
  2220. func (c *PipelineClient) newConnClient() *pipelineConnClient {
  2221. cc := &pipelineConnClient{
  2222. Addr: c.Addr,
  2223. Name: c.Name,
  2224. NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
  2225. MaxPendingRequests: c.MaxPendingRequests,
  2226. MaxBatchDelay: c.MaxBatchDelay,
  2227. Dial: c.Dial,
  2228. DialDualStack: c.DialDualStack,
  2229. DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
  2230. DisablePathNormalizing: c.DisablePathNormalizing,
  2231. IsTLS: c.IsTLS,
  2232. TLSConfig: c.TLSConfig,
  2233. MaxIdleConnDuration: c.MaxIdleConnDuration,
  2234. ReadBufferSize: c.ReadBufferSize,
  2235. WriteBufferSize: c.WriteBufferSize,
  2236. ReadTimeout: c.ReadTimeout,
  2237. WriteTimeout: c.WriteTimeout,
  2238. Logger: c.Logger,
  2239. }
  2240. c.connClients = append(c.connClients, cc)
  2241. return cc
  2242. }
// ErrPipelineOverflow may be returned from PipelineClient.Do*
// if the queue of pending requests has overflowed.
var ErrPipelineOverflow = errors.New("pipelined requests' queue has been overflowed. Increase MaxConns and/or MaxPendingRequests")
// DefaultMaxPendingRequests is the default value
// for PipelineClient.MaxPendingRequests. It is used when that field is
// zero or negative.
const DefaultMaxPendingRequests = 1024
  2249. func (c *pipelineConnClient) init() {
  2250. c.chLock.Lock()
  2251. if c.chR == nil {
  2252. maxPendingRequests := c.MaxPendingRequests
  2253. if maxPendingRequests <= 0 {
  2254. maxPendingRequests = DefaultMaxPendingRequests
  2255. }
  2256. c.chR = make(chan *pipelineWork, maxPendingRequests)
  2257. if c.chW == nil {
  2258. c.chW = make(chan *pipelineWork, maxPendingRequests)
  2259. }
  2260. go func() {
  2261. // Keep restarting the worker if it fails (connection errors for example).
  2262. for {
  2263. if err := c.worker(); err != nil {
  2264. c.logger().Printf("error in PipelineClient(%q): %v", c.Addr, err)
  2265. if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
  2266. // Throttle client reconnections on timeout errors
  2267. time.Sleep(time.Second)
  2268. }
  2269. } else {
  2270. c.chLock.Lock()
  2271. stop := len(c.chR) == 0 && len(c.chW) == 0
  2272. if !stop {
  2273. c.chR = nil
  2274. c.chW = nil
  2275. }
  2276. c.chLock.Unlock()
  2277. if stop {
  2278. break
  2279. }
  2280. }
  2281. }
  2282. }()
  2283. }
  2284. c.chLock.Unlock()
  2285. }
  2286. func (c *pipelineConnClient) worker() error {
  2287. tlsConfig := c.cachedTLSConfig()
  2288. conn, err := dialAddr(c.Addr, c.Dial, c.DialDualStack, c.IsTLS, tlsConfig, c.WriteTimeout)
  2289. if err != nil {
  2290. return err
  2291. }
  2292. // Start reader and writer
  2293. stopW := make(chan struct{})
  2294. doneW := make(chan error)
  2295. go func() {
  2296. doneW <- c.writer(conn, stopW)
  2297. }()
  2298. stopR := make(chan struct{})
  2299. doneR := make(chan error)
  2300. go func() {
  2301. doneR <- c.reader(conn, stopR)
  2302. }()
  2303. // Wait until reader and writer are stopped
  2304. select {
  2305. case err = <-doneW:
  2306. conn.Close()
  2307. close(stopR)
  2308. <-doneR
  2309. case err = <-doneR:
  2310. conn.Close()
  2311. close(stopW)
  2312. <-doneW
  2313. }
  2314. // Notify pending readers
  2315. for len(c.chR) > 0 {
  2316. w := <-c.chR
  2317. w.err = errPipelineConnStopped
  2318. w.done <- struct{}{}
  2319. }
  2320. return err
  2321. }
  2322. func (c *pipelineConnClient) cachedTLSConfig() *tls.Config {
  2323. if !c.IsTLS {
  2324. return nil
  2325. }
  2326. c.tlsConfigLock.Lock()
  2327. cfg := c.tlsConfig
  2328. if cfg == nil {
  2329. cfg = newClientTLSConfig(c.TLSConfig, c.Addr)
  2330. c.tlsConfig = cfg
  2331. }
  2332. c.tlsConfigLock.Unlock()
  2333. return cfg
  2334. }
  2335. func (c *pipelineConnClient) writer(conn net.Conn, stopCh <-chan struct{}) error {
  2336. writeBufferSize := c.WriteBufferSize
  2337. if writeBufferSize <= 0 {
  2338. writeBufferSize = defaultWriteBufferSize
  2339. }
  2340. bw := bufio.NewWriterSize(conn, writeBufferSize)
  2341. defer bw.Flush()
  2342. chR := c.chR
  2343. chW := c.chW
  2344. writeTimeout := c.WriteTimeout
  2345. maxIdleConnDuration := c.MaxIdleConnDuration
  2346. if maxIdleConnDuration <= 0 {
  2347. maxIdleConnDuration = DefaultMaxIdleConnDuration
  2348. }
  2349. maxBatchDelay := c.MaxBatchDelay
  2350. var (
  2351. stopTimer = time.NewTimer(time.Hour)
  2352. flushTimer = time.NewTimer(time.Hour)
  2353. flushTimerCh <-chan time.Time
  2354. instantTimerCh = make(chan time.Time)
  2355. w *pipelineWork
  2356. err error
  2357. )
  2358. close(instantTimerCh)
  2359. for {
  2360. againChW:
  2361. select {
  2362. case w = <-chW:
  2363. // Fast path: len(chW) > 0
  2364. default:
  2365. // Slow path
  2366. stopTimer.Reset(maxIdleConnDuration)
  2367. select {
  2368. case w = <-chW:
  2369. case <-stopTimer.C:
  2370. return nil
  2371. case <-stopCh:
  2372. return nil
  2373. case <-flushTimerCh:
  2374. if err = bw.Flush(); err != nil {
  2375. return err
  2376. }
  2377. flushTimerCh = nil
  2378. goto againChW
  2379. }
  2380. }
  2381. if !w.deadline.IsZero() && time.Since(w.deadline) >= 0 {
  2382. w.err = ErrTimeout
  2383. w.done <- struct{}{}
  2384. continue
  2385. }
  2386. w.resp.parseNetConn(conn)
  2387. if writeTimeout > 0 {
  2388. // Set Deadline every time, since golang has fixed the performance issue
  2389. // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
  2390. currentTime := time.Now()
  2391. if err = conn.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil {
  2392. w.err = err
  2393. w.done <- struct{}{}
  2394. return err
  2395. }
  2396. }
  2397. if err = w.req.Write(bw); err != nil {
  2398. w.err = err
  2399. w.done <- struct{}{}
  2400. return err
  2401. }
  2402. if flushTimerCh == nil && (len(chW) == 0 || len(chR) == cap(chR)) {
  2403. if maxBatchDelay > 0 {
  2404. flushTimer.Reset(maxBatchDelay)
  2405. flushTimerCh = flushTimer.C
  2406. } else {
  2407. flushTimerCh = instantTimerCh
  2408. }
  2409. }
  2410. againChR:
  2411. select {
  2412. case chR <- w:
  2413. // Fast path: len(chR) < cap(chR)
  2414. default:
  2415. // Slow path
  2416. select {
  2417. case chR <- w:
  2418. case <-stopCh:
  2419. w.err = errPipelineConnStopped
  2420. w.done <- struct{}{}
  2421. return nil
  2422. case <-flushTimerCh:
  2423. if err = bw.Flush(); err != nil {
  2424. w.err = err
  2425. w.done <- struct{}{}
  2426. return err
  2427. }
  2428. flushTimerCh = nil
  2429. goto againChR
  2430. }
  2431. }
  2432. }
  2433. }
  2434. func (c *pipelineConnClient) reader(conn net.Conn, stopCh <-chan struct{}) error {
  2435. readBufferSize := c.ReadBufferSize
  2436. if readBufferSize <= 0 {
  2437. readBufferSize = defaultReadBufferSize
  2438. }
  2439. br := bufio.NewReaderSize(conn, readBufferSize)
  2440. chR := c.chR
  2441. readTimeout := c.ReadTimeout
  2442. var (
  2443. w *pipelineWork
  2444. err error
  2445. )
  2446. for {
  2447. select {
  2448. case w = <-chR:
  2449. // Fast path: len(chR) > 0
  2450. default:
  2451. // Slow path
  2452. select {
  2453. case w = <-chR:
  2454. case <-stopCh:
  2455. return nil
  2456. }
  2457. }
  2458. if readTimeout > 0 {
  2459. // Set Deadline every time, since golang has fixed the performance issue
  2460. // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
  2461. currentTime := time.Now()
  2462. if err = conn.SetReadDeadline(currentTime.Add(readTimeout)); err != nil {
  2463. w.err = err
  2464. w.done <- struct{}{}
  2465. return err
  2466. }
  2467. }
  2468. if err = w.resp.Read(br); err != nil {
  2469. w.err = err
  2470. w.done <- struct{}{}
  2471. return err
  2472. }
  2473. w.done <- struct{}{}
  2474. }
  2475. }
  2476. func (c *pipelineConnClient) logger() Logger {
  2477. if c.Logger != nil {
  2478. return c.Logger
  2479. }
  2480. return defaultLogger
  2481. }
  2482. // PendingRequests returns the current number of pending requests pipelined
  2483. // to the server.
  2484. //
  2485. // This number may exceed MaxPendingRequests*MaxConns by up to two times, since
  2486. // each connection to the server may keep up to MaxPendingRequests requests
  2487. // in the queue before sending them to the server.
  2488. //
  2489. // This function may be used for balancing load among multiple PipelineClient
  2490. // instances.
  2491. func (c *PipelineClient) PendingRequests() int {
  2492. c.connClientsLock.Lock()
  2493. n := 0
  2494. for _, cc := range c.connClients {
  2495. n += cc.PendingRequests()
  2496. }
  2497. c.connClientsLock.Unlock()
  2498. return n
  2499. }
  2500. func (c *pipelineConnClient) PendingRequests() int {
  2501. c.init()
  2502. c.chLock.Lock()
  2503. n := len(c.chR) + len(c.chW)
  2504. c.chLock.Unlock()
  2505. return n
  2506. }
// errPipelineConnStopped is set on work items that were still pending when
// their pipeline connection was stopped.
var errPipelineConnStopped = errors.New("pipeline connection has been stopped")
// DefaultTransport is a RoundTripper backed by the transport type.
var DefaultTransport RoundTripper = &transport{}

// transport is the RoundTripper implementation behind DefaultTransport.
// It has no fields.
type transport struct{}
  2510. func (t *transport) RoundTrip(hc *HostClient, req *Request, resp *Response) (retry bool, err error) {
  2511. customSkipBody := resp.SkipBody
  2512. customStreamBody := resp.StreamBody
  2513. var deadline time.Time
  2514. if req.timeout > 0 {
  2515. deadline = time.Now().Add(req.timeout)
  2516. }
  2517. cc, err := hc.acquireConn(req.timeout, req.ConnectionClose())
  2518. if err != nil {
  2519. return false, err
  2520. }
  2521. conn := cc.c
  2522. resp.parseNetConn(conn)
  2523. writeDeadline := deadline
  2524. if hc.WriteTimeout > 0 {
  2525. tmpWriteDeadline := time.Now().Add(hc.WriteTimeout)
  2526. if writeDeadline.IsZero() || tmpWriteDeadline.Before(writeDeadline) {
  2527. writeDeadline = tmpWriteDeadline
  2528. }
  2529. }
  2530. if err = conn.SetWriteDeadline(writeDeadline); err != nil {
  2531. hc.closeConn(cc)
  2532. return true, err
  2533. }
  2534. resetConnection := false
  2535. if hc.MaxConnDuration > 0 && time.Since(cc.createdTime) > hc.MaxConnDuration && !req.ConnectionClose() {
  2536. req.SetConnectionClose()
  2537. resetConnection = true
  2538. }
  2539. bw := hc.acquireWriter(conn)
  2540. err = req.Write(bw)
  2541. if resetConnection {
  2542. req.Header.ResetConnectionClose()
  2543. }
  2544. if err == nil {
  2545. err = bw.Flush()
  2546. }
  2547. hc.releaseWriter(bw)
  2548. // Return ErrTimeout on any timeout.
  2549. if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
  2550. err = ErrTimeout
  2551. }
  2552. isConnRST := isConnectionReset(err)
  2553. if err != nil && !isConnRST {
  2554. hc.closeConn(cc)
  2555. return true, err
  2556. }
  2557. readDeadline := deadline
  2558. if hc.ReadTimeout > 0 {
  2559. tmpReadDeadline := time.Now().Add(hc.ReadTimeout)
  2560. if readDeadline.IsZero() || tmpReadDeadline.Before(readDeadline) {
  2561. readDeadline = tmpReadDeadline
  2562. }
  2563. }
  2564. if err = conn.SetReadDeadline(readDeadline); err != nil {
  2565. hc.closeConn(cc)
  2566. return true, err
  2567. }
  2568. if customSkipBody || req.Header.IsHead() {
  2569. resp.SkipBody = true
  2570. }
  2571. if hc.DisableHeaderNamesNormalizing {
  2572. resp.Header.DisableNormalizing()
  2573. }
  2574. br := hc.acquireReader(conn)
  2575. err = resp.ReadLimitBody(br, hc.MaxResponseBodySize)
  2576. if err != nil {
  2577. hc.releaseReader(br)
  2578. hc.closeConn(cc)
  2579. // Don't retry in case of ErrBodyTooLarge since we will just get the same again.
  2580. needRetry := err != ErrBodyTooLarge
  2581. return needRetry, err
  2582. }
  2583. closeConn := resetConnection || req.ConnectionClose() || resp.ConnectionClose() || isConnRST
  2584. if customStreamBody && resp.bodyStream != nil {
  2585. rbs := resp.bodyStream
  2586. resp.bodyStream = newCloseReader(rbs, func() error {
  2587. hc.releaseReader(br)
  2588. if r, ok := rbs.(*requestStream); ok {
  2589. releaseRequestStream(r)
  2590. }
  2591. if closeConn || resp.ConnectionClose() {
  2592. hc.closeConn(cc)
  2593. } else {
  2594. hc.releaseConn(cc)
  2595. }
  2596. return nil
  2597. })
  2598. return false, nil
  2599. } else {
  2600. hc.releaseReader(br)
  2601. }
  2602. if closeConn {
  2603. hc.closeConn(cc)
  2604. } else {
  2605. hc.releaseConn(cc)
  2606. }
  2607. return false, nil
  2608. }