web_socket.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. // package web_socket -- реализация высокоуровнего веб-сокета для работы десктопа
  2. package web_socket
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "log"
  7. "net/url"
  8. "os"
  9. "sync"
  10. "time"
  11. "github.com/gorilla/websocket"
  12. "wartank/pkg/components/safebool"
  13. "wartank/pkg/types"
  14. )
  15. const (
  16. strWebSocket = "web_socket"
  17. TypeMsgBin = 2
  18. )
  19. // WebSocket -- реализация высокоуровнего веб-сокета для работы десктопа
  20. type WebSocket struct {
  21. kern types.IKernel
  22. slog types.ISlog
  23. url string
  24. isConnect *safebool.SafeBool
  25. ws *websocket.Conn
  26. block sync.RWMutex
  27. }
  28. // NewWebSocket -- возвращает новый веб-сокет
  29. func NewWebSocket(kern types.IKernel) (*WebSocket, error) {
  30. log.Println("NewWebSocket()")
  31. if kern == nil {
  32. return nil, fmt.Errorf("NewWebSocket(): IKernel == nil")
  33. }
  34. url := os.Getenv("SERVER_URL")
  35. if url == "" {
  36. return nil, fmt.Errorf("NewWebSocket(): env SERVER_URL not set")
  37. }
  38. sf := &WebSocket{
  39. kern: kern,
  40. slog: kern.Slog(),
  41. url: url,
  42. isConnect: safebool.NewSafeBool(),
  43. }
  44. sf.connect()
  45. go sf.close()
  46. return sf, nil
  47. }
  48. // Подключает веб-сокет к серверу
  49. func (sf *WebSocket) connect() {
  50. log.Println("WebSocket.connect()")
  51. fnConnect := func() {
  52. u := url.URL{Scheme: "ws", Host: sf.url, Path: "/api/ws"}
  53. strUrl := u.String()
  54. log.Printf("WebSocket.connect(): wait connect to %q\n", strUrl)
  55. var err error
  56. sf.ws, _, err = websocket.DefaultDialer.Dial(strUrl, nil)
  57. if err != nil {
  58. log.Printf("WebSocket.connect(): in dial, err=\n\t%v\n", err)
  59. time.Sleep(time.Second * 2)
  60. return
  61. }
  62. sf.isConnect.Set()
  63. log.Println("WebSocket.connect(): ok")
  64. }
  65. for !sf.isConnect.Get() {
  66. select {
  67. case <-sf.kern.CtxApp().Done():
  68. return
  69. default:
  70. fnConnect()
  71. }
  72. }
  73. }
  74. // Read -- потокобезопасное чтение топика сервера
  75. func (sf *WebSocket) Read(topic string) (map[string]string, error) {
  76. sf.block.Lock()
  77. defer sf.block.Unlock()
  78. dictResp, err := sf.read(topic)
  79. if err != nil {
  80. return nil, fmt.Errorf("WebSocket.Read(): in read, err=\n\t%w", err)
  81. }
  82. return dictResp, nil
  83. }
  84. // Скрытая потоко-небезопасна функция
  85. func (sf *WebSocket) read(topic string) (dictResp map[string]string, err error) {
  86. var binResp []byte
  87. for {
  88. dictReq := make(map[string]string)
  89. dictReq["topic"] = topic
  90. binReq, err := json.Marshal(dictReq)
  91. if err != nil {
  92. return nil, fmt.Errorf("WebSocket.read(): in marshall topic(%q), err=\n\t%w", topic, err)
  93. }
  94. err = sf.ws.WriteMessage(TypeMsgBin, binReq)
  95. if err != nil {
  96. sf.slog.Errorf("WebSocket.read(): in write msg, err=\n\t%v\n", err)
  97. sf.ws.Close()
  98. sf.isConnect.Reset()
  99. sf.connect()
  100. continue
  101. }
  102. _, binResp, err = sf.ws.ReadMessage()
  103. if err != nil {
  104. sf.slog.Errorf("WebSocket.read(): in read msg, err=\n\t%v\n", err)
  105. sf.ws.Close()
  106. sf.isConnect.Reset()
  107. sf.connect()
  108. continue
  109. }
  110. break
  111. }
  112. dictResp = make(map[string]string)
  113. err = json.Unmarshal(binResp, &dictResp)
  114. if err != nil {
  115. return nil, fmt.Errorf("WebSocket.read(): in unmarshal binResp, err=\n\t%w", err)
  116. }
  117. return dictResp, nil
  118. }
  119. // Write -- потокобезопасная запись топика
  120. func (sf *WebSocket) Write(topic string, dictReq map[string]string) error {
  121. sf.block.Lock()
  122. defer sf.block.Unlock()
  123. err := sf.write(topic, dictReq)
  124. if err != nil {
  125. return fmt.Errorf("WebSocket.Write(): in write, err=\n\t%w", err)
  126. }
  127. return nil
  128. }
  129. // Скрытая потоко-небезопасна функция
  130. func (sf *WebSocket) write(topic string, dictReq map[string]string) error {
  131. dictReq["topic"] = topic
  132. binData, err := json.Marshal(dictReq)
  133. if err != nil {
  134. return fmt.Errorf("WebSocket.write(): in marshal msg, err=\n\t%w", err)
  135. }
  136. for {
  137. err = sf.ws.WriteMessage(TypeMsgBin, binData)
  138. if err != nil {
  139. sf.slog.Errorf("WebSocket.write(): in write msg, err=\n\t%v\n", err)
  140. sf.isConnect.Reset()
  141. sf.ws.Close()
  142. sf.connect()
  143. continue
  144. }
  145. return nil
  146. }
  147. }
  148. // IsConnect -- потокобезопасный признак подключенности сервера
  149. func (sf *WebSocket) IsConnect() bool {
  150. sf.block.RLock()
  151. defer sf.block.RUnlock()
  152. return sf.isConnect.Get()
  153. }
  154. // Call -- потокобезопасный вызов удалённого топика
  155. func (sf *WebSocket) Call(topic string, dictReq map[string]string) (dictResp map[string]string, err error) {
  156. sf.block.Lock()
  157. defer sf.block.Unlock()
  158. var binResp []byte
  159. for {
  160. dictReq["topic"] = topic
  161. binReq, err := json.Marshal(dictReq)
  162. if err != nil {
  163. return nil, fmt.Errorf("WebSocket.Call(): in marshall topic(%q), err=\n\t%w", topic, err)
  164. }
  165. err = sf.ws.WriteMessage(TypeMsgBin, binReq)
  166. if err != nil {
  167. sf.slog.Errorf("WebSocket.Call(): in write msg, err=\n\t%v\n", err)
  168. sf.ws.Close()
  169. sf.isConnect.Reset()
  170. sf.connect()
  171. continue
  172. }
  173. _, binResp, err = sf.ws.ReadMessage()
  174. if err != nil {
  175. sf.slog.Errorf("WebSocket.Call(): in read msg, err=\n\t%v\n", err)
  176. sf.ws.Close()
  177. sf.isConnect.Reset()
  178. sf.connect()
  179. continue
  180. }
  181. break
  182. }
  183. dictResp = make(map[string]string)
  184. err = json.Unmarshal(binResp, &dictResp)
  185. if err != nil {
  186. return nil, fmt.Errorf("WebSocket.Call(): in unmarshal binResp, err=\n\t%w", err)
  187. }
  188. return dictResp, nil
  189. }
  190. // Потокобезопасное ожидание закрытия в отдельном потоке
  191. func (sf *WebSocket) close() {
  192. <-sf.kern.Done()
  193. sf.block.Lock()
  194. defer sf.block.Unlock()
  195. if !sf.isConnect.Get() {
  196. return
  197. }
  198. sf.isConnect.Reset()
  199. sf.ws.Close()
  200. }