refact pkg/acquisition: extract loop method

This commit is contained in:
marco 2025-05-07 15:41:43 +02:00
parent ce6018fbbf
commit 17234963c0
3 changed files with 113 additions and 97 deletions

View file

@ -120,7 +120,7 @@ func LoadAcquisition(cConfig *csconfig.Config) ([]acquisition.DataSource, error)
return nil, fmt.Errorf("failed to configure datasource for %s: %w", flags.OneShotDSN, err)
}
} else {
dataSources, err = acquisition.LoadAcquisitionFromFile(cConfig.Crowdsec, cConfig.Prometheus)
dataSources, err = acquisition.LoadAcquisitionFromFiles(cConfig.Crowdsec, cConfig.Prometheus)
if err != nil {
return nil, err
}

View file

@ -215,111 +215,127 @@ func GetMetricsLevelFromPromCfg(prom *csconfig.PrometheusCfg) int {
return configuration.METRICS_FULL
}
// LoadAcquisitionFromFile unmarshals the configuration item and checks its availability
func LoadAcquisitionFromFile(config *csconfig.CrowdsecServiceCfg, prom *csconfig.PrometheusCfg) ([]DataSource, error) {
// sourcesFromFile reads and parses one acquisition file into DataSources.
func sourcesFromFile(acquisFile string, metrics_level int) ([]DataSource, error) {
var sources []DataSource
log.Infof("loading acquisition file : %s", acquisFile)
yamlFile, err := os.Open(acquisFile)
if err != nil {
return nil, err
}
defer yamlFile.Close()
acquisContent, err := io.ReadAll(yamlFile)
if err != nil {
return nil, fmt.Errorf("failed to read %s: %w", acquisFile, err)
}
expandedAcquis := csstring.StrictExpand(string(acquisContent), os.LookupEnv)
dec := yaml.NewDecoder(strings.NewReader(expandedAcquis))
dec.SetStrict(true)
idx := -1
for {
var sub configuration.DataSourceCommonCfg
idx += 1
err = dec.Decode(&sub)
if err != nil {
if !errors.Is(err, io.EOF) {
return nil, fmt.Errorf("failed to parse %s: %w", acquisFile, err)
}
log.Tracef("End of yaml file")
break
}
// for backward compat ('type' was not mandatory, detect it)
if guessType := detectBackwardCompatAcquis(sub); guessType != "" {
log.Debugf("datasource type missing in %s (position %d): detected 'source=%s'", acquisFile, idx, guessType)
if sub.Source != "" && sub.Source != guessType {
log.Warnf("datasource type mismatch in %s (position %d): found '%s' but should probably be '%s'", acquisFile, idx, sub.Source, guessType)
}
sub.Source = guessType
}
// it's an empty item, skip it
if len(sub.Labels) == 0 {
if sub.Source == "" {
log.Debugf("skipping empty item in %s", acquisFile)
continue
}
if sub.Source != "docker" {
// docker is the only source that can be empty
return nil, fmt.Errorf("missing labels in %s (position %d)", acquisFile, idx)
}
}
if sub.Source == "" {
return nil, fmt.Errorf("data source type is empty ('source') in %s (position %d)", acquisFile, idx)
}
// pre-check that the source is valid
_, err := GetDataSourceIface(sub.Source)
if err != nil {
return nil, fmt.Errorf("in file %s (position %d) - %w", acquisFile, idx, err)
}
uniqueId := uuid.NewString()
sub.UniqueId = uniqueId
src, err := DataSourceConfigure(sub, metrics_level)
if err != nil {
var dserr *DataSourceUnavailableError
if errors.As(err, &dserr) {
log.Error(err)
continue
}
return nil, fmt.Errorf("while configuring datasource of type %s from %s (position %d): %w", sub.Source, acquisFile, idx, err)
}
if sub.TransformExpr != "" {
vm, err := expr.Compile(sub.TransformExpr, exprhelpers.GetExprOptions(map[string]interface{}{"evt": &types.Event{}})...)
if err != nil {
return nil, fmt.Errorf("while compiling transform expression '%s' for datasource %s in %s (position %d): %w", sub.TransformExpr, sub.Source, acquisFile, idx, err)
}
transformRuntimes[uniqueId] = vm
}
sources = append(sources, src)
}
return sources, nil
}
// LoadAcquisitionFromFiles unmarshals the configuration item and checks its availability
func LoadAcquisitionFromFiles(config *csconfig.CrowdsecServiceCfg, prom *csconfig.PrometheusCfg) ([]DataSource, error) {
var allSources []DataSource
metrics_level := GetMetricsLevelFromPromCfg(prom)
for _, acquisFile := range config.AcquisitionFiles {
log.Infof("loading acquisition file : %s", acquisFile)
yamlFile, err := os.Open(acquisFile)
sources, err := sourcesFromFile(acquisFile, metrics_level)
if err != nil {
return nil, err
}
defer yamlFile.Close()
acquisContent, err := io.ReadAll(yamlFile)
if err != nil {
return nil, fmt.Errorf("failed to read %s: %w", acquisFile, err)
}
expandedAcquis := csstring.StrictExpand(string(acquisContent), os.LookupEnv)
dec := yaml.NewDecoder(strings.NewReader(expandedAcquis))
dec.SetStrict(true)
idx := -1
for {
var sub configuration.DataSourceCommonCfg
idx += 1
err = dec.Decode(&sub)
if err != nil {
if !errors.Is(err, io.EOF) {
return nil, fmt.Errorf("failed to parse %s: %w", acquisFile, err)
}
log.Tracef("End of yaml file")
break
}
// for backward compat ('type' was not mandatory, detect it)
if guessType := detectBackwardCompatAcquis(sub); guessType != "" {
log.Debugf("datasource type missing in %s (position %d): detected 'source=%s'", acquisFile, idx, guessType)
if sub.Source != "" && sub.Source != guessType {
log.Warnf("datasource type mismatch in %s (position %d): found '%s' but should probably be '%s'", acquisFile, idx, sub.Source, guessType)
}
sub.Source = guessType
}
// it's an empty item, skip it
if len(sub.Labels) == 0 {
if sub.Source == "" {
log.Debugf("skipping empty item in %s", acquisFile)
continue
}
if sub.Source != "docker" {
// docker is the only source that can be empty
return nil, fmt.Errorf("missing labels in %s (position %d)", acquisFile, idx)
}
}
if sub.Source == "" {
return nil, fmt.Errorf("data source type is empty ('source') in %s (position %d)", acquisFile, idx)
}
// pre-check that the source is valid
_, err := GetDataSourceIface(sub.Source)
if err != nil {
return nil, fmt.Errorf("in file %s (position %d) - %w", acquisFile, idx, err)
}
uniqueId := uuid.NewString()
sub.UniqueId = uniqueId
src, err := DataSourceConfigure(sub, metrics_level)
if err != nil {
var dserr *DataSourceUnavailableError
if errors.As(err, &dserr) {
log.Error(err)
continue
}
return nil, fmt.Errorf("while configuring datasource of type %s from %s (position %d): %w", sub.Source, acquisFile, idx, err)
}
if sub.TransformExpr != "" {
vm, err := expr.Compile(sub.TransformExpr, exprhelpers.GetExprOptions(map[string]interface{}{"evt": &types.Event{}})...)
if err != nil {
return nil, fmt.Errorf("while compiling transform expression '%s' for datasource %s in %s (position %d): %w", sub.TransformExpr, sub.Source, acquisFile, idx, err)
}
transformRuntimes[uniqueId] = vm
}
sources = append(sources, src)
}
allSources = append(allSources, sources...)
}
return sources, nil
return allSources, nil
}
func GetMetrics(sources []DataSource, aggregated bool) error {

View file

@ -215,7 +215,7 @@ wowo: ajsajasjas
}
}
func TestLoadAcquisitionFromFile(t *testing.T) {
func TestLoadAcquisitionFromFiles(t *testing.T) {
appendMockSource()
t.Setenv("TEST_ENV", "test_value2")
@ -293,7 +293,7 @@ func TestLoadAcquisitionFromFile(t *testing.T) {
}
for _, tc := range tests {
t.Run(tc.TestName, func(t *testing.T) {
dss, err := LoadAcquisitionFromFile(&tc.Config, nil)
dss, err := LoadAcquisitionFromFiles(&tc.Config, nil)
cstest.RequireErrorContains(t, err, tc.ExpectedError)
if tc.ExpectedError != "" {