diff --git a/api/analytic/analytic.go b/api/analytic/analytic.go
index 2d0ac8f5..51998879 100644
--- a/api/analytic/analytic.go
+++ b/api/analytic/analytic.go
@@ -34,6 +34,8 @@ func Analytic(c *gin.Context) {
 
 	defer ws.Close()
 
+	peerGone := startWSKeepalive(ws)
+
 	var stat Stat
 
 	for {
@@ -84,6 +86,7 @@
 		stat.Network = *network
 
 		// write
+		_ = ws.SetWriteDeadline(time.Now().Add(wsWriteWait))
 		err = ws.WriteJSON(stat)
 		if err != nil {
 			if helper.IsUnexpectedWebsocketError(err) {
@@ -96,6 +99,9 @@
 		case <-kernel.Context.Done():
 			logger.Debug("Analytic: Context cancelled, closing WebSocket")
 			return
+		case <-peerGone:
+			logger.Debug("Analytic: peer disconnected, closing WebSocket")
+			return
 		case <-time.After(1 * time.Second):
 		}
 	}
diff --git a/api/analytic/nodes.go b/api/analytic/nodes.go
index cb502c02..6c16d8d4 100644
--- a/api/analytic/nodes.go
+++ b/api/analytic/nodes.go
@@ -27,6 +27,8 @@ func GetNodeStat(c *gin.Context) {
 
 	defer ws.Close()
 
+	peerGone := startWSKeepalive(ws)
+
 	// Counter to track iterations for periodic full info update
 	counter := 0
 	const fullInfoInterval = 6 // Send full info every 6 iterations (every minute if interval is 10s)
@@ -70,6 +72,7 @@
 		}
 
 		// write
+		_ = ws.SetWriteDeadline(time.Now().Add(wsWriteWait))
 		err = ws.WriteJSON(data)
 		if err != nil {
 			if helper.IsUnexpectedWebsocketError(err) {
@@ -84,6 +87,9 @@
 		case <-kernel.Context.Done():
 			logger.Debug("GetNodeStat: Context cancelled, closing WebSocket")
 			return
+		case <-peerGone:
+			logger.Debug("GetNodeStat: peer disconnected, closing WebSocket")
+			return
 		case <-time.After(10 * time.Second):
 		}
 	}
@@ -102,9 +108,12 @@
 
 	defer ws.Close()
 
+	peerGone := startWSKeepalive(ws)
+
 	for {
 		// Send snapshot of NodeMap data to client to avoid concurrent access
 		nodeSnapshot := analytic.SnapshotNodeMap()
+		_ = ws.SetWriteDeadline(time.Now().Add(wsWriteWait))
 		err = ws.WriteJSON(nodeSnapshot)
 		if err != nil {
 			if helper.IsUnexpectedWebsocketError(err) {
@@ -117,6 +126,9 @@
 		case <-kernel.Context.Done():
 			logger.Debug("GetNodesAnalytic: Context cancelled, closing WebSocket")
 			return
+		case <-peerGone:
+			logger.Debug("GetNodesAnalytic: peer disconnected, closing WebSocket")
+			return
 		case <-time.After(10 * time.Second):
 		}
 	}
diff --git a/api/analytic/ws_keepalive.go b/api/analytic/ws_keepalive.go
new file mode 100644
index 00000000..2784abb8
--- /dev/null
+++ b/api/analytic/ws_keepalive.go
@@ -0,0 +1,94 @@
+package analytic
+
+import (
+	"time"
+
+	"github.com/0xJacky/Nginx-UI/internal/helper"
+	"github.com/gorilla/websocket"
+	"github.com/uozi-tech/cosy/logger"
+)
+
+// Analytic push handlers only write to the client — they never read. Without
+// ping/pong and read deadlines, a silently half-closed TCP connection keeps
+// the server looping and writing for hours until the OS finally surfaces the
+// error, while the client sees no updates and never triggers auto-reconnect.
+// These constants mirror the values used by api/cluster/websocket.go so both
+// sides of the cluster share the same keepalive contract.
+const (
+	wsWriteWait  = 10 * time.Second
+	wsPongWait   = 60 * time.Second
+	wsPingPeriod = (wsPongWait * 9) / 10
+)
+
+// startWSKeepalive arms a read deadline + pong handler on the connection and
+// spawns two goroutines: a reader that drains control frames (so pongs reset
+// the deadline) and a pinger that emits a ping on pingPeriod. When the peer
+// stops responding, the read deadline fires, the reader returns, and done is
+// closed. The handler's deferred ws.Close() ultimately releases the socket —
+// this helper never calls Close() itself on the read path, so a caller must
+// `defer ws.Close()` after upgrade.
+//
+// The returned done channel is closed once the reader exits (read error,
+// read deadline expiry, or peer close). Callers should select on it to bail
+// out of their write loop promptly instead of waiting for the next WriteJSON
+// to fail.
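+//
+// A minimal usage sketch of the intended call pattern (illustrative only:
+// upgrader, c, and payload are placeholders, not identifiers from this
+// package):
+//
+//	ws, err := upgrader.Upgrade(c.Writer, c.Request, nil)
+//	if err != nil {
+//		return
+//	}
+//	defer ws.Close() // unblocks the keepalive reader when the handler exits
+//	peerGone := startWSKeepalive(ws)
+//	for {
+//		_ = ws.SetWriteDeadline(time.Now().Add(wsWriteWait))
+//		if err := ws.WriteJSON(payload); err != nil {
+//			return
+//		}
+//		select {
+//		case <-peerGone:
+//			return // peer stopped answering pings; stop writing
+//		case <-time.After(time.Second):
+//		}
+//	}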
+func startWSKeepalive(ws *websocket.Conn) <-chan struct{} {
+	done := make(chan struct{})
+
+	_ = ws.SetReadDeadline(time.Now().Add(wsPongWait))
+	ws.SetPongHandler(func(string) error {
+		return ws.SetReadDeadline(time.Now().Add(wsPongWait))
+	})
+
+	go func() {
+		defer close(done)
+		for {
+			if _, _, err := ws.ReadMessage(); err != nil {
+				if helper.IsUnexpectedWebsocketError(err) {
+					logger.Error("WebSocket read error:", err)
+				}
+				return
+			}
+		}
+	}()
+
+	go func() {
+		ticker := time.NewTicker(wsPingPeriod)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-done:
+				return
+			case <-ticker.C:
+				if err := ws.WriteControl(websocket.PingMessage, nil, time.Now().Add(wsWriteWait)); err != nil {
+					_ = ws.Close()
+					return
+				}
+			}
+		}
+	}()
+
+	return done
+}
diff --git a/app/src/pinia/moudule/nodeAvailability.ts b/app/src/pinia/moudule/nodeAvailability.ts
index a30d127f..76c7d926 100644
--- a/app/src/pinia/moudule/nodeAvailability.ts
+++ b/app/src/pinia/moudule/nodeAvailability.ts
@@ -1,4 +1,5 @@
 import type { AnalyticNode, Node } from '@/api/node'
+import { useDocumentVisibility, useEventListener, useOnline } from '@vueuse/core'
 import analytic from '@/api/analytic'
 import nodeApi from '@/api/node'
 import { useWebSocket } from '@/lib/websocket'
@@ -150,6 +151,34 @@ export const useNodeAvailabilityStore = defineStore('nodeAvailability', () => {
     }
   }
 
+  // The underlying useWebSocket gives up after ~10 quick retries. That's fine
+  // for a transient blip but leaves us stuck for long pauses — backend restart,
+  // laptop sleep, flaky VPN. Re-opening the socket whenever the tab becomes
+  // visible or the network comes back provides a cheap recovery path without
+  // hammering the server while the user isn't looking.
+  if (typeof window !== 'undefined') {
+    const visibility = useDocumentVisibility()
+    const online = useOnline()
+
+    watch(visibility, (value, previous) => {
+      if (!isInitialized.value) {
+        return
+      }
+      if (value === 'visible' && previous !== 'visible' && !isConnected.value) {
+        connectWebSocket()
+      }
+    })
+
+    watch(online, (value, previous) => {
+      if (!isInitialized.value) {
+        return
+      }
+      if (value && !previous && !isConnected.value) {
+        connectWebSocket()
+      }
+    })
+  }
+
   // Start monitoring (initialize + WebSocket)
   async function startMonitoring() {
     await initialize()
@@ -193,12 +222,14 @@
     return node?.name ?? ''
   }
 
-  // Auto-cleanup WebSocket on page unload
-  if (typeof window !== 'undefined') {
-    window.addEventListener('beforeunload', () => {
-      stopMonitoring()
-    })
-  }
+  // Auto-cleanup WebSocket on page unload. Using VueUse's useEventListener ties
+  // the listener to the setup store's effect scope so HMR/$dispose reliably
+  // removes it instead of accumulating duplicates.
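+  // (useEventListener accepts a null target and simply skips registration,
+  // so the `typeof window` guard below also keeps SSR builds safe.)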
+  useEventListener(typeof window !== 'undefined' ? window : null, 'beforeunload', () => {
+    stopMonitoring()
+  })
 
   return {
     nodes: readonly(nodes),
diff --git a/internal/analytic/node_record.go b/internal/analytic/node_record.go
index 234932a3..34498927 100644
--- a/internal/analytic/node_record.go
+++ b/internal/analytic/node_record.go
@@ -54,6 +54,21 @@ var (
 	retryMutex sync.Mutex
 )
 
+// WebSocket keepalive timings for the connection to remote nodes.
+// pongWait bounds how long ReadJSON may block; pingPeriod must be < pongWait so
+// the peer has a chance to respond before the deadline fires. Declared as var
+// (not const) so tests can shorten them without redefining the production
+// defaults.
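+//
+// With the defaults the arithmetic works out to pingPeriod = 60s * 9/10 = 54s,
+// leaving a six-second margin for a pong to arrive before the deadline fires,
+// so a half-dead link is detected within roughly one pongWait.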
+var (
+	nodeWSWriteWait  = 10 * time.Second
+	nodeWSPongWait   = 60 * time.Second
+	nodeWSPingPeriod = (nodeWSPongWait * 9) / 10
+)
+
 func getRetryState(nodeID uint64) *NodeRetryState {
 	retryMutex.Lock()
 	defer retryMutex.Unlock()
@@ -381,6 +392,11 @@ func RetrieveNodesStatus(ctx context.Context) {
 				continue
 			}
 			if err := nodeAnalyticRecord(n, ctx); err != nil {
+				// Context cancellation means the manager is shutting
+				// down — don't pollute retry state with phantom failures.
+				if ctx.Err() != nil {
+					return
+				}
 				if helper.IsUnexpectedWebsocketError(err) {
 					logger.Error(err)
 				}
@@ -487,6 +503,14 @@ func nodeAnalyticRecord(nodeModel *model.Node, ctx context.Context) error {
 		updateNodeStatus(nodeModel.ID, false, "websocket_connection_closed")
 	}()
 
+	// Arm read deadline and refresh it on every pong. Without this, a silently
+	// half-dead TCP connection (NAT drop, peer hang) would block ReadJSON below
+	// indefinitely, freezing this node's retry loop until the process restarts.
+	_ = c.SetReadDeadline(time.Now().Add(nodeWSPongWait))
+	c.SetPongHandler(func(string) error {
+		return c.SetReadDeadline(time.Now().Add(nodeWSPongWait))
+	})
+
 	go func() {
 		select {
 		case <-scopeCtx.Done():
@@ -496,6 +520,26 @@
 		}
 	}()
 
+	// Periodic ping keeps the connection warm and triggers the deadline above
+	// when the peer stops responding.
+	go func() {
+		ticker := time.NewTicker(nodeWSPingPeriod)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-scopeCtx.Done():
+				return
+			case <-ctx.Done():
+				return
+			case <-ticker.C:
+				if err := c.WriteControl(websocket.PingMessage, nil, time.Now().Add(nodeWSWriteWait)); err != nil {
+					_ = c.Close()
+					return
+				}
+			}
+		}
+	}()
+
 	for {
 		select {
 		case <-scopeCtx.Done():
@@ -508,11 +552,16 @@
 		var rawMsg json.RawMessage
 		err = c.ReadJSON(&rawMsg)
 		if err != nil {
+			// Surface every read failure (close frame, read deadline expiry, TCP
+			// reset) as a retryable error. Returning nil here used to trigger
+			// markConnectionSuccess on the caller, hiding dead connections and
+			// flipping node status back to online on the next snapshot.
 			if helper.IsUnexpectedWebsocketError(err) {
 				updateNodeStatus(nodeModel.ID, false, "websocket_error")
-				return err
+			} else {
+				updateNodeStatus(nodeModel.ID, false, "websocket_connection_closed")
 			}
-			return nil
+			return err
 		}
 
 		nodeMapMu.Lock()
diff --git a/internal/analytic/node_record_test.go b/internal/analytic/node_record_test.go
new file mode 100644
index 00000000..37d483b6
--- /dev/null
+++ b/internal/analytic/node_record_test.go
@@ -0,0 +1,106 @@
+package analytic
+
+import (
+	"context"
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/0xJacky/Nginx-UI/model"
+	"github.com/gorilla/websocket"
+)
+
+// TestNodeAnalyticRecordHalfDeadConnection reproduces the bug that caused node
+// status to freeze until the nginx-ui process was restarted: a remote node
+// that accepts the WebSocket upgrade but then stops responding (e.g. silent
+// TCP hang, peer frozen) used to leave nodeAnalyticRecord blocked on ReadJSON
+// forever, starving the per-node retry loop. With the keepalive in place,
+// ReadJSON must unblock within pongWait and return an error so the caller can
+// schedule a reconnect.
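+//
+// Run it in isolation with:
+//
+//	go test ./internal/analytic -run TestNodeAnalyticRecordHalfDeadConnection -v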
+func TestNodeAnalyticRecordHalfDeadConnection(t *testing.T) {
+	// Shrink the keepalive window so the test finishes quickly. Restore on exit
+	// so other tests in the package see the production values.
+	origPong, origPing, origWrite := nodeWSPongWait, nodeWSPingPeriod, nodeWSWriteWait
+	nodeWSPongWait = 300 * time.Millisecond
+	nodeWSPingPeriod = 100 * time.Millisecond
+	nodeWSWriteWait = 100 * time.Millisecond
+	t.Cleanup(func() {
+		nodeWSPongWait, nodeWSPingPeriod, nodeWSWriteWait = origPong, origPing, origWrite
+	})
+
+	// A test server that satisfies InitNode's HTTP probe and then accepts the
+	// analytic WebSocket upgrade but never writes a message or answers a ping.
+	upgrader := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }}
+	mux := http.NewServeMux()
+	mux.HandleFunc("/api/node", func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Set("Content-Type", "application/json")
+		_ = json.NewEncoder(w).Encode(NodeInfo{Version: "test"})
+	})
+	mux.HandleFunc("/api/analytic/intro", func(w http.ResponseWriter, r *http.Request) {
+		c, err := upgrader.Upgrade(w, r, nil)
+		if err != nil {
+			return
+		}
+		defer c.Close()
+		// Swallow the ping by overriding the default handler, which would
+		// auto-reply with a pong; the client's read deadline must expire on its own.
+		c.SetPingHandler(func(string) error { return nil })
+		// Block until the connection is closed by the peer.
+		for {
+			if _, _, err := c.ReadMessage(); err != nil {
+				return
+			}
+		}
+	})
+	srv := httptest.NewServer(mux)
+	t.Cleanup(srv.Close)
+
+	// Use the raw httptest URL; GetWebSocketURL will rewrite http:// to ws://.
+	node := &model.Node{
+		Model: model.Model{ID: 42},
+		Name:  "half-dead",
+		URL:   srv.URL,
+		Token: "test-token",
+	}
+	// Ensure NodeMap is initialized so updateNodeStatus can write to the shared
+	// map safely; cleanup removes this node's entry so other tests never see it.
+	nodeMapMu.Lock()
+	if NodeMap == nil {
+		NodeMap = make(TNodeMap)
+	}
+	nodeMapMu.Unlock()
+	t.Cleanup(func() {
+		nodeMapMu.Lock()
+		delete(NodeMap, node.ID)
+		nodeMapMu.Unlock()
+	})
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+
+	errCh := make(chan error, 1)
+	go func() {
+		errCh <- nodeAnalyticRecord(node, ctx)
+	}()
+
+	select {
+	case err := <-errCh:
+		if err == nil {
+			t.Fatalf("expected nodeAnalyticRecord to fail on read deadline, got nil")
+		}
+		// Read-deadline expiry surfaces as an i/o timeout wrapped in the
+		// websocket close-error path; either way it must be non-nil.
+		if !strings.Contains(err.Error(), "timeout") && !strings.Contains(err.Error(), "closed") {
+			t.Logf("returned err = %v (non-nil, acceptable)", err)
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatalf("nodeAnalyticRecord did not return within 2s — read deadline / ping-pong not enforced")
+	}
+}