diff --git a/cmd/server/main.go b/cmd/server/main.go index e735b144c..b10bc9c8d 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -418,6 +418,7 @@ func main() { } } redisqueue.SetUsageStatisticsEnabled(cfg.UsageStatisticsEnabled) + redisqueue.SetRetentionSeconds(cfg.RedisUsageQueueRetentionSeconds) coreauth.SetQuotaCooldownDisabled(cfg.DisableCooling) if err = logging.ConfigureLogOutput(cfg); err != nil { diff --git a/config.example.yaml b/config.example.yaml index 172e961f6..d7d5a9f56 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -66,6 +66,10 @@ error-logs-max-files: 10 # When false, disable in-memory usage statistics aggregation usage-statistics-enabled: false +# How long (in seconds) Redis usage queue items are retained in memory for the RESP interface (LPOP/RPOP). +# Default: 60. Max: 3600. +redis-usage-queue-retention-seconds: 60 + # Proxy URL. Supports socks5/http/https protocols. Example: socks5://user:pass@192.168.1.1:1080/ # Per-entry proxy-url also supports "direct" or "none" to bypass both the global proxy-url and environment proxies explicitly. proxy-url: "" diff --git a/internal/api/server.go b/internal/api/server.go index 176bc2a38..2e89ac5a3 100644 --- a/internal/api/server.go +++ b/internal/api/server.go @@ -1000,6 +1000,10 @@ func (s *Server) UpdateClients(cfg *config.Config) { redisqueue.SetUsageStatisticsEnabled(cfg.UsageStatisticsEnabled) } + if oldCfg == nil || oldCfg.RedisUsageQueueRetentionSeconds != cfg.RedisUsageQueueRetentionSeconds { + redisqueue.SetRetentionSeconds(cfg.RedisUsageQueueRetentionSeconds) + } + if s.requestLogger != nil && (oldCfg == nil || oldCfg.ErrorLogsMaxFiles != cfg.ErrorLogsMaxFiles) { if setter, ok := s.requestLogger.(interface{ SetErrorLogsMaxFiles(int) }); ok { setter.SetErrorLogsMaxFiles(cfg.ErrorLogsMaxFiles) diff --git a/internal/config/config.go b/internal/config/config.go index 39c91127a..46ce4f509 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -65,6 +65,11 @@ type Config struct { // UsageStatisticsEnabled toggles in-memory usage aggregation; when false, usage data is discarded. UsageStatisticsEnabled bool `yaml:"usage-statistics-enabled" json:"usage-statistics-enabled"` + // RedisUsageQueueRetentionSeconds controls how long (in seconds) usage queue items + // are retained in memory for the Redis RESP interface (LPOP/RPOP). + // Default: 60. Max: 3600. + RedisUsageQueueRetentionSeconds int `yaml:"redis-usage-queue-retention-seconds" json:"redis-usage-queue-retention-seconds"` + // DisableCooling disables quota cooldown scheduling when true. DisableCooling bool `yaml:"disable-cooling" json:"disable-cooling"` @@ -609,6 +614,7 @@ func LoadConfigOptional(configFile string, optional bool) (*Config, error) { cfg.LogsMaxTotalSizeMB = 0 cfg.ErrorLogsMaxFiles = 10 cfg.UsageStatisticsEnabled = false + cfg.RedisUsageQueueRetentionSeconds = 60 cfg.DisableCooling = false cfg.DisableImageGeneration = DisableImageGenerationOff cfg.Pprof.Enable = false @@ -671,6 +677,13 @@ func LoadConfigOptional(configFile string, optional bool) (*Config, error) { cfg.ErrorLogsMaxFiles = 10 } + if cfg.RedisUsageQueueRetentionSeconds <= 0 { + cfg.RedisUsageQueueRetentionSeconds = 60 + } else if cfg.RedisUsageQueueRetentionSeconds > 3600 { + log.WithField("value", cfg.RedisUsageQueueRetentionSeconds).Warn("redis-usage-queue-retention-seconds too large; clamping to 3600") + cfg.RedisUsageQueueRetentionSeconds = 3600 + } + if cfg.MaxRetryCredentials < 0 { cfg.MaxRetryCredentials = 0 } diff --git a/internal/redisqueue/queue.go b/internal/redisqueue/queue.go index 8a4b6742f..2fea58391 100644 --- a/internal/redisqueue/queue.go +++ b/internal/redisqueue/queue.go @@ -6,7 +6,10 @@ import ( "time" ) -const retentionWindow = time.Minute +const ( + defaultRetentionSeconds int64 = 60 + maxRetentionSeconds int64 = 3600 +) type queueItem struct { enqueuedAt time.Time @@ -20,10 +23,15 @@ type queue struct { } var ( - enabled atomic.Bool - global queue + enabled atomic.Bool + retentionSeconds atomic.Int64 + global queue ) +func init() { + retentionSeconds.Store(defaultRetentionSeconds) +} + func SetEnabled(value bool) { enabled.Store(value) if !value { @@ -35,6 +43,16 @@ func Enabled() bool { return enabled.Load() } +func SetRetentionSeconds(value int) { + normalized := int64(value) + if normalized <= 0 { + normalized = defaultRetentionSeconds + } else if normalized > maxRetentionSeconds { + normalized = maxRetentionSeconds + } + retentionSeconds.Store(normalized) +} + func Enqueue(payload []byte) { if !Enabled() { return @@ -110,7 +128,11 @@ func (q *queue) pruneLocked(now time.Time) { return } - cutoff := now.Add(-retentionWindow) + windowSeconds := retentionSeconds.Load() + if windowSeconds <= 0 { + windowSeconds = defaultRetentionSeconds + } + cutoff := now.Add(-time.Duration(windowSeconds) * time.Second) for q.head < len(q.items) && q.items[q.head].enqueuedAt.Before(cutoff) { q.head++ } diff --git a/internal/watcher/diff/config_diff.go b/internal/watcher/diff/config_diff.go index 2be9aa908..b414ed5ad 100644 --- a/internal/watcher/diff/config_diff.go +++ b/internal/watcher/diff/config_diff.go @@ -39,6 +39,9 @@ func BuildConfigChangeDetails(oldCfg, newCfg *config.Config) []string { if oldCfg.UsageStatisticsEnabled != newCfg.UsageStatisticsEnabled { changes = append(changes, fmt.Sprintf("usage-statistics-enabled: %t -> %t", oldCfg.UsageStatisticsEnabled, newCfg.UsageStatisticsEnabled)) } + if oldCfg.RedisUsageQueueRetentionSeconds != newCfg.RedisUsageQueueRetentionSeconds { + changes = append(changes, fmt.Sprintf("redis-usage-queue-retention-seconds: %d -> %d", oldCfg.RedisUsageQueueRetentionSeconds, newCfg.RedisUsageQueueRetentionSeconds)) + } if oldCfg.DisableCooling != newCfg.DisableCooling { changes = append(changes, fmt.Sprintf("disable-cooling: %t -> %t", oldCfg.DisableCooling, newCfg.DisableCooling)) }