Fix monitorNewFiles for NFS + Remove dead tails from tail map (#3508)

* xx

* xx

* Tests

* fix tests

* XX

* Fix race condition in TestLiveAcquisition implementation

* Better comments for IsTailing and RemoveTail

* lint

* linter

* unmarshal DiscoveryPollInterval to time.Duration

* []byte -> string

* prefer void assignment to nolint

* extract method, add test

* excludedByRE() -> isExcluded()

* fix windows test

* fix regression - tail files from the end if they are detected when the application starts

---------

Co-authored-by: marco <marco@crowdsec.net>
David 2025-04-30 15:05:17 +02:00 committed by GitHub
parent 582a192c1e
commit 8949309223
2 changed files with 400 additions and 230 deletions
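For context, the headline change is the new file-discovery polling, configured through the two new FileConfiguration fields in the diff below. A minimal sketch of driving it from YAML, mirroring the tests added by this commit (the glob path is illustrative and the import paths are assumed from the package names used in the diff):

package main

import (
	"fmt"

	log "github.com/sirupsen/logrus"

	"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"                // assumed import path
	fileacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/file" // assumed import path
)

func main() {
	// Enable periodic glob polling so files that appear after startup are
	// still discovered even when inotify events are missed (e.g. on NFS).
	cfg := []byte(`
filenames:
 - '/var/log/app/*.log'
discovery_poll_enable: true
discovery_poll_interval: "30s"
mode: tail
`)

	f := &fileacquisition.FileSource{}
	if err := f.Configure(cfg, log.NewEntry(log.New()), configuration.METRICS_NONE); err != nil {
		fmt.Println("configure error:", err)
	}
}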


@@ -2,6 +2,7 @@ package fileacquisition
import (
"bufio"
"cmp"
"compress/gzip"
"context"
"errors"
@@ -29,6 +30,8 @@ import (
"github.com/crowdsecurity/crowdsec/pkg/types"
)
const defaultPollInterval = 30 * time.Second
var linesRead = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "cs_filesource_hits_total",
@@ -38,12 +41,14 @@ var linesRead = prometheus.NewCounterVec(
type FileConfiguration struct {
Filenames []string
ExcludeRegexps []string `yaml:"exclude_regexps"`
Filename string
ForceInotify bool `yaml:"force_inotify"`
MaxBufferSize int `yaml:"max_buffer_size"`
PollWithoutInotify *bool `yaml:"poll_without_inotify"`
DiscoveryPollEnable bool `yaml:"discovery_poll_enable"`
DiscoveryPollInterval time.Duration `yaml:"discovery_poll_interval"`
configuration.DataSourceCommonCfg `yaml:",inline"`
}
type FileSource struct {
@@ -149,20 +154,7 @@ func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLeve
}
for _, file := range files {
// check if file is excluded
excluded := false
for _, pattern := range f.exclude_regexps {
if pattern.MatchString(file) {
excluded = true
f.logger.Infof("Skipping file %s as it matches exclude pattern %s", file, pattern)
break
}
}
if excluded {
if f.isExcluded(file) {
continue
}
@@ -328,239 +320,217 @@ func (f *FileSource) StreamingAcquisition(ctx context.Context, out chan types.Ev
})
for _, file := range f.files {
// before opening the file, check if we need to specifically avoid it. (XXX)
skip := false
for _, pattern := range f.exclude_regexps {
if pattern.MatchString(file) {
f.logger.Infof("file %s matches exclusion pattern %s, skipping", file, pattern.String())
skip = true
break
}
if err := f.setupTailForFile(file, out, true, t); err != nil {
f.logger.Errorf("Error setting up tail for %s: %s", file, err)
}
if skip {
continue
}
// cf. https://github.com/crowdsecurity/crowdsec/issues/1168
// do not rely on stat, reclose file immediately as it's opened by Tail
fd, err := os.Open(file)
if err != nil {
f.logger.Errorf("unable to read %s : %s", file, err)
continue
}
if err = fd.Close(); err != nil {
f.logger.Errorf("unable to close %s : %s", file, err)
continue
}
fi, err := os.Stat(file)
if err != nil {
return fmt.Errorf("could not stat file %s : %w", file, err)
}
if fi.IsDir() {
f.logger.Warnf("%s is a directory, ignoring it.", file)
continue
}
pollFile := false
if f.config.PollWithoutInotify != nil {
pollFile = *f.config.PollWithoutInotify
} else {
networkFS, fsType, err := types.IsNetworkFS(file)
if err != nil {
f.logger.Warningf("Could not get fs type for %s : %s", file, err)
}
f.logger.Debugf("fs for %s is network: %t (%s)", file, networkFS, fsType)
if networkFS {
f.logger.Warnf("Disabling inotify polling on %s as it is on a network share. You can manually set poll_without_inotify to true to make this message disappear, or to false to enforce inotify poll", file)
pollFile = true
}
}
filink, err := os.Lstat(file)
if err != nil {
f.logger.Errorf("Could not lstat() new file %s, ignoring it : %s", file, err)
continue
}
if filink.Mode()&os.ModeSymlink == os.ModeSymlink && !pollFile {
f.logger.Warnf("File %s is a symlink, but inotify polling is enabled. Crowdsec will not be able to detect rotation. Consider setting poll_without_inotify to true in your configuration", file)
}
tail, err := tail.TailFile(file, tail.Config{ReOpen: true, Follow: true, Poll: pollFile, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd}, Logger: log.NewEntry(log.StandardLogger())})
if err != nil {
f.logger.Errorf("Could not start tailing file %s : %s", file, err)
continue
}
f.tailMapMutex.Lock()
f.tails[file] = true
f.tailMapMutex.Unlock()
t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/file/live/fsnotify")
return f.tailFile(out, t, tail)
})
}
return nil
}
func (f *FileSource) Dump() interface{} {
func (f *FileSource) Dump() any {
return f
}
// checkAndTailFile validates and sets up tailing for a given file. It performs the following checks:
// 1. Verifies if the file exists and is not a directory
// 2. Checks if the filename matches any of the configured patterns
// 3. Sets up file tailing if the file is valid and matches patterns
//
// Parameters:
// - filename: The path to the file to check and potentially tail
// - logger: A log.Entry for contextual logging
// - out: Channel to send file events to
// - t: A tomb.Tomb for graceful shutdown handling
//
// Returns an error if any validation fails or if tailing setup fails
func (f *FileSource) checkAndTailFile(filename string, logger *log.Entry, out chan types.Event, t *tomb.Tomb) error {
// Check if it's a directory
fi, err := os.Stat(filename)
if err != nil {
logger.Errorf("Could not stat() file %s, ignoring it : %s", filename, err)
return err
}
if fi.IsDir() {
return nil
}
logger.Debugf("Processing file %s", filename)
// Check if file matches any of our patterns
matched := false
for _, pattern := range f.config.Filenames {
logger.Debugf("Matching %s with %s", pattern, filename)
matched, err = filepath.Match(pattern, filename)
if err != nil {
logger.Errorf("Could not match pattern : %s", err)
continue
}
if matched {
logger.Debugf("Matched %s with %s", pattern, filename)
break
}
}
if !matched {
return nil
}
// Setup the tail if needed
if err := f.setupTailForFile(filename, out, false, t); err != nil {
logger.Errorf("Error setting up tail for file %s: %s", filename, err)
return err
}
return nil
}
func (f *FileSource) monitorNewFiles(out chan types.Event, t *tomb.Tomb) error {
logger := f.logger.WithField("goroutine", "inotify")
// Setup polling if enabled
var tickerChan <-chan time.Time
var ticker *time.Ticker
if f.config.DiscoveryPollEnable {
interval := cmp.Or(f.config.DiscoveryPollInterval, defaultPollInterval)
ticker = time.NewTicker(interval)
tickerChan = ticker.C
defer ticker.Stop()
}
for {
select {
case event, ok := <-f.watcher.Events:
if !ok {
return nil
}
if event.Op&fsnotify.Create != fsnotify.Create {
continue
}
_ = f.checkAndTailFile(event.Name, logger, out, t)
fi, err := os.Stat(event.Name)
if err != nil {
logger.Errorf("Could not stat() new file %s, ignoring it : %s", event.Name, err)
continue
}
if fi.IsDir() {
continue
}
logger.Debugf("Detected new file %s", event.Name)
matched := false
case <-tickerChan: // Will never trigger if tickerChan is nil
// Poll for all configured patterns
for _, pattern := range f.config.Filenames {
logger.Debugf("Matching %s with %s", pattern, event.Name)
matched, err = filepath.Match(pattern, event.Name)
files, err := filepath.Glob(pattern)
if err != nil {
logger.Errorf("Could not match pattern : %s", err)
logger.Errorf("Error globbing pattern %s during poll: %s", pattern, err)
continue
}
if matched {
logger.Debugf("Matched %s with %s", pattern, event.Name)
break
for _, file := range files {
_ = f.checkAndTailFile(file, logger, out, t)
}
}
if !matched {
continue
}
// before opening the file, check if we need to specifically avoid it. (XXX)
skip := false
for _, pattern := range f.exclude_regexps {
if pattern.MatchString(event.Name) {
f.logger.Infof("file %s matches exclusion pattern %s, skipping", event.Name, pattern.String())
skip = true
break
}
}
if skip {
continue
}
f.tailMapMutex.RLock()
if f.tails[event.Name] {
f.tailMapMutex.RUnlock()
// we already have a tail on it, do not start a new one
logger.Debugf("Already tailing file %s, not creating a new tail", event.Name)
break
}
f.tailMapMutex.RUnlock()
// cf. https://github.com/crowdsecurity/crowdsec/issues/1168
// do not rely on stat, reclose file immediately as it's opened by Tail
fd, err := os.Open(event.Name)
if err != nil {
f.logger.Errorf("unable to read %s : %s", event.Name, err)
continue
}
if err = fd.Close(); err != nil {
f.logger.Errorf("unable to close %s : %s", event.Name, err)
continue
}
pollFile := false
if f.config.PollWithoutInotify != nil {
pollFile = *f.config.PollWithoutInotify
} else {
networkFS, fsType, err := types.IsNetworkFS(event.Name)
if err != nil {
f.logger.Warningf("Could not get fs type for %s : %s", event.Name, err)
}
f.logger.Debugf("fs for %s is network: %t (%s)", event.Name, networkFS, fsType)
if networkFS {
pollFile = true
}
}
filink, err := os.Lstat(event.Name)
if err != nil {
logger.Errorf("Could not lstat() new file %s, ignoring it : %s", event.Name, err)
continue
}
if filink.Mode()&os.ModeSymlink == os.ModeSymlink && !pollFile {
logger.Warnf("File %s is a symlink, but inotify polling is enabled. Crowdsec will not be able to detect rotation. Consider setting poll_without_inotify to true in your configuration", event.Name)
}
// Slightly different parameters for Location, as we want to read the first lines of the newly created file
tail, err := tail.TailFile(event.Name, tail.Config{ReOpen: true, Follow: true, Poll: pollFile, Location: &tail.SeekInfo{Offset: 0, Whence: io.SeekStart}})
if err != nil {
logger.Errorf("Could not start tailing file %s : %s", event.Name, err)
break
}
f.tailMapMutex.Lock()
f.tails[event.Name] = true
f.tailMapMutex.Unlock()
t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/tailfile")
return f.tailFile(out, t, tail)
})
case err, ok := <-f.watcher.Errors:
if !ok {
return nil
}
logger.Errorf("Error while monitoring folder: %s", err)
case <-t.Dying():
err := f.watcher.Close()
if err != nil {
return fmt.Errorf("could not remove all inotify watches: %w", err)
}
return nil
}
}
}
func (f *FileSource) setupTailForFile(file string, out chan types.Event, seekEnd bool, t *tomb.Tomb) error {
logger := f.logger.WithField("file", file)
if f.isExcluded(file) {
return nil
}
// Check if we're already tailing
f.tailMapMutex.RLock()
if f.tails[file] {
f.tailMapMutex.RUnlock()
logger.Debugf("Already tailing file %s, not creating a new tail", file)
return nil
}
f.tailMapMutex.RUnlock()
// Validate file
fd, err := os.Open(file)
if err != nil {
return fmt.Errorf("unable to read %s : %s", file, err)
}
if err = fd.Close(); err != nil {
return fmt.Errorf("unable to close %s : %s", file, err)
}
fi, err := os.Stat(file)
if err != nil {
return fmt.Errorf("could not stat file %s : %w", file, err)
}
if fi.IsDir() {
logger.Warnf("%s is a directory, ignoring it.", file)
return nil
}
// Determine polling mode
pollFile := false
if f.config.PollWithoutInotify != nil {
pollFile = *f.config.PollWithoutInotify
} else {
networkFS, fsType, err := types.IsNetworkFS(file)
if err != nil {
logger.Warningf("Could not get fs type for %s : %s", file, err)
}
logger.Debugf("fs for %s is network: %t (%s)", file, networkFS, fsType)
if networkFS {
logger.Warnf("Disabling inotify polling on %s as it is on a network share. You can manually set poll_without_inotify to true to make this message disappear, or to false to enforce inotify poll", file)
pollFile = true
}
}
// Check symlink status
filink, err := os.Lstat(file)
if err != nil {
return fmt.Errorf("could not lstat() file %s: %w", file, err)
}
if filink.Mode()&os.ModeSymlink == os.ModeSymlink && !pollFile {
logger.Warnf("File %s is a symlink, but inotify polling is enabled. Crowdsec will not be able to detect rotation. Consider setting poll_without_inotify to true in your configuration", file)
}
// Create the tailer with appropriate configuration
seekInfo := &tail.SeekInfo{Offset: 0, Whence: io.SeekEnd}
if f.config.Mode == configuration.CAT_MODE {
seekInfo.Whence = io.SeekStart
}
if seekEnd {
seekInfo.Whence = io.SeekEnd
}
tail, err := tail.TailFile(file, tail.Config{
ReOpen: true,
Follow: true,
Poll: pollFile,
Location: seekInfo,
Logger: log.NewEntry(log.StandardLogger()),
})
if err != nil {
return fmt.Errorf("could not start tailing file %s : %w", file, err)
}
f.tailMapMutex.Lock()
f.tails[file] = true
f.tailMapMutex.Unlock()
t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/tailfile")
return f.tailFile(out, t, tail)
})
return nil
}
func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tail) error {
logger := f.logger.WithField("tail", tail.Filename)
logger.Debug("-> start tailing")
@@ -586,6 +556,12 @@ func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tai
logger.Warning(errMsg)
// Just remove the dead tailer from our map and return
// monitorNewFiles will pick up the file again if it's recreated
f.tailMapMutex.Lock()
delete(f.tails, tail.Filename)
f.tailMapMutex.Unlock()
return nil
case line := <-tail.Lines:
if line == nil {
@@ -683,7 +659,7 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom
linesRead.With(prometheus.Labels{"source": filename}).Inc()
// we're reading logs at once, it must be time-machine buckets
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE, Unmarshaled: make(map[string]interface{})}
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE, Unmarshaled: make(map[string]any)}
}
}
@@ -698,3 +674,31 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom
return nil
}
// IsTailing returns whether a given file is currently being tailed. For testing purposes.
// It is case sensitive and path-delimiter sensitive: the filename must match exactly as it was recorded, using the OS-specific path separator.
func (f *FileSource) IsTailing(filename string) bool {
f.tailMapMutex.RLock()
defer f.tailMapMutex.RUnlock()
return f.tails[filename]
}
// RemoveTail removes a file from the tail map to simulate a dead tailer. For testing purposes.
// It is case sensitive and path-delimiter sensitive: the filename must match exactly as it was recorded, using the OS-specific path separator.
func (f *FileSource) RemoveTail(filename string) {
f.tailMapMutex.Lock()
defer f.tailMapMutex.Unlock()
delete(f.tails, filename)
}
// isExcluded returns true if the path matches one of the exclude regexps,
// logging the first pattern that matches.
func (f *FileSource) isExcluded(path string) bool {
for _, re := range f.exclude_regexps {
if re.MatchString(path) {
f.logger.WithField("file", path).Infof("Skipping file: matches exclude regex %q", re)
return true
}
}
return false
}
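The NFS-related part of the fix lives in setupTailForFile above: when the file sits on a network filesystem, inotify is not trusted and the tail is created with Poll: true. A small standalone sketch of that decision, using the same types.IsNetworkFS helper imported in this file (the path is illustrative; this is not the full setup logic):

package main

import (
	"fmt"

	"github.com/crowdsecurity/crowdsec/pkg/types"
)

func main() {
	file := "/mnt/nfs/logs/app.log" // illustrative path

	pollFile := false

	networkFS, fsType, err := types.IsNetworkFS(file)
	if err != nil {
		fmt.Printf("could not get fs type for %s: %s\n", file, err)
	}

	fmt.Printf("fs for %s is network: %t (%s)\n", file, networkFS, fsType)

	if networkFS {
		// inotify is unreliable on network shares, so tail with polling instead
		pollFile = true
	}

	fmt.Println("poll without inotify:", pollFile)
}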


@@ -1,8 +1,10 @@
package fileacquisition_test
import (
"context"
"fmt"
"os"
"path/filepath"
"runtime"
"testing"
"time"
@@ -394,13 +396,19 @@ force_inotify: true`, testPattern),
actualLines := 0
if tc.expectedLines != 0 {
var stopReading bool
defer func() { stopReading = true }()
go func() {
for {
select {
case <-out:
actualLines++
case <-time.After(2 * time.Second):
return
default:
if stopReading {
return
}
// Small sleep to prevent tight loop
time.Sleep(100 * time.Millisecond)
}
}
}()
@@ -410,21 +418,41 @@ force_inotify: true`, testPattern),
cstest.RequireErrorContains(t, err, tc.expectedErr)
if tc.expectedLines != 0 {
fd, err := os.Create("test_files/stream.log")
// f.IsTailing is path delimiter sensitive
streamLogFile := filepath.Join("test_files", "stream.log")
fd, err := os.Create(streamLogFile)
require.NoError(t, err, "could not create test file")
// wait for the file to be tailed
waitingForTail := true
for waitingForTail {
select {
case <-time.After(2 * time.Second):
t.Fatal("Timeout waiting for file to be tailed")
default:
if !f.IsTailing(streamLogFile) {
time.Sleep(50 * time.Millisecond)
continue
}
waitingForTail = false
}
}
for i := range 5 {
_, err = fmt.Fprintf(fd, "%d\n", i)
if err != nil {
os.Remove("test_files/stream.log")
os.Remove(streamLogFile)
t.Fatalf("could not write test file : %s", err)
}
}
fd.Close()
// we sleep to make sure we detect the new file
time.Sleep(3 * time.Second)
os.Remove("test_files/stream.log")
// sleep to ensure the tail events are processed
time.Sleep(2 * time.Second)
os.Remove(streamLogFile)
assert.Equal(t, tc.expectedLines, actualLines)
}
@@ -454,20 +482,158 @@ exclude_regexps: ["\\.gz$"]`
subLogger := logger.WithField("type", "file")
f := fileacquisition.FileSource{}
if err := f.Configure([]byte(config), subLogger, configuration.METRICS_NONE); err != nil {
subLogger.Fatalf("unexpected error: %s", err)
}
err := f.Configure([]byte(config), subLogger, configuration.METRICS_NONE)
require.NoError(t, err)
expectedLogOutput := "Skipping file test_files/test.log.gz as it matches exclude pattern"
if runtime.GOOS == "windows" {
expectedLogOutput = `Skipping file test_files\test.log.gz as it matches exclude pattern \.gz`
}
if hook.LastEntry() == nil {
t.Fatalf("expected output %s, but got nothing", expectedLogOutput)
}
assert.Contains(t, hook.LastEntry().Message, expectedLogOutput)
require.NotNil(t, hook.LastEntry())
assert.Contains(t, hook.LastEntry().Message, `Skipping file: matches exclude regex "\\.gz`)
assert.Equal(t, filepath.Join("test_files", "test.log.gz"), hook.LastEntry().Data["file"])
hook.Reset()
}
func TestDiscoveryPollConfiguration(t *testing.T) {
tests := []struct {
name string
config string
wantErr string
}{
{
name: "valid discovery poll config",
config: `
filenames:
- "tests/test.log"
discovery_poll_enable: true
discovery_poll_interval: "30s"
mode: tail
`,
wantErr: "",
},
{
name: "invalid poll interval",
config: `
filenames:
- "tests/test.log"
discovery_poll_enable: true
discovery_poll_interval: "invalid"
mode: tail
`,
wantErr: "cannot unmarshal !!str `invalid` into time.Duration",
},
{
name: "polling disabled",
config: `
filenames:
- "tests/test.log"
discovery_poll_enable: false
mode: tail
`,
wantErr: "",
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
f := &fileacquisition.FileSource{}
err := f.Configure([]byte(tc.config), log.NewEntry(log.New()), configuration.METRICS_NONE)
cstest.RequireErrorContains(t, err, tc.wantErr)
})
}
}
func TestDiscoveryPolling(t *testing.T) {
dir := t.TempDir()
pattern := filepath.Join(dir, "*.log")
yamlConfig := fmt.Sprintf(`
filenames:
- '%s'
discovery_poll_enable: true
discovery_poll_interval: "1s"
exclude_regexps: ["\\.ignore$"]
mode: tail
`, pattern)
fmt.Printf("Config: %s\n", yamlConfig)
config := []byte(yamlConfig)
f := &fileacquisition.FileSource{}
err := f.Configure(config, log.NewEntry(log.New()), configuration.METRICS_NONE)
require.NoError(t, err)
// Create channel for events
eventChan := make(chan types.Event)
tomb := tomb.Tomb{}
// Start acquisition
err = f.StreamingAcquisition(context.Background(), eventChan, &tomb)
require.NoError(t, err)
// Create a test file
testFile := filepath.Join(dir, "test.log")
err = os.WriteFile(testFile, []byte("test line\n"), 0o644)
require.NoError(t, err)
ignoredFile := filepath.Join(dir, ".ignored")
err = os.WriteFile(ignoredFile, []byte("test line\n"), 0o644)
require.NoError(t, err)
// Wait for polling to detect the file
time.Sleep(4 * time.Second)
require.True(t, f.IsTailing(testFile), "File should be tailed after polling")
require.False(t, f.IsTailing(ignoredFile), "File should be ignored after polling")
// Cleanup
tomb.Kill(nil)
tomb.Wait()
}
func TestFileResurrectionViaPolling(t *testing.T) {
dir := t.TempDir()
ctx := t.Context()
testFile := filepath.Join(dir, "test.log")
err := os.WriteFile(testFile, []byte("test line\n"), 0o644)
require.NoError(t, err)
pattern := filepath.Join(dir, "*.log")
yamlConfig := fmt.Sprintf(`
filenames:
- '%s'
discovery_poll_enable: true
discovery_poll_interval: "1s"
mode: tail
`, pattern)
fmt.Printf("Config: %s\n", yamlConfig)
config := []byte(yamlConfig)
f := &fileacquisition.FileSource{}
err = f.Configure(config, log.NewEntry(log.New()), configuration.METRICS_NONE)
require.NoError(t, err)
eventChan := make(chan types.Event)
tomb := tomb.Tomb{}
err = f.StreamingAcquisition(ctx, eventChan, &tomb)
require.NoError(t, err)
// Wait for initial tail setup
time.Sleep(100 * time.Millisecond)
// Simulate tailer death by removing it from the map
f.RemoveTail(testFile)
isTailed := f.IsTailing(testFile)
require.False(t, isTailed, "File should be removed from the map")
// Wait for polling to resurrect the file
time.Sleep(2 * time.Second)
// Verify file is being tailed again
isTailed = f.IsTailing(testFile)
require.True(t, isTailed, "File should be resurrected via polling")
// Cleanup
tomb.Kill(nil)
tomb.Wait()
}