mirror of
https://github.com/crowdsecurity/crowdsec.git
synced 2025-05-11 12:25:53 +02:00
refactor pkg/database/Client.createAlertChunk() (#3585)
This commit is contained in:
parent
201aebaac2
commit
d10067e772
2 changed files with 217 additions and 178 deletions
|
@ -218,7 +218,7 @@ linters:
|
||||||
- name: cyclomatic
|
- name: cyclomatic
|
||||||
arguments:
|
arguments:
|
||||||
# lower this after refactoring
|
# lower this after refactoring
|
||||||
- 39
|
- 38
|
||||||
- name: defer
|
- name: defer
|
||||||
disabled: true
|
disabled: true
|
||||||
- name: empty-block
|
- name: empty-block
|
||||||
|
|
|
@ -426,160 +426,238 @@ func (c *Client) createDecisionChunk(ctx context.Context, simulated bool, stopAt
|
||||||
return ret, nil
|
return ret, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func parseAlertTimes(alert *models.Alert, logger log.FieldLogger) (time.Time, time.Time) {
|
||||||
|
now := time.Now().UTC()
|
||||||
|
|
||||||
|
start, err := time.Parse(time.RFC3339, *alert.StartAt)
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("creating alert: Failed to parse startAtTime '%s', defaulting to now: %s", *alert.StartAt, err)
|
||||||
|
|
||||||
|
start = now
|
||||||
|
}
|
||||||
|
|
||||||
|
stop, err := time.Parse(time.RFC3339, *alert.StopAt)
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("creating alert: Failed to parse stopAtTime '%s', defaulting to now: %s", *alert.StopAt, err)
|
||||||
|
|
||||||
|
stop = now
|
||||||
|
}
|
||||||
|
|
||||||
|
return start, stop
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildEventCreates(ctx context.Context, logger log.FieldLogger, client *ent.Client, machineID string, alertItem *models.Alert) ([]*ent.Event, error) {
|
||||||
|
// let's track when we strip or drop data, notify outside of loop to avoid spam
|
||||||
|
stripped := false
|
||||||
|
dropped := false
|
||||||
|
|
||||||
|
if len(alertItem.Events) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
eventBulk := make([]*ent.EventCreate, len(alertItem.Events))
|
||||||
|
|
||||||
|
for i, eventItem := range alertItem.Events {
|
||||||
|
ts, err := time.Parse(time.RFC3339, *eventItem.Timestamp)
|
||||||
|
if err != nil {
|
||||||
|
logger.Errorf("creating alert: Failed to parse event timestamp '%s', defaulting to now: %s", *eventItem.Timestamp, err)
|
||||||
|
|
||||||
|
ts = time.Now().UTC()
|
||||||
|
}
|
||||||
|
|
||||||
|
marshallMetas, err := json.Marshal(eventItem.Meta)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// the serialized field is too big, let's try to progressively strip it
|
||||||
|
if event.SerializedValidator(string(marshallMetas)) != nil {
|
||||||
|
stripped = true
|
||||||
|
|
||||||
|
valid := false
|
||||||
|
stripSize := 2048
|
||||||
|
|
||||||
|
for !valid && stripSize > 0 {
|
||||||
|
for _, serializedItem := range eventItem.Meta {
|
||||||
|
if len(serializedItem.Value) > stripSize*2 {
|
||||||
|
serializedItem.Value = serializedItem.Value[:stripSize] + "<stripped>"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
marshallMetas, err = json.Marshal(eventItem.Meta)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if event.SerializedValidator(string(marshallMetas)) == nil {
|
||||||
|
valid = true
|
||||||
|
}
|
||||||
|
|
||||||
|
stripSize /= 2
|
||||||
|
}
|
||||||
|
|
||||||
|
// nothing worked, drop it
|
||||||
|
if !valid {
|
||||||
|
dropped = true
|
||||||
|
stripped = false
|
||||||
|
marshallMetas = []byte("")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
eventBulk[i] = client.Event.Create().
|
||||||
|
SetTime(ts).
|
||||||
|
SetSerialized(string(marshallMetas))
|
||||||
|
}
|
||||||
|
|
||||||
|
if stripped {
|
||||||
|
logger.Warningf("stripped 'serialized' field (machine %s / scenario %s)", machineID, *alertItem.Scenario)
|
||||||
|
}
|
||||||
|
|
||||||
|
if dropped {
|
||||||
|
logger.Warningf("dropped 'serialized' field (machine %s / scenario %s)", machineID, *alertItem.Scenario)
|
||||||
|
}
|
||||||
|
|
||||||
|
return client.Event.CreateBulk(eventBulk...).Save(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildMetaCreates(ctx context.Context, logger log.FieldLogger, client *ent.Client, alertItem *models.Alert) ([]*ent.Meta, error) {
|
||||||
|
if len(alertItem.Meta) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
metaBulk := make([]*ent.MetaCreate, len(alertItem.Meta))
|
||||||
|
|
||||||
|
for i, metaItem := range alertItem.Meta {
|
||||||
|
key := metaItem.Key
|
||||||
|
value := metaItem.Value
|
||||||
|
|
||||||
|
if len(metaItem.Value) > 4095 {
|
||||||
|
logger.Warningf("truncated meta %s: value too long", metaItem.Key)
|
||||||
|
|
||||||
|
value = value[:4095]
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(metaItem.Key) > 255 {
|
||||||
|
logger.Warningf("truncated meta %s: key too long", metaItem.Key)
|
||||||
|
|
||||||
|
key = key[:255]
|
||||||
|
}
|
||||||
|
|
||||||
|
metaBulk[i] = client.Meta.Create().
|
||||||
|
SetKey(key).
|
||||||
|
SetValue(value)
|
||||||
|
}
|
||||||
|
|
||||||
|
return client.Meta.CreateBulk(metaBulk...).Save(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func buildDecisions(ctx context.Context, logger log.FieldLogger, client *Client, alertItem *models.Alert, stopAtTime time.Time) ([]*ent.Decision, int, error) {
|
||||||
|
decisions := []*ent.Decision{}
|
||||||
|
|
||||||
|
decisionChunks := slicetools.Chunks(alertItem.Decisions, client.decisionBulkSize)
|
||||||
|
for _, decisionChunk := range decisionChunks {
|
||||||
|
decisionRet, err := client.createDecisionChunk(ctx, *alertItem.Simulated, stopAtTime, decisionChunk)
|
||||||
|
if err != nil {
|
||||||
|
return nil, 0, fmt.Errorf("creating alert decisions: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
decisions = append(decisions, decisionRet...)
|
||||||
|
}
|
||||||
|
|
||||||
|
discarded := len(alertItem.Decisions) - len(decisions)
|
||||||
|
if discarded > 0 {
|
||||||
|
logger.Warningf("discarded %d decisions for %s", discarded, alertItem.UUID)
|
||||||
|
}
|
||||||
|
|
||||||
|
return decisions, discarded, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func retryOnBusy(fn func() error) error {
|
||||||
|
for retry := range maxLockRetries {
|
||||||
|
err := fn()
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
var sqliteErr sqlite3.Error
|
||||||
|
if errors.As(err, &sqliteErr) && sqliteErr.Code == sqlite3.ErrBusy {
|
||||||
|
// sqlite3.Error{
|
||||||
|
// Code: 5,
|
||||||
|
// ExtendedCode: 5,
|
||||||
|
// SystemErrno: 0,
|
||||||
|
// err: "database is locked",
|
||||||
|
// }
|
||||||
|
log.Warningf("while updating decisions, sqlite3.ErrBusy: %s, retry %d of %d", err, retry, maxLockRetries)
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("exceeded %d busy retries", maxLockRetries)
|
||||||
|
}
|
||||||
|
|
||||||
|
func saveAlerts(ctx context.Context, c *Client, alertBuilders []*ent.AlertCreate, alertDecisions [][]*ent.Decision) ([]string, error) {
|
||||||
|
alertsCreateBulk, err := c.Ent.Alert.CreateBulk(alertBuilders...).Save(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, errors.Wrapf(BulkError, "bulk creating alert : %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ret := make([]string, len(alertsCreateBulk))
|
||||||
|
for i, a := range alertsCreateBulk {
|
||||||
|
ret[i] = strconv.Itoa(a.ID)
|
||||||
|
|
||||||
|
d := alertDecisions[i]
|
||||||
|
decisionsChunk := slicetools.Chunks(d, c.decisionBulkSize)
|
||||||
|
|
||||||
|
for _, d2 := range decisionsChunk {
|
||||||
|
if err := retryOnBusy(func() error {
|
||||||
|
_, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(ctx)
|
||||||
|
return err
|
||||||
|
}); err != nil {
|
||||||
|
return nil, fmt.Errorf("attach decisions to alert %d: %w", a.ID, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) createAlertChunk(ctx context.Context, machineID string, owner *ent.Machine, alerts []*models.Alert) ([]string, error) {
|
func (c *Client) createAlertChunk(ctx context.Context, machineID string, owner *ent.Machine, alerts []*models.Alert) ([]string, error) {
|
||||||
alertBuilders := []*ent.AlertCreate{}
|
alertBuilders := []*ent.AlertCreate{}
|
||||||
alertDecisions := [][]*ent.Decision{}
|
alertDecisions := [][]*ent.Decision{}
|
||||||
|
|
||||||
for _, alertItem := range alerts {
|
for _, alertItem := range alerts {
|
||||||
var (
|
var err error
|
||||||
metas []*ent.Meta
|
|
||||||
events []*ent.Event
|
|
||||||
)
|
|
||||||
|
|
||||||
startAtTime, err := time.Parse(time.RFC3339, *alertItem.StartAt)
|
startAtTime, stopAtTime := parseAlertTimes(alertItem, c.Log)
|
||||||
if err != nil {
|
|
||||||
c.Log.Errorf("creating alert: Failed to parse startAtTime '%s', defaulting to now: %s", *alertItem.StartAt, err)
|
|
||||||
|
|
||||||
startAtTime = time.Now().UTC()
|
|
||||||
}
|
|
||||||
|
|
||||||
stopAtTime, err := time.Parse(time.RFC3339, *alertItem.StopAt)
|
|
||||||
if err != nil {
|
|
||||||
c.Log.Errorf("creating alert: Failed to parse stopAtTime '%s', defaulting to now: %s", *alertItem.StopAt, err)
|
|
||||||
|
|
||||||
stopAtTime = time.Now().UTC()
|
|
||||||
}
|
|
||||||
|
|
||||||
/*display proper alert in logs*/
|
/*display proper alert in logs*/
|
||||||
for _, disp := range alertItem.FormatAsStrings(machineID, log.StandardLogger()) {
|
for _, disp := range alertItem.FormatAsStrings(machineID, log.StandardLogger()) {
|
||||||
c.Log.Info(disp)
|
c.Log.Info(disp)
|
||||||
}
|
}
|
||||||
|
|
||||||
// let's track when we strip or drop data, notify outside of loop to avoid spam
|
events, err := buildEventCreates(ctx, c.Log, c.Ent, machineID, alertItem)
|
||||||
stripped := false
|
if err != nil {
|
||||||
dropped := false
|
return nil, fmt.Errorf("building events for alert %s: %w", alertItem.UUID, err)
|
||||||
|
|
||||||
if len(alertItem.Events) > 0 {
|
|
||||||
eventBulk := make([]*ent.EventCreate, len(alertItem.Events))
|
|
||||||
|
|
||||||
for i, eventItem := range alertItem.Events {
|
|
||||||
ts, err := time.Parse(time.RFC3339, *eventItem.Timestamp)
|
|
||||||
if err != nil {
|
|
||||||
c.Log.Errorf("creating alert: Failed to parse event timestamp '%s', defaulting to now: %s", *eventItem.Timestamp, err)
|
|
||||||
|
|
||||||
ts = time.Now().UTC()
|
|
||||||
}
|
|
||||||
|
|
||||||
marshallMetas, err := json.Marshal(eventItem.Meta)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// the serialized field is too big, let's try to progressively strip it
|
|
||||||
if event.SerializedValidator(string(marshallMetas)) != nil {
|
|
||||||
stripped = true
|
|
||||||
|
|
||||||
valid := false
|
|
||||||
stripSize := 2048
|
|
||||||
|
|
||||||
for !valid && stripSize > 0 {
|
|
||||||
for _, serializedItem := range eventItem.Meta {
|
|
||||||
if len(serializedItem.Value) > stripSize*2 {
|
|
||||||
serializedItem.Value = serializedItem.Value[:stripSize] + "<stripped>"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
marshallMetas, err = json.Marshal(eventItem.Meta)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if event.SerializedValidator(string(marshallMetas)) == nil {
|
|
||||||
valid = true
|
|
||||||
}
|
|
||||||
|
|
||||||
stripSize /= 2
|
|
||||||
}
|
|
||||||
|
|
||||||
// nothing worked, drop it
|
|
||||||
if !valid {
|
|
||||||
dropped = true
|
|
||||||
stripped = false
|
|
||||||
marshallMetas = []byte("")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
eventBulk[i] = c.Ent.Event.Create().
|
|
||||||
SetTime(ts).
|
|
||||||
SetSerialized(string(marshallMetas))
|
|
||||||
}
|
|
||||||
|
|
||||||
if stripped {
|
|
||||||
c.Log.Warningf("stripped 'serialized' field (machine %s / scenario %s)", machineID, *alertItem.Scenario)
|
|
||||||
}
|
|
||||||
|
|
||||||
if dropped {
|
|
||||||
c.Log.Warningf("dropped 'serialized' field (machine %s / scenario %s)", machineID, *alertItem.Scenario)
|
|
||||||
}
|
|
||||||
|
|
||||||
events, err = c.Ent.Event.CreateBulk(eventBulk...).Save(ctx)
|
|
||||||
if err != nil {
|
|
||||||
return nil, errors.Wrapf(BulkError, "creating alert events: %s", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(alertItem.Meta) > 0 {
|
metas, err := buildMetaCreates(ctx, c.Log, c.Ent, alertItem)
|
||||||
metaBulk := make([]*ent.MetaCreate, len(alertItem.Meta))
|
if err != nil {
|
||||||
|
c.Log.Warningf("error creating alert meta: %s", err)
|
||||||
for i, metaItem := range alertItem.Meta {
|
|
||||||
key := metaItem.Key
|
|
||||||
value := metaItem.Value
|
|
||||||
|
|
||||||
if len(metaItem.Value) > 4095 {
|
|
||||||
c.Log.Warningf("truncated meta %s: value too long", metaItem.Key)
|
|
||||||
|
|
||||||
value = value[:4095]
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(metaItem.Key) > 255 {
|
|
||||||
c.Log.Warningf("truncated meta %s: key too long", metaItem.Key)
|
|
||||||
|
|
||||||
key = key[:255]
|
|
||||||
}
|
|
||||||
|
|
||||||
metaBulk[i] = c.Ent.Meta.Create().
|
|
||||||
SetKey(key).
|
|
||||||
SetValue(value)
|
|
||||||
}
|
|
||||||
|
|
||||||
metas, err = c.Ent.Meta.CreateBulk(metaBulk...).Save(ctx)
|
|
||||||
if err != nil {
|
|
||||||
c.Log.Warningf("error creating alert meta: %s", err)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
decisions := []*ent.Decision{}
|
decisions, discardCount, err := buildDecisions(ctx, c.Log, c, alertItem, stopAtTime)
|
||||||
|
if err != nil {
|
||||||
decisionChunks := slicetools.Chunks(alertItem.Decisions, c.decisionBulkSize)
|
return nil, fmt.Errorf("building decisions for alert %s: %w", alertItem.UUID, err)
|
||||||
for _, decisionChunk := range decisionChunks {
|
|
||||||
decisionRet, err := c.createDecisionChunk(ctx, *alertItem.Simulated, stopAtTime, decisionChunk)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("creating alert decisions: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
decisions = append(decisions, decisionRet...)
|
|
||||||
}
|
|
||||||
|
|
||||||
discarded := len(alertItem.Decisions) - len(decisions)
|
|
||||||
if discarded > 0 {
|
|
||||||
c.Log.Warningf("discarded %d decisions for %s", discarded, alertItem.UUID)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// if all decisions were discarded, discard the alert too
|
// if all decisions were discarded, discard the alert too
|
||||||
if discarded > 0 && len(decisions) == 0 {
|
if discardCount > 0 && len(decisions) == 0 {
|
||||||
c.Log.Warningf("dropping alert %s with invalid decisions", alertItem.UUID)
|
c.Log.Warningf("dropping alert %s: all decisions invalid", alertItem.UUID)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
alertBuilder := c.Ent.Alert.
|
alertBuilder := c.Ent.Alert.
|
||||||
Create().
|
Create().
|
||||||
|
@ -620,51 +698,12 @@ func (c *Client) createAlertChunk(ctx context.Context, machineID string, owner *
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
alertsCreateBulk, err := c.Ent.Alert.CreateBulk(alertBuilders...).Save(ctx)
|
// Save alerts, then attach decisions with retry logic
|
||||||
|
ids, err := saveAlerts(ctx, c, alertBuilders, alertDecisions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, errors.Wrapf(BulkError, "bulk creating alert : %s", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
|
return ids, nil
|
||||||
ret := make([]string, len(alertsCreateBulk))
|
|
||||||
for i, a := range alertsCreateBulk {
|
|
||||||
ret[i] = strconv.Itoa(a.ID)
|
|
||||||
|
|
||||||
d := alertDecisions[i]
|
|
||||||
decisionsChunk := slicetools.Chunks(d, c.decisionBulkSize)
|
|
||||||
|
|
||||||
for _, d2 := range decisionsChunk {
|
|
||||||
retry := 0
|
|
||||||
|
|
||||||
for retry < maxLockRetries {
|
|
||||||
// so much for the happy path... but sqlite3 errors work differently
|
|
||||||
_, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(ctx)
|
|
||||||
if err == nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
var sqliteErr sqlite3.Error
|
|
||||||
if errors.As(err, &sqliteErr) {
|
|
||||||
if sqliteErr.Code == sqlite3.ErrBusy {
|
|
||||||
// sqlite3.Error{
|
|
||||||
// Code: 5,
|
|
||||||
// ExtendedCode: 5,
|
|
||||||
// SystemErrno: 0,
|
|
||||||
// err: "database is locked",
|
|
||||||
// }
|
|
||||||
retry++
|
|
||||||
log.Warningf("while updating decisions, sqlite3.ErrBusy: %s, retry %d of %d", err, retry, maxLockRetries)
|
|
||||||
time.Sleep(1 * time.Second)
|
|
||||||
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil, fmt.Errorf("error while updating decisions: %w", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) CreateAlert(ctx context.Context, machineID string, alertList []*models.Alert) ([]string, error) {
|
func (c *Client) CreateAlert(ctx context.Context, machineID string, alertList []*models.Alert) ([]string, error) {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue