tcpdialer.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461
  1. package fasthttp
  2. import (
  3. "context"
  4. "errors"
  5. "net"
  6. "strconv"
  7. "sync"
  8. "sync/atomic"
  9. "time"
  10. )
  11. // Dial dials the given TCP addr using tcp4.
  12. //
  13. // This function has the following additional features comparing to net.Dial:
  14. //
  15. // - It reduces load on DNS resolver by caching resolved TCP addressed
  16. // for DNSCacheDuration.
  17. // - It dials all the resolved TCP addresses in round-robin manner until
  18. // connection is established. This may be useful if certain addresses
  19. // are temporarily unreachable.
  20. // - It returns ErrDialTimeout if connection cannot be established during
  21. // DefaultDialTimeout seconds. Use DialTimeout for customizing dial timeout.
  22. //
  23. // This dialer is intended for custom code wrapping before passing
  24. // to Client.Dial or HostClient.Dial.
  25. //
  26. // For instance, per-host counters and/or limits may be implemented
  27. // by such wrappers.
  28. //
  29. // The addr passed to the function must contain port. Example addr values:
  30. //
  31. // - foobar.baz:443
  32. // - foo.bar:80
  33. // - aaa.com:8080
  34. func Dial(addr string) (net.Conn, error) {
  35. return defaultDialer.Dial(addr)
  36. }
  37. // DialTimeout dials the given TCP addr using tcp4 using the given timeout.
  38. //
  39. // This function has the following additional features comparing to net.Dial:
  40. //
  41. // - It reduces load on DNS resolver by caching resolved TCP addressed
  42. // for DNSCacheDuration.
  43. // - It dials all the resolved TCP addresses in round-robin manner until
  44. // connection is established. This may be useful if certain addresses
  45. // are temporarily unreachable.
  46. //
  47. // This dialer is intended for custom code wrapping before passing
  48. // to Client.DialTimeout or HostClient.DialTimeout.
  49. //
  50. // For instance, per-host counters and/or limits may be implemented
  51. // by such wrappers.
  52. //
  53. // The addr passed to the function must contain port. Example addr values:
  54. //
  55. // - foobar.baz:443
  56. // - foo.bar:80
  57. // - aaa.com:8080
  58. func DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
  59. return defaultDialer.DialTimeout(addr, timeout)
  60. }
  61. // DialDualStack dials the given TCP addr using both tcp4 and tcp6.
  62. //
  63. // This function has the following additional features comparing to net.Dial:
  64. //
  65. // - It reduces load on DNS resolver by caching resolved TCP addressed
  66. // for DNSCacheDuration.
  67. // - It dials all the resolved TCP addresses in round-robin manner until
  68. // connection is established. This may be useful if certain addresses
  69. // are temporarily unreachable.
  70. // - It returns ErrDialTimeout if connection cannot be established during
  71. // DefaultDialTimeout seconds. Use DialDualStackTimeout for custom dial
  72. // timeout.
  73. //
  74. // This dialer is intended for custom code wrapping before passing
  75. // to Client.Dial or HostClient.Dial.
  76. //
  77. // For instance, per-host counters and/or limits may be implemented
  78. // by such wrappers.
  79. //
  80. // The addr passed to the function must contain port. Example addr values:
  81. //
  82. // - foobar.baz:443
  83. // - foo.bar:80
  84. // - aaa.com:8080
  85. func DialDualStack(addr string) (net.Conn, error) {
  86. return defaultDialer.DialDualStack(addr)
  87. }
  88. // DialDualStackTimeout dials the given TCP addr using both tcp4 and tcp6
  89. // using the given timeout.
  90. //
  91. // This function has the following additional features comparing to net.Dial:
  92. //
  93. // - It reduces load on DNS resolver by caching resolved TCP addressed
  94. // for DNSCacheDuration.
  95. // - It dials all the resolved TCP addresses in round-robin manner until
  96. // connection is established. This may be useful if certain addresses
  97. // are temporarily unreachable.
  98. //
  99. // This dialer is intended for custom code wrapping before passing
  100. // to Client.DialTimeout or HostClient.DialTimeout.
  101. //
  102. // For instance, per-host counters and/or limits may be implemented
  103. // by such wrappers.
  104. //
  105. // The addr passed to the function must contain port. Example addr values:
  106. //
  107. // - foobar.baz:443
  108. // - foo.bar:80
  109. // - aaa.com:8080
  110. func DialDualStackTimeout(addr string, timeout time.Duration) (net.Conn, error) {
  111. return defaultDialer.DialDualStackTimeout(addr, timeout)
  112. }
  113. var defaultDialer = &TCPDialer{Concurrency: 1000}
  114. // Resolver represents interface of the tcp resolver.
  115. type Resolver interface {
  116. LookupIPAddr(context.Context, string) (names []net.IPAddr, err error)
  117. }
  118. // TCPDialer contains options to control a group of Dial calls.
  119. type TCPDialer struct {
  120. // Concurrency controls the maximum number of concurrent Dials
  121. // that can be performed using this object.
  122. // Setting this to 0 means unlimited.
  123. //
  124. // WARNING: This can only be changed before the first Dial.
  125. // Changes made after the first Dial will not affect anything.
  126. Concurrency int
  127. // LocalAddr is the local address to use when dialing an
  128. // address.
  129. // If nil, a local address is automatically chosen.
  130. LocalAddr *net.TCPAddr
  131. // This may be used to override DNS resolving policy, like this:
  132. // var dialer = &fasthttp.TCPDialer{
  133. // Resolver: &net.Resolver{
  134. // PreferGo: true,
  135. // StrictErrors: false,
  136. // Dial: func (ctx context.Context, network, address string) (net.Conn, error) {
  137. // d := net.Dialer{}
  138. // return d.DialContext(ctx, "udp", "8.8.8.8:53")
  139. // },
  140. // },
  141. // }
  142. Resolver Resolver
  143. // DisableDNSResolution may be used to disable DNS resolution
  144. DisableDNSResolution bool
  145. // DNSCacheDuration may be used to override the default DNS cache duration (DefaultDNSCacheDuration)
  146. DNSCacheDuration time.Duration
  147. tcpAddrsMap sync.Map
  148. concurrencyCh chan struct{}
  149. once sync.Once
  150. }
  151. // Dial dials the given TCP addr using tcp4.
  152. //
  153. // This function has the following additional features comparing to net.Dial:
  154. //
  155. // - It reduces load on DNS resolver by caching resolved TCP addressed
  156. // for DNSCacheDuration.
  157. // - It dials all the resolved TCP addresses in round-robin manner until
  158. // connection is established. This may be useful if certain addresses
  159. // are temporarily unreachable.
  160. // - It returns ErrDialTimeout if connection cannot be established during
  161. // DefaultDialTimeout seconds. Use DialTimeout for customizing dial timeout.
  162. //
  163. // This dialer is intended for custom code wrapping before passing
  164. // to Client.Dial or HostClient.Dial.
  165. //
  166. // For instance, per-host counters and/or limits may be implemented
  167. // by such wrappers.
  168. //
  169. // The addr passed to the function must contain port. Example addr values:
  170. //
  171. // - foobar.baz:443
  172. // - foo.bar:80
  173. // - aaa.com:8080
  174. func (d *TCPDialer) Dial(addr string) (net.Conn, error) {
  175. return d.dial(addr, false, DefaultDialTimeout)
  176. }
  177. // DialTimeout dials the given TCP addr using tcp4 using the given timeout.
  178. //
  179. // This function has the following additional features comparing to net.Dial:
  180. //
  181. // - It reduces load on DNS resolver by caching resolved TCP addressed
  182. // for DNSCacheDuration.
  183. // - It dials all the resolved TCP addresses in round-robin manner until
  184. // connection is established. This may be useful if certain addresses
  185. // are temporarily unreachable.
  186. //
  187. // This dialer is intended for custom code wrapping before passing
  188. // to Client.DialTimeout or HostClient.DialTimeout.
  189. //
  190. // For instance, per-host counters and/or limits may be implemented
  191. // by such wrappers.
  192. //
  193. // The addr passed to the function must contain port. Example addr values:
  194. //
  195. // - foobar.baz:443
  196. // - foo.bar:80
  197. // - aaa.com:8080
  198. func (d *TCPDialer) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
  199. return d.dial(addr, false, timeout)
  200. }
  201. // DialDualStack dials the given TCP addr using both tcp4 and tcp6.
  202. //
  203. // This function has the following additional features comparing to net.Dial:
  204. //
  205. // - It reduces load on DNS resolver by caching resolved TCP addressed
  206. // for DNSCacheDuration.
  207. // - It dials all the resolved TCP addresses in round-robin manner until
  208. // connection is established. This may be useful if certain addresses
  209. // are temporarily unreachable.
  210. // - It returns ErrDialTimeout if connection cannot be established during
  211. // DefaultDialTimeout seconds. Use DialDualStackTimeout for custom dial
  212. // timeout.
  213. //
  214. // This dialer is intended for custom code wrapping before passing
  215. // to Client.Dial or HostClient.Dial.
  216. //
  217. // For instance, per-host counters and/or limits may be implemented
  218. // by such wrappers.
  219. //
  220. // The addr passed to the function must contain port. Example addr values:
  221. //
  222. // - foobar.baz:443
  223. // - foo.bar:80
  224. // - aaa.com:8080
  225. func (d *TCPDialer) DialDualStack(addr string) (net.Conn, error) {
  226. return d.dial(addr, true, DefaultDialTimeout)
  227. }
  228. // DialDualStackTimeout dials the given TCP addr using both tcp4 and tcp6
  229. // using the given timeout.
  230. //
  231. // This function has the following additional features comparing to net.Dial:
  232. //
  233. // - It reduces load on DNS resolver by caching resolved TCP addressed
  234. // for DNSCacheDuration.
  235. // - It dials all the resolved TCP addresses in round-robin manner until
  236. // connection is established. This may be useful if certain addresses
  237. // are temporarily unreachable.
  238. //
  239. // This dialer is intended for custom code wrapping before passing
  240. // to Client.DialTimeout or HostClient.DialTimeout.
  241. //
  242. // For instance, per-host counters and/or limits may be implemented
  243. // by such wrappers.
  244. //
  245. // The addr passed to the function must contain port. Example addr values:
  246. //
  247. // - foobar.baz:443
  248. // - foo.bar:80
  249. // - aaa.com:8080
  250. func (d *TCPDialer) DialDualStackTimeout(addr string, timeout time.Duration) (net.Conn, error) {
  251. return d.dial(addr, true, timeout)
  252. }
  253. func (d *TCPDialer) dial(addr string, dualStack bool, timeout time.Duration) (net.Conn, error) {
  254. d.once.Do(func() {
  255. if d.Concurrency > 0 {
  256. d.concurrencyCh = make(chan struct{}, d.Concurrency)
  257. }
  258. if d.DNSCacheDuration == 0 {
  259. d.DNSCacheDuration = DefaultDNSCacheDuration
  260. }
  261. if !d.DisableDNSResolution {
  262. go d.tcpAddrsClean()
  263. }
  264. })
  265. deadline := time.Now().Add(timeout)
  266. network := "tcp4"
  267. if dualStack {
  268. network = "tcp"
  269. }
  270. if d.DisableDNSResolution {
  271. return d.tryDial(network, addr, deadline, d.concurrencyCh)
  272. }
  273. addrs, idx, err := d.getTCPAddrs(addr, dualStack, deadline)
  274. if err != nil {
  275. return nil, err
  276. }
  277. var conn net.Conn
  278. n := uint32(len(addrs))
  279. for n > 0 {
  280. conn, err = d.tryDial(network, addrs[idx%n].String(), deadline, d.concurrencyCh)
  281. if err == nil {
  282. return conn, nil
  283. }
  284. if err == ErrDialTimeout {
  285. return nil, err
  286. }
  287. idx++
  288. n--
  289. }
  290. return nil, err
  291. }
  292. func (d *TCPDialer) tryDial(
  293. network string, addr string, deadline time.Time, concurrencyCh chan struct{},
  294. ) (net.Conn, error) {
  295. timeout := time.Until(deadline)
  296. if timeout <= 0 {
  297. return nil, ErrDialTimeout
  298. }
  299. if concurrencyCh != nil {
  300. select {
  301. case concurrencyCh <- struct{}{}:
  302. default:
  303. tc := AcquireTimer(timeout)
  304. isTimeout := false
  305. select {
  306. case concurrencyCh <- struct{}{}:
  307. case <-tc.C:
  308. isTimeout = true
  309. }
  310. ReleaseTimer(tc)
  311. if isTimeout {
  312. return nil, ErrDialTimeout
  313. }
  314. }
  315. defer func() { <-concurrencyCh }()
  316. }
  317. dialer := net.Dialer{}
  318. if d.LocalAddr != nil {
  319. dialer.LocalAddr = d.LocalAddr
  320. }
  321. ctx, cancelCtx := context.WithDeadline(context.Background(), deadline)
  322. defer cancelCtx()
  323. conn, err := dialer.DialContext(ctx, network, addr)
  324. if err != nil && ctx.Err() == context.DeadlineExceeded {
  325. return nil, ErrDialTimeout
  326. }
  327. return conn, err
  328. }
  329. // ErrDialTimeout is returned when TCP dialing is timed out.
  330. var ErrDialTimeout = errors.New("dialing to the given TCP address timed out")
  331. // DefaultDialTimeout is timeout used by Dial and DialDualStack
  332. // for establishing TCP connections.
  333. const DefaultDialTimeout = 3 * time.Second
  334. type tcpAddrEntry struct {
  335. addrs []net.TCPAddr
  336. addrsIdx uint32
  337. pending int32
  338. resolveTime time.Time
  339. }
  340. // DefaultDNSCacheDuration is the duration for caching resolved TCP addresses
  341. // by Dial* functions.
  342. const DefaultDNSCacheDuration = time.Minute
  343. func (d *TCPDialer) tcpAddrsClean() {
  344. expireDuration := 2 * d.DNSCacheDuration
  345. for {
  346. time.Sleep(time.Second)
  347. t := time.Now()
  348. d.tcpAddrsMap.Range(func(k, v any) bool {
  349. if e, ok := v.(*tcpAddrEntry); ok && t.Sub(e.resolveTime) > expireDuration {
  350. d.tcpAddrsMap.Delete(k)
  351. }
  352. return true
  353. })
  354. }
  355. }
  356. func (d *TCPDialer) getTCPAddrs(addr string, dualStack bool, deadline time.Time) ([]net.TCPAddr, uint32, error) {
  357. item, exist := d.tcpAddrsMap.Load(addr)
  358. e, ok := item.(*tcpAddrEntry)
  359. if exist && ok && e != nil && time.Since(e.resolveTime) > d.DNSCacheDuration {
  360. // Only let one goroutine re-resolve at a time.
  361. if atomic.SwapInt32(&e.pending, 1) == 0 {
  362. e = nil
  363. }
  364. }
  365. if e == nil {
  366. addrs, err := resolveTCPAddrs(addr, dualStack, d.Resolver, deadline)
  367. if err != nil {
  368. item, exist := d.tcpAddrsMap.Load(addr)
  369. e, ok = item.(*tcpAddrEntry)
  370. if exist && ok && e != nil {
  371. // Set pending to 0 so another goroutine can retry.
  372. atomic.StoreInt32(&e.pending, 0)
  373. }
  374. return nil, 0, err
  375. }
  376. e = &tcpAddrEntry{
  377. addrs: addrs,
  378. resolveTime: time.Now(),
  379. }
  380. d.tcpAddrsMap.Store(addr, e)
  381. }
  382. idx := atomic.AddUint32(&e.addrsIdx, 1)
  383. return e.addrs, idx, nil
  384. }
  385. func resolveTCPAddrs(addr string, dualStack bool, resolver Resolver, deadline time.Time) ([]net.TCPAddr, error) {
  386. host, portS, err := net.SplitHostPort(addr)
  387. if err != nil {
  388. return nil, err
  389. }
  390. port, err := strconv.Atoi(portS)
  391. if err != nil {
  392. return nil, err
  393. }
  394. if resolver == nil {
  395. resolver = net.DefaultResolver
  396. }
  397. ctx, cancel := context.WithDeadline(context.Background(), deadline)
  398. defer cancel()
  399. ipaddrs, err := resolver.LookupIPAddr(ctx, host)
  400. if err != nil {
  401. return nil, err
  402. }
  403. n := len(ipaddrs)
  404. addrs := make([]net.TCPAddr, 0, n)
  405. for i := 0; i < n; i++ {
  406. ip := ipaddrs[i]
  407. if !dualStack && ip.IP.To4() == nil {
  408. continue
  409. }
  410. addrs = append(addrs, net.TCPAddr{
  411. IP: ip.IP,
  412. Port: port,
  413. Zone: ip.Zone,
  414. })
  415. }
  416. if len(addrs) == 0 {
  417. return nil, errNoDNSEntries
  418. }
  419. return addrs, nil
  420. }
  421. var errNoDNSEntries = errors.New("couldn't find DNS entries for the given domain. Try using DialDualStack")