package ws

import (
	"context"
	"encoding/json"
	"fmt"
	"homestead/homestead_gateway/util/config"
	"log/slog"
	"net"
	"net/http"
	"os/signal"
	"syscall"
	"time"

	"github.com/gorilla/websocket"
)

// NewWebsocketGateway builds a WebsocketGateway from cfg.
//
// The gateway's port is cfg.HttpPort and its apiKey is cfg.Websocket.
// cfg.BodySize is multiplied by 1024*1024 to obtain bodySizeBytes (presumably
// a MiB-to-bytes conversion; confirm against the config docs). logger and
// closefn are stored on the gateway as-is. The connection registry is created
// with NewRegistry(32) and the websocket upgrader uses 1024-byte read and
// write buffers.
func NewWebsocketGateway(cfg config.GatewayConfig, logger *slog.Logger, closefn func() error) *WebsocketGateway {
	return &WebsocketGateway{
		logger:   logger,
		closeFn:  closefn,
		port:     cfg.HttpPort,
		apiKey:   cfg.Websocket,
		registry: NewRegistry(32),
		upgrader: websocket.Upgrader{
			ReadBufferSize:  1024,
			WriteBufferSize: 1024,
			// NOTE(review): every Origin is accepted. Restrict this before
			// exposing the gateway beyond a trusted/local network.
			CheckOrigin: func(r *http.Request) bool {
				return true // local by default; change for production
			},
		},
		bodySizeBytes: int64(cfg.BodySize) * 1024 * 1024,
	}
}

// Start serves on ":<port>" until the process receives SIGINT or SIGTERM, or
// until Serve returns an error. It returns whatever Serve returns.
func (wsg *WebsocketGateway) Start() error {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	return wsg.Serve(ctx, fmt.Sprintf(":%d", wsg.port))
}

// Serve runs the gateway's HTTP server on listenAddr with the /sync and
// /health routes, wrapped in the logging middleware.
//
// ctx is used as the base context of every incoming request. Serve blocks
// until either ctx is cancelled, in which case wsg.deafen(srv) is called and
// nil is returned, or the listener goroutine reports an error on its channel,
// in which case that error is returned.
func (wsg *WebsocketGateway) Serve(ctx context.Context, listenAddr string) error {
	// Upper bound on how long a client may take to send its request headers.
	const readHeaderTimeout = 10 * time.Second

	mux := http.NewServeMux()
	mux.HandleFunc("/sync", wsg.handleSync)
	mux.HandleFunc("/health", wsg.handleHealth)

	srv := &http.Server{
		Addr:    listenAddr,
		Handler: loggingMiddleware(wsg.logger, mux),
		// Without a header timeout a client can open a connection, stall while
		// sending headers and pin a goroutine forever (Slowloris). Only the
		// header phase is bounded: the deadline is lifted once the headers
		// have been read, so long-lived upgraded connections are unaffected.
		ReadHeaderTimeout: readHeaderTimeout,
		BaseContext:       func(net.Listener) context.Context { return ctx },
	}

	// Buffered so the listener goroutine can always report its error and exit,
	// even when Serve has already returned because ctx was cancelled.
	errCh := make(chan error, 1)
	go wsg.listen(srv, listenAddr, errCh)

	select {
	case <-ctx.Done():
		wsg.deafen(srv)
		return nil
	case err := <-errCh:
		return err
	}
}

