mirror of
https://github.com/crowdsecurity/crowdsec.git
synced 2025-05-10 20:05:55 +02:00
loop performance optimizations / 1 (#3313)
* rangeValCopy: each iteration copies 248 bytes * rangeValCopy: each iteration copies 576 bytes * rangeValCopy: each iteration copies 376 bytes * rangeValCopy: each iteration copies 312 bytes * enable linter: gocritic/rangeValCopy
This commit is contained in:
parent
7a1ad8376a
commit
411bb48a81
6 changed files with 89 additions and 37 deletions
|
@ -183,7 +183,6 @@ linters-settings:
|
|||
- ifElseChain
|
||||
- importShadow
|
||||
- hugeParam
|
||||
- rangeValCopy
|
||||
- commentedOutCode
|
||||
- commentedOutImport
|
||||
- unnamedResult
|
||||
|
@ -465,3 +464,23 @@ issues:
|
|||
- recvcheck
|
||||
path: "pkg/cwhub/item.go"
|
||||
text: 'the methods of "Item" use pointer receiver and non-pointer receiver.'
|
||||
|
||||
- linters:
|
||||
- gocritic
|
||||
path: "cmd/crowdsec-cli"
|
||||
text: "rangeValCopy: .*"
|
||||
|
||||
- linters:
|
||||
- gocritic
|
||||
path: "pkg/(cticlient|hubtest)"
|
||||
text: "rangeValCopy: .*"
|
||||
|
||||
- linters:
|
||||
- gocritic
|
||||
path: "(.+)_test.go"
|
||||
text: "rangeValCopy: .*"
|
||||
|
||||
- linters:
|
||||
- gocritic
|
||||
path: "pkg/(appsec|acquisition|dumps|alertcontext|leakybucket|exprhelpers)"
|
||||
text: "rangeValCopy: .*"
|
||||
|
|
|
@ -66,6 +66,7 @@ func (ka *KubernetesAuditSource) GetAggregMetrics() []prometheus.Collector {
|
|||
|
||||
func (ka *KubernetesAuditSource) UnmarshalConfig(yamlConfig []byte) error {
|
||||
k8sConfig := KubernetesAuditConfiguration{}
|
||||
|
||||
err := yaml.UnmarshalStrict(yamlConfig, &k8sConfig)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse k8s-audit configuration: %w", err)
|
||||
|
@ -92,6 +93,7 @@ func (ka *KubernetesAuditSource) UnmarshalConfig(yamlConfig []byte) error {
|
|||
if ka.config.Mode == "" {
|
||||
ka.config.Mode = configuration.TAIL_MODE
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -116,6 +118,7 @@ func (ka *KubernetesAuditSource) Configure(config []byte, logger *log.Entry, Met
|
|||
}
|
||||
|
||||
ka.mux.HandleFunc(ka.config.WebhookPath, ka.webhookHandler)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -137,6 +140,7 @@ func (ka *KubernetesAuditSource) OneShotAcquisition(_ context.Context, _ chan ty
|
|||
|
||||
func (ka *KubernetesAuditSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
|
||||
ka.outChan = out
|
||||
|
||||
t.Go(func() error {
|
||||
defer trace.CatchPanic("crowdsec/acquis/k8s-audit/live")
|
||||
ka.logger.Infof("Starting k8s-audit server on %s:%d%s", ka.config.ListenAddr, ka.config.ListenPort, ka.config.WebhookPath)
|
||||
|
@ -145,13 +149,16 @@ func (ka *KubernetesAuditSource) StreamingAcquisition(ctx context.Context, out c
|
|||
if err != nil && err != http.ErrServerClosed {
|
||||
return fmt.Errorf("k8s-audit server failed: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
<-t.Dying()
|
||||
ka.logger.Infof("Stopping k8s-audit server on %s:%d%s", ka.config.ListenAddr, ka.config.ListenPort, ka.config.WebhookPath)
|
||||
ka.server.Shutdown(ctx)
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -167,42 +174,52 @@ func (ka *KubernetesAuditSource) webhookHandler(w http.ResponseWriter, r *http.R
|
|||
if ka.metricsLevel != configuration.METRICS_NONE {
|
||||
requestCount.WithLabelValues(ka.addr).Inc()
|
||||
}
|
||||
|
||||
if r.Method != http.MethodPost {
|
||||
w.WriteHeader(http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
ka.logger.Tracef("webhookHandler called")
|
||||
|
||||
var auditEvents audit.EventList
|
||||
|
||||
jsonBody, err := io.ReadAll(r.Body)
|
||||
if err != nil {
|
||||
ka.logger.Errorf("Error reading request body: %v", err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
ka.logger.Tracef("webhookHandler receveid: %s", string(jsonBody))
|
||||
|
||||
err = json.Unmarshal(jsonBody, &auditEvents)
|
||||
if err != nil {
|
||||
ka.logger.Errorf("Error decoding audit events: %s", err)
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
remoteIP := strings.Split(r.RemoteAddr, ":")[0]
|
||||
for _, auditEvent := range auditEvents.Items {
|
||||
|
||||
for idx := range auditEvents.Items {
|
||||
if ka.metricsLevel != configuration.METRICS_NONE {
|
||||
eventCount.WithLabelValues(ka.addr).Inc()
|
||||
}
|
||||
bytesEvent, err := json.Marshal(auditEvent)
|
||||
|
||||
bytesEvent, err := json.Marshal(auditEvents.Items[idx])
|
||||
if err != nil {
|
||||
ka.logger.Errorf("Error serializing audit event: %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
ka.logger.Tracef("Got audit event: %s", string(bytesEvent))
|
||||
l := types.Line{
|
||||
Raw: string(bytesEvent),
|
||||
Labels: ka.config.Labels,
|
||||
Time: auditEvent.StageTimestamp.Time,
|
||||
Time: auditEvents.Items[idx].StageTimestamp.Time,
|
||||
Src: remoteIP,
|
||||
Process: true,
|
||||
Module: ka.GetName(),
|
||||
|
|
|
@ -198,22 +198,24 @@ func eventSources(evt types.Event, leaky *Leaky) (map[string]models.Source, erro
|
|||
func EventsFromQueue(queue *types.Queue) []*models.Event {
|
||||
events := []*models.Event{}
|
||||
|
||||
for _, evt := range queue.Queue {
|
||||
if evt.Meta == nil {
|
||||
qEvents := queue.GetQueue()
|
||||
|
||||
for idx := range qEvents {
|
||||
if qEvents[idx].Meta == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
meta := models.Meta{}
|
||||
// we want consistence
|
||||
skeys := make([]string, 0, len(evt.Meta))
|
||||
for k := range evt.Meta {
|
||||
skeys := make([]string, 0, len(qEvents[idx].Meta))
|
||||
for k := range qEvents[idx].Meta {
|
||||
skeys = append(skeys, k)
|
||||
}
|
||||
|
||||
sort.Strings(skeys)
|
||||
|
||||
for _, k := range skeys {
|
||||
v := evt.Meta[k]
|
||||
v := qEvents[idx].Meta[k]
|
||||
subMeta := models.MetaItems0{Key: k, Value: v}
|
||||
meta = append(meta, &subMeta)
|
||||
}
|
||||
|
@ -223,15 +225,15 @@ func EventsFromQueue(queue *types.Queue) []*models.Event {
|
|||
Meta: meta,
|
||||
}
|
||||
// either MarshaledTime is present and is extracted from log
|
||||
if evt.MarshaledTime != "" {
|
||||
tmpTimeStamp := evt.MarshaledTime
|
||||
if qEvents[idx].MarshaledTime != "" {
|
||||
tmpTimeStamp := qEvents[idx].MarshaledTime
|
||||
ovflwEvent.Timestamp = &tmpTimeStamp
|
||||
} else if !evt.Time.IsZero() { // or .Time has been set during parse as time.Now().UTC()
|
||||
} else if !qEvents[idx].Time.IsZero() { // or .Time has been set during parse as time.Now().UTC()
|
||||
ovflwEvent.Timestamp = new(string)
|
||||
|
||||
raw, err := evt.Time.MarshalText()
|
||||
raw, err := qEvents[idx].Time.MarshalText()
|
||||
if err != nil {
|
||||
log.Warningf("while serializing time '%s' : %s", evt.Time.String(), err)
|
||||
log.Warningf("while serializing time '%s' : %s", qEvents[idx].Time.String(), err)
|
||||
} else {
|
||||
*ovflwEvent.Timestamp = string(raw)
|
||||
}
|
||||
|
@ -253,8 +255,9 @@ func alertFormatSource(leaky *Leaky, queue *types.Queue) (map[string]models.Sour
|
|||
|
||||
log.Debugf("Formatting (%s) - scope Info : scope_type:%s / scope_filter:%s", leaky.Name, leaky.scopeType.Scope, leaky.scopeType.Filter)
|
||||
|
||||
for _, evt := range queue.Queue {
|
||||
srcs, err := SourceFromEvent(evt, leaky)
|
||||
qEvents := queue.GetQueue()
|
||||
for idx := range qEvents {
|
||||
srcs, err := SourceFromEvent(qEvents[idx], leaky)
|
||||
if err != nil {
|
||||
return nil, "", fmt.Errorf("while extracting scope from bucket %s: %w", leaky.Name, err)
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package parser
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
|
@ -236,7 +237,7 @@ func (n *Node) processGrok(p *types.Event, cachedExprEnv map[string]any) (bool,
|
|||
case string:
|
||||
gstr = out
|
||||
case int:
|
||||
gstr = fmt.Sprintf("%d", out)
|
||||
gstr = strconv.Itoa(out)
|
||||
case float64, float32:
|
||||
gstr = fmt.Sprintf("%f", out)
|
||||
default:
|
||||
|
@ -357,16 +358,17 @@ func (n *Node) process(p *types.Event, ctx UnixParserCtx, expressionEnv map[stri
|
|||
}
|
||||
|
||||
// Iterate on leafs
|
||||
for _, leaf := range n.LeavesNodes {
|
||||
ret, err := leaf.process(p, ctx, cachedExprEnv)
|
||||
leaves := n.LeavesNodes
|
||||
for idx := range leaves {
|
||||
ret, err := leaves[idx].process(p, ctx, cachedExprEnv)
|
||||
if err != nil {
|
||||
clog.Tracef("\tNode (%s) failed : %v", leaf.rn, err)
|
||||
clog.Tracef("\tNode (%s) failed : %v", leaves[idx].rn, err)
|
||||
clog.Debugf("Event leaving node : ko")
|
||||
|
||||
return false, err
|
||||
}
|
||||
|
||||
clog.Tracef("\tsub-node (%s) ret : %v (strategy:%s)", leaf.rn, ret, n.OnSuccess)
|
||||
clog.Tracef("\tsub-node (%s) ret : %v (strategy:%s)", leaves[idx].rn, ret, n.OnSuccess)
|
||||
|
||||
if ret {
|
||||
NodeState = true
|
||||
|
@ -593,7 +595,7 @@ func (n *Node) compile(pctx *UnixParserCtx, ectx EnricherCtx) error {
|
|||
/* compile leafs if present */
|
||||
for idx := range n.LeavesNodes {
|
||||
if n.LeavesNodes[idx].Name == "" {
|
||||
n.LeavesNodes[idx].Name = fmt.Sprintf("child-%s", n.Name)
|
||||
n.LeavesNodes[idx].Name = "child-" + n.Name
|
||||
}
|
||||
/*propagate debug/stats to child nodes*/
|
||||
if !n.LeavesNodes[idx].Debug && n.Debug {
|
||||
|
|
|
@ -29,10 +29,11 @@ func SetTargetByName(target string, value string, evt *types.Event) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
//it's a hack, we do it for the user
|
||||
// it's a hack, we do it for the user
|
||||
target = strings.TrimPrefix(target, "evt.")
|
||||
|
||||
log.Debugf("setting target %s to %s", target, value)
|
||||
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
log.Errorf("Runtime error while trying to set '%s': %+v", target, r)
|
||||
|
@ -46,6 +47,7 @@ func SetTargetByName(target string, value string, evt *types.Event) bool {
|
|||
//event is nil
|
||||
return false
|
||||
}
|
||||
|
||||
for _, f := range strings.Split(target, ".") {
|
||||
/*
|
||||
** According to current Event layout we only have to handle struct and map
|
||||
|
@ -57,7 +59,9 @@ func SetTargetByName(target string, value string, evt *types.Event) bool {
|
|||
if (tmp == reflect.Value{}) || tmp.IsZero() {
|
||||
log.Debugf("map entry is zero in '%s'", target)
|
||||
}
|
||||
|
||||
iter.SetMapIndex(reflect.ValueOf(f), reflect.ValueOf(value))
|
||||
|
||||
return true
|
||||
case reflect.Struct:
|
||||
tmp := iter.FieldByName(f)
|
||||
|
@ -65,9 +69,11 @@ func SetTargetByName(target string, value string, evt *types.Event) bool {
|
|||
log.Debugf("'%s' is not a valid target because '%s' is not valid", target, f)
|
||||
return false
|
||||
}
|
||||
|
||||
if tmp.Kind() == reflect.Ptr {
|
||||
tmp = reflect.Indirect(tmp)
|
||||
}
|
||||
|
||||
iter = tmp
|
||||
case reflect.Ptr:
|
||||
tmp := iter.Elem()
|
||||
|
@ -82,11 +88,14 @@ func SetTargetByName(target string, value string, evt *types.Event) bool {
|
|||
log.Errorf("'%s' can't be set", target)
|
||||
return false
|
||||
}
|
||||
|
||||
if iter.Kind() != reflect.String {
|
||||
log.Errorf("Expected string, got %v when handling '%s'", iter.Kind(), target)
|
||||
return false
|
||||
}
|
||||
|
||||
iter.Set(reflect.ValueOf(value))
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -321,46 +330,46 @@ func Parse(ctx UnixParserCtx, xp types.Event, nodes []Node) (types.Event, error)
|
|||
}
|
||||
|
||||
isStageOK := false
|
||||
for idx, node := range nodes {
|
||||
for idx := range nodes {
|
||||
//Only process current stage's nodes
|
||||
if event.Stage != node.Stage {
|
||||
if event.Stage != nodes[idx].Stage {
|
||||
continue
|
||||
}
|
||||
clog := log.WithFields(log.Fields{
|
||||
"node-name": node.rn,
|
||||
"node-name": nodes[idx].rn,
|
||||
"stage": event.Stage,
|
||||
})
|
||||
clog.Tracef("Processing node %d/%d -> %s", idx, len(nodes), node.rn)
|
||||
clog.Tracef("Processing node %d/%d -> %s", idx, len(nodes), nodes[idx].rn)
|
||||
if ctx.Profiling {
|
||||
node.Profiling = true
|
||||
nodes[idx].Profiling = true
|
||||
}
|
||||
ret, err := node.process(&event, ctx, map[string]interface{}{"evt": &event})
|
||||
ret, err := nodes[idx].process(&event, ctx, map[string]interface{}{"evt": &event})
|
||||
if err != nil {
|
||||
clog.Errorf("Error while processing node : %v", err)
|
||||
return event, err
|
||||
}
|
||||
clog.Tracef("node (%s) ret : %v", node.rn, ret)
|
||||
clog.Tracef("node (%s) ret : %v", nodes[idx].rn, ret)
|
||||
if ParseDump {
|
||||
var parserIdxInStage int
|
||||
StageParseMutex.Lock()
|
||||
if len(StageParseCache[stage][node.Name]) == 0 {
|
||||
StageParseCache[stage][node.Name] = make([]dumps.ParserResult, 0)
|
||||
if len(StageParseCache[stage][nodes[idx].Name]) == 0 {
|
||||
StageParseCache[stage][nodes[idx].Name] = make([]dumps.ParserResult, 0)
|
||||
parserIdxInStage = len(StageParseCache[stage])
|
||||
} else {
|
||||
parserIdxInStage = StageParseCache[stage][node.Name][0].Idx
|
||||
parserIdxInStage = StageParseCache[stage][nodes[idx].Name][0].Idx
|
||||
}
|
||||
StageParseMutex.Unlock()
|
||||
|
||||
evtcopy := deepcopy.Copy(event)
|
||||
parserInfo := dumps.ParserResult{Evt: evtcopy.(types.Event), Success: ret, Idx: parserIdxInStage}
|
||||
StageParseMutex.Lock()
|
||||
StageParseCache[stage][node.Name] = append(StageParseCache[stage][node.Name], parserInfo)
|
||||
StageParseCache[stage][nodes[idx].Name] = append(StageParseCache[stage][nodes[idx].Name], parserInfo)
|
||||
StageParseMutex.Unlock()
|
||||
}
|
||||
if ret {
|
||||
isStageOK = true
|
||||
}
|
||||
if ret && node.OnSuccess == "next_stage" {
|
||||
if ret && nodes[idx].OnSuccess == "next_stage" {
|
||||
clog.Debugf("node successful, stop end stage %s", stage)
|
||||
break
|
||||
}
|
||||
|
|
|
@ -60,6 +60,7 @@ func MakeEvent(timeMachine bool, evtType int, process bool) Event {
|
|||
if timeMachine {
|
||||
evt.ExpectMode = TIMEMACHINE
|
||||
}
|
||||
|
||||
return evt
|
||||
}
|
||||
|
||||
|
@ -97,8 +98,9 @@ func (e *Event) GetType() string {
|
|||
|
||||
func (e *Event) GetMeta(key string) string {
|
||||
if e.Type == OVFLW {
|
||||
for _, alert := range e.Overflow.APIAlerts {
|
||||
for _, event := range alert.Events {
|
||||
alerts := e.Overflow.APIAlerts
|
||||
for idx := range alerts {
|
||||
for _, event := range alerts[idx].Events {
|
||||
if event.GetMeta(key) != "" {
|
||||
return event.GetMeta(key)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue