diff --git a/main.go b/main.go index f868478..1be8242 100644 --- a/main.go +++ b/main.go @@ -21,7 +21,5 @@ func main() { } /** TODO - - queue for messages, both ways (Ack "queued" instead of "completed"), filled queue drops oldest entry - */ diff --git a/sim.go b/sim.go index 2309c1a..e534411 100644 --- a/sim.go +++ b/sim.go @@ -1,3 +1,5 @@ +//go:build sim + package main import ( @@ -13,7 +15,7 @@ import ( ) const ( - gatewayURL = "ws://localhost:3333/push" + gatewayURL = "ws://localhost:3333/sync" apiKey = "gateway" serverID = "test-server-001" ) diff --git a/ws/handlers.go b/ws/handlers.go index 5b0c696..838241f 100644 --- a/ws/handlers.go +++ b/ws/handlers.go @@ -9,7 +9,7 @@ import ( "github.com/gorilla/websocket" ) -func (wsg *WebsocketGateway) handlePush(w http.ResponseWriter, r *http.Request) { +func (wsg *WebsocketGateway) handleSync(w http.ResponseWriter, r *http.Request) { conn, err := wsg.validateAndUpgradeConnection(w, r) if err != nil { return @@ -65,7 +65,7 @@ func (wsg *WebsocketGateway) handlePush(w http.ResponseWriter, r *http.Request) return } - wsg.registerConn(conn, meta) + wsg.registerConn(conn, meta, "mod") wsg.logger.Info("Mod connected via Websocket.", "remote", conn.RemoteAddr().String(), "server_id", mhs.ServerID) go wsg.modReadLoop(conn, meta) // replace with external handler mayhaps @@ -81,11 +81,15 @@ func (wsg *WebsocketGateway) handlePush(w http.ResponseWriter, r *http.Request) meta.ID = bhs.BotID + if ok := wsg.registerConn(conn, meta, "bot"); !ok { + wsg.sendWebsocketError(conn, "Bot already connected.", 409) + return + } + if err = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "connected", Type: "bot"}); err != nil { return } - wsg.registerConn(conn, meta) wsg.logger.Info("Bot connected via Websocket.", "remote", conn.RemoteAddr().String(), "bot_id", bhs.BotID) go wsg.botReadLoop(conn, meta) // replace with external handler mayhaps @@ -97,12 +101,8 @@ func (wsg *WebsocketGateway) handlePush(w http.ResponseWriter, r *http.Request) } } -func (wsg *WebsocketGateway) handleReady(w http.ResponseWriter, r *http.Request) {} - func (wsg *WebsocketGateway) handleHealth(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(200) _ = json.NewEncoder(w).Encode(map[string]interface{}{"status": "healthy"}) } - -func (wsg *WebsocketGateway) handleRegister(w http.ResponseWriter, r *http.Request) {} diff --git a/ws/structs.go b/ws/structs.go index 1bbe67e..9cbd361 100644 --- a/ws/structs.go +++ b/ws/structs.go @@ -16,9 +16,9 @@ type WebsocketGateway struct { upgrader websocket.Upgrader - cache *cache.Cache - botConn *websocket.Conn - conns *cache.ConnectionCache + cache *cache.Cache + bot *websocket.Conn + conns *cache.ConnectionCache modHandler ModHandler botHandler BotHandler diff --git a/ws/temp.go b/ws/temp.go index c62057b..8a64b07 100644 --- a/ws/temp.go +++ b/ws/temp.go @@ -40,6 +40,13 @@ func (h *LoggingModHandler) Handle(conn *websocket.Conn, msg GatewayModMessageIn ForwardedAt: time.Now().UTC(), } + _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + + if err := conn.WriteJSON(fwd); err != nil { + _ = conn.Close() + return err + } + b, _ := json.Marshal(fwd) h.logger.Info("received mod message", "msg_id", msg.MsgID, "server", msg.Server, "Author", msg.Author.Name, "content", msg.Content) h.logger.Debug("forwarding mod message", "msg_id", msg.MsgID, "server", msg.Server, "payload", string(b)) diff --git a/ws/util.go b/ws/util.go index f695de3..376dda7 100644 --- a/ws/util.go +++ b/ws/util.go @@ -105,17 +105,33 @@ func loggingMiddleware(logger *slog.Logger, next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { start := time.Now() next.ServeHTTP(w, r) - logger.Info("http request", "remote", r.RemoteAddr, "method", r.Method, "path", r.URL.Path, "duration", time.Since(start)) + logger.Info("Incoming HTTP request.", "remote", r.RemoteAddr, "path", r.URL.Path, "duration", time.Since(start)) }) } // connections -func (wsg *WebsocketGateway) registerConn(conn *websocket.Conn, meta cache.ConnectionMetaData) { +func (wsg *WebsocketGateway) registerConn(conn *websocket.Conn, meta cache.ConnectionMetaData, typ string) bool { + if typ == "bot" { + if wsg.bot != nil { + return false + } + + wsg.bot = conn + return true + } + wsg.conns.Set(meta.ID, conn, &meta) + return true } -func (wsg *WebsocketGateway) unregisterConn(conn *websocket.Conn, meta cache.ConnectionMetaData) { +func (wsg *WebsocketGateway) unregisterConn(conn *websocket.Conn, meta cache.ConnectionMetaData, typ string) { + if typ == "bot" { + _ = wsg.bot.Close() + wsg.bot = nil + return + } + wsg.conns.RemoveById(meta.ID) _ = conn.Close() } @@ -123,8 +139,15 @@ func (wsg *WebsocketGateway) unregisterConn(conn *websocket.Conn, meta cache.Con func (wsg *WebsocketGateway) closeAll() { wsg.logger.Info("Closing all websocket connections.") + if wsg.bot != nil { + _ = wsg.bot.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Shutting down."), time.Now().Add(time.Second)) + _ = wsg.bot.Close() + } + wsg.conns.Range(func(id string, c *websocket.Conn, meta *cache.ConnectionMetaData) { _ = c.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Shutting down."), time.Now().Add(time.Second)) _ = c.Close() }) + + wsg.conns.Clear() } diff --git a/ws/websocket.go b/ws/websocket.go index a5ba800..a9f1cd8 100644 --- a/ws/websocket.go +++ b/ws/websocket.go @@ -46,10 +46,8 @@ func (wsg *WebsocketGateway) Start() error { func (wsg *WebsocketGateway) Serve(ctx context.Context, listenAddr string) error { mux := http.NewServeMux() - mux.HandleFunc("/push", wsg.handlePush) - mux.HandleFunc("/ready", wsg.handleReady) + mux.HandleFunc("/sync", wsg.handleSync) mux.HandleFunc("/health", wsg.handleHealth) - mux.HandleFunc("/register", wsg.handleRegister) srv := &http.Server{ Addr: listenAddr, @@ -73,8 +71,8 @@ func (wsg *WebsocketGateway) Serve(ctx context.Context, listenAddr string) error func (wsg *WebsocketGateway) modReadLoop(conn *websocket.Conn, meta cache.ConnectionMetaData) { defer func() { - wsg.unregisterConn(conn, meta) - wsg.logger.Info("Mod-Client disconnected.", "remote", conn.RemoteAddr().String(), "server_id", meta.ID) + wsg.unregisterConn(conn, meta, "mod") + wsg.logger.Info("Client disconnected.", "remote", conn.RemoteAddr().String(), "server_id", meta.ID) }() ticker := time.NewTicker(30 * time.Second) @@ -91,9 +89,7 @@ func (wsg *WebsocketGateway) modReadLoop(conn *websocket.Conn, meta cache.Connec if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { - wsg.logger.Warn("unexpected mod close", "server_id", meta.ID, "err", err) - } else { - wsg.logger.Debug("mod read error", "server_id", meta.ID, "err", err) + wsg.logger.Warn("Mod-Client unexpectedly closed the connection.", "err", err) } return } @@ -129,7 +125,7 @@ func (wsg *WebsocketGateway) modReadLoop(conn *websocket.Conn, meta cache.Connec func (wsg *WebsocketGateway) botReadLoop(conn *websocket.Conn, meta cache.ConnectionMetaData) { defer func() { - wsg.unregisterConn(conn, meta) + wsg.unregisterConn(conn, meta, "bot") wsg.logger.Info("bot disconnected", "bot_id", meta.ID, "remote", conn.RemoteAddr().String()) }() @@ -187,3 +183,25 @@ func (wsg *WebsocketGateway) botReadLoop(conn *websocket.Conn, meta cache.Connec _ = writeJSONSafe(conn, map[string]string{"status": "ok"}) } } + +func (wsg *WebsocketGateway) read(conn *websocket.Conn, meta cache.ConnectionMetaData, typ string) { + defer func() { + wsg.unregisterConn(conn, meta, typ) + wsg.logger.Info("Client disconnected.", "remote", conn.RemoteAddr().String()) + }() + + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + go func() { + for range ticker.C { + wsg.sendWebsocketPing(conn) + } + }() + + switch typ { + case "mod": + + case "bot": + } +}