// Legacy per-role read loops, kept commented out for reference. They appear to
// predate the unified read method in this file.
//
//func (wsg *WebsocketGateway) modReadLoop(conn *websocket.Conn, meta cache.ConnectionMetaData) {
//	defer func() {
//		wsg.unregisterConn(conn, meta, "mod")
//		wsg.logger.Info("Client disconnected.", "remote", conn.RemoteAddr().String(), "server_id", meta.ID)
//	}()
//
//	ticker := time.NewTicker(30 * time.Second)
//	defer ticker.Stop()
//
//	go func() {
//		for range ticker.C {
//			wsg.sendWebsocketPing(conn)
//		}
//	}()
//
//	for {
//		typ, data, err := conn.ReadMessage()
//
//		if err != nil {
//			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
//				wsg.logger.Warn("Mod-Client unexpectedly closed the connection.", "err", err)
//			}
//			return
//		}
//
//		if typ != websocket.TextMessage && typ != websocket.BinaryMessage {
//			continue
//		}
//
//		var msg GatewayModMessageIn
//		if err := json.Unmarshal(data, &msg); err != nil {
//			_ = writeJSONSafe(conn, map[string]string{"error": "invalid json: " + err.Error()})
//			wsg.logger.Warn("invalid json from mod", "server_id", meta.ID, "remote", conn.RemoteAddr().String(), "err", err)
//			continue
//		}
//
//		msg.ReceivedAt = time.Now().UTC()
//		if err := msg.Validate(); err != nil {
//			_ = writeJSONSafe(conn, map[string]string{"error": err.Error()})
//			wsg.logger.Warn("mod message validation failed", "server_id", meta.ID, "remote", conn.RemoteAddr().String(), "err", err)
//			continue
//		}
//
//		// Handle the message (forward to bot, enrich, etc.)
//		if err := wsg.modHandler.Handle(conn, msg); err != nil {
//			_ = writeJSONSafe(conn, map[string]string{"error": "handler error: " + err.Error()})
//			wsg.logger.Error("mod handler error", "server_id", meta.ID, "err", err)
//			continue
//		}
//
//		_ = writeJSONSafe(conn, map[string]string{"status": "completed"}) // or "queued"
//	}
//}
//
//func (wsg *WebsocketGateway) botReadLoop(conn *websocket.Conn, meta cache.ConnectionMetaData) {
//	defer func() {
//		wsg.unregisterConn(conn, meta, "bot")
//		wsg.logger.Info("bot disconnected", "bot_id", meta.ID, "remote", conn.RemoteAddr().String())
//	}()
//
//	pingTicker := time.NewTicker(30 * time.Second)
//	defer pingTicker.Stop()
//
//	// Send pings in a separate goroutine
//	go func() {
//		for range pingTicker.C {
//			_ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
//			if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
//				wsg.logger.Debug("write ping failed", "bot_id", meta.ID, "err", err)
//				return
//			}
//			wsg.logger.Debug("sent ping to bot", "bot_id", meta.ID)
//		}
//	}()
//
//	for {
//		typ, data, err := conn.ReadMessage()
//		if err != nil {
//			if websocket.IsUnexpectedCloseError(err,
websocket.CloseGoingAway, websocket.CloseNormalClosure) { // wsg.logger.Warn("unexpected bot close", "bot_id", meta.ID, "err", err) // } else { // wsg.logger.Debug("bot read error", "bot_id", meta.ID, "err", err) // } // return // } // // if typ != websocket.TextMessage && typ != websocket.BinaryMessage { // continue // } // // var msg GatewayBotMessageIn // if err := json.Unmarshal(data, &msg); err != nil { // _ = writeJSONSafe(conn, map[string]string{"error": "invalid json: " + err.Error()}) // wsg.logger.Warn("invalid json from bot", "bot_id", meta.ID, "remote", conn.RemoteAddr().String(), "err", err) // continue // } // // msg.ReceivedAt = time.Now().UTC() // if err := msg.Validate(); err != nil { // _ = writeJSONSafe(conn, map[string]string{"error": err.Error()}) // wsg.logger.Warn("bot message validation failed", "bot_id", meta.ID, "remote", conn.RemoteAddr().String(), "err", err) // continue // } // // // Handle the message (forward to mod, enrich, etc.) // if err := wsg.botHandler.Handle(conn, msg); err != nil { // _ = writeJSONSafe(conn, map[string]string{"error": "handler error: " + err.Error()}) // wsg.logger.Error("bot handler error", "bot_id", meta.ID, "err", err) // continue // } // // _ = writeJSONSafe(conn, map[string]string{"status": "ok"}) // } //} func (wsg *WebsocketGateway) read(conn *websocket.Conn, _type, channelId string) { defer func() { wsg.unregisterConn(conn, _type, channelId) wsg.logger.Info("Client disconnected.", "remote", conn.RemoteAddr().String()) }() ticker := time.NewTicker(30 * time.Second) ctx, cancel := context.WithCancel(context.Background()) defer ticker.Stop() defer cancel() go func() { for { select { case <-ctx.Done(): return case <-ticker.C: wsg.sendWebsocketPing(conn) } } }() for { typ, data, err := conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { wsg.logger.Error("Client unexpectedly closed the connection.", "err", err) } return } if 
typ != websocket.TextMessage && typ != websocket.BinaryMessage { continue } ts := time.Now().UTC() var message GatewayMessageIn if err := json.Unmarshal(data, &message); err != nil { wsg.sendWebsocketError(conn, "Malformed message.", 400, false) wsg.logger.Warn("Received malformed message json from client.", "remote", conn.RemoteAddr().String(), "err", err) continue } message.Type = _type message.ReceivedAt = ts if err := message.Validate(); err != nil { wsg.sendWebsocketError(conn, "Malformed message.", 400, false) wsg.logger.Warn("Received malformed message json from client; validation failed.", "remote", conn.RemoteAddr().String(), "err", err) continue } var outID string if _type == "mod" { outID = channelId } else { outID = message.ID } out := GatewayMessageOut{ Type: message.Type, ID: outID, Author: message.Author, Content: message.Content, Meta: message.Meta, Ts: message.Ts, ReceivedAt: message.ReceivedAt, ForwardedAt: time.Now().UTC(), } delivered, queued, err := wsg.registry.Send(out.ID, out, func(c *websocket.Conn, m GatewayMessageOut) error { _ = c.SetWriteDeadline(time.Now().Add(5 * time.Second)) return c.WriteJSON(m) }) if err != nil { wsg.logger.Error("registry send error", "err", err) _ = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "failed", Type: message.Type}) continue } if delivered { _ = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "completed", Type: message.Type}) continue } if queued { _ = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "queued", Type: message.Type}) continue } _ = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "failed", Type: message.Type}) } }