12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958 |
- package fasthttp
- import (
- "bufio"
- "crypto/tls"
- "errors"
- "fmt"
- "io"
- "net"
- "strings"
- "sync"
- "sync/atomic"
- "time"
- )
- // Do performs the given http request and fills the given http response.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func Do(req *Request, resp *Response) error {
- return defaultClient.Do(req, resp)
- }
- // DoTimeout performs the given request and waits for response during
- // the given timeout duration.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrTimeout is returned if the response wasn't returned during
- // the given timeout.
- //
- // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
- return defaultClient.DoTimeout(req, resp, timeout)
- }
- // DoDeadline performs the given request and waits for response until
- // the given deadline.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrTimeout is returned if the response wasn't returned until
- // the given deadline.
- //
- // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func DoDeadline(req *Request, resp *Response, deadline time.Time) error {
- return defaultClient.DoDeadline(req, resp, deadline)
- }
- // DoRedirects performs the given http request and fills the given http response,
- // following up to maxRedirectsCount redirects. When the redirect count exceeds
- // maxRedirectsCount, ErrTooManyRedirects is returned.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // Response is ignored if resp is nil.
- //
- // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
- _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, &defaultClient)
- return err
- }
- // Get returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- func Get(dst []byte, url string) (statusCode int, body []byte, err error) {
- return defaultClient.Get(dst, url)
- }
- // GetTimeout returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // ErrTimeout error is returned if url contents couldn't be fetched
- // during the given timeout.
- func GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
- return defaultClient.GetTimeout(dst, url, timeout)
- }
- // GetDeadline returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // ErrTimeout error is returned if url contents couldn't be fetched
- // until the given deadline.
- func GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
- return defaultClient.GetDeadline(dst, url, deadline)
- }
- // Post sends POST request to the given url with the given POST arguments.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // Empty POST body is sent if postArgs is nil.
- func Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
- return defaultClient.Post(dst, url, postArgs)
- }
- var defaultClient Client
- // Client implements http client.
- //
- // Copying Client by value is prohibited. Create new instance instead.
- //
- // It is safe calling Client methods from concurrently running goroutines.
- //
- // The fields of a Client should not be changed while it is in use.
- type Client struct {
- noCopy noCopy
- // Client name. Used in User-Agent request header.
- //
- // Default client name is used if not set.
- Name string
- // NoDefaultUserAgentHeader when set to true, causes the default
- // User-Agent header to be excluded from the Request.
- NoDefaultUserAgentHeader bool
- // Callback for establishing new connections to hosts.
- //
- // Default Dial is used if not set.
- Dial DialFunc
- // Attempt to connect to both ipv4 and ipv6 addresses if set to true.
- //
- // This option is used only if default TCP dialer is used,
- // i.e. if Dial is blank.
- //
- // By default client connects only to ipv4 addresses,
- // since unfortunately ipv6 remains broken in many networks worldwide :)
- DialDualStack bool
- // TLS config for https connections.
- //
- // Default TLS config is used if not set.
- TLSConfig *tls.Config
- // Maximum number of connections per each host which may be established.
- //
- // DefaultMaxConnsPerHost is used if not set.
- MaxConnsPerHost int
- // Idle keep-alive connections are closed after this duration.
- //
- // By default idle connections are closed
- // after DefaultMaxIdleConnDuration.
- MaxIdleConnDuration time.Duration
- // Keep-alive connections are closed after this duration.
- //
- // By default connection duration is unlimited.
- MaxConnDuration time.Duration
- // Maximum number of attempts for idempotent calls
- //
- // DefaultMaxIdemponentCallAttempts is used if not set.
- MaxIdemponentCallAttempts int
- // Per-connection buffer size for responses' reading.
- // This also limits the maximum header size.
- //
- // Default buffer size is used if 0.
- ReadBufferSize int
- // Per-connection buffer size for requests' writing.
- //
- // Default buffer size is used if 0.
- WriteBufferSize int
- // Maximum duration for full response reading (including body).
- //
- // By default response read timeout is unlimited.
- ReadTimeout time.Duration
- // Maximum duration for full request writing (including body).
- //
- // By default request write timeout is unlimited.
- WriteTimeout time.Duration
- // Maximum response body size.
- //
- // The client returns ErrBodyTooLarge if this limit is greater than 0
- // and response body is greater than the limit.
- //
- // By default response body size is unlimited.
- MaxResponseBodySize int
- // Header names are passed as-is without normalization
- // if this option is set.
- //
- // Disabled header names' normalization may be useful only for proxying
- // responses to other clients expecting case-sensitive
- // header names. See https://github.com/valyala/fasthttp/issues/57
- // for details.
- //
- // By default request and response header names are normalized, i.e.
- // The first letter and the first letters following dashes
- // are uppercased, while all the other letters are lowercased.
- // Examples:
- //
- // * HOST -> Host
- // * content-type -> Content-Type
- // * cONTENT-lenGTH -> Content-Length
- DisableHeaderNamesNormalizing bool
- // Path values are sent as-is without normalization
- //
- // Disabled path normalization may be useful for proxying incoming requests
- // to servers that are expecting paths to be forwarded as-is.
- //
- // By default path values are normalized, i.e.
- // extra slashes are removed, special characters are encoded.
- DisablePathNormalizing bool
- // Maximum duration for waiting for a free connection.
- //
- // By default will not waiting, return ErrNoFreeConns immediately
- MaxConnWaitTimeout time.Duration
- // RetryIf controls whether a retry should be attempted after an error.
- //
- // By default will use isIdempotent function
- RetryIf RetryIfFunc
- // Connection pool strategy. Can be either LIFO or FIFO (default).
- ConnPoolStrategy ConnPoolStrategyType
- // StreamResponseBody enables response body streaming
- StreamResponseBody bool
- // ConfigureClient configures the fasthttp.HostClient.
- ConfigureClient func(hc *HostClient) error
- mLock sync.RWMutex
- mOnce sync.Once
- m map[string]*HostClient
- ms map[string]*HostClient
- readerPool sync.Pool
- writerPool sync.Pool
- }
- // Get returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- func (c *Client) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
- return clientGetURL(dst, url, c)
- }
- // GetTimeout returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // ErrTimeout error is returned if url contents couldn't be fetched
- // during the given timeout.
- func (c *Client) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
- return clientGetURLTimeout(dst, url, timeout, c)
- }
- // GetDeadline returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // ErrTimeout error is returned if url contents couldn't be fetched
- // until the given deadline.
- func (c *Client) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
- return clientGetURLDeadline(dst, url, deadline, c)
- }
- // Post sends POST request to the given url with the given POST arguments.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // Empty POST body is sent if postArgs is nil.
- func (c *Client) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
- return clientPostURL(dst, url, postArgs, c)
- }
- // DoTimeout performs the given request and waits for response during
- // the given timeout duration.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrTimeout is returned if the response wasn't returned during
- // the given timeout.
- // Immediately returns ErrTimeout if timeout value is negative.
- //
- // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *Client) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
- req.timeout = timeout
- if req.timeout <= 0 {
- return ErrTimeout
- }
- return c.Do(req, resp)
- }
- // DoDeadline performs the given request and waits for response until
- // the given deadline.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrTimeout is returned if the response wasn't returned until
- // the given deadline.
- // Immediately returns ErrTimeout if the deadline has already been reached.
- //
- // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *Client) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
- req.timeout = time.Until(deadline)
- if req.timeout <= 0 {
- return ErrTimeout
- }
- return c.Do(req, resp)
- }
- // DoRedirects performs the given http request and fills the given http response,
- // following up to maxRedirectsCount redirects. When the redirect count exceeds
- // maxRedirectsCount, ErrTooManyRedirects is returned.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // Response is ignored if resp is nil.
- //
- // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *Client) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
- _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
- return err
- }
- // Do performs the given http request and fills the given http response.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // Response is ignored if resp is nil.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // ErrNoFreeConns is returned if all Client.MaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *Client) Do(req *Request, resp *Response) error {
- uri := req.URI()
- if uri == nil {
- return ErrorInvalidURI
- }
- host := uri.Host()
- isTLS := false
- if uri.isHTTPS() {
- isTLS = true
- } else if !uri.isHTTP() {
- return fmt.Errorf("unsupported protocol %q. http and https are supported", uri.Scheme())
- }
- c.mOnce.Do(func() {
- c.mLock.Lock()
- c.m = make(map[string]*HostClient)
- c.ms = make(map[string]*HostClient)
- c.mLock.Unlock()
- })
- startCleaner := false
- c.mLock.RLock()
- m := c.m
- if isTLS {
- m = c.ms
- }
- hc := m[string(host)]
- if hc != nil {
- atomic.AddInt32(&hc.pendingClientRequests, 1)
- defer atomic.AddInt32(&hc.pendingClientRequests, -1)
- }
- c.mLock.RUnlock()
- if hc == nil {
- c.mLock.Lock()
- hc = m[string(host)]
- if hc == nil {
- hc = &HostClient{
- Addr: AddMissingPort(string(host), isTLS),
- Name: c.Name,
- NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
- Dial: c.Dial,
- DialDualStack: c.DialDualStack,
- IsTLS: isTLS,
- TLSConfig: c.TLSConfig,
- MaxConns: c.MaxConnsPerHost,
- MaxIdleConnDuration: c.MaxIdleConnDuration,
- MaxConnDuration: c.MaxConnDuration,
- MaxIdemponentCallAttempts: c.MaxIdemponentCallAttempts,
- ReadBufferSize: c.ReadBufferSize,
- WriteBufferSize: c.WriteBufferSize,
- ReadTimeout: c.ReadTimeout,
- WriteTimeout: c.WriteTimeout,
- MaxResponseBodySize: c.MaxResponseBodySize,
- DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
- DisablePathNormalizing: c.DisablePathNormalizing,
- MaxConnWaitTimeout: c.MaxConnWaitTimeout,
- RetryIf: c.RetryIf,
- ConnPoolStrategy: c.ConnPoolStrategy,
- StreamResponseBody: c.StreamResponseBody,
- clientReaderPool: &c.readerPool,
- clientWriterPool: &c.writerPool,
- }
- if c.ConfigureClient != nil {
- if err := c.ConfigureClient(hc); err != nil {
- c.mLock.Unlock()
- return err
- }
- }
- m[string(host)] = hc
- if len(m) == 1 {
- startCleaner = true
- }
- }
- atomic.AddInt32(&hc.pendingClientRequests, 1)
- defer atomic.AddInt32(&hc.pendingClientRequests, -1)
- c.mLock.Unlock()
- }
- if startCleaner {
- go c.mCleaner(m)
- }
- return hc.Do(req, resp)
- }
- // CloseIdleConnections closes any connections which were previously
- // connected from previous requests but are now sitting idle in a
- // "keep-alive" state. It does not interrupt any connections currently
- // in use.
- func (c *Client) CloseIdleConnections() {
- c.mLock.RLock()
- for _, v := range c.m {
- v.CloseIdleConnections()
- }
- for _, v := range c.ms {
- v.CloseIdleConnections()
- }
- c.mLock.RUnlock()
- }
- func (c *Client) mCleaner(m map[string]*HostClient) {
- mustStop := false
- sleep := c.MaxIdleConnDuration
- if sleep < time.Second {
- sleep = time.Second
- } else if sleep > 10*time.Second {
- sleep = 10 * time.Second
- }
- for {
- time.Sleep(sleep)
- c.mLock.Lock()
- for k, v := range m {
- v.connsLock.Lock()
- /* #nosec G601 */
- if v.connsCount == 0 && atomic.LoadInt32(&v.pendingClientRequests) == 0 {
- delete(m, k)
- }
- v.connsLock.Unlock()
- }
- if len(m) == 0 {
- mustStop = true
- }
- c.mLock.Unlock()
- if mustStop {
- break
- }
- }
- }
// Defaults applied by the http client when the corresponding setting is
// left unset.
const (
	// DefaultMaxConnsPerHost is the maximum number of concurrent
	// connections http client may establish per host by default (i.e. if
	// Client.MaxConnsPerHost isn't set).
	DefaultMaxConnsPerHost = 512

	// DefaultMaxIdleConnDuration is the default duration before idle
	// keep-alive connection is closed.
	DefaultMaxIdleConnDuration = 10 * time.Second

	// DefaultMaxIdemponentCallAttempts is the default idempotent calls
	// attempts count.
	DefaultMaxIdemponentCallAttempts = 5
)
// DialFunc establishes a connection to addr.
//
// A DialFunc does not need to set up TLS (SSL) for https: the client
// converts the connection to TLS automatically when HostClient.IsTLS is
// set.
//
// The TCP address passed to a DialFunc always contains both host and port,
// for example:
//
//   - foobar.com:80
//   - foobar.com:443
//   - foobar.com:8080
type DialFunc func(addr string) (net.Conn, error)
- // RetryIfFunc signature of retry if function
- //
- // Request argument passed to RetryIfFunc, if there are any request errors.
- type RetryIfFunc func(request *Request) bool
- // RoundTripper wraps every request/response.
- type RoundTripper interface {
- RoundTrip(hc *HostClient, req *Request, resp *Response) (retry bool, err error)
- }
// ConnPoolStrategyType defines the enqueue/dequeue strategy of the
// connection pool.
type ConnPoolStrategyType int

