From d43255885cd84d3ed1f7e70a4b00eae64eb7a7de Mon Sep 17 00:00:00 2001 From: ifedorenko Date: Thu, 20 Feb 2020 19:58:40 -0500 Subject: [PATCH] Serialize websocket writes with a mutex (#256) Fixes #241 Signed-off-by: Igor Fedorenko --- internal/api/websocket.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/internal/api/websocket.go b/internal/api/websocket.go index 2e0b80b86..40055f76d 100644 --- a/internal/api/websocket.go +++ b/internal/api/websocket.go @@ -33,7 +33,7 @@ var wsAuth = struct { mutex sync.RWMutex }{authenticated: make(map[string]bool)} -func wsReader(ws *websocket.Conn, connId string, conf *config.Config) { +func wsReader(ws *websocket.Conn, writeMutex *sync.Mutex, connId string, conf *config.Config) { defer ws.Close() ws.SetReadLimit(512) @@ -61,17 +61,19 @@ func wsReader(ws *websocket.Conn, connId string, conf *config.Config) { wsAuth.authenticated[connId] = true wsAuth.mutex.Unlock() + writeMutex.Lock() ws.SetWriteDeadline(time.Now().Add(30 * time.Second)) if err := ws.WriteJSON(gin.H{"event": "config.updated", "data": event.Data(conf.ClientConfig())}); err != nil { log.Error(err) } + writeMutex.Unlock() } } } } -func wsWriter(ws *websocket.Conn, connId string) { +func wsWriter(ws *websocket.Conn, writeMutex *sync.Mutex, connId string) { pingTicker := time.NewTicker(15 * time.Second) s := event.Subscribe("log.*", "notify.*", "index.*", "upload.*", "import.*", "config.*", "count.*", "photos.*", "albums.*", "labels.*") @@ -98,12 +100,15 @@ func wsWriter(ws *websocket.Conn, connId string) { wsAuth.mutex.RUnlock() if auth { + writeMutex.Lock() ws.SetWriteDeadline(time.Now().Add(30 * time.Second)) if err := ws.WriteJSON(gin.H{"event": msg.Name, "data": msg.Fields}); err != nil { + writeMutex.Unlock() log.Debug(err) return } + writeMutex.Unlock() } } } @@ -131,6 +136,8 @@ func Websocket(router *gin.RouterGroup, conf *config.Config) { return } + var writeMutex sync.Mutex + defer ws.Close() connId := rnd.UUID() @@ -143,8 +150,8 @@ func Websocket(router *gin.RouterGroup, conf *config.Config) { log.Debug("websocket: connected") - go wsWriter(ws, connId) + go wsWriter(ws, &writeMutex, connId) - wsReader(ws, connId, conf) + wsReader(ws, &writeMutex, connId, conf) }) }