diff --git a/.gitignore b/.gitignore index 5b90e79..7c68b5e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,27 +1,18 @@ -# ---> Go -# If you prefer the allow list template instead of the deny list, see community template: -# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore -# -# Binaries for programs and plugins +* +!*/ +!*.* + *.exe *.exe~ *.dll *.so *.dylib - -# Test binary, built with `go test -c` *.test - -# Output of the go coverage tool, specifically when used with LiteIDE *.out - -# Dependency directories (remove the comment below to include it) -# vendor/ - -# Go workspace file go.work go.work.sum - -# env file .env +.idea +/.dea/ +/logs/ diff --git a/README.md b/README.md index e4f35b4..f03f694 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,15 @@ # HomesteadGateway -Gateway between multiple HomesteadRelay's and the HomesteadToGo Bot. \ No newline at end of file +Gateway between multiple HomesteadRelay's and the HomesteadToGo Bot. + + +## dev notes + +perhaps drop database, instead + +``` +Mod -> websocket /register { server_id, channel_id } // grabbed from mod config +Bot -> websocket /ready { channel_id } // ready if Mod with fitting channel_id has called /register +Gateway -> memory cache (server_id -> channel_id; channel_id -> server_id) // mem enough +Mod/Bot -> websocket /ws { ... } -> Bot/Mod // sync +``` diff --git a/bot.go b/bot.go new file mode 100644 index 0000000..8dd5130 --- /dev/null +++ b/bot.go @@ -0,0 +1,253 @@ +//go:build simbot + +package main + +import ( + "encoding/json" + "flag" + "fmt" + "log" + "math/rand" + "net/url" + "os" + "os/signal" + "sync" + "sync/atomic" + "syscall" + "time" + + "github.com/gorilla/websocket" +) + +const ( + gatewayURL = "ws://localhost:3333/sync" + apiKey = "gateway" + channelID = "123456789" +) + +var ( + minInterval = 500 * time.Millisecond + maxInterval = 5 * time.Second +) + +type Handshake struct { + Type string `json:"type"` + Data json.RawMessage `json:"data"` +} + +type BotHandshake struct { + ChannelId string `json:"channel_id"` +} + +type GatewayAck struct { + Status string `json:"status"` + Type string `json:"type"` +} + +type User struct { + ID string `json:"id"` + Name string `json:"name"` +} + +type Destination struct { + ID string `json:"channel_id,omitempty"` +} + +type GatewayMessageIn struct { + ID string `json:"id"` + MsgID string `json:"msg_id"` + Destination Destination `json:"destination,omitempty"` + Author User `json:"author"` + Content string `json:"content"` + Meta map[string]interface{} `json:"meta,omitempty"` + Ts time.Time `json:"ts,omitempty"` + ReceivedAt time.Time `json:"-"` +} + +func randomDuration(min, max time.Duration) time.Duration { + if max <= min { + return min + } + diff := int64(max - min) + n := rand.Int63n(diff) + return min + time.Duration(n) +} + +func main() { + rand.Seed(time.Now().UnixNano()) + + botID := flag.String("bot", "sim-bot-1", "bot id") + sendAfter := flag.Duration("send-after", 0, "optional: send a bot->mod message after this delay (e.g. 2s)") + sendMsg := flag.String("msg", "Hello from bot!", "optional bot->mod test message content") + flag.Parse() + + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM) + + u, err := url.Parse(gatewayURL) + if err != nil { + log.Fatalf("Failed to parse URL: %v", err) + } + q := u.Query() + q.Set("api_key", apiKey) + u.RawQuery = q.Encode() + + conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + log.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + var writeMu sync.Mutex + + conn.SetPingHandler(func(appData string) error { + writeMu.Lock() + err := conn.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(5*time.Second)) + writeMu.Unlock() + if err != nil { + log.Printf("Failed to send pong: %v", err) + return err + } + return nil + }) + + bhs := BotHandshake{ChannelId: channelID} + data, err := json.Marshal(bhs) + if err != nil { + _ = conn.Close() + log.Fatalf("Failed to marshal bot handshake: %v", err) + } + hs := Handshake{Type: "bot", Data: data} + + writeMu.Lock() + if err := conn.WriteJSON(hs); err != nil { + writeMu.Unlock() + _ = conn.Close() + log.Fatalf("Failed to send handshake: %v", err) + } + writeMu.Unlock() + + conn.SetReadDeadline(time.Now().Add(10 * time.Second)) + msgType, raw, err := conn.ReadMessage() + if err != nil { + _ = conn.Close() + log.Fatalf("Failed to read handshake response: %v", err) + } + _ = conn.SetReadDeadline(time.Time{}) + + if msgType == websocket.TextMessage || msgType == websocket.BinaryMessage { + log.Printf("Raw handshake reply: %s", string(raw)) + var ack GatewayAck + if err := json.Unmarshal(raw, &ack); err != nil { + log.Printf("Handshake reply is not JSON or unmarshal failed: %v", err) + } else { + log.Printf("Parsed ack: status=%q, type=%q", ack.Status, ack.Type) + } + } else { + log.Printf("Handshake reply type=%d", msgType) + } + + done := make(chan struct{}) + + go func() { + defer close(done) + for { + msgType, message, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { + log.Printf("WebSocket error: %v", err) + } else { + log.Printf("Connection closed or read error: %v", err) + } + return + } + if msgType == websocket.TextMessage || msgType == websocket.BinaryMessage { + log.Printf("Received from gateway: %s", string(message)) + } + } + }() + + var msgCounter uint64 = 1 + + if *sendAfter > 0 { + go func() { + time.Sleep(*sendAfter) + msg := GatewayMessageIn{ + MsgID: fmt.Sprintf("bot-msg-%06d", atomic.AddUint64(&msgCounter, 1)), + ID: channelID, + Author: User{ + ID: *botID, + Name: "SimBot", + }, + Content: *sendMsg, + Ts: time.Now().UTC(), + } + writeMu.Lock() + _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + if err := conn.WriteJSON(msg); err != nil { + log.Printf("Failed to send bot->mod test message: %v", err) + } else { + log.Printf("Sent bot->mod test message (channel=%s)", channelID) + } + _ = conn.SetWriteDeadline(time.Time{}) + writeMu.Unlock() + }() + } + + go func() { + for { + select { + case <-done: + return + default: + } + d := randomDuration(minInterval, maxInterval) + select { + case <-done: + return + case <-time.After(d): + msgNum := atomic.AddUint64(&msgCounter, 1) + msg := GatewayMessageIn{ + MsgID: fmt.Sprintf("sim-bot-msg-%06d", msgNum), + ID: channelID, + Author: User{ + ID: fmt.Sprintf("%s-%d", *botID, msgNum%1000), + Name: fmt.Sprintf("SimBot%d", msgNum%1000), + }, + Content: fmt.Sprintf("Automated bot message #%d (delay %s)", msgNum, d), + Ts: time.Now().UTC(), + } + writeMu.Lock() + _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + if err := conn.WriteJSON(msg); err != nil { + writeMu.Unlock() + log.Printf("Failed to send automated bot message: %v", err) + return + } + _ = conn.SetWriteDeadline(time.Time{}) + writeMu.Unlock() + log.Printf("Sent automated bot message %s", msg.MsgID) + } + } + }() + + for { + select { + case <-done: + log.Println("Connection closed by server") + _ = conn.Close() + return + case <-interrupt: + log.Println("Interrupt received, closing connection...") + writeMu.Lock() + _ = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")) + writeMu.Unlock() + select { + case <-done: + case <-time.After(time.Second): + } + _ = conn.Close() + return + } + } +} diff --git a/config.toml b/config.toml new file mode 100644 index 0000000..8850d7b --- /dev/null +++ b/config.toml @@ -0,0 +1,10 @@ +[log] +level = "debug" +directory = "logs/" +rotation = 3 # in days + +[gateway] +http_port = 3333 +websocket = "gateway" +body_size = 2 # in MB +queue_max = 32 diff --git a/controller/controller.go b/controller/controller.go new file mode 100644 index 0000000..2584fe1 --- /dev/null +++ b/controller/controller.go @@ -0,0 +1,23 @@ +package controller + +import ( + "homestead/homestead_gateway/util/config" + "homestead/homestead_gateway/util/logger" + "homestead/homestead_gateway/ws" +) + +func NewGatewayController(cfg config.Config) GatewayController { + wsl, wCloseFn, err := logger.New("Websocket", cfg.Log) + if err != nil { + panic(err) + } + + return GatewayController{ + Websocket: ws.NewWebsocketGateway(cfg.Gateway, wsl, wCloseFn), + HttpServer: HttpGateway{}, + } +} + +func (gc *GatewayController) Run() error { + return gc.Websocket.Start() +} diff --git a/controller/structs.go b/controller/structs.go new file mode 100644 index 0000000..e56093b --- /dev/null +++ b/controller/structs.go @@ -0,0 +1,12 @@ +package controller + +import ( + "homestead/homestead_gateway/ws" +) + +type GatewayController struct { + Websocket *ws.WebsocketGateway + HttpServer HttpGateway +} + +type HttpGateway struct{} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..086d25e --- /dev/null +++ b/go.mod @@ -0,0 +1,15 @@ +module homestead/homestead_gateway + +go 1.25.4 + +require ( + github.com/pelletier/go-toml/v2 v2.2.4 + github.com/samber/slog-multi v1.6.0 +) + +require ( + github.com/gorilla/websocket v1.5.3 // indirect + github.com/samber/lo v1.52.0 // indirect + github.com/samber/slog-common v0.19.0 // indirect + golang.org/x/text v0.22.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..41997cc --- /dev/null +++ b/go.sum @@ -0,0 +1,12 @@ +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= +github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= +github.com/samber/lo v1.52.0 h1:Rvi+3BFHES3A8meP33VPAxiBZX/Aws5RxrschYGjomw= +github.com/samber/lo v1.52.0/go.mod h1:4+MXEGsJzbKGaUEQFKBq2xtfuznW9oz/WrgyzMzRoM0= +github.com/samber/slog-common v0.19.0 h1:fNcZb8B2uOLooeYwFpAlKjkQTUafdjfqKcwcC89G9YI= +github.com/samber/slog-common v0.19.0/go.mod h1:dTz+YOU76aH007YUU0DffsXNsGFQRQllPQh9XyNoA3M= +github.com/samber/slog-multi v1.6.0 h1:i1uBY+aaln6ljwdf7Nrt4Sys8Kk6htuYuXDHWJsHtZg= +github.com/samber/slog-multi v1.6.0/go.mod h1:qTqzmKdPpT0h4PFsTN5rYRgLwom1v+fNGuIrl1Xnnts= +golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= +golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= diff --git a/main.go b/main.go new file mode 100644 index 0000000..fa5670f --- /dev/null +++ b/main.go @@ -0,0 +1,21 @@ +package main + +import ( + "flag" + "homestead/homestead_gateway/controller" + "homestead/homestead_gateway/util/config" +) + +func main() { + cfgPath := flag.String("config", "config.toml", "configuration file") + cfg, err := config.LoadConfig(*cfgPath) + if err != nil { + panic(err) + } + + ctrl := controller.NewGatewayController(*cfg) + err = ctrl.Run() + if err != nil { + panic(err) + } +} diff --git a/sim.go b/sim.go new file mode 100644 index 0000000..ccc4aeb --- /dev/null +++ b/sim.go @@ -0,0 +1,257 @@ +//go:build sim + +package main + +import ( + "encoding/json" + "fmt" + "log" + "math/rand" + "net/url" + "os" + "os/signal" + "sync" + "sync/atomic" + "syscall" + "time" + + "github.com/gorilla/websocket" +) + +const ( + gatewayURL = "ws://localhost:3333/sync" + apiKey = "gateway" + serverID = "test-server-001" + // THE CHANNEL ID the mod says it serves. Must match gateway expectation. + channelID = "123456789" +) + +// send interval range (random between minInterval and maxInterval) +var ( + minInterval = 500 * time.Millisecond + maxInterval = 5 * time.Second +) + +type Handshake struct { + Type string `json:"type"` + Data json.RawMessage `json:"data"` +} + +// ModHandshake now includes ChannelID +type ModHandshake struct { + ServerID string `json:"server_id"` + ChannelID string `json:"channel_id"` +} + +type GatewayAck struct { + Status string `json:"status"` + Type string `json:"type"` +} + +type User struct { + ID string `json:"id"` + Name string `json:"name"` +} + +type Destination struct { + ID string `json:"channel_id,omitempty"` +} + +type GatewayMessageIn struct { + ID string `json:"id"` // where am I from (channel_id or server_id) + MsgID string `json:"msg_id"` // msg id + Destination Destination `json:"destination,omitempty"` // where do I wanna go (channel_id or empty if from Bot) + Author User `json:"author"` // who sent the message + Content string `json:"content"` // message content + Meta map[string]interface{} `json:"meta,omitempty"` // additional metadata + Ts time.Time `json:"ts,omitempty"` // timestamp + ReceivedAt time.Time `json:"-"` // ReceivedAt is populated by gateway (not from mod) +} + +func main() { + rand.Seed(time.Now().UnixNano()) + + interrupt := make(chan os.Signal, 1) + signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM) + + u, err := url.Parse(gatewayURL) + if err != nil { + log.Fatalf("Failed to parse URL: %v", err) + } + q := u.Query() + q.Set("api_key", apiKey) + u.RawQuery = q.Encode() + + log.Printf("Connecting to %s", u.String()) + + conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil) + if err != nil { + log.Fatalf("Failed to connect: %v", err) + } + defer conn.Close() + + log.Println("Connected to gateway") + + var writeMu sync.Mutex + + conn.SetPingHandler(func(appData string) error { + log.Println("Received ping from server, sending pong") + writeMu.Lock() + defer writeMu.Unlock() + if err := conn.WriteControl(websocket.PongMessage, []byte(appData), time.Now().Add(5*time.Second)); err != nil { + log.Printf("Failed to send pong: %v", err) + return err + } + return nil + }) + + modHS := ModHandshake{ + ServerID: serverID, + ChannelID: channelID, + } + modHSData, err := json.Marshal(modHS) + if err != nil { + log.Fatalf("Failed to marshal mod handshake: %v", err) + } + + handshake := Handshake{ + Type: "mod", + Data: modHSData, + } + + writeMu.Lock() + if err := conn.WriteJSON(handshake); err != nil { + writeMu.Unlock() + log.Fatalf("Failed to send handshake: %v", err) + } + writeMu.Unlock() + log.Println("Handshake sent") + + var ack GatewayAck + if err := conn.ReadJSON(&ack); err != nil { + log.Fatalf("Failed to read acknowledgment: %v", err) + } + log.Printf("Received acknowledgment: status=%q, type=%q", ack.Status, ack.Type) + + done := make(chan struct{}) + + go func() { + defer close(done) + for { + messageType, message, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { + log.Printf("WebSocket error: %v", err) + } else { + log.Printf("Connection closed/read error: %v", err) + } + return + } + switch messageType { + case websocket.TextMessage, websocket.BinaryMessage: + log.Printf("Received from server: %s", string(message)) + default: + } + } + }() + + var msgCounter uint64 = 1 + + func() { + testMsg := GatewayMessageIn{ + MsgID: fmt.Sprintf("test-msg-%06d", atomic.AddUint64(&msgCounter, 1)), + ID: serverID, + Destination: Destination{ + ID: channelID, + }, + Author: User{ + ID: "player-uuid-123", + Name: "TestPlayer", + }, + Content: "Hello from simulated mod!", + Ts: time.Now().UTC(), + } + + writeMu.Lock() + if err := conn.WriteJSON(testMsg); err != nil { + log.Printf("Failed to send test message: %v", err) + } else { + log.Println("Sent initial test message to gateway") + } + writeMu.Unlock() + }() + + go func() { + for { + d := randomDuration(minInterval, maxInterval) + + select { + case <-done: + return + case <-time.After(d): + // build message + msgNum := atomic.AddUint64(&msgCounter, 1) + msg := GatewayMessageIn{ + MsgID: fmt.Sprintf("sim-msg-%06d", msgNum), + ID: serverID, + Destination: Destination{ + ID: channelID, + }, + Author: User{ + ID: fmt.Sprintf("sim-user-%d", msgNum%1000), + Name: fmt.Sprintf("SimUser%d", msgNum%1000), + }, + Content: fmt.Sprintf("Random interval message #%d (delay %s)", msgNum, d), + Ts: time.Now().UTC(), + } + + writeMu.Lock() + if err := conn.WriteJSON(msg); err != nil { + log.Printf("Failed to send simulated message: %v", err) + writeMu.Unlock() + return + } + writeMu.Unlock() + + log.Printf("Sent simulated message %s (next wait up to %s)", msg.MsgID, maxInterval) + } + } + }() + + log.Println("Connection established. Sending simulated messages at random intervals. Press Ctrl+C to disconnect.") + + for { + select { + case <-done: + log.Println("Connection read loop closed, exiting") + return + case <-interrupt: + log.Println("Interrupt received, closing connection...") + + writeMu.Lock() + err := conn.WriteMessage( + websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), + ) + writeMu.Unlock() + if err != nil { + log.Printf("Write close error: %v", err) + } + + select { + case <-done: + case <-time.After(1 * time.Second): + } + return + } + } +} + +func randomDuration(min, max time.Duration) time.Duration { + if max <= min { + return min + } + diff := int64(max - min) + n := rand.Int63n(diff) + return min + time.Duration(n) +} diff --git a/util/config/structs.go b/util/config/structs.go new file mode 100644 index 0000000..5437791 --- /dev/null +++ b/util/config/structs.go @@ -0,0 +1,21 @@ +package config + +import "log/slog" + +type Config struct { + Log LogConfig `toml:"log"` + Gateway GatewayConfig `toml:"gateway"` +} + +type GatewayConfig struct { + HttpPort int `toml:"http_port"` + Websocket string `toml:"websocket"` + BodySize int `toml:"body_size"` + QueueSize int `toml:"queue_max"` +} + +type LogConfig struct { + Level slog.Level `toml:"level"` + Directory string `toml:"directory"` + Rotation int `toml:"rotation"` +} diff --git a/util/config/toml.go b/util/config/toml.go new file mode 100644 index 0000000..a3351bf --- /dev/null +++ b/util/config/toml.go @@ -0,0 +1,21 @@ +package config + +import ( + "fmt" + "os" + + "github.com/pelletier/go-toml/v2" +) + +func LoadConfig(path string) (*Config, error) { + file, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf("failed to open config: %w", err) + } + + var cfg Config + if err = toml.NewDecoder(file).Decode(&cfg); err != nil { + return nil, err + } + return &cfg, nil +} diff --git a/util/logger/log.go b/util/logger/log.go new file mode 100644 index 0000000..75e5397 --- /dev/null +++ b/util/logger/log.go @@ -0,0 +1,191 @@ +package logger + +import ( + "context" + "fmt" + "homestead/homestead_gateway/util/config" + "log/slog" + "os" + "path/filepath" + "time" + + slogmulti "github.com/samber/slog-multi" +) + +// New creates a logger identified by id. +// Returns: *slog.Logger, closeFunc, error. +// Call closeFunc() on shutdown to close open files. +func New(id string, cfg config.LogConfig) (*slog.Logger, func() error, error) { + if cfg.Directory == "" { + cfg.Directory = "logs" + } + if cfg.Rotation <= 0 { + cfg.Rotation = 7 + } + + console := slog.NewTextHandler(&prefixWriter{inner: os.Stderr, prefix: []byte("[" + id + "] "), startLine: true}, &slog.HandlerOptions{AddSource: true, Level: cfg.Level}) + router := newFileRouter(cfg.Directory, cfg.Rotation, id) + root := slogmulti.Fanout(console, router) + + return slog.New(root), router.CloseFiles, nil +} + +func (p *prefixWriter) Write(b []byte) (int, error) { + p.mu.Lock() + defer p.mu.Unlock() + + totalWritten := 0 + if p.startLine { + n, err := p.inner.Write(p.prefix) + totalWritten += n + if err != nil { + return totalWritten, err + } + p.startLine = false + } + + n, err := p.inner.Write(b) + totalWritten += n + if err != nil { + return totalWritten, err + } + + if len(b) > 0 && b[len(b)-1] == '\n' { + p.startLine = true + } + return totalWritten, nil +} + +func newFileRouter(baseDir string, rotationDays int, id string) *fileRouter { + return &fileRouter{ + handlers: make(map[string]slog.Handler), + files: make(map[string]*os.File), + baseDir: baseDir, + rotationDays: rotationDays, + id: id, + dirTimeLayout: "2006-01-02", // e.g. 2025-11-30 + } +} + +//goland:noinspection GoUnusedParameter +func (r *fileRouter) Enabled(ctx context.Context, lvl slog.Level) bool { + // Conservatively true; actual handler will decide. + return true +} + +func (r *fileRouter) Handle(ctx context.Context, rec slog.Record) error { + now := time.Now() + dirName := now.Format(r.dirTimeLayout) + dirPath := filepath.Join(r.baseDir, dirName) + filePath := filepath.Join(dirPath, r.id+".log") + + h, err := r.getHandler(dirPath, filePath) + if err != nil { + return fmt.Errorf("file router get handler: %w", err) + } + + return h.Handle(ctx, rec) +} + +func (r *fileRouter) WithAttrs(attrs []slog.Attr) slog.Handler { + return r +} + +func (r *fileRouter) WithGroup(name string) slog.Handler { + return r +} + +// getHandler returns a text handler for the given file, creating dir/file and cleaning up old dirs if needed. +func (r *fileRouter) getHandler(dirPath, filePath string) (slog.Handler, error) { + r.mu.RLock() + h, ok := r.handlers[filePath] + r.mu.RUnlock() + if ok { + return h, nil + } + + r.mu.Lock() + defer r.mu.Unlock() + if h, ok = r.handlers[filePath]; ok { + return h, nil + } + + _, statErr := os.Stat(dirPath) + dirExisted := statErr == nil + + if err := os.MkdirAll(dirPath, 0o755); err != nil { + return nil, fmt.Errorf("mkdir %s: %w", dirPath, err) + } + + if !dirExisted { + if err := r.cleanupOldDirs(); err != nil { + // don't fail logging just because cleanup failed; report to stderr and continue. + _, _ = fmt.Fprintf(os.Stderr, "logger: cleanupOldDirs error: %v\n", err) + } + } + + f, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0o644) + if err != nil { + return nil, fmt.Errorf("open log file %s: %w", filePath, err) + } + + textHandler := slog.NewTextHandler(f, &slog.HandlerOptions{ + AddSource: false, + }) + + r.files[filePath] = f + r.handlers[filePath] = textHandler + return textHandler, nil +} + +// cleanupOldDirs scans r.baseDir for directories matching the dirTimeLayout and deletes any whose +// day-start time is older than rotationDays from now. +func (r *fileRouter) cleanupOldDirs() error { + entries, err := os.ReadDir(r.baseDir) + if err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("read baseDir: %w", err) + } + + now := time.Now() + cutoff := now.AddDate(0, 0, -r.rotationDays) + + for _, e := range entries { + if !e.IsDir() { + continue + } + name := e.Name() + t, err := time.ParseInLocation(r.dirTimeLayout, name, time.Local) + if err != nil { + continue + } + + if t.Before(cutoff) { + path := filepath.Join(r.baseDir, name) + + if err := os.RemoveAll(path); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "logger: failed to remove old log dir %s: %v\n", path, err) + } + } + } + + return nil +} + +// CloseFiles closes open files +func (r *fileRouter) CloseFiles() error { + r.mu.Lock() + defer r.mu.Unlock() + + var firstErr error + for p, f := range r.files { + if err := f.Close(); err != nil && firstErr == nil { + firstErr = err + } + delete(r.files, p) + delete(r.handlers, p) + } + return firstErr +} diff --git a/util/logger/structs.go b/util/logger/structs.go new file mode 100644 index 0000000..b9b8f2f --- /dev/null +++ b/util/logger/structs.go @@ -0,0 +1,27 @@ +package logger + +import ( + "io" + "log/slog" + "os" + "sync" +) + +type fileRouter struct { + mu sync.RWMutex + handlers map[string]slog.Handler // map[filePath]handler + files map[string]*os.File // map[filePath]*os.File to close later + baseDir string + rotationDays int + id string + dirTimeLayout string // "2006-01-02" - daily dirs +} + +// prefixWriter - writes a prefix at the start of each new line. +// It is safe for concurrent use. +type prefixWriter struct { + inner io.Writer + prefix []byte + mu sync.Mutex + startLine bool +} diff --git a/ws/handlers.go b/ws/handlers.go new file mode 100644 index 0000000..512bf9e --- /dev/null +++ b/ws/handlers.go @@ -0,0 +1,108 @@ +package ws + +import ( + "encoding/json" + "net/http" + "time" + + "github.com/gorilla/websocket" +) + +func (wsg *WebsocketGateway) handleSync(w http.ResponseWriter, r *http.Request) { + conn, err := wsg.validateAndUpgradeConnection(w, r) + if err != nil { + return + } + + if wsg.bodySizeBytes > 0 { + conn.SetReadLimit(wsg.bodySizeBytes) + } else { + conn.SetReadLimit(1 << 20) // sensible default 1MiB + } + + _ = conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + conn.SetPongHandler(func(appData string) error { + _ = conn.SetReadDeadline(time.Now().Add(60 * time.Second)) + return nil + }) + + typ, data, err := conn.ReadMessage() + if err != nil { + wsg.sendWebsocketError(conn, "Internal Server Error", 500, true) + wsg.logger.Error("Failed to read handshake.", "remote", conn.RemoteAddr().String(), "err", err) + return + } + + if typ != websocket.TextMessage && typ != websocket.BinaryMessage { + wsg.sendWebsocketError(conn, "Initial message must be a handshake.", 400, true) + wsg.logger.Warn("Invalid handshake message type.", "remote", conn.RemoteAddr().String()) + return + } + + var handshake Handshake + if err := json.Unmarshal(data, &handshake); err != nil { + wsg.sendWebsocketError(conn, "Malformed handshake.", 400, true) + wsg.logger.Warn("Malformed handshake.", "remote", conn.RemoteAddr().String(), "err", err) + return + } + + switch handshake.Type { + case "mod": + var mhs ModHandshake + if err := json.Unmarshal(handshake.Data, &mhs); err != nil { + wsg.sendWebsocketError(conn, "Malformed mod handshake.", 400, true) + wsg.logger.Warn("Malformed mod handshake.", "remote", conn.RemoteAddr().String(), "err", err) + return + } + + if mhs.ServerID == "" || mhs.ChannelID == "" { + wsg.sendWebsocketError(conn, "Malformed mod handshake.", 400, true) + return + } + + if !wsg.registerConn(conn, "mod", mhs.ChannelID, mhs.ServerID) { + wsg.sendWebsocketError(conn, "Failed to register mod.", 500, true) + return + } + + _ = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "connected", Type: "mod"}) + wsg.registry.FlushChannelWithSender(mhs.ChannelID, wsg.flush) + + wsg.logger.Info("Mod connected via Websocket.", "remote", conn.RemoteAddr().String()) + go wsg.read(conn, "mod", mhs.ChannelID) + + case "bot": + var bhs BotHandshake + if err := json.Unmarshal(handshake.Data, &bhs); err != nil { + wsg.sendWebsocketError(conn, "Malformed bot handshake.", 400, true) + return + } + + if bhs.ChannelId == "" { + wsg.sendWebsocketError(conn, "Malformed bot handshake.", 400, true) + return + } + + if !wsg.registerConn(conn, "bot", bhs.ChannelId, "") { + wsg.sendWebsocketError(conn, "Bot already connected.", 409, true) + return + } + + _ = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "connected", Type: "bot"}) + wsg.registry.FlushAllToBotWithSender(wsg.flush) + + wsg.logger.Info("Bot connected via Websocket.", "remote", conn.RemoteAddr().String()) + go wsg.read(conn, "bot", bhs.ChannelId) + + default: + wsg.sendWebsocketError(conn, "Unknown handshake.", 400, true) + wsg.logger.Warn("Unknown connection type.", "remote", conn.RemoteAddr().String(), "type", handshake.Type) + return + } +} + +func (wsg *WebsocketGateway) handleHealth(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(200) + _ = json.NewEncoder(w).Encode(map[string]interface{}{"status": "healthy"}) +} diff --git a/ws/registry.go b/ws/registry.go new file mode 100644 index 0000000..be8d6a5 --- /dev/null +++ b/ws/registry.go @@ -0,0 +1,254 @@ +package ws + +import ( + "fmt" + "time" + + "github.com/gorilla/websocket" +) + +func (q *BoundedQueue) Enqueue(m GatewayMessageOut) bool { + q.mu.Lock() + defer q.mu.Unlock() + if q.capacity == 0 { + return false + } + + if q.length < q.capacity { + q.buf[(q.start+q.length)%q.capacity] = m + q.length++ + return true + } + + // overwrite oldest + q.buf[q.start] = m + q.start = (q.start + 1) % q.capacity + + return true +} + +func (q *BoundedQueue) PopAll() []GatewayMessageOut { + q.mu.Lock() + defer q.mu.Unlock() + if q.length == 0 { + return nil + } + + out := make([]GatewayMessageOut, 0, q.length) + for i := 0; i < q.length; i++ { + out = append(out, q.buf[(q.start+i)%q.capacity]) + } + + q.start = 0 + q.length = 0 + + return out +} + +func (q *BoundedQueue) Len() int { + q.mu.Lock() + defer q.mu.Unlock() + return q.length +} + +// + +func (r *Registry) getOrCreate(channel string) *ChannelEntry { + r.mu.RLock() + e := r.entries[channel] + r.mu.RUnlock() + if e != nil { + return e + } + + r.mu.Lock() + defer r.mu.Unlock() + if e = r.entries[channel]; e == nil { + e = newChannelEntry(channel, r.queueCap) + r.entries[channel] = e + } + return e +} + +func (r *Registry) ForEach(cb func(channelID string)) { + r.mu.RLock() + ids := make([]string, 0, len(r.entries)) + for id := range r.entries { + ids = append(ids, id) + } + r.mu.RUnlock() + + for _, id := range ids { + cb(id) + } +} + +// + +// RegisterMod : map channel_id -> mod conn (serverID) +func (r *Registry) RegisterMod(channelID, serverID string, conn *websocket.Conn) { + e := r.getOrCreate(channelID) + e.mu.Lock() + defer e.mu.Unlock() + + if e.Mod != nil && e.Mod.Conn != nil { + + _ = e.Mod.Conn.Close() + } + + e.Mod = &ConnWrapper{Conn: conn, ServerID: serverID, LastSeen: time.Now()} + // flush queued bot->mod messages for this channel + // caller should use FlushChannelWithSender to perform actual sends +} + +// RegisterBot : single connection for bot. after registration call FlushAllToBotWithSender +func (r *Registry) RegisterBot(conn *websocket.Conn) { + r.botMu.Lock() + + if r.bot != nil && r.bot.Conn != nil { + _ = r.bot.Conn.Close() + } + + r.bot = &ConnWrapper{Conn: conn, LastSeen: time.Now()} + r.botMu.Unlock() +} + +func (r *Registry) UnregisterMod(channelID string) { + r.mu.RLock() + e := r.entries[channelID] + r.mu.RUnlock() + if e == nil { + return + } + + e.mu.Lock() + modConn := e.Mod + e.Mod = nil + e.mu.Unlock() + + if modConn != nil && modConn.Conn != nil { + closeConn(modConn.Conn) + } +} + +func (r *Registry) UnregisterBot() { + r.botMu.Lock() + botConn := r.bot + r.bot = nil + r.botMu.Unlock() + + if botConn != nil && botConn.Conn != nil { + closeConn(botConn.Conn) + } +} + +func (r *Registry) Send(channelID string, out GatewayMessageOut, sendOverConn func(*websocket.Conn, GatewayMessageOut) error) (delivered bool, queued bool, err error) { + if out.Type == "mod" { + r.botMu.Lock() + b := r.bot + r.botMu.Unlock() + + if b != nil && b.Conn != nil { + if err := sendOverConn(b.Conn, out); err == nil { + return true, false, nil + } + _ = b.Conn.Close() + r.UnregisterBot() + } + + e := r.getOrCreate(channelID) + e.mu.Lock() + enq := e.Queue.Enqueue(out) + e.mu.Unlock() + + if !enq { + return false, false, fmt.Errorf("queue disabled") + } + return false, true, nil + } + + e := r.getOrCreate(channelID) + e.mu.Lock() + mod := e.Mod + e.mu.Unlock() + + if mod != nil && mod.Conn != nil { + if err := sendOverConn(mod.Conn, out); err == nil { + return true, false, nil + } + _ = mod.Conn.Close() + r.UnregisterMod(channelID) + } + + e.mu.Lock() + enq := e.Queue.Enqueue(out) + e.mu.Unlock() + + if !enq { + return false, false, fmt.Errorf("queue disabled") + } + return false, true, nil +} + +// + +func (r *Registry) FlushChannelWithSender(channelID string, sendOverConn func(*websocket.Conn, GatewayMessageOut) error) { + r.mu.RLock() + e := r.entries[channelID] + r.mu.RUnlock() + if e == nil { + return + } + + e.mu.Lock() + if e.Mod == nil || e.Mod.Conn == nil { + e.mu.Unlock() + return + } + + msgs := e.Queue.PopAll() + modConn := e.Mod.Conn + e.mu.Unlock() + + for _, m := range msgs { + if err := sendOverConn(modConn, m); err != nil { + // if send fails, re-enqueue (best-effort), drop-oldest logic applies + e.mu.Lock() + _ = e.Queue.Enqueue(m) + e.mu.Unlock() + } + } +} + +func (r *Registry) FlushAllToBotWithSender(sendOverConn func(*websocket.Conn, GatewayMessageOut) error) { + r.botMu.Lock() + b := r.bot + r.botMu.Unlock() + if b == nil || b.Conn == nil { + return + } + + r.mu.RLock() + entries := make([]*ChannelEntry, 0, len(r.entries)) + for _, e := range r.entries { + entries = append(entries, e) + } + r.mu.RUnlock() + + for _, e := range entries { + e.mu.Lock() + msgs := e.Queue.PopAll() + e.mu.Unlock() + + if len(msgs) == 0 { + continue + } + for _, m := range msgs { + if err := sendOverConn(b.Conn, m); err != nil { + e.mu.Lock() + _ = e.Queue.Enqueue(m) + e.mu.Unlock() + } + } + } +} diff --git a/ws/structs.go b/ws/structs.go new file mode 100644 index 0000000..f601775 --- /dev/null +++ b/ws/structs.go @@ -0,0 +1,110 @@ +package ws + +import ( + "encoding/json" + "log/slog" + "sync" + "time" + + "github.com/gorilla/websocket" +) + +type WebsocketGateway struct { + port int + apiKey string + bodySizeBytes int64 + + upgrader websocket.Upgrader + + registry *Registry + + logger *slog.Logger + closeFn func() error +} + +// + +type Registry struct { + mu sync.RWMutex + entries map[string]*ChannelEntry + queueCap int + + botMu sync.Mutex + bot *ConnWrapper +} + +type ConnWrapper struct { + Conn *websocket.Conn + ServerID string // set for mods (the server_id) + LastSeen time.Time +} + +type BoundedQueue struct { + mu sync.Mutex + buf []GatewayMessageOut + start int + length int + capacity int +} + +type ChannelEntry struct { + mu sync.Mutex + Channel string + Mod *ConnWrapper + Queue *BoundedQueue +} + +// + +type User struct { + ID string `json:"id"` + Name string `json:"name"` +} + +type Destination struct { + ID string `json:"channel_id,omitempty"` +} + +type GatewayMessageIn struct { + Type string + ID string `json:"id"` + MsgID string `json:"msg_id"` + Destination Destination `json:"destination,omitempty"` + Author User `json:"author"` + Content string `json:"content"` + Meta map[string]interface{} `json:"meta,omitempty"` + Ts time.Time `json:"ts,omitempty"` + ReceivedAt time.Time `json:"-"` +} + +type GatewayMessageOut struct { + Type string `json:"type"` + ID string `json:"channel_id,omitempty"` + Author User `json:"author"` + Content string `json:"content"` + Meta map[string]interface{} `json:"meta,omitempty"` + Ts time.Time `json:"ts,omitempty"` + ReceivedAt time.Time `json:"received_at"` + ForwardedAt time.Time `json:"forwarded_at"` +} + +type GatewayAck struct { + Status string `json:"status"` + Type string `json:"type"` +} + +// + +type Handshake struct { + Type string `json:"type"` // "mod" or "bot" + Data json.RawMessage `json:"data"` +} + +type ModHandshake struct { + ServerID string `json:"server_id"` + ChannelID string `json:"channel_id"` +} + +type BotHandshake struct { + ChannelId string `json:"channel_id"` +} diff --git a/ws/util.go b/ws/util.go new file mode 100644 index 0000000..3e9a8b7 --- /dev/null +++ b/ws/util.go @@ -0,0 +1,213 @@ +package ws + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "strings" + "time" + + "github.com/gorilla/websocket" +) + +// (de-)register + +func (wsg *WebsocketGateway) listen(srv *http.Server, addr string, channel chan error) { + wsg.logger.Info("Gateway listening.", "addr", addr) + + if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + channel <- err + } + + close(channel) +} + +func (wsg *WebsocketGateway) deafen(srv *http.Server) { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + wsg.logger.Info("Shutting down Websocket Gateway.") + + _ = srv.Shutdown(shutdownCtx) + wsg.closeAll() +} + +// responses + +func (wsg *WebsocketGateway) flush(c *websocket.Conn, m GatewayMessageOut) error { + _ = c.SetWriteDeadline(time.Now().Add(5 * time.Second)) + return c.WriteJSON(m) +} + +func (wsg *WebsocketGateway) sendHttpError(w http.ResponseWriter, message string, code int) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + _ = json.NewEncoder(w).Encode(map[string]interface{}{"message": message, "code": code}) +} + +func (wsg *WebsocketGateway) sendWebsocketPing(conn *websocket.Conn) { + _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + _ = conn.WriteMessage(websocket.PingMessage, nil) +} + +func (wsg *WebsocketGateway) sendWebsocketError(conn *websocket.Conn, message string, code int, close bool) { + _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + _ = conn.WriteJSON(map[string]interface{}{"message": message, "code": code}) + if close { + _ = conn.Close() + } +} + +func (wsg *WebsocketGateway) sendWebsocketResponse(conn *websocket.Conn, content interface{}) error { + _ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second)) + + if err := conn.WriteJSON(content); err != nil { + wsg.logger.Error("Failed to respond to connection.", "remote", conn.RemoteAddr().String(), "err", err) + _ = conn.Close() + return err + } + + return nil +} + +// util + +func (wsg *WebsocketGateway) validateAndUpgradeConnection(w http.ResponseWriter, r *http.Request) (*websocket.Conn, error) { + if !wsg.validateApiKey(r) { + wsg.sendHttpError(w, "Unauthorized", 401) + wsg.logger.Warn("Authorization failed", "remote", r.RemoteAddr) + return nil, errors.New("unauthorized") + } + + conn, err := wsg.upgrader.Upgrade(w, r, nil) + if err != nil { + wsg.logger.Error("Upgrade error.", "remote", r.RemoteAddr) + return nil, err + } + + return conn, err +} + +func (wsg *WebsocketGateway) validateApiKey(r *http.Request) bool { + apiKey := r.URL.Query().Get("api_key") + if apiKey == "" { + apiKey = r.Header.Get("X-API-Key") + } + + return !(apiKey == "" || apiKey != wsg.apiKey) +} + +func (wsg *WebsocketGateway) loggingMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + next.ServeHTTP(w, r) + wsg.logger.Info("Incoming HTTP request.", "remote", r.RemoteAddr, "path", r.URL.Path, "duration", time.Since(start)) + }) +} + +// connections + +func closeConn(conn *websocket.Conn) { + _ = conn.SetWriteDeadline(time.Now().Add(time.Second)) + _ = conn.WriteControl( + websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Disconnecting."), + time.Now().Add(time.Second), + ) + _ = conn.Close() +} + +func (wsg *WebsocketGateway) registerConn(conn *websocket.Conn, typ, channelId, serverId string) bool { + if typ == "bot" { + wsg.registry.botMu.Lock() + if wsg.registry.bot != nil && wsg.registry.bot.Conn != nil { + wsg.registry.botMu.Unlock() + return false + } + wsg.registry.botMu.Unlock() + + wsg.registry.RegisterBot(conn) + return true + } + + wsg.registry.RegisterMod(channelId, serverId, conn) + return true +} + +func (wsg *WebsocketGateway) unregisterConn(typ, channelId string) { + if typ == "bot" { + wsg.registry.UnregisterBot() + return + } + + wsg.registry.UnregisterMod(channelId) +} + +func (wsg *WebsocketGateway) closeAll() { + wsg.logger.Info("Closing all websocket connections.") + + wsg.registry.UnregisterBot() + + wsg.registry.ForEach(func(channelID string) { + wsg.registry.UnregisterMod(channelID) + }) +} + +// + +func NewUpgrader() websocket.Upgrader { + return websocket.Upgrader{ + ReadBufferSize: 1024, + WriteBufferSize: 1024, + CheckOrigin: func(r *http.Request) bool { + return true // local by default; change for production + }, + } +} + +func NewRegistry(queueCap int) *Registry { + return &Registry{ + entries: make(map[string]*ChannelEntry), + queueCap: queueCap, + } +} + +func NewBoundedQueue(cap int) *BoundedQueue { + if cap <= 0 { + cap = 128 + } + return &BoundedQueue{buf: make([]GatewayMessageOut, cap), capacity: cap} +} + +func newChannelEntry(channel string, cap int) *ChannelEntry { + return &ChannelEntry{ + Channel: channel, + Queue: NewBoundedQueue(cap), + } +} + +// + +func (m *GatewayMessageIn) Validate() error { + if strings.TrimSpace(m.ID) == "" { + return errors.New("id missing") + } + if strings.TrimSpace(m.MsgID) == "" { + return errors.New("msg_id missing") + } + if strings.TrimSpace(m.Author.ID) == "" { + return errors.New("author.id missing") + } + if strings.TrimSpace(m.Content) == "" { + return errors.New("content missing") + } + + if m.Type == "mod" && strings.TrimSpace(m.Destination.ID) == "" { + return errors.New("destination.channel_id missing") + } + + return nil +} + +func (c *ConnWrapper) Alive() bool { return c != nil && c.Conn != nil } diff --git a/ws/websocket.go b/ws/websocket.go new file mode 100644 index 0000000..d392fac --- /dev/null +++ b/ws/websocket.go @@ -0,0 +1,155 @@ +package ws + +import ( + "context" + "encoding/json" + "fmt" + "homestead/homestead_gateway/util/config" + "log/slog" + "net" + "net/http" + "os/signal" + "syscall" + "time" + + "github.com/gorilla/websocket" +) + +func NewWebsocketGateway(cfg config.GatewayConfig, logger *slog.Logger, closefn func() error) *WebsocketGateway { + return &WebsocketGateway{ + logger: logger, + closeFn: closefn, + port: cfg.HttpPort, + apiKey: cfg.Websocket, + upgrader: NewUpgrader(), + registry: NewRegistry(cfg.QueueSize), + bodySizeBytes: int64(cfg.BodySize) * 1024 * 1024, + } +} + +func (wsg *WebsocketGateway) Start() error { + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + return wsg.Serve(ctx, fmt.Sprintf(":%d", wsg.port)) +} + +func (wsg *WebsocketGateway) Serve(ctx context.Context, listenAddr string) error { + mux := http.NewServeMux() + mux.HandleFunc("/sync", wsg.handleSync) + mux.HandleFunc("/health", wsg.handleHealth) + + srv := &http.Server{ + Addr: listenAddr, + Handler: wsg.loggingMiddleware(mux), + BaseContext: func(l net.Listener) context.Context { return ctx }, + } + errCh := make(chan error, 1) + + go wsg.listen(srv, listenAddr, errCh) + + select { + case <-ctx.Done(): + wsg.deafen(srv) + return nil + case err := <-errCh: + return err + } +} + +// + +func (wsg *WebsocketGateway) read(conn *websocket.Conn, _type, channelId string) { + defer func() { + wsg.unregisterConn(_type, channelId) + wsg.logger.Info("Client disconnected.", "remote", conn.RemoteAddr().String()) + }() + + ticker := time.NewTicker(30 * time.Second) + ctx, cancel := context.WithCancel(context.Background()) + defer ticker.Stop() + defer cancel() + + go func() { + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + wsg.sendWebsocketPing(conn) + } + } + }() + + for { + typ, data, err := conn.ReadMessage() + if err != nil { + if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { + wsg.logger.Error("Client unexpectedly closed the connection.", "err", err) + } + return + } + + if typ != websocket.TextMessage && typ != websocket.BinaryMessage { + continue + } + + ts := time.Now().UTC() + var message GatewayMessageIn + if err := json.Unmarshal(data, &message); err != nil { + wsg.sendWebsocketError(conn, "Malformed message.", 400, false) + wsg.logger.Warn("Received malformed message json from client.", "remote", conn.RemoteAddr().String(), "err", err) + continue + } + + message.Type = _type + message.ReceivedAt = ts + if err := message.Validate(); err != nil { + wsg.sendWebsocketError(conn, "Malformed message.", 400, false) + wsg.logger.Warn("Received malformed message json from client; validation failed.", "remote", conn.RemoteAddr().String(), "err", err) + continue + } + + var outID string + if _type == "mod" { + outID = channelId + } else { + outID = message.ID + } + + out := GatewayMessageOut{ + Type: message.Type, + ID: outID, + Author: message.Author, + Content: message.Content, + Meta: message.Meta, + Ts: message.Ts, + ReceivedAt: message.ReceivedAt, + ForwardedAt: time.Now().UTC(), + } + + delivered, queued, err := wsg.registry.Send(out.ID, out, func(c *websocket.Conn, m GatewayMessageOut) error { + _ = c.SetWriteDeadline(time.Now().Add(5 * time.Second)) + return c.WriteJSON(m) + }) + + if err != nil { + wsg.logger.Error("registry send error", "err", err) + _ = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "failed", Type: message.Type}) + continue + } + + if delivered { + _ = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "completed", Type: message.Type}) + continue + } + + if queued { + _ = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "queued", Type: message.Type}) + continue + } + + _ = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "failed", Type: message.Type}) + + } +}