From d10067e77213c9c7d36e753733485549d43d4b91 Mon Sep 17 00:00:00 2001
From: mmetc <92726601+mmetc@users.noreply.github.com>
Date: Fri, 2 May 2025 14:12:00 +0200
Subject: [PATCH] refactor pkg/database/Client.createAlertChunk() (#3585)

---
 .golangci.yml          |   2 +-
 pkg/database/alerts.go | 393 ++++++++++++++++++++++-------------
 2 files changed, 217 insertions(+), 178 deletions(-)

diff --git a/.golangci.yml b/.golangci.yml
index 93ed9b196..c6dac451f 100644
--- a/.golangci.yml
+++ b/.golangci.yml
@@ -218,7 +218,7 @@ linters:
         - name: cyclomatic
           arguments:
             # lower this after refactoring
-            - 39
+            - 38
         - name: defer
          disabled: true
         - name: empty-block
diff --git a/pkg/database/alerts.go b/pkg/database/alerts.go
index 025cf0630..6f27b5afe 100644
--- a/pkg/database/alerts.go
+++ b/pkg/database/alerts.go
@@ -426,160 +426,238 @@ func (c *Client) createDecisionChunk(ctx context.Context, simulated bool, stopAt
 	return ret, nil
 }
 
+func parseAlertTimes(alert *models.Alert, logger log.FieldLogger) (time.Time, time.Time) {
+	now := time.Now().UTC()
+
+	start, err := time.Parse(time.RFC3339, *alert.StartAt)
+	if err != nil {
+		logger.Errorf("creating alert: Failed to parse startAtTime '%s', defaulting to now: %s", *alert.StartAt, err)
+
+		start = now
+	}
+
+	stop, err := time.Parse(time.RFC3339, *alert.StopAt)
+	if err != nil {
+		logger.Errorf("creating alert: Failed to parse stopAtTime '%s', defaulting to now: %s", *alert.StopAt, err)
+
+		stop = now
+	}
+
+	return start, stop
+}
+
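+// buildEventCreates persists the events attached to an alert and returns the
+// created rows; an oversized "serialized" meta value is progressively stripped
+// and, as a last resort, dropped so the rows still pass validation.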
+func buildEventCreates(ctx context.Context, logger log.FieldLogger, client *ent.Client, machineID string, alertItem *models.Alert) ([]*ent.Event, error) {
+	// let's track when we strip or drop data, notify outside of loop to avoid spam
+	stripped := false
+	dropped := false
+
+	if len(alertItem.Events) == 0 {
+		return nil, nil
+	}
+
+	eventBulk := make([]*ent.EventCreate, len(alertItem.Events))
+
+	for i, eventItem := range alertItem.Events {
+		ts, err := time.Parse(time.RFC3339, *eventItem.Timestamp)
+		if err != nil {
+			logger.Errorf("creating alert: Failed to parse event timestamp '%s', defaulting to now: %s", *eventItem.Timestamp, err)
+
+			ts = time.Now().UTC()
+		}
+
+		marshallMetas, err := json.Marshal(eventItem.Meta)
+		if err != nil {
+			return nil, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
+		}
+
+		// the serialized field is too big, let's try to progressively strip it
+		if event.SerializedValidator(string(marshallMetas)) != nil {
+			stripped = true
+
+			valid := false
+			stripSize := 2048
+
+			for !valid && stripSize > 0 {
+				for _, serializedItem := range eventItem.Meta {
+					if len(serializedItem.Value) > stripSize*2 {
+						serializedItem.Value = serializedItem.Value[:stripSize] + "<stripped>"
+					}
+				}
+
+				marshallMetas, err = json.Marshal(eventItem.Meta)
+				if err != nil {
+					return nil, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
+				}
+
+				if event.SerializedValidator(string(marshallMetas)) == nil {
+					valid = true
+				}
+
+				stripSize /= 2
+			}
+
+			// nothing worked, drop it
+			if !valid {
+				dropped = true
+				stripped = false
+				marshallMetas = []byte("")
+			}
+		}
+
+		eventBulk[i] = client.Event.Create().
+			SetTime(ts).
+			SetSerialized(string(marshallMetas))
+	}
+
+	if stripped {
+		logger.Warningf("stripped 'serialized' field (machine %s / scenario %s)", machineID, *alertItem.Scenario)
+	}
+
+	if dropped {
+		logger.Warningf("dropped 'serialized' field (machine %s / scenario %s)", machineID, *alertItem.Scenario)
+	}
+
+	return client.Event.CreateBulk(eventBulk...).Save(ctx)
+}
+
+func buildMetaCreates(ctx context.Context, logger log.FieldLogger, client *ent.Client, alertItem *models.Alert) ([]*ent.Meta, error) {
+	if len(alertItem.Meta) == 0 {
+		return nil, nil
+	}
+
+	metaBulk := make([]*ent.MetaCreate, len(alertItem.Meta))
+
+	for i, metaItem := range alertItem.Meta {
+		key := metaItem.Key
+		value := metaItem.Value
+
+		if len(metaItem.Value) > 4095 {
+			logger.Warningf("truncated meta %s: value too long", metaItem.Key)
+
+			value = value[:4095]
+		}
+
+		if len(metaItem.Key) > 255 {
+			logger.Warningf("truncated meta %s: key too long", metaItem.Key)
+
+			key = key[:255]
+		}
+
+		metaBulk[i] = client.Meta.Create().
+			SetKey(key).
+			SetValue(value)
+	}
+
+	return client.Meta.CreateBulk(metaBulk...).Save(ctx)
+}
+
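+// buildDecisions creates the decisions attached to an alert in chunks of
+// decisionBulkSize and reports how many were discarded along the way.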
+func buildDecisions(ctx context.Context, logger log.FieldLogger, client *Client, alertItem *models.Alert, stopAtTime time.Time) ([]*ent.Decision, int, error) {
+	decisions := []*ent.Decision{}
+
+	decisionChunks := slicetools.Chunks(alertItem.Decisions, client.decisionBulkSize)
+	for _, decisionChunk := range decisionChunks {
+		decisionRet, err := client.createDecisionChunk(ctx, *alertItem.Simulated, stopAtTime, decisionChunk)
+		if err != nil {
+			return nil, 0, fmt.Errorf("creating alert decisions: %w", err)
+		}
+
+		decisions = append(decisions, decisionRet...)
+	}
+
+	discarded := len(alertItem.Decisions) - len(decisions)
+	if discarded > 0 {
+		logger.Warningf("discarded %d decisions for %s", discarded, alertItem.UUID)
+	}
+
+	return decisions, discarded, nil
+}
+
+func retryOnBusy(fn func() error) error {
+	for retry := range maxLockRetries {
+		err := fn()
+		if err == nil {
+			return nil
+		}
+		var sqliteErr sqlite3.Error
+		if errors.As(err, &sqliteErr) && sqliteErr.Code == sqlite3.ErrBusy {
+			// sqlite3.Error{
+			// 	Code: 5,
+			// 	ExtendedCode: 5,
+			// 	SystemErrno: 0,
+			// 	err: "database is locked",
+			// }
+			log.Warningf("while updating decisions, sqlite3.ErrBusy: %s, retry %d of %d", err, retry, maxLockRetries)
+			time.Sleep(1 * time.Second)
+
+			continue
+		}
+
+		return err
+	}
+
+	return fmt.Errorf("exceeded %d busy retries", maxLockRetries)
+}
+
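+// saveAlerts bulk-inserts the alert rows, then attaches their decisions in
+// chunks, retrying via retryOnBusy when SQLite reports "database is locked".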
+func saveAlerts(ctx context.Context, c *Client, alertBuilders []*ent.AlertCreate, alertDecisions [][]*ent.Decision) ([]string, error) {
+	alertsCreateBulk, err := c.Ent.Alert.CreateBulk(alertBuilders...).Save(ctx)
+	if err != nil {
+		return nil, errors.Wrapf(BulkError, "bulk creating alert : %s", err)
+	}
+
+	ret := make([]string, len(alertsCreateBulk))
+	for i, a := range alertsCreateBulk {
+		ret[i] = strconv.Itoa(a.ID)
+
+		d := alertDecisions[i]
+		decisionsChunk := slicetools.Chunks(d, c.decisionBulkSize)
+
+		for _, d2 := range decisionsChunk {
+			if err := retryOnBusy(func() error {
+				_, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(ctx)
+				return err
+			}); err != nil {
+				return nil, fmt.Errorf("attach decisions to alert %d: %w", a.ID, err)
+			}
+		}
+	}
+
+	return ret, nil
+}
+
 func (c *Client) createAlertChunk(ctx context.Context, machineID string, owner *ent.Machine, alerts []*models.Alert) ([]string, error) {
 	alertBuilders := []*ent.AlertCreate{}
 	alertDecisions := [][]*ent.Decision{}
 
 	for _, alertItem := range alerts {
-		var (
-			metas  []*ent.Meta
-			events []*ent.Event
-		)
+		var err error
 
-		startAtTime, err := time.Parse(time.RFC3339, *alertItem.StartAt)
-		if err != nil {
-			c.Log.Errorf("creating alert: Failed to parse startAtTime '%s', defaulting to now: %s", *alertItem.StartAt, err)
-
-			startAtTime = time.Now().UTC()
-		}
-
-		stopAtTime, err := time.Parse(time.RFC3339, *alertItem.StopAt)
-		if err != nil {
-			c.Log.Errorf("creating alert: Failed to parse stopAtTime '%s', defaulting to now: %s", *alertItem.StopAt, err)
-
-			stopAtTime = time.Now().UTC()
-		}
+		startAtTime, stopAtTime := parseAlertTimes(alertItem, c.Log)
 
 		/*display proper alert in logs*/
 		for _, disp := range alertItem.FormatAsStrings(machineID, log.StandardLogger()) {
 			c.Log.Info(disp)
 		}
 
-		// let's track when we strip or drop data, notify outside of loop to avoid spam
-		stripped := false
-		dropped := false
-
-		if len(alertItem.Events) > 0 {
-			eventBulk := make([]*ent.EventCreate, len(alertItem.Events))
-
-			for i, eventItem := range alertItem.Events {
-				ts, err := time.Parse(time.RFC3339, *eventItem.Timestamp)
-				if err != nil {
-					c.Log.Errorf("creating alert: Failed to parse event timestamp '%s', defaulting to now: %s", *eventItem.Timestamp, err)
-
-					ts = time.Now().UTC()
-				}
-
-				marshallMetas, err := json.Marshal(eventItem.Meta)
-				if err != nil {
-					return nil, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
-				}
-
-				// the serialized field is too big, let's try to progressively strip it
-				if event.SerializedValidator(string(marshallMetas)) != nil {
-					stripped = true
-
-					valid := false
-					stripSize := 2048
-
-					for !valid && stripSize > 0 {
-						for _, serializedItem := range eventItem.Meta {
-							if len(serializedItem.Value) > stripSize*2 {
-								serializedItem.Value = serializedItem.Value[:stripSize] + "<stripped>"
-							}
-						}
-
-						marshallMetas, err = json.Marshal(eventItem.Meta)
-						if err != nil {
-							return nil, errors.Wrapf(MarshalFail, "event meta '%v' : %s", eventItem.Meta, err)
-						}
-
-						if event.SerializedValidator(string(marshallMetas)) == nil {
-							valid = true
-						}
-
-						stripSize /= 2
-					}
-
-					// nothing worked, drop it
-					if !valid {
-						dropped = true
-						stripped = false
-						marshallMetas = []byte("")
-					}
-				}
-
-				eventBulk[i] = c.Ent.Event.Create().
-					SetTime(ts).
-					SetSerialized(string(marshallMetas))
-			}
-
-			if stripped {
-				c.Log.Warningf("stripped 'serialized' field (machine %s / scenario %s)", machineID, *alertItem.Scenario)
-			}
-
-			if dropped {
-				c.Log.Warningf("dropped 'serialized' field (machine %s / scenario %s)", machineID, *alertItem.Scenario)
-			}
-
-			events, err = c.Ent.Event.CreateBulk(eventBulk...).Save(ctx)
-			if err != nil {
-				return nil, errors.Wrapf(BulkError, "creating alert events: %s", err)
-			}
+		events, err := buildEventCreates(ctx, c.Log, c.Ent, machineID, alertItem)
+		if err != nil {
+			return nil, fmt.Errorf("building events for alert %s: %w", alertItem.UUID, err)
 		}
 
-		if len(alertItem.Meta) > 0 {
-			metaBulk := make([]*ent.MetaCreate, len(alertItem.Meta))
-
-			for i, metaItem := range alertItem.Meta {
-				key := metaItem.Key
-				value := metaItem.Value
-
-				if len(metaItem.Value) > 4095 {
-					c.Log.Warningf("truncated meta %s: value too long", metaItem.Key)
-
-					value = value[:4095]
-				}
-
-				if len(metaItem.Key) > 255 {
-					c.Log.Warningf("truncated meta %s: key too long", metaItem.Key)
-
-					key = key[:255]
-				}
-
-				metaBulk[i] = c.Ent.Meta.Create().
-					SetKey(key).
-					SetValue(value)
-			}
-
-			metas, err = c.Ent.Meta.CreateBulk(metaBulk...).Save(ctx)
-			if err != nil {
-				c.Log.Warningf("error creating alert meta: %s", err)
-			}
+		metas, err := buildMetaCreates(ctx, c.Log, c.Ent, alertItem)
+		if err != nil {
+			c.Log.Warningf("error creating alert meta: %s", err)
 		}
 
-		decisions := []*ent.Decision{}
-
-		decisionChunks := slicetools.Chunks(alertItem.Decisions, c.decisionBulkSize)
-		for _, decisionChunk := range decisionChunks {
-			decisionRet, err := c.createDecisionChunk(ctx, *alertItem.Simulated, stopAtTime, decisionChunk)
-			if err != nil {
-				return nil, fmt.Errorf("creating alert decisions: %w", err)
-			}
-
-			decisions = append(decisions, decisionRet...)
-		}
-
-		discarded := len(alertItem.Decisions) - len(decisions)
-		if discarded > 0 {
-			c.Log.Warningf("discarded %d decisions for %s", discarded, alertItem.UUID)
+		decisions, discardCount, err := buildDecisions(ctx, c.Log, c, alertItem, stopAtTime)
+		if err != nil {
+			return nil, fmt.Errorf("building decisions for alert %s: %w", alertItem.UUID, err)
 		}
 
 		// if all decisions were discarded, discard the alert too
-		if discarded > 0 && len(decisions) == 0 {
-			c.Log.Warningf("dropping alert %s with invalid decisions", alertItem.UUID)
+		if discardCount > 0 && len(decisions) == 0 {
+			c.Log.Warningf("dropping alert %s: all decisions invalid", alertItem.UUID)
 			continue
-		}
+		}
 
 		alertBuilder := c.Ent.Alert.
 			Create().
@@ -620,51 +698,12 @@ func (c *Client) createAlertChunk(ctx context.Context, machineID string, owner *
 		return nil, nil
 	}
 
-	alertsCreateBulk, err := c.Ent.Alert.CreateBulk(alertBuilders...).Save(ctx)
+	// Save alerts, then attach decisions with retry logic
+	ids, err := saveAlerts(ctx, c, alertBuilders, alertDecisions)
 	if err != nil {
-		return nil, errors.Wrapf(BulkError, "bulk creating alert : %s", err)
+		return nil, err
 	}
-
-	ret := make([]string, len(alertsCreateBulk))
-	for i, a := range alertsCreateBulk {
-		ret[i] = strconv.Itoa(a.ID)
-
-		d := alertDecisions[i]
-		decisionsChunk := slicetools.Chunks(d, c.decisionBulkSize)
-
-		for _, d2 := range decisionsChunk {
-			retry := 0
-
-			for retry < maxLockRetries {
-				// so much for the happy path... but sqlite3 errors work differently
-				_, err := c.Ent.Alert.Update().Where(alert.IDEQ(a.ID)).AddDecisions(d2...).Save(ctx)
-				if err == nil {
-					break
-				}
-
-				var sqliteErr sqlite3.Error
-				if errors.As(err, &sqliteErr) {
-					if sqliteErr.Code == sqlite3.ErrBusy {
-						// sqlite3.Error{
-						// 	Code: 5,
-						// 	ExtendedCode: 5,
-						// 	SystemErrno: 0,
-						// 	err: "database is locked",
-						// }
-						retry++
-						log.Warningf("while updating decisions, sqlite3.ErrBusy: %s, retry %d of %d", err, retry, maxLockRetries)
-						time.Sleep(1 * time.Second)
-
-						continue
-					}
-				}
-
-				return nil, fmt.Errorf("error while updating decisions: %w", err)
-			}
-		}
-	}
-
-	return ret, nil
+	return ids, nil
 }
 
 func (c *Client) CreateAlert(ctx context.Context, machineID string, alertList []*models.Alert) ([]string, error) {