feat(nginx_log): enhance incremental indexing configuration and logic

This commit is contained in:
0xJacky
2025-12-04 12:51:28 +08:00
parent ba5ea3d1aa
commit 43dba4ce07
7 changed files with 210 additions and 112 deletions

View File

@@ -63,8 +63,17 @@ ReloadCmd = nginx -s reload
RestartCmd = start-stop-daemon --start --quiet --pidfile /var/run/nginx.pid --exec /usr/sbin/nginx
[nginx_log]
AdvancedIndexingEnabled = false
IndexPath =
; Enable or disable nginx access log indexing and analytics.
; When disabled, the UI will still work but log search/analytics features are turned off
; and CPU usage will be significantly lower.
IndexingEnabled = false
IndexPath =
; Interval (in minutes) for incremental indexing job.
; This controls how often nginx-ui scans access logs for new data and performs
; incremental indexing. Lower values keep analytics closer to real-time but
; increase background CPU usage. Higher values reduce CPU usage at the cost
; of more stale analytics data.
IncrementalIndexInterval = 15
[node]
Name = Local
@@ -77,7 +86,7 @@ BaseUrl =
Token =
Proxy =
Model = gpt-4o
APIType =
APIType =
EnableCodeCompletion = false
CodeCompletionModel = gpt-4o-mini

View File

