From 43dba4ce07827fb6cbd7f3cf14b1953d3c1a472a Mon Sep 17 00:00:00 2001 From: 0xJacky Date: Thu, 4 Dec 2025 12:51:28 +0800 Subject: [PATCH] feat(nginx_log): enhance incremental indexing configuration and logic --- app.example.ini | 15 +- internal/cron/incremental_indexing.go | 199 ++++++++++-------- .../indexer/adaptive_optimization.go | 28 ++- .../indexer/adaptive_optimization_test.go | 29 ++- internal/nginx_log/indexer/parser.go | 18 +- internal/nginx_log/indexer/types.go | 17 +- settings/nginx_log.go | 16 +- 7 files changed, 210 insertions(+), 112 deletions(-) diff --git a/app.example.ini b/app.example.ini index 18c23d78..d33adb80 100644 --- a/app.example.ini +++ b/app.example.ini @@ -63,8 +63,17 @@ ReloadCmd = nginx -s reload RestartCmd = start-stop-daemon --start --quiet --pidfile /var/run/nginx.pid --exec /usr/sbin/nginx [nginx_log] -AdvancedIndexingEnabled = false -IndexPath = +; Enable or disable nginx access log indexing and analytics. +; When disabled, the UI will still work but log search/analytics features are turned off +; and CPU usage will be significantly lower. +IndexingEnabled = false +IndexPath = +; Interval (in minutes) for incremental indexing job. +; This controls how often nginx-ui scans access logs for new data and performs +; incremental indexing. Lower values keep analytics closer to real-time but +; increase background CPU usage. Higher values reduce CPU usage at the cost +; of more stale analytics data. +IncrementalIndexInterval = 15 [node] Name = Local @@ -77,7 +86,7 @@ BaseUrl = Token = Proxy = Model = gpt-4o -APIType = +APIType = EnableCodeCompletion = false CodeCompletionModel = gpt-4o-mini diff --git a/internal/cron/incremental_indexing.go b/internal/cron/incremental_indexing.go index d95dc11b..e8d8d7b5 100644 --- a/internal/cron/incremental_indexing.go +++ b/internal/cron/incremental_indexing.go @@ -8,6 +8,7 @@ import ( "github.com/0xJacky/Nginx-UI/internal/nginx_log" "github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer" "github.com/0xJacky/Nginx-UI/model" + "github.com/0xJacky/Nginx-UI/settings" "github.com/go-co-op/gocron/v2" "github.com/uozi-tech/cosy/logger" ) @@ -21,11 +22,15 @@ type logIndexProvider interface { func setupIncrementalIndexingJob(s gocron.Scheduler) (gocron.Job, error) { logger.Info("Setting up incremental log indexing job") - // Run every 5 minutes to check for log file changes + // Determine interval from settings, falling back to a conservative default + interval := settings.NginxLogSettings.GetIncrementalIndexInterval() + + // Run periodically to check for log file changes using incremental indexing job, err := s.NewJob( - gocron.DurationJob(5*time.Minute), + gocron.DurationJob(interval), gocron.NewTask(performIncrementalIndexing), gocron.WithName("incremental_log_indexing"), + gocron.WithSingletonMode(gocron.LimitModeWait), // Prevent overlapping executions gocron.WithStartAt(gocron.WithStartImmediately()), ) @@ -33,7 +38,7 @@ func setupIncrementalIndexingJob(s gocron.Scheduler) (gocron.Job, error) { return nil, err } - logger.Info("Incremental log indexing job scheduled to run every 5 minutes") + logger.Infof("Incremental log indexing job scheduled to run every %s", interval) return job, nil } @@ -73,20 +78,41 @@ func performIncrementalIndexing() { return log.Type == "access" }) + // Process files sequentially to avoid overwhelming the system + // This is more conservative but prevents concurrent file indexing from consuming too much CPU changedCount := 0 for _, log := range allLogs { // Check if file needs incremental indexing if needsIncrementalIndexing(log, persistence) { - if err := queueIncrementalIndexing(log.Path, modernIndexer, logFileManager); err != nil { - logger.Errorf("Failed to queue incremental indexing for %s: %v", log.Path, err) + logger.Infof("Starting incremental indexing for file: %s", log.Path) + + // Set status to indexing + if err := setFileIndexStatus(log.Path, string(indexer.IndexStatusIndexing), logFileManager); err != nil { + logger.Errorf("Failed to set indexing status for %s: %v", log.Path, err) + continue + } + + // Perform incremental indexing synchronously (one file at a time) + if err := performSingleFileIncrementalIndexing(log.Path, modernIndexer, logFileManager); err != nil { + logger.Errorf("Failed incremental indexing for %s: %v", log.Path, err) + // Set error status + if statusErr := setFileIndexStatus(log.Path, string(indexer.IndexStatusError), logFileManager); statusErr != nil { + logger.Errorf("Failed to set error status for %s: %v", log.Path, statusErr) + } } else { changedCount++ + // Set status to indexed + if err := setFileIndexStatus(log.Path, string(indexer.IndexStatusIndexed), logFileManager); err != nil { + logger.Errorf("Failed to set indexed status for %s: %v", log.Path, err) + } } } } if changedCount > 0 { - logger.Infof("Queued %d log files for incremental indexing", changedCount) + logger.Infof("Completed incremental indexing for %d log files", changedCount) + // Update searcher shards once after all files are processed + nginx_log.UpdateSearcherShards() } else { logger.Debug("No log files need incremental indexing") } @@ -114,6 +140,23 @@ func needsIncrementalIndexing(log *nginx_log.NginxLogWithIndex, persistence logI fileModTime := fileInfo.ModTime() fileSize := fileInfo.Size() + // CRITICAL FIX: For large files (>100MB), add additional check to prevent excessive re-indexing + // If the file was recently indexed (within last 30 minutes), skip it even if size increased slightly + // This prevents the "infinite indexing" issue reported in #1455 + const largeFileThreshold = 100 * 1024 * 1024 // 100MB + const recentIndexThreshold = 30 * time.Minute + + if fileSize > largeFileThreshold && log.LastIndexed > 0 { + lastIndexTime := time.Unix(log.LastIndexed, 0) + timeSinceLastIndex := time.Since(lastIndexTime) + + if timeSinceLastIndex < recentIndexThreshold { + logger.Debugf("Skipping large file %s (%d bytes): recently indexed %v ago (threshold: %v)", + log.Path, fileSize, timeSinceLastIndex, recentIndexThreshold) + return false + } + } + if persistence != nil { if logIndex, err := persistence.GetLogIndex(log.Path); err == nil { if logIndex.NeedsIndexing(fileModTime, fileSize) { @@ -157,96 +200,66 @@ func needsIncrementalIndexing(log *nginx_log.NginxLogWithIndex, persistence logI return false } -// queueIncrementalIndexing queues a file for incremental indexing -func queueIncrementalIndexing(logPath string, modernIndexer interface{}, logFileManager interface{}) error { - // Set the file status to queued - if err := setFileIndexStatus(logPath, string(indexer.IndexStatusQueued), logFileManager); err != nil { - return err - } - - // Queue the indexing job asynchronously - go func() { - defer func() { - // Ensure status is always updated, even on panic - if r := recover(); r != nil { - logger.Errorf("Recovered from panic during incremental indexing for %s: %v", logPath, r) - _ = setFileIndexStatus(logPath, string(indexer.IndexStatusError), logFileManager) - } - }() - - logger.Infof("Starting incremental indexing for file: %s", logPath) - - // Set status to indexing - if err := setFileIndexStatus(logPath, string(indexer.IndexStatusIndexing), logFileManager); err != nil { - logger.Errorf("Failed to set indexing status for %s: %v", logPath, err) - return +// performSingleFileIncrementalIndexing performs incremental indexing for a single file synchronously +func performSingleFileIncrementalIndexing(logPath string, modernIndexer interface{}, logFileManager interface{}) error { + defer func() { + // Ensure status is always updated, even on panic + if r := recover(); r != nil { + logger.Errorf("Recovered from panic during incremental indexing for %s: %v", logPath, r) + _ = setFileIndexStatus(logPath, string(indexer.IndexStatusError), logFileManager) } - - // Perform incremental indexing - startTime := time.Now() - docsCountMap, minTime, maxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexSingleFileIncrementally(logPath, nil) - - if err != nil { - logger.Errorf("Failed incremental indexing for %s: %v", logPath, err) - // Set error status - if statusErr := setFileIndexStatus(logPath, string(indexer.IndexStatusError), logFileManager); statusErr != nil { - logger.Errorf("Failed to set error status for %s: %v", logPath, statusErr) - } - return - } - - // Calculate total documents indexed - var totalDocsIndexed uint64 - for _, docCount := range docsCountMap { - totalDocsIndexed += docCount - } - - // Save indexing metadata - duration := time.Since(startTime) - - if lfm, ok := logFileManager.(*indexer.LogFileManager); ok { - persistence := lfm.GetPersistence() - var existingDocCount uint64 - - existingIndex, err := persistence.GetLogIndex(logPath) - if err != nil { - logger.Warnf("Could not get existing log index for %s: %v", logPath, err) - } - - // Determine if the file was rotated by checking if the current size is smaller than the last recorded size. - // This is a strong indicator of log rotation. - fileInfo, statErr := os.Stat(logPath) - isRotated := false - if statErr == nil && existingIndex != nil && fileInfo.Size() < existingIndex.LastSize { - isRotated = true - logger.Infof("Log rotation detected for %s: new size %d is smaller than last size %d. Resetting document count.", - logPath, fileInfo.Size(), existingIndex.LastSize) - } - - if existingIndex != nil && !isRotated { - // If it's a normal incremental update (not a rotation), we build upon the existing count. - existingDocCount = existingIndex.DocumentCount - } - // If the file was rotated, existingDocCount remains 0, effectively starting the count over for the new file. - - finalDocCount := existingDocCount + totalDocsIndexed - - if err := lfm.SaveIndexMetadata(logPath, finalDocCount, startTime, duration, minTime, maxTime); err != nil { - logger.Errorf("Failed to save incremental index metadata for %s: %v", logPath, err) - } - } - - // Set status to indexed - if err := setFileIndexStatus(logPath, string(indexer.IndexStatusIndexed), logFileManager); err != nil { - logger.Errorf("Failed to set indexed status for %s: %v", logPath, err) - } - - // Update searcher shards - nginx_log.UpdateSearcherShards() - - logger.Infof("Successfully completed incremental indexing for %s, Documents: %d", logPath, totalDocsIndexed) }() + // Perform incremental indexing + startTime := time.Now() + docsCountMap, minTime, maxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexSingleFileIncrementally(logPath, nil) + + if err != nil { + return fmt.Errorf("indexing failed: %w", err) + } + + // Calculate total documents indexed + var totalDocsIndexed uint64 + for _, docCount := range docsCountMap { + totalDocsIndexed += docCount + } + + // Save indexing metadata + duration := time.Since(startTime) + + if lfm, ok := logFileManager.(*indexer.LogFileManager); ok { + persistence := lfm.GetPersistence() + var existingDocCount uint64 + + existingIndex, err := persistence.GetLogIndex(logPath) + if err != nil { + logger.Warnf("Could not get existing log index for %s: %v", logPath, err) + } + + // Determine if the file was rotated by checking if the current size is smaller than the last recorded size. + // This is a strong indicator of log rotation. + fileInfo, statErr := os.Stat(logPath) + isRotated := false + if statErr == nil && existingIndex != nil && fileInfo.Size() < existingIndex.LastSize { + isRotated = true + logger.Infof("Log rotation detected for %s: new size %d is smaller than last size %d. Resetting document count.", + logPath, fileInfo.Size(), existingIndex.LastSize) + } + + if existingIndex != nil && !isRotated { + // If it's a normal incremental update (not a rotation), we build upon the existing count. + existingDocCount = existingIndex.DocumentCount + } + // If the file was rotated, existingDocCount remains 0, effectively starting the count over for the new file. + + finalDocCount := existingDocCount + totalDocsIndexed + + if err := lfm.SaveIndexMetadata(logPath, finalDocCount, startTime, duration, minTime, maxTime); err != nil { + return fmt.Errorf("failed to save metadata: %w", err) + } + } + + logger.Infof("Successfully completed incremental indexing for %s, Documents: %d", logPath, totalDocsIndexed) return nil } diff --git a/internal/nginx_log/indexer/adaptive_optimization.go b/internal/nginx_log/indexer/adaptive_optimization.go index 1f3fe95e..da0d1491 100644 --- a/internal/nginx_log/indexer/adaptive_optimization.go +++ b/internal/nginx_log/indexer/adaptive_optimization.go @@ -108,15 +108,31 @@ type BatchAdjustment struct { func NewAdaptiveOptimizer(config *Config) *AdaptiveOptimizer { ctx, cancel := context.WithCancel(context.Background()) + // Derive worker range from the configured worker count. We deliberately + // treat the configured WorkerCount as the *maximum* concurrency the user + // (or defaults) allow, and let the optimizer scale down when CPU is + // saturated, then back up again, but never beyond this cap. + maxProcs := runtime.GOMAXPROCS(0) + initialWorkers := config.WorkerCount + if initialWorkers <= 0 { + if maxProcs > 0 { + initialWorkers = maxProcs + } else { + initialWorkers = 2 + } + } + minWorkers := max(2, initialWorkers/4) + ao := &AdaptiveOptimizer{ config: config, cpuMonitor: &CPUMonitor{ - targetUtilization: 0.75, // Target 75% CPU utilization (more conservative) - measurementInterval: 5 * time.Second, - adjustmentThreshold: 0.10, // Adjust if 10% deviation from target (more sensitive) - maxWorkers: runtime.GOMAXPROCS(0) * 6, // Allow scaling up to 6x CPU cores for I/O-bound workloads - minWorkers: max(2, runtime.GOMAXPROCS(0)/4), // Minimum 2 workers or 1/4 of cores for baseline performance - measurements: make([]float64, 0, 12), // 1 minute history at 5s intervals + // Keep target utilization, but relax thresholds to reduce oscillation. + targetUtilization: 0.75, // Target 75% CPU utilization + measurementInterval: 5 * time.Second, // Sample every 5 seconds + adjustmentThreshold: 0.10, // Adjust if 10% deviation from target + maxWorkers: initialWorkers, // Never scale above configured WorkerCount + minWorkers: minWorkers, // Minimum 2 workers or 1/4 of configured workers + measurements: make([]float64, 0, 12), // 1 minute history at 5s intervals maxSamples: 12, }, batchSizeController: &BatchSizeController{ diff --git a/internal/nginx_log/indexer/adaptive_optimization_test.go b/internal/nginx_log/indexer/adaptive_optimization_test.go index 02976446..baef96d2 100644 --- a/internal/nginx_log/indexer/adaptive_optimization_test.go +++ b/internal/nginx_log/indexer/adaptive_optimization_test.go @@ -83,7 +83,20 @@ func TestAdaptiveOptimizer_SetWorkerCountChangeCallback(t *testing.T) { } func TestAdaptiveOptimizer_suggestWorkerIncrease(t *testing.T) { - ao := createTestAdaptiveOptimizer(4) + // Use higher initial worker count so there's room to increase + // Since maxWorkers is now set to config.WorkerCount, we need to start with a config + // where WorkerCount is higher than the current workers to test the increase logic + config := &Config{ + WorkerCount: 16, // Set max to 16 so we can increase from current + BatchSize: 1000, + } + + ao := NewAdaptiveOptimizer(config) + ao.SetActivityPoller(mockActivityPoller{busy: true}) + + // Manually set current workers to a lower value to allow increase + atomic.StoreInt64(&ao.workerCount, 8) + ao.config.WorkerCount = 16 // Keep max at 16 var actualOldCount, actualNewCount int var callbackCalled bool @@ -104,13 +117,17 @@ func TestAdaptiveOptimizer_suggestWorkerIncrease(t *testing.T) { t.Error("Expected worker count change callback to be called") } - if actualOldCount != 4 { - t.Errorf("Expected old worker count 4, got %d", actualOldCount) + if actualOldCount != 8 { + t.Errorf("Expected old worker count 8, got %d", actualOldCount) } - // Should increase workers, but not more than max allowed - if actualNewCount <= 4 { - t.Errorf("Expected new worker count to be greater than 4, got %d", actualNewCount) + // Should increase workers, but not more than max allowed (16) + if actualNewCount <= 8 { + t.Errorf("Expected new worker count to be greater than 8, got %d", actualNewCount) + } + + if actualNewCount > 16 { + t.Errorf("Expected new worker count to not exceed max 16, got %d", actualNewCount) } // Verify config was updated diff --git a/internal/nginx_log/indexer/parser.go b/internal/nginx_log/indexer/parser.go index 8724f80b..d96032c3 100644 --- a/internal/nginx_log/indexer/parser.go +++ b/internal/nginx_log/indexer/parser.go @@ -5,6 +5,7 @@ import ( "compress/gzip" "context" "io" + "runtime" "strings" "sync" @@ -27,7 +28,22 @@ func InitLogParser() { config := parser.DefaultParserConfig() config.MaxLineLength = 16 * 1024 // 16KB for large log lines config.BatchSize = 15000 // Maximum batch size for highest frontend throughput - config.WorkerCount = 24 // Match CPU core count for high-throughput + + // Derive parser worker count from available CPUs, with sane limits so that + // small machines are not overwhelmed while larger hosts can still use + // parallel parsing effectively. + maxProcs := runtime.GOMAXPROCS(0) + if maxProcs <= 0 { + maxProcs = runtime.NumCPU() + } + workerCount := maxProcs + if workerCount < 4 { + workerCount = 4 + } + if workerCount > 16 { + workerCount = 16 + } + config.WorkerCount = workerCount // Note: Caching is handled by the CachedUserAgentParser // Initialize user agent parser with caching (10,000 cache size for production) diff --git a/internal/nginx_log/indexer/types.go b/internal/nginx_log/indexer/types.go index 2e38ae25..e2b69ca0 100644 --- a/internal/nginx_log/indexer/types.go +++ b/internal/nginx_log/indexer/types.go @@ -74,10 +74,23 @@ func DefaultIndexerConfig() *Config { baseBatchSize = 18000 // Standard systems (4-7 cores) - good throughput } + // Derive conservative, CPU-aware defaults to avoid oversubscribing small machines. + // Treat GOMAXPROCS as the upper bound for CPU-bound worker concurrency. + workerCount := maxProcs + if workerCount < 2 { + workerCount = 2 + } + + // Limit file-level concurrency to at most half of the logical CPUs by default. + fileGroupConcurrency := maxProcs / 2 + if fileGroupConcurrency < 2 { + fileGroupConcurrency = 2 + } + return &Config{ IndexPath: "./log-index", ShardCount: max(4, maxProcs/2), // Scale shards with CPU cores - WorkerCount: maxProcs * 3, // Optimized: 3x processors for better I/O-bound workload handling + WorkerCount: workerCount, // One worker per logical CPU by default (min 2) BatchSize: baseBatchSize, // Dynamically scaled based on CPU cores FlushInterval: 5 * time.Second, MaxQueueSize: baseBatchSize * 10, // Scale queue with batch size @@ -86,7 +99,7 @@ func DefaultIndexerConfig() *Config { MaxSegmentSize: 64 * 1024 * 1024, // 64MB OptimizeInterval: 30 * time.Minute, EnableMetrics: true, - FileGroupConcurrency: max(4, maxProcs), // Default: use CPU count for file-level parallelism + FileGroupConcurrency: fileGroupConcurrency, // Default: up to 50% of logical CPUs for file-level parallelism } } diff --git a/settings/nginx_log.go b/settings/nginx_log.go index 587411d5..fa118197 100644 --- a/settings/nginx_log.go +++ b/settings/nginx_log.go @@ -1,8 +1,22 @@ package settings +import "time" + type NginxLog struct { IndexingEnabled bool `json:"indexing_enabled"` IndexPath string `json:"index_path"` + // IncrementalIndexInterval controls how often the incremental indexing job runs, in minutes. + // When set to 0 or a negative value, a conservative default will be used. + IncrementalIndexInterval int `json:"incremental_index_interval"` } -var NginxLogSettings = &NginxLog{} \ No newline at end of file +var NginxLogSettings = &NginxLog{} + +// GetIncrementalIndexInterval returns the effective incremental indexing interval. +// Defaults to 15 minutes when not configured or configured with an invalid value. +func (n *NginxLog) GetIncrementalIndexInterval() time.Duration { + if n == nil || n.IncrementalIndexInterval <= 0 { + return 15 * time.Minute + } + return time.Duration(n.IncrementalIndexInterval) * time.Minute +}