123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218 |
- // package web_socket -- реализация высокоуровнего веб-сокета для работы десктопа
- package web_socket
- import (
- "encoding/json"
- "fmt"
- "log"
- "net/url"
- "os"
- "sync"
- "time"
- "github.com/gorilla/websocket"
- "wartank/pkg/components/safebool"
- "wartank/pkg/types"
- )
- const (
- strWebSocket = "web_socket"
- TypeMsgBin = 2
- )
- // WebSocket -- реализация высокоуровнего веб-сокета для работы десктопа
- type WebSocket struct {
- kern types.IKernel
- slog types.ISlog
- url string
- isConnect *safebool.SafeBool
- ws *websocket.Conn
- block sync.RWMutex
- }
- // NewWebSocket -- возвращает новый веб-сокет
- func NewWebSocket(kern types.IKernel) (*WebSocket, error) {
- log.Println("NewWebSocket()")
- if kern == nil {
- return nil, fmt.Errorf("NewWebSocket(): IKernel == nil")
- }
- url := os.Getenv("SERVER_URL")
- if url == "" {
- return nil, fmt.Errorf("NewWebSocket(): env SERVER_URL not set")
- }
- sf := &WebSocket{
- kern: kern,
- slog: kern.Slog(),
- url: url,
- isConnect: safebool.NewSafeBool(),
- }
- sf.connect()
- go sf.close()
- return sf, nil
- }
- // Подключает веб-сокет к серверу
- func (sf *WebSocket) connect() {
- log.Println("WebSocket.connect()")
- fnConnect := func() {
- u := url.URL{Scheme: "ws", Host: sf.url, Path: "/api/ws"}
- strUrl := u.String()
- log.Printf("WebSocket.connect(): wait connect to %q\n", strUrl)
- var err error
- sf.ws, _, err = websocket.DefaultDialer.Dial(strUrl, nil)
- if err != nil {
- log.Printf("WebSocket.connect(): in dial, err=\n\t%v\n", err)
- time.Sleep(time.Second * 2)
- return
- }
- sf.isConnect.Set()
- log.Println("WebSocket.connect(): ok")
- }
- for !sf.isConnect.Get() {
- select {
- case <-sf.kern.CtxApp().Done():
- return
- default:
- fnConnect()
- }
- }
- }
- // Read -- потокобезопасное чтение топика сервера
- func (sf *WebSocket) Read(topic string) (map[string]string, error) {
- sf.block.Lock()
- defer sf.block.Unlock()
- dictResp, err := sf.read(topic)
- if err != nil {
- return nil, fmt.Errorf("WebSocket.Read(): in read, err=\n\t%w", err)
- }
- return dictResp, nil
- }
- // Скрытая потоко-небезопасна функция
- func (sf *WebSocket) read(topic string) (dictResp map[string]string, err error) {
- var binResp []byte
- for {
- dictReq := make(map[string]string)
- dictReq["topic"] = topic
- binReq, err := json.Marshal(dictReq)
- if err != nil {
- return nil, fmt.Errorf("WebSocket.read(): in marshall topic(%q), err=\n\t%w", topic, err)
- }
- err = sf.ws.WriteMessage(TypeMsgBin, binReq)
- if err != nil {
- sf.slog.Errorf("WebSocket.read(): in write msg, err=\n\t%v\n", err)
- sf.ws.Close()
- sf.isConnect.Reset()
- sf.connect()
- continue
- }
- _, binResp, err = sf.ws.ReadMessage()
- if err != nil {
- sf.slog.Errorf("WebSocket.read(): in read msg, err=\n\t%v\n", err)
- sf.ws.Close()
- sf.isConnect.Reset()
- sf.connect()
- continue
- }
- break
- }
- dictResp = make(map[string]string)
- err = json.Unmarshal(binResp, &dictResp)
- if err != nil {
- return nil, fmt.Errorf("WebSocket.read(): in unmarshal binResp, err=\n\t%w", err)
- }
- return dictResp, nil
- }
- // Write -- потокобезопасная запись топика
- func (sf *WebSocket) Write(topic string, dictReq map[string]string) error {
- sf.block.Lock()
- defer sf.block.Unlock()
- err := sf.write(topic, dictReq)
- if err != nil {
- return fmt.Errorf("WebSocket.Write(): in write, err=\n\t%w", err)
- }
- return nil
- }
- // Скрытая потоко-небезопасна функция
- func (sf *WebSocket) write(topic string, dictReq map[string]string) error {
- dictReq["topic"] = topic
- binData, err := json.Marshal(dictReq)
- if err != nil {
- return fmt.Errorf("WebSocket.write(): in marshal msg, err=\n\t%w", err)
- }
- for {
- err = sf.ws.WriteMessage(TypeMsgBin, binData)
- if err != nil {
- sf.slog.Errorf("WebSocket.write(): in write msg, err=\n\t%v\n", err)
- sf.isConnect.Reset()
- sf.ws.Close()
- sf.connect()
- continue
- }
- return nil
- }
- }
- // IsConnect -- потокобезопасный признак подключенности сервера
- func (sf *WebSocket) IsConnect() bool {
- sf.block.RLock()
- defer sf.block.RUnlock()
- return sf.isConnect.Get()
- }
- // Call -- потокобезопасный вызов удалённого топика
- func (sf *WebSocket) Call(topic string, dictReq map[string]string) (dictResp map[string]string, err error) {
- sf.block.Lock()
- defer sf.block.Unlock()
- var binResp []byte
- for {
- dictReq["topic"] = topic
- binReq, err := json.Marshal(dictReq)
- if err != nil {
- return nil, fmt.Errorf("WebSocket.Call(): in marshall topic(%q), err=\n\t%w", topic, err)
- }
- err = sf.ws.WriteMessage(TypeMsgBin, binReq)
- if err != nil {
- sf.slog.Errorf("WebSocket.Call(): in write msg, err=\n\t%v\n", err)
- sf.ws.Close()
- sf.isConnect.Reset()
- sf.connect()
- continue
- }
- _, binResp, err = sf.ws.ReadMessage()
- if err != nil {
- sf.slog.Errorf("WebSocket.Call(): in read msg, err=\n\t%v\n", err)
- sf.ws.Close()
- sf.isConnect.Reset()
- sf.connect()
- continue
- }
- break
- }
- dictResp = make(map[string]string)
- err = json.Unmarshal(binResp, &dictResp)
- if err != nil {
- return nil, fmt.Errorf("WebSocket.Call(): in unmarshal binResp, err=\n\t%w", err)
- }
- return dictResp, nil
- }
- // Потокобезопасное ожидание закрытия в отдельном потоке
- func (sf *WebSocket) close() {
- <-sf.kern.Done()
- sf.block.Lock()
- defer sf.block.Unlock()
- if !sf.isConnect.Get() {
- return
- }
- sf.isConnect.Reset()
- sf.ws.Close()
- }
|