@@ -8,6 +8,7 @@ import (
"github.com/0xJacky/Nginx-UI/internal/nginx_log"
"github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
"github.com/0xJacky/Nginx-UI/model"
"github.com/0xJacky/Nginx-UI/settings"
"github.com/go-co-op/gocron/v2"
"github.com/uozi-tech/cosy/logger"
)
@@ -21,11 +22,15 @@ type logIndexProvider interface {
func setupIncrementalIndexingJob(s gocron.Scheduler) (gocron.Job, error) {
logger.Info("Setting up incremental log indexing job")
// Run every 5 minutes to check for log file changes
// Determine interval from settings, falling back to a conservative default
interval := settings.NginxLogSettings.GetIncrementalIndexInterval()
// Run periodically to check for log file changes using incremental indexing
job, err := s.NewJob(
gocron.DurationJob(5*time.Minute),
gocron.DurationJob(interval),
gocron.NewTask(performIncrementalIndexing),
gocron.WithName("incremental_log_indexing"),
gocron.WithSingletonMode(gocron.LimitModeWait), // Prevent overlapping executions
gocron.WithStartAt(gocron.WithStartImmediately()),
)
@@ -33,7 +38,7 @@ func setupIncrementalIndexingJob(s gocron.Scheduler) (gocron.Job, error) {
return nil, err
}
logger.Info("Incremental log indexing job scheduled to run every 5 minutes")
logger.Infof("Incremental log indexing job scheduled to run every %s", interval)
return job, nil
}
@@ -73,20 +78,41 @@ func performIncrementalIndexing() {
return log.Type == "access"
})
// Process files sequentially to avoid overwhelming the system
// This is more conservative but prevents concurrent file indexing from consuming too much CPU
changedCount := 0
for _, log := range allLogs {
// Check if file needs incremental indexing
if needsIncrementalIndexing(log, persistence) {
if err := queueIncrementalIndexing(log.Path, modernIndexer, logFileManager); err != nil {
logger.Errorf("Failed to queue incremental indexing for %s: %v", log.Path, err)
logger.Infof("Starting incremental indexing for file: %s", log.Path)
// Set status to indexing
if err := setFileIndexStatus(log.Path, string(indexer.IndexStatusIndexing), logFileManager); err != nil {
logger.Errorf("Failed to set indexing status for %s: %v", log.Path, err)
continue
}
// Perform incremental indexing synchronously (one file at a time)
if err := performSingleFileIncrementalIndexing(log.Path, modernIndexer, logFileManager); err != nil {
logger.Errorf("Failed incremental indexing for %s: %v", log.Path, err)
// Set error status
if statusErr := setFileIndexStatus(log.Path, string(indexer.IndexStatusError), logFileManager); statusErr != nil {
logger.Errorf("Failed to set error status for %s: %v", log.Path, statusErr)
}
} else {
changedCount++
// Set status to indexed
if err := setFileIndexStatus(log.Path, string(indexer.IndexStatusIndexed), logFileManager); err != nil {
logger.Errorf("Failed to set indexed status for %s: %v", log.Path, err)
}
}
}
}
if changedCount > 0 {
logger.Infof("Queued %d log files for incremental indexing", changedCount)
logger.Infof("Completed incremental indexing for %d log files", changedCount)
// Update searcher shards once after all files are processed
nginx_log.UpdateSearcherShards()
} else {
logger.Debug("No log files need incremental indexing")
}
@@ -114,6 +140,23 @@ func needsIncrementalIndexing(log *nginx_log.NginxLogWithIndex, persistence logI
fileModTime := fileInfo.ModTime()
fileSize := fileInfo.Size()
// CRITICAL FIX: for large files (>100MB), add an additional check to prevent excessive re-indexing.
// If the file was recently indexed (within the last 30 minutes), skip it even if its size increased slightly.
// This prevents the "infinite indexing" issue reported in #1455.
const largeFileThreshold = 100 * 1024 * 1024 // 100MB
const recentIndexThreshold = 30 * time.Minute
if fileSize > largeFileThreshold && log.LastIndexed > 0 {
lastIndexTime := time.Unix(log.LastIndexed, 0)
timeSinceLastIndex := time.Since(lastIndexTime)
if timeSinceLastIndex < recentIndexThreshold {
logger.Debugf("Skipping large file %s (%d bytes): recently indexed %v ago (threshold: %v)",
log.Path, fileSize, timeSinceLastIndex, recentIndexThreshold)
return false
}
}
if persistence != nil {
if logIndex, err := persistence.GetLogIndex(log.Path); err == nil {
if logIndex.NeedsIndexing(fileModTime, fileSize) {
@@ -157,96 +200,66 @@ func needsIncrementalIndexing(log *nginx_log.NginxLogWithIndex, persistence logI
return false
}
// queueIncrementalIndexing queues a file for incremental indexing
func queueIncrementalIndexing(logPath string, modernIndexer interface{}, logFileManager interface{}) error {
// Set the file status to queued
if err := setFileIndexStatus(logPath, string(indexer.IndexStatusQueued), logFileManager); err != nil {
return err
}
// Queue the indexing job asynchronously
go func() {
defer func() {
// Ensure status is always updated, even on panic
if r := recover(); r != nil {
logger.Errorf("Recovered from panic during incremental indexing for %s: %v", logPath, r)
_ = setFileIndexStatus(logPath, string(indexer.IndexStatusError), logFileManager)
}
}()
logger.Infof("Starting incremental indexing for file: %s", logPath)
// Set status to indexing
if err := setFileIndexStatus(logPath, string(indexer.IndexStatusIndexing), logFileManager); err != nil {
logger.Errorf("Failed to set indexing status for %s: %v", logPath, err)
return
// performSingleFileIncrementalIndexing performs incremental indexing for a single file synchronously
func performSingleFileIncrementalIndexing(logPath string, modernIndexer interface{}, logFileManager interface{}) error {
defer func() {
// Ensure status is always updated, even on panic
if r := recover(); r != nil {
logger.Errorf("Recovered from panic during incremental indexing for %s: %v", logPath, r)
_ = setFileIndexStatus(logPath, string(indexer.IndexStatusError), logFileManager)
}
// Perform incremental indexing
startTime := time.Now()
docsCountMap, minTime, maxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexSingleFileIncrementally(logPath, nil)
if err != nil {
logger.Errorf("Failed incremental indexing for %s: %v", logPath, err)
// Set error status
if statusErr := setFileIndexStatus(logPath, string(indexer.IndexStatusError), logFileManager); statusErr != nil {
logger.Errorf("Failed to set error status for %s: %v", logPath, statusErr)
}
return
}
// Calculate total documents indexed
var totalDocsIndexed uint64
for _, docCount := range docsCountMap {
totalDocsIndexed += docCount
}
// Save indexing metadata
duration := time.Since(startTime)
if lfm, ok := logFileManager.(*indexer.LogFileManager); ok {
persistence := lfm.GetPersistence()
var existingDocCount uint64
existingIndex, err := persistence.GetLogIndex(logPath)
if err != nil {
logger.Warnf("Could not get existing log index for %s: %v", logPath, err)
}
// Determine if the file was rotated by checking if the current size is smaller than the last recorded size.
// This is a strong indicator of log rotation.
fileInfo, statErr := os.Stat(logPath)
isRotated := false
if statErr == nil && existingIndex != nil && fileInfo.Size() < existingIndex.LastSize {
isRotated = true
logger.Infof("Log rotation detected for %s: new size %d is smaller than last size %d. Resetting document count.",
logPath, fileInfo.Size(), existingIndex.LastSize)
}
if existingIndex != nil && !isRotated {
// If it's a normal incremental update (not a rotation), we build upon the existing count.
existingDocCount = existingIndex.DocumentCount
}
// If the file was rotated, existingDocCount remains 0, effectively starting the count over for the new file.
finalDocCount := existingDocCount + totalDocsIndexed
if err := lfm.SaveIndexMetadata(logPath, finalDocCount, startTime, duration, minTime, maxTime); err != nil {
logger.Errorf("Failed to save incremental index metadata for %s: %v", logPath, err)
}
}
// Set status to indexed
if err := setFileIndexStatus(logPath, string(indexer.IndexStatusIndexed), logFileManager); err != nil {
logger.Errorf("Failed to set indexed status for %s: %v", logPath, err)
}
// Update searcher shards
nginx_log.UpdateSearcherShards()
logger.Infof("Successfully completed incremental indexing for %s, Documents: %d", logPath, totalDocsIndexed)
}()
// Perform incremental indexing
startTime := time.Now()
docsCountMap, minTime, maxTime, err := modernIndexer.(*indexer.ParallelIndexer).IndexSingleFileIncrementally(logPath, nil)
if err != nil {
return fmt.Errorf("indexing failed: %w", err)
}
// Calculate total documents indexed
var totalDocsIndexed uint64
for _, docCount := range docsCountMap {
totalDocsIndexed += docCount
}
// Save indexing metadata
duration := time.Since(startTime)
if lfm, ok := logFileManager.(*indexer.LogFileManager); ok {
persistence := lfm.GetPersistence()
var existingDocCount uint64
existingIndex, err := persistence.GetLogIndex(logPath)
if err != nil {
logger.Warnf("Could not get existing log index for %s: %v", logPath, err)
}
// Determine if the file was rotated by checking if the current size is smaller than the last recorded size.
// This is a strong indicator of log rotation.
fileInfo, statErr := os.Stat(logPath)
isRotated := false
if statErr == nil && existingIndex != nil && fileInfo.Size() < existingIndex.LastSize {
isRotated = true
logger.Infof("Log rotation detected for %s: new size %d is smaller than last size %d. Resetting document count.",
logPath, fileInfo.Size(), existingIndex.LastSize)
}
if existingIndex != nil && !isRotated {
// If it's a normal incremental update (not a rotation), we build upon the existing count.
existingDocCount = existingIndex.DocumentCount
}
// If the file was rotated, existingDocCount remains 0, effectively starting the count over for the new file.
finalDocCount := existingDocCount + totalDocsIndexed
if err := lfm.SaveIndexMetadata(logPath, finalDocCount, startTime, duration, minTime, maxTime); err != nil {
return fmt.Errorf("failed to save metadata: %w", err)
}
}
logger.Infof("Successfully completed incremental indexing for %s, Documents: %d", logPath, totalDocsIndexed)
return nil
}

View File

@@ -108,15 +108,31 @@ type BatchAdjustment struct {
func NewAdaptiveOptimizer(config *Config) *AdaptiveOptimizer {
ctx, cancel := context.WithCancel(context.Background())
// Derive worker range from the configured worker count. We deliberately
// treat the configured WorkerCount as the *maximum* concurrency that the
// user (or the defaults) allows, and let the optimizer scale down when CPU
// is saturated, then back up again, but never beyond this cap.
maxProcs := runtime.GOMAXPROCS(0)
initialWorkers := config.WorkerCount
if initialWorkers <= 0 {
if maxProcs > 0 {
initialWorkers = maxProcs
} else {
initialWorkers = 2
}
}
minWorkers := max(2, initialWorkers/4)
ao := &AdaptiveOptimizer{
config: config,
cpuMonitor: &CPUMonitor{
targetUtilization: 0.75, // Target 75% CPU utilization (more conservative)
measurementInterval: 5 * time.Second,
adjustmentThreshold: 0.10, // Adjust if 10% deviation from target (more sensitive)
maxWorkers: runtime.GOMAXPROCS(0) * 6, // Allow scaling up to 6x CPU cores for I/O-bound workloads
minWorkers: max(2, runtime.GOMAXPROCS(0)/4), // Minimum 2 workers or 1/4 of cores for baseline performance
measurements: make([]float64, 0, 12), // 1 minute history at 5s intervals
// Keep target utilization, but relax thresholds to reduce oscillation.
targetUtilization: 0.75, // Target 75% CPU utilization
measurementInterval: 5 * time.Second, // Sample every 5 seconds
adjustmentThreshold: 0.10, // Adjust if 10% deviation from target
maxWorkers: initialWorkers, // Never scale above configured WorkerCount
minWorkers: minWorkers, // Minimum 2 workers or 1/4 of configured workers
measurements: make([]float64, 0, 12), // 1 minute history at 5s intervals
maxSamples: 12,
},
batchSizeController: &BatchSizeController{

View File

@@ -83,7 +83,20 @@ func TestAdaptiveOptimizer_SetWorkerCountChangeCallback(t *testing.T) {
}
func TestAdaptiveOptimizer_suggestWorkerIncrease(t *testing.T) {
ao := createTestAdaptiveOptimizer(4)
// Use higher initial worker count so there's room to increase
// Since maxWorkers is now set to config.WorkerCount, we need to start with a config
// where WorkerCount is higher than the current workers to test the increase logic
config := &Config{
WorkerCount: 16, // Set max to 16 so we can increase from current
BatchSize: 1000,
}
ao := NewAdaptiveOptimizer(config)
ao.SetActivityPoller(mockActivityPoller{busy: true})
// Manually set current workers to a lower value to allow increase
atomic.StoreInt64(&ao.workerCount, 8)
ao.config.WorkerCount = 16 // Keep max at 16
var actualOldCount, actualNewCount int
var callbackCalled bool
@@ -104,13 +117,17 @@ func TestAdaptiveOptimizer_suggestWorkerIncrease(t *testing.T) {
t.Error("Expected worker count change callback to be called")
}
if actualOldCount != 4 {
t.Errorf("Expected old worker count 4, got %d", actualOldCount)
if actualOldCount != 8 {
t.Errorf("Expected old worker count 8, got %d", actualOldCount)
}
// Should increase workers, but not more than max allowed
if actualNewCount <= 4 {
t.Errorf("Expected new worker count to be greater than 4, got %d", actualNewCount)
// Should increase workers, but not more than max allowed (16)
if actualNewCount <= 8 {
t.Errorf("Expected new worker count to be greater than 8, got %d", actualNewCount)
}
if actualNewCount > 16 {
t.Errorf("Expected new worker count to not exceed max 16, got %d", actualNewCount)
}
// Verify config was updated

View File

@@ -5,6 +5,7 @@ import (
"compress/gzip"
"context"
"io"
"runtime"
"strings"
"sync"
@@ -27,7 +28,22 @@ func InitLogParser() {
config := parser.DefaultParserConfig()
config.MaxLineLength = 16 * 1024 // 16KB for large log lines
config.BatchSize = 15000 // Maximum batch size for highest frontend throughput
config.WorkerCount = 24 // Match CPU core count for high-throughput
// Derive parser worker count from available CPUs, with sane limits so that
// small machines are not overwhelmed while larger hosts can still use
// parallel parsing effectively.
maxProcs := runtime.GOMAXPROCS(0)
if maxProcs <= 0 {
maxProcs = runtime.NumCPU()
}
workerCount := maxProcs
if workerCount < 4 {
workerCount = 4
}
if workerCount > 16 {
workerCount = 16
}
config.WorkerCount = workerCount
// Note: Caching is handled by the CachedUserAgentParser
// Initialize user agent parser with caching (10,000 cache size for production)

View File

@@ -74,10 +74,23 @@ func DefaultIndexerConfig() *Config {
baseBatchSize = 18000 // Standard systems (4-7 cores) - good throughput
}
// Derive conservative, CPU-aware defaults to avoid oversubscribing small machines.
// Treat GOMAXPROCS as the upper bound for CPU-bound worker concurrency.
workerCount := maxProcs
if workerCount < 2 {
workerCount = 2
}
// Limit file-level concurrency to at most half of the logical CPUs by default.
fileGroupConcurrency := maxProcs / 2
if fileGroupConcurrency < 2 {
fileGroupConcurrency = 2
}
return &Config{
IndexPath: "./log-index",
ShardCount: max(4, maxProcs/2), // Scale shards with CPU cores
WorkerCount: maxProcs * 3, // Optimized: 3x processors for better I/O-bound workload handling
WorkerCount: workerCount, // One worker per logical CPU by default (min 2)
BatchSize: baseBatchSize, // Dynamically scaled based on CPU cores
FlushInterval: 5 * time.Second,
MaxQueueSize: baseBatchSize * 10, // Scale queue with batch size
@@ -86,7 +99,7 @@ func DefaultIndexerConfig() *Config {
MaxSegmentSize: 64 * 1024 * 1024, // 64MB
OptimizeInterval: 30 * time.Minute,
EnableMetrics: true,
FileGroupConcurrency: max(4, maxProcs), // Default: use CPU count for file-level parallelism
FileGroupConcurrency: fileGroupConcurrency, // Default: up to 50% of logical CPUs for file-level parallelism
}
}

View File

@@ -1,8 +1,22 @@
package settings
import "time"
type NginxLog struct {
IndexingEnabled bool `json:"indexing_enabled"`
IndexPath string `json:"index_path"`
// IncrementalIndexInterval controls how often the incremental indexing job runs, in minutes.
// When set to 0 or a negative value, a conservative default will be used.
IncrementalIndexInterval int `json:"incremental_index_interval"`
}
var NginxLogSettings = &NginxLog{}
var NginxLogSettings = &NginxLog{}
// GetIncrementalIndexInterval returns the effective incremental indexing interval.
// Defaults to 15 minutes when not configured or configured with an invalid value.
func (n *NginxLog) GetIncrementalIndexInterval() time.Duration {
if n == nil || n.IncrementalIndexInterval <= 0 {
return 15 * time.Minute
}
return time.Duration(n.IncrementalIndexInterval) * time.Minute
}