From 28dfcae3508d2de402bcca8d8b8644b1b0448170 Mon Sep 17 00:00:00 2001 From: lihan3238 Date: Sun, 10 May 2026 01:27:41 +0800 Subject: [PATCH] fix(api): prevent idle TCP connections from blocking the accept loop Move per-connection protocol detection (TLS handshake, reader.Peek) out of the accept loop and into a per-connection goroutine. An idle TCP connection that never sends bytes would previously block Peek(1) indefinitely, preventing all subsequent connections from being accepted and making the management/API server unresponsive. Closes #3267 --- internal/api/protocol_multiplexer.go | 147 ++++++++++++---------- internal/api/protocol_multiplexer_test.go | 65 ++++++++++ 2 files changed, 149 insertions(+), 63 deletions(-) create mode 100644 internal/api/protocol_multiplexer_test.go diff --git a/internal/api/protocol_multiplexer.go b/internal/api/protocol_multiplexer.go index b83e1164c..781db9eb8 100644 --- a/internal/api/protocol_multiplexer.go +++ b/internal/api/protocol_multiplexer.go @@ -7,6 +7,7 @@ import ( "net" "net/http" "strings" + "time" log "github.com/sirupsen/logrus" ) @@ -48,68 +49,88 @@ func (s *Server) acceptMuxConnections(listener net.Listener, httpListener *muxLi continue } - tlsConn, ok := conn.(*tls.Conn) - if ok { - if errHandshake := tlsConn.Handshake(); errHandshake != nil { - if errClose := conn.Close(); errClose != nil { - log.Errorf("failed to close connection after TLS handshake error: %v", errClose) - } - continue - } - proto := strings.TrimSpace(tlsConn.ConnectionState().NegotiatedProtocol) - if proto == "h2" || proto == "http/1.1" { - if httpListener == nil { - if errClose := conn.Close(); errClose != nil { - log.Errorf("failed to close connection: %v", errClose) - } - continue - } - if errPut := httpListener.Put(tlsConn); errPut != nil { - if errClose := conn.Close(); errClose != nil { - log.Errorf("failed to close connection after HTTP routing failure: %v", errClose) - } - } - continue - } - } - - reader := bufio.NewReader(conn) - prefix, errPeek := reader.Peek(1) - if errPeek != nil { - if errClose := conn.Close(); errClose != nil { - log.Errorf("failed to close connection after protocol peek failure: %v", errClose) - } - continue - } - - if isRedisRESPPrefix(prefix[0]) { - if s.cfg != nil && s.cfg.Home.Enabled { - if errClose := conn.Close(); errClose != nil { - log.Errorf("failed to close redis connection while home mode is enabled: %v", errClose) - } - continue - } - if !s.managementRoutesEnabled.Load() { - if errClose := conn.Close(); errClose != nil { - log.Errorf("failed to close redis connection while management is disabled: %v", errClose) - } - continue - } - go s.handleRedisConnection(conn, reader) - continue - } - - if httpListener == nil { - if errClose := conn.Close(); errClose != nil { - log.Errorf("failed to close connection without HTTP listener: %v", errClose) - } - continue - } - - if errPut := httpListener.Put(&bufferedConn{Conn: conn, reader: reader}); errPut != nil { - if errClose := conn.Close(); errClose != nil { - log.Errorf("failed to close connection after HTTP routing failure: %v", errClose) - } - } + // Dispatch each connection to a goroutine so that slow/idle clients + // cannot block the accept loop. Previously, TLS handshake and + // reader.Peek(1) were performed inline; an idle TCP connection that + // never sent bytes would block Peek indefinitely, preventing all + // subsequent connections from being accepted (issue #3267). + go s.routeMuxConnection(conn, httpListener) + } +} + +// routeMuxConnection performs per-connection protocol detection and routing. +func (s *Server) routeMuxConnection(conn net.Conn, httpListener *muxListener) { + // Set a read deadline so that idle connections that never send bytes do not + // leak goroutines and file descriptors. The deadline is cleared once the + // connection is successfully routed to its handler. + const muxSniffDeadline = 10 * time.Second + _ = conn.SetReadDeadline(time.Now().Add(muxSniffDeadline)) + + tlsConn, ok := conn.(*tls.Conn) + if ok { + if errHandshake := tlsConn.Handshake(); errHandshake != nil { + if errClose := conn.Close(); errClose != nil { + log.Errorf("failed to close connection after TLS handshake error: %v", errClose) + } + return + } + proto := strings.TrimSpace(tlsConn.ConnectionState().NegotiatedProtocol) + if proto == "h2" || proto == "http/1.1" { + if httpListener == nil { + if errClose := conn.Close(); errClose != nil { + log.Errorf("failed to close connection: %v", errClose) + } + return + } + if errPut := httpListener.Put(tlsConn); errPut != nil { + if errClose := conn.Close(); errClose != nil { + log.Errorf("failed to close connection after HTTP routing failure: %v", errClose) + } + } else { + _ = conn.SetReadDeadline(time.Time{}) + } + return + } + } + + reader := bufio.NewReader(conn) + prefix, errPeek := reader.Peek(1) + if errPeek != nil { + if errClose := conn.Close(); errClose != nil { + log.Errorf("failed to close connection after protocol peek failure: %v", errClose) + } + return + } + + if isRedisRESPPrefix(prefix[0]) { + if s.cfg != nil && s.cfg.Home.Enabled { + if errClose := conn.Close(); errClose != nil { + log.Errorf("failed to close redis connection while home mode is enabled: %v", errClose) + } + return + } + if !s.managementRoutesEnabled.Load() { + if errClose := conn.Close(); errClose != nil { + log.Errorf("failed to close redis connection while management is disabled: %v", errClose) + } + return + } + s.handleRedisConnection(conn, reader) + return + } + + if httpListener == nil { + if errClose := conn.Close(); errClose != nil { + log.Errorf("failed to close connection without HTTP listener: %v", errClose) + } + return + } + + if errPut := httpListener.Put(&bufferedConn{Conn: conn, reader: reader}); errPut != nil { + if errClose := conn.Close(); errClose != nil { + log.Errorf("failed to close connection after HTTP routing failure: %v", errClose) + } + } else { + _ = conn.SetReadDeadline(time.Time{}) } } diff --git a/internal/api/protocol_multiplexer_test.go b/internal/api/protocol_multiplexer_test.go new file mode 100644 index 000000000..6769c76af --- /dev/null +++ b/internal/api/protocol_multiplexer_test.go @@ -0,0 +1,65 @@ +package api + +import ( + "net" + "net/http" + "net/http/httptest" + "sync/atomic" + "testing" + "time" +) + +func TestAcceptMuxNotBlockedByIdleConnection(t *testing.T) { + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + defer listener.Close() + + var routed atomic.Int32 + handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + routed.Add(1) + w.WriteHeader(http.StatusOK) + }) + srv := httptest.NewUnstartedServer(handler) + defer srv.Close() + + muxLn := newMuxListener(listener.Addr(), 1024) + server := &Server{managementRoutesEnabled: atomic.Bool{}} + server.managementRoutesEnabled.Store(false) + + errCh := make(chan error, 1) + go func() { + errCh <- server.acceptMuxConnections(listener, muxLn) + }() + + srv.Listener = muxLn + srv.Start() + + // Open an idle TCP connection that never sends any bytes. + idleConn, err := net.DialTimeout("tcp", listener.Addr().String(), 2*time.Second) + if err != nil { + t.Fatalf("failed to dial idle connection: %v", err) + } + defer idleConn.Close() + + // Give the accept loop time to pick up the idle connection. + time.Sleep(50 * time.Millisecond) + + // Send a real HTTP request. Before the fix, the accept loop would be + // blocked on Peek(1) for the idle connection, causing this request to + // time out. + client := &http.Client{Timeout: 3 * time.Second} + resp, err := client.Get("http://" + listener.Addr().String() + "/") + if err != nil { + listener.Close() + t.Fatalf("HTTP request failed (accept loop may be blocked by idle connection): %v", err) + } + resp.Body.Close() + + listener.Close() + + if routed.Load() == 0 { + t.Error("expected at least one request to be routed") + } +}