temp push

This commit is contained in:
2025-12-02 10:34:45 +01:00
parent 8f7db6256b
commit 7a30adc2e8
7 changed files with 73 additions and 25 deletions

View File

@@ -21,7 +21,5 @@ func main() {
} }
/** TODO /** TODO
- queue for messages, both ways (Ack "queued" instead of "completed"), filled queue drops oldest entry - queue for messages, both ways (Ack "queued" instead of "completed"), filled queue drops oldest entry
*/ */

4
sim.go
View File

@@ -1,3 +1,5 @@
//go:build sim
package main package main
import ( import (
@@ -13,7 +15,7 @@ import (
) )
const ( const (
gatewayURL = "ws://localhost:3333/push" gatewayURL = "ws://localhost:3333/sync"
apiKey = "gateway" apiKey = "gateway"
serverID = "test-server-001" serverID = "test-server-001"
) )

View File

@@ -9,7 +9,7 @@ import (
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
func (wsg *WebsocketGateway) handlePush(w http.ResponseWriter, r *http.Request) { func (wsg *WebsocketGateway) handleSync(w http.ResponseWriter, r *http.Request) {
conn, err := wsg.validateAndUpgradeConnection(w, r) conn, err := wsg.validateAndUpgradeConnection(w, r)
if err != nil { if err != nil {
return return
@@ -65,7 +65,7 @@ func (wsg *WebsocketGateway) handlePush(w http.ResponseWriter, r *http.Request)
return return
} }
wsg.registerConn(conn, meta) wsg.registerConn(conn, meta, "mod")
wsg.logger.Info("Mod connected via Websocket.", "remote", conn.RemoteAddr().String(), "server_id", mhs.ServerID) wsg.logger.Info("Mod connected via Websocket.", "remote", conn.RemoteAddr().String(), "server_id", mhs.ServerID)
go wsg.modReadLoop(conn, meta) // replace with external handler mayhaps go wsg.modReadLoop(conn, meta) // replace with external handler mayhaps
@@ -81,11 +81,15 @@ func (wsg *WebsocketGateway) handlePush(w http.ResponseWriter, r *http.Request)
meta.ID = bhs.BotID meta.ID = bhs.BotID
if ok := wsg.registerConn(conn, meta, "bot"); !ok {
wsg.sendWebsocketError(conn, "Bot already connected.", 409)
return
}
if err = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "connected", Type: "bot"}); err != nil { if err = wsg.sendWebsocketResponse(conn, GatewayAck{Status: "connected", Type: "bot"}); err != nil {
return return
} }
wsg.registerConn(conn, meta)
wsg.logger.Info("Bot connected via Websocket.", "remote", conn.RemoteAddr().String(), "bot_id", bhs.BotID) wsg.logger.Info("Bot connected via Websocket.", "remote", conn.RemoteAddr().String(), "bot_id", bhs.BotID)
go wsg.botReadLoop(conn, meta) // replace with external handler mayhaps go wsg.botReadLoop(conn, meta) // replace with external handler mayhaps
@@ -97,12 +101,8 @@ func (wsg *WebsocketGateway) handlePush(w http.ResponseWriter, r *http.Request)
} }
} }
func (wsg *WebsocketGateway) handleReady(w http.ResponseWriter, r *http.Request) {}
func (wsg *WebsocketGateway) handleHealth(w http.ResponseWriter, r *http.Request) { func (wsg *WebsocketGateway) handleHealth(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
w.WriteHeader(200) w.WriteHeader(200)
_ = json.NewEncoder(w).Encode(map[string]interface{}{"status": "healthy"}) _ = json.NewEncoder(w).Encode(map[string]interface{}{"status": "healthy"})
} }
func (wsg *WebsocketGateway) handleRegister(w http.ResponseWriter, r *http.Request) {}

View File

@@ -16,9 +16,9 @@ type WebsocketGateway struct {
upgrader websocket.Upgrader upgrader websocket.Upgrader
cache *cache.Cache cache *cache.Cache
botConn *websocket.Conn bot *websocket.Conn
conns *cache.ConnectionCache conns *cache.ConnectionCache
modHandler ModHandler modHandler ModHandler
botHandler BotHandler botHandler BotHandler

View File

@@ -40,6 +40,13 @@ func (h *LoggingModHandler) Handle(conn *websocket.Conn, msg GatewayModMessageIn
ForwardedAt: time.Now().UTC(), ForwardedAt: time.Now().UTC(),
} }
_ = conn.SetWriteDeadline(time.Now().Add(5 * time.Second))
if err := conn.WriteJSON(fwd); err != nil {
_ = conn.Close()
return err
}
b, _ := json.Marshal(fwd) b, _ := json.Marshal(fwd)
h.logger.Info("received mod message", "msg_id", msg.MsgID, "server", msg.Server, "Author", msg.Author.Name, "content", msg.Content) h.logger.Info("received mod message", "msg_id", msg.MsgID, "server", msg.Server, "Author", msg.Author.Name, "content", msg.Content)
h.logger.Debug("forwarding mod message", "msg_id", msg.MsgID, "server", msg.Server, "payload", string(b)) h.logger.Debug("forwarding mod message", "msg_id", msg.MsgID, "server", msg.Server, "payload", string(b))

View File

@@ -105,17 +105,33 @@ func loggingMiddleware(logger *slog.Logger, next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now() start := time.Now()
next.ServeHTTP(w, r) next.ServeHTTP(w, r)
logger.Info("http request", "remote", r.RemoteAddr, "method", r.Method, "path", r.URL.Path, "duration", time.Since(start)) logger.Info("Incoming HTTP request.", "remote", r.RemoteAddr, "path", r.URL.Path, "duration", time.Since(start))
}) })
} }
// connections // connections
func (wsg *WebsocketGateway) registerConn(conn *websocket.Conn, meta cache.ConnectionMetaData) { func (wsg *WebsocketGateway) registerConn(conn *websocket.Conn, meta cache.ConnectionMetaData, typ string) bool {
if typ == "bot" {
if wsg.bot != nil {
return false
}
wsg.bot = conn
return true
}
wsg.conns.Set(meta.ID, conn, &meta) wsg.conns.Set(meta.ID, conn, &meta)
return true
} }
func (wsg *WebsocketGateway) unregisterConn(conn *websocket.Conn, meta cache.ConnectionMetaData) { func (wsg *WebsocketGateway) unregisterConn(conn *websocket.Conn, meta cache.ConnectionMetaData, typ string) {
if typ == "bot" {
_ = wsg.bot.Close()
wsg.bot = nil
return
}
wsg.conns.RemoveById(meta.ID) wsg.conns.RemoveById(meta.ID)
_ = conn.Close() _ = conn.Close()
} }
@@ -123,8 +139,15 @@ func (wsg *WebsocketGateway) unregisterConn(conn *websocket.Conn, meta cache.Con
func (wsg *WebsocketGateway) closeAll() { func (wsg *WebsocketGateway) closeAll() {
wsg.logger.Info("Closing all websocket connections.") wsg.logger.Info("Closing all websocket connections.")
if wsg.bot != nil {
_ = wsg.bot.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Shutting down."), time.Now().Add(time.Second))
_ = wsg.bot.Close()
}
wsg.conns.Range(func(id string, c *websocket.Conn, meta *cache.ConnectionMetaData) { wsg.conns.Range(func(id string, c *websocket.Conn, meta *cache.ConnectionMetaData) {
_ = c.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Shutting down."), time.Now().Add(time.Second)) _ = c.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "Shutting down."), time.Now().Add(time.Second))
_ = c.Close() _ = c.Close()
}) })
wsg.conns.Clear()
} }

View File

@@ -46,10 +46,8 @@ func (wsg *WebsocketGateway) Start() error {
func (wsg *WebsocketGateway) Serve(ctx context.Context, listenAddr string) error { func (wsg *WebsocketGateway) Serve(ctx context.Context, listenAddr string) error {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/push", wsg.handlePush) mux.HandleFunc("/sync", wsg.handleSync)
mux.HandleFunc("/ready", wsg.handleReady)
mux.HandleFunc("/health", wsg.handleHealth) mux.HandleFunc("/health", wsg.handleHealth)
mux.HandleFunc("/register", wsg.handleRegister)
srv := &http.Server{ srv := &http.Server{
Addr: listenAddr, Addr: listenAddr,
@@ -73,8 +71,8 @@ func (wsg *WebsocketGateway) Serve(ctx context.Context, listenAddr string) error
func (wsg *WebsocketGateway) modReadLoop(conn *websocket.Conn, meta cache.ConnectionMetaData) { func (wsg *WebsocketGateway) modReadLoop(conn *websocket.Conn, meta cache.ConnectionMetaData) {
defer func() { defer func() {
wsg.unregisterConn(conn, meta) wsg.unregisterConn(conn, meta, "mod")
wsg.logger.Info("Mod-Client disconnected.", "remote", conn.RemoteAddr().String(), "server_id", meta.ID) wsg.logger.Info("Client disconnected.", "remote", conn.RemoteAddr().String(), "server_id", meta.ID)
}() }()
ticker := time.NewTicker(30 * time.Second) ticker := time.NewTicker(30 * time.Second)
@@ -91,9 +89,7 @@ func (wsg *WebsocketGateway) modReadLoop(conn *websocket.Conn, meta cache.Connec
if err != nil { if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure) {
wsg.logger.Warn("unexpected mod close", "server_id", meta.ID, "err", err) wsg.logger.Warn("Mod-Client unexpectedly closed the connection.", "err", err)
} else {
wsg.logger.Debug("mod read error", "server_id", meta.ID, "err", err)
} }
return return
} }
@@ -129,7 +125,7 @@ func (wsg *WebsocketGateway) modReadLoop(conn *websocket.Conn, meta cache.Connec
func (wsg *WebsocketGateway) botReadLoop(conn *websocket.Conn, meta cache.ConnectionMetaData) { func (wsg *WebsocketGateway) botReadLoop(conn *websocket.Conn, meta cache.ConnectionMetaData) {
defer func() { defer func() {
wsg.unregisterConn(conn, meta) wsg.unregisterConn(conn, meta, "bot")
wsg.logger.Info("bot disconnected", "bot_id", meta.ID, "remote", conn.RemoteAddr().String()) wsg.logger.Info("bot disconnected", "bot_id", meta.ID, "remote", conn.RemoteAddr().String())
}() }()
@@ -187,3 +183,25 @@ func (wsg *WebsocketGateway) botReadLoop(conn *websocket.Conn, meta cache.Connec
_ = writeJSONSafe(conn, map[string]string{"status": "ok"}) _ = writeJSONSafe(conn, map[string]string{"status": "ok"})
} }
} }
func (wsg *WebsocketGateway) read(conn *websocket.Conn, meta cache.ConnectionMetaData, typ string) {
defer func() {
wsg.unregisterConn(conn, meta, typ)
wsg.logger.Info("Client disconnected.", "remote", conn.RemoteAddr().String())
}()
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
go func() {
for range ticker.C {
wsg.sendWebsocketPing(conn)
}
}()
switch typ {
case "mod":
case "bot":
}
}