diff --git a/controller/controller.go b/controller/controller.go index 86873b4..2584fe1 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -12,11 +12,8 @@ func NewGatewayController(cfg config.Config) GatewayController { panic(err) } - modHandler := ws.NewLoggingModHandler(wsl) - botHandler := ws.NewLoggingBotHandler(wsl) - return GatewayController{ - Websocket: ws.NewWebsocketGateway(cfg.Gateway, wsl, wCloseFn, modHandler, botHandler), + Websocket: ws.NewWebsocketGateway(cfg.Gateway, wsl, wCloseFn), HttpServer: HttpGateway{}, } } diff --git a/main.go b/main.go index 1be8242..fa5670f 100644 --- a/main.go +++ b/main.go @@ -19,7 +19,3 @@ func main() { panic(err) } } - -/** TODO -- queue for messages, both ways (Ack "queued" instead of "completed"), filled queue drops oldest entry -*/ diff --git a/sim.go b/sim.go index e534411..6530720 100644 --- a/sim.go +++ b/sim.go @@ -34,23 +34,24 @@ type GatewayAck struct { Type string `json:"type"` } -type MinecraftUser struct { +type User struct { ID string `json:"id"` Name string `json:"name"` } type Destination struct { - ChannelID string `json:"channel_id"` + ID string `json:"channel_id,omitempty"` } -type GatewayModMessageIn struct { - MsgID string `json:"msg_id"` - Server string `json:"server"` - Destination Destination `json:"destination"` - Author MinecraftUser `json:"author"` - Content string `json:"content"` - Meta map[string]interface{} `json:"meta,omitempty"` - Ts string `json:"ts,omitempty"` +type GatewayMessageIn struct { + ID string `json:"id"` // where am I from (channel_id or server_id) + MsgID string `json:"msg_id"` // msg id + Destination Destination `json:"destination,omitempty"` // where do I wanna go (channel_id or empty if from Bot) + Author User `json:"author"` // who sent the message + Content string `json:"content"` // message content + Meta map[string]interface{} `json:"meta,omitempty"` // additional metadata + Ts time.Time `json:"ts,omitempty"` // timestamp + ReceivedAt time.Time `json:"-"` // ReceivedAt is populated by gateway (not from mod) } func main() { @@ -140,18 +141,18 @@ func main() { // Optional: Send a test message after connecting time.Sleep(2 * time.Second) - testMsg := GatewayModMessageIn{ - MsgID: "test-msg-001", - Server: serverID, + testMsg := GatewayMessageIn{ + MsgID: "test-msg-001", + ID: serverID, Destination: Destination{ - ChannelID: "123456789", + ID: "123456789", }, - Author: MinecraftUser{ + Author: User{ ID: "player-uuid-123", Name: "TestPlayer", }, Content: "Hello from simulated mod!", - Ts: time.Now().UTC().Format(time.RFC3339), + Ts: time.Now().UTC(), } if err := conn.WriteJSON(testMsg); err != nil { diff --git a/util/queue/structs.go b/util/queue/structs.go new file mode 100644 index 0000000..0588606 --- /dev/null +++ b/util/queue/structs.go @@ -0,0 +1,66 @@ +package queue + +import ( + "sync" +) + +// Queue is a thin wrapper around a buffered channel. +type Queue[T any] struct { + ch chan T + closedMu sync.Mutex + closed bool + closedCh chan struct{} +} + +func NewQueue[T any](capacity int) *Queue[T] { + if capacity <= 0 { + panic("capacity > 0 required") + } + + return &Queue[T]{ + ch: make(chan T, capacity), + closedCh: make(chan struct{}), + } +} + +// Cap returns capacity. +func (q *Queue[T]) Cap() int { return cap(q.ch) } + +// Len returns current length (snapshot). +func (q *Queue[T]) Len() int { return len(q.ch) } + +// Enqueue returns immediately: true if enqueued, false otherwise. +func (q *Queue[T]) Enqueue(v T) bool { + select { + case q.ch <- v: + return true + default: + return false + } +} + +// Dequeue returns immediately (item, true) or (zero, false) if empty. +func (q *Queue[T]) Dequeue() (T, bool) { + var zero T + select { + case v := <-q.ch: + return v, true + default: + return zero, false + } +} + +// Close closes the queue. Further Enqueue attempts return ErrClosed. Consumers drain until channel empty then see ErrClosed. +func (q *Queue[T]) Close() { + q.closedMu.Lock() + + if q.closed { + q.closedMu.Unlock() + return + } + + q.closed = true + close(q.closedCh) + close(q.ch) + q.closedMu.Unlock() +} diff --git a/ws/handlers.go b/ws/handlers.go index 838241f..d05b927 100644 --- a/ws/handlers.go +++ b/ws/handlers.go @@ -29,20 +29,20 @@ func (wsg *WebsocketGateway) handleSync(w http.ResponseWriter, r *http.Request) typ, data, err := conn.ReadMessage() if err != nil { - wsg.sendWebsocketError(conn, "Internal Server Error", 500) + wsg.sendWebsocketError(conn, "Internal Server Error", 500, true) wsg.logger.Error("Failed to read handshake.", "remote", conn.RemoteAddr().String(), "err", err) return } if typ != websocket.TextMessage && typ != websocket.BinaryMessage { - wsg.sendWebsocketError(conn, "First message must be a handshake.", 400) + wsg.sendWebsocketError(conn, "Initial message must be a handshake.", 400, true) wsg.logger.Warn("Invalid handshake message type.", "remote", conn.RemoteAddr().String()) return } var handshake Handshake if err := json.Unmarshal(data, &handshake); err != nil { - wsg.sendWebsocketError(conn, "Malformed handshake.", 400) + wsg.sendWebsocketError(conn, "Malformed handshake.", 400, true) wsg.logger.Warn("Malformed handshake.", "remote", conn.RemoteAddr().String(), "err", err) return } @@ -54,7 +54,7 @@ func (wsg *WebsocketGateway) handleSync(w http.ResponseWriter, r *http.Request) var mhs ModHandshake if err := json.Unmarshal(handshake.Data, &mhs); err != nil { - wsg.sendWebsocketError(conn, "Malformed mod handshake.", 400) + wsg.sendWebsocketError(conn, "Malformed mod handshake.", 400, true) wsg.logger.Warn("Malformed mod handshake.", "remote", conn.RemoteAddr().String(), "err", err) return } @@ -68,13 +68,13 @@ func (wsg *WebsocketGateway) handleSync(w http.ResponseWriter, r *http.Request) wsg.registerConn(conn, meta, "mod") wsg.logger.Info("Mod connected via Websocket.", "remote", conn.RemoteAddr().String(), "server_id", mhs.ServerID) - go wsg.modReadLoop(conn, meta) // replace with external handler mayhaps + go wsg.read(conn, meta, "mod") case "bot": var bhs BotHandshake if err := json.Unmarshal(handshake.Data, &bhs); err != nil { - wsg.sendWebsocketError(conn, "Malformed bot handshake.", 400) + wsg.sendWebsocketError(conn, "Malformed bot handshake.", 400, true) wsg.logger.Warn("Malformed bot handshake.", "remote", conn.RemoteAddr().String(), "err", err) return } @@ -82,7 +82,7 @@ func (wsg *WebsocketGateway) handleSync(w http.ResponseWriter, r *http.Request) meta.ID = bhs.BotID if ok := wsg.registerConn(conn, meta, "bot"); !ok { - wsg.sendWebsocketError(conn, "Bot already connected.", 409) + wsg.sendWebsocketError(conn, "Bot already connected.", 409, true) return } @@ -92,10 +92,10 @@ func (wsg *WebsocketGateway) handleSync(w http.ResponseWriter, r *http.Request) wsg.logger.Info("Bot connected via Websocket.", "remote", conn.RemoteAddr().String(), "bot_id", bhs.BotID) - go wsg.botReadLoop(conn, meta) // replace with external handler mayhaps + go wsg.read(conn, meta, "bot") default: - wsg.sendWebsocketError(conn, "Unknown handshake.", 400) + wsg.sendWebsocketError(conn, "Unknown handshake.", 400, true) wsg.logger.Warn("Unknown connection type.", "remote", conn.RemoteAddr().String(), "type", handshake.Type) return } diff --git a/ws/structs.go b/ws/structs.go index 9cbd361..96ea5b0 100644 --- a/ws/structs.go +++ b/ws/structs.go @@ -3,6 +3,7 @@ package ws import ( "encoding/json" "homestead/homestead_gateway/util/cache" + "homestead/homestead_gateway/util/queue" "log/slog" "time" @@ -17,73 +18,43 @@ type WebsocketGateway struct { upgrader websocket.Upgrader cache *cache.Cache + queue *queue.Queue[GatewayMessageOut] + bot *websocket.Conn conns *cache.ConnectionCache - modHandler ModHandler - botHandler BotHandler - logger *slog.Logger closeFn func() error } -type MinecraftUser struct { - ID string `json:"id"` - Name string `json:"name"` -} - -type DiscordUser struct { +type User struct { ID string `json:"id"` Name string `json:"name"` } type Destination struct { - ChannelID string `json:"channel_id"` + ID string `json:"channel_id,omitempty"` } -// GatewayModMessageIn : Mod -> Gateway -> Bot -type GatewayModMessageIn struct { - MsgID string `json:"msg_id"` - Server string `json:"server"` - Destination Destination `json:"destination"` - Author MinecraftUser `json:"author"` +type GatewayMessageIn struct { + Type string + ID string `json:"id"` // where am I from (channel_id or server_id) + MsgID string `json:"msg_id"` // msg id + Destination Destination `json:"destination,omitempty"` // where do I wanna go (channel_id or empty if from Bot) + Author User `json:"author"` // who sent the message + Content string `json:"content"` // message content + Meta map[string]interface{} `json:"meta,omitempty"` // additional metadata + Ts time.Time `json:"ts,omitempty"` // timestamp + ReceivedAt time.Time `json:"-"` // ReceivedAt is populated by gateway (not from mod) +} + +type GatewayMessageOut struct { + Type string `json:"type"` // "mod"|"bot" + ID string `json:"channel_id,omitempty"` // message.Destination.ID + Author User `json:"author"` Content string `json:"content"` Meta map[string]interface{} `json:"meta,omitempty"` - Ts string `json:"ts,omitempty"` - ReceivedAt time.Time `json:"-"` // ReceivedAt is populated by gateway (not from mod) -} - -// GatewayBotMessageIn : Bot -> Gateway -> Mod -type GatewayBotMessageIn struct { - MsgID string `json:"msg_id"` - ChannelID string `json:"channel_id"` - Author DiscordUser `json:"author"` - Content string `json:"content"` - Meta map[string]interface{} `json:"meta,omitempty"` - Ts string `json:"ts,omitempty"` - ReceivedAt time.Time `json:"-"` // ReceivedAt is populated by gateway (not from bot) -} - -// GatewayModMessageOut : Gateway -> Bot -type GatewayModMessageOut struct { - Type string `json:"type"` // "mod" - ChannelID string `json:"channel_id"` - Author MinecraftUser `json:"author"` - Content string `json:"content"` - Meta map[string]interface{} `json:"meta,omitempty"` - Ts string `json:"ts,omitempty"` - ReceivedAt time.Time `json:"received_at"` - ForwardedAt time.Time `json:"forwarded_at"` -} - -// GatewayBotMessageOut : Gateway -> Mod -type GatewayBotMessageOut struct { - Type string `json:"type"` // "bot" - ChannelID string `json:"channel_id"` - Author DiscordUser `json:"author"` - Content string `json:"content"` - Meta map[string]interface{} `json:"meta,omitempty"` - Ts string `json:"ts,omitempty"` + Ts time.Time `json:"ts,omitempty"` ReceivedAt time.Time `json:"received_at"` ForwardedAt time.Time `json:"forwarded_at"` } @@ -105,11 +76,3 @@ type ModHandshake struct { type BotHandshake struct { BotID string `json:"bot_id"` } - -type ModHandler interface { - Handle(conn *websocket.Conn, msg GatewayModMessageIn) error -} - -type BotHandler interface { - Handle(conn *websocket.Conn, msg GatewayBotMessageIn) error -} diff --git a/ws/temp.go b/ws/temp.go index 8a64b07..86ca50b 100644 --- a/ws/temp.go +++ b/ws/temp.go @@ -1,78 +1,47 @@ package ws -import ( - "encoding/json" - "log/slog" - "time" - - "github.com/gorilla/websocket" -) - -type LoggingModHandler struct { - logger *slog.Logger -} - -type LoggingBotHandler struct { - logger *slog.Logger -} - -func NewLoggingModHandler(logger *slog.Logger) *LoggingModHandler { - return &LoggingModHandler{logger: logger} -} - -func NewLoggingBotHandler(logger *slog.Logger) *LoggingBotHandler { - return &LoggingBotHandler{logger: logger} -} - -func (h *LoggingModHandler) Handle(conn *websocket.Conn, msg GatewayModMessageIn) error { - // For now, just log and pretend it's being forwarded - // TODO: Look up channel_id from database using server - // TODO: Forward to bot connection(s) - - fwd := GatewayModMessageOut{ - Type: "mod", - ChannelID: "TODO", // will come from database lookup - Author: msg.Author, - Content: msg.Content, - Meta: msg.Meta, - Ts: msg.Ts, - ReceivedAt: msg.ReceivedAt, - ForwardedAt: time.Now().UTC(), - } - - _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) - - if err := conn.WriteJSON(fwd); err != nil { - _ = conn.Close() - return err - } - - b, _ := json.Marshal(fwd) - h.logger.Info("received mod message", "msg_id", msg.MsgID, "server", msg.Server, "Author", msg.Author.Name, "content", msg.Content) - h.logger.Debug("forwarding mod message", "msg_id", msg.MsgID, "server", msg.Server, "payload", string(b)) - - return nil -} - -func (h *LoggingBotHandler) Handle(conn *websocket.Conn, msg GatewayBotMessageIn) error { - // For now, just log and pretend it's being forwarded - // TODO: Look up server_id from database using channel_id - // TODO: Forward to mod connection(s) - - fwd := GatewayBotMessageOut{ - Type: "bot", - ChannelID: msg.ChannelID, - Author: msg.Author, - Content: msg.Content, - Meta: msg.Meta, - Ts: msg.Ts, - ReceivedAt: msg.ReceivedAt, - ForwardedAt: time.Now().UTC(), - } - - b, _ := json.Marshal(fwd) - h.logger.Info("received bot message", "msg_id", msg.MsgID, "channel", msg.ChannelID, "author", msg.Author, "content", msg.Content) - h.logger.Debug("forwarding bot message", "msg_id", msg.MsgID, "channel", msg.ChannelID, "payload", string(b)) - - return nil -} +//type LoggingModHandler struct { +// logger *slog.Logger +//} +// +//type LoggingBotHandler struct { +// logger *slog.Logger +//} +// +//func NewLoggingModHandler(logger *slog.Logger) *LoggingModHandler { +// return &LoggingModHandler{logger: logger} +//} +// +//func NewLoggingBotHandler(logger *slog.Logger) *LoggingBotHandler { +// return &LoggingBotHandler{logger: logger} +//} +// +//func (h *LoggingModHandler) Handle(conn *websocket.Conn, msg GatewayMessageIn) error { +// // For now, just log and pretend it's being forwarded +// // TODO: Look up channel_id from database using server +// // TODO: Forward to bot connection(s) +// +// fwd := GatewayMessageOut{ +// Type: "mod", +// ChannelID: "TODO", // will come from database lookup +// Author: msg.Author, +// Content: msg.Content, +// Meta: msg.Meta, +// Ts: msg.Ts, +// ReceivedAt: msg.ReceivedAt, +// ForwardedAt: time.Now().UTC(), +// } +// +// _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) +// +// if err := conn.WriteJSON(fwd); err != nil { +// _ = conn.Close() +// return err +// } +// +// b, _ := json.Marshal(fwd) +// h.logger.Info("received mod message", "msg_id", msg.MsgID, "server", msg.Server, "Author", msg.Author.Name, "content", msg.Content) +// h.logger.Debug("forwarding mod message", "msg_id", msg.MsgID, "server", msg.Server, "payload", string(b)) +// +// return nil +//} diff --git a/ws/util.go b/ws/util.go index 376dda7..6003517 100644 --- a/ws/util.go +++ b/ws/util.go @@ -47,10 +47,12 @@ func (wsg *WebsocketGateway) sendWebsocketPing(conn *websocket.Conn) { _ = conn.WriteMessage(websocket.PingMessage, nil) } -func (wsg *WebsocketGateway) sendWebsocketError(conn *websocket.Conn, message string, code int) { +func (wsg *WebsocketGateway) sendWebsocketError(conn *websocket.Conn, message string, code int, close bool) { _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) _ = conn.WriteJSON(map[string]interface{}{"message": message, "code": code}) - _ = conn.Close() + if close { + _ = conn.Close() + } } func (wsg *WebsocketGateway) sendWebsocketResponse(conn *websocket.Conn, content interface{}) error { diff --git a/ws/validate.go b/ws/validate.go index 6e494b3..1431e47 100644 --- a/ws/validate.go +++ b/ws/validate.go @@ -5,34 +5,23 @@ import ( "strings" ) -func (m *GatewayModMessageIn) Validate() error { +func (m *GatewayMessageIn) Validate() error { + if strings.TrimSpace(m.ID) == "" { + return errors.New("id missing") + } if strings.TrimSpace(m.MsgID) == "" { return errors.New("msg_id missing") } - if strings.TrimSpace(m.Server) == "" { - return errors.New("server missing") - } if strings.TrimSpace(m.Author.ID) == "" { return errors.New("author.id missing") } if strings.TrimSpace(m.Content) == "" { return errors.New("content missing") } - return nil -} -func (m *GatewayBotMessageIn) Validate() error { - if strings.TrimSpace(m.MsgID) == "" { - return errors.New("msg_id missing") - } - if strings.TrimSpace(m.ChannelID) == "" { - return errors.New("channel_id missing") - } - if strings.TrimSpace(m.Author.ID) == "" { - return errors.New("author missing") - } - if strings.TrimSpace(m.Content) == "" { - return errors.New("content missing") + if m.Type == "mod" && strings.TrimSpace(m.Destination.ID) == "" { + return errors.New("destination.channel_id missing") } + return nil } diff --git a/ws/websocket.go b/ws/websocket.go index a9f1cd8..c9e8d64 100644 --- a/ws/websocket.go +++ b/ws/websocket.go @@ -6,6 +6,7 @@ import ( "fmt" "homestead/homestead_gateway/util/cache" "homestead/homestead_gateway/util/config" + "homestead/homestead_gateway/util/queue" "log/slog" "net" "net/http" @@ -16,16 +17,15 @@ import ( "github.com/gorilla/websocket" ) -func NewWebsocketGateway(cfg config.GatewayConfig, logger *slog.Logger, closefn func() error, modH ModHandler, botH BotHandler) *WebsocketGateway { +func NewWebsocketGateway(cfg config.GatewayConfig, logger *slog.Logger, closefn func() error) *WebsocketGateway { return &WebsocketGateway{ - logger: logger, - closeFn: closefn, - modHandler: modH, - botHandler: botH, - port: cfg.HttpPort, - apiKey: cfg.Websocket, - cache: cache.NewCache(), - conns: cache.NewConnectionCache(), + logger: logger, + closeFn: closefn, + port: cfg.HttpPort, + apiKey: cfg.Websocket, + cache: cache.NewCache(), + queue: queue.NewQueue[GatewayMessageOut](32), + conns: cache.NewConnectionCache(), upgrader: websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, @@ -69,139 +69,251 @@ func (wsg *WebsocketGateway) Serve(ctx context.Context, listenAddr string) error // -func (wsg *WebsocketGateway) modReadLoop(conn *websocket.Conn, meta cache.ConnectionMetaData) { +//func (wsg *WebsocketGateway) modReadLoop(conn *websocket.Conn, meta cache.ConnectionMetaData) { +// defer func() { +// wsg.unregisterConn(conn, meta, "mod") +// wsg.logger.Info("Client disconnected.", "remote", conn.RemoteAddr().String(), "server_id", meta.ID) +// }() +// +// ticker := time.NewTicker(30 * time.Second) +// defer ticker.Stop() +// +// go func() { +// for range ticker.C { +// wsg.sendWebsocketPing(conn) +// } +// }() +// +// for { +// typ, data, err := conn.ReadMessage() +// +// if err != nil { +// if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { +// wsg.logger.Warn("Mod-Client unexpectedly closed the connection.", "err", err) +// } +// return +// } +// +// if typ != websocket.TextMessage && typ != websocket.BinaryMessage { +// continue +// } +// +// var msg GatewayModMessageIn +// if err := json.Unmarshal(data, &msg); err != nil { +// _ = writeJSONSafe(conn, map[string]string{"error": "invalid json: " + err.Error()}) +// wsg.logger.Warn("invalid json from mod", "server_id", meta.ID, "remote", conn.RemoteAddr().String(), "err", err) +// continue +// } +// +// msg.ReceivedAt = time.Now().UTC() +// if err := msg.Validate(); err != nil { +// _ = writeJSONSafe(conn, map[string]string{"error": err.Error()}) +// wsg.logger.Warn("mod message validation failed", "server_id", meta.ID, "remote", conn.RemoteAddr().String(), "err", err) +// continue +// } +// +// // Handle the message (forward to bot, enrich, etc.) +// if err := wsg.modHandler.Handle(conn, msg); err != nil { +// _ = writeJSONSafe(conn, map[string]string{"error": "handler error: " + err.Error()}) +// wsg.logger.Error("mod handler error", "server_id", meta.ID, "err", err) +// continue +// } +// +// _ = writeJSONSafe(conn, map[string]string{"status": "completed"}) // or "queued" +// } +//} +// +//func (wsg *WebsocketGateway) botReadLoop(conn *websocket.Conn, meta cache.ConnectionMetaData) { +// defer func() { +// wsg.unregisterConn(conn, meta, "bot") +// wsg.logger.Info("bot disconnected", "bot_id", meta.ID, "remote", conn.RemoteAddr().String()) +// }() +// +// pingTicker := time.NewTicker(30 * time.Second) +// defer pingTicker.Stop() +// +// // Send pings in a separate goroutine +// go func() { +// for range pingTicker.C { +// _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) +// if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { +// wsg.logger.Debug("write ping failed", "bot_id", meta.ID, "err", err) +// return +// } +// wsg.logger.Debug("sent ping to bot", "bot_id", meta.ID) +// } +// }() +// +// for { +// typ, data, err := conn.ReadMessage() +// if err != nil { +// if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { +// wsg.logger.Warn("unexpected bot close", "bot_id", meta.ID, "err", err) +// } else { +// wsg.logger.Debug("bot read error", "bot_id", meta.ID, "err", err) +// } +// return +// } +// +// if typ != websocket.TextMessage && typ != websocket.BinaryMessage { +// continue +// } +// +// var msg GatewayBotMessageIn +// if err := json.Unmarshal(data, &msg); err != nil { +// _ = writeJSONSafe(conn, map[string]string{"error": "invalid json: " + err.Error()}) +// wsg.logger.Warn("invalid json from bot", "bot_id", meta.ID, "remote", conn.RemoteAddr().String(), "err", err) +// continue +// } +// +// msg.ReceivedAt = time.Now().UTC() +// if err := msg.Validate(); err != nil { +// _ = writeJSONSafe(conn, map[string]string{"error": err.Error()}) +// wsg.logger.Warn("bot message validation failed", "bot_id", meta.ID, "remote", conn.RemoteAddr().String(), "err", err) +// continue +// } +// +// // Handle the message (forward to mod, enrich, etc.) +// if err := wsg.botHandler.Handle(conn, msg); err != nil { +// _ = writeJSONSafe(conn, map[string]string{"error": "handler error: " + err.Error()}) +// wsg.logger.Error("bot handler error", "bot_id", meta.ID, "err", err) +// continue +// } +// +// _ = writeJSONSafe(conn, map[string]string{"status": "ok"}) +// } +//} + +func (wsg *WebsocketGateway) read(conn *websocket.Conn, meta cache.ConnectionMetaData, _type string) { defer func() { - wsg.unregisterConn(conn, meta, "mod") - wsg.logger.Info("Client disconnected.", "remote", conn.RemoteAddr().String(), "server_id", meta.ID) - }() - - ticker := time.NewTicker(30 * time.Second) - defer ticker.Stop() - - go func() { - for range ticker.C { - wsg.sendWebsocketPing(conn) - } - }() - - for { - typ, data, err := conn.ReadMessage() - - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { - wsg.logger.Warn("Mod-Client unexpectedly closed the connection.", "err", err) - } - return - } - - if typ != websocket.TextMessage && typ != websocket.BinaryMessage { - continue - } - - var msg GatewayModMessageIn - if err := json.Unmarshal(data, &msg); err != nil { - _ = writeJSONSafe(conn, map[string]string{"error": "invalid json: " + err.Error()}) - wsg.logger.Warn("invalid json from mod", "server_id", meta.ID, "remote", conn.RemoteAddr().String(), "err", err) - continue - } - - msg.ReceivedAt = time.Now().UTC() - if err := msg.Validate(); err != nil { - _ = writeJSONSafe(conn, map[string]string{"error": err.Error()}) - wsg.logger.Warn("mod message validation failed", "server_id", meta.ID, "remote", conn.RemoteAddr().String(), "err", err) - continue - } - - // Handle the message (forward to bot, enrich, etc.) - if err := wsg.modHandler.Handle(conn, msg); err != nil { - _ = writeJSONSafe(conn, map[string]string{"error": "handler error: " + err.Error()}) - wsg.logger.Error("mod handler error", "server_id", meta.ID, "err", err) - continue - } - - _ = writeJSONSafe(conn, map[string]string{"status": "completed"}) // or "queued" - } -} - -func (wsg *WebsocketGateway) botReadLoop(conn *websocket.Conn, meta cache.ConnectionMetaData) { - defer func() { - wsg.unregisterConn(conn, meta, "bot") - wsg.logger.Info("bot disconnected", "bot_id", meta.ID, "remote", conn.RemoteAddr().String()) - }() - - pingTicker := time.NewTicker(30 * time.Second) - defer pingTicker.Stop() - - // Send pings in a separate goroutine - go func() { - for range pingTicker.C { - _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) - if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil { - wsg.logger.Debug("write ping failed", "bot_id", meta.ID, "err", err) - return - } - wsg.logger.Debug("sent ping to bot", "bot_id", meta.ID) - } - }() - - for { - typ, data, err := conn.ReadMessage() - if err != nil { - if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { - wsg.logger.Warn("unexpected bot close", "bot_id", meta.ID, "err", err) - } else { - wsg.logger.Debug("bot read error", "bot_id", meta.ID, "err", err) - } - return - } - - if typ != websocket.TextMessage && typ != websocket.BinaryMessage { - continue - } - - var msg GatewayBotMessageIn - if err := json.Unmarshal(data, &msg); err != nil { - _ = writeJSONSafe(conn, map[string]string{"error": "invalid json: " + err.Error()}) - wsg.logger.Warn("invalid json from bot", "bot_id", meta.ID, "remote", conn.RemoteAddr().String(), "err", err) - continue - } - - msg.ReceivedAt = time.Now().UTC() - if err := msg.Validate(); err != nil { - _ = writeJSONSafe(conn, map[string]string{"error": err.Error()}) - wsg.logger.Warn("bot message validation failed", "bot_id", meta.ID, "remote", conn.RemoteAddr().String(), "err", err) - continue - } - - // Handle the message (forward to mod, enrich, etc.) - if err := wsg.botHandler.Handle(conn, msg); err != nil { - _ = writeJSONSafe(conn, map[string]string{"error": "handler error: " + err.Error()}) - wsg.logger.Error("bot handler error", "bot_id", meta.ID, "err", err) - continue - } - - _ = writeJSONSafe(conn, map[string]string{"status": "ok"}) - } -} - -func (wsg *WebsocketGateway) read(conn *websocket.Conn, meta cache.ConnectionMetaData, typ string) { - defer func() { - wsg.unregisterConn(conn, meta, typ) + wsg.unregisterConn(conn, meta, _type) wsg.logger.Info("Client disconnected.", "remote", conn.RemoteAddr().String()) }() ticker := time.NewTicker(30 * time.Second) + ctx, cancel := context.WithCancel(context.Background()) defer ticker.Stop() + defer cancel() go func() { - for range ticker.C { - wsg.sendWebsocketPing(conn) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + wsg.sendWebsocketPing(conn) + } } }() - switch typ { - case "mod": + for { + typ, data, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { + wsg.logger.Error("Client unexpectedly closed the connection.", "err", err) + } + return + } - case "bot": + if typ != websocket.TextMessage && typ != websocket.BinaryMessage { + continue + } + + ts := time.Now().UTC() + var message GatewayMessageIn + if err := json.Unmarshal(data, &message); err != nil { + wsg.sendWebsocketError(conn, "Malformed message.", 400, false) + wsg.logger.Warn("Received malformed message json from client.", "remote", conn.RemoteAddr().String(), "err", err) + continue + } + + message.Type = _type + message.ReceivedAt = ts + if err := message.Validate(); err != nil { + wsg.sendWebsocketError(conn, "Malformed message.", 400, false) + wsg.logger.Warn("Received malformed message json from client; validation failed.", "remote", conn.RemoteAddr().String(), "err", err) + continue + } + + var ok bool + var destConn *websocket.Conn + + switch message.Type { + case "mod": + if wsg.bot != nil { + ok = true + destConn = wsg.bot + } else { + ok = false + } + case "bot": + var id string + id, ok = wsg.cache.GetByChannelId(message.ID) + if ok { + var dest *websocket.Conn + dest, _, ok = wsg.conns.GetById(id) + if ok { + destConn = dest + } else { + wsg.sendWebsocketError(conn, "Internal Server Error", 500, true) + wsg.logger.Error("Invalid cache structure.", "remote", conn.RemoteAddr().String(), "id", id) + return + } + } + default: + panic("invalid message type") + } + + if ok { + if destConn == nil { + wsg.sendWebsocketError(conn, "Internal Server Error", 500, true) + wsg.logger.Error("Destination connection unavailable.", "remote", conn.RemoteAddr().String()) + return + } + + err = wsg.sendWebsocketResponse(destConn, GatewayMessageOut{ + Type: message.Type, + ID: message.Destination.ID, + Author: message.Author, + Content: message.Content, + Meta: message.Meta, + Ts: message.Ts, + ReceivedAt: message.ReceivedAt, + ForwardedAt: time.Now().UTC(), + }) + + if err != nil { + _ = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "failed", Type: message.Type}) + wsg.logger.Error("Failed to forward message.", "remote", conn.RemoteAddr().String(), "err", err) + continue + } + + _ = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "completed", Type: message.Type}) + continue + } + + if message.Type == "mod" { + wsg.cache.Set(message.ID, message.Destination.ID) + } + + out := GatewayMessageOut{ + Type: message.Type, + ID: message.Destination.ID, + Author: message.Author, + Content: message.Content, + Meta: message.Meta, + Ts: message.Ts, + ReceivedAt: message.ReceivedAt, + ForwardedAt: time.Now().UTC(), + } + + queued := wsg.queue.Enqueue(out) + if !queued { + _ = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "failed", Type: message.Type}) + wsg.logger.Warn("Failed to queue message.", "remote", conn.RemoteAddr().String()) + continue + } + + _ = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "queued", Type: message.Type}) } }