From bb5ac40a674cac65549852af9ecfcd6355acb0bb Mon Sep 17 00:00:00 2001 From: Luis Pater Date: Tue, 19 May 2026 16:44:42 +0800 Subject: [PATCH] feat(client): add timeout handling for Redis operations and subscription failover - Introduced `homeRedisOperationTimeout` and `homeSubscriptionReceiveTimeout` constants (3 seconds each) that bound Redis operations and subscription receives. - Set dial, read, and write timeouts on the Redis connection options, disabled command retries (`MaxRetries: -1`), set `DialerRetries: 1`, and enabled `ContextTimeoutEnabled`. - Implemented subscription failover logic on heartbeat timeouts to improve resilience. - Updated message handling to support additional Redis event types, including Pong and Subscription. --- internal/home/client.go | 86 ++++++++++++++++++++++++++++++++++------- 1 file changed, 72 insertions(+), 14 deletions(-) diff --git a/internal/home/client.go b/internal/home/client.go index cb0850e40..2c81187e4 100644 --- a/internal/home/client.go +++ b/internal/home/client.go @@ -31,6 +31,8 @@ const ( homeReconnectInterval = time.Second homeReconnectFailoverThreshold = 3 + homeRedisOperationTimeout = 3 * time.Second + homeSubscriptionReceiveTimeout = 3 * time.Second redisChannelCluster = "cluster" ) @@ -177,9 +179,15 @@ func (c *Client) redisOptionsLocked(addr string) (*redis.Options, error) { return nil, errTLS } return &redis.Options{ - Addr: addr, - Password: c.homeCfg.Password, - TLSConfig: tlsConfig, + Addr: addr, + Password: c.homeCfg.Password, + TLSConfig: tlsConfig, + DialTimeout: homeRedisOperationTimeout, + ReadTimeout: homeRedisOperationTimeout, + WriteTimeout: homeRedisOperationTimeout, + MaxRetries: -1, + DialerRetries: 1, + ContextTimeoutEnabled: true, }, nil } @@ -429,6 +437,25 @@ func (c *Client) failoverAfterReconnectFailure() (bool, string) { } c.reconnectFailures = 0 + return c.switchToNextNodeLocked() +} + +func (c *Client) failoverAfterSubscriptionTimeout() (bool, string) { + if c == nil { + return false, "" + } + c.mu.Lock() + defer c.mu.Unlock() + + if !c.clusterDiscoveryEnabledLocked() { + c.reconnectFailures = 0 + return
false, "" + } + c.reconnectFailures = 0 + return c.switchToNextNodeLocked() +} + +func (c *Client) switchToNextNodeLocked() (bool, string) { currentHost := strings.TrimSpace(c.homeCfg.Host) currentPort := c.homeCfg.Port candidates := append([]clusterNode(nil), c.clusterNodes...) @@ -451,6 +478,13 @@ func (c *Client) failoverAfterReconnectFailure() (bool, string) { return false, "" } +func (c *Client) markSubscriptionTimeout() { + switched, addr := c.failoverAfterSubscriptionTimeout() + if switched { + log.Warnf("home subscription heartbeat timeout; switching to %s", addr) + } +} + func (c *Client) resetReconnectFailures() { if c == nil { return @@ -708,7 +742,7 @@ func (c *Client) StartConfigSubscriber(ctx context.Context, onConfig func([]byte } // Ensure the subscription is established before marking heartbeat OK. - if _, errReceive := pubsub.Receive(ctx); errReceive != nil { + if _, errReceive := pubsub.ReceiveTimeout(ctx, homeSubscriptionReceiveTimeout); errReceive != nil { _ = pubsub.Close() c.markReconnectFailure("subscribe") sleepWithContext(ctx, homeReconnectInterval) @@ -719,28 +753,52 @@ func (c *Client) StartConfigSubscriber(ctx context.Context, onConfig func([]byte c.heartbeatOK.Store(true) for { - msg, errMsg := pubsub.ReceiveMessage(ctx) + event, errMsg := pubsub.ReceiveTimeout(ctx, homeSubscriptionReceiveTimeout) if errMsg != nil { _ = pubsub.Close() c.heartbeatOK.Store(false) - c.markReconnectFailure("subscription") + if isTimeoutError(errMsg) { + c.markSubscriptionTimeout() + } else { + c.markReconnectFailure("subscription") + } sleepWithContext(ctx, homeReconnectInterval) break } - if msg == nil { - continue - } - if errApply := c.handleSubscriptionPayload(msg.Channel, msg.Payload, onConfig); errApply != nil { - if strings.EqualFold(strings.TrimSpace(msg.Channel), redisChannelCluster) { - log.Warn("failed to apply cluster update from home control center, ignoring") - } else { - log.Warn("failed to apply config update from home control center, 
ignoring") + switch msg := event.(type) { + case *redis.Message: + if msg == nil { + continue } + if errApply := c.handleSubscriptionPayload(msg.Channel, msg.Payload, onConfig); errApply != nil { + if strings.EqualFold(strings.TrimSpace(msg.Channel), redisChannelCluster) { + log.Warn("failed to apply cluster update from home control center, ignoring") + } else { + log.Warn("failed to apply config update from home control center, ignoring") + } + } + case *redis.Pong: + c.resetReconnectFailures() + case *redis.Subscription: + continue + default: + log.Debugf("home subscription returned unsupported message type %T", event) } } } } +func isTimeoutError(err error) bool { + if err == nil { + return false + } + if errors.Is(err, context.DeadlineExceeded) { + return true + } + var netErr net.Error + return errors.As(err, &netErr) && netErr.Timeout() +} + func sleepWithContext(ctx context.Context, d time.Duration) { if d <= 0 { return