From 4a668493c4274fc8078119891b7d1ad2f6c7b694 Mon Sep 17 00:00:00 2001 From: Overlord Date: Mon, 1 Dec 2025 13:40:58 +0100 Subject: [PATCH] temp push --- config.toml | 4 +- {shared => controller}/controller.go | 15 +- {shared => controller}/structs.go | 2 +- main.go | 24 +++- sim.go | 191 ++++++++++++++++++++++++++ util/logger/log.go | 2 +- ws/handlers.go | 71 ++++++++++ ws/structs.go | 87 +++++++++--- ws/util.go | 44 ++---- ws/validate.go | 38 ++++++ ws/websocket.go | 197 ++++++++++++++++++++------- 11 files changed, 559 insertions(+), 116 deletions(-) rename {shared => controller}/controller.go (57%) rename {shared => controller}/structs.go (89%) create mode 100644 sim.go create mode 100644 ws/handlers.go create mode 100644 ws/validate.go diff --git a/config.toml b/config.toml index f0eacef..b41f343 100644 --- a/config.toml +++ b/config.toml @@ -1,10 +1,10 @@ [log] -level = "info" +level = "debug" directory = "logs/" rotation = 3 # in days [gateway] -http_port = 8080 +http_port = 3333 websocket = "gateway" body_size = 2 # in MB queue_max = 8192 diff --git a/shared/controller.go b/controller/controller.go similarity index 57% rename from shared/controller.go rename to controller/controller.go index b27ecdb..285aa66 100644 --- a/shared/controller.go +++ b/controller/controller.go @@ -1,4 +1,4 @@ -package shared +package controller import ( "homestead/homestead_gateway/util/config" @@ -11,17 +11,16 @@ func NewGatewayController(cfg config.Config) GatewayController { if err != nil { panic(err) } - //hsl, hCloseFn, err := logger.New("HttpServer", cfg.Log) - //if err != nil { - // panic(err) - //} + + modHandler := ws.NewLoggingModHandler(wsl) + botHandler := ws.NewLoggingBotHandler(wsl) return GatewayController{ - Websocket: ws.NewWsGateway(cfg.Gateway, wsl, wCloseFn), + Websocket: ws.NewWsGateway(cfg.Gateway, wsl, wCloseFn, modHandler, botHandler), HttpServer: HttpGateway{}, } } -func (gc *GatewayController) Run() { - gc.Websocket.StartGatewayWithForwarder() +func (gc *GatewayController) Run() error { + return gc.Websocket.Start() } diff --git a/shared/structs.go b/controller/structs.go similarity index 89% rename from shared/structs.go rename to controller/structs.go index 26357fb..e56093b 100644 --- a/shared/structs.go +++ b/controller/structs.go @@ -1,4 +1,4 @@ -package shared +package controller import ( "homestead/homestead_gateway/ws" diff --git a/main.go b/main.go index f0f544f..351e983 100644 --- a/main.go +++ b/main.go @@ -2,8 +2,10 @@ package main import ( "flag" - "homestead/homestead_gateway/shared" + "fmt" + "homestead/homestead_gateway/controller" "homestead/homestead_gateway/util/config" + "homestead/homestead_gateway/util/logger" ) func main() { @@ -13,6 +15,22 @@ func main() { panic(err) } - controller := shared.NewGatewayController(*cfg) - controller.Run() + l, c, e := logger.New("yomama", cfg.Log) + if e != nil { + panic(e) + } + defer c() + + l.Debug("debug") + l.Info("info") + l.Warn("warn") + l.Error("error") + + panic(fmt.Sprintf("%+v", cfg.Log)) + + ctrl := controller.NewGatewayController(*cfg) + err = ctrl.Run() + if err != nil { + panic(err) + } } diff --git a/sim.go b/sim.go new file mode 100644 index 0000000..4ea529d --- /dev/null +++ b/sim.go @@ -0,0 +1,191 @@ +package main + +import ( + "encoding/json" + "fmt" + "log" + "net/url" + "time" + + "github.com/gorilla/websocket" +) + +type MessageEnvelope struct { + Type string `json:"type"` + Data json.RawMessage `json:"data"` +} + +type ModHandshake struct { + ServerID string `json:"server_id"` +} + +type BotHandshake struct { + BotID string `json:"bot_id"` +} + +type User struct { + ID string `json:"id"` + Name string `json:"name"` +} + +type ModMessage struct { + MsgID string `json:"msg_id"` + Server string `json:"server"` + User User `json:"user"` + Content string `json:"content"` + Meta map[string]interface{} `json:"meta,omitempty"` + Ts string `json:"ts,omitempty"` +} + +type BotMessage struct { + MsgID string `json:"msg_id"` + ChannelID string `json:"channel_id"` + Author string `json:"author"` + Content string `json:"content"` + Meta map[string]interface{} `json:"meta,omitempty"` + Ts string `json:"ts,omitempty"` +} + +func simulateMod() { + u := url.URL{Scheme: "ws", Host: "localhost:3333", Path: "/ws"} + u.RawQuery = "api_key=gateway" + + conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + log.Fatalf("mod dial error: %v", err) + } + defer conn.Close() + + fmt.Println("[MOD] Connected") + + // Send handshake + handshake := ModHandshake{ServerID: "survival_server"} + hsData, _ := json.Marshal(handshake) + envelope := MessageEnvelope{ + Type: "mod", + Data: hsData, + } + envData, _ := json.Marshal(envelope) + + if err := conn.WriteMessage(websocket.TextMessage, envData); err != nil { + log.Fatalf("mod handshake write error: %v", err) + } + fmt.Println("[MOD] Sent handshake") + + // Read handshake response + _, resp, err := conn.ReadMessage() + if err != nil { + log.Fatalf("mod handshake response error: %v", err) + } + fmt.Printf("[MOD] Handshake response: %s\n", string(resp)) + + // Send a few messages + for i := 1; i <= 3; i++ { + msg := ModMessage{ + MsgID: fmt.Sprintf("mod_msg_%d", i), + Server: "survival_server", + User: User{ID: "player_123", Name: "Steve"}, + Content: fmt.Sprintf("Message %d from Minecraft!", i), + Meta: map[string]interface{}{ + "coordinates": map[string]int{"x": 100 + i*10, "y": 64, "z": 200}, + }, + Ts: time.Now().UTC().Format(time.RFC3339), + } + + msgData, _ := json.Marshal(msg) + if err := conn.WriteMessage(websocket.TextMessage, msgData); err != nil { + log.Fatalf("mod message write error: %v", err) + } + fmt.Printf("[MOD] Sent message: %s\n", msg.Content) + + // Read response + _, resp, err := conn.ReadMessage() + if err != nil { + log.Fatalf("mod response error: %v", err) + } + fmt.Printf("[MOD] Response: %s\n", string(resp)) + + time.Sleep(1 * time.Second) + } + + fmt.Println("[MOD] Closing connection") +} + +func simulateBot() { + time.Sleep(2 * time.Second) // Let mod connect first + + u := url.URL{Scheme: "ws", Host: "localhost:3333", Path: "/ws"} + u.RawQuery = "api_key=gateway" + + conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + log.Fatalf("bot dial error: %v", err) + } + defer conn.Close() + + fmt.Println("[BOT] Connected") + + // Send handshake + handshake := BotHandshake{BotID: "discord_bot_1"} + hsData, _ := json.Marshal(handshake) + envelope := MessageEnvelope{ + Type: "bot", + Data: hsData, + } + envData, _ := json.Marshal(envelope) + + if err := conn.WriteMessage(websocket.TextMessage, envData); err != nil { + log.Fatalf("bot handshake write error: %v", err) + } + fmt.Println("[BOT] Sent handshake") + + // Read handshake response + _, resp, err := conn.ReadMessage() + if err != nil { + log.Fatalf("bot handshake response error: %v", err) + } + fmt.Printf("[BOT] Handshake response: %s\n", string(resp)) + + // Send a few messages + for i := 1; i <= 3; i++ { + msg := BotMessage{ + MsgID: fmt.Sprintf("bot_msg_%d", i), + ChannelID: "987654321", + Author: "DiscordUser#1234", + Content: fmt.Sprintf("Message %d from Discord!", i), + Meta: map[string]interface{}{ + "reactions": []string{"👍", "❤️"}, + }, + Ts: time.Now().UTC().Format(time.RFC3339), + } + + msgData, _ := json.Marshal(msg) + if err := conn.WriteMessage(websocket.TextMessage, msgData); err != nil { + log.Fatalf("bot message write error: %v", err) + } + fmt.Printf("[BOT] Sent message: %s\n", msg.Content) + + // Read response + _, resp, err := conn.ReadMessage() + if err != nil { + log.Fatalf("bot response error: %v", err) + } + fmt.Printf("[BOT] Response: %s\n", string(resp)) + + time.Sleep(1 * time.Second) + } + + fmt.Println("[BOT] Closing connection") +} + +func main() { + fmt.Println("Starting WebSocket client simulator...") + fmt.Println("Connecting to ws://localhost:3333/ws with api_key=test_key") + + go simulateMod() + go simulateBot() + + // Let them run + time.Sleep(15 * time.Second) + fmt.Println("Simulator finished") +} diff --git a/util/logger/log.go b/util/logger/log.go index 72f7a71..75e5397 100644 --- a/util/logger/log.go +++ b/util/logger/log.go @@ -23,7 +23,7 @@ func New(id string, cfg config.LogConfig) (*slog.Logger, func() error, error) { cfg.Rotation = 7 } - console := slog.NewTextHandler(&prefixWriter{inner: os.Stderr, prefix: []byte("[" + id + "] "), startLine: true}, &slog.HandlerOptions{AddSource: true}) + console := slog.NewTextHandler(&prefixWriter{inner: os.Stderr, prefix: []byte("[" + id + "] "), startLine: true}, &slog.HandlerOptions{AddSource: true, Level: cfg.Level}) router := newFileRouter(cfg.Directory, cfg.Rotation, id) root := slogmulti.Fanout(console, router) diff --git a/ws/handlers.go b/ws/handlers.go new file mode 100644 index 0000000..238efbd --- /dev/null +++ b/ws/handlers.go @@ -0,0 +1,71 @@ +// handlers.go +package ws + +import ( + "context" + "encoding/json" + "log/slog" + "time" +) + +type LoggingModHandler struct { + logger *slog.Logger +} + +type LoggingBotHandler struct { + logger *slog.Logger +} + +func NewLoggingModHandler(logger *slog.Logger) *LoggingModHandler { + return &LoggingModHandler{logger: logger} +} + +func NewLoggingBotHandler(logger *slog.Logger) *LoggingBotHandler { + return &LoggingBotHandler{logger: logger} +} + +func (h *LoggingModHandler) Handle(ctx context.Context, msg GatewayModMessageIn) error { + // For now, just log and pretend it's being forwarded + // TODO: Look up channel_id from database using server + // TODO: Forward to bot connection(s) + + fwd := ForwardedModMessage{ + Type: "mod", + ServerID: msg.Server, + ChannelID: "TODO", // will come from database lookup + User: msg.User, + Content: msg.Content, + Meta: msg.Meta, + Ts: msg.Ts, + ReceivedAt: msg.ReceivedAt, + ForwardedAt: time.Now().UTC(), + } + + b, _ := json.Marshal(fwd) + h.logger.Debug("forwarding mod message", "msg_id", msg.MsgID, "server", msg.Server, "payload", string(b)) + + return nil +} + +func (h *LoggingBotHandler) Handle(ctx context.Context, msg GatewayBotMessageIn) error { + // For now, just log and pretend it's being forwarded + // TODO: Look up server_id from database using channel_id + // TODO: Forward to mod connection(s) + + fwd := ForwardedBotMessage{ + Type: "bot", + ServerID: "TODO", // will come from database lookup + ChannelID: msg.ChannelID, + Author: msg.Author, + Content: msg.Content, + Meta: msg.Meta, + Ts: msg.Ts, + ReceivedAt: msg.ReceivedAt, + ForwardedAt: time.Now().UTC(), + } + + b, _ := json.Marshal(fwd) + h.logger.Debug("forwarding bot message", "msg_id", msg.MsgID, "channel", msg.ChannelID, "payload", string(b)) + + return nil +} diff --git a/ws/structs.go b/ws/structs.go index 2bf768c..bba61be 100644 --- a/ws/structs.go +++ b/ws/structs.go @@ -1,6 +1,8 @@ package ws import ( + "context" + "encoding/json" "log/slog" "sync" "time" @@ -13,13 +15,12 @@ type WebsocketGateway struct { apiKey string bodySizeBytes int64 - upgrader websocket.Upgrader - modConnsMu sync.Mutex - modConns map[*websocket.Conn]struct{} + upgrader websocket.Upgrader + connsMu sync.Mutex + conns map[*websocket.Conn]connMetadata - botConnsMu sync.Mutex - botConns map[*websocket.Conn]*sync.Mutex - outgoingCh chan GatewayMessageOut + modHandler ModHandler + botHandler BotHandler logger *slog.Logger closefn func() error @@ -30,11 +31,8 @@ type User struct { Name string `json:"name"` } -type Destination struct { - Channel string `json:"channel"` -} - -type GatewayMessageIn struct { +// GatewayModMessageIn : Mod -> Gateway -> Bot +type GatewayModMessageIn struct { MsgID string `json:"msg_id"` Server string `json:"server"` User User `json:"user"` @@ -44,12 +42,41 @@ type GatewayMessageIn struct { ReceivedAt time.Time `json:"-"` // ReceivedAt is populated by gateway (not from mod) } -type GatewayMessageOut struct { - Type string `json:"type"` // "message" - Payload GatewayMessageIn `json:"payload"` - Destination Destination `json:"destination"` - ForwardedBy string `json:"forwarded_by"` // "ws"/"http" - ForwardedAt time.Time `json:"forwarded_at"` +// GatewayBotMessageIn : Bot -> Gateway -> Mod +type GatewayBotMessageIn struct { + MsgID string `json:"msg_id"` + ChannelID string `json:"channel_id"` + Author string `json:"author"` + Content string `json:"content"` + Meta map[string]interface{} `json:"meta,omitempty"` + Ts string `json:"ts,omitempty"` + ReceivedAt time.Time `json:"-"` +} + +// ForwardedModMessage : Gateway -> Bot +type ForwardedModMessage struct { + Type string `json:"type"` // "mod" + ServerID string `json:"server_id"` + ChannelID string `json:"channel_id"` + User User `json:"user"` + Content string `json:"content"` + Meta map[string]interface{} `json:"meta,omitempty"` + Ts string `json:"ts,omitempty"` + ReceivedAt time.Time `json:"received_at"` + ForwardedAt time.Time `json:"forwarded_at"` +} + +// ForwardedBotMessage : Gateway -> Mod +type ForwardedBotMessage struct { + Type string `json:"type"` // "bot" + ServerID string `json:"server_id"` + ChannelID string `json:"channel_id"` + Author string `json:"author"` + Content string `json:"content"` + Meta map[string]interface{} `json:"meta,omitempty"` + Ts string `json:"ts,omitempty"` + ReceivedAt time.Time `json:"received_at"` + ForwardedAt time.Time `json:"forwarded_at"` } type BotAck struct { @@ -57,3 +84,29 @@ type BotAck struct { MsgID string `json:"msg_id"` Status string `json:"status,omitempty"` // "queued"/"sent" } + +type MessageEnvelope struct { + Type string `json:"type"` // "mod" or "bot" + Data json.RawMessage `json:"data"` +} + +type ModHandler interface { + Handle(ctx context.Context, msg GatewayModMessageIn) error +} + +type BotHandler interface { + Handle(ctx context.Context, msg GatewayBotMessageIn) error +} + +type ModHandshake struct { + ServerID string `json:"server_id"` +} + +type BotHandshake struct { + BotID string `json:"bot_id"` +} + +type connMetadata struct { + connType string // "mod" or "bot" + id string // server_id or bot_id for logging +} diff --git a/ws/util.go b/ws/util.go index 904932d..aeeed09 100644 --- a/ws/util.go +++ b/ws/util.go @@ -2,7 +2,6 @@ package ws import ( "context" - "encoding/json" "fmt" "log/slog" "net/http" @@ -13,30 +12,11 @@ import ( "github.com/gorilla/websocket" ) -func (g *WebsocketGateway) StartGatewayWithForwarder() { - go func() { - for out := range g.Outgoing() { - b, _ := json.Marshal(out) - // use Info because these are normal forward events - g.logger.Info("forwarder -> bot", "msg", string(b)) - } - }() - +func (g *WebsocketGateway) Start() error { ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() - if err := g.Serve(ctx, fmt.Sprintf(":%d", g.port)); err != nil { - g.logger.Error("gateway serve error", err) - } - close(g.outgoingCh) -} - -func (g *WebsocketGateway) Outgoing() <-chan GatewayMessageOut { - return g.outgoingCh -} - -func (g *WebsocketGateway) OutgoingLen() int { - return len(g.outgoingCh) + return g.Serve(ctx, fmt.Sprintf(":%d", g.port)) } // util @@ -69,23 +49,23 @@ func loggingMiddleware(logger *slog.Logger, next http.Handler) http.Handler { // connections -func (g *WebsocketGateway) registerConn(c *websocket.Conn) { - g.modConnsMu.Lock() - g.modConns[c] = struct{}{} - g.modConnsMu.Unlock() +func (g *WebsocketGateway) registerConn(c *websocket.Conn, meta connMetadata) { + g.connsMu.Lock() + g.conns[c] = meta + g.connsMu.Unlock() } func (g *WebsocketGateway) unregisterConn(c *websocket.Conn) { - g.modConnsMu.Lock() - delete(g.modConns, c) - g.modConnsMu.Unlock() + g.connsMu.Lock() + delete(g.conns, c) + g.connsMu.Unlock() } func (g *WebsocketGateway) closeAll() { - g.modConnsMu.Lock() - defer g.modConnsMu.Unlock() + g.connsMu.Lock() + defer g.connsMu.Unlock() g.logger.Info("closing websocket connections") - for c := range g.modConns { + for c := range g.conns { _ = c.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "shutting down"), time.Now().Add(time.Second)) _ = c.Close() } diff --git a/ws/validate.go b/ws/validate.go new file mode 100644 index 0000000..f8feff6 --- /dev/null +++ b/ws/validate.go @@ -0,0 +1,38 @@ +package ws + +import ( + "errors" + "strings" +) + +func (m *GatewayModMessageIn) Validate() error { + if strings.TrimSpace(m.MsgID) == "" { + return errors.New("msg_id missing") + } + if strings.TrimSpace(m.Server) == "" { + return errors.New("server missing") + } + if strings.TrimSpace(m.User.ID) == "" { + return errors.New("user.id missing") + } + if strings.TrimSpace(m.Content) == "" { + return errors.New("content missing") + } + return nil +} + +func (m *GatewayBotMessageIn) Validate() error { + if strings.TrimSpace(m.MsgID) == "" { + return errors.New("msg_id missing") + } + if strings.TrimSpace(m.ChannelID) == "" { + return errors.New("channel_id missing") + } + if strings.TrimSpace(m.Author) == "" { + return errors.New("author missing") + } + if strings.TrimSpace(m.Content) == "" { + return errors.New("content missing") + } + return nil +} diff --git a/ws/websocket.go b/ws/websocket.go index 0797a12..7cc062b 100644 --- a/ws/websocket.go +++ b/ws/websocket.go @@ -8,29 +8,12 @@ import ( "log/slog" "net" "net/http" - "strings" "time" "github.com/gorilla/websocket" ) -func (m *GatewayMessageIn) Validate() error { - if strings.TrimSpace(m.MsgID) == "" { - return errors.New("msg_id missing") - } - if strings.TrimSpace(m.Server) == "" { - return errors.New("server missing") - } - if strings.TrimSpace(m.User.ID) == "" { - return errors.New("user.mod_uid missing") - } - if strings.TrimSpace(m.Content) == "" { - return errors.New("content missing") - } - return nil -} - -func NewWsGateway(cfg config.GatewayConfig, logger *slog.Logger, closefn func() error) *WebsocketGateway { +func NewWsGateway(cfg config.GatewayConfig, logger *slog.Logger, closefn func() error, modH ModHandler, botH BotHandler) *WebsocketGateway { return &WebsocketGateway{ logger: logger, closefn: closefn, @@ -42,14 +25,14 @@ func NewWsGateway(cfg config.GatewayConfig, logger *slog.Logger, closefn func() return true // local by default; change for production }, }, - outgoingCh: make(chan GatewayMessageOut, cfg.QueueSize), - modConns: make(map[*websocket.Conn]struct{}), + conns: make(map[*websocket.Conn]connMetadata), bodySizeBytes: int64(cfg.BodySize) * 1024 * 1024, port: cfg.HttpPort, + modHandler: modH, + botHandler: botH, } } -// Serve starts the HTTP server and /ws endpoint and blocks until ctx cancelled or server fails. func (g *WebsocketGateway) Serve(ctx context.Context, listenAddr string) error { mux := http.NewServeMux() mux.HandleFunc("/ws", g.handleWS) @@ -67,7 +50,7 @@ func (g *WebsocketGateway) Serve(ctx context.Context, listenAddr string) error { errCh := make(chan error, 1) go func() { g.logger.Info("ws gateway listening", "addr", listenAddr) - if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { errCh <- err } close(errCh) @@ -99,9 +82,6 @@ func (g *WebsocketGateway) handleWS(w http.ResponseWriter, r *http.Request) { return } - g.registerConn(conn) - g.logger.Info("ws connected", "remote", conn.RemoteAddr().String()) - // Configure read limits & pong handler if g.bodySizeBytes > 0 { conn.SetReadLimit(g.bodySizeBytes) @@ -115,69 +95,182 @@ func (g *WebsocketGateway) handleWS(w http.ResponseWriter, r *http.Request) { return nil }) - go g.readLoop(conn) + // First message must be a handshake identifying the connection type + typ, data, err := conn.ReadMessage() + if err != nil { + g.logger.Error("failed to read handshake", "err", err, "remote", conn.RemoteAddr().String()) + _ = conn.Close() + return + } + + if typ != websocket.TextMessage && typ != websocket.BinaryMessage { + g.logger.Warn("invalid handshake message type", "remote", conn.RemoteAddr().String()) + _ = writeJSONSafe(conn, map[string]string{"error": "first message must be handshake"}) + _ = conn.Close() + return + } + + var envelope MessageEnvelope + if err := json.Unmarshal(data, &envelope); err != nil { + g.logger.Warn("invalid handshake json", "err", err, "remote", conn.RemoteAddr().String()) + _ = writeJSONSafe(conn, map[string]string{"error": "invalid handshake: " + err.Error()}) + _ = conn.Close() + return + } + + meta := connMetadata{connType: envelope.Type} + + // Validate handshake based on type + switch envelope.Type { + case "mod": + var hs ModHandshake + if err := json.Unmarshal(envelope.Data, &hs); err != nil { + g.logger.Warn("invalid mod handshake", "err", err, "remote", conn.RemoteAddr().String()) + _ = writeJSONSafe(conn, map[string]string{"error": "invalid mod handshake: " + err.Error()}) + _ = conn.Close() + return + } + meta.id = hs.ServerID + g.registerConn(conn, meta) + g.logger.Info("mod connected", "server_id", hs.ServerID, "remote", conn.RemoteAddr().String()) + go g.modReadLoop(conn, meta) + + case "bot": + var hs BotHandshake + if err := json.Unmarshal(envelope.Data, &hs); err != nil { + g.logger.Warn("invalid bot handshake", "err", err, "remote", conn.RemoteAddr().String()) + _ = writeJSONSafe(conn, map[string]string{"error": "invalid bot handshake: " + err.Error()}) + _ = conn.Close() + return + } + meta.id = hs.BotID + g.registerConn(conn, meta) + g.logger.Info("bot connected", "bot_id", hs.BotID, "remote", conn.RemoteAddr().String()) + go g.botReadLoop(conn, meta) + + default: + g.logger.Warn("unknown connection type", "type", envelope.Type, "remote", conn.RemoteAddr().String()) + _ = writeJSONSafe(conn, map[string]string{"error": "unknown connection type: " + envelope.Type}) + _ = conn.Close() + return + } } -func (g *WebsocketGateway) readLoop(c *websocket.Conn) { +func (g *WebsocketGateway) modReadLoop(c *websocket.Conn, meta connMetadata) { defer func() { g.unregisterConn(c) _ = c.Close() - g.logger.Info("ws disconnected", "remote", c.RemoteAddr().String()) + g.logger.Info("mod disconnected", "server_id", meta.id, "remote", c.RemoteAddr().String()) }() pingTicker := time.NewTicker(30 * time.Second) defer pingTicker.Stop() for { - // Read one message (blocks until message arrives) typ, data, err := c.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { - g.logger.Warn("unexpected ws close", "err", err) + g.logger.Warn("unexpected mod close", "server_id", meta.id, "err", err) } else { - g.logger.Debug("ws read error", "err", err) + g.logger.Debug("mod read error", "server_id", meta.id, "err", err) } return } + if typ != websocket.TextMessage && typ != websocket.BinaryMessage { continue } - var in GatewayMessageIn - if err := json.Unmarshal(data, &in); err != nil { + var msg GatewayModMessageIn + if err := json.Unmarshal(data, &msg); err != nil { _ = writeJSONSafe(c, map[string]string{"error": "invalid json: " + err.Error()}) - g.logger.Warn("invalid json from client", "remote", c.RemoteAddr().String(), "err", err) + g.logger.Warn("invalid json from mod", "server_id", meta.id, "remote", c.RemoteAddr().String(), "err", err) continue } - in.ReceivedAt = time.Now().UTC() - if err := in.Validate(); err != nil { + + msg.ReceivedAt = time.Now().UTC() + if err := msg.Validate(); err != nil { _ = writeJSONSafe(c, map[string]string{"error": err.Error()}) - g.logger.Warn("message validation failed", "remote", c.RemoteAddr().String(), "err", err) + g.logger.Warn("mod message validation failed", "server_id", meta.id, "remote", c.RemoteAddr().String(), "err", err) continue } - out := GatewayMessageOut{ - Type: "message", - Payload: in, - ForwardedAt: time.Now().UTC(), + // Handle the message (forward to bot, enrich, etc.) + if err := g.modHandler.Handle(context.Background(), msg); err != nil { + _ = writeJSONSafe(c, map[string]string{"error": "handler error: " + err.Error()}) + g.logger.Error("mod handler error", "server_id", meta.id, "err", err) + continue } - // Non-blocking enqueue with backpressure - select { - case g.outgoingCh <- out: - _ = writeJSONSafe(c, map[string]string{"status": "queued"}) - g.logger.Debug("enqueued message", "msg_id", in.MsgID, "server", in.Server) - default: - _ = writeJSONSafe(c, map[string]string{"error": "gateway busy"}) - g.logger.Warn("outgoing queue full", "msg_id", in.MsgID, "remote", c.RemoteAddr().String()) - } + _ = writeJSONSafe(c, map[string]string{"status": "ok"}) - // also handle pings periodically (so client sees ping frequently) + // Handle pings select { case <-pingTicker.C: _ = c.SetWriteDeadline(time.Now().Add(5 * time.Second)) if err := c.WriteMessage(websocket.PingMessage, nil); err != nil { - g.logger.Debug("write ping failed", "err", err) + g.logger.Debug("write ping failed", "server_id", meta.id, "err", err) + return + } + default: + } + } +} + +func (g *WebsocketGateway) botReadLoop(c *websocket.Conn, meta connMetadata) { + defer func() { + g.unregisterConn(c) + _ = c.Close() + g.logger.Info("bot disconnected", "bot_id", meta.id, "remote", c.RemoteAddr().String()) + }() + + pingTicker := time.NewTicker(30 * time.Second) + defer pingTicker.Stop() + + for { + typ, data, err := c.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { + g.logger.Warn("unexpected bot close", "bot_id", meta.id, "err", err) + } else { + g.logger.Debug("bot read error", "bot_id", meta.id, "err", err) + } + return + } + + if typ != websocket.TextMessage && typ != websocket.BinaryMessage { + continue + } + + var msg GatewayBotMessageIn + if err := json.Unmarshal(data, &msg); err != nil { + _ = writeJSONSafe(c, map[string]string{"error": "invalid json: " + err.Error()}) + g.logger.Warn("invalid json from bot", "bot_id", meta.id, "remote", c.RemoteAddr().String(), "err", err) + continue + } + + msg.ReceivedAt = time.Now().UTC() + if err := msg.Validate(); err != nil { + _ = writeJSONSafe(c, map[string]string{"error": err.Error()}) + g.logger.Warn("bot message validation failed", "bot_id", meta.id, "remote", c.RemoteAddr().String(), "err", err) + continue + } + + // Handle the message (forward to mod, enrich, etc.) + if err := g.botHandler.Handle(context.Background(), msg); err != nil { + _ = writeJSONSafe(c, map[string]string{"error": "handler error: " + err.Error()}) + g.logger.Error("bot handler error", "bot_id", meta.id, "err", err) + continue + } + + _ = writeJSONSafe(c, map[string]string{"status": "ok"}) + + // Handle pings + select { + case <-pingTicker.C: + _ = c.SetWriteDeadline(time.Now().Add(5 * time.Second)) + if err := c.WriteMessage(websocket.PingMessage, nil); err != nil { + g.logger.Debug("write ping failed", "bot_id", meta.id, "err", err) return } default: