diff --git a/.golangci.yml b/.golangci.yml index 12d35ed87..7df08cf71 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -192,7 +192,6 @@ linters-settings: - unnamedResult - sloppyReassign - appendCombine - - captLocal - typeUnparen - commentFormatting - deferInLoop # diff --git a/cmd/crowdsec-cli/clidecision/decisions.go b/cmd/crowdsec-cli/clidecision/decisions.go index 307cabffe..b5865bab6 100644 --- a/cmd/crowdsec-cli/clidecision/decisions.go +++ b/cmd/crowdsec-cli/clidecision/decisions.go @@ -170,7 +170,7 @@ func (cli *cliDecisions) NewCommand() *cobra.Command { return cmd } -func (cli *cliDecisions) list(ctx context.Context, filter apiclient.AlertsListOpts, NoSimu *bool, contained *bool, printMachine bool) error { +func (cli *cliDecisions) list(ctx context.Context, filter apiclient.AlertsListOpts, noSimu *bool, contained *bool, printMachine bool) error { var err error *filter.ScopeEquals, err = clialert.SanitizeScope(*filter.ScopeEquals, *filter.IPEquals, *filter.RangeEquals) @@ -181,7 +181,7 @@ func (cli *cliDecisions) list(ctx context.Context, filter apiclient.AlertsListOp filter.ActiveDecisionEquals = new(bool) *filter.ActiveDecisionEquals = true - if NoSimu != nil && *NoSimu { + if noSimu != nil && *noSimu { filter.IncludeSimulated = new(bool) } /* nullify the empty entries to avoid bad filter */ diff --git a/pkg/acquisition/acquisition.go b/pkg/acquisition/acquisition.go index 06a491859..d39282705 100644 --- a/pkg/acquisition/acquisition.go +++ b/pkg/acquisition/acquisition.go @@ -365,13 +365,13 @@ func copyEvent(evt types.Event, line string) types.Event { return evtCopy } -func transform(transformChan chan types.Event, output chan types.Event, AcquisTomb *tomb.Tomb, transformRuntime *vm.Program, logger *log.Entry) { +func transform(transformChan chan types.Event, output chan types.Event, acquisTomb *tomb.Tomb, transformRuntime *vm.Program, logger *log.Entry) { defer trace.CatchPanic("crowdsec/acquis") logger.Infof("transformer started") for { select { - case <-AcquisTomb.Dying(): + case <-acquisTomb.Dying(): logger.Debugf("transformer is dying") return case evt := <-transformChan: @@ -420,7 +420,7 @@ func transform(transformChan chan types.Event, output chan types.Event, AcquisTo } } -func StartAcquisition(ctx context.Context, sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error { +func StartAcquisition(ctx context.Context, sources []DataSource, output chan types.Event, acquisTomb *tomb.Tomb) error { // Don't wait if we have no sources, as it will hang forever if len(sources) == 0 { return nil @@ -430,7 +430,7 @@ func StartAcquisition(ctx context.Context, sources []DataSource, output chan typ subsrc := sources[i] // ensure its a copy log.Debugf("starting one source %d/%d ->> %T", i, len(sources), subsrc) - AcquisTomb.Go(func() error { + acquisTomb.Go(func() error { defer trace.CatchPanic("crowdsec/acquis") var err error @@ -449,21 +449,21 @@ func StartAcquisition(ctx context.Context, sources []DataSource, output chan typ "datasource": subsrc.GetName(), }) - AcquisTomb.Go(func() error { - transform(outChan, output, AcquisTomb, transformRuntime, transformLogger) + acquisTomb.Go(func() error { + transform(outChan, output, acquisTomb, transformRuntime, transformLogger) return nil }) } if subsrc.GetMode() == configuration.TAIL_MODE { - err = subsrc.StreamingAcquisition(ctx, outChan, AcquisTomb) + err = subsrc.StreamingAcquisition(ctx, outChan, acquisTomb) } else { - err = subsrc.OneShotAcquisition(ctx, outChan, AcquisTomb) + err = subsrc.OneShotAcquisition(ctx, outChan, acquisTomb) } if err != nil { // if one of the acqusition returns an error, we kill the others to properly shutdown - AcquisTomb.Kill(err) + acquisTomb.Kill(err) } return nil @@ -471,7 +471,7 @@ func StartAcquisition(ctx context.Context, sources []DataSource, output chan typ } /*return only when acquisition is over (cat) or never (tail)*/ - err := AcquisTomb.Wait() + err := acquisTomb.Wait() return err } diff --git a/pkg/acquisition/modules/appsec/appsec.go b/pkg/acquisition/modules/appsec/appsec.go index 2f7861b32..86dbfe38b 100644 --- a/pkg/acquisition/modules/appsec/appsec.go +++ b/pkg/acquisition/modules/appsec/appsec.go @@ -155,14 +155,14 @@ func (w *AppsecSource) GetAggregMetrics() []prometheus.Collector { return []prometheus.Collector{AppsecReqCounter, AppsecBlockCounter, AppsecRuleHits, AppsecOutbandParsingHistogram, AppsecInbandParsingHistogram, AppsecGlobalParsingHistogram} } -func (w *AppsecSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { +func (w *AppsecSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error { err := w.UnmarshalConfig(yamlConfig) if err != nil { return fmt.Errorf("unable to parse appsec configuration: %w", err) } w.logger = logger - w.metricsLevel = MetricsLevel + w.metricsLevel = metricsLevel w.logger.Tracef("Appsec configuration: %+v", w.config) if w.config.AuthCacheDuration == nil { @@ -180,7 +180,7 @@ func (w *AppsecSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLe w.InChan = make(chan appsec.ParsedRequest) appsecCfg := appsec.AppsecConfig{Logger: w.logger.WithField("component", "appsec_config")} - //we keep the datasource name + // we keep the datasource name appsecCfg.Name = w.config.Name // let's load the associated appsec_config: @@ -275,6 +275,7 @@ func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types. for _, runner := range w.AppsecRunners { runner.outChan = out + t.Go(func() error { defer trace.CatchPanic("crowdsec/acquis/appsec/live/runner") return runner.Run(t) @@ -285,16 +286,20 @@ func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types. if w.config.ListenSocket != "" { w.logger.Infof("creating unix socket %s", w.config.ListenSocket) _ = os.RemoveAll(w.config.ListenSocket) + listener, err := net.Listen("unix", w.config.ListenSocket) if err != nil { return fmt.Errorf("appsec server failed: %w", err) } + defer listener.Close() + if w.config.CertFilePath != "" && w.config.KeyFilePath != "" { err = w.server.ServeTLS(listener, w.config.CertFilePath, w.config.KeyFilePath) } else { err = w.server.Serve(listener) } + if err != nil && !errors.Is(err, http.ErrServerClosed) { return fmt.Errorf("appsec server failed: %w", err) } @@ -304,8 +309,10 @@ func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types. }) t.Go(func() error { var err error + if w.config.ListenAddr != "" { w.logger.Infof("creating TCP server on %s", w.config.ListenAddr) + if w.config.CertFilePath != "" && w.config.KeyFilePath != "" { err = w.server.ListenAndServeTLS(w.config.CertFilePath, w.config.KeyFilePath) } else { @@ -324,6 +331,7 @@ func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types. // xx let's clean up the appsec runners :) appsec.AppsecRulesDetails = make(map[int]appsec.RulesDetails) w.server.Shutdown(ctx) + return nil }) @@ -354,11 +362,13 @@ func (w *AppsecSource) IsAuth(apiKey string) bool { } req.Header.Add("X-Api-Key", apiKey) + resp, err := client.Do(req) if err != nil { log.Errorf("Error performing request: %s", err) return false } + defer resp.Body.Close() return resp.StatusCode == http.StatusOK @@ -371,17 +381,21 @@ func (w *AppsecSource) appsecHandler(rw http.ResponseWriter, r *http.Request) { apiKey := r.Header.Get(appsec.APIKeyHeaderName) clientIP := r.Header.Get(appsec.IPHeaderName) remoteIP := r.RemoteAddr + if apiKey == "" { w.logger.Errorf("Unauthorized request from '%s' (real IP = %s)", remoteIP, clientIP) rw.WriteHeader(http.StatusUnauthorized) + return } + expiration, exists := w.AuthCache.Get(apiKey) // if the apiKey is not in cache or has expired, just recheck the auth if !exists || time.Now().After(expiration) { if !w.IsAuth(apiKey) { rw.WriteHeader(http.StatusUnauthorized) w.logger.Errorf("Unauthorized request from '%s' (real IP = %s)", remoteIP, clientIP) + return } @@ -394,8 +408,10 @@ func (w *AppsecSource) appsecHandler(rw http.ResponseWriter, r *http.Request) { if err != nil { w.logger.Errorf("%s", err) rw.WriteHeader(http.StatusInternalServerError) + return } + parsedRequest.AppsecEngine = w.config.Name logger := w.logger.WithFields(log.Fields{ diff --git a/pkg/acquisition/modules/cloudwatch/cloudwatch.go b/pkg/acquisition/modules/cloudwatch/cloudwatch.go index ba267c905..5739ebc31 100644 --- a/pkg/acquisition/modules/cloudwatch/cloudwatch.go +++ b/pkg/acquisition/modules/cloudwatch/cloudwatch.go @@ -154,13 +154,13 @@ func (cw *CloudwatchSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (cw *CloudwatchSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { +func (cw *CloudwatchSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error { err := cw.UnmarshalConfig(yamlConfig) if err != nil { return err } - cw.metricsLevel = MetricsLevel + cw.metricsLevel = metricsLevel cw.logger = logger.WithField("group", cw.Config.GroupName) @@ -330,9 +330,12 @@ func (cw *CloudwatchSource) WatchLogGroupForStreams(ctx context.Context, out cha LastIngestionTime := time.Unix(0, *event.LastIngestionTime*int64(time.Millisecond)) if LastIngestionTime.Before(oldest) { cw.logger.Tracef("stop iteration, %s reached oldest age, stop (%s < %s)", *event.LogStreamName, LastIngestionTime, time.Now().UTC().Add(-*cw.Config.MaxStreamAge)) + hasMoreStreams = false + return false } + cw.logger.Tracef("stream %s is elligible for monitoring", *event.LogStreamName) // the stream has been updated recently, check if we should monitor it var expectMode int @@ -341,6 +344,7 @@ func (cw *CloudwatchSource) WatchLogGroupForStreams(ctx context.Context, out cha } else { expectMode = types.TIMEMACHINE } + monitorStream := LogStreamTailConfig{ GroupName: cw.Config.GroupName, StreamName: *event.LogStreamName, @@ -354,16 +358,20 @@ func (cw *CloudwatchSource) WatchLogGroupForStreams(ctx context.Context, out cha out <- monitorStream } } + if lastPage { cw.logger.Tracef("reached last page") + hasMoreStreams = false } + return true }, ) if err != nil { return fmt.Errorf("while describing group %s: %w", cw.Config.GroupName, err) } + cw.logger.Tracef("after DescribeLogStreamsPagesWithContext") } } @@ -373,12 +381,14 @@ func (cw *CloudwatchSource) WatchLogGroupForStreams(ctx context.Context, out cha // LogStreamManager receives the potential streams to monitor, and starts a go routine when needed func (cw *CloudwatchSource) LogStreamManager(ctx context.Context, in chan LogStreamTailConfig, outChan chan types.Event) error { cw.logger.Debugf("starting to monitor streams for %s", cw.Config.GroupName) + pollDeadStreamInterval := time.NewTicker(def_PollDeadStreamInterval) for { select { case newStream := <-in: //nolint:govet // copylocks won't matter if the tomb is not initialized shouldCreate := true + cw.logger.Tracef("received new streams to monitor : %s/%s", newStream.GroupName, newStream.StreamName) if cw.Config.StreamName != nil && newStream.StreamName != *cw.Config.StreamName { @@ -402,12 +412,16 @@ func (cw *CloudwatchSource) LogStreamManager(ctx context.Context, in chan LogStr if !stream.t.Alive() { cw.logger.Debugf("stream %s already exists, but is dead", newStream.StreamName) cw.monitoredStreams = append(cw.monitoredStreams[:idx], cw.monitoredStreams[idx+1:]...) + if cw.metricsLevel != configuration.METRICS_NONE { openedStreams.With(prometheus.Labels{"group": newStream.GroupName}).Dec() } + break } + shouldCreate = false + break } } @@ -417,19 +431,23 @@ func (cw *CloudwatchSource) LogStreamManager(ctx context.Context, in chan LogStr if cw.metricsLevel != configuration.METRICS_NONE { openedStreams.With(prometheus.Labels{"group": newStream.GroupName}).Inc() } + newStream.t = tomb.Tomb{} newStream.logger = cw.logger.WithField("stream", newStream.StreamName) cw.logger.Debugf("starting tail of stream %s", newStream.StreamName) newStream.t.Go(func() error { return cw.TailLogStream(ctx, &newStream, outChan) }) + cw.monitoredStreams = append(cw.monitoredStreams, &newStream) } case <-pollDeadStreamInterval.C: newMonitoredStreams := cw.monitoredStreams[:0] + for idx, stream := range cw.monitoredStreams { if !cw.monitoredStreams[idx].t.Alive() { cw.logger.Debugf("remove dead stream %s", stream.StreamName) + if cw.metricsLevel != configuration.METRICS_NONE { openedStreams.With(prometheus.Labels{"group": cw.monitoredStreams[idx].GroupName}).Dec() } @@ -437,20 +455,25 @@ func (cw *CloudwatchSource) LogStreamManager(ctx context.Context, in chan LogStr newMonitoredStreams = append(newMonitoredStreams, stream) } } + cw.monitoredStreams = newMonitoredStreams case <-cw.t.Dying(): cw.logger.Infof("LogStreamManager for %s is dying, %d alive streams", cw.Config.GroupName, len(cw.monitoredStreams)) + for idx, stream := range cw.monitoredStreams { if cw.monitoredStreams[idx].t.Alive() { cw.logger.Debugf("killing stream %s", stream.StreamName) cw.monitoredStreams[idx].t.Kill(nil) + if err := cw.monitoredStreams[idx].t.Wait(); err != nil { cw.logger.Debugf("error while waiting for death of %s : %s", stream.StreamName, err) } } } + cw.monitoredStreams = nil cw.logger.Debugf("routine cleanup done, return") + return nil } } @@ -458,12 +481,14 @@ func (cw *CloudwatchSource) LogStreamManager(ctx context.Context, in chan LogStr func (cw *CloudwatchSource) TailLogStream(ctx context.Context, cfg *LogStreamTailConfig, outChan chan types.Event) error { var startFrom *string + lastReadMessage := time.Now().UTC() ticker := time.NewTicker(cfg.PollStreamInterval) // resume at existing index if we already had streamIndexMutex.Lock() v := cw.streamIndexes[cfg.GroupName+"+"+cfg.StreamName] streamIndexMutex.Unlock() + if v != "" { cfg.logger.Debugf("restarting on index %s", v) startFrom = &v @@ -474,7 +499,9 @@ func (cw *CloudwatchSource) TailLogStream(ctx context.Context, cfg *LogStreamTai select { case <-ticker.C: cfg.logger.Tracef("entering loop") + hasMorePages := true + for hasMorePages { /*for the first call, we only consume the last item*/ cfg.logger.Tracef("calling GetLogEventsPagesWithContext") @@ -489,36 +516,44 @@ func (cw *CloudwatchSource) TailLogStream(ctx context.Context, cfg *LogStreamTai func(page *cloudwatchlogs.GetLogEventsOutput, lastPage bool) bool { cfg.logger.Tracef("%d results, last:%t", len(page.Events), lastPage) startFrom = page.NextForwardToken + if page.NextForwardToken != nil { streamIndexMutex.Lock() cw.streamIndexes[cfg.GroupName+"+"+cfg.StreamName] = *page.NextForwardToken streamIndexMutex.Unlock() } + if lastPage { /*wait another ticker to check on new log availability*/ cfg.logger.Tracef("last page") + hasMorePages = false } + if len(page.Events) > 0 { lastReadMessage = time.Now().UTC() } + for _, event := range page.Events { evt, err := cwLogToEvent(event, cfg) if err != nil { cfg.logger.Warningf("cwLogToEvent error, discarded event : %s", err) } else { cfg.logger.Debugf("pushing message : %s", evt.Line.Raw) + if cw.metricsLevel != configuration.METRICS_NONE { linesRead.With(prometheus.Labels{"group": cfg.GroupName, "stream": cfg.StreamName}).Inc() } outChan <- evt } } + return true }, ) if err != nil { newerr := fmt.Errorf("while reading %s/%s: %w", cfg.GroupName, cfg.StreamName, err) cfg.logger.Warningf("err : %s", newerr) + return newerr } cfg.logger.Tracef("done reading GetLogEventsPagesWithContext") diff --git a/pkg/acquisition/modules/docker/docker.go b/pkg/acquisition/modules/docker/docker.go index 798eba294..582da3d53 100644 --- a/pkg/acquisition/modules/docker/docker.go +++ b/pkg/acquisition/modules/docker/docker.go @@ -136,9 +136,9 @@ func (d *DockerSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (d *DockerSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { +func (d *DockerSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error { d.logger = logger - d.metricsLevel = MetricsLevel + d.metricsLevel = metricsLevel err := d.UnmarshalConfig(yamlConfig) if err != nil { diff --git a/pkg/acquisition/modules/file/file.go b/pkg/acquisition/modules/file/file.go index 9f439b0c8..697a3d35d 100644 --- a/pkg/acquisition/modules/file/file.go +++ b/pkg/acquisition/modules/file/file.go @@ -102,9 +102,9 @@ func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { +func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error { f.logger = logger - f.metricsLevel = MetricsLevel + f.metricsLevel = metricsLevel err := f.UnmarshalConfig(yamlConfig) if err != nil { diff --git a/pkg/acquisition/modules/http/http.go b/pkg/acquisition/modules/http/http.go index 3e4f26915..97e220570 100644 --- a/pkg/acquisition/modules/http/http.go +++ b/pkg/acquisition/modules/http/http.go @@ -157,9 +157,9 @@ func (hc *HttpConfiguration) Validate() error { return nil } -func (h *HTTPSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { +func (h *HTTPSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error { h.logger = logger - h.metricsLevel = MetricsLevel + h.metricsLevel = metricsLevel err := h.UnmarshalConfig(yamlConfig) if err != nil { @@ -339,12 +339,14 @@ func (h *HTTPSource) RunServer(out chan types.Event, t *tomb.Tomb) error { if r.Method != http.MethodPost { h.logger.Errorf("method not allowed: %s", r.Method) http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed) + return } if err := authorizeRequest(r, &h.Config); err != nil { h.logger.Errorf("failed to authorize request from '%s': %s", r.RemoteAddr, err) http.Error(w, "Unauthorized", http.StatusUnauthorized) + return } diff --git a/pkg/acquisition/modules/journalctl/journalctl.go b/pkg/acquisition/modules/journalctl/journalctl.go index 47d90e2b3..f72878d9b 100644 --- a/pkg/acquisition/modules/journalctl/journalctl.go +++ b/pkg/acquisition/modules/journalctl/journalctl.go @@ -210,9 +210,9 @@ func (j *JournalCtlSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (j *JournalCtlSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { +func (j *JournalCtlSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error { j.logger = logger - j.metricsLevel = MetricsLevel + j.metricsLevel = metricsLevel err := j.UnmarshalConfig(yamlConfig) if err != nil { diff --git a/pkg/acquisition/modules/kafka/kafka.go b/pkg/acquisition/modules/kafka/kafka.go index 77fc44e31..f213b8581 100644 --- a/pkg/acquisition/modules/kafka/kafka.go +++ b/pkg/acquisition/modules/kafka/kafka.go @@ -85,9 +85,9 @@ func (k *KafkaSource) UnmarshalConfig(yamlConfig []byte) error { return err } -func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { +func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error { k.logger = logger - k.metricsLevel = MetricsLevel + k.metricsLevel = metricsLevel k.logger.Debugf("start configuring %s source", dataSourceName) @@ -160,6 +160,7 @@ func (k *KafkaSource) ReadMessage(ctx context.Context, out chan types.Event) err k.logger.Errorln(fmt.Errorf("while reading %s message: %w", dataSourceName, err)) continue } + k.logger.Tracef("got message: %s", string(m.Value)) l := types.Line{ Raw: string(m.Value), @@ -170,9 +171,11 @@ func (k *KafkaSource) ReadMessage(ctx context.Context, out chan types.Event) err Module: k.GetName(), } k.logger.Tracef("line with message read from topic '%s': %+v", k.Config.Topic, l) + if k.metricsLevel != configuration.METRICS_NONE { linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc() } + evt := types.MakeEvent(k.Config.UseTimeMachine, types.LOG, true) evt.Line = l out <- evt diff --git a/pkg/acquisition/modules/kinesis/kinesis.go b/pkg/acquisition/modules/kinesis/kinesis.go index b166a706c..16c91ad06 100644 --- a/pkg/acquisition/modules/kinesis/kinesis.go +++ b/pkg/acquisition/modules/kinesis/kinesis.go @@ -161,9 +161,9 @@ func (k *KinesisSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { +func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error { k.logger = logger - k.metricsLevel = MetricsLevel + k.metricsLevel = metricsLevel err := k.UnmarshalConfig(yamlConfig) if err != nil { diff --git a/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go b/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go index aaa83a3bb..f8c325b5c 100644 --- a/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go +++ b/pkg/acquisition/modules/kubernetesaudit/k8s_audit.go @@ -97,9 +97,9 @@ func (ka *KubernetesAuditSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (ka *KubernetesAuditSource) Configure(config []byte, logger *log.Entry, MetricsLevel int) error { +func (ka *KubernetesAuditSource) Configure(config []byte, logger *log.Entry, metricsLevel int) error { ka.logger = logger - ka.metricsLevel = MetricsLevel + ka.metricsLevel = metricsLevel err := ka.UnmarshalConfig(config) if err != nil { diff --git a/pkg/acquisition/modules/loki/loki.go b/pkg/acquisition/modules/loki/loki.go index c57e6a67c..47493d8cd 100644 --- a/pkg/acquisition/modules/loki/loki.go +++ b/pkg/acquisition/modules/loki/loki.go @@ -120,10 +120,10 @@ func (l *LokiSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (l *LokiSource) Configure(config []byte, logger *log.Entry, MetricsLevel int) error { +func (l *LokiSource) Configure(config []byte, logger *log.Entry, metricsLevel int) error { l.Config = LokiConfiguration{} l.logger = logger - l.metricsLevel = MetricsLevel + l.metricsLevel = metricsLevel err := l.UnmarshalConfig(config) if err != nil { return err diff --git a/pkg/acquisition/modules/syslog/syslog.go b/pkg/acquisition/modules/syslog/syslog.go index fb6a04600..df805d08c 100644 --- a/pkg/acquisition/modules/syslog/syslog.go +++ b/pkg/acquisition/modules/syslog/syslog.go @@ -124,10 +124,10 @@ func (s *SyslogSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (s *SyslogSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { +func (s *SyslogSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error { s.logger = logger s.logger.Infof("Starting syslog datasource configuration") - s.metricsLevel = MetricsLevel + s.metricsLevel = metricsLevel err := s.UnmarshalConfig(yamlConfig) if err != nil { return err diff --git a/pkg/acquisition/modules/wineventlog/wineventlog_windows.go b/pkg/acquisition/modules/wineventlog/wineventlog_windows.go index 8283bcc21..22186ea96 100644 --- a/pkg/acquisition/modules/wineventlog/wineventlog_windows.go +++ b/pkg/acquisition/modules/wineventlog/wineventlog_windows.go @@ -287,9 +287,9 @@ func (w *WinEventLogSource) UnmarshalConfig(yamlConfig []byte) error { return nil } -func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error { +func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error { w.logger = logger - w.metricsLevel = MetricsLevel + w.metricsLevel = metricsLevel err := w.UnmarshalConfig(yamlConfig) if err != nil { diff --git a/pkg/apiclient/client.go b/pkg/apiclient/client.go index 47d97a283..ec473beca 100644 --- a/pkg/apiclient/client.go +++ b/pkg/apiclient/client.go @@ -125,8 +125,8 @@ func NewClient(config *Config) (*ApiClient, error) { return c, nil } -func NewDefaultClient(URL *url.URL, prefix string, userAgent string, client *http.Client) (*ApiClient, error) { - transport, baseURL := createTransport(URL) +func NewDefaultClient(url *url.URL, prefix string, userAgent string, client *http.Client) (*ApiClient, error) { + transport, baseURL := createTransport(url) if client == nil { client = &http.Client{} diff --git a/pkg/csprofiles/csprofiles.go b/pkg/csprofiles/csprofiles.go index 52cda1ed2..c509fb448 100644 --- a/pkg/csprofiles/csprofiles.go +++ b/pkg/csprofiles/csprofiles.go @@ -96,17 +96,17 @@ func NewProfile(profilesCfg []*csconfig.ProfileCfg) ([]*Runtime, error) { return profilesRuntime, nil } -func (Profile *Runtime) GenerateDecisionFromProfile(Alert *models.Alert) ([]*models.Decision, error) { +func (profile *Runtime) GenerateDecisionFromProfile(alert *models.Alert) ([]*models.Decision, error) { var decisions []*models.Decision - for _, refDecision := range Profile.Cfg.Decisions { + for _, refDecision := range profile.Cfg.Decisions { decision := models.Decision{} /*the reference decision from profile is in simulated mode */ if refDecision.Simulated != nil && *refDecision.Simulated { decision.Simulated = new(bool) *decision.Simulated = true /*the event is already in simulation mode */ - } else if Alert.Simulated != nil && *Alert.Simulated { + } else if alert.Simulated != nil && *alert.Simulated { decision.Simulated = new(bool) *decision.Simulated = true } @@ -116,7 +116,7 @@ func (Profile *Runtime) GenerateDecisionFromProfile(Alert *models.Alert) ([]*mod if refDecision.Scope != nil && *refDecision.Scope != "" { *decision.Scope = *refDecision.Scope } else { - *decision.Scope = *Alert.Source.Scope + *decision.Scope = *alert.Source.Scope } /*some fields are populated from the reference object : duration, scope, type*/ @@ -125,19 +125,19 @@ func (Profile *Runtime) GenerateDecisionFromProfile(Alert *models.Alert) ([]*mod *decision.Duration = *refDecision.Duration } - if Profile.Cfg.DurationExpr != "" && Profile.RuntimeDurationExpr != nil { + if profile.Cfg.DurationExpr != "" && profile.RuntimeDurationExpr != nil { profileDebug := false - if Profile.Cfg.Debug != nil && *Profile.Cfg.Debug { + if profile.Cfg.Debug != nil && *profile.Cfg.Debug { profileDebug = true } - duration, err := exprhelpers.Run(Profile.RuntimeDurationExpr, map[string]interface{}{"Alert": Alert}, Profile.Logger, profileDebug) + duration, err := exprhelpers.Run(profile.RuntimeDurationExpr, map[string]interface{}{"Alert": alert}, profile.Logger, profileDebug) if err != nil { - Profile.Logger.Warningf("Failed to run duration_expr : %v", err) + profile.Logger.Warningf("Failed to run duration_expr : %v", err) } else { durationStr := fmt.Sprint(duration) if _, err := time.ParseDuration(durationStr); err != nil { - Profile.Logger.Warningf("Failed to parse expr duration result '%s'", duration) + profile.Logger.Warningf("Failed to parse expr duration result '%s'", duration) } else { *decision.Duration = durationStr } @@ -149,7 +149,7 @@ func (Profile *Runtime) GenerateDecisionFromProfile(Alert *models.Alert) ([]*mod /*for the others, let's populate it from the alert and its source*/ decision.Value = new(string) - *decision.Value = *Alert.Source.Value + *decision.Value = *alert.Source.Value decision.Origin = new(string) *decision.Origin = types.CrowdSecOrigin @@ -158,7 +158,7 @@ func (Profile *Runtime) GenerateDecisionFromProfile(Alert *models.Alert) ([]*mod } decision.Scenario = new(string) - *decision.Scenario = *Alert.Scenario + *decision.Scenario = *alert.Scenario decisions = append(decisions, &decision) } @@ -166,21 +166,21 @@ func (Profile *Runtime) GenerateDecisionFromProfile(Alert *models.Alert) ([]*mod } // EvaluateProfile is going to evaluate an Alert against a profile to generate Decisions -func (Profile *Runtime) EvaluateProfile(Alert *models.Alert) ([]*models.Decision, bool, error) { +func (profile *Runtime) EvaluateProfile(alert *models.Alert) ([]*models.Decision, bool, error) { var decisions []*models.Decision matched := false - for eIdx, expression := range Profile.RuntimeFilters { + for eIdx, expression := range profile.RuntimeFilters { debugProfile := false - if Profile.Cfg.Debug != nil && *Profile.Cfg.Debug { + if profile.Cfg.Debug != nil && *profile.Cfg.Debug { debugProfile = true } - output, err := exprhelpers.Run(expression, map[string]interface{}{"Alert": Alert}, Profile.Logger, debugProfile) + output, err := exprhelpers.Run(expression, map[string]interface{}{"Alert": alert}, profile.Logger, debugProfile) if err != nil { - Profile.Logger.Warningf("failed to run profile expr for %s: %v", Profile.Cfg.Name, err) - return nil, matched, fmt.Errorf("while running expression %s: %w", Profile.Cfg.Filters[eIdx], err) + profile.Logger.Warningf("failed to run profile expr for %s: %v", profile.Cfg.Name, err) + return nil, matched, fmt.Errorf("while running expression %s: %w", profile.Cfg.Filters[eIdx], err) } switch out := output.(type) { @@ -188,22 +188,22 @@ func (Profile *Runtime) EvaluateProfile(Alert *models.Alert) ([]*models.Decision if out { matched = true /*the expression matched, create the associated decision*/ - subdecisions, err := Profile.GenerateDecisionFromProfile(Alert) + subdecisions, err := profile.GenerateDecisionFromProfile(alert) if err != nil { - return nil, matched, fmt.Errorf("while generating decision from profile %s: %w", Profile.Cfg.Name, err) + return nil, matched, fmt.Errorf("while generating decision from profile %s: %w", profile.Cfg.Name, err) } decisions = append(decisions, subdecisions...) } else { - Profile.Logger.Debugf("Profile %s filter is unsuccessful", Profile.Cfg.Name) + profile.Logger.Debugf("Profile %s filter is unsuccessful", profile.Cfg.Name) - if Profile.Cfg.OnFailure == "break" { + if profile.Cfg.OnFailure == "break" { break } } default: - return nil, matched, fmt.Errorf("unexpected type %t (%v) while running '%s'", output, output, Profile.Cfg.Filters[eIdx]) + return nil, matched, fmt.Errorf("unexpected type %t (%v) while running '%s'", output, output, profile.Cfg.Filters[eIdx]) } } diff --git a/pkg/database/flush.go b/pkg/database/flush.go index 8f646ddc9..4a3a93a40 100644 --- a/pkg/database/flush.go +++ b/pkg/database/flush.go @@ -222,7 +222,7 @@ func (c *Client) FlushAgentsAndBouncers(ctx context.Context, agentsCfg *csconfig return nil } -func (c *Client) FlushAlerts(ctx context.Context, MaxAge string, MaxItems int) error { +func (c *Client) FlushAlerts(ctx context.Context, maxAge string, maxItems int) error { var ( deletedByAge int deletedByNbItem int @@ -247,22 +247,22 @@ func (c *Client) FlushAlerts(ctx context.Context, MaxAge string, MaxItems int) e c.Log.Debugf("FlushAlerts (Total alerts): %d", totalAlerts) - if MaxAge != "" { + if maxAge != "" { filter := map[string][]string{ - "created_before": {MaxAge}, + "created_before": {maxAge}, } nbDeleted, err := c.DeleteAlertWithFilter(ctx, filter) if err != nil { c.Log.Warningf("FlushAlerts (max age): %s", err) - return fmt.Errorf("unable to flush alerts with filter until=%s: %w", MaxAge, err) + return fmt.Errorf("unable to flush alerts with filter until=%s: %w", maxAge, err) } c.Log.Debugf("FlushAlerts (deleted max age alerts): %d", nbDeleted) deletedByAge = nbDeleted } - if MaxItems > 0 { + if maxItems > 0 { // We get the highest id for the alerts // We subtract MaxItems to avoid deleting alerts that are not old enough // This gives us the oldest alert that we want to keep @@ -282,7 +282,7 @@ func (c *Client) FlushAlerts(ctx context.Context, MaxAge string, MaxItems int) e } if len(lastAlert) != 0 { - maxid := lastAlert[0].ID - MaxItems + maxid := lastAlert[0].ID - maxItems c.Log.Debugf("FlushAlerts (max id): %d", maxid) @@ -299,12 +299,12 @@ func (c *Client) FlushAlerts(ctx context.Context, MaxAge string, MaxItems int) e if deletedByNbItem > 0 { c.Log.Infof("flushed %d/%d alerts because the max number of alerts has been reached (%d max)", - deletedByNbItem, totalAlerts, MaxItems) + deletedByNbItem, totalAlerts, maxItems) } if deletedByAge > 0 { c.Log.Infof("flushed %d/%d alerts because they were created %s ago or more", - deletedByAge, totalAlerts, MaxAge) + deletedByAge, totalAlerts, maxAge) } return nil diff --git a/pkg/exprhelpers/crowdsec_cti.go b/pkg/exprhelpers/crowdsec_cti.go index 9b9eac4b9..900bd7824 100644 --- a/pkg/exprhelpers/crowdsec_cti.go +++ b/pkg/exprhelpers/crowdsec_cti.go @@ -29,29 +29,29 @@ var ( var ctiClient *cticlient.CrowdsecCTIClient -func InitCrowdsecCTI(Key *string, TTL *time.Duration, Size *int, LogLevel *log.Level) error { - if Key == nil || *Key == "" { +func InitCrowdsecCTI(key *string, ttl *time.Duration, size *int, logLevel *log.Level) error { + if key == nil || *key == "" { log.Warningf("CTI API key not set or empty, CTI will not be available") return cticlient.ErrDisabled } - CTIApiKey = *Key - if Size == nil { - Size = new(int) - *Size = 1000 + CTIApiKey = *key + if size == nil { + size = new(int) + *size = 1000 } - if TTL == nil { - TTL = new(time.Duration) - *TTL = 5 * time.Minute + if ttl == nil { + ttl = new(time.Duration) + *ttl = 5 * time.Minute } clog := log.New() if err := types.ConfigureLogger(clog); err != nil { return fmt.Errorf("while configuring datasource logger: %w", err) } - if LogLevel != nil { - clog.SetLevel(*LogLevel) + if logLevel != nil { + clog.SetLevel(*logLevel) } subLogger := clog.WithField("type", "crowdsec-cti") - CrowdsecCTIInitCache(*Size, *TTL) + CrowdsecCTIInitCache(*size, *ttl) ctiClient = cticlient.NewCrowdsecCTIClient(cticlient.WithAPIKey(CTIApiKey), cticlient.WithLogger(subLogger)) CTIApiEnabled = true return nil diff --git a/pkg/exprhelpers/helpers.go b/pkg/exprhelpers/helpers.go index 9bc991a8f..96de0020c 100644 --- a/pkg/exprhelpers/helpers.go +++ b/pkg/exprhelpers/helpers.go @@ -129,32 +129,34 @@ func Init(databaseClient *database.Client) error { dataFileRegex = make(map[string][]*regexp.Regexp) dataFileRe2 = make(map[string][]*re2.Regexp) dbClient = databaseClient + XMLCacheInit() + return nil } -func RegexpCacheInit(filename string, CacheCfg types.DataSource) error { +func RegexpCacheInit(filename string, cacheCfg types.DataSource) error { // cache is explicitly disabled - if CacheCfg.Cache != nil && !*CacheCfg.Cache { + if cacheCfg.Cache != nil && !*cacheCfg.Cache { return nil } // cache is implicitly disabled if no cache config is provided - if CacheCfg.Strategy == nil && CacheCfg.TTL == nil && CacheCfg.Size == nil { + if cacheCfg.Strategy == nil && cacheCfg.TTL == nil && cacheCfg.Size == nil { return nil } // cache is enabled - if CacheCfg.Size == nil { - CacheCfg.Size = ptr.Of(50) + if cacheCfg.Size == nil { + cacheCfg.Size = ptr.Of(50) } - gc := gcache.New(*CacheCfg.Size) + gc := gcache.New(*cacheCfg.Size) - if CacheCfg.Strategy == nil { - CacheCfg.Strategy = ptr.Of("LRU") + if cacheCfg.Strategy == nil { + cacheCfg.Strategy = ptr.Of("LRU") } - switch *CacheCfg.Strategy { + switch *cacheCfg.Strategy { case "LRU": gc = gc.LRU() case "LFU": @@ -162,11 +164,11 @@ func RegexpCacheInit(filename string, CacheCfg types.DataSource) error { case "ARC": gc = gc.ARC() default: - return fmt.Errorf("unknown cache strategy '%s'", *CacheCfg.Strategy) + return fmt.Errorf("unknown cache strategy '%s'", *cacheCfg.Strategy) } - if CacheCfg.TTL != nil { - gc.Expiration(*CacheCfg.TTL) + if cacheCfg.TTL != nil { + gc.Expiration(*cacheCfg.TTL) } cache := gc.Build() @@ -240,6 +242,7 @@ func Distinct(params ...any) (any, error) { if rt := reflect.TypeOf(params[0]).Kind(); rt != reflect.Slice && rt != reflect.Array { return nil, nil } + array := params[0].([]interface{}) if array == nil { return []interface{}{}, nil @@ -254,6 +257,7 @@ func Distinct(params ...any) (any, error) { ret = append(ret, val) } } + return ret, nil } @@ -282,8 +286,10 @@ func flatten(args []interface{}, v reflect.Value) []interface{} { } func existsInFileMaps(filename string, ftype string) (bool, error) { - ok := false var err error + + ok := false + switch ftype { case "regex", "regexp": if fflag.Re2RegexpInfileSupport.IsEnabled() { @@ -296,10 +302,11 @@ func existsInFileMaps(filename string, ftype string) (bool, error) { default: err = fmt.Errorf("unknown data type '%s' for : '%s'", ftype, filename) } + return ok, err } -//Expr helpers +// Expr helpers // func Get(arr []string, index int) string { func Get(params ...any) (any, error) { @@ -315,10 +322,12 @@ func Get(params ...any) (any, error) { func Atof(params ...any) (any, error) { x := params[0].(string) log.Debugf("debug atof %s", x) + ret, err := strconv.ParseFloat(x, 64) if err != nil { log.Warningf("Atof : can't convert float '%s' : %v", x, err) } + return ret, nil } @@ -340,22 +349,28 @@ func Distance(params ...any) (any, error) { long1 := params[1].(string) lat2 := params[2].(string) long2 := params[3].(string) + lat1f, err := strconv.ParseFloat(lat1, 64) if err != nil { log.Warningf("lat1 is not a float : %v", err) + return 0.0, fmt.Errorf("lat1 is not a float : %v", err) } + long1f, err := strconv.ParseFloat(long1, 64) if err != nil { log.Warningf("long1 is not a float : %v", err) + return 0.0, fmt.Errorf("long1 is not a float : %v", err) } + lat2f, err := strconv.ParseFloat(lat2, 64) if err != nil { log.Warningf("lat2 is not a float : %v", err) return 0.0, fmt.Errorf("lat2 is not a float : %v", err) } + long2f, err := strconv.ParseFloat(long2, 64) if err != nil { log.Warningf("long2 is not a float : %v", err) @@ -363,7 +378,7 @@ func Distance(params ...any) (any, error) { return 0.0, fmt.Errorf("long2 is not a float : %v", err) } - //either set of coordinates is 0,0, return 0 to avoid FPs + // either set of coordinates is 0,0, return 0 to avoid FPs if (lat1f == 0.0 && long1f == 0.0) || (lat2f == 0.0 && long2f == 0.0) { log.Warningf("one of the coordinates is 0,0, returning 0") return 0.0, nil @@ -373,6 +388,7 @@ func Distance(params ...any) (any, error) { second := haversine.Coord{Lat: lat2f, Lon: long2f} _, km := haversine.Distance(first, second) + return km, nil } diff --git a/pkg/leakybucket/bayesian.go b/pkg/leakybucket/bayesian.go index 357d51f59..30e1b396e 100644 --- a/pkg/leakybucket/bayesian.go +++ b/pkg/leakybucket/bayesian.go @@ -31,9 +31,9 @@ type BayesianBucket struct { DumbProcessor } -func updateProbability(prior, probGivenEvil, ProbGivenBenign float32) float32 { +func updateProbability(prior, probGivenEvil, probGivenBenign float32) float32 { numerator := probGivenEvil * prior - denominator := numerator + ProbGivenBenign*(1-prior) + denominator := numerator + probGivenBenign*(1-prior) return numerator / denominator } diff --git a/pkg/leakybucket/overflow_filter.go b/pkg/leakybucket/overflow_filter.go index 01dd491ed..b37e431fa 100644 --- a/pkg/leakybucket/overflow_filter.go +++ b/pkg/leakybucket/overflow_filter.go @@ -36,10 +36,10 @@ func NewOverflowFilter(g *BucketFactory) (*OverflowFilter, error) { return &u, nil } -func (u *OverflowFilter) OnBucketOverflow(Bucket *BucketFactory) func(*Leaky, types.RuntimeAlert, *types.Queue) (types.RuntimeAlert, *types.Queue) { +func (u *OverflowFilter) OnBucketOverflow(bucket *BucketFactory) func(*Leaky, types.RuntimeAlert, *types.Queue) (types.RuntimeAlert, *types.Queue) { return func(l *Leaky, s types.RuntimeAlert, q *types.Queue) (types.RuntimeAlert, *types.Queue) { el, err := exprhelpers.Run(u.FilterRuntime, map[string]interface{}{ - "queue": q, "signal": s, "leaky": l}, l.logger, Bucket.Debug) + "queue": q, "signal": s, "leaky": l}, l.logger, bucket.Debug) if err != nil { l.logger.Errorf("Failed running overflow filter: %s", err) return s, q