refactor: cache index

This commit is contained in:
Jacky 2025-04-04 02:00:18 +00:00
parent 5d8d96fd4f
commit 269397e114
No known key found for this signature in database
GPG key ID: 215C21B10DF38B4D
20 changed files with 532 additions and 364 deletions

View file

@ -21,8 +21,8 @@ func Init() {
logger.Fatal("initializing local cache err", err)
}
// Initialize the nginx log scanner
InitNginxLogScanner()
// Initialize the config scanner
InitScanner()
}
func Set(key string, value interface{}, ttl time.Duration) {

View file

@ -8,76 +8,82 @@ import (
"sync"
"time"
"github.com/0xJacky/Nginx-UI/internal/helper"
"github.com/0xJacky/Nginx-UI/internal/nginx"
"github.com/0xJacky/Nginx-UI/settings"
"github.com/fsnotify/fsnotify"
"github.com/uozi-tech/cosy/logger"
)
// NginxLogCache represents a cached log entry from nginx configuration
type NginxLogCache struct {
Path string `json:"path"` // Path to the log file
Type string `json:"type"` // Type of log: "access" or "error"
Name string `json:"name"` // Name of the log file
// ScanCallback is a function that gets called during config scanning
// It receives the config file path and contents
type ScanCallback func(configPath string, content []byte) error
// Scanner is responsible for scanning and watching nginx config files
type Scanner struct {
watcher *fsnotify.Watcher // File system watcher
scanTicker *time.Ticker // Ticker for periodic scanning
initialized bool // Whether the scanner has been initialized
scanning bool // Whether a scan is currently in progress
scanMutex sync.RWMutex // Mutex for protecting the scanning state
statusChan chan bool // Channel to broadcast scanning status changes
subscribers map[chan bool]struct{} // Set of subscribers
subscriberMux sync.RWMutex // Mutex for protecting the subscribers map
}
// NginxLogScanner is responsible for scanning and watching nginx config files for log directives
type NginxLogScanner struct {
logCache map[string]*NginxLogCache // Map of log path to cache entry
cacheMutex sync.RWMutex // Mutex for protecting the cache
watcher *fsnotify.Watcher // File system watcher
scanTicker *time.Ticker // Ticker for periodic scanning
initialized bool // Whether the scanner has been initialized
scanning bool // Whether a scan is currently in progress
scanMutex sync.RWMutex // Mutex for protecting the scanning state
statusChan chan bool // Channel to broadcast scanning status changes
subscribers map[chan bool]struct{} // Set of subscribers
subscriberMux sync.RWMutex // Mutex for protecting the subscribers map
// Global variables
var (
// scanner is the singleton instance of Scanner
scanner *Scanner
configScannerInitMux sync.Mutex
// This regex matches: include directives in nginx config files
includeRegex = regexp.MustCompile(`include\s+([^;]+);`)
// Global callbacks that will be executed during config file scanning
scanCallbacks []ScanCallback
scanCallbacksMutex sync.RWMutex
)
func init() {
// Initialize the callbacks slice
scanCallbacks = make([]ScanCallback, 0)
}
// Add regex constants at package level
var (
// logScanner is the singleton instance of NginxLogScanner
logScanner *NginxLogScanner
scannerInitMux sync.Mutex
)
// Compile the regular expressions for matching log directives
var (
// This regex matches: access_log or error_log, followed by a path, and optional parameters ending with semicolon
logDirectiveRegex = regexp.MustCompile(`(?m)(access_log|error_log)\s+([^\s;]+)(?:\s+[^;]+)?;`)
)
// InitNginxLogScanner initializes the nginx log scanner
func InitNginxLogScanner() {
scanner := GetNginxLogScanner()
err := scanner.Initialize()
// InitScanner initializes the config scanner
func InitScanner() {
s := GetScanner()
err := s.Initialize()
if err != nil {
logger.Error("Failed to initialize nginx log scanner:", err)
logger.Error("Failed to initialize config scanner:", err)
}
}
// GetNginxLogScanner returns the singleton instance of NginxLogScanner
func GetNginxLogScanner() *NginxLogScanner {
scannerInitMux.Lock()
defer scannerInitMux.Unlock()
// GetScanner returns the singleton instance of Scanner
func GetScanner() *Scanner {
configScannerInitMux.Lock()
defer configScannerInitMux.Unlock()
if logScanner == nil {
logScanner = &NginxLogScanner{
logCache: make(map[string]*NginxLogCache),
if scanner == nil {
scanner = &Scanner{
statusChan: make(chan bool, 10), // Buffer to prevent blocking
subscribers: make(map[chan bool]struct{}),
}
// Start broadcaster goroutine
go logScanner.broadcastStatus()
go scanner.broadcastStatus()
}
return logScanner
return scanner
}
// RegisterCallback adds a callback function to be executed during scans
// This function can be called before Scanner is initialized
func RegisterCallback(callback ScanCallback) {
scanCallbacksMutex.Lock()
defer scanCallbacksMutex.Unlock()
scanCallbacks = append(scanCallbacks, callback)
}
// broadcastStatus listens for status changes and broadcasts to all subscribers
func (s *NginxLogScanner) broadcastStatus() {
func (s *Scanner) broadcastStatus() {
for status := range s.statusChan {
s.subscriberMux.RLock()
for ch := range s.subscribers {
@ -92,9 +98,9 @@ func (s *NginxLogScanner) broadcastStatus() {
}
}
// SubscribeStatusChanges allows a client to subscribe to scanning status changes
func SubscribeStatusChanges() chan bool {
s := GetNginxLogScanner()
// SubscribeScanningStatus allows a client to subscribe to scanning status changes
func SubscribeScanningStatus() chan bool {
s := GetScanner()
ch := make(chan bool, 5) // Buffer to prevent blocking
// Add to subscribers
@ -116,9 +122,9 @@ func SubscribeStatusChanges() chan bool {
return ch
}
// UnsubscribeStatusChanges removes a subscriber from receiving status updates
func UnsubscribeStatusChanges(ch chan bool) {
s := GetNginxLogScanner()
// UnsubscribeScanningStatus removes a subscriber from receiving status updates
func UnsubscribeScanningStatus(ch chan bool) {
s := GetScanner()
s.subscriberMux.Lock()
delete(s.subscribers, ch)
@ -128,8 +134,8 @@ func UnsubscribeStatusChanges(ch chan bool) {
close(ch)
}
// Initialize sets up the log scanner and starts watching for file changes
func (s *NginxLogScanner) Initialize() error {
// Initialize sets up the scanner and starts watching for file changes
func (s *Scanner) Initialize() error {
if s.initialized {
return nil
}
@ -209,7 +215,7 @@ func (s *NginxLogScanner) Initialize() error {
}
// watchForChanges handles the fsnotify events and triggers rescans when necessary
func (s *NginxLogScanner) watchForChanges() {
func (s *Scanner) watchForChanges() {
for {
select {
case event, ok := <-s.watcher.Events:
@ -228,7 +234,7 @@ func (s *NginxLogScanner) watchForChanges() {
}
}
// Process file changes - no .conf restriction anymore
// Process file changes
if !event.Has(fsnotify.Remove) {
logger.Debug("Config file changed:", event.Name)
// Give the system a moment to finish writing the file
@ -239,9 +245,7 @@ func (s *NginxLogScanner) watchForChanges() {
logger.Error("Failed to scan changed file:", err)
}
} else {
// For removed files, we need to clean up any log entries that came from this file
// This would require tracking which logs came from which config files
// For now, we'll do a full rescan which is simpler but less efficient
// For removed files, we need a full rescan
err := s.ScanAllConfigs()
if err != nil {
logger.Error("Failed to rescan configs after file removal:", err)
@ -257,8 +261,8 @@ func (s *NginxLogScanner) watchForChanges() {
}
}
// scanSingleFile scans a single file and updates the log cache accordingly
func (s *NginxLogScanner) scanSingleFile(filePath string) error {
// scanSingleFile scans a single file and executes all registered callbacks
func (s *Scanner) scanSingleFile(filePath string) error {
// Set scanning state to true
s.scanMutex.Lock()
wasScanning := s.scanning
@ -278,134 +282,30 @@ func (s *NginxLogScanner) scanSingleFile(filePath string) error {
s.scanMutex.Unlock()
}()
// Create a temporary cache for new entries from this file
newEntries := make(map[string]*NginxLogCache)
// Scan the file
err := s.scanConfigFile(filePath, newEntries)
if err != nil {
return err
}
// Update the main cache with new entries
s.cacheMutex.Lock()
for path, entry := range newEntries {
s.logCache[path] = entry
}
s.cacheMutex.Unlock()
return nil
}
// ScanAllConfigs scans all nginx config files for log directives
func (s *NginxLogScanner) ScanAllConfigs() error {
// Set scanning state to true
s.scanMutex.Lock()
wasScanning := s.scanning
s.scanning = true
if !wasScanning {
// Only broadcast if status changed from not scanning to scanning
s.statusChan <- true
}
s.scanMutex.Unlock()
// Ensure we reset scanning state when done
defer func() {
s.scanMutex.Lock()
s.scanning = false
// Broadcast the completion
s.statusChan <- false
s.scanMutex.Unlock()
}()
// Initialize a new cache to replace the old one
newCache := make(map[string]*NginxLogCache)
// Get the main config file
mainConfigPath := nginx.GetConfPath("", "nginx.conf")
err := s.scanConfigFile(mainConfigPath, newCache)
if err != nil {
logger.Error("Failed to scan main config:", err)
}
// Scan sites-available directory - no .conf restriction anymore
sitesAvailablePath := nginx.GetConfPath("sites-available", "")
sitesAvailableFiles, err := os.ReadDir(sitesAvailablePath)
if err == nil {
for _, file := range sitesAvailableFiles {
if !file.IsDir() {
configPath := filepath.Join(sitesAvailablePath, file.Name())
err := s.scanConfigFile(configPath, newCache)
if err != nil {
logger.Error("Failed to scan config:", configPath, err)
}
}
}
}
// Scan stream-available directory if it exists
streamAvailablePath := nginx.GetConfPath("stream-available", "")
streamAvailableFiles, err := os.ReadDir(streamAvailablePath)
if err == nil {
for _, file := range streamAvailableFiles {
if !file.IsDir() {
configPath := filepath.Join(streamAvailablePath, file.Name())
err := s.scanConfigFile(configPath, newCache)
if err != nil {
logger.Error("Failed to scan stream config:", configPath, err)
}
}
}
}
// Replace the old cache with the new one
s.cacheMutex.Lock()
s.logCache = newCache
s.cacheMutex.Unlock()
return nil
}
// scanConfigFile scans a single config file for log directives using regex
func (s *NginxLogScanner) scanConfigFile(configPath string, cache map[string]*NginxLogCache) error {
// Open the file
file, err := os.Open(configPath)
file, err := os.Open(filePath)
if err != nil {
return err
}
defer file.Close()
// Read the entire file content
content, err := os.ReadFile(configPath)
content, err := os.ReadFile(filePath)
if err != nil {
return err
}
// Find all matches of log directives
matches := logDirectiveRegex.FindAllSubmatch(content, -1)
for _, match := range matches {
if len(match) >= 3 {
directiveType := string(match[1]) // "access_log" or "error_log"
logPath := string(match[2]) // The log file path
// Validate the log path
if isValidLogPath(logPath) {
logType := "access"
if directiveType == "error_log" {
logType = "error"
}
cache[logPath] = &NginxLogCache{
Path: logPath,
Type: logType,
Name: filepath.Base(logPath),
}
}
// Execute all registered callbacks
scanCallbacksMutex.RLock()
for _, callback := range scanCallbacks {
err := callback(filePath, content)
if err != nil {
logger.Error("Callback error for file", filePath, ":", err)
}
}
scanCallbacksMutex.RUnlock()
// Look for include directives to process included files
includeRegex := regexp.MustCompile(`include\s+([^;]+);`)
includeMatches := includeRegex.FindAllSubmatch(content, -1)
for _, match := range includeMatches {
@ -430,7 +330,7 @@ func (s *NginxLogScanner) scanConfigFile(configPath string, cache map[string]*Ng
for _, matchedFile := range matchedFiles {
fileInfo, err := os.Stat(matchedFile)
if err == nil && !fileInfo.IsDir() {
err = s.scanConfigFile(matchedFile, cache)
err = s.scanSingleFile(matchedFile)
if err != nil {
logger.Error("Failed to scan included file:", matchedFile, err)
}
@ -446,7 +346,7 @@ func (s *NginxLogScanner) scanConfigFile(configPath string, cache map[string]*Ng
fileInfo, err := os.Stat(includePath)
if err == nil && !fileInfo.IsDir() {
err = s.scanConfigFile(includePath, cache)
err = s.scanSingleFile(includePath)
if err != nil {
logger.Error("Failed to scan included file:", includePath, err)
}
@ -458,77 +358,69 @@ func (s *NginxLogScanner) scanConfigFile(configPath string, cache map[string]*Ng
return nil
}
// isLogPathUnderWhiteList checks if the log path is under one of the paths in LogDirWhiteList
// This is a duplicate of the function in nginx_log package to avoid import cycle
func isLogPathUnderWhiteList(path string) bool {
// deep copy
logDirWhiteList := append([]string{}, settings.NginxSettings.LogDirWhiteList...)
accessLogPath := nginx.GetAccessLogPath()
errorLogPath := nginx.GetErrorLogPath()
if accessLogPath != "" {
logDirWhiteList = append(logDirWhiteList, filepath.Dir(accessLogPath))
}
if errorLogPath != "" {
logDirWhiteList = append(logDirWhiteList, filepath.Dir(errorLogPath))
// ScanAllConfigs scans all nginx config files and executes all registered callbacks
func (s *Scanner) ScanAllConfigs() error {
// Set scanning state to true
s.scanMutex.Lock()
wasScanning := s.scanning
s.scanning = true
if !wasScanning {
// Only broadcast if status changed from not scanning to scanning
s.statusChan <- true
}
s.scanMutex.Unlock()
for _, whitePath := range logDirWhiteList {
if helper.IsUnderDirectory(path, whitePath) {
return true
}
}
return false
}
// Ensure we reset scanning state when done
defer func() {
s.scanMutex.Lock()
s.scanning = false
// Broadcast the completion
s.statusChan <- false
s.scanMutex.Unlock()
}()
// isValidLogPath checks if a log path is valid:
// 1. It must be a regular file or a symlink to a regular file
// 2. It must not point to a console or special device
// 3. It must be under the whitelist directories
func isValidLogPath(logPath string) bool {
// First check if the path is under the whitelist
if !isLogPathUnderWhiteList(logPath) {
logger.Warn("Log path is not under whitelist:", logPath)
return false
}
// Check if the path exists
fileInfo, err := os.Lstat(logPath)
// Get the main config file
mainConfigPath := nginx.GetConfPath("", "nginx.conf")
err := s.scanSingleFile(mainConfigPath)
if err != nil {
// If file doesn't exist, it might be created later
// We'll assume it's valid for now
return true
logger.Error("Failed to scan main config:", err)
}
// If it's a symlink, follow it
if fileInfo.Mode()&os.ModeSymlink != 0 {
linkTarget, err := os.Readlink(logPath)
if err != nil {
return false
// Scan sites-available directory
sitesAvailablePath := nginx.GetConfPath("sites-available", "")
sitesAvailableFiles, err := os.ReadDir(sitesAvailablePath)
if err == nil {
for _, file := range sitesAvailableFiles {
if !file.IsDir() {
configPath := filepath.Join(sitesAvailablePath, file.Name())
err := s.scanSingleFile(configPath)
if err != nil {
logger.Error("Failed to scan config:", configPath, err)
}
}
}
// Make absolute path if the link target is relative
if !filepath.IsAbs(linkTarget) {
linkTarget = filepath.Join(filepath.Dir(logPath), linkTarget)
}
// Check the target file
targetInfo, err := os.Stat(linkTarget)
if err != nil {
return false
}
// Only accept regular files as targets
return targetInfo.Mode().IsRegular()
}
// For non-symlinks, just check if it's a regular file
return fileInfo.Mode().IsRegular()
// Scan stream-available directory if it exists
streamAvailablePath := nginx.GetConfPath("stream-available", "")
streamAvailableFiles, err := os.ReadDir(streamAvailablePath)
if err == nil {
for _, file := range streamAvailableFiles {
if !file.IsDir() {
configPath := filepath.Join(streamAvailablePath, file.Name())
err := s.scanSingleFile(configPath)
if err != nil {
logger.Error("Failed to scan stream config:", configPath, err)
}
}
}
}
return nil
}
// Shutdown cleans up resources used by the scanner
func (s *NginxLogScanner) Shutdown() {
func (s *Scanner) Shutdown() {
if s.watcher != nil {
s.watcher.Close()
}
@ -551,34 +443,9 @@ func (s *NginxLogScanner) Shutdown() {
close(s.statusChan)
}
// GetAllLogPaths returns all cached log paths
func GetAllLogPaths(filters ...func(*NginxLogCache) bool) []*NginxLogCache {
s := GetNginxLogScanner()
s.cacheMutex.RLock()
defer s.cacheMutex.RUnlock()
result := make([]*NginxLogCache, 0, len(s.logCache))
for _, cache := range s.logCache {
flag := true
if len(filters) > 0 {
for _, filter := range filters {
if !filter(cache) {
flag = false
break
}
}
}
if flag {
result = append(result, cache)
}
}
return result
}
// IsScanning returns whether a scan is currently in progress
func IsScanning() bool {
s := GetNginxLogScanner()
// IsScanningInProgress returns whether a scan is currently in progress
func IsScanningInProgress() bool {
s := GetScanner()
s.scanMutex.RLock()
defer s.scanMutex.RUnlock()
return s.scanning