const (
	// FIFO is the first-in, first-out strategy. It is the zero value and
	// therefore the default.
	FIFO ConnPoolStrategyType = iota

	// LIFO is the last-in, first-out strategy.
	LIFO
)
- // HostClient balances http requests among hosts listed in Addr.
- //
- // HostClient may be used for balancing load among multiple upstream hosts.
- // While multiple addresses passed to HostClient.Addr may be used for balancing
- // load among them, it would be better using LBClient instead, since HostClient
- // may unevenly balance load among upstream hosts.
- //
- // It is forbidden copying HostClient instances. Create new instances instead.
- //
- // It is safe calling HostClient methods from concurrently running goroutines.
- type HostClient struct {
- noCopy noCopy
- // Comma-separated list of upstream HTTP server host addresses,
- // which are passed to Dial in a round-robin manner.
- //
- // Each address may contain port if default dialer is used.
- // For example,
- //
- // - foobar.com:80
- // - foobar.com:443
- // - foobar.com:8080
- Addr string
- // Client name. Used in User-Agent request header.
- Name string
- // NoDefaultUserAgentHeader when set to true, causes the default
- // User-Agent header to be excluded from the Request.
- NoDefaultUserAgentHeader bool
- // Callback for establishing new connection to the host.
- //
- // Default Dial is used if not set.
- Dial DialFunc
- // Attempt to connect to both ipv4 and ipv6 host addresses
- // if set to true.
- //
- // This option is used only if default TCP dialer is used,
- // i.e. if Dial is blank.
- //
- // By default client connects only to ipv4 addresses,
- // since unfortunately ipv6 remains broken in many networks worldwide :)
- DialDualStack bool
- // Whether to use TLS (aka SSL or HTTPS) for host connections.
- IsTLS bool
- // Optional TLS config.
- TLSConfig *tls.Config
- // Maximum number of connections which may be established to all hosts
- // listed in Addr.
- //
- // You can change this value while the HostClient is being used
- // with HostClient.SetMaxConns(value)
- //
- // DefaultMaxConnsPerHost is used if not set.
- MaxConns int
- // Keep-alive connections are closed after this duration.
- //
- // By default connection duration is unlimited.
- MaxConnDuration time.Duration
- // Idle keep-alive connections are closed after this duration.
- //
- // By default idle connections are closed
- // after DefaultMaxIdleConnDuration.
- MaxIdleConnDuration time.Duration
- // Maximum number of attempts for idempotent calls
- //
- // DefaultMaxIdemponentCallAttempts is used if not set.
- MaxIdemponentCallAttempts int
- // Per-connection buffer size for responses' reading.
- // This also limits the maximum header size.
- //
- // Default buffer size is used if 0.
- ReadBufferSize int
- // Per-connection buffer size for requests' writing.
- //
- // Default buffer size is used if 0.
- WriteBufferSize int
- // Maximum duration for full response reading (including body).
- //
- // By default response read timeout is unlimited.
- ReadTimeout time.Duration
- // Maximum duration for full request writing (including body).
- //
- // By default request write timeout is unlimited.
- WriteTimeout time.Duration
- // Maximum response body size.
- //
- // The client returns ErrBodyTooLarge if this limit is greater than 0
- // and response body is greater than the limit.
- //
- // By default response body size is unlimited.
- MaxResponseBodySize int
- // Header names are passed as-is without normalization
- // if this option is set.
- //
- // Disabled header names' normalization may be useful only for proxying
- // responses to other clients expecting case-sensitive
- // header names. See https://github.com/valyala/fasthttp/issues/57
- // for details.
- //
- // By default request and response header names are normalized, i.e.
- // The first letter and the first letters following dashes
- // are uppercased, while all the other letters are lowercased.
- // Examples:
- //
- // * HOST -> Host
- // * content-type -> Content-Type
- // * cONTENT-lenGTH -> Content-Length
- DisableHeaderNamesNormalizing bool
- // Path values are sent as-is without normalization
- //
- // Disabled path normalization may be useful for proxying incoming requests
- // to servers that are expecting paths to be forwarded as-is.
- //
- // By default path values are normalized, i.e.
- // extra slashes are removed, special characters are encoded.
- DisablePathNormalizing bool
- // Will not log potentially sensitive content in error logs
- //
- // This option is useful for servers that handle sensitive data
- // in the request/response.
- //
- // Client logs full errors by default.
- SecureErrorLogMessage bool
- // Maximum duration for waiting for a free connection.
- //
- // By default will not waiting, return ErrNoFreeConns immediately
- MaxConnWaitTimeout time.Duration
- // RetryIf controls whether a retry should be attempted after an error.
- //
- // By default will use isIdempotent function
- RetryIf RetryIfFunc
- // Transport defines a transport-like mechanism that wraps every request/response.
- Transport RoundTripper
- // Connection pool strategy. Can be either LIFO or FIFO (default).
- ConnPoolStrategy ConnPoolStrategyType
- // StreamResponseBody enables response body streaming
- StreamResponseBody bool
- lastUseTime uint32
- connsLock sync.Mutex
- connsCount int
- conns []*clientConn
- connsWait *wantConnQueue
- addrsLock sync.Mutex
- addrs []string
- addrIdx uint32
- tlsConfigMap map[string]*tls.Config
- tlsConfigMapLock sync.Mutex
- readerPool sync.Pool
- writerPool sync.Pool
- clientReaderPool *sync.Pool
- clientWriterPool *sync.Pool
- pendingRequests int32
- // pendingClientRequests counts the number of requests that a Client is currently running using this HostClient.
- // It will be incremented earlier than pendingRequests and will be used by Client to see if the HostClient is still in use.
- pendingClientRequests int32
- connsCleanerRun bool
- }
// clientConn is a pooled connection together with its bookkeeping timestamps.
type clientConn struct {
	// c is the underlying network connection.
	c net.Conn

	// createdTime is set when the clientConn is acquired for a new connection.
	createdTime time.Time
	// lastUseTime is set when the connection is released back to the pool.
	lastUseTime time.Time
}
- var startTimeUnix = time.Now().Unix()
- // LastUseTime returns time the client was last used
- func (c *HostClient) LastUseTime() time.Time {
- n := atomic.LoadUint32(&c.lastUseTime)
- return time.Unix(startTimeUnix+int64(n), 0)
- }
- // Get returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- func (c *HostClient) Get(dst []byte, url string) (statusCode int, body []byte, err error) {
- return clientGetURL(dst, url, c)
- }
- // GetTimeout returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // ErrTimeout error is returned if url contents couldn't be fetched
- // during the given timeout.
- func (c *HostClient) GetTimeout(dst []byte, url string, timeout time.Duration) (statusCode int, body []byte, err error) {
- return clientGetURLTimeout(dst, url, timeout, c)
- }
- // GetDeadline returns the status code and body of url.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // ErrTimeout error is returned if url contents couldn't be fetched
- // until the given deadline.
- func (c *HostClient) GetDeadline(dst []byte, url string, deadline time.Time) (statusCode int, body []byte, err error) {
- return clientGetURLDeadline(dst, url, deadline, c)
- }
- // Post sends POST request to the given url with the given POST arguments.
- //
- // The contents of dst will be replaced by the body and returned, if the dst
- // is too small a new slice will be allocated.
- //
- // The function follows redirects. Use Do* for manually handling redirects.
- //
- // Empty POST body is sent if postArgs is nil.
- func (c *HostClient) Post(dst []byte, url string, postArgs *Args) (statusCode int, body []byte, err error) {
- return clientPostURL(dst, url, postArgs, c)
- }
- type clientDoer interface {
- Do(req *Request, resp *Response) error
- }
- func clientGetURL(dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
- req := AcquireRequest()
- statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
- ReleaseRequest(req)
- return statusCode, body, err
- }
- func clientGetURLTimeout(dst []byte, url string, timeout time.Duration, c clientDoer) (statusCode int, body []byte, err error) {
- deadline := time.Now().Add(timeout)
- return clientGetURLDeadline(dst, url, deadline, c)
- }
// clientURLResponse carries the outcome of a background URL fetch from the
// worker goroutine back to clientGetURLDeadline.
type clientURLResponse struct {
	statusCode int    // HTTP status code of the final response
	body       []byte // response body
	err        error  // error returned by the fetch, if any
}
- func clientGetURLDeadline(dst []byte, url string, deadline time.Time, c clientDoer) (statusCode int, body []byte, err error) {
- timeout := time.Until(deadline)
- if timeout <= 0 {
- return 0, dst, ErrTimeout
- }
- var ch chan clientURLResponse
- chv := clientURLResponseChPool.Get()
- if chv == nil {
- chv = make(chan clientURLResponse, 1)
- }
- ch = chv.(chan clientURLResponse)
- // Note that the request continues execution on ErrTimeout until
- // client-specific ReadTimeout exceeds. This helps limiting load
- // on slow hosts by MaxConns* concurrent requests.
- //
- // Without this 'hack' the load on slow host could exceed MaxConns*
- // concurrent requests, since timed out requests on client side
- // usually continue execution on the host.
- var mu sync.Mutex
- var timedout, responded bool
- go func() {
- req := AcquireRequest()
- statusCodeCopy, bodyCopy, errCopy := doRequestFollowRedirectsBuffer(req, dst, url, c)
- mu.Lock()
- if !timedout {
- ch <- clientURLResponse{
- statusCode: statusCodeCopy,
- body: bodyCopy,
- err: errCopy,
- }
- responded = true
- }
- mu.Unlock()
- ReleaseRequest(req)
- }()
- tc := AcquireTimer(timeout)
- select {
- case resp := <-ch:
- statusCode = resp.statusCode
- body = resp.body
- err = resp.err
- case <-tc.C:
- mu.Lock()
- if responded {
- resp := <-ch
- statusCode = resp.statusCode
- body = resp.body
- err = resp.err
- } else {
- timedout = true
- err = ErrTimeout
- body = dst
- }
- mu.Unlock()
- }
- ReleaseTimer(tc)
- clientURLResponseChPool.Put(chv)
- return statusCode, body, err
- }
- var clientURLResponseChPool sync.Pool
- func clientPostURL(dst []byte, url string, postArgs *Args, c clientDoer) (statusCode int, body []byte, err error) {
- req := AcquireRequest()
- defer ReleaseRequest(req)
- req.Header.SetMethod(MethodPost)
- req.Header.SetContentTypeBytes(strPostArgsContentType)
- if postArgs != nil {
- if _, err := postArgs.WriteTo(req.BodyWriter()); err != nil {
- return 0, nil, err
- }
- }
- statusCode, body, err = doRequestFollowRedirectsBuffer(req, dst, url, c)
- return statusCode, body, err
- }
// Errors reported while following HTTP redirects.
var (
	// ErrMissingLocation is returned by clients when the Location header is missing on
	// an HTTP response with a redirect status code.
	ErrMissingLocation = errors.New("missing Location header for http redirect")

	// ErrTooManyRedirects is returned by clients when the number of redirects followed
	// exceed the max count.
	ErrTooManyRedirects = errors.New("too many redirects detected when doing the request")

	// ErrHostClientRedirectToDifferentScheme is returned when a HostClient is
	// asked to follow a redirect to a different protocol; HostClients are only
	// able to follow redirects to the same protocol.
	ErrHostClientRedirectToDifferentScheme = errors.New("HostClient can't follow redirects to a different protocol, please use Client instead")
)
- const defaultMaxRedirectsCount = 16
- func doRequestFollowRedirectsBuffer(req *Request, dst []byte, url string, c clientDoer) (statusCode int, body []byte, err error) {
- resp := AcquireResponse()
- bodyBuf := resp.bodyBuffer()
- resp.keepBodyBuffer = true
- oldBody := bodyBuf.B
- bodyBuf.B = dst
- statusCode, _, err = doRequestFollowRedirects(req, resp, url, defaultMaxRedirectsCount, c)
- body = bodyBuf.B
- bodyBuf.B = oldBody
- resp.keepBodyBuffer = false
- ReleaseResponse(resp)
- return statusCode, body, err
- }
- func doRequestFollowRedirects(req *Request, resp *Response, url string, maxRedirectsCount int, c clientDoer) (statusCode int, body []byte, err error) {
- redirectsCount := 0
- for {
- req.SetRequestURI(url)
- if err := req.parseURI(); err != nil {
- return 0, nil, err
- }
- if err = c.Do(req, resp); err != nil {
- break
- }
- statusCode = resp.Header.StatusCode()
- if !StatusCodeIsRedirect(statusCode) {
- break
- }
- redirectsCount++
- if redirectsCount > maxRedirectsCount {
- err = ErrTooManyRedirects
- break
- }
- location := resp.Header.peek(strLocation)
- if len(location) == 0 {
- err = ErrMissingLocation
- break
- }
- url = getRedirectURL(url, location, req.DisableRedirectPathNormalizing)
- }
- return statusCode, body, err
- }
- func getRedirectURL(baseURL string, location []byte, disablePathNormalizing bool) string {
- u := AcquireURI()
- u.Update(baseURL)
- u.UpdateBytes(location)
- u.DisablePathNormalizing = disablePathNormalizing
- redirectURL := u.String()
- ReleaseURI(u)
- return redirectURL
- }
- // StatusCodeIsRedirect returns true if the status code indicates a redirect.
- func StatusCodeIsRedirect(statusCode int) bool {
- return statusCode == StatusMovedPermanently ||
- statusCode == StatusFound ||
- statusCode == StatusSeeOther ||
- statusCode == StatusTemporaryRedirect ||
- statusCode == StatusPermanentRedirect
- }
// requestPool recycles Request instances for AcquireRequest/ReleaseRequest.
var requestPool sync.Pool

// responsePool recycles Response instances for AcquireResponse/ReleaseResponse.
var responsePool sync.Pool
- // AcquireRequest returns an empty Request instance from request pool.
- //
- // The returned Request instance may be passed to ReleaseRequest when it is
- // no longer needed. This allows Request recycling, reduces GC pressure
- // and usually improves performance.
- func AcquireRequest() *Request {
- v := requestPool.Get()
- if v == nil {
- return &Request{}
- }
- return v.(*Request)
- }
- // ReleaseRequest returns req acquired via AcquireRequest to request pool.
- //
- // It is forbidden accessing req and/or its' members after returning
- // it to request pool.
- func ReleaseRequest(req *Request) {
- req.Reset()
- requestPool.Put(req)
- }
- // AcquireResponse returns an empty Response instance from response pool.
- //
- // The returned Response instance may be passed to ReleaseResponse when it is
- // no longer needed. This allows Response recycling, reduces GC pressure
- // and usually improves performance.
- func AcquireResponse() *Response {
- v := responsePool.Get()
- if v == nil {
- return &Response{}
- }
- return v.(*Response)
- }
- // ReleaseResponse return resp acquired via AcquireResponse to response pool.
- //
- // It is forbidden accessing resp and/or its' members after returning
- // it to response pool.
- func ReleaseResponse(resp *Response) {
- resp.Reset()
- responsePool.Put(resp)
- }
- // DoTimeout performs the given request and waits for response during
- // the given timeout duration.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrTimeout is returned if the response wasn't returned during
- // the given timeout.
- // Immediately returns ErrTimeout if timeout value is negative.
- //
- // ErrNoFreeConns is returned if all HostClient.MaxConns connections
- // to the host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *HostClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
- req.timeout = timeout
- if req.timeout <= 0 {
- return ErrTimeout
- }
- return c.Do(req, resp)
- }
- // DoDeadline performs the given request and waits for response until
- // the given deadline.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrTimeout is returned if the response wasn't returned until
- // the given deadline.
- // Immediately returns ErrTimeout if the deadline has already been reached.
- //
- // ErrNoFreeConns is returned if all HostClient.MaxConns connections
- // to the host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *HostClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
- req.timeout = time.Until(deadline)
- if req.timeout <= 0 {
- return ErrTimeout
- }
- return c.Do(req, resp)
- }
- // DoRedirects performs the given http request and fills the given http response,
- // following up to maxRedirectsCount redirects. When the redirect count exceeds
- // maxRedirectsCount, ErrTooManyRedirects is returned.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // Client determines the server to be requested in the following order:
- //
- // - from RequestURI if it contains full url with scheme and host;
- // - from Host header otherwise.
- //
- // Response is ignored if resp is nil.
- //
- // ErrNoFreeConns is returned if all DefaultMaxConnsPerHost connections
- // to the requested host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *HostClient) DoRedirects(req *Request, resp *Response, maxRedirectsCount int) error {
- _, _, err := doRequestFollowRedirects(req, resp, req.URI().String(), maxRedirectsCount, c)
- return err
- }
- // Do performs the given http request and sets the corresponding response.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrNoFreeConns is returned if all HostClient.MaxConns connections
- // to the host are busy.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *HostClient) Do(req *Request, resp *Response) error {
- var err error
- var retry bool
- maxAttempts := c.MaxIdemponentCallAttempts
- if maxAttempts <= 0 {
- maxAttempts = DefaultMaxIdemponentCallAttempts
- }
- isRequestRetryable := isIdempotent
- if c.RetryIf != nil {
- isRequestRetryable = c.RetryIf
- }
- attempts := 0
- hasBodyStream := req.IsBodyStream()
- // If a request has a timeout we store the timeout
- // and calculate a deadline so we can keep updating the
- // timeout on each retry.
- deadline := time.Time{}
- timeout := req.timeout
- if timeout > 0 {
- deadline = time.Now().Add(timeout)
- }
- atomic.AddInt32(&c.pendingRequests, 1)
- for {
- // If the original timeout was set, we need to update
- // the one set on the request to reflect the remaining time.
- if timeout > 0 {
- req.timeout = time.Until(deadline)
- if req.timeout <= 0 {
- err = ErrTimeout
- break
- }
- }
- retry, err = c.do(req, resp)
- if err == nil || !retry {
- break
- }
- if hasBodyStream {
- break
- }
- if !isRequestRetryable(req) {
- // Retry non-idempotent requests if the server closes
- // the connection before sending the response.
- //
- // This case is possible if the server closes the idle
- // keep-alive connection on timeout.
- //
- // Apache and nginx usually do this.
- if err != io.EOF {
- break
- }
- }
- attempts++
- if attempts >= maxAttempts {
- break
- }
- }
- atomic.AddInt32(&c.pendingRequests, -1)
- // Restore the original timeout.
- req.timeout = timeout
- if err == io.EOF {
- err = ErrConnectionClosed
- }
- return err
- }
- // PendingRequests returns the current number of requests the client
- // is executing.
- //
- // This function may be used for balancing load among multiple HostClient
- // instances.
- func (c *HostClient) PendingRequests() int {
- return int(atomic.LoadInt32(&c.pendingRequests))
- }
- func isIdempotent(req *Request) bool {
- return req.Header.IsGet() || req.Header.IsHead() || req.Header.IsPut()
- }
- func (c *HostClient) do(req *Request, resp *Response) (bool, error) {
- if resp == nil {
- resp = AcquireResponse()
- defer ReleaseResponse(resp)
- }
- ok, err := c.doNonNilReqResp(req, resp)
- return ok, err
- }
- func (c *HostClient) doNonNilReqResp(req *Request, resp *Response) (bool, error) {
- if req == nil {
- // for debugging purposes
- panic("BUG: req cannot be nil")
- }
- if resp == nil {
- // for debugging purposes
- panic("BUG: resp cannot be nil")
- }
- // Secure header error logs configuration
- resp.secureErrorLogMessage = c.SecureErrorLogMessage
- resp.Header.secureErrorLogMessage = c.SecureErrorLogMessage
- req.secureErrorLogMessage = c.SecureErrorLogMessage
- req.Header.secureErrorLogMessage = c.SecureErrorLogMessage
- if c.IsTLS != req.URI().isHTTPS() {
- return false, ErrHostClientRedirectToDifferentScheme
- }
- atomic.StoreUint32(&c.lastUseTime, uint32(time.Now().Unix()-startTimeUnix))
- // Free up resources occupied by response before sending the request,
- // so the GC may reclaim these resources (e.g. response body).
- // backing up SkipBody in case it was set explicitly
- customSkipBody := resp.SkipBody
- customStreamBody := resp.StreamBody || c.StreamResponseBody
- resp.Reset()
- resp.SkipBody = customSkipBody
- resp.StreamBody = customStreamBody
- req.URI().DisablePathNormalizing = c.DisablePathNormalizing
- userAgentOld := req.Header.UserAgent()
- if len(userAgentOld) == 0 {
- userAgent := c.Name
- if userAgent == "" && !c.NoDefaultUserAgentHeader {
- userAgent = defaultUserAgent
- }
- if userAgent != "" {
- req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
- }
- }
- return c.transport().RoundTrip(c, req, resp)
- }
- func (c *HostClient) transport() RoundTripper {
- if c.Transport == nil {
- return DefaultTransport
- }
- return c.Transport
- }
// Errors related to connection acquisition and connection state.
var (
	// ErrNoFreeConns is returned when no free connections available
	// to the given host.
	//
	// Increase the allowed number of connections per host if you
	// see this error.
	ErrNoFreeConns = errors.New("no free connections available to host")

	// ErrConnectionClosed may be returned from client methods if the server
	// closes connection before returning the first response byte.
	//
	// If you see this error, then either fix the server by returning
	// 'Connection: close' response header before closing the connection
	// or add 'Connection: close' request header before sending requests
	// to broken server.
	ErrConnectionClosed = errors.New("the server closed connection before returning the first response byte. " +
		"Make sure the server returns 'Connection: close' response header before closing the connection")

	// ErrConnPoolStrategyNotImpl is returned when HostClient.ConnPoolStrategy
	// is set to a strategy that is not implemented.
	// If you see this error, then you need to check your HostClient configuration.
	ErrConnPoolStrategyNotImpl = errors.New("connection pool strategy is not implement")
)
// timeoutError is the concrete type behind ErrTimeout.
type timeoutError struct{}

// Error implements the error interface.
func (*timeoutError) Error() string { return "timeout" }

// Timeout always reports true.
//
// Only the Timeout() function of the net.Error interface is implemented.
// This allows for checks like:
//
//	if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
func (*timeoutError) Timeout() bool { return true }

// ErrTimeout is returned from timed out calls.
var ErrTimeout = &timeoutError{}
- // SetMaxConns sets up the maximum number of connections which may be established to all hosts listed in Addr.
- func (c *HostClient) SetMaxConns(newMaxConns int) {
- c.connsLock.Lock()
- c.MaxConns = newMaxConns
- c.connsLock.Unlock()
- }
- func (c *HostClient) acquireConn(reqTimeout time.Duration, connectionClose bool) (cc *clientConn, err error) {
- createConn := false
- startCleaner := false
- var n int
- c.connsLock.Lock()
- n = len(c.conns)
- if n == 0 {
- maxConns := c.MaxConns
- if maxConns <= 0 {
- maxConns = DefaultMaxConnsPerHost
- }
- if c.connsCount < maxConns {
- c.connsCount++
- createConn = true
- if !c.connsCleanerRun && !connectionClose {
- startCleaner = true
- c.connsCleanerRun = true
- }
- }
- } else {
- switch c.ConnPoolStrategy {
- case LIFO:
- n--
- cc = c.conns[n]
- c.conns[n] = nil
- c.conns = c.conns[:n]
- case FIFO:
- cc = c.conns[0]
- copy(c.conns, c.conns[1:])
- c.conns[n-1] = nil
- c.conns = c.conns[:n-1]
- default:
- c.connsLock.Unlock()
- return nil, ErrConnPoolStrategyNotImpl
- }
- }
- c.connsLock.Unlock()
- if cc != nil {
- return cc, nil
- }
- if !createConn {
- if c.MaxConnWaitTimeout <= 0 {
- return nil, ErrNoFreeConns
- }
- //nolint:dupword
- // reqTimeout c.MaxConnWaitTimeout wait duration
- // d1 d2 min(d1, d2)
- // 0(not set) d2 d2
- // d1 0(don't wait) 0(don't wait)
- // 0(not set) d2 d2
- timeout := c.MaxConnWaitTimeout
- timeoutOverridden := false
- // reqTimeout == 0 means not set
- if reqTimeout > 0 && reqTimeout < timeout {
- timeout = reqTimeout
- timeoutOverridden = true
- }
- // wait for a free connection
- tc := AcquireTimer(timeout)
- defer ReleaseTimer(tc)
- w := &wantConn{
- ready: make(chan struct{}, 1),
- }
- defer func() {
- if err != nil {
- w.cancel(c, err)
- }
- }()
- c.queueForIdle(w)
- select {
- case <-w.ready:
- return w.conn, w.err
- case <-tc.C:
- if timeoutOverridden {
- return nil, ErrTimeout
- }
- return nil, ErrNoFreeConns
- }
- }
- if startCleaner {
- go c.connsCleaner()
- }
- conn, err := c.dialHostHard(reqTimeout)
- if err != nil {
- c.decConnsCount()
- return nil, err
- }
- cc = acquireClientConn(conn)
- return cc, nil
- }
- func (c *HostClient) queueForIdle(w *wantConn) {
- c.connsLock.Lock()
- defer c.connsLock.Unlock()
- if c.connsWait == nil {
- c.connsWait = &wantConnQueue{}
- }
- c.connsWait.clearFront()
- c.connsWait.pushBack(w)
- }
- func (c *HostClient) dialConnFor(w *wantConn) {
- conn, err := c.dialHostHard(0)
- if err != nil {
- w.tryDeliver(nil, err)
- c.decConnsCount()
- return
- }
- cc := acquireClientConn(conn)
- if !w.tryDeliver(cc, nil) {
- // not delivered, return idle connection
- c.releaseConn(cc)
- }
- }
- // CloseIdleConnections closes any connections which were previously
- // connected from previous requests but are now sitting idle in a
- // "keep-alive" state. It does not interrupt any connections currently
- // in use.
- func (c *HostClient) CloseIdleConnections() {
- c.connsLock.Lock()
- scratch := append([]*clientConn{}, c.conns...)
- for i := range c.conns {
- c.conns[i] = nil
- }
- c.conns = c.conns[:0]
- c.connsLock.Unlock()
- for _, cc := range scratch {
- c.closeConn(cc)
- }
- }
- func (c *HostClient) connsCleaner() {
- var (
- scratch []*clientConn
- maxIdleConnDuration = c.MaxIdleConnDuration
- )
- if maxIdleConnDuration <= 0 {
- maxIdleConnDuration = DefaultMaxIdleConnDuration
- }
- for {
- currentTime := time.Now()
- // Determine idle connections to be closed.
- c.connsLock.Lock()
- conns := c.conns
- n := len(conns)
- i := 0
- for i < n && currentTime.Sub(conns[i].lastUseTime) > maxIdleConnDuration {
- i++
- }
- sleepFor := maxIdleConnDuration
- if i < n {
- // + 1 so we actually sleep past the expiration time and not up to it.
- // Otherwise the > check above would still fail.
- sleepFor = maxIdleConnDuration - currentTime.Sub(conns[i].lastUseTime) + 1
- }
- scratch = append(scratch[:0], conns[:i]...)
- if i > 0 {
- m := copy(conns, conns[i:])
- for i = m; i < n; i++ {
- conns[i] = nil
- }
- c.conns = conns[:m]
- }
- c.connsLock.Unlock()
- // Close idle connections.
- for i, cc := range scratch {
- c.closeConn(cc)
- scratch[i] = nil
- }
- // Determine whether to stop the connsCleaner.
- c.connsLock.Lock()
- mustStop := c.connsCount == 0
- if mustStop {
- c.connsCleanerRun = false
- }
- c.connsLock.Unlock()
- if mustStop {
- break
- }
- time.Sleep(sleepFor)
- }
- }
- func (c *HostClient) closeConn(cc *clientConn) {
- c.decConnsCount()
- cc.c.Close()
- releaseClientConn(cc)
- }
- func (c *HostClient) decConnsCount() {
- if c.MaxConnWaitTimeout <= 0 {
- c.connsLock.Lock()
- c.connsCount--
- c.connsLock.Unlock()
- return
- }
- c.connsLock.Lock()
- defer c.connsLock.Unlock()
- dialed := false
- if q := c.connsWait; q != nil && q.len() > 0 {
- for q.len() > 0 {
- w := q.popFront()
- if w.waiting() {
- go c.dialConnFor(w)
- dialed = true
- break
- }
- }
- }
- if !dialed {
- c.connsCount--
- }
- }
- // ConnsCount returns connection count of HostClient
- func (c *HostClient) ConnsCount() int {
- c.connsLock.Lock()
- defer c.connsLock.Unlock()
- return c.connsCount
- }
- func acquireClientConn(conn net.Conn) *clientConn {
- v := clientConnPool.Get()
- if v == nil {
- v = &clientConn{}
- }
- cc := v.(*clientConn)
- cc.c = conn
- cc.createdTime = time.Now()
- return cc
- }
- func releaseClientConn(cc *clientConn) {
- // Reset all fields.
- *cc = clientConn{}
- clientConnPool.Put(cc)
- }
- var clientConnPool sync.Pool
- func (c *HostClient) releaseConn(cc *clientConn) {
- cc.lastUseTime = time.Now()
- if c.MaxConnWaitTimeout <= 0 {
- c.connsLock.Lock()
- c.conns = append(c.conns, cc)
- c.connsLock.Unlock()
- return
- }
- // try to deliver an idle connection to a *wantConn
- c.connsLock.Lock()
- defer c.connsLock.Unlock()
- delivered := false
- if q := c.connsWait; q != nil && q.len() > 0 {
- for q.len() > 0 {
- w := q.popFront()
- if w.waiting() {
- delivered = w.tryDeliver(cc, nil)
- break
- }
- }
- }
- if !delivered {
- c.conns = append(c.conns, cc)
- }
- }
- func (c *HostClient) acquireWriter(conn net.Conn) *bufio.Writer {
- var v interface{}
- if c.clientWriterPool != nil {
- v = c.clientWriterPool.Get()
- if v == nil {
- n := c.WriteBufferSize
- if n <= 0 {
- n = defaultWriteBufferSize
- }
- return bufio.NewWriterSize(conn, n)
- }
- } else {
- v = c.writerPool.Get()
- if v == nil {
- n := c.WriteBufferSize
- if n <= 0 {
- n = defaultWriteBufferSize
- }
- return bufio.NewWriterSize(conn, n)
- }
- }
- bw := v.(*bufio.Writer)
- bw.Reset(conn)
- return bw
- }
- func (c *HostClient) releaseWriter(bw *bufio.Writer) {
- if c.clientWriterPool != nil {
- c.clientWriterPool.Put(bw)
- } else {
- c.writerPool.Put(bw)
- }
- }
- func (c *HostClient) acquireReader(conn net.Conn) *bufio.Reader {
- var v interface{}
- if c.clientReaderPool != nil {
- v = c.clientReaderPool.Get()
- if v == nil {
- n := c.ReadBufferSize
- if n <= 0 {
- n = defaultReadBufferSize
- }
- return bufio.NewReaderSize(conn, n)
- }
- } else {
- v = c.readerPool.Get()
- if v == nil {
- n := c.ReadBufferSize
- if n <= 0 {
- n = defaultReadBufferSize
- }
- return bufio.NewReaderSize(conn, n)
- }
- }
- br := v.(*bufio.Reader)
- br.Reset(conn)
- return br
- }
- func (c *HostClient) releaseReader(br *bufio.Reader) {
- if c.clientReaderPool != nil {
- c.clientReaderPool.Put(br)
- } else {
- c.readerPool.Put(br)
- }
- }
- func newClientTLSConfig(c *tls.Config, addr string) *tls.Config {
- if c == nil {
- c = &tls.Config{}
- } else {
- c = c.Clone()
- }
- if len(c.ServerName) == 0 {
- serverName := tlsServerName(addr)
- if serverName == "*" {
- c.InsecureSkipVerify = true
- } else {
- c.ServerName = serverName
- }
- }
- return c
- }
// tlsServerName extracts the host part of addr for use as TLS server name.
//
// An addr without a colon is returned unchanged. Otherwise addr is split
// into host and port; "*" is returned when the split fails.
func tlsServerName(addr string) string {
	if strings.Contains(addr, ":") {
		if host, _, err := net.SplitHostPort(addr); err == nil {
			return host
		}
		return "*"
	}
	return addr
}
- func (c *HostClient) nextAddr() string {
- c.addrsLock.Lock()
- if c.addrs == nil {
- c.addrs = strings.Split(c.Addr, ",")
- }
- addr := c.addrs[0]
- if len(c.addrs) > 1 {
- addr = c.addrs[c.addrIdx%uint32(len(c.addrs))]
- c.addrIdx++
- }
- c.addrsLock.Unlock()
- return addr
- }
- func (c *HostClient) dialHostHard(dialTimeout time.Duration) (conn net.Conn, err error) {
- // use dialTimeout to control the timeout of each dial. It does not work if dialTimeout is 0 or dial has been set.
- // attempt to dial all the available hosts before giving up.
- c.addrsLock.Lock()
- n := len(c.addrs)
- c.addrsLock.Unlock()
- if n == 0 {
- // It looks like c.addrs isn't initialized yet.
- n = 1
- }
- dial := c.Dial
- if dialTimeout != 0 && dial == nil {
- dial = func(addr string) (net.Conn, error) {
- if c.DialDualStack {
- return DialDualStackTimeout(addr, dialTimeout)
- }
- return DialTimeout(addr, dialTimeout)
- }
- }
- timeout := c.ReadTimeout + c.WriteTimeout
- if timeout <= 0 {
- timeout = DefaultDialTimeout
- }
- deadline := time.Now().Add(timeout)
- for n > 0 {
- addr := c.nextAddr()
- tlsConfig := c.cachedTLSConfig(addr)
- conn, err = dialAddr(addr, dial, c.DialDualStack, c.IsTLS, tlsConfig, c.WriteTimeout)
- if err == nil {
- return conn, nil
- }
- if time.Since(deadline) >= 0 {
- break
- }
- n--
- }
- return nil, err
- }
- func (c *HostClient) cachedTLSConfig(addr string) *tls.Config {
- if !c.IsTLS {
- return nil
- }
- c.tlsConfigMapLock.Lock()
- if c.tlsConfigMap == nil {
- c.tlsConfigMap = make(map[string]*tls.Config)
- }
- cfg := c.tlsConfigMap[addr]
- if cfg == nil {
- cfg = newClientTLSConfig(c.TLSConfig, addr)
- c.tlsConfigMap[addr] = cfg
- }
- c.tlsConfigMapLock.Unlock()
- return cfg
- }
// ErrTLSHandshakeTimeout is returned when a TLS handshake does not complete
// before its deadline.
var ErrTLSHandshakeTimeout = errors.New("tls handshake timed out")
- func tlsClientHandshake(rawConn net.Conn, tlsConfig *tls.Config, deadline time.Time) (_ net.Conn, retErr error) {
- defer func() {
- if retErr != nil {
- rawConn.Close()
- }
- }()
- conn := tls.Client(rawConn, tlsConfig)
- err := conn.SetDeadline(deadline)
- if err != nil {
- return nil, err
- }
- err = conn.Handshake()
- if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
- return nil, ErrTLSHandshakeTimeout
- }
- if err != nil {
- return nil, err
- }
- err = conn.SetDeadline(time.Time{})
- if err != nil {
- return nil, err
- }
- return conn, nil
- }
- func dialAddr(addr string, dial DialFunc, dialDualStack, isTLS bool, tlsConfig *tls.Config, timeout time.Duration) (net.Conn, error) {
- deadline := time.Now().Add(timeout)
- if dial == nil {
- if dialDualStack {
- dial = DialDualStack
- } else {
- dial = Dial
- }
- addr = AddMissingPort(addr, isTLS)
- }
- conn, err := dial(addr)
- if err != nil {
- return nil, err
- }
- if conn == nil {
- return nil, errors.New("dialling unsuccessful. Please report this bug!")
- }
- // We assume that any conn that has the Handshake() method is a TLS conn already.
- // This doesn't cover just tls.Conn but also other TLS implementations.
- _, isTLSAlready := conn.(interface{ Handshake() error })
- if isTLS && !isTLSAlready {
- if timeout == 0 {
- return tls.Client(conn, tlsConfig), nil
- }
- return tlsClientHandshake(conn, tlsConfig, deadline)
- }
- return conn, nil
- }
// AddMissingPort appends the default port to addr when addr has none:
// ":443" if isTLS is true, ":80" otherwise.
//
// A literal IPv6 address must be enclosed in square brackets, as in
// "[::1]:80" or "[::1%lo0]:80". A bracketed address is considered port-less
// only when the closing bracket is its last character. Any other address is
// considered to already carry a port as soon as it contains a colon; this
// includes addresses with an empty host such as ":8080". The empty string is
// returned unchanged.
func AddMissingPort(addr string, isTLS bool) string {
	if addr == "" {
		return addr
	}

	if addr[0] == '[' {
		// Bracketed IPv6 literal: anything after the closing bracket is the port.
		if addr[len(addr)-1] != ']' {
			return addr
		}
	} else if strings.Contains(addr, ":") {
		// Host name or IPv4 address that already has a port (possibly with an
		// empty host part).
		return addr
	}

	if isTLS {
		return addr + ":443"
	}
	return addr + ":80"
}
- // A wantConn records state about a wanted connection
- // (that is, an active call to getConn).
- // The conn may be gotten by dialing or by finding an idle connection,
- // or a cancellation may make the conn no longer wanted.
- // These three options are racing against each other and use
- // wantConn to coordinate and agree about the winning outcome.
- //
- // inspired by net/http/transport.go
- type wantConn struct {
- ready chan struct{}
- mu sync.Mutex // protects conn, err, close(ready)
- conn *clientConn
- err error
- }
- // waiting reports whether w is still waiting for an answer (connection or error).
- func (w *wantConn) waiting() bool {
- select {
- case <-w.ready:
- return false
- default:
- return true
- }
- }
- // tryDeliver attempts to deliver conn, err to w and reports whether it succeeded.
- func (w *wantConn) tryDeliver(conn *clientConn, err error) bool {
- w.mu.Lock()
- defer w.mu.Unlock()
- if w.conn != nil || w.err != nil {
- return false
- }
- w.conn = conn
- w.err = err
- if w.conn == nil && w.err == nil {
- panic("fasthttp: internal error: misuse of tryDeliver")
- }
- close(w.ready)
- return true
- }
- // cancel marks w as no longer wanting a result (for example, due to cancellation).
- // If a connection has been delivered already, cancel returns it with c.releaseConn.
- func (w *wantConn) cancel(c *HostClient, err error) {
- w.mu.Lock()
- if w.conn == nil && w.err == nil {
- close(w.ready) // catch misbehavior in future delivery
- }
- conn := w.conn
- w.conn = nil
- w.err = err
- w.mu.Unlock()
- if conn != nil {
- c.releaseConn(conn)
- }
- }
- // A wantConnQueue is a queue of wantConns.
- //
- // inspired by net/http/transport.go
- type wantConnQueue struct {
- // This is a queue, not a dequeue.
- // It is split into two stages - head[headPos:] and tail.
- // popFront is trivial (headPos++) on the first stage, and
- // pushBack is trivial (append) on the second stage.
- // If the first stage is empty, popFront can swap the
- // first and second stages to remedy the situation.
- //
- // This two-stage split is analogous to the use of two lists
- // in Okasaki's purely functional queue but without the
- // overhead of reversing the list when swapping stages.
- head []*wantConn
- headPos int
- tail []*wantConn
- }
- // len returns the number of items in the queue.
- func (q *wantConnQueue) len() int {
- return len(q.head) - q.headPos + len(q.tail)
- }
- // pushBack adds w to the back of the queue.
- func (q *wantConnQueue) pushBack(w *wantConn) {
- q.tail = append(q.tail, w)
- }
- // popFront removes and returns the wantConn at the front of the queue.
- func (q *wantConnQueue) popFront() *wantConn {
- if q.headPos >= len(q.head) {
- if len(q.tail) == 0 {
- return nil
- }
- // Pick up tail as new head, clear tail.
- q.head, q.headPos, q.tail = q.tail, 0, q.head[:0]
- }
- w := q.head[q.headPos]
- q.head[q.headPos] = nil
- q.headPos++
- return w
- }
- // peekFront returns the wantConn at the front of the queue without removing it.
- func (q *wantConnQueue) peekFront() *wantConn {
- if q.headPos < len(q.head) {
- return q.head[q.headPos]
- }
- if len(q.tail) > 0 {
- return q.tail[0]
- }
- return nil
- }
- // clearFront pops any wantConns that are no longer waiting from the head of the
- // queue, reporting whether any were popped.
- func (q *wantConnQueue) clearFront() (cleaned bool) {
- for {
- w := q.peekFront()
- if w == nil || w.waiting() {
- return cleaned
- }
- q.popFront()
- cleaned = true
- }
- }
- // PipelineClient pipelines requests over a limited set of concurrent
- // connections to the given Addr.
- //
- // This client may be used in highly loaded HTTP-based RPC systems for reducing
- // context switches and network level overhead.
- // See https://en.wikipedia.org/wiki/HTTP_pipelining for details.
- //
- // It is forbidden copying PipelineClient instances. Create new instances
- // instead.
- //
- // It is safe calling PipelineClient methods from concurrently running
- // goroutines.
- type PipelineClient struct {
- noCopy noCopy
- // Address of the host to connect to.
- Addr string
- // PipelineClient name. Used in User-Agent request header.
- Name string
- // NoDefaultUserAgentHeader when set to true, causes the default
- // User-Agent header to be excluded from the Request.
- NoDefaultUserAgentHeader bool
- // The maximum number of concurrent connections to the Addr.
- //
- // A single connection is used by default.
- MaxConns int
- // The maximum number of pending pipelined requests over
- // a single connection to Addr.
- //
- // DefaultMaxPendingRequests is used by default.
- MaxPendingRequests int
- // The maximum delay before sending pipelined requests as a batch
- // to the server.
- //
- // By default requests are sent immediately to the server.
- MaxBatchDelay time.Duration
- // Callback for connection establishing to the host.
- //
- // Default Dial is used if not set.
- Dial DialFunc
- // Attempt to connect to both ipv4 and ipv6 host addresses
- // if set to true.
- //
- // This option is used only if default TCP dialer is used,
- // i.e. if Dial is blank.
- //
- // By default client connects only to ipv4 addresses,
- // since unfortunately ipv6 remains broken in many networks worldwide :)
- DialDualStack bool
- // Response header names are passed as-is without normalization
- // if this option is set.
- //
- // Disabled header names' normalization may be useful only for proxying
- // responses to other clients expecting case-sensitive
- // header names. See https://github.com/valyala/fasthttp/issues/57
- // for details.
- //
- // By default request and response header names are normalized, i.e.
- // The first letter and the first letters following dashes
- // are uppercased, while all the other letters are lowercased.
- // Examples:
- //
- // * HOST -> Host
- // * content-type -> Content-Type
- // * cONTENT-lenGTH -> Content-Length
- DisableHeaderNamesNormalizing bool
- // Path values are sent as-is without normalization
- //
- // Disabled path normalization may be useful for proxying incoming requests
- // to servers that are expecting paths to be forwarded as-is.
- //
- // By default path values are normalized, i.e.
- // extra slashes are removed, special characters are encoded.
- DisablePathNormalizing bool
- // Whether to use TLS (aka SSL or HTTPS) for host connections.
- IsTLS bool
- // Optional TLS config.
- TLSConfig *tls.Config
- // Idle connection to the host is closed after this duration.
- //
- // By default idle connection is closed after
- // DefaultMaxIdleConnDuration.
- MaxIdleConnDuration time.Duration
- // Buffer size for responses' reading.
- // This also limits the maximum header size.
- //
- // Default buffer size is used if 0.
- ReadBufferSize int
- // Buffer size for requests' writing.
- //
- // Default buffer size is used if 0.
- WriteBufferSize int
- // Maximum duration for full response reading (including body).
- //
- // By default response read timeout is unlimited.
- ReadTimeout time.Duration
- // Maximum duration for full request writing (including body).
- //
- // By default request write timeout is unlimited.
- WriteTimeout time.Duration
- // Logger for logging client errors.
- //
- // By default standard logger from log package is used.
- Logger Logger
- connClients []*pipelineConnClient
- connClientsLock sync.Mutex
- }
- type pipelineConnClient struct {
- noCopy noCopy
- Addr string
- Name string
- NoDefaultUserAgentHeader bool
- MaxPendingRequests int
- MaxBatchDelay time.Duration
- Dial DialFunc
- DialDualStack bool
- DisableHeaderNamesNormalizing bool
- DisablePathNormalizing bool
- IsTLS bool
- TLSConfig *tls.Config
- MaxIdleConnDuration time.Duration
- ReadBufferSize int
- WriteBufferSize int
- ReadTimeout time.Duration
- WriteTimeout time.Duration
- Logger Logger
- workPool sync.Pool
- chLock sync.Mutex
- chW chan *pipelineWork
- chR chan *pipelineWork
- tlsConfigLock sync.Mutex
- tlsConfig *tls.Config
- }
- type pipelineWork struct {
- reqCopy Request
- respCopy Response
- req *Request
- resp *Response
- t *time.Timer
- deadline time.Time
- err error
- done chan struct{}
- }
- // DoTimeout performs the given request and waits for response during
- // the given timeout duration.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // The function doesn't follow redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrTimeout is returned if the response wasn't returned during
- // the given timeout.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *PipelineClient) DoTimeout(req *Request, resp *Response, timeout time.Duration) error {
- return c.DoDeadline(req, resp, time.Now().Add(timeout))
- }
- // DoDeadline performs the given request and waits for response until
- // the given deadline.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // The function doesn't follow redirects.
- //
- // Response is ignored if resp is nil.
- //
- // ErrTimeout is returned if the response wasn't returned until
- // the given deadline.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *PipelineClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
- return c.getConnClient().DoDeadline(req, resp, deadline)
- }
- func (c *pipelineConnClient) DoDeadline(req *Request, resp *Response, deadline time.Time) error {
- c.init()
- timeout := time.Until(deadline)
- if timeout <= 0 {
- return ErrTimeout
- }
- if c.DisablePathNormalizing {
- req.URI().DisablePathNormalizing = true
- }
- userAgentOld := req.Header.UserAgent()
- if len(userAgentOld) == 0 {
- userAgent := c.Name
- if userAgent == "" && !c.NoDefaultUserAgentHeader {
- userAgent = defaultUserAgent
- }
- if userAgent != "" {
- req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
- }
- }
- w := c.acquirePipelineWork(timeout)
- w.respCopy.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
- w.req = &w.reqCopy
- w.resp = &w.respCopy
- // Make a copy of the request in order to avoid data races on timeouts
- req.copyToSkipBody(&w.reqCopy)
- swapRequestBody(req, &w.reqCopy)
- // Put the request to outgoing queue
- select {
- case c.chW <- w:
- // Fast path: len(c.ch) < cap(c.ch)
- default:
- // Slow path
- select {
- case c.chW <- w:
- case <-w.t.C:
- c.releasePipelineWork(w)
- return ErrTimeout
- }
- }
- // Wait for the response
- var err error
- select {
- case <-w.done:
- if resp != nil {
- w.respCopy.copyToSkipBody(resp)
- swapResponseBody(resp, &w.respCopy)
- }
- err = w.err
- c.releasePipelineWork(w)
- case <-w.t.C:
- err = ErrTimeout
- }
- return err
- }
- func (c *pipelineConnClient) acquirePipelineWork(timeout time.Duration) (w *pipelineWork) {
- v := c.workPool.Get()
- if v != nil {
- w = v.(*pipelineWork)
- } else {
- w = &pipelineWork{
- done: make(chan struct{}, 1),
- }
- }
- if timeout > 0 {
- if w.t == nil {
- w.t = time.NewTimer(timeout)
- } else {
- w.t.Reset(timeout)
- }
- w.deadline = time.Now().Add(timeout)
- } else {
- w.deadline = zeroTime
- }
- return w
- }
- func (c *pipelineConnClient) releasePipelineWork(w *pipelineWork) {
- if w.t != nil {
- w.t.Stop()
- }
- w.reqCopy.Reset()
- w.respCopy.Reset()
- w.req = nil
- w.resp = nil
- w.err = nil
- c.workPool.Put(w)
- }
- // Do performs the given http request and sets the corresponding response.
- //
- // Request must contain at least non-zero RequestURI with full url (including
- // scheme and host) or non-zero Host header + RequestURI.
- //
- // The function doesn't follow redirects. Use Get* for following redirects.
- //
- // Response is ignored if resp is nil.
- //
- // It is recommended obtaining req and resp via AcquireRequest
- // and AcquireResponse in performance-critical code.
- func (c *PipelineClient) Do(req *Request, resp *Response) error {
- return c.getConnClient().Do(req, resp)
- }
- func (c *pipelineConnClient) Do(req *Request, resp *Response) error {
- c.init()
- if c.DisablePathNormalizing {
- req.URI().DisablePathNormalizing = true
- }
- userAgentOld := req.Header.UserAgent()
- if len(userAgentOld) == 0 {
- userAgent := c.Name
- if userAgent == "" && !c.NoDefaultUserAgentHeader {
- userAgent = defaultUserAgent
- }
- if userAgent != "" {
- req.Header.userAgent = append(req.Header.userAgent[:0], userAgent...)
- }
- }
- w := c.acquirePipelineWork(0)
- w.req = req
- if resp != nil {
- resp.Header.disableNormalizing = c.DisableHeaderNamesNormalizing
- w.resp = resp
- } else {
- w.resp = &w.respCopy
- }
- // Put the request to outgoing queue
- select {
- case c.chW <- w:
- default:
- // Try substituting the oldest w with the current one.
- select {
- case wOld := <-c.chW:
- wOld.err = ErrPipelineOverflow
- wOld.done <- struct{}{}
- default:
- }
- select {
- case c.chW <- w:
- default:
- c.releasePipelineWork(w)
- return ErrPipelineOverflow
- }
- }
- // Wait for the response
- <-w.done
- err := w.err
- c.releasePipelineWork(w)
- return err
- }
- func (c *PipelineClient) getConnClient() *pipelineConnClient {
- c.connClientsLock.Lock()
- cc := c.getConnClientUnlocked()
- c.connClientsLock.Unlock()
- return cc
- }
- func (c *PipelineClient) getConnClientUnlocked() *pipelineConnClient {
- if len(c.connClients) == 0 {
- return c.newConnClient()
- }
- // Return the client with the minimum number of pending requests.
- minCC := c.connClients[0]
- minReqs := minCC.PendingRequests()
- if minReqs == 0 {
- return minCC
- }
- for i := 1; i < len(c.connClients); i++ {
- cc := c.connClients[i]
- reqs := cc.PendingRequests()
- if reqs == 0 {
- return cc
- }
- if reqs < minReqs {
- minCC = cc
- minReqs = reqs
- }
- }
- maxConns := c.MaxConns
- if maxConns <= 0 {
- maxConns = 1
- }
- if len(c.connClients) < maxConns {
- return c.newConnClient()
- }
- return minCC
- }
- func (c *PipelineClient) newConnClient() *pipelineConnClient {
- cc := &pipelineConnClient{
- Addr: c.Addr,
- Name: c.Name,
- NoDefaultUserAgentHeader: c.NoDefaultUserAgentHeader,
- MaxPendingRequests: c.MaxPendingRequests,
- MaxBatchDelay: c.MaxBatchDelay,
- Dial: c.Dial,
- DialDualStack: c.DialDualStack,
- DisableHeaderNamesNormalizing: c.DisableHeaderNamesNormalizing,
- DisablePathNormalizing: c.DisablePathNormalizing,
- IsTLS: c.IsTLS,
- TLSConfig: c.TLSConfig,
- MaxIdleConnDuration: c.MaxIdleConnDuration,
- ReadBufferSize: c.ReadBufferSize,
- WriteBufferSize: c.WriteBufferSize,
- ReadTimeout: c.ReadTimeout,
- WriteTimeout: c.WriteTimeout,
- Logger: c.Logger,
- }
- c.connClients = append(c.connClients, cc)
- return cc
- }
// ErrPipelineOverflow may be returned from PipelineClient.Do* when the queue
// of pending requests is full.
var ErrPipelineOverflow = errors.New("pipelined requests' queue has been overflowed. Increase MaxConns and/or MaxPendingRequests")

