// Package ws contains the websocket gateway: an HTTP server exposing the
// /sync and /health endpoints, and the per-connection read loop that forwards
// client messages through the registry.
package ws

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"net"
	"net/http"
	"os/signal"
	"syscall"
	"time"

	"github.com/gorilla/websocket"

	"homestead/homestead_gateway/util/config"
)

// NewWebsocketGateway builds a WebsocketGateway from cfg.
//
// The listen port comes from cfg.HttpPort, the API key from cfg.Websocket and
// the registry is created with cfg.QueueSize. cfg.BodySize is multiplied by
// 1024*1024 to obtain bodySizeBytes, i.e. it is treated as a size in MiB.
// logger and closefn are stored on the gateway as-is.
//
// NOTE(review): closefn is presumably invoked during shutdown; the call site
// is not visible in this part of the file.
func NewWebsocketGateway(cfg config.GatewayConfig, logger *slog.Logger, closefn func() error) *WebsocketGateway {
	return &WebsocketGateway{
		logger:        logger,
		closeFn:       closefn,
		port:          cfg.HttpPort,
		apiKey:        cfg.Websocket,
		upgrader:      NewUpgrader(),
		registry:      NewRegistry(cfg.QueueSize),
		bodySizeBytes: int64(cfg.BodySize) * 1024 * 1024,
	}
}

// Start serves on ":<port>" and blocks until the process receives SIGINT or
// SIGTERM, or until Serve returns an error. It returns whatever Serve returns.
func (wsg *WebsocketGateway) Start() error {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	return wsg.Serve(ctx, fmt.Sprintf(":%d", wsg.port))
}

// Serve runs the gateway's HTTP server on listenAddr and blocks until either
// ctx is cancelled or the listener goroutine reports an error.
//
// On cancellation it calls deafen on the server and returns nil; otherwise it
// returns the error received from listen. ctx is also installed as the base
// context of every incoming request.
func (wsg *WebsocketGateway) Serve(ctx context.Context, listenAddr string) error {
	// Bounds how long a client may take to send its request headers. Without
	// it a peer that never finishes its headers pins a connection and a
	// goroutine indefinitely. Only the header phase is limited, so upgraded
	// websocket connections are not cut off by it.
	const readHeaderTimeout = 10 * time.Second

	mux := http.NewServeMux()
	mux.HandleFunc("/sync", wsg.handleSync)
	mux.HandleFunc("/health", wsg.handleHealth)

	srv := &http.Server{
		Addr:              listenAddr,
		Handler:           wsg.loggingMiddleware(mux),
		ReadHeaderTimeout: readHeaderTimeout,
		BaseContext:       func(net.Listener) context.Context { return ctx },
	}

	// Buffered so the listener goroutine can always deliver its result and
	// exit, even when this function has already returned via ctx.Done().
	errCh := make(chan error, 1)
	go wsg.listen(srv, listenAddr, errCh)

	select {
	case <-ctx.Done():
		wsg.deafen(srv)
		return nil
	case err := <-errCh:
		return err
	}
}

// read is the receive loop for a single client connection. It blocks until
// conn.ReadMessage fails, then unregisters the (_type, channelId) connection
// and logs the disconnect.
//
// While the loop runs, a background goroutine calls sendWebsocketPing every
// 30 seconds; it is stopped when read returns.
//
// Each text or binary frame is decoded as JSON into a GatewayMessageIn,
// stamped with _type and its receive time, validated, and forwarded through
// the registry as a GatewayMessageOut. The routing ID is channelId for "mod"
// connections and the message's own ID otherwise. The sender then receives a
// GatewayAck whose status is "completed", "queued" or "failed". Frames that
// fail to decode or validate are answered with a 400 error and skipped.
//
// NOTE(review): pings are written from the ticker goroutine while acks and
// errors are written from this loop, and registry.Send writes to other
// connections directly. gorilla/websocket permits only one concurrent writer
// per connection, so the send helpers must serialize writes — confirm.
func (wsg *WebsocketGateway) read(conn *websocket.Conn, _type, channelId string) {
	defer func() {
		wsg.unregisterConn(_type, channelId)
		wsg.logger.Info("Client disconnected.", "remote", conn.RemoteAddr().String())
	}()

	ticker := time.NewTicker(30 * time.Second)
	ctx, cancel := context.WithCancel(context.Background())
	defer ticker.Stop()
	defer cancel()

	// Keep-alive pinger; exits as soon as read returns and cancels ctx.
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
				wsg.sendWebsocketPing(conn)
			}
		}
	}()

	// deliver writes one outbound message to a target connection. It captures
	// nothing from the loop, so it is built once rather than per message.
	deliver := func(c *websocket.Conn, m GatewayMessageOut) error {
		if err := c.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil {
			return err
		}
		return c.WriteJSON(m)
	}

	for {
		typ, data, err := conn.ReadMessage()
		if err != nil {
			// Normal and going-away closes are expected; anything else is logged.
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
				wsg.logger.Error("Client unexpectedly closed the connection.", "err", err)
			}
			return
		}
		if typ != websocket.TextMessage && typ != websocket.BinaryMessage {
			continue
		}

		// Taken before decoding so ReceivedAt reflects arrival, not processing.
		ts := time.Now().UTC()

		var message GatewayMessageIn
		if err := json.Unmarshal(data, &message); err != nil {
			wsg.sendWebsocketError(conn, "Malformed message.", 400, false)
			wsg.logger.Warn("Received malformed message json from client.", "remote", conn.RemoteAddr().String(), "err", err)
			continue
		}

		// The connection's type always overrides whatever the client sent.
		message.Type = _type
		message.ReceivedAt = ts

		if err := message.Validate(); err != nil {
			wsg.sendWebsocketError(conn, "Malformed message.", 400, false)
			wsg.logger.Warn("Received malformed message json from client; validation failed.", "remote", conn.RemoteAddr().String(), "err", err)
			continue
		}

		// "mod" connections are routed by their channel, all others by the
		// ID carried in the message itself.
		outID := message.ID
		if _type == "mod" {
			outID = channelId
		}

		out := GatewayMessageOut{
			Type:        message.Type,
			ID:          outID,
			Author:      message.Author,
			Content:     message.Content,
			Meta:        message.Meta,
			Ts:          message.Ts,
			ReceivedAt:  message.ReceivedAt,
			ForwardedAt: time.Now().UTC(),
		}

		delivered, queued, err := wsg.registry.Send(out.ID, out, deliver)

		// An error, or neither delivered nor queued, both report "failed".
		status := "failed"
		switch {
		case err != nil:
			wsg.logger.Error("registry send error", "err", err)
		case delivered:
			status = "completed"
		case queued:
			status = "queued"
		}

		// Best-effort ack: if the write fails the connection is broken and the
		// next ReadMessage call ends the loop.
		_ = wsg.sendWebsocketResponse(conn, GatewayAck{Status: status, Type: message.Type})
	}
}