Files
nginx-ui/internal/nginx_log/modern_services.go

699 lines
20 KiB
Go

package nginx_log
import (
"context"
"fmt"
"os"
"path/filepath"
"regexp"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/0xJacky/Nginx-UI/internal/nginx_log/analytics"
"github.com/0xJacky/Nginx-UI/internal/nginx_log/indexer"
"github.com/0xJacky/Nginx-UI/internal/nginx_log/searcher"
"github.com/0xJacky/Nginx-UI/settings"
"github.com/blevesearch/bleve/v2"
"github.com/uozi-tech/cosy/logger"
cSettings "github.com/uozi-tech/cosy/settings"
)
// Global instances for new services.
//
// All fields in this block are guarded by servicesMutex except
// lastShardUpdateAttempt, which is accessed via sync/atomic (see GetSearcher).
var (
globalSearcher *searcher.Searcher // shared searcher over all index shards
globalAnalytics analytics.Service // analytics service built on top of globalSearcher
globalIndexer *indexer.ParallelIndexer // parallel indexer that owns the shards
globalLogFileManager *indexer.LogFileManager // registry of known log files and their index state
servicesInitialized bool // true once all singletons above are published
servicesInitializing bool // true while InitializeServices is running (dedupes concurrent calls)
servicesMutex sync.RWMutex // protects every field in this var block (except the atomic below)
shutdownCancel context.CancelFunc // cancels the service context; set on successful init
isShuttingDown bool // prevents re-entrant StopServices
lastShardUpdateAttempt int64 // unix-nano timestamp of last auto-heal attempt (atomic access only)
)
// Fallback storage when IndexingEnabled is disabled.
// Keeps a minimal path -> cache-entry map so log discovery still works
// without the indexing subsystem; guarded by fallbackCacheMutex.
var (
fallbackCache = make(map[string]*NginxLogCache)
fallbackCacheMutex sync.RWMutex
)
// InitializeServices initializes the new modular services.
//
// Safe to call repeatedly: the servicesInitialized / servicesInitializing
// flag pair (guarded by servicesMutex) ensures only the first caller does the
// work. It is a no-op when indexing is disabled in settings. On success it
// publishes the service singletons, triggers an initial shard load, starts
// the task scheduler, and spawns a goroutine that stops everything when ctx
// is cancelled.
func InitializeServices(ctx context.Context) {
servicesMutex.Lock()
// Check if indexing is enabled
if !settings.NginxLogSettings.IndexingEnabled {
servicesMutex.Unlock()
logger.Info("Indexing is disabled, skipping nginx_log services initialization")
return
}
if servicesInitialized {
servicesMutex.Unlock()
logger.Info("Modern nginx log services already initialized, skipping")
return
}
if servicesInitializing {
servicesMutex.Unlock()
logger.Info("Modern nginx log services are initializing, skipping duplicate request")
return
}
// Claim the in-progress flag, then drop the lock so readers are not blocked
// during the potentially slow setup below.
servicesInitializing = true
servicesMutex.Unlock()
logger.Info("Initializing modern nginx log services...")
// Create a cancellable context for services
serviceCtx, cancel := context.WithCancel(ctx)
searcherInstance, analyticsInstance, indexerInstance, logFileManagerInstance, err := initializeWithDefaults(serviceCtx)
if err != nil {
// Roll back: cancel the service context and clear the in-progress flag so a
// later call may retry initialization.
cancel()
servicesMutex.Lock()
servicesInitializing = false
servicesMutex.Unlock()
logger.Errorf("Failed to initialize modern services: %v", err)
return
}
// Publish all singletons under one critical section so readers never observe
// a partially-initialized service set.
servicesMutex.Lock()
globalSearcher = searcherInstance
globalAnalytics = analyticsInstance
globalIndexer = indexerInstance
globalLogFileManager = logFileManagerInstance
shutdownCancel = cancel
servicesInitialized = true
servicesInitializing = false
servicesMutex.Unlock()
logger.Info("Modern nginx log services initialization completed")
// Load existing shards after services are visible without blocking readers during initialization.
UpdateSearcherShards()
// Initialize task scheduler after services are ready
go InitTaskScheduler(serviceCtx)
// Monitor context for shutdown
go func() {
logger.Info("Started nginx_log shutdown monitor goroutine")
<-serviceCtx.Done()
logger.Info("Context cancelled, initiating shutdown...")
// Use the same shutdown logic as manual stop
StopServices()
logger.Info("Nginx_log shutdown monitor goroutine completed")
}()
}
// initializeWithDefaults creates services with default configuration.
//
// Construction order matters: the log parser singleton must exist before the
// indexer starts, and the searcher starts empty — shards are attached later
// via UpdateSearcherShards once the indexer has produced them. Returns the
// four service instances, or a non-nil error if the indexer fails to start
// (in which case all returned instances are nil).
func initializeWithDefaults(ctx context.Context) (*searcher.Searcher, analytics.Service, *indexer.ParallelIndexer, *indexer.LogFileManager, error) {
logger.Info("Initializing services with default configuration")
// Initialize global log parser singleton before starting indexer/searcher
indexer.InitLogParser()
// Create empty searcher (will be populated when indexes are available)
searcherConfig := searcher.DefaultSearcherConfig()
searcherInstance := searcher.NewSearcher(searcherConfig, []bleve.Index{})
// Initialize analytics with empty searcher
analyticsInstance := analytics.NewService(searcherInstance)
// Initialize parallel indexer with shard manager
indexerConfig := indexer.DefaultIndexerConfig()
// Use config directory for index path
indexerConfig.IndexPath = getConfigDirIndexPath()
shardManager := indexer.NewGroupedShardManager(indexerConfig)
indexerInstance := indexer.NewParallelIndexer(indexerConfig, shardManager)
// Start the indexer
if err := indexerInstance.Start(ctx); err != nil {
logger.Errorf("Failed to start parallel indexer: %v", err)
return nil, nil, nil, nil, fmt.Errorf("failed to start parallel indexer: %w", err)
}
// Initialize log file manager
logFileManagerInstance := indexer.NewLogFileManager()
// Inject indexer for precise doc counting before persisting
logFileManagerInstance.SetIndexer(indexerInstance)
return searcherInstance, analyticsInstance, indexerInstance, logFileManagerInstance, nil
}
// getConfigDirIndexPath resolves the directory that stores the bleve index.
// Resolution order: the explicitly configured NginxLogSettings.IndexPath, then
// a "log-index" directory next to the config file, then the relative default
// "./log-index". Each candidate directory is created on demand; creation
// failures fall through to the next candidate.
func getConfigDirIndexPath() string {
	// 1) Explicitly configured custom path.
	if custom := settings.NginxLogSettings.IndexPath; custom != "" {
		err := os.MkdirAll(custom, 0755)
		if err == nil {
			logger.Infof("Using custom index path: %s", custom)
			return custom
		}
		logger.Warnf("Failed to create custom index directory at %s: %v, using default", custom, err)
	}
	// 2) Directory alongside the cosy config file.
	if cSettings.ConfPath == "" {
		// 3) Last resort: relative default.
		logger.Warn("Config file path not available, using default index path")
		return "./log-index"
	}
	indexPath := filepath.Join(filepath.Dir(cSettings.ConfPath), "log-index")
	if err := os.MkdirAll(indexPath, 0755); err != nil {
		logger.Warnf("Failed to create index directory at %s: %v, using default", indexPath, err)
		return "./log-index"
	}
	return indexPath
}
// GetSearcher returns the global searcher instance.
//
// Returns nil until InitializeServices has completed. When the searcher is
// running but reports unhealthy (typically zero shards attached), this also
// schedules an asynchronous shard refresh, throttled to at most once every
// five seconds via an atomic compare-and-swap.
func GetSearcher() *searcher.Searcher {
servicesMutex.RLock()
defer servicesMutex.RUnlock()
if !servicesInitialized {
logger.Warn("Modern services not initialized, returning nil")
return nil
}
if globalSearcher == nil {
logger.Warn("GetSearcher: globalSearcher is nil even though services are initialized")
return nil
}
// Check searcher health status
isHealthy := globalSearcher.IsHealthy()
isRunning := globalSearcher.IsRunning()
logger.Debugf("GetSearcher: returning searcher, isHealthy: %v, isRunning: %v", isHealthy, isRunning)
// Auto-heal: if the searcher is running but unhealthy (likely zero shards),
// and the indexer is initialized, trigger an async shard swap (throttled).
if !isHealthy && isRunning && globalIndexer != nil {
now := time.Now().UnixNano()
prev := atomic.LoadInt64(&lastShardUpdateAttempt)
// Throttle: only one refresh attempt per 5-second window. The CAS ensures a
// single winner among concurrent callers; losers simply skip scheduling.
if now-prev > int64(5*time.Second) {
if atomic.CompareAndSwapInt64(&lastShardUpdateAttempt, prev, now) {
logger.Debugf("GetSearcher: unhealthy detected, scheduling UpdateSearcherShards()")
// Run outside the read lock we currently hold; UpdateSearcherShards takes
// the write lock asynchronously.
go UpdateSearcherShards()
}
}
}
return globalSearcher
}
// GetAnalytics returns the global analytics service instance, or nil when the
// modern services have not been initialized yet.
func GetAnalytics() analytics.Service {
	servicesMutex.RLock()
	defer servicesMutex.RUnlock()

	if servicesInitialized {
		return globalAnalytics
	}
	logger.Warn("Modern services not initialized, returning nil")
	return nil
}
// GetIndexer returns the global indexer instance, or nil when services have
// not been initialized.
func GetIndexer() *indexer.ParallelIndexer {
	servicesMutex.RLock()
	defer servicesMutex.RUnlock()

	if servicesInitialized {
		return globalIndexer
	}
	return nil
}
// GetLogFileManager returns the global log file manager instance, or nil when
// services are not initialized or the manager was never published.
func GetLogFileManager() *indexer.LogFileManager {
	servicesMutex.RLock()
	defer servicesMutex.RUnlock()

	switch {
	case !servicesInitialized:
		// Stay quiet here: callers legitimately probe this during startup.
		return nil
	case globalLogFileManager == nil:
		logger.Warnf("[nginx_log] GetLogFileManager: globalLogFileManager is nil even though servicesInitialized=true")
		return nil
	default:
		return globalLogFileManager
	}
}
// NginxLogCache Type aliases for backward compatibility.
// Legacy callers import these names from this package; the real definitions
// live in the indexer package.
type NginxLogCache = indexer.NginxLogCache
type NginxLogWithIndex = indexer.NginxLogWithIndex
// Constants for backward compatibility.
// String views of the indexer's index-status enum so legacy string-typed
// comparisons keep working.
const (
IndexStatusIndexed = string(indexer.IndexStatusIndexed)
IndexStatusIndexing = string(indexer.IndexStatusIndexing)
IndexStatusNotIndexed = string(indexer.IndexStatusNotIndexed)
)
// Legacy compatibility functions for log cache system