// DefaultMaxPendingRequests is the value used for
// PipelineClient.MaxPendingRequests when none is configured.
const DefaultMaxPendingRequests = 1024
- func (c *pipelineConnClient) init() {
- c.chLock.Lock()
- if c.chR == nil {
- maxPendingRequests := c.MaxPendingRequests
- if maxPendingRequests <= 0 {
- maxPendingRequests = DefaultMaxPendingRequests
- }
- c.chR = make(chan *pipelineWork, maxPendingRequests)
- if c.chW == nil {
- c.chW = make(chan *pipelineWork, maxPendingRequests)
- }
- go func() {
- // Keep restarting the worker if it fails (connection errors for example).
- for {
- if err := c.worker(); err != nil {
- c.logger().Printf("error in PipelineClient(%q): %v", c.Addr, err)
- if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
- // Throttle client reconnections on timeout errors
- time.Sleep(time.Second)
- }
- } else {
- c.chLock.Lock()
- stop := len(c.chR) == 0 && len(c.chW) == 0
- if !stop {
- c.chR = nil
- c.chW = nil
- }
- c.chLock.Unlock()
- if stop {
- break
- }
- }
- }
- }()
- }
- c.chLock.Unlock()
- }
- func (c *pipelineConnClient) worker() error {
- tlsConfig := c.cachedTLSConfig()
- conn, err := dialAddr(c.Addr, c.Dial, c.DialDualStack, c.IsTLS, tlsConfig, c.WriteTimeout)
- if err != nil {
- return err
- }
- // Start reader and writer
- stopW := make(chan struct{})
- doneW := make(chan error)
- go func() {
- doneW <- c.writer(conn, stopW)
- }()
- stopR := make(chan struct{})
- doneR := make(chan error)
- go func() {
- doneR <- c.reader(conn, stopR)
- }()
- // Wait until reader and writer are stopped
- select {
- case err = <-doneW:
- conn.Close()
- close(stopR)
- <-doneR
- case err = <-doneR:
- conn.Close()
- close(stopW)
- <-doneW
- }
- // Notify pending readers
- for len(c.chR) > 0 {
- w := <-c.chR
- w.err = errPipelineConnStopped
- w.done <- struct{}{}
- }
- return err
- }
- func (c *pipelineConnClient) cachedTLSConfig() *tls.Config {
- if !c.IsTLS {
- return nil
- }
- c.tlsConfigLock.Lock()
- cfg := c.tlsConfig
- if cfg == nil {
- cfg = newClientTLSConfig(c.TLSConfig, c.Addr)
- c.tlsConfig = cfg
- }
- c.tlsConfigLock.Unlock()
- return cfg
- }
- func (c *pipelineConnClient) writer(conn net.Conn, stopCh <-chan struct{}) error {
- writeBufferSize := c.WriteBufferSize
- if writeBufferSize <= 0 {
- writeBufferSize = defaultWriteBufferSize
- }
- bw := bufio.NewWriterSize(conn, writeBufferSize)
- defer bw.Flush()
- chR := c.chR
- chW := c.chW
- writeTimeout := c.WriteTimeout
- maxIdleConnDuration := c.MaxIdleConnDuration
- if maxIdleConnDuration <= 0 {
- maxIdleConnDuration = DefaultMaxIdleConnDuration
- }
- maxBatchDelay := c.MaxBatchDelay
- var (
- stopTimer = time.NewTimer(time.Hour)
- flushTimer = time.NewTimer(time.Hour)
- flushTimerCh <-chan time.Time
- instantTimerCh = make(chan time.Time)
- w *pipelineWork
- err error
- )
- close(instantTimerCh)
- for {
- againChW:
- select {
- case w = <-chW:
- // Fast path: len(chW) > 0
- default:
- // Slow path
- stopTimer.Reset(maxIdleConnDuration)
- select {
- case w = <-chW:
- case <-stopTimer.C:
- return nil
- case <-stopCh:
- return nil
- case <-flushTimerCh:
- if err = bw.Flush(); err != nil {
- return err
- }
- flushTimerCh = nil
- goto againChW
- }
- }
- if !w.deadline.IsZero() && time.Since(w.deadline) >= 0 {
- w.err = ErrTimeout
- w.done <- struct{}{}
- continue
- }
- w.resp.parseNetConn(conn)
- if writeTimeout > 0 {
- // Set Deadline every time, since golang has fixed the performance issue
- // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
- currentTime := time.Now()
- if err = conn.SetWriteDeadline(currentTime.Add(writeTimeout)); err != nil {
- w.err = err
- w.done <- struct{}{}
- return err
- }
- }
- if err = w.req.Write(bw); err != nil {
- w.err = err
- w.done <- struct{}{}
- return err
- }
- if flushTimerCh == nil && (len(chW) == 0 || len(chR) == cap(chR)) {
- if maxBatchDelay > 0 {
- flushTimer.Reset(maxBatchDelay)
- flushTimerCh = flushTimer.C
- } else {
- flushTimerCh = instantTimerCh
- }
- }
- againChR:
- select {
- case chR <- w:
- // Fast path: len(chR) < cap(chR)
- default:
- // Slow path
- select {
- case chR <- w:
- case <-stopCh:
- w.err = errPipelineConnStopped
- w.done <- struct{}{}
- return nil
- case <-flushTimerCh:
- if err = bw.Flush(); err != nil {
- w.err = err
- w.done <- struct{}{}
- return err
- }
- flushTimerCh = nil
- goto againChR
- }
- }
- }
- }
- func (c *pipelineConnClient) reader(conn net.Conn, stopCh <-chan struct{}) error {
- readBufferSize := c.ReadBufferSize
- if readBufferSize <= 0 {
- readBufferSize = defaultReadBufferSize
- }
- br := bufio.NewReaderSize(conn, readBufferSize)
- chR := c.chR
- readTimeout := c.ReadTimeout
- var (
- w *pipelineWork
- err error
- )
- for {
- select {
- case w = <-chR:
- // Fast path: len(chR) > 0
- default:
- // Slow path
- select {
- case w = <-chR:
- case <-stopCh:
- return nil
- }
- }
- if readTimeout > 0 {
- // Set Deadline every time, since golang has fixed the performance issue
- // See https://github.com/golang/go/issues/15133#issuecomment-271571395 for details
- currentTime := time.Now()
- if err = conn.SetReadDeadline(currentTime.Add(readTimeout)); err != nil {
- w.err = err
- w.done <- struct{}{}
- return err
- }
- }
- if err = w.resp.Read(br); err != nil {
- w.err = err
- w.done <- struct{}{}
- return err
- }
- w.done <- struct{}{}
- }
- }
- func (c *pipelineConnClient) logger() Logger {
- if c.Logger != nil {
- return c.Logger
- }
- return defaultLogger
- }
- // PendingRequests returns the current number of pending requests pipelined
- // to the server.
- //
- // This number may exceed MaxPendingRequests*MaxConns by up to two times, since
- // each connection to the server may keep up to MaxPendingRequests requests
- // in the queue before sending them to the server.
- //
- // This function may be used for balancing load among multiple PipelineClient
- // instances.
- func (c *PipelineClient) PendingRequests() int {
- c.connClientsLock.Lock()
- n := 0
- for _, cc := range c.connClients {
- n += cc.PendingRequests()
- }
- c.connClientsLock.Unlock()
- return n
- }
- func (c *pipelineConnClient) PendingRequests() int {
- c.init()
- c.chLock.Lock()
- n := len(c.chR) + len(c.chW)
- c.chLock.Unlock()
- return n
- }
- var errPipelineConnStopped = errors.New("pipeline connection has been stopped")
- var DefaultTransport RoundTripper = &transport{}
- type transport struct{}
- func (t *transport) RoundTrip(hc *HostClient, req *Request, resp *Response) (retry bool, err error) {
- customSkipBody := resp.SkipBody
- customStreamBody := resp.StreamBody
- var deadline time.Time
- if req.timeout > 0 {
- deadline = time.Now().Add(req.timeout)
- }
- cc, err := hc.acquireConn(req.timeout, req.ConnectionClose())
- if err != nil {
- return false, err
- }
- conn := cc.c
- resp.parseNetConn(conn)
- writeDeadline := deadline
- if hc.WriteTimeout > 0 {
- tmpWriteDeadline := time.Now().Add(hc.WriteTimeout)
- if writeDeadline.IsZero() || tmpWriteDeadline.Before(writeDeadline) {
- writeDeadline = tmpWriteDeadline
- }
- }
- if err = conn.SetWriteDeadline(writeDeadline); err != nil {
- hc.closeConn(cc)
- return true, err
- }
- resetConnection := false
- if hc.MaxConnDuration > 0 && time.Since(cc.createdTime) > hc.MaxConnDuration && !req.ConnectionClose() {
- req.SetConnectionClose()
- resetConnection = true
- }
- bw := hc.acquireWriter(conn)
- err = req.Write(bw)
- if resetConnection {
- req.Header.ResetConnectionClose()
- }
- if err == nil {
- err = bw.Flush()
- }
- hc.releaseWriter(bw)
- // Return ErrTimeout on any timeout.
- if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
- err = ErrTimeout
- }
- isConnRST := isConnectionReset(err)
- if err != nil && !isConnRST {
- hc.closeConn(cc)
- return true, err
- }
- readDeadline := deadline
- if hc.ReadTimeout > 0 {
- tmpReadDeadline := time.Now().Add(hc.ReadTimeout)
- if readDeadline.IsZero() || tmpReadDeadline.Before(readDeadline) {
- readDeadline = tmpReadDeadline
- }
- }
- if err = conn.SetReadDeadline(readDeadline); err != nil {
- hc.closeConn(cc)
- return true, err
- }
- if customSkipBody || req.Header.IsHead() {
- resp.SkipBody = true
- }
- if hc.DisableHeaderNamesNormalizing {
- resp.Header.DisableNormalizing()
- }
- br := hc.acquireReader(conn)
- err = resp.ReadLimitBody(br, hc.MaxResponseBodySize)
- if err != nil {
- hc.releaseReader(br)
- hc.closeConn(cc)
- // Don't retry in case of ErrBodyTooLarge since we will just get the same again.
- needRetry := err != ErrBodyTooLarge
- return needRetry, err
- }
- closeConn := resetConnection || req.ConnectionClose() || resp.ConnectionClose() || isConnRST
- if customStreamBody && resp.bodyStream != nil {
- rbs := resp.bodyStream
- resp.bodyStream = newCloseReader(rbs, func() error {
- hc.releaseReader(br)
- if r, ok := rbs.(*requestStream); ok {
- releaseRequestStream(r)
- }
- if closeConn || resp.ConnectionClose() {
- hc.closeConn(cc)
- } else {
- hc.releaseConn(cc)
- }
- return nil
- })
- return false, nil
- } else {
- hc.releaseReader(br)
- }
- if closeConn {
- hc.closeConn(cc)
- } else {
- hc.releaseConn(cc)
- }
- return false, nil
- }
|