crowdsec/cmd/crowdsec/pour.go
2025-01-14 10:48:05 +00:00

69 lines
1.9 KiB
Go

package main
import (
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
leaky "github.com/crowdsecurity/crowdsec/pkg/leakybucket"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
// runPour reads events from input and pours each one into the given holders
// and buckets through leaky.PourItemToHolders, looping until bucketsTomb
// starts dying.
//
// Every 5000th received event it logs leaky.LeakyRoutineCount and, when
// cConfig.Crowdsec.BucketsGCEnabled is set and the event carries a
// MarshaledTime, calls leaky.GarbageCollectBuckets with that timestamp.
//
// It returns nil when bucketsTomb is dying, and a wrapped error when
// leaky.GarbageCollectBuckets fails. A failed pour is logged and the event is
// skipped: no metric is recorded and lastProcessedItem is left untouched.
func runPour(input chan types.Event, holders []leaky.BucketFactory, buckets *leaky.Buckets, cConfig *csconfig.Config) error {
	// Number of received events between two bucket-count logs / GC attempts.
	const housekeepingEvery = 5000

	received := 0

	for {
		select {
		case <-bucketsTomb.Dying():
			log.Infof("Bucket routine exiting")
			return nil

		case evt := <-input:
			pourStart := time.Now()

			received++
			if received%housekeepingEvery == 0 {
				log.Infof("%d existing buckets", leaky.LeakyRoutineCount)

				// when in forensics mode, garbage collect buckets
				// (the reference time comes from the event itself, not the wall clock)
				if cConfig.Crowdsec.BucketsGCEnabled && evt.MarshaledTime != "" {
					var evtTime time.Time

					if err := evtTime.UnmarshalText([]byte(evt.MarshaledTime)); err != nil {
						log.Warningf("Failed to parse time from event '%s' : %s", evt.MarshaledTime, err)
					} else {
						log.Warning("Starting buckets garbage collection ...")

						if err = leaky.GarbageCollectBuckets(evtTime, buckets); err != nil {
							return fmt.Errorf("failed to start bucket GC : %w", err)
						}
					}
				}
			}

			// here we can bucketify with the received event
			poured, err := leaky.PourItemToHolders(evt, holders, buckets)
			if err != nil {
				log.Errorf("bucketify failed for: %v with %s", evt, err)
				continue
			}

			// The observed duration covers the periodic housekeeping above as well as the pour.
			took := time.Since(pourStart)
			labels := prometheus.Labels{"type": evt.Line.Module, "source": evt.Line.Src}
			globalPourHistogram.With(labels).Observe(took.Seconds())

			if !poured {
				globalBucketPourKo.Inc()
			} else {
				globalBucketPourOk.Inc()
			}

			if evt.MarshaledTime == "" {
				continue
			}

			// Record the timestamp of the last successfully handled event.
			if err := lastProcessedItem.UnmarshalText([]byte(evt.MarshaledTime)); err != nil {
				log.Warningf("failed to parse time from event : %s", err)
			}
		}
	}
}