// package web_socket -- реализация высокоуровнего веб-сокета для работы десктопа package web_socket import ( "encoding/json" "fmt" "log" "net/url" "os" "sync" "time" "github.com/gorilla/websocket" "wartank/pkg/components/safebool" "wartank/pkg/types" ) const ( strWebSocket = "web_socket" TypeMsgBin = 2 ) // WebSocket -- реализация высокоуровнего веб-сокета для работы десктопа type WebSocket struct { kern types.IKernel slog types.ISlog url string isConnect *safebool.SafeBool ws *websocket.Conn block sync.RWMutex } // NewWebSocket -- возвращает новый веб-сокет func NewWebSocket(kern types.IKernel) (*WebSocket, error) { log.Println("NewWebSocket()") if kern == nil { return nil, fmt.Errorf("NewWebSocket(): IKernel == nil") } url := os.Getenv("SERVER_URL") if url == "" { return nil, fmt.Errorf("NewWebSocket(): env SERVER_URL not set") } sf := &WebSocket{ kern: kern, slog: kern.Slog(), url: url, isConnect: safebool.NewSafeBool(), } sf.connect() go sf.close() return sf, nil } // Подключает веб-сокет к серверу func (sf *WebSocket) connect() { log.Println("WebSocket.connect()") fnConnect := func() { u := url.URL{Scheme: "ws", Host: sf.url, Path: "/api/ws"} strUrl := u.String() log.Printf("WebSocket.connect(): wait connect to %q\n", strUrl) var err error sf.ws, _, err = websocket.DefaultDialer.Dial(strUrl, nil) if err != nil { log.Printf("WebSocket.connect(): in dial, err=\n\t%v\n", err) time.Sleep(time.Second * 2) return } sf.isConnect.Set() log.Println("WebSocket.connect(): ok") } for !sf.isConnect.Get() { select { case <-sf.kern.CtxApp().Done(): return default: fnConnect() } } } // Read -- потокобезопасное чтение топика сервера func (sf *WebSocket) Read(topic string) (map[string]string, error) { sf.block.Lock() defer sf.block.Unlock() dictResp, err := sf.read(topic) if err != nil { return nil, fmt.Errorf("WebSocket.Read(): in read, err=\n\t%w", err) } return dictResp, nil } // Скрытая потоко-небезопасна функция func (sf *WebSocket) read(topic string) (dictResp map[string]string, err error) { var binResp []byte for { dictReq := make(map[string]string) dictReq["topic"] = topic binReq, err := json.Marshal(dictReq) if err != nil { return nil, fmt.Errorf("WebSocket.read(): in marshall topic(%q), err=\n\t%w", topic, err) } err = sf.ws.WriteMessage(TypeMsgBin, binReq) if err != nil { sf.slog.Errorf("WebSocket.read(): in write msg, err=\n\t%v\n", err) sf.ws.Close() sf.isConnect.Reset() sf.connect() continue } _, binResp, err = sf.ws.ReadMessage() if err != nil { sf.slog.Errorf("WebSocket.read(): in read msg, err=\n\t%v\n", err) sf.ws.Close() sf.isConnect.Reset() sf.connect() continue } break } dictResp = make(map[string]string) err = json.Unmarshal(binResp, &dictResp) if err != nil { return nil, fmt.Errorf("WebSocket.read(): in unmarshal binResp, err=\n\t%w", err) } return dictResp, nil } // Write -- потокобезопасная запись топика func (sf *WebSocket) Write(topic string, dictReq map[string]string) error { sf.block.Lock() defer sf.block.Unlock() err := sf.write(topic, dictReq) if err != nil { return fmt.Errorf("WebSocket.Write(): in write, err=\n\t%w", err) } return nil } // Скрытая потоко-небезопасна функция func (sf *WebSocket) write(topic string, dictReq map[string]string) error { dictReq["topic"] = topic binData, err := json.Marshal(dictReq) if err != nil { return fmt.Errorf("WebSocket.write(): in marshal msg, err=\n\t%w", err) } for { err = sf.ws.WriteMessage(TypeMsgBin, binData) if err != nil { sf.slog.Errorf("WebSocket.write(): in write msg, err=\n\t%v\n", err) sf.isConnect.Reset() sf.ws.Close() sf.connect() continue } return nil } } // IsConnect -- потокобезопасный признак подключенности сервера func (sf *WebSocket) IsConnect() bool { sf.block.RLock() defer sf.block.RUnlock() return sf.isConnect.Get() } // Call -- потокобезопасный вызов удалённого топика func (sf *WebSocket) Call(topic string, dictReq map[string]string) (dictResp map[string]string, err error) { sf.block.Lock() defer sf.block.Unlock() var binResp []byte for { dictReq["topic"] = topic binReq, err := json.Marshal(dictReq) if err != nil { return nil, fmt.Errorf("WebSocket.Call(): in marshall topic(%q), err=\n\t%w", topic, err) } err = sf.ws.WriteMessage(TypeMsgBin, binReq) if err != nil { sf.slog.Errorf("WebSocket.Call(): in write msg, err=\n\t%v\n", err) sf.ws.Close() sf.isConnect.Reset() sf.connect() continue } _, binResp, err = sf.ws.ReadMessage() if err != nil { sf.slog.Errorf("WebSocket.Call(): in read msg, err=\n\t%v\n", err) sf.ws.Close() sf.isConnect.Reset() sf.connect() continue } break } dictResp = make(map[string]string) err = json.Unmarshal(binResp, &dictResp) if err != nil { return nil, fmt.Errorf("WebSocket.Call(): in unmarshal binResp, err=\n\t%w", err) } return dictResp, nil } // Потокобезопасное ожидание закрытия в отдельном потоке func (sf *WebSocket) close() { <-sf.kern.Done() sf.block.Lock() defer sf.block.Unlock() if !sf.isConnect.Get() { return } sf.isConnect.Reset() sf.ws.Close() }