Mirror of https://github.com/crowdsecurity/crowdsec.git (synced 2025-05-11 12:25:53 +02:00)
loop performance optimizations / 2 (#3364)
commit 2c95a24f69 (parent 99552f4a53)
3 changed files with 36 additions and 34 deletions
@@ -16,9 +16,7 @@ import (
 	"github.com/crowdsecurity/crowdsec/pkg/types"
 )
 
-const (
-	MaxContextValueLen = 4000
-)
+const MaxContextValueLen = 4000
 
 var alertContext = Context{}
 
@@ -34,7 +32,8 @@ func ValidateContextExpr(key string, expressions []string) error {
 		_, err := expr.Compile(expression, exprhelpers.GetExprOptions(map[string]interface{}{
 			"evt":   &types.Event{},
 			"match": &types.MatchedRule{},
-			"req":   &http.Request{}})...)
+			"req":   &http.Request{},
+		})...)
 		if err != nil {
 			return fmt.Errorf("compilation of '%s' failed: %w", expression, err)
 		}
@@ -79,7 +78,8 @@ func NewAlertContext(contextToSend map[string][]string, valueLength int) error {
 		valueCompiled, err := expr.Compile(value, exprhelpers.GetExprOptions(map[string]interface{}{
 			"evt":   &types.Event{},
 			"match": &types.MatchedRule{},
-			"req":   &http.Request{}})...)
+			"req":   &http.Request{},
+		})...)
 		if err != nil {
 			return fmt.Errorf("compilation of '%s' context value failed: %w", value, err)
 		}
@@ -114,6 +114,7 @@ func TruncateContextMap(contextMap map[string][]string, contextValueLen int) ([]
 		}
 		metas = append(metas, &meta)
 	}
+
 	return metas, errors
 }
 
@@ -150,20 +151,19 @@ func TruncateContext(values []string, contextValueLen int) (string, error) {
 }
 
 func EvalAlertContextRules(evt types.Event, match *types.MatchedRule, request *http.Request, tmpContext map[string][]string) []error {
 
 	var errors []error
 
-	//if we're evaluating context for appsec event, match and request will be present.
-	//otherwise, only evt will be.
+	// if we're evaluating context for appsec event, match and request will be present.
+	// otherwise, only evt will be.
 	if match == nil {
 		match = types.NewMatchedRule()
 	}
 
 	if request == nil {
 		request = &http.Request{}
 	}
 
 	for key, values := range alertContext.ContextToSendCompiled {
 
 		if _, ok := tmpContext[key]; !ok {
 			tmpContext[key] = make([]string, 0)
 		}
@@ -176,6 +176,7 @@ func EvalAlertContextRules(evt types.Event, match *types.MatchedRule, request *h
 				errors = append(errors, fmt.Errorf("failed to get value for %s: %w", key, err))
 				continue
 			}
+
 			switch out := output.(type) {
 			case string:
 				val = out
@@ -208,6 +209,7 @@ func EvalAlertContextRules(evt types.Event, match *types.MatchedRule, request *h
 			}
 		}
 	}
+
 	return errors
 }
 
@@ -237,8 +239,8 @@ func EventToContext(events []types.Event) (models.Meta, []error) {
 
 	tmpContext := make(map[string][]string)
 
-	for _, evt := range events {
-		tmpErrors := EvalAlertContextRules(evt, nil, nil, tmpContext)
+	for i := range events {
+		tmpErrors := EvalAlertContextRules(events[i], nil, nil, tmpContext)
 		errors = append(errors, tmpErrors...)
 	}
 
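The change to EventToContext above is the core pattern of this PR: iterating a slice of large structs by index instead of by value, so each iteration no longer copies the whole element. A minimal sketch of the idea, using a hypothetical Item type rather than crowdsec's types.Event:

package main

import "fmt"

// Item stands in for a large struct such as types.Event (hypothetical type for illustration).
type Item struct {
	Name    string
	Payload [4096]byte // bulky field: copying it on every iteration is wasted work
}

func main() {
	items := []Item{{Name: "a"}, {Name: "b"}}

	// Before: each iteration copies the whole Item into evt.
	for _, evt := range items {
		fmt.Println(evt.Name)
	}

	// After: index-based iteration reads fields in place, with no per-element copy.
	for i := range items {
		fmt.Println(items[i].Name)
	}
}

The gain depends on the element size; for small structs the two loop forms are effectively equivalent.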
@@ -145,25 +145,25 @@ func (t *tree) processEvents(parserResults ParserResults) {
 }
 
 func (t *tree) processBuckets(bucketPour BucketPourInfo) {
-	for bname, evtlist := range bucketPour {
-		for _, evt := range evtlist {
-			if evt.Line.Raw == "" {
+	for bname, events := range bucketPour {
+		for i := range events {
+			if events[i].Line.Raw == "" {
 				continue
 			}
 
 			// it might be bucket overflow being reprocessed, skip this
-			if _, ok := t.state[evt.Line.Time]; !ok {
-				t.state[evt.Line.Time] = make(map[string]map[string]ParserResult)
-				t.assoc[evt.Line.Time] = evt.Line.Raw
+			if _, ok := t.state[events[i].Line.Time]; !ok {
+				t.state[events[i].Line.Time] = make(map[string]map[string]ParserResult)
+				t.assoc[events[i].Line.Time] = events[i].Line.Raw
 			}
 
-			// there is a trick : to know if an event successfully exit the parsers, we check if it reached the pour() phase
+			// there is a trick: to know if an event successfully exit the parsers, we check if it reached the pour() phase
 			// we thus use a fake stage "buckets" and a fake parser "OK" to know if it entered
-			if _, ok := t.state[evt.Line.Time]["buckets"]; !ok {
-				t.state[evt.Line.Time]["buckets"] = make(map[string]ParserResult)
+			if _, ok := t.state[events[i].Line.Time]["buckets"]; !ok {
+				t.state[events[i].Line.Time]["buckets"] = make(map[string]ParserResult)
 			}
 
-			t.state[evt.Line.Time]["buckets"][bname] = ParserResult{Success: true}
+			t.state[events[i].Line.Time]["buckets"][bname] = ParserResult{Success: true}
 		}
 	}
 }
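processBuckets gets the same index-based rewrite. To see the copy cost for yourself, a rough benchmark along these lines (hypothetical Event type and dummy data, not crowdsec's) compares the two loop forms:

package loopbench

import "testing"

// Event is a stand-in for a wide event struct (hypothetical; not crowdsec's types.Event).
type Event struct {
	Raw  string
	Meta [2048]byte
}

var events = make([]Event, 1024)

// BenchmarkRangeByValue copies every Event into evt on each iteration.
func BenchmarkRangeByValue(b *testing.B) {
	total := 0
	for n := 0; n < b.N; n++ {
		for _, evt := range events {
			total += len(evt.Raw)
		}
	}
	_ = total
}

// BenchmarkRangeByIndex reads the field through the index; no per-element copy.
func BenchmarkRangeByIndex(b *testing.B) {
	total := 0
	for n := 0; n < b.N; n++ {
		for i := range events {
			total += len(events[i].Raw)
		}
	}
	_ = total
}

Run with go test -bench=. in a throwaway package; the by-value loop's cost grows with the struct size, while the indexed loop stays roughly flat.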
@@ -348,7 +348,7 @@ func LoadBucket(bucketFactory *BucketFactory, tomb *tomb.Tomb) error {
 
 	if bucketFactory.Debug {
 		clog := log.New()
-		if err := types.ConfigureLogger(clog); err != nil {
+		if err = types.ConfigureLogger(clog); err != nil {
 			return fmt.Errorf("while creating bucket-specific logger: %w", err)
 		}
 
@@ -496,7 +496,7 @@ func LoadBucketsState(file string, buckets *Buckets, bucketFactories []BucketFac
 		return fmt.Errorf("can't parse state file %s: %w", file, err)
 	}
 
-	for k, v := range state {
+	for k := range state {
 		var tbucket *Leaky
 
 		log.Debugf("Reloading bucket %s", k)
 
@@ -509,30 +509,30 @@ func LoadBucketsState(file string, buckets *Buckets, bucketFactories []BucketFac
 		found := false
 
 		for _, h := range bucketFactories {
-			if h.Name != v.Name {
+			if h.Name != state[k].Name {
 				continue
 			}
 
 			log.Debugf("found factory %s/%s -> %s", h.Author, h.Name, h.Description)
 			// check in which mode the bucket was
-			if v.Mode == types.TIMEMACHINE {
+			if state[k].Mode == types.TIMEMACHINE {
 				tbucket = NewTimeMachine(h)
-			} else if v.Mode == types.LIVE {
+			} else if state[k].Mode == types.LIVE {
 				tbucket = NewLeaky(h)
 			} else {
-				log.Errorf("Unknown bucket type : %d", v.Mode)
+				log.Errorf("Unknown bucket type : %d", state[k].Mode)
 			}
 			/*Trying to restore queue state*/
-			tbucket.Queue = v.Queue
+			tbucket.Queue = state[k].Queue
 			/*Trying to set the limiter to the saved values*/
-			tbucket.Limiter.Load(v.SerializedState)
+			tbucket.Limiter.Load(state[k].SerializedState)
 			tbucket.In = make(chan *types.Event)
 			tbucket.Mapkey = k
 			tbucket.Signal = make(chan bool, 1)
-			tbucket.First_ts = v.First_ts
-			tbucket.Last_ts = v.Last_ts
-			tbucket.Ovflw_ts = v.Ovflw_ts
-			tbucket.Total_count = v.Total_count
+			tbucket.First_ts = state[k].First_ts
+			tbucket.Last_ts = state[k].Last_ts
+			tbucket.Ovflw_ts = state[k].Ovflw_ts
+			tbucket.Total_count = state[k].Total_count
 			buckets.Bucket_map.Store(k, tbucket)
 			h.tomb.Go(func() error {
 				return LeakRoutine(tbucket)
 
@@ -545,7 +545,7 @@ func LoadBucketsState(file string, buckets *Buckets, bucketFactories []BucketFac
 		}
 
 		if !found {
-			return fmt.Errorf("unable to find holder for bucket %s: %s", k, spew.Sdump(v))
+			return fmt.Errorf("unable to find holder for bucket %s: %s", k, spew.Sdump(state[k]))
 		}
 	}
 
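In LoadBucketsState the loop now ranges over the map keys only and reads fields through state[k], so the saved-state value is never copied into a loop variable. A small sketch of that trade-off, with a hypothetical SavedState type standing in for the real per-bucket state:

package main

import "fmt"

// SavedState stands in for the per-bucket state loaded from disk (hypothetical fields).
type SavedState struct {
	Name  string
	Queue [1024]byte // large payload that `for k, v := range state` would copy on every iteration
}

func main() {
	state := map[string]SavedState{
		"bucket-a": {Name: "ssh-bf"},
		"bucket-b": {Name: "http-crawl"},
	}

	// Ranging over keys only: the large value is never copied into a loop variable;
	// fields are read through the map entry instead.
	for k := range state {
		fmt.Printf("restoring %s (%s)\n", k, state[k].Name)
	}
}

Each state[k] access is an extra map lookup, so this form pays off mainly when the values are large or the copies show up in profiles.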