pipeconns.go 6.6 KB


  1. package fasthttputil
  2. import (
  3. "errors"
  4. "io"
  5. "net"
  6. "sync"
  7. "time"
  8. )
  9. // NewPipeConns returns new bi-directional connection pipe.
  10. //
  11. // PipeConns is NOT safe for concurrent use by multiple goroutines!
  12. func NewPipeConns() *PipeConns {
  13. ch1 := make(chan *byteBuffer, 4)
  14. ch2 := make(chan *byteBuffer, 4)
  15. pc := &PipeConns{
  16. stopCh: make(chan struct{}),
  17. }
  18. pc.c1.rCh = ch1
  19. pc.c1.wCh = ch2
  20. pc.c2.rCh = ch2
  21. pc.c2.wCh = ch1
  22. pc.c1.pc = pc
  23. pc.c2.pc = pc
  24. return pc
  25. }
  26. // PipeConns provides bi-directional connection pipe,
  27. // which use in-process memory as a transport.
  28. //
  29. // PipeConns must be created by calling NewPipeConns.
  30. //
  31. // PipeConns has the following additional features comparing to connections
  32. // returned from net.Pipe():
  33. //
  34. // - It is faster.
  35. // - It buffers Write calls, so there is no need to have concurrent goroutine
  36. // calling Read in order to unblock each Write call.
  37. // - It supports read and write deadlines.
  38. //
  39. // PipeConns is NOT safe for concurrent use by multiple goroutines!
  40. type PipeConns struct {
  41. c1 pipeConn
  42. c2 pipeConn
  43. stopCh chan struct{}
  44. stopChLock sync.Mutex
  45. }
  46. // SetAddresses sets the local and remote addresses for the connection.
  47. func (pc *PipeConns) SetAddresses(localAddr1, remoteAddr1, localAddr2, remoteAddr2 net.Addr) {
  48. pc.c1.addrLock.Lock()
  49. defer pc.c1.addrLock.Unlock()
  50. pc.c2.addrLock.Lock()
  51. defer pc.c2.addrLock.Unlock()
  52. pc.c1.localAddr = localAddr1
  53. pc.c1.remoteAddr = remoteAddr1
  54. pc.c2.localAddr = localAddr2
  55. pc.c2.remoteAddr = remoteAddr2
  56. }
  57. // Conn1 returns the first end of bi-directional pipe.
  58. //
  59. // Data written to Conn1 may be read from Conn2.
  60. // Data written to Conn2 may be read from Conn1.
  61. func (pc *PipeConns) Conn1() net.Conn {
  62. return &pc.c1
  63. }
  64. // Conn2 returns the second end of bi-directional pipe.
  65. //
  66. // Data written to Conn2 may be read from Conn1.
  67. // Data written to Conn1 may be read from Conn2.
  68. func (pc *PipeConns) Conn2() net.Conn {
  69. return &pc.c2
  70. }
  71. // Close closes pipe connections.
  72. func (pc *PipeConns) Close() error {
  73. pc.stopChLock.Lock()
  74. select {
  75. case <-pc.stopCh:
  76. default:
  77. close(pc.stopCh)
  78. }
  79. pc.stopChLock.Unlock()
  80. return nil
  81. }
  82. type pipeConn struct {
  83. b *byteBuffer
  84. bb []byte
  85. rCh chan *byteBuffer
  86. wCh chan *byteBuffer
  87. pc *PipeConns
  88. readDeadlineTimer *time.Timer
  89. writeDeadlineTimer *time.Timer
  90. readDeadlineCh <-chan time.Time
  91. writeDeadlineCh <-chan time.Time
  92. readDeadlineChLock sync.Mutex
  93. localAddr net.Addr
  94. remoteAddr net.Addr
  95. addrLock sync.RWMutex
  96. }
  97. func (c *pipeConn) Write(p []byte) (int, error) {
  98. b := acquireByteBuffer()
  99. b.b = append(b.b[:0], p...)
  100. select {
  101. case <-c.pc.stopCh:
  102. releaseByteBuffer(b)
  103. return 0, errConnectionClosed
  104. default:
  105. }
  106. select {
  107. case c.wCh <- b:
  108. default:
  109. select {
  110. case c.wCh <- b:
  111. case <-c.writeDeadlineCh:
  112. c.writeDeadlineCh = closedDeadlineCh
  113. return 0, ErrTimeout
  114. case <-c.pc.stopCh:
  115. releaseByteBuffer(b)
  116. return 0, errConnectionClosed
  117. }
  118. }
  119. return len(p), nil
  120. }
  121. func (c *pipeConn) Read(p []byte) (int, error) {
  122. mayBlock := true
  123. nn := 0
  124. for len(p) > 0 {
  125. n, err := c.read(p, mayBlock)
  126. nn += n
  127. if err != nil {
  128. if !mayBlock && err == errWouldBlock {
  129. err = nil
  130. }
  131. return nn, err
  132. }
  133. p = p[n:]
  134. mayBlock = false
  135. }
  136. return nn, nil
  137. }
  138. func (c *pipeConn) read(p []byte, mayBlock bool) (int, error) {
  139. if len(c.bb) == 0 {
  140. if err := c.readNextByteBuffer(mayBlock); err != nil {
  141. return 0, err
  142. }
  143. }
  144. n := copy(p, c.bb)
  145. c.bb = c.bb[n:]
  146. return n, nil
  147. }
  148. func (c *pipeConn) readNextByteBuffer(mayBlock bool) error {
  149. releaseByteBuffer(c.b)
  150. c.b = nil
  151. select {
  152. case c.b = <-c.rCh:
  153. default:
  154. if !mayBlock {
  155. return errWouldBlock
  156. }
  157. c.readDeadlineChLock.Lock()
  158. readDeadlineCh := c.readDeadlineCh
  159. c.readDeadlineChLock.Unlock()
  160. select {
  161. case c.b = <-c.rCh:
  162. case <-readDeadlineCh:
  163. c.readDeadlineChLock.Lock()
  164. c.readDeadlineCh = closedDeadlineCh
  165. c.readDeadlineChLock.Unlock()
  166. // rCh may contain data when deadline is reached.
  167. // Read the data before returning ErrTimeout.
  168. select {
  169. case c.b = <-c.rCh:
  170. default:
  171. return ErrTimeout
  172. }
  173. case <-c.pc.stopCh:
  174. // rCh may contain data when stopCh is closed.
  175. // Read the data before returning EOF.
  176. select {
  177. case c.b = <-c.rCh:
  178. default:
  179. return io.EOF
  180. }
  181. }
  182. }
  183. c.bb = c.b.b
  184. return nil
  185. }
  186. var (
  187. errWouldBlock = errors.New("would block")
  188. errConnectionClosed = errors.New("connection closed")
  189. )
  190. type timeoutError struct{}
  191. func (e *timeoutError) Error() string {
  192. return "timeout"
  193. }
  194. // Only implement the Timeout() function of the net.Error interface.
  195. // This allows for checks like:
  196. //
  197. // if x, ok := err.(interface{ Timeout() bool }); ok && x.Timeout() {
  198. func (e *timeoutError) Timeout() bool {
  199. return true
  200. }
  201. // ErrTimeout is returned from Read() or Write() on timeout.
  202. var ErrTimeout = &timeoutError{}
  203. func (c *pipeConn) Close() error {
  204. return c.pc.Close()
  205. }
  206. func (c *pipeConn) LocalAddr() net.Addr {
  207. c.addrLock.RLock()
  208. defer c.addrLock.RUnlock()
  209. if c.localAddr != nil {
  210. return c.localAddr
  211. }
  212. return pipeAddr(0)
  213. }
  214. func (c *pipeConn) RemoteAddr() net.Addr {
  215. c.addrLock.RLock()
  216. defer c.addrLock.RUnlock()
  217. if c.remoteAddr != nil {
  218. return c.remoteAddr
  219. }
  220. return pipeAddr(0)
  221. }
  222. func (c *pipeConn) SetDeadline(deadline time.Time) error {
  223. c.SetReadDeadline(deadline) //nolint:errcheck
  224. c.SetWriteDeadline(deadline) //nolint:errcheck
  225. return nil
  226. }
  227. func (c *pipeConn) SetReadDeadline(deadline time.Time) error {
  228. if c.readDeadlineTimer == nil {
  229. c.readDeadlineTimer = time.NewTimer(time.Hour)
  230. }
  231. readDeadlineCh := updateTimer(c.readDeadlineTimer, deadline)
  232. c.readDeadlineChLock.Lock()
  233. c.readDeadlineCh = readDeadlineCh
  234. c.readDeadlineChLock.Unlock()
  235. return nil
  236. }
  237. func (c *pipeConn) SetWriteDeadline(deadline time.Time) error {
  238. if c.writeDeadlineTimer == nil {
  239. c.writeDeadlineTimer = time.NewTimer(time.Hour)
  240. }
  241. c.writeDeadlineCh = updateTimer(c.writeDeadlineTimer, deadline)
  242. return nil
  243. }
  244. func updateTimer(t *time.Timer, deadline time.Time) <-chan time.Time {
  245. if !t.Stop() {
  246. select {
  247. case <-t.C:
  248. default:
  249. }
  250. }
  251. if deadline.IsZero() {
  252. return nil
  253. }
  254. d := time.Until(deadline)
  255. if d <= 0 {
  256. return closedDeadlineCh
  257. }
  258. t.Reset(d)
  259. return t.C
  260. }
  261. var closedDeadlineCh = func() <-chan time.Time {
  262. ch := make(chan time.Time)
  263. close(ch)
  264. return ch
  265. }()
  266. type pipeAddr int
  267. func (pipeAddr) Network() string {
  268. return "pipe"
  269. }
  270. func (pipeAddr) String() string {
  271. return "pipe"
  272. }
  273. type byteBuffer struct {
  274. b []byte
  275. }
  276. func acquireByteBuffer() *byteBuffer {
  277. return byteBufferPool.Get().(*byteBuffer)
  278. }
  279. func releaseByteBuffer(b *byteBuffer) {
  280. if b != nil {
  281. byteBufferPool.Put(b)
  282. }
  283. }
  284. var byteBufferPool = &sync.Pool{
  285. New: func() interface{} {
  286. return &byteBuffer{
  287. b: make([]byte, 1024),
  288. }
  289. },
  290. }