diff --git a/README.md b/README.md index e4f35b4..f03f694 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,15 @@ # HomesteadGateway -Gateway between multiple HomesteadRelay's and the HomesteadToGo Bot. \ No newline at end of file +Gateway between multiple HomesteadRelay's and the HomesteadToGo Bot. + + +## dev notes + +perhaps drop database, instead + +``` +Mod -> websocket /register { server_id, channel_id } // grabbed from mod config +Bot -> websocket /ready { channel_id } // ready if Mod with fitting channel_id has called /register +Gateway -> memory cache (server_id -> channel_id; channel_id -> server_id) // mem enough +Mod/Bot -> websocket /ws { ... } -> Bot/Mod // sync +``` diff --git a/main.go b/main.go index 351e983..fa5670f 100644 --- a/main.go +++ b/main.go @@ -2,10 +2,8 @@ package main import ( "flag" - "fmt" "homestead/homestead_gateway/controller" "homestead/homestead_gateway/util/config" - "homestead/homestead_gateway/util/logger" ) func main() { @@ -15,19 +13,6 @@ func main() { panic(err) } - l, c, e := logger.New("yomama", cfg.Log) - if e != nil { - panic(e) - } - defer c() - - l.Debug("debug") - l.Info("info") - l.Warn("warn") - l.Error("error") - - panic(fmt.Sprintf("%+v", cfg.Log)) - ctrl := controller.NewGatewayController(*cfg) err = ctrl.Run() if err != nil { diff --git a/sim.go b/sim.go index 4ea529d..a8257e6 100644 --- a/sim.go +++ b/sim.go @@ -180,7 +180,7 @@ func simulateBot() { func main() { fmt.Println("Starting WebSocket client simulator...") - fmt.Println("Connecting to ws://localhost:3333/ws with api_key=test_key") + fmt.Println("Connecting to ws://localhost:3333/ws with api_key=gateway") go simulateMod() go simulateBot() diff --git a/util/cache/cache.go b/util/cache/cache.go new file mode 100644 index 0000000..9af7ca3 --- /dev/null +++ b/util/cache/cache.go @@ -0,0 +1,70 @@ +package cache + +import "sync" + +type Cache struct { + mu sync.RWMutex + s2c map[string]string + c2s map[string]string +} + +func NewCache() *Cache { + return &Cache{ + s2c: make(map[string]string), + c2s: make(map[string]string), + } +} + +// Set creates or overwrites the pair a -> b and b -> a. +// It ensures any previous mappings involving a or b are removed first. +func (c *Cache) Set(serverId, channelId string) { + c.mu.Lock() + defer c.mu.Unlock() + + if old, ok := c.s2c[serverId]; ok && old != channelId { + delete(c.c2s, old) + } + + if old, ok := c.c2s[channelId]; ok && old != serverId { + delete(c.s2c, old) + } + + c.s2c[serverId] = channelId + c.c2s[channelId] = serverId +} + +func (c *Cache) GetByServerId(serverId string) (string, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + + cId, ok := c.s2c[serverId] + return cId, ok +} + +func (c *Cache) GetByChannelId(channelId string) (string, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + + sId, ok := c.c2s[channelId] + return sId, ok +} + +func (c *Cache) RemoveByServerId(serverId string) { + c.mu.RLock() + defer c.mu.RUnlock() + + if channelId, ok := c.s2c[serverId]; ok { + delete(c.s2c, serverId) + delete(c.c2s, channelId) + } +} + +func (c *Cache) RemoveByChannelId(channelId string) { + c.mu.RLock() + defer c.mu.RUnlock() + + if serverId, ok := c.s2c[channelId]; ok { + delete(c.c2s, channelId) + delete(c.s2c, serverId) + } +} diff --git a/util/cache/structs.go b/util/cache/structs.go new file mode 100644 index 0000000..b5ce12f --- /dev/null +++ b/util/cache/structs.go @@ -0,0 +1,16 @@ +package cache + +import ( + "sync" +) + +type ShardedCache struct { + shards []*shard + shardMask uint32 +} + +type shard struct { + mu sync.RWMutex + s2c map[string]string // serverId -> channelId + c2s map[string]string // channelId -> serverId +} diff --git a/ws/handlers.go b/ws/handlers.go index 238efbd..9859295 100644 --- a/ws/handlers.go +++ b/ws/handlers.go @@ -1,71 +1 @@ -// handlers.go package ws - -import ( - "context" - "encoding/json" - "log/slog" - "time" -) - -type LoggingModHandler struct { - logger *slog.Logger -} - -type LoggingBotHandler struct { - logger *slog.Logger -} - -func NewLoggingModHandler(logger *slog.Logger) *LoggingModHandler { - return &LoggingModHandler{logger: logger} -} - -func NewLoggingBotHandler(logger *slog.Logger) *LoggingBotHandler { - return &LoggingBotHandler{logger: logger} -} - -func (h *LoggingModHandler) Handle(ctx context.Context, msg GatewayModMessageIn) error { - // For now, just log and pretend it's being forwarded - // TODO: Look up channel_id from database using server - // TODO: Forward to bot connection(s) - - fwd := ForwardedModMessage{ - Type: "mod", - ServerID: msg.Server, - ChannelID: "TODO", // will come from database lookup - User: msg.User, - Content: msg.Content, - Meta: msg.Meta, - Ts: msg.Ts, - ReceivedAt: msg.ReceivedAt, - ForwardedAt: time.Now().UTC(), - } - - b, _ := json.Marshal(fwd) - h.logger.Debug("forwarding mod message", "msg_id", msg.MsgID, "server", msg.Server, "payload", string(b)) - - return nil -} - -func (h *LoggingBotHandler) Handle(ctx context.Context, msg GatewayBotMessageIn) error { - // For now, just log and pretend it's being forwarded - // TODO: Look up server_id from database using channel_id - // TODO: Forward to mod connection(s) - - fwd := ForwardedBotMessage{ - Type: "bot", - ServerID: "TODO", // will come from database lookup - ChannelID: msg.ChannelID, - Author: msg.Author, - Content: msg.Content, - Meta: msg.Meta, - Ts: msg.Ts, - ReceivedAt: msg.ReceivedAt, - ForwardedAt: time.Now().UTC(), - } - - b, _ := json.Marshal(fwd) - h.logger.Debug("forwarding bot message", "msg_id", msg.MsgID, "channel", msg.ChannelID, "payload", string(b)) - - return nil -} diff --git a/ws/structs.go b/ws/structs.go index bba61be..bb70593 100644 --- a/ws/structs.go +++ b/ws/structs.go @@ -3,6 +3,7 @@ package ws import ( "context" "encoding/json" + "homestead/homestead_gateway/util/cache" "log/slog" "sync" "time" @@ -17,48 +18,58 @@ type WebsocketGateway struct { upgrader websocket.Upgrader connsMu sync.Mutex - conns map[*websocket.Conn]connMetadata + conns map[*websocket.Conn]connectionMetaData + cache cache.Cache modHandler ModHandler botHandler BotHandler logger *slog.Logger - closefn func() error + closeFn func() error } -type User struct { +type MinecraftUser struct { ID string `json:"id"` Name string `json:"name"` } +type DiscordUser struct { + ID string `json:"id"` + Name string `json:"name"` +} + +type Destination struct { + ChannelID string `json:"channel_id"` +} + // GatewayModMessageIn : Mod -> Gateway -> Bot type GatewayModMessageIn struct { - MsgID string `json:"msg_id"` - Server string `json:"server"` - User User `json:"user"` - Content string `json:"content"` - Meta map[string]interface{} `json:"meta,omitempty"` - Ts string `json:"ts,omitempty"` - ReceivedAt time.Time `json:"-"` // ReceivedAt is populated by gateway (not from mod) + MsgID string `json:"msg_id"` + Server string `json:"server"` + Destination Destination `json:"destination"` + Author MinecraftUser `json:"author"` + Content string `json:"content"` + Meta map[string]interface{} `json:"meta,omitempty"` + Ts string `json:"ts,omitempty"` + ReceivedAt time.Time `json:"-"` // ReceivedAt is populated by gateway (not from mod) } // GatewayBotMessageIn : Bot -> Gateway -> Mod type GatewayBotMessageIn struct { MsgID string `json:"msg_id"` ChannelID string `json:"channel_id"` - Author string `json:"author"` + Author DiscordUser `json:"author"` Content string `json:"content"` Meta map[string]interface{} `json:"meta,omitempty"` Ts string `json:"ts,omitempty"` - ReceivedAt time.Time `json:"-"` + ReceivedAt time.Time `json:"-"` // ReceivedAt is populated by gateway (not from bot) } -// ForwardedModMessage : Gateway -> Bot -type ForwardedModMessage struct { +// GatewayModMessageOut : Gateway -> Bot +type GatewayModMessageOut struct { Type string `json:"type"` // "mod" - ServerID string `json:"server_id"` ChannelID string `json:"channel_id"` - User User `json:"user"` + Author MinecraftUser `json:"author"` Content string `json:"content"` Meta map[string]interface{} `json:"meta,omitempty"` Ts string `json:"ts,omitempty"` @@ -66,12 +77,11 @@ type ForwardedModMessage struct { ForwardedAt time.Time `json:"forwarded_at"` } -// ForwardedBotMessage : Gateway -> Mod -type ForwardedBotMessage struct { +// GatewayBotMessageOut : Gateway -> Mod +type GatewayBotMessageOut struct { Type string `json:"type"` // "bot" - ServerID string `json:"server_id"` ChannelID string `json:"channel_id"` - Author string `json:"author"` + Author DiscordUser `json:"author"` Content string `json:"content"` Meta map[string]interface{} `json:"meta,omitempty"` Ts string `json:"ts,omitempty"` @@ -79,25 +89,16 @@ type ForwardedBotMessage struct { ForwardedAt time.Time `json:"forwarded_at"` } -type BotAck struct { - Type string `json:"type"` // "acknowledge" - MsgID string `json:"msg_id"` - Status string `json:"status,omitempty"` // "queued"/"sent" +type GatewayAck struct { + Status string `json:"status"` + Type string `json:"type"` } -type MessageEnvelope struct { +type Handshake struct { Type string `json:"type"` // "mod" or "bot" Data json.RawMessage `json:"data"` } -type ModHandler interface { - Handle(ctx context.Context, msg GatewayModMessageIn) error -} - -type BotHandler interface { - Handle(ctx context.Context, msg GatewayBotMessageIn) error -} - type ModHandshake struct { ServerID string `json:"server_id"` } @@ -106,7 +107,15 @@ type BotHandshake struct { BotID string `json:"bot_id"` } -type connMetadata struct { - connType string // "mod" or "bot" - id string // server_id or bot_id for logging +type ModHandler interface { + Handle(ctx context.Context, msg GatewayModMessageIn) error +} + +type BotHandler interface { + Handle(ctx context.Context, msg GatewayBotMessageIn) error +} + +type connectionMetaData struct { + connectionType string // "mod" or "bot" + id string // server_id or bot_id for logging } diff --git a/ws/util.go b/ws/util.go index aeeed09..4822779 100644 --- a/ws/util.go +++ b/ws/util.go @@ -2,32 +2,88 @@ package ws import ( "context" - "fmt" + "encoding/json" + "errors" "log/slog" "net/http" - "os/signal" - "syscall" "time" "github.com/gorilla/websocket" ) -func (g *WebsocketGateway) Start() error { - ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) - defer stop() +// (de-)register - return g.Serve(ctx, fmt.Sprintf(":%d", g.port)) +func (wsg *WebsocketGateway) listen(srv *http.Server, addr string, channel chan error) { + wsg.logger.Info("Gateway listening.", "addr", addr) + + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + channel <- err + } + + close(channel) +} + +func (wsg *WebsocketGateway) deafen(srv *http.Server) { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + wsg.logger.Info("Shutting down Websocket Gateway.") + + _ = srv.Shutdown(shutdownCtx) + wsg.closeAll() +} + +// responses + +func (wsg *WebsocketGateway) sendHttpError(w http.ResponseWriter, message string, code int) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + _ = json.NewEncoder(w).Encode(map[string]interface{}{"message": message, "code": code}) +} + +func (wsg *WebsocketGateway) sendWebsocketError(conn *websocket.Conn, message string, code int) { + _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + _ = conn.WriteJSON(map[string]interface{}{"message": message, "code": code}) + _ = conn.Close() +} + +func (wsg *WebsocketGateway) sendWebsocketResponse(conn *websocket.Conn, content interface{}) error { + _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + + if err := conn.WriteJSON(content); err != nil { + wsg.logger.Error("Failed to respond to connection.", "remote", conn.RemoteAddr().String(), "err", err) + _ = conn.Close() + return err + } + + return nil } // util -func (g *WebsocketGateway) validateApiKey(r *http.Request) bool { +func (wsg *WebsocketGateway) validateAndUpgradeConnection(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) { + if !wsg.validateApiKey(r) { + wsg.sendHttpError(w, "Unauthorized", 401) + wsg.logger.Warn("Authorization failed", "remote", r.RemoteAddr) + return nil, errors.New("unauthorized") + } + + conn, err := wsg.upgrader.Upgrade(w, r, nil) + if err != nil { + wsg.logger.Error("Upgrade error.", "remote", r.RemoteAddr) + return nil, err + } + + return conn, err +} + +func (wsg *WebsocketGateway) validateApiKey(r *http.Request) bool { apiKey := r.URL.Query().Get("api_key") if apiKey == "" { apiKey = r.Header.Get("X-API-Key") } - return !(apiKey == "" || apiKey != g.apiKey) + return !(apiKey == "" || apiKey != wsg.apiKey) } func writeJSONSafe(c *websocket.Conn, v interface{}) error { @@ -49,24 +105,26 @@ func loggingMiddleware(logger *slog.Logger, next http.Handler) http.Handler { // connections -func (g *WebsocketGateway) registerConn(c *websocket.Conn, meta connMetadata) { - g.connsMu.Lock() - g.conns[c] = meta - g.connsMu.Unlock() +func (wsg *WebsocketGateway) registerConn(c *websocket.Conn, meta connectionMetaData) { + wsg.connsMu.Lock() + wsg.conns[c] = meta + wsg.connsMu.Unlock() } -func (g *WebsocketGateway) unregisterConn(c *websocket.Conn) { - g.connsMu.Lock() - delete(g.conns, c) - g.connsMu.Unlock() +func (wsg *WebsocketGateway) unregisterConn(c *websocket.Conn) { + wsg.connsMu.Lock() + delete(wsg.conns, c) + wsg.connsMu.Unlock() } -func (g *WebsocketGateway) closeAll() { - g.connsMu.Lock() - defer g.connsMu.Unlock() - g.logger.Info("closing websocket connections") - for c := range g.conns { - _ = c.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "shutting down"), time.Now().Add(time.Second)) +func (wsg *WebsocketGateway) closeAll() { + wsg.connsMu.Lock() + defer wsg.connsMu.Unlock() + + wsg.logger.Info("Closing all websocket connections.") + + for c := range wsg.conns { + _ = c.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Shutting down."), time.Now().Add(time.Second)) _ = c.Close() } } diff --git a/ws/websocket.go b/ws/websocket.go index 7cc062b..8c91a20 100644 --- a/ws/websocket.go +++ b/ws/websocket.go @@ -3,11 +3,13 @@ package ws import ( "context" "encoding/json" - "errors" + "fmt" "homestead/homestead_gateway/util/config" "log/slog" "net" "net/http" + "os/signal" + "syscall" "time" "github.com/gorilla/websocket" @@ -16,7 +18,7 @@ import ( func NewWsGateway(cfg config.GatewayConfig, logger *slog.Logger, closefn func() error, modH ModHandler, botH BotHandler) *WebsocketGateway { return &WebsocketGateway{ logger: logger, - closefn: closefn, + closeFn: closefn, apiKey: cfg.Websocket, upgrader: websocket.Upgrader{ ReadBufferSize: 1024, @@ -25,7 +27,7 @@ func NewWsGateway(cfg config.GatewayConfig, logger *slog.Logger, closefn func() return true // local by default; change for production }, }, - conns: make(map[*websocket.Conn]connMetadata), + conns: make(map[*websocket.Conn]connectionMetaData), bodySizeBytes: int64(cfg.BodySize) * 1024 * 1024, port: cfg.HttpPort, modHandler: modH, @@ -33,58 +35,48 @@ func NewWsGateway(cfg config.GatewayConfig, logger *slog.Logger, closefn func() } } -func (g *WebsocketGateway) Serve(ctx context.Context, listenAddr string) error { +func (wsg *WebsocketGateway) Start() error { + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + return wsg.Serve(ctx, fmt.Sprintf(":%d", wsg.port)) +} + +func (wsg *WebsocketGateway) Serve(ctx context.Context, listenAddr string) error { mux := http.NewServeMux() - mux.HandleFunc("/ws", g.handleWS) - mux.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(200) - _, _ = w.Write([]byte("ok")) - }) + mux.HandleFunc("/push", wsg.handlePush) + mux.HandleFunc("/ready", wsg.handleReady) + mux.HandleFunc("/health", wsg.handleHealth) + mux.HandleFunc("/register", wsg.handleRegister) srv := &http.Server{ Addr: listenAddr, - Handler: loggingMiddleware(g.logger, mux), + Handler: loggingMiddleware(wsg.logger, mux), BaseContext: func(l net.Listener) context.Context { return ctx }, } - errCh := make(chan error, 1) - go func() { - g.logger.Info("ws gateway listening", "addr", listenAddr) - if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - errCh <- err - } - close(errCh) - }() + + go wsg.listen(srv, listenAddr, errCh) select { case <-ctx.Done(): - shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - g.logger.Info("shutting down http server") - _ = srv.Shutdown(shutdownCtx) - g.closeAll() + wsg.deafen(srv) return nil case err := <-errCh: return err } } -func (g *WebsocketGateway) handleWS(w http.ResponseWriter, r *http.Request) { - if !g.validateApiKey(r) { - http.Error(w, "unauthorized", http.StatusUnauthorized) - g.logger.Warn("ws auth failed", "remote", r.RemoteAddr) - return - } +// - conn, err := g.upgrader.Upgrade(w, r, nil) +func (wsg *WebsocketGateway) handlePush(w http.ResponseWriter, r *http.Request) { + conn, err := wsg.validateAndUpgradeConnection(w, r) if err != nil { - g.logger.Error("ws upgrade error", err, "remote", r.RemoteAddr) return } - // Configure read limits & pong handler - if g.bodySizeBytes > 0 { - conn.SetReadLimit(g.bodySizeBytes) + if wsg.bodySizeBytes > 0 { + conn.SetReadLimit(wsg.bodySizeBytes) } else { conn.SetReadLimit(1 << 20) // sensible default 1MiB } @@ -95,72 +87,93 @@ func (g *WebsocketGateway) handleWS(w http.ResponseWriter, r *http.Request) { return nil }) - // First message must be a handshake identifying the connection type typ, data, err := conn.ReadMessage() if err != nil { - g.logger.Error("failed to read handshake", "err", err, "remote", conn.RemoteAddr().String()) - _ = conn.Close() + wsg.sendWebsocketError(conn, "Internal Server Error", 500) + wsg.logger.Error("Failed to read handshake.", "remote", conn.RemoteAddr().String(), "err", err) return } if typ != websocket.TextMessage && typ != websocket.BinaryMessage { - g.logger.Warn("invalid handshake message type", "remote", conn.RemoteAddr().String()) - _ = writeJSONSafe(conn, map[string]string{"error": "first message must be handshake"}) - _ = conn.Close() + wsg.sendWebsocketError(conn, "First message must be a handshake.", 400) + wsg.logger.Warn("Invalid handshake message type.", "remote", conn.RemoteAddr().String()) return } - var envelope MessageEnvelope - if err := json.Unmarshal(data, &envelope); err != nil { - g.logger.Warn("invalid handshake json", "err", err, "remote", conn.RemoteAddr().String()) - _ = writeJSONSafe(conn, map[string]string{"error": "invalid handshake: " + err.Error()}) - _ = conn.Close() + var handshake Handshake + if err := json.Unmarshal(data, &handshake); err != nil { + wsg.sendWebsocketError(conn, "Malformed handshake.", 400) + wsg.logger.Warn("Malformed handshake.", "remote", conn.RemoteAddr().String(), "err", err) return } - meta := connMetadata{connType: envelope.Type} + meta := connectionMetaData{connectionType: handshake.Type} - // Validate handshake based on type - switch envelope.Type { + switch handshake.Type { case "mod": - var hs ModHandshake - if err := json.Unmarshal(envelope.Data, &hs); err != nil { - g.logger.Warn("invalid mod handshake", "err", err, "remote", conn.RemoteAddr().String()) - _ = writeJSONSafe(conn, map[string]string{"error": "invalid mod handshake: " + err.Error()}) - _ = conn.Close() + var mhs ModHandshake + + if err := json.Unmarshal(handshake.Data, &mhs); err != nil { + wsg.sendWebsocketError(conn, "Malformed mod handshake.", 400) + wsg.logger.Warn("Malformed mod handshake.", "remote", conn.RemoteAddr().String(), "err", err) return } - meta.id = hs.ServerID - g.registerConn(conn, meta) - g.logger.Info("mod connected", "server_id", hs.ServerID, "remote", conn.RemoteAddr().String()) - go g.modReadLoop(conn, meta) + + meta.id = mhs.ServerID + + if err = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "connected", Type: "mod"}); err != nil { + return + } + + wsg.registerConn(conn, meta) + wsg.logger.Info("Mod connected via Websocket.", "remote", conn.RemoteAddr().String(), "server_id", mhs.ServerID) + + go wsg.modReadLoop(conn, meta) // replace with external handler mayhaps case "bot": - var hs BotHandshake - if err := json.Unmarshal(envelope.Data, &hs); err != nil { - g.logger.Warn("invalid bot handshake", "err", err, "remote", conn.RemoteAddr().String()) - _ = writeJSONSafe(conn, map[string]string{"error": "invalid bot handshake: " + err.Error()}) - _ = conn.Close() + var bhs BotHandshake + + if err := json.Unmarshal(handshake.Data, &bhs); err != nil { + wsg.sendWebsocketError(conn, "Malformed bot handshake.", 400) + wsg.logger.Warn("Malformed bot handshake.", "remote", conn.RemoteAddr().String(), "err", err) return } - meta.id = hs.BotID - g.registerConn(conn, meta) - g.logger.Info("bot connected", "bot_id", hs.BotID, "remote", conn.RemoteAddr().String()) - go g.botReadLoop(conn, meta) + + meta.id = bhs.BotID + + if err = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "connected", Type: "bot"}); err != nil { + return + } + + wsg.registerConn(conn, meta) + wsg.logger.Info("Bot connected via Websocket.", "remote", conn.RemoteAddr().String(), "bot_id", bhs.BotID) + + go wsg.botReadLoop(conn, meta) // replace with external handler mayhaps default: - g.logger.Warn("unknown connection type", "type", envelope.Type, "remote", conn.RemoteAddr().String()) - _ = writeJSONSafe(conn, map[string]string{"error": "unknown connection type: " + envelope.Type}) - _ = conn.Close() + wsg.sendWebsocketError(conn, "Unknown handshake.", 400) + wsg.logger.Warn("Unknown connection type.", "remote", conn.RemoteAddr().String(), "type", handshake.Type) return } } -func (g *WebsocketGateway) modReadLoop(c *websocket.Conn, meta connMetadata) { +func (wsg *WebsocketGateway) handleReady(w http.ResponseWriter, r *http.Request) {} + +func (wsg *WebsocketGateway) handleHealth(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(200) + _ = json.NewEncoder(w).Encode(map[string]interface{}{"status": "healthy"}) +} + +func (wsg *WebsocketGateway) handleRegister(w http.ResponseWriter, r *http.Request) {} + +// + +func (wsg *WebsocketGateway) modReadLoop(c *websocket.Conn, meta connectionMetaData) { defer func() { - g.unregisterConn(c) + wsg.unregisterConn(c) _ = c.Close() - g.logger.Info("mod disconnected", "server_id", meta.id, "remote", c.RemoteAddr().String()) + wsg.logger.Info("mod disconnected", "server_id", meta.id, "remote", c.RemoteAddr().String()) }() pingTicker := time.NewTicker(30 * time.Second) @@ -170,9 +183,9 @@ func (g *WebsocketGateway) modReadLoop(c *websocket.Conn, meta connMetadata) { typ, data, err := c.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { - g.logger.Warn("unexpected mod close", "server_id", meta.id, "err", err) + wsg.logger.Warn("unexpected mod close", "server_id", meta.id, "err", err) } else { - g.logger.Debug("mod read error", "server_id", meta.id, "err", err) + wsg.logger.Debug("mod read error", "server_id", meta.id, "err", err) } return } @@ -184,21 +197,21 @@ func (g *WebsocketGateway) modReadLoop(c *websocket.Conn, meta connMetadata) { var msg GatewayModMessageIn if err := json.Unmarshal(data, &msg); err != nil { _ = writeJSONSafe(c, map[string]string{"error": "invalid json: " + err.Error()}) - g.logger.Warn("invalid json from mod", "server_id", meta.id, "remote", c.RemoteAddr().String(), "err", err) + wsg.logger.Warn("invalid json from mod", "server_id", meta.id, "remote", c.RemoteAddr().String(), "err", err) continue } msg.ReceivedAt = time.Now().UTC() if err := msg.Validate(); err != nil { _ = writeJSONSafe(c, map[string]string{"error": err.Error()}) - g.logger.Warn("mod message validation failed", "server_id", meta.id, "remote", c.RemoteAddr().String(), "err", err) + wsg.logger.Warn("mod message validation failed", "server_id", meta.id, "remote", c.RemoteAddr().String(), "err", err) continue } // Handle the message (forward to bot, enrich, etc.) - if err := g.modHandler.Handle(context.Background(), msg); err != nil { + if err := wsg.modHandler.Handle(context.Background(), msg); err != nil { _ = writeJSONSafe(c, map[string]string{"error": "handler error: " + err.Error()}) - g.logger.Error("mod handler error", "server_id", meta.id, "err", err) + wsg.logger.Error("mod handler error", "server_id", meta.id, "err", err) continue } @@ -209,7 +222,7 @@ func (g *WebsocketGateway) modReadLoop(c *websocket.Conn, meta connMetadata) { case <-pingTicker.C: _ = c.SetWriteDeadline(time.Now().Add(5 * time.Second)) if err := c.WriteMessage(websocket.PingMessage, nil); err != nil { - g.logger.Debug("write ping failed", "server_id", meta.id, "err", err) + wsg.logger.Debug("write ping failed", "server_id", meta.id, "err", err) return } default: @@ -217,11 +230,11 @@ func (g *WebsocketGateway) modReadLoop(c *websocket.Conn, meta connMetadata) { } } -func (g *WebsocketGateway) botReadLoop(c *websocket.Conn, meta connMetadata) { +func (wsg *WebsocketGateway) botReadLoop(c *websocket.Conn, meta connectionMetaData) { defer func() { - g.unregisterConn(c) + wsg.unregisterConn(c) _ = c.Close() - g.logger.Info("bot disconnected", "bot_id", meta.id, "remote", c.RemoteAddr().String()) + wsg.logger.Info("bot disconnected", "bot_id", meta.id, "remote", c.RemoteAddr().String()) }() pingTicker := time.NewTicker(30 * time.Second) @@ -231,9 +244,9 @@ func (g *WebsocketGateway) botReadLoop(c *websocket.Conn, meta connMetadata) { typ, data, err := c.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { - g.logger.Warn("unexpected bot close", "bot_id", meta.id, "err", err) + wsg.logger.Warn("unexpected bot close", "bot_id", meta.id, "err", err) } else { - g.logger.Debug("bot read error", "bot_id", meta.id, "err", err) + wsg.logger.Debug("bot read error", "bot_id", meta.id, "err", err) } return } @@ -245,21 +258,21 @@ func (g *WebsocketGateway) botReadLoop(c *websocket.Conn, meta connMetadata) { var msg GatewayBotMessageIn if err := json.Unmarshal(data, &msg); err != nil { _ = writeJSONSafe(c, map[string]string{"error": "invalid json: " + err.Error()}) - g.logger.Warn("invalid json from bot", "bot_id", meta.id, "remote", c.RemoteAddr().String(), "err", err) + wsg.logger.Warn("invalid json from bot", "bot_id", meta.id, "remote", c.RemoteAddr().String(), "err", err) continue } msg.ReceivedAt = time.Now().UTC() if err := msg.Validate(); err != nil { _ = writeJSONSafe(c, map[string]string{"error": err.Error()}) - g.logger.Warn("bot message validation failed", "bot_id", meta.id, "remote", c.RemoteAddr().String(), "err", err) + wsg.logger.Warn("bot message validation failed", "bot_id", meta.id, "remote", c.RemoteAddr().String(), "err", err) continue } // Handle the message (forward to mod, enrich, etc.) - if err := g.botHandler.Handle(context.Background(), msg); err != nil { + if err := wsg.botHandler.Handle(context.Background(), msg); err != nil { _ = writeJSONSafe(c, map[string]string{"error": "handler error: " + err.Error()}) - g.logger.Error("bot handler error", "bot_id", meta.id, "err", err) + wsg.logger.Error("bot handler error", "bot_id", meta.id, "err", err) continue } @@ -270,7 +283,7 @@ func (g *WebsocketGateway) botReadLoop(c *websocket.Conn, meta connMetadata) { case <-pingTicker.C: _ = c.SetWriteDeadline(time.Now().Add(5 * time.Second)) if err := c.WriteMessage(websocket.PingMessage, nil); err != nil { - g.logger.Debug("write ping failed", "bot_id", meta.id, "err", err) + wsg.logger.Debug("write ping failed", "bot_id", meta.id, "err", err) return } default: