lint: gocritic/captLocal (don't capitalize local variables) (#3402)

* lint: gocritic/captLocal (don't capitalize local variables)

* lint (whitespace)
This commit is contained in:
mmetc 2025-01-16 14:03:53 +01:00 committed by GitHub
parent b582730d06
commit fe931af5ca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 172 additions and 101 deletions

View file

@ -192,7 +192,6 @@ linters-settings:
- unnamedResult
- sloppyReassign
- appendCombine
- captLocal
- typeUnparen
- commentFormatting
- deferInLoop #

View file

@ -170,7 +170,7 @@ func (cli *cliDecisions) NewCommand() *cobra.Command {
return cmd
}
func (cli *cliDecisions) list(ctx context.Context, filter apiclient.AlertsListOpts, NoSimu *bool, contained *bool, printMachine bool) error {
func (cli *cliDecisions) list(ctx context.Context, filter apiclient.AlertsListOpts, noSimu *bool, contained *bool, printMachine bool) error {
var err error
*filter.ScopeEquals, err = clialert.SanitizeScope(*filter.ScopeEquals, *filter.IPEquals, *filter.RangeEquals)
@ -181,7 +181,7 @@ func (cli *cliDecisions) list(ctx context.Context, filter apiclient.AlertsListOp
filter.ActiveDecisionEquals = new(bool)
*filter.ActiveDecisionEquals = true
if NoSimu != nil && *NoSimu {
if noSimu != nil && *noSimu {
filter.IncludeSimulated = new(bool)
}
/* nullify the empty entries to avoid bad filter */

View file

@ -365,13 +365,13 @@ func copyEvent(evt types.Event, line string) types.Event {
return evtCopy
}
func transform(transformChan chan types.Event, output chan types.Event, AcquisTomb *tomb.Tomb, transformRuntime *vm.Program, logger *log.Entry) {
func transform(transformChan chan types.Event, output chan types.Event, acquisTomb *tomb.Tomb, transformRuntime *vm.Program, logger *log.Entry) {
defer trace.CatchPanic("crowdsec/acquis")
logger.Infof("transformer started")
for {
select {
case <-AcquisTomb.Dying():
case <-acquisTomb.Dying():
logger.Debugf("transformer is dying")
return
case evt := <-transformChan:
@ -420,7 +420,7 @@ func transform(transformChan chan types.Event, output chan types.Event, AcquisTo
}
}
func StartAcquisition(ctx context.Context, sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error {
func StartAcquisition(ctx context.Context, sources []DataSource, output chan types.Event, acquisTomb *tomb.Tomb) error {
// Don't wait if we have no sources, as it will hang forever
if len(sources) == 0 {
return nil
@ -430,7 +430,7 @@ func StartAcquisition(ctx context.Context, sources []DataSource, output chan typ
subsrc := sources[i] // ensure its a copy
log.Debugf("starting one source %d/%d ->> %T", i, len(sources), subsrc)
AcquisTomb.Go(func() error {
acquisTomb.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis")
var err error
@ -449,21 +449,21 @@ func StartAcquisition(ctx context.Context, sources []DataSource, output chan typ
"datasource": subsrc.GetName(),
})
AcquisTomb.Go(func() error {
transform(outChan, output, AcquisTomb, transformRuntime, transformLogger)
acquisTomb.Go(func() error {
transform(outChan, output, acquisTomb, transformRuntime, transformLogger)
return nil
})
}
if subsrc.GetMode() == configuration.TAIL_MODE {
err = subsrc.StreamingAcquisition(ctx, outChan, AcquisTomb)
err = subsrc.StreamingAcquisition(ctx, outChan, acquisTomb)
} else {
err = subsrc.OneShotAcquisition(ctx, outChan, AcquisTomb)
err = subsrc.OneShotAcquisition(ctx, outChan, acquisTomb)
}
if err != nil {
// if one of the acqusition returns an error, we kill the others to properly shutdown
AcquisTomb.Kill(err)
acquisTomb.Kill(err)
}
return nil
@ -471,7 +471,7 @@ func StartAcquisition(ctx context.Context, sources []DataSource, output chan typ
}
/*return only when acquisition is over (cat) or never (tail)*/
err := AcquisTomb.Wait()
err := acquisTomb.Wait()
return err
}

View file

@ -155,14 +155,14 @@ func (w *AppsecSource) GetAggregMetrics() []prometheus.Collector {
return []prometheus.Collector{AppsecReqCounter, AppsecBlockCounter, AppsecRuleHits, AppsecOutbandParsingHistogram, AppsecInbandParsingHistogram, AppsecGlobalParsingHistogram}
}
func (w *AppsecSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
func (w *AppsecSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error {
err := w.UnmarshalConfig(yamlConfig)
if err != nil {
return fmt.Errorf("unable to parse appsec configuration: %w", err)
}
w.logger = logger
w.metricsLevel = MetricsLevel
w.metricsLevel = metricsLevel
w.logger.Tracef("Appsec configuration: %+v", w.config)
if w.config.AuthCacheDuration == nil {
@ -180,7 +180,7 @@ func (w *AppsecSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLe
w.InChan = make(chan appsec.ParsedRequest)
appsecCfg := appsec.AppsecConfig{Logger: w.logger.WithField("component", "appsec_config")}
//we keep the datasource name
// we keep the datasource name
appsecCfg.Name = w.config.Name
// let's load the associated appsec_config:
@ -275,6 +275,7 @@ func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types.
for _, runner := range w.AppsecRunners {
runner.outChan = out
t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/appsec/live/runner")
return runner.Run(t)
@ -285,16 +286,20 @@ func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types.
if w.config.ListenSocket != "" {
w.logger.Infof("creating unix socket %s", w.config.ListenSocket)
_ = os.RemoveAll(w.config.ListenSocket)
listener, err := net.Listen("unix", w.config.ListenSocket)
if err != nil {
return fmt.Errorf("appsec server failed: %w", err)
}
defer listener.Close()
if w.config.CertFilePath != "" && w.config.KeyFilePath != "" {
err = w.server.ServeTLS(listener, w.config.CertFilePath, w.config.KeyFilePath)
} else {
err = w.server.Serve(listener)
}
if err != nil && !errors.Is(err, http.ErrServerClosed) {
return fmt.Errorf("appsec server failed: %w", err)
}
@ -304,8 +309,10 @@ func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types.
})
t.Go(func() error {
var err error
if w.config.ListenAddr != "" {
w.logger.Infof("creating TCP server on %s", w.config.ListenAddr)
if w.config.CertFilePath != "" && w.config.KeyFilePath != "" {
err = w.server.ListenAndServeTLS(w.config.CertFilePath, w.config.KeyFilePath)
} else {
@ -324,6 +331,7 @@ func (w *AppsecSource) StreamingAcquisition(ctx context.Context, out chan types.
// xx let's clean up the appsec runners :)
appsec.AppsecRulesDetails = make(map[int]appsec.RulesDetails)
w.server.Shutdown(ctx)
return nil
})
@ -354,11 +362,13 @@ func (w *AppsecSource) IsAuth(apiKey string) bool {
}
req.Header.Add("X-Api-Key", apiKey)
resp, err := client.Do(req)
if err != nil {
log.Errorf("Error performing request: %s", err)
return false
}
defer resp.Body.Close()
return resp.StatusCode == http.StatusOK
@ -371,17 +381,21 @@ func (w *AppsecSource) appsecHandler(rw http.ResponseWriter, r *http.Request) {
apiKey := r.Header.Get(appsec.APIKeyHeaderName)
clientIP := r.Header.Get(appsec.IPHeaderName)
remoteIP := r.RemoteAddr
if apiKey == "" {
w.logger.Errorf("Unauthorized request from '%s' (real IP = %s)", remoteIP, clientIP)
rw.WriteHeader(http.StatusUnauthorized)
return
}
expiration, exists := w.AuthCache.Get(apiKey)
// if the apiKey is not in cache or has expired, just recheck the auth
if !exists || time.Now().After(expiration) {
if !w.IsAuth(apiKey) {
rw.WriteHeader(http.StatusUnauthorized)
w.logger.Errorf("Unauthorized request from '%s' (real IP = %s)", remoteIP, clientIP)
return
}
@ -394,8 +408,10 @@ func (w *AppsecSource) appsecHandler(rw http.ResponseWriter, r *http.Request) {
if err != nil {
w.logger.Errorf("%s", err)
rw.WriteHeader(http.StatusInternalServerError)
return
}
parsedRequest.AppsecEngine = w.config.Name
logger := w.logger.WithFields(log.Fields{

View file

@ -154,13 +154,13 @@ func (cw *CloudwatchSource) UnmarshalConfig(yamlConfig []byte) error {
return nil
}
func (cw *CloudwatchSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
func (cw *CloudwatchSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error {
err := cw.UnmarshalConfig(yamlConfig)
if err != nil {
return err
}
cw.metricsLevel = MetricsLevel
cw.metricsLevel = metricsLevel
cw.logger = logger.WithField("group", cw.Config.GroupName)
@ -330,9 +330,12 @@ func (cw *CloudwatchSource) WatchLogGroupForStreams(ctx context.Context, out cha
LastIngestionTime := time.Unix(0, *event.LastIngestionTime*int64(time.Millisecond))
if LastIngestionTime.Before(oldest) {
cw.logger.Tracef("stop iteration, %s reached oldest age, stop (%s < %s)", *event.LogStreamName, LastIngestionTime, time.Now().UTC().Add(-*cw.Config.MaxStreamAge))
hasMoreStreams = false
return false
}
cw.logger.Tracef("stream %s is elligible for monitoring", *event.LogStreamName)
// the stream has been updated recently, check if we should monitor it
var expectMode int
@ -341,6 +344,7 @@ func (cw *CloudwatchSource) WatchLogGroupForStreams(ctx context.Context, out cha
} else {
expectMode = types.TIMEMACHINE
}
monitorStream := LogStreamTailConfig{
GroupName: cw.Config.GroupName,
StreamName: *event.LogStreamName,
@ -354,16 +358,20 @@ func (cw *CloudwatchSource) WatchLogGroupForStreams(ctx context.Context, out cha
out <- monitorStream
}
}
if lastPage {
cw.logger.Tracef("reached last page")
hasMoreStreams = false
}
return true
},
)
if err != nil {
return fmt.Errorf("while describing group %s: %w", cw.Config.GroupName, err)
}
cw.logger.Tracef("after DescribeLogStreamsPagesWithContext")
}
}
@ -373,12 +381,14 @@ func (cw *CloudwatchSource) WatchLogGroupForStreams(ctx context.Context, out cha
// LogStreamManager receives the potential streams to monitor, and starts a go routine when needed
func (cw *CloudwatchSource) LogStreamManager(ctx context.Context, in chan LogStreamTailConfig, outChan chan types.Event) error {
cw.logger.Debugf("starting to monitor streams for %s", cw.Config.GroupName)
pollDeadStreamInterval := time.NewTicker(def_PollDeadStreamInterval)
for {
select {
case newStream := <-in: //nolint:govet // copylocks won't matter if the tomb is not initialized
shouldCreate := true
cw.logger.Tracef("received new streams to monitor : %s/%s", newStream.GroupName, newStream.StreamName)
if cw.Config.StreamName != nil && newStream.StreamName != *cw.Config.StreamName {
@ -402,12 +412,16 @@ func (cw *CloudwatchSource) LogStreamManager(ctx context.Context, in chan LogStr
if !stream.t.Alive() {
cw.logger.Debugf("stream %s already exists, but is dead", newStream.StreamName)
cw.monitoredStreams = append(cw.monitoredStreams[:idx], cw.monitoredStreams[idx+1:]...)
if cw.metricsLevel != configuration.METRICS_NONE {
openedStreams.With(prometheus.Labels{"group": newStream.GroupName}).Dec()
}
break
}
shouldCreate = false
break
}
}
@ -417,19 +431,23 @@ func (cw *CloudwatchSource) LogStreamManager(ctx context.Context, in chan LogStr
if cw.metricsLevel != configuration.METRICS_NONE {
openedStreams.With(prometheus.Labels{"group": newStream.GroupName}).Inc()
}
newStream.t = tomb.Tomb{}
newStream.logger = cw.logger.WithField("stream", newStream.StreamName)
cw.logger.Debugf("starting tail of stream %s", newStream.StreamName)
newStream.t.Go(func() error {
return cw.TailLogStream(ctx, &newStream, outChan)
})
cw.monitoredStreams = append(cw.monitoredStreams, &newStream)
}
case <-pollDeadStreamInterval.C:
newMonitoredStreams := cw.monitoredStreams[:0]
for idx, stream := range cw.monitoredStreams {
if !cw.monitoredStreams[idx].t.Alive() {
cw.logger.Debugf("remove dead stream %s", stream.StreamName)
if cw.metricsLevel != configuration.METRICS_NONE {
openedStreams.With(prometheus.Labels{"group": cw.monitoredStreams[idx].GroupName}).Dec()
}
@ -437,20 +455,25 @@ func (cw *CloudwatchSource) LogStreamManager(ctx context.Context, in chan LogStr
newMonitoredStreams = append(newMonitoredStreams, stream)
}
}
cw.monitoredStreams = newMonitoredStreams
case <-cw.t.Dying():
cw.logger.Infof("LogStreamManager for %s is dying, %d alive streams", cw.Config.GroupName, len(cw.monitoredStreams))
for idx, stream := range cw.monitoredStreams {
if cw.monitoredStreams[idx].t.Alive() {
cw.logger.Debugf("killing stream %s", stream.StreamName)
cw.monitoredStreams[idx].t.Kill(nil)
if err := cw.monitoredStreams[idx].t.Wait(); err != nil {
cw.logger.Debugf("error while waiting for death of %s : %s", stream.StreamName, err)
}
}
}
cw.monitoredStreams = nil
cw.logger.Debugf("routine cleanup done, return")
return nil
}
}
@ -458,12 +481,14 @@ func (cw *CloudwatchSource) LogStreamManager(ctx context.Context, in chan LogStr
func (cw *CloudwatchSource) TailLogStream(ctx context.Context, cfg *LogStreamTailConfig, outChan chan types.Event) error {
var startFrom *string
lastReadMessage := time.Now().UTC()
ticker := time.NewTicker(cfg.PollStreamInterval)
// resume at existing index if we already had
streamIndexMutex.Lock()
v := cw.streamIndexes[cfg.GroupName+"+"+cfg.StreamName]
streamIndexMutex.Unlock()
if v != "" {
cfg.logger.Debugf("restarting on index %s", v)
startFrom = &v
@ -474,7 +499,9 @@ func (cw *CloudwatchSource) TailLogStream(ctx context.Context, cfg *LogStreamTai
select {
case <-ticker.C:
cfg.logger.Tracef("entering loop")
hasMorePages := true
for hasMorePages {
/*for the first call, we only consume the last item*/
cfg.logger.Tracef("calling GetLogEventsPagesWithContext")
@ -489,36 +516,44 @@ func (cw *CloudwatchSource) TailLogStream(ctx context.Context, cfg *LogStreamTai
func(page *cloudwatchlogs.GetLogEventsOutput, lastPage bool) bool {
cfg.logger.Tracef("%d results, last:%t", len(page.Events), lastPage)
startFrom = page.NextForwardToken
if page.NextForwardToken != nil {
streamIndexMutex.Lock()
cw.streamIndexes[cfg.GroupName+"+"+cfg.StreamName] = *page.NextForwardToken
streamIndexMutex.Unlock()
}
if lastPage { /*wait another ticker to check on new log availability*/
cfg.logger.Tracef("last page")
hasMorePages = false
}
if len(page.Events) > 0 {
lastReadMessage = time.Now().UTC()
}
for _, event := range page.Events {
evt, err := cwLogToEvent(event, cfg)
if err != nil {
cfg.logger.Warningf("cwLogToEvent error, discarded event : %s", err)
} else {
cfg.logger.Debugf("pushing message : %s", evt.Line.Raw)
if cw.metricsLevel != configuration.METRICS_NONE {
linesRead.With(prometheus.Labels{"group": cfg.GroupName, "stream": cfg.StreamName}).Inc()
}
outChan <- evt
}
}
return true
},
)
if err != nil {
newerr := fmt.Errorf("while reading %s/%s: %w", cfg.GroupName, cfg.StreamName, err)
cfg.logger.Warningf("err : %s", newerr)
return newerr
}
cfg.logger.Tracef("done reading GetLogEventsPagesWithContext")

View file

@ -136,9 +136,9 @@ func (d *DockerSource) UnmarshalConfig(yamlConfig []byte) error {
return nil
}
func (d *DockerSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
func (d *DockerSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error {
d.logger = logger
d.metricsLevel = MetricsLevel
d.metricsLevel = metricsLevel
err := d.UnmarshalConfig(yamlConfig)
if err != nil {

View file

@ -102,9 +102,9 @@ func (f *FileSource) UnmarshalConfig(yamlConfig []byte) error {
return nil
}
func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
func (f *FileSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error {
f.logger = logger
f.metricsLevel = MetricsLevel
f.metricsLevel = metricsLevel
err := f.UnmarshalConfig(yamlConfig)
if err != nil {

View file

@ -157,9 +157,9 @@ func (hc *HttpConfiguration) Validate() error {
return nil
}
func (h *HTTPSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
func (h *HTTPSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error {
h.logger = logger
h.metricsLevel = MetricsLevel
h.metricsLevel = metricsLevel
err := h.UnmarshalConfig(yamlConfig)
if err != nil {
@ -339,12 +339,14 @@ func (h *HTTPSource) RunServer(out chan types.Event, t *tomb.Tomb) error {
if r.Method != http.MethodPost {
h.logger.Errorf("method not allowed: %s", r.Method)
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
return
}
if err := authorizeRequest(r, &h.Config); err != nil {
h.logger.Errorf("failed to authorize request from '%s': %s", r.RemoteAddr, err)
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}

View file

@ -210,9 +210,9 @@ func (j *JournalCtlSource) UnmarshalConfig(yamlConfig []byte) error {
return nil
}
func (j *JournalCtlSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
func (j *JournalCtlSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error {
j.logger = logger
j.metricsLevel = MetricsLevel
j.metricsLevel = metricsLevel
err := j.UnmarshalConfig(yamlConfig)
if err != nil {

View file

@ -85,9 +85,9 @@ func (k *KafkaSource) UnmarshalConfig(yamlConfig []byte) error {
return err
}
func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
func (k *KafkaSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error {
k.logger = logger
k.metricsLevel = MetricsLevel
k.metricsLevel = metricsLevel
k.logger.Debugf("start configuring %s source", dataSourceName)
@ -160,6 +160,7 @@ func (k *KafkaSource) ReadMessage(ctx context.Context, out chan types.Event) err
k.logger.Errorln(fmt.Errorf("while reading %s message: %w", dataSourceName, err))
continue
}
k.logger.Tracef("got message: %s", string(m.Value))
l := types.Line{
Raw: string(m.Value),
@ -170,9 +171,11 @@ func (k *KafkaSource) ReadMessage(ctx context.Context, out chan types.Event) err
Module: k.GetName(),
}
k.logger.Tracef("line with message read from topic '%s': %+v", k.Config.Topic, l)
if k.metricsLevel != configuration.METRICS_NONE {
linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc()
}
evt := types.MakeEvent(k.Config.UseTimeMachine, types.LOG, true)
evt.Line = l
out <- evt

View file

@ -161,9 +161,9 @@ func (k *KinesisSource) UnmarshalConfig(yamlConfig []byte) error {
return nil
}
func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
func (k *KinesisSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error {
k.logger = logger
k.metricsLevel = MetricsLevel
k.metricsLevel = metricsLevel
err := k.UnmarshalConfig(yamlConfig)
if err != nil {

View file

@ -97,9 +97,9 @@ func (ka *KubernetesAuditSource) UnmarshalConfig(yamlConfig []byte) error {
return nil
}
func (ka *KubernetesAuditSource) Configure(config []byte, logger *log.Entry, MetricsLevel int) error {
func (ka *KubernetesAuditSource) Configure(config []byte, logger *log.Entry, metricsLevel int) error {
ka.logger = logger
ka.metricsLevel = MetricsLevel
ka.metricsLevel = metricsLevel
err := ka.UnmarshalConfig(config)
if err != nil {

View file

@ -120,10 +120,10 @@ func (l *LokiSource) UnmarshalConfig(yamlConfig []byte) error {
return nil
}
func (l *LokiSource) Configure(config []byte, logger *log.Entry, MetricsLevel int) error {
func (l *LokiSource) Configure(config []byte, logger *log.Entry, metricsLevel int) error {
l.Config = LokiConfiguration{}
l.logger = logger
l.metricsLevel = MetricsLevel
l.metricsLevel = metricsLevel
err := l.UnmarshalConfig(config)
if err != nil {
return err

View file

@ -124,10 +124,10 @@ func (s *SyslogSource) UnmarshalConfig(yamlConfig []byte) error {
return nil
}
func (s *SyslogSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
func (s *SyslogSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error {
s.logger = logger
s.logger.Infof("Starting syslog datasource configuration")
s.metricsLevel = MetricsLevel
s.metricsLevel = metricsLevel
err := s.UnmarshalConfig(yamlConfig)
if err != nil {
return err

View file

@ -287,9 +287,9 @@ func (w *WinEventLogSource) UnmarshalConfig(yamlConfig []byte) error {
return nil
}
func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
func (w *WinEventLogSource) Configure(yamlConfig []byte, logger *log.Entry, metricsLevel int) error {
w.logger = logger
w.metricsLevel = MetricsLevel
w.metricsLevel = metricsLevel
err := w.UnmarshalConfig(yamlConfig)
if err != nil {

View file

@ -125,8 +125,8 @@ func NewClient(config *Config) (*ApiClient, error) {
return c, nil
}
func NewDefaultClient(URL *url.URL, prefix string, userAgent string, client *http.Client) (*ApiClient, error) {
transport, baseURL := createTransport(URL)
func NewDefaultClient(url *url.URL, prefix string, userAgent string, client *http.Client) (*ApiClient, error) {
transport, baseURL := createTransport(url)
if client == nil {
client = &http.Client{}

View file

@ -96,17 +96,17 @@ func NewProfile(profilesCfg []*csconfig.ProfileCfg) ([]*Runtime, error) {
return profilesRuntime, nil
}
func (Profile *Runtime) GenerateDecisionFromProfile(Alert *models.Alert) ([]*models.Decision, error) {
func (profile *Runtime) GenerateDecisionFromProfile(alert *models.Alert) ([]*models.Decision, error) {
var decisions []*models.Decision
for _, refDecision := range Profile.Cfg.Decisions {
for _, refDecision := range profile.Cfg.Decisions {
decision := models.Decision{}
/*the reference decision from profile is in simulated mode */
if refDecision.Simulated != nil && *refDecision.Simulated {
decision.Simulated = new(bool)
*decision.Simulated = true
/*the event is already in simulation mode */
} else if Alert.Simulated != nil && *Alert.Simulated {
} else if alert.Simulated != nil && *alert.Simulated {
decision.Simulated = new(bool)
*decision.Simulated = true
}
@ -116,7 +116,7 @@ func (Profile *Runtime) GenerateDecisionFromProfile(Alert *models.Alert) ([]*mod
if refDecision.Scope != nil && *refDecision.Scope != "" {
*decision.Scope = *refDecision.Scope
} else {
*decision.Scope = *Alert.Source.Scope
*decision.Scope = *alert.Source.Scope
}
/*some fields are populated from the reference object : duration, scope, type*/
@ -125,19 +125,19 @@ func (Profile *Runtime) GenerateDecisionFromProfile(Alert *models.Alert) ([]*mod
*decision.Duration = *refDecision.Duration
}
if Profile.Cfg.DurationExpr != "" && Profile.RuntimeDurationExpr != nil {
if profile.Cfg.DurationExpr != "" && profile.RuntimeDurationExpr != nil {
profileDebug := false
if Profile.Cfg.Debug != nil && *Profile.Cfg.Debug {
if profile.Cfg.Debug != nil && *profile.Cfg.Debug {
profileDebug = true
}
duration, err := exprhelpers.Run(Profile.RuntimeDurationExpr, map[string]interface{}{"Alert": Alert}, Profile.Logger, profileDebug)
duration, err := exprhelpers.Run(profile.RuntimeDurationExpr, map[string]interface{}{"Alert": alert}, profile.Logger, profileDebug)
if err != nil {
Profile.Logger.Warningf("Failed to run duration_expr : %v", err)
profile.Logger.Warningf("Failed to run duration_expr : %v", err)
} else {
durationStr := fmt.Sprint(duration)
if _, err := time.ParseDuration(durationStr); err != nil {
Profile.Logger.Warningf("Failed to parse expr duration result '%s'", duration)
profile.Logger.Warningf("Failed to parse expr duration result '%s'", duration)
} else {
*decision.Duration = durationStr
}
@ -149,7 +149,7 @@ func (Profile *Runtime) GenerateDecisionFromProfile(Alert *models.Alert) ([]*mod
/*for the others, let's populate it from the alert and its source*/
decision.Value = new(string)
*decision.Value = *Alert.Source.Value
*decision.Value = *alert.Source.Value
decision.Origin = new(string)
*decision.Origin = types.CrowdSecOrigin
@ -158,7 +158,7 @@ func (Profile *Runtime) GenerateDecisionFromProfile(Alert *models.Alert) ([]*mod
}
decision.Scenario = new(string)
*decision.Scenario = *Alert.Scenario
*decision.Scenario = *alert.Scenario
decisions = append(decisions, &decision)
}
@ -166,21 +166,21 @@ func (Profile *Runtime) GenerateDecisionFromProfile(Alert *models.Alert) ([]*mod
}
// EvaluateProfile is going to evaluate an Alert against a profile to generate Decisions
func (Profile *Runtime) EvaluateProfile(Alert *models.Alert) ([]*models.Decision, bool, error) {
func (profile *Runtime) EvaluateProfile(alert *models.Alert) ([]*models.Decision, bool, error) {
var decisions []*models.Decision
matched := false
for eIdx, expression := range Profile.RuntimeFilters {
for eIdx, expression := range profile.RuntimeFilters {
debugProfile := false
if Profile.Cfg.Debug != nil && *Profile.Cfg.Debug {
if profile.Cfg.Debug != nil && *profile.Cfg.Debug {
debugProfile = true
}
output, err := exprhelpers.Run(expression, map[string]interface{}{"Alert": Alert}, Profile.Logger, debugProfile)
output, err := exprhelpers.Run(expression, map[string]interface{}{"Alert": alert}, profile.Logger, debugProfile)
if err != nil {
Profile.Logger.Warningf("failed to run profile expr for %s: %v", Profile.Cfg.Name, err)
return nil, matched, fmt.Errorf("while running expression %s: %w", Profile.Cfg.Filters[eIdx], err)
profile.Logger.Warningf("failed to run profile expr for %s: %v", profile.Cfg.Name, err)
return nil, matched, fmt.Errorf("while running expression %s: %w", profile.Cfg.Filters[eIdx], err)
}
switch out := output.(type) {
@ -188,22 +188,22 @@ func (Profile *Runtime) EvaluateProfile(Alert *models.Alert) ([]*models.Decision
if out {
matched = true
/*the expression matched, create the associated decision*/
subdecisions, err := Profile.GenerateDecisionFromProfile(Alert)
subdecisions, err := profile.GenerateDecisionFromProfile(alert)
if err != nil {
return nil, matched, fmt.Errorf("while generating decision from profile %s: %w", Profile.Cfg.Name, err)
return nil, matched, fmt.Errorf("while generating decision from profile %s: %w", profile.Cfg.Name, err)
}
decisions = append(decisions, subdecisions...)
} else {
Profile.Logger.Debugf("Profile %s filter is unsuccessful", Profile.Cfg.Name)
profile.Logger.Debugf("Profile %s filter is unsuccessful", profile.Cfg.Name)
if Profile.Cfg.OnFailure == "break" {
if profile.Cfg.OnFailure == "break" {
break
}
}
default:
return nil, matched, fmt.Errorf("unexpected type %t (%v) while running '%s'", output, output, Profile.Cfg.Filters[eIdx])
return nil, matched, fmt.Errorf("unexpected type %t (%v) while running '%s'", output, output, profile.Cfg.Filters[eIdx])
}
}

View file

@ -222,7 +222,7 @@ func (c *Client) FlushAgentsAndBouncers(ctx context.Context, agentsCfg *csconfig
return nil
}
func (c *Client) FlushAlerts(ctx context.Context, MaxAge string, MaxItems int) error {
func (c *Client) FlushAlerts(ctx context.Context, maxAge string, maxItems int) error {
var (
deletedByAge int
deletedByNbItem int
@ -247,22 +247,22 @@ func (c *Client) FlushAlerts(ctx context.Context, MaxAge string, MaxItems int) e
c.Log.Debugf("FlushAlerts (Total alerts): %d", totalAlerts)
if MaxAge != "" {
if maxAge != "" {
filter := map[string][]string{
"created_before": {MaxAge},
"created_before": {maxAge},
}
nbDeleted, err := c.DeleteAlertWithFilter(ctx, filter)
if err != nil {
c.Log.Warningf("FlushAlerts (max age): %s", err)
return fmt.Errorf("unable to flush alerts with filter until=%s: %w", MaxAge, err)
return fmt.Errorf("unable to flush alerts with filter until=%s: %w", maxAge, err)
}
c.Log.Debugf("FlushAlerts (deleted max age alerts): %d", nbDeleted)
deletedByAge = nbDeleted
}
if MaxItems > 0 {
if maxItems > 0 {
// We get the highest id for the alerts
// We subtract MaxItems to avoid deleting alerts that are not old enough
// This gives us the oldest alert that we want to keep
@ -282,7 +282,7 @@ func (c *Client) FlushAlerts(ctx context.Context, MaxAge string, MaxItems int) e
}
if len(lastAlert) != 0 {
maxid := lastAlert[0].ID - MaxItems
maxid := lastAlert[0].ID - maxItems
c.Log.Debugf("FlushAlerts (max id): %d", maxid)
@ -299,12 +299,12 @@ func (c *Client) FlushAlerts(ctx context.Context, MaxAge string, MaxItems int) e
if deletedByNbItem > 0 {
c.Log.Infof("flushed %d/%d alerts because the max number of alerts has been reached (%d max)",
deletedByNbItem, totalAlerts, MaxItems)
deletedByNbItem, totalAlerts, maxItems)
}
if deletedByAge > 0 {
c.Log.Infof("flushed %d/%d alerts because they were created %s ago or more",
deletedByAge, totalAlerts, MaxAge)
deletedByAge, totalAlerts, maxAge)
}
return nil

View file

@ -29,29 +29,29 @@ var (
var ctiClient *cticlient.CrowdsecCTIClient
func InitCrowdsecCTI(Key *string, TTL *time.Duration, Size *int, LogLevel *log.Level) error {
if Key == nil || *Key == "" {
func InitCrowdsecCTI(key *string, ttl *time.Duration, size *int, logLevel *log.Level) error {
if key == nil || *key == "" {
log.Warningf("CTI API key not set or empty, CTI will not be available")
return cticlient.ErrDisabled
}
CTIApiKey = *Key
if Size == nil {
Size = new(int)
*Size = 1000
CTIApiKey = *key
if size == nil {
size = new(int)
*size = 1000
}
if TTL == nil {
TTL = new(time.Duration)
*TTL = 5 * time.Minute
if ttl == nil {
ttl = new(time.Duration)
*ttl = 5 * time.Minute
}
clog := log.New()
if err := types.ConfigureLogger(clog); err != nil {
return fmt.Errorf("while configuring datasource logger: %w", err)
}
if LogLevel != nil {
clog.SetLevel(*LogLevel)
if logLevel != nil {
clog.SetLevel(*logLevel)
}
subLogger := clog.WithField("type", "crowdsec-cti")
CrowdsecCTIInitCache(*Size, *TTL)
CrowdsecCTIInitCache(*size, *ttl)
ctiClient = cticlient.NewCrowdsecCTIClient(cticlient.WithAPIKey(CTIApiKey), cticlient.WithLogger(subLogger))
CTIApiEnabled = true
return nil

View file

@ -129,32 +129,34 @@ func Init(databaseClient *database.Client) error {
dataFileRegex = make(map[string][]*regexp.Regexp)
dataFileRe2 = make(map[string][]*re2.Regexp)
dbClient = databaseClient
XMLCacheInit()
return nil
}
func RegexpCacheInit(filename string, CacheCfg types.DataSource) error {
func RegexpCacheInit(filename string, cacheCfg types.DataSource) error {
// cache is explicitly disabled
if CacheCfg.Cache != nil && !*CacheCfg.Cache {
if cacheCfg.Cache != nil && !*cacheCfg.Cache {
return nil
}
// cache is implicitly disabled if no cache config is provided
if CacheCfg.Strategy == nil && CacheCfg.TTL == nil && CacheCfg.Size == nil {
if cacheCfg.Strategy == nil && cacheCfg.TTL == nil && cacheCfg.Size == nil {
return nil
}
// cache is enabled
if CacheCfg.Size == nil {
CacheCfg.Size = ptr.Of(50)
if cacheCfg.Size == nil {
cacheCfg.Size = ptr.Of(50)
}
gc := gcache.New(*CacheCfg.Size)
gc := gcache.New(*cacheCfg.Size)
if CacheCfg.Strategy == nil {
CacheCfg.Strategy = ptr.Of("LRU")
if cacheCfg.Strategy == nil {
cacheCfg.Strategy = ptr.Of("LRU")
}
switch *CacheCfg.Strategy {
switch *cacheCfg.Strategy {
case "LRU":
gc = gc.LRU()
case "LFU":
@ -162,11 +164,11 @@ func RegexpCacheInit(filename string, CacheCfg types.DataSource) error {
case "ARC":
gc = gc.ARC()
default:
return fmt.Errorf("unknown cache strategy '%s'", *CacheCfg.Strategy)
return fmt.Errorf("unknown cache strategy '%s'", *cacheCfg.Strategy)
}
if CacheCfg.TTL != nil {
gc.Expiration(*CacheCfg.TTL)
if cacheCfg.TTL != nil {
gc.Expiration(*cacheCfg.TTL)
}
cache := gc.Build()
@ -240,6 +242,7 @@ func Distinct(params ...any) (any, error) {
if rt := reflect.TypeOf(params[0]).Kind(); rt != reflect.Slice && rt != reflect.Array {
return nil, nil
}
array := params[0].([]interface{})
if array == nil {
return []interface{}{}, nil
@ -254,6 +257,7 @@ func Distinct(params ...any) (any, error) {
ret = append(ret, val)
}
}
return ret, nil
}
@ -282,8 +286,10 @@ func flatten(args []interface{}, v reflect.Value) []interface{} {
}
func existsInFileMaps(filename string, ftype string) (bool, error) {
ok := false
var err error
ok := false
switch ftype {
case "regex", "regexp":
if fflag.Re2RegexpInfileSupport.IsEnabled() {
@ -296,10 +302,11 @@ func existsInFileMaps(filename string, ftype string) (bool, error) {
default:
err = fmt.Errorf("unknown data type '%s' for : '%s'", ftype, filename)
}
return ok, err
}
//Expr helpers
// Expr helpers
// func Get(arr []string, index int) string {
func Get(params ...any) (any, error) {
@ -315,10 +322,12 @@ func Get(params ...any) (any, error) {
func Atof(params ...any) (any, error) {
x := params[0].(string)
log.Debugf("debug atof %s", x)
ret, err := strconv.ParseFloat(x, 64)
if err != nil {
log.Warningf("Atof : can't convert float '%s' : %v", x, err)
}
return ret, nil
}
@ -340,22 +349,28 @@ func Distance(params ...any) (any, error) {
long1 := params[1].(string)
lat2 := params[2].(string)
long2 := params[3].(string)
lat1f, err := strconv.ParseFloat(lat1, 64)
if err != nil {
log.Warningf("lat1 is not a float : %v", err)
return 0.0, fmt.Errorf("lat1 is not a float : %v", err)
}
long1f, err := strconv.ParseFloat(long1, 64)
if err != nil {
log.Warningf("long1 is not a float : %v", err)
return 0.0, fmt.Errorf("long1 is not a float : %v", err)
}
lat2f, err := strconv.ParseFloat(lat2, 64)
if err != nil {
log.Warningf("lat2 is not a float : %v", err)
return 0.0, fmt.Errorf("lat2 is not a float : %v", err)
}
long2f, err := strconv.ParseFloat(long2, 64)
if err != nil {
log.Warningf("long2 is not a float : %v", err)
@ -363,7 +378,7 @@ func Distance(params ...any) (any, error) {
return 0.0, fmt.Errorf("long2 is not a float : %v", err)
}
//either set of coordinates is 0,0, return 0 to avoid FPs
// either set of coordinates is 0,0, return 0 to avoid FPs
if (lat1f == 0.0 && long1f == 0.0) || (lat2f == 0.0 && long2f == 0.0) {
log.Warningf("one of the coordinates is 0,0, returning 0")
return 0.0, nil
@ -373,6 +388,7 @@ func Distance(params ...any) (any, error) {
second := haversine.Coord{Lat: lat2f, Lon: long2f}
_, km := haversine.Distance(first, second)
return km, nil
}

View file

@ -31,9 +31,9 @@ type BayesianBucket struct {
DumbProcessor
}
func updateProbability(prior, probGivenEvil, ProbGivenBenign float32) float32 {
func updateProbability(prior, probGivenEvil, probGivenBenign float32) float32 {
numerator := probGivenEvil * prior
denominator := numerator + ProbGivenBenign*(1-prior)
denominator := numerator + probGivenBenign*(1-prior)
return numerator / denominator
}

View file

@ -36,10 +36,10 @@ func NewOverflowFilter(g *BucketFactory) (*OverflowFilter, error) {
return &u, nil
}
func (u *OverflowFilter) OnBucketOverflow(Bucket *BucketFactory) func(*Leaky, types.RuntimeAlert, *types.Queue) (types.RuntimeAlert, *types.Queue) {
func (u *OverflowFilter) OnBucketOverflow(bucket *BucketFactory) func(*Leaky, types.RuntimeAlert, *types.Queue) (types.RuntimeAlert, *types.Queue) {
return func(l *Leaky, s types.RuntimeAlert, q *types.Queue) (types.RuntimeAlert, *types.Queue) {
el, err := exprhelpers.Run(u.FilterRuntime, map[string]interface{}{
"queue": q, "signal": s, "leaky": l}, l.logger, Bucket.Debug)
"queue": q, "signal": s, "leaky": l}, l.logger, bucket.Debug)
if err != nil {
l.logger.Errorf("Failed running overflow filter: %s", err)
return s, q