// AddLogPath adds a log path to the log cache with the source config file.
// Delegates to the LogFileManager when services are running; otherwise the
// entry is kept in the in-memory fallback cache.
func AddLogPath(path, logType, name, configFile string) {
	manager := GetLogFileManager()
	if manager != nil {
		manager.AddLogPath(path, logType, name, configFile)
		return
	}

	// Fallback storage
	entry := &NginxLogCache{
		Path:       path,
		Type:       logType,
		Name:       name,
		ConfigFile: configFile,
	}
	fallbackCacheMutex.Lock()
	fallbackCache[path] = entry
	fallbackCacheMutex.Unlock()
}
// RemoveLogPathsFromConfig removes all log paths associated with a specific
// config file. Delegates to the LogFileManager when available; otherwise
// prunes matching entries from the fallback cache.
func RemoveLogPathsFromConfig(configFile string) {
	manager := GetLogFileManager()
	if manager != nil {
		manager.RemoveLogPathsFromConfig(configFile)
		return
	}

	// Fallback removal (deleting while ranging a map is safe in Go).
	fallbackCacheMutex.Lock()
	defer fallbackCacheMutex.Unlock()
	for path, cached := range fallbackCache {
		if cached.ConfigFile == configFile {
			delete(fallbackCache, path)
		}
	}
}
// GetAllLogPaths returns all cached log paths, optionally filtered.
// An entry is included only if every filter accepts it. Fallback results are
// copies, so callers cannot mutate the cache through them.
func GetAllLogPaths(filters ...func(*NginxLogCache) bool) []*NginxLogCache {
	if manager := GetLogFileManager(); manager != nil {
		return manager.GetAllLogPaths(filters...)
	}

	// Fallback list
	fallbackCacheMutex.RLock()
	defer fallbackCacheMutex.RUnlock()

	var logs []*NginxLogCache
entries:
	for _, entry := range fallbackCache {
		for _, accept := range filters {
			if !accept(entry) {
				continue entries
			}
		}
		// Copy so external code cannot mutate the cached entry.
		copied := *entry
		logs = append(logs, &copied)
	}
	return logs
}
// GetAllLogsWithIndex returns all cached log paths with their index status.
// Delegates to the LogFileManager when available; otherwise synthesizes basic
// entries from the fallback cache, all marked IndexStatusNotIndexed.
func GetAllLogsWithIndex(filters ...func(*NginxLogWithIndex) bool) []*NginxLogWithIndex {
	if manager := GetLogFileManager(); manager != nil {
		return manager.GetAllLogsWithIndex(filters...)
	}

	// Fallback: produce basic entries without indexing metadata
	fallbackCacheMutex.RLock()
	defer fallbackCacheMutex.RUnlock()

	matchesAll := func(lw *NginxLogWithIndex) bool {
		for _, f := range filters {
			if !f(lw) {
				return false
			}
		}
		return true
	}

	result := make([]*NginxLogWithIndex, 0, len(fallbackCache))
	for _, c := range fallbackCache {
		candidate := &NginxLogWithIndex{
			Path:        c.Path,
			Type:        c.Type,
			Name:        c.Name,
			ConfigFile:  c.ConfigFile,
			IndexStatus: IndexStatusNotIndexed,
		}
		if matchesAll(candidate) {
			result = append(result, candidate)
		}
	}
	return result
}
// GetAllLogsWithIndexGrouped returns logs grouped by their base name.
// Delegates to the LogFileManager when available. In fallback mode, rotated
// files (access.log.1, access.log.2024.01.15, ...) collapse onto their base
// path via getBaseLogNameBasic; results are sorted by base path and filtered
// with the supplied predicates (an entry must pass all of them).
func GetAllLogsWithIndexGrouped(filters ...func(*NginxLogWithIndex) bool) []*NginxLogWithIndex {
	if manager := GetLogFileManager(); manager != nil {
		return manager.GetAllLogsWithIndexGrouped(filters...)
	}
	// Fallback grouping by base log name (handle simple rotation patterns)
	fallbackCacheMutex.RLock()
	defer fallbackCacheMutex.RUnlock()
	grouped := make(map[string]*NginxLogWithIndex)
	for _, c := range fallbackCache {
		base := getBaseLogNameBasic(c.Path)
		// First entry per base wins; basic mode has no index metadata to
		// aggregate. (Replaces a dead `existing` binding that was only
		// discarded with `_ = existing`.)
		if _, ok := grouped[base]; ok {
			continue
		}
		grouped[base] = &NginxLogWithIndex{
			Path:        base,
			Type:        c.Type,
			Name:        filepath.Base(base),
			ConfigFile:  c.ConfigFile,
			IndexStatus: IndexStatusNotIndexed,
		}
	}
	// Build slice in deterministic (sorted) order and apply filters
	keys := make([]string, 0, len(grouped))
	for k := range grouped {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	result := make([]*NginxLogWithIndex, 0, len(keys))
	for _, k := range keys {
		v := grouped[k]
		include := true
		for _, f := range filters {
			if !f(v) {
				include = false
				break
			}
		}
		if include {
			result = append(result, v)
		}
	}
	return result
}
// --- Fallback helpers ---

// Rotation/date patterns used by the fallback grouping logic. Compiled once at
// package scope instead of on every call (regexp compilation is expensive and
// getBaseLogNameBasic runs once per cached file on every grouped listing).
var (
	dottedDateSuffixRe = regexp.MustCompile(`^\d{4}\.\d{2}\.\d{2}$`) // YYYY.MM.DD
	trailingNumberRe   = regexp.MustCompile(`^(.+)\.(\d{1,3})$`)     // access.log.1
	middleNumberRe     = regexp.MustCompile(`^(.+)\.(\d{1,3})\.log$`) // access.1.log
	fullDatePatterns   = []*regexp.Regexp{
		regexp.MustCompile(`^\d{8}$`),             // YYYYMMDD
		regexp.MustCompile(`^\d{4}-\d{2}-\d{2}$`), // YYYY-MM-DD
		regexp.MustCompile(`^\d{6}$`),             // YYMMDD
	}
)

// getBaseLogNameBasic attempts to derive the base log file for a rotated file name.
// Mirrors the logic used by the indexer, simplified for basic mode.
// Recognized patterns, checked in order: a trailing dotted date
// (access.log.2024.01.15), a single date token (access.log.20240115),
// numbered rotation (access.log.1), and middle-numbered rotation
// (access.1.log). Compression extensions are stripped first. If nothing
// matches, the original path is returned unchanged.
func getBaseLogNameBasic(filePath string) string {
	dir := filepath.Dir(filePath)
	filename := filepath.Base(filePath)
	// Remove compression extensions
	for _, ext := range []string{".gz", ".bz2", ".xz", ".lz4"} {
		filename = strings.TrimSuffix(filename, ext)
	}
	// Check YYYY.MM.DD at end
	parts := strings.Split(filename, ".")
	if len(parts) >= 4 {
		lastThree := strings.Join(parts[len(parts)-3:], ".")
		if dottedDateSuffixRe.MatchString(lastThree) {
			base := strings.Join(parts[:len(parts)-3], ".")
			return filepath.Join(dir, base)
		}
	}
	// Single-part date suffix (YYYYMMDD / YYYY-MM-DD / YYMMDD)
	if len(parts) >= 2 {
		last := parts[len(parts)-1]
		if isFullDatePatternBasic(last) {
			base := strings.Join(parts[:len(parts)-1], ".")
			return filepath.Join(dir, base)
		}
	}
	// Numbered rotation: access.log.1
	if m := trailingNumberRe.FindStringSubmatch(filename); len(m) > 1 {
		return filepath.Join(dir, m[1])
	}
	// Middle-numbered rotation: access.1.log
	if m := middleNumberRe.FindStringSubmatch(filename); len(m) > 1 {
		return filepath.Join(dir, m[1]+".log")
	}
	// Fallback: return original path
	return filePath
}

// isFullDatePatternBasic reports whether s is entirely a date token in one of
// the supported formats (YYYYMMDD, YYYY-MM-DD, or YYMMDD).
func isFullDatePatternBasic(s string) bool {
	for _, re := range fullDatePatterns {
		if re.MatchString(s) {
			return true
		}
	}
	return false
}
// SetIndexingStatus sets the indexing status for a specific file path.
// No-op when the LogFileManager is unavailable (services not initialized).
func SetIndexingStatus(path string, isIndexing bool) {
	manager := GetLogFileManager()
	if manager == nil {
		return
	}
	manager.SetIndexingStatus(path, isIndexing)
}
// GetIndexingFiles returns a list of files currently being indexed.
// Returns an empty (non-nil) slice when services are not initialized.
func GetIndexingFiles() []string {
	manager := GetLogFileManager()
	if manager == nil {
		return []string{}
	}
	return manager.GetIndexingFiles()
}
// UpdateSearcherShards fetches all shards from the indexer and performs
// zero-downtime shard updates via Bleve's IndexAlias.Swap(). The work runs in
// a background goroutine so callers (including indexing paths) are never
// blocked; it is safe for concurrent use.
func UpdateSearcherShards() {
	logger.Debugf("UpdateSearcherShards: Scheduling async shard update")
	go updateSearcherShardsAsync()
}
// updateSearcherShardsAsync performs the actual shard update asynchronously:
// it waits briefly for in-flight indexing to settle, then runs the swap under
// the services write lock.
func updateSearcherShardsAsync() {
	// Small delay to let indexing operations complete
	time.Sleep(500 * time.Millisecond)

	logger.Debugf("updateSearcherShardsAsync: Attempting to acquire write lock...")
	servicesMutex.Lock()
	// Deferred unlock so the lock is released even if the swap panics.
	defer func() {
		logger.Debugf("updateSearcherShardsAsync: Releasing write lock...")
		servicesMutex.Unlock()
	}()
	logger.Debugf("updateSearcherShardsAsync: Write lock acquired")

	updateSearcherShardsLocked()
}
// updateSearcherShardsLocked performs the actual update logic; the caller must
// hold servicesMutex for writing.
//
// Uses Bleve IndexAlias.Swap() for zero-downtime shard updates following
// official best practices. On first call (no searcher yet) it creates the
// searcher and analytics service; afterwards it atomically swaps the shard set
// behind the existing alias.
//
// Fix: the original re-checked `globalSearcher != nil` right after the
// `globalSearcher == nil` branch had already returned, leaving an unreachable
// else branch with a misleading "is not a Searcher" message; the dead code is
// removed.
func updateSearcherShardsLocked() {
	if !servicesInitialized || globalIndexer == nil {
		logger.Warn("Cannot update searcher shards, services not fully initialized.")
		return
	}
	// Check if indexer is healthy before getting shards
	if !globalIndexer.IsHealthy() {
		logger.Warn("Cannot update searcher shards, indexer is not healthy")
		return
	}

	newShards := globalIndexer.GetAllShards()
	logger.Infof("Retrieved %d new shards from indexer for hot-swap update", len(newShards))

	// If no searcher exists yet, create the initial one (first time setup)
	if globalSearcher == nil {
		logger.Info("Creating initial searcher with IndexAlias")
		searcherConfig := searcher.DefaultSearcherConfig()
		globalSearcher = searcher.NewSearcher(searcherConfig, newShards)
		if globalSearcher == nil {
			logger.Error("Failed to create initial searcher instance")
			return
		}
		// Create analytics service with the initial searcher
		globalAnalytics = analytics.NewService(globalSearcher)
		isHealthy := globalSearcher.IsHealthy()
		isRunning := globalSearcher.IsRunning()
		logger.Infof("Initial searcher created successfully, isHealthy: %v, isRunning: %v", isHealthy, isRunning)
		return
	}

	// For subsequent updates, use hot-swap through IndexAlias.
	// globalSearcher is guaranteed non-nil here (the nil case returned above).
	oldShards := globalSearcher.GetShards()
	logger.Debugf("updateSearcherShardsLocked: About to call SwapShards...")
	// Perform atomic shard swap using IndexAlias
	if err := globalSearcher.SwapShards(newShards); err != nil {
		logger.Errorf("Failed to swap shards atomically: %v", err)
		return
	}
	logger.Debugf("updateSearcherShardsLocked: SwapShards completed successfully")
	logger.Infof("Successfully swapped %d old shards with %d new shards using IndexAlias",
		len(oldShards), len(newShards))
	// Verify searcher health after swap
	isHealthy := globalSearcher.IsHealthy()
	isRunning := globalSearcher.IsRunning()
	logger.Infof("Post-swap searcher status: isHealthy: %v, isRunning: %v", isHealthy, isRunning)
	// Note: We do NOT recreate the analytics service here since the searcher interface remains the same
	// The Counter will automatically use the new shards through the same IndexAlias
}
// StopServices stops all running modern services.
//
// Idempotent: returns immediately when services were never initialized or a
// shutdown is already in progress. Cancels the service context, waits briefly
// for graceful teardown, stops indexer/analytics/searcher in that order, and
// resets all global state so InitializeServices can run again.
//
// NOTE(review): the whole teardown — including the 500ms sleep — runs while
// holding servicesMutex for writing, so all Get* accessors block for that
// duration; confirm this is acceptable for callers on hot paths.
func StopServices() {
servicesMutex.Lock()
defer servicesMutex.Unlock()
if !servicesInitialized {
logger.Debug("Modern nginx log services not initialized, nothing to stop")
return
}
if isShuttingDown {
logger.Debug("Modern nginx log services already shutting down")
return
}
logger.Debug("Stopping modern nginx log services...")
isShuttingDown = true
// Cancel the service context to trigger graceful shutdown
if shutdownCancel != nil {
shutdownCancel()
// Wait a bit for graceful shutdown
time.Sleep(500 * time.Millisecond)
}
// Stop all services (indexer first so no new shards appear mid-teardown,
// then analytics, then the searcher they depend on).
if globalIndexer != nil {
if err := globalIndexer.Stop(); err != nil {
logger.Errorf("Failed to stop indexer: %v", err)
}
globalIndexer = nil
}
if globalAnalytics != nil {
if err := globalAnalytics.Stop(); err != nil {
logger.Errorf("Failed to stop analytics service: %v", err)
}
globalAnalytics = nil
}
if globalSearcher != nil {
if err := globalSearcher.Stop(); err != nil {
logger.Errorf("Failed to stop searcher: %v", err)
}
globalSearcher = nil
}
// Reset state so a future InitializeServices starts from scratch.
globalLogFileManager = nil
servicesInitialized = false
shutdownCancel = nil
isShuttingDown = false
logger.Debug("Modern nginx log services stopped")
}
// DestroyAllIndexes completely removes all indexed data from disk.
// Returns an error when the services (and therefore the indexer) are not
// initialized.
func DestroyAllIndexes(ctx context.Context) error {
	servicesMutex.RLock()
	defer servicesMutex.RUnlock()

	if servicesInitialized && globalIndexer != nil {
		return globalIndexer.DestroyAllIndexes(ctx)
	}
	logger.Debug("Cannot destroy indexes, services not initialized.")
	return fmt.Errorf("services not initialized")
}
// MigrateFallbackCache migrates all entries from fallback cache to LogFileManager
// This is used when enabling advanced indexing after the application has started
//
// NOTE(review): the snapshot is taken under RLock and the cache is cleared
// under a separate Lock at the end; an entry added to the fallback cache in
// between would be dropped by the final clear. In practice AddLogPath prefers
// the manager once it exists, which narrows this window — confirm that is
// sufficient.
func MigrateFallbackCache() {
// Snapshot the cache under a read lock so migration itself runs unlocked.
fallbackCacheMutex.RLock()
entries := make([]*NginxLogCache, 0, len(fallbackCache))
for _, entry := range fallbackCache {
// Create a copy to avoid race conditions
e := *entry
entries = append(entries, &e)
}
fallbackCacheMutex.RUnlock()
if len(entries) == 0 {
logger.Debug("No fallback cache entries to migrate")
return
}
manager := GetLogFileManager()
if manager == nil {
logger.Warn("Cannot migrate fallback cache: LogFileManager not initialized")
return
}
logger.Infof("Migrating %d log paths from fallback cache to LogFileManager", len(entries))
for _, entry := range entries {
manager.AddLogPath(entry.Path, entry.Type, entry.Name, entry.ConfigFile)
}
// Clear fallback cache after successful migration
fallbackCacheMutex.Lock()
fallbackCache = make(map[string]*NginxLogCache)
fallbackCacheMutex.Unlock()
logger.Info("Fallback cache migration completed successfully")
}