diff --git a/sim.go b/sim.go index a8257e6..2309c1a 100644 --- a/sim.go +++ b/sim.go @@ -2,15 +2,23 @@ package main import ( "encoding/json" - "fmt" "log" "net/url" + "os" + "os/signal" + "syscall" "time" "github.com/gorilla/websocket" ) -type MessageEnvelope struct { +const ( + gatewayURL = "ws://localhost:3333/push" + apiKey = "gateway" + serverID = "test-server-001" +) + +type Handshake struct { Type string `json:"type"` Data json.RawMessage `json:"data"` } @@ -19,173 +27,163 @@ type ModHandshake struct { ServerID string `json:"server_id"` } -type BotHandshake struct { - BotID string `json:"bot_id"` +type GatewayAck struct { + Status string `json:"status"` + Type string `json:"type"` } -type User struct { +type MinecraftUser struct { ID string `json:"id"` Name string `json:"name"` } -type ModMessage struct { - MsgID string `json:"msg_id"` - Server string `json:"server"` - User User `json:"user"` - Content string `json:"content"` - Meta map[string]interface{} `json:"meta,omitempty"` - Ts string `json:"ts,omitempty"` +type Destination struct { + ChannelID string `json:"channel_id"` } -type BotMessage struct { - MsgID string `json:"msg_id"` - ChannelID string `json:"channel_id"` - Author string `json:"author"` - Content string `json:"content"` - Meta map[string]interface{} `json:"meta,omitempty"` - Ts string `json:"ts,omitempty"` -} - -func simulateMod() { - u := url.URL{Scheme: "ws", Host: "localhost:3333", Path: "/ws"} - u.RawQuery = "api_key=gateway" - - conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) - if err != nil { - log.Fatalf("mod dial error: %v", err) - } - defer conn.Close() - - fmt.Println("[MOD] Connected") - - // Send handshake - handshake := ModHandshake{ServerID: "survival_server"} - hsData, _ := json.Marshal(handshake) - envelope := MessageEnvelope{ - Type: "mod", - Data: hsData, - } - envData, _ := json.Marshal(envelope) - - if err := conn.WriteMessage(websocket.TextMessage, envData); err != nil { - log.Fatalf("mod handshake write error: %v", err) - } - fmt.Println("[MOD] Sent handshake") - - // Read handshake response - _, resp, err := conn.ReadMessage() - if err != nil { - log.Fatalf("mod handshake response error: %v", err) - } - fmt.Printf("[MOD] Handshake response: %s\n", string(resp)) - - // Send a few messages - for i := 1; i <= 3; i++ { - msg := ModMessage{ - MsgID: fmt.Sprintf("mod_msg_%d", i), - Server: "survival_server", - User: User{ID: "player_123", Name: "Steve"}, - Content: fmt.Sprintf("Message %d from Minecraft!", i), - Meta: map[string]interface{}{ - "coordinates": map[string]int{"x": 100 + i*10, "y": 64, "z": 200}, - }, - Ts: time.Now().UTC().Format(time.RFC3339), - } - - msgData, _ := json.Marshal(msg) - if err := conn.WriteMessage(websocket.TextMessage, msgData); err != nil { - log.Fatalf("mod message write error: %v", err) - } - fmt.Printf("[MOD] Sent message: %s\n", msg.Content) - - // Read response - _, resp, err := conn.ReadMessage() - if err != nil { - log.Fatalf("mod response error: %v", err) - } - fmt.Printf("[MOD] Response: %s\n", string(resp)) - - time.Sleep(1 * time.Second) - } - - fmt.Println("[MOD] Closing connection") -} - -func simulateBot() { - time.Sleep(2 * time.Second) // Let mod connect first - - u := url.URL{Scheme: "ws", Host: "localhost:3333", Path: "/ws"} - u.RawQuery = "api_key=gateway" - - conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) - if err != nil { - log.Fatalf("bot dial error: %v", err) - } - defer conn.Close() - - fmt.Println("[BOT] Connected") - - // Send handshake - handshake := BotHandshake{BotID: "discord_bot_1"} - hsData, _ := json.Marshal(handshake) - envelope := MessageEnvelope{ - Type: "bot", - Data: hsData, - } - envData, _ := json.Marshal(envelope) - - if err := conn.WriteMessage(websocket.TextMessage, envData); err != nil { - log.Fatalf("bot handshake write error: %v", err) - } - fmt.Println("[BOT] Sent handshake") - - // Read handshake response - _, resp, err := conn.ReadMessage() - if err != nil { - log.Fatalf("bot handshake response error: %v", err) - } - fmt.Printf("[BOT] Handshake response: %s\n", string(resp)) - - // Send a few messages - for i := 1; i <= 3; i++ { - msg := BotMessage{ - MsgID: fmt.Sprintf("bot_msg_%d", i), - ChannelID: "987654321", - Author: "DiscordUser#1234", - Content: fmt.Sprintf("Message %d from Discord!", i), - Meta: map[string]interface{}{ - "reactions": []string{"👍", "❤️"}, - }, - Ts: time.Now().UTC().Format(time.RFC3339), - } - - msgData, _ := json.Marshal(msg) - if err := conn.WriteMessage(websocket.TextMessage, msgData); err != nil { - log.Fatalf("bot message write error: %v", err) - } - fmt.Printf("[BOT] Sent message: %s\n", msg.Content) - - // Read response - _, resp, err := conn.ReadMessage() - if err != nil { - log.Fatalf("bot response error: %v", err) - } - fmt.Printf("[BOT] Response: %s\n", string(resp)) - - time.Sleep(1 * time.Second) - } - - fmt.Println("[BOT] Closing connection") +type GatewayModMessageIn struct { + MsgID string `json:"msg_id"` + Server string `json:"server"` + Destination Destination `json:"destination"` + Author MinecraftUser `json:"author"` + Content string `json:"content"` + Meta map[string]interface{} `json:"meta,omitempty"` + Ts string `json:"ts,omitempty"` } func main() { - fmt.Println("Starting WebSocket client simulator...") - fmt.Println("Connecting to ws://localhost:3333/ws with api_key=gateway") + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM) - go simulateMod() - go simulateBot() + // Build WebSocket URL with API key + u, err := url.Parse(gatewayURL) + if err != nil { + log.Fatalf("Failed to parse URL: %v", err) + } + q := u.Query() + q.Set("api_key", apiKey) + u.RawQuery = q.Encode() - // Let them run - time.Sleep(15 * time.Second) - fmt.Println("Simulator finished") + log.Printf("Connecting to %s", u.String()) + + // Connect to WebSocket + conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + log.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + log.Println("Connected to gateway") + + // Set up ping handler - respond to pings from server + conn.SetPingHandler(func(appData string) error { + log.Println("Received ping from server, sending pong") + err := conn.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(5*time.Second)) + if err != nil { + log.Printf("Failed to send pong: %v", err) + return err + } + return nil + }) + + // Send handshake + modHS := ModHandshake{ServerID: serverID} + modHSData, err := json.Marshal(modHS) + if err != nil { + log.Fatalf("Failed to marshal mod handshake: %v", err) + } + + handshake := Handshake{ + Type: "mod", + Data: modHSData, + } + + if err := conn.WriteJSON(handshake); err != nil { + log.Fatalf("Failed to send handshake: %v", err) + } + + log.Println("Handshake sent") + + // Read acknowledgment + var ack GatewayAck + if err := conn.ReadJSON(&ack); err != nil { + log.Fatalf("Failed to read acknowledgment: %v", err) + } + + log.Printf("Received acknowledgment: status=%s, type=%s", ack.Status, ack.Type) + + // Channel for incoming messages + done := make(chan struct{}) + + // Read loop - handles incoming messages and processes control frames + go func() { + defer close(done) + for { + messageType, message, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { + log.Printf("WebSocket error: %v", err) + } else { + log.Printf("Connection closed: %v", err) + } + return + } + + // Only log text/binary messages (ping/pong handled by handlers) + if messageType == websocket.TextMessage || messageType == websocket.BinaryMessage { + log.Printf("Received from server: %s", string(message)) + } + } + }() + + // Optional: Send a test message after connecting + time.Sleep(2 * time.Second) + testMsg := GatewayModMessageIn{ + MsgID: "test-msg-001", + Server: serverID, + Destination: Destination{ + ChannelID: "123456789", + }, + Author: MinecraftUser{ + ID: "player-uuid-123", + Name: "TestPlayer", + }, + Content: "Hello from simulated mod!", + Ts: time.Now().UTC().Format(time.RFC3339), + } + + if err := conn.WriteJSON(testMsg); err != nil { + log.Printf("Failed to send test message: %v", err) + } else { + log.Println("Sent test message to gateway") + } + + log.Println("Connection established. Responding to pings. Press Ctrl+C to disconnect.") + + // Wait for interrupt or connection close + for { + select { + case <-done: + log.Println("Connection closed") + return + case <-interrupt: + log.Println("Interrupt received, closing connection...") + + // Send close message + err := conn.WriteMessage( + websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), + ) + if err != nil { + log.Printf("Write close error: %v", err) + return + } + + select { + case <-done: + case <-time.After(time.Second): + } + return + } + } } diff --git a/ws/handlers.go b/ws/handlers.go index 9859295..c387568 100644 --- a/ws/handlers.go +++ b/ws/handlers.go @@ -1 +1,107 @@ package ws + +import ( + "encoding/json" + "net/http" + "time" + + "github.com/gorilla/websocket" +) + +func (wsg *WebsocketGateway) handlePush(w http.ResponseWriter, r *http.Request) { + conn, err := wsg.validateAndUpgradeConnection(w, r) + if err != nil { + return + } + + if wsg.bodySizeBytes > 0 { + conn.SetReadLimit(wsg.bodySizeBytes) + } else { + conn.SetReadLimit(1 << 20) // sensible default 1MiB + } + + _ = conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + conn.SetPongHandler(func(appData string) error { + _ = conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + return nil + }) + + typ, data, err := conn.ReadMessage() + if err != nil { + wsg.sendWebsocketError(conn, "Internal Server Error", 500) + wsg.logger.Error("Failed to read handshake.", "remote", conn.RemoteAddr().String(), "err", err) + return + } + + if typ != websocket.TextMessage && typ != websocket.BinaryMessage { + wsg.sendWebsocketError(conn, "First message must be a handshake.", 400) + wsg.logger.Warn("Invalid handshake message type.", "remote", conn.RemoteAddr().String()) + return + } + + var handshake Handshake + if err := json.Unmarshal(data, &handshake); err != nil { + wsg.sendWebsocketError(conn, "Malformed handshake.", 400) + wsg.logger.Warn("Malformed handshake.", "remote", conn.RemoteAddr().String(), "err", err) + return + } + + meta := connectionMetaData{connectionType: handshake.Type} + + switch handshake.Type { + case "mod": + var mhs ModHandshake + + if err := json.Unmarshal(handshake.Data, &mhs); err != nil { + wsg.sendWebsocketError(conn, "Malformed mod handshake.", 400) + wsg.logger.Warn("Malformed mod handshake.", "remote", conn.RemoteAddr().String(), "err", err) + return + } + + meta.id = mhs.ServerID + + if err = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "connected", Type: "mod"}); err != nil { + return + } + + wsg.registerConn(conn, meta) + wsg.logger.Info("Mod connected via Websocket.", "remote", conn.RemoteAddr().String(), "server_id", mhs.ServerID) + + go wsg.modReadLoop(conn, meta) // replace with external handler mayhaps + + case "bot": + var bhs BotHandshake + + if err := json.Unmarshal(handshake.Data, &bhs); err != nil { + wsg.sendWebsocketError(conn, "Malformed bot handshake.", 400) + wsg.logger.Warn("Malformed bot handshake.", "remote", conn.RemoteAddr().String(), "err", err) + return + } + + meta.id = bhs.BotID + + if err = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "connected", Type: "bot"}); err != nil { + return + } + + wsg.registerConn(conn, meta) + wsg.logger.Info("Bot connected via Websocket.", "remote", conn.RemoteAddr().String(), "bot_id", bhs.BotID) + + go wsg.botReadLoop(conn, meta) // replace with external handler mayhaps + + default: + wsg.sendWebsocketError(conn, "Unknown handshake.", 400) + wsg.logger.Warn("Unknown connection type.", "remote", conn.RemoteAddr().String(), "type", handshake.Type) + return + } +} + +func (wsg *WebsocketGateway) handleReady(w http.ResponseWriter, r *http.Request) {} + +func (wsg *WebsocketGateway) handleHealth(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(200) + _ = json.NewEncoder(w).Encode(map[string]interface{}{"status": "healthy"}) +} + +func (wsg *WebsocketGateway) handleRegister(w http.ResponseWriter, r *http.Request) {} diff --git a/ws/temp.go b/ws/temp.go new file mode 100644 index 0000000..d40f5c4 --- /dev/null +++ b/ws/temp.go @@ -0,0 +1,70 @@ +package ws + +import ( + "context" + "encoding/json" + "log/slog" + "time" +) + +type LoggingModHandler struct { + logger *slog.Logger +} + +type LoggingBotHandler struct { + logger *slog.Logger +} + +func NewLoggingModHandler(logger *slog.Logger) *LoggingModHandler { + return &LoggingModHandler{logger: logger} +} + +func NewLoggingBotHandler(logger *slog.Logger) *LoggingBotHandler { + return &LoggingBotHandler{logger: logger} +} + +func (h *LoggingModHandler) Handle(ctx context.Context, msg GatewayModMessageIn) error { + // For now, just log and pretend it's being forwarded + // TODO: Look up channel_id from database using server + // TODO: Forward to bot connection(s) + + fwd := GatewayModMessageOut{ + Type: "mod", + ChannelID: "TODO", // will come from database lookup + Author: msg.Author, + Content: msg.Content, + Meta: msg.Meta, + Ts: msg.Ts, + ReceivedAt: msg.ReceivedAt, + ForwardedAt: time.Now().UTC(), + } + + b, _ := json.Marshal(fwd) + h.logger.Info("received mod message", "msg_id", msg.MsgID, "server", msg.Server, "Author", msg.Author.Name, "content", msg.Content) + h.logger.Debug("forwarding mod message", "msg_id", msg.MsgID, "server", msg.Server, "payload", string(b)) + + return nil +} + +func (h *LoggingBotHandler) Handle(ctx context.Context, msg GatewayBotMessageIn) error { + // For now, just log and pretend it's being forwarded + // TODO: Look up server_id from database using channel_id + // TODO: Forward to mod connection(s) + + fwd := GatewayBotMessageOut{ + Type: "bot", + ChannelID: msg.ChannelID, + Author: msg.Author, + Content: msg.Content, + Meta: msg.Meta, + Ts: msg.Ts, + ReceivedAt: msg.ReceivedAt, + ForwardedAt: time.Now().UTC(), + } + + b, _ := json.Marshal(fwd) + h.logger.Info("received bot message", "msg_id", msg.MsgID, "channel", msg.ChannelID, "author", msg.Author, "content", msg.Content) + h.logger.Debug("forwarding bot message", "msg_id", msg.MsgID, "channel", msg.ChannelID, "payload", string(b)) + + return nil +} diff --git a/ws/validate.go b/ws/validate.go index f8feff6..6e494b3 100644 --- a/ws/validate.go +++ b/ws/validate.go @@ -12,8 +12,8 @@ func (m *GatewayModMessageIn) Validate() error { if strings.TrimSpace(m.Server) == "" { return errors.New("server missing") } - if strings.TrimSpace(m.User.ID) == "" { - return errors.New("user.id missing") + if strings.TrimSpace(m.Author.ID) == "" { + return errors.New("author.id missing") } if strings.TrimSpace(m.Content) == "" { return errors.New("content missing") @@ -28,7 +28,7 @@ func (m *GatewayBotMessageIn) Validate() error { if strings.TrimSpace(m.ChannelID) == "" { return errors.New("channel_id missing") } - if strings.TrimSpace(m.Author) == "" { + if strings.TrimSpace(m.Author.ID) == "" { return errors.New("author missing") } if strings.TrimSpace(m.Content) == "" { diff --git a/ws/websocket.go b/ws/websocket.go index 8c91a20..4422c4b 100644 --- a/ws/websocket.go +++ b/ws/websocket.go @@ -69,106 +69,6 @@ func (wsg *WebsocketGateway) Serve(ctx context.Context, listenAddr string) error // -func (wsg *WebsocketGateway) handlePush(w http.ResponseWriter, r *http.Request) { - conn, err := wsg.validateAndUpgradeConnection(w, r) - if err != nil { - return - } - - if wsg.bodySizeBytes > 0 { - conn.SetReadLimit(wsg.bodySizeBytes) - } else { - conn.SetReadLimit(1 << 20) // sensible default 1MiB - } - - _ = conn.SetReadDeadline(time.Now().Add(60 * time.Second)) - conn.SetPongHandler(func(appData string) error { - _ = conn.SetReadDeadline(time.Now().Add(60 * time.Second)) - return nil - }) - - typ, data, err := conn.ReadMessage() - if err != nil { - wsg.sendWebsocketError(conn, "Internal Server Error", 500) - wsg.logger.Error("Failed to read handshake.", "remote", conn.RemoteAddr().String(), "err", err) - return - } - - if typ != websocket.TextMessage && typ != websocket.BinaryMessage { - wsg.sendWebsocketError(conn, "First message must be a handshake.", 400) - wsg.logger.Warn("Invalid handshake message type.", "remote", conn.RemoteAddr().String()) - return - } - - var handshake Handshake - if err := json.Unmarshal(data, &handshake); err != nil { - wsg.sendWebsocketError(conn, "Malformed handshake.", 400) - wsg.logger.Warn("Malformed handshake.", "remote", conn.RemoteAddr().String(), "err", err) - return - } - - meta := connectionMetaData{connectionType: handshake.Type} - - switch handshake.Type { - case "mod": - var mhs ModHandshake - - if err := json.Unmarshal(handshake.Data, &mhs); err != nil { - wsg.sendWebsocketError(conn, "Malformed mod handshake.", 400) - wsg.logger.Warn("Malformed mod handshake.", "remote", conn.RemoteAddr().String(), "err", err) - return - } - - meta.id = mhs.ServerID - - if err = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "connected", Type: "mod"}); err != nil { - return - } - - wsg.registerConn(conn, meta) - wsg.logger.Info("Mod connected via Websocket.", "remote", conn.RemoteAddr().String(), "server_id", mhs.ServerID) - - go wsg.modReadLoop(conn, meta) // replace with external handler mayhaps - - case "bot": - var bhs BotHandshake - - if err := json.Unmarshal(handshake.Data, &bhs); err != nil { - wsg.sendWebsocketError(conn, "Malformed bot handshake.", 400) - wsg.logger.Warn("Malformed bot handshake.", "remote", conn.RemoteAddr().String(), "err", err) - return - } - - meta.id = bhs.BotID - - if err = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "connected", Type: "bot"}); err != nil { - return - } - - wsg.registerConn(conn, meta) - wsg.logger.Info("Bot connected via Websocket.", "remote", conn.RemoteAddr().String(), "bot_id", bhs.BotID) - - go wsg.botReadLoop(conn, meta) // replace with external handler mayhaps - - default: - wsg.sendWebsocketError(conn, "Unknown handshake.", 400) - wsg.logger.Warn("Unknown connection type.", "remote", conn.RemoteAddr().String(), "type", handshake.Type) - return - } -} - -func (wsg *WebsocketGateway) handleReady(w http.ResponseWriter, r *http.Request) {} - -func (wsg *WebsocketGateway) handleHealth(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(200) - _ = json.NewEncoder(w).Encode(map[string]interface{}{"status": "healthy"}) -} - -func (wsg *WebsocketGateway) handleRegister(w http.ResponseWriter, r *http.Request) {} - -// - func (wsg *WebsocketGateway) modReadLoop(c *websocket.Conn, meta connectionMetaData) { defer func() { wsg.unregisterConn(c) @@ -179,6 +79,18 @@ func (wsg *WebsocketGateway) modReadLoop(c *websocket.Conn, meta connectionMetaD pingTicker := time.NewTicker(30 * time.Second) defer pingTicker.Stop() + // Send pings in a separate goroutine + go func() { + for range pingTicker.C { + _ = c.SetWriteDeadline(time.Now().Add(5 * time.Second)) + if err := c.WriteMessage(websocket.PingMessage, nil); err != nil { + wsg.logger.Debug("write ping failed", "server_id", meta.id, "err", err) + return + } + wsg.logger.Debug("sent ping to mod", "server_id", meta.id) + } + }() + for { typ, data, err := c.ReadMessage() if err != nil { @@ -216,17 +128,6 @@ func (wsg *WebsocketGateway) modReadLoop(c *websocket.Conn, meta connectionMetaD } _ = writeJSONSafe(c, map[string]string{"status": "ok"}) - - // Handle pings - select { - case <-pingTicker.C: - _ = c.SetWriteDeadline(time.Now().Add(5 * time.Second)) - if err := c.WriteMessage(websocket.PingMessage, nil); err != nil { - wsg.logger.Debug("write ping failed", "server_id", meta.id, "err", err) - return - } - default: - } } } @@ -240,6 +141,18 @@ func (wsg *WebsocketGateway) botReadLoop(c *websocket.Conn, meta connectionMetaD pingTicker := time.NewTicker(30 * time.Second) defer pingTicker.Stop() + // Send pings in a separate goroutine + go func() { + for range pingTicker.C { + _ = c.SetWriteDeadline(time.Now().Add(5 * time.Second)) + if err := c.WriteMessage(websocket.PingMessage, nil); err != nil { + wsg.logger.Debug("write ping failed", "bot_id", meta.id, "err", err) + return + } + wsg.logger.Debug("sent ping to bot", "bot_id", meta.id) + } + }() + for { typ, data, err := c.ReadMessage() if err != nil { @@ -277,16 +190,5 @@ func (wsg *WebsocketGateway) botReadLoop(c *websocket.Conn, meta connectionMetaD } _ = writeJSONSafe(c, map[string]string{"status": "ok"}) - - // Handle pings - select { - case <-pingTicker.C: - _ = c.SetWriteDeadline(time.Now().Add(5 * time.Second)) - if err := c.WriteMessage(websocket.PingMessage, nil); err != nil { - wsg.logger.Debug("write ping failed", "bot_id", meta.id, "err", err) - return - } - default: - } } }