package apiserver

import (
	"bufio"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"math/rand"
	"net"
	"net/http"
	"net/url"
	"slices"
	"strings"
	"sync"
	"time"

	"github.com/davecgh/go-spew/spew"
	"github.com/go-openapi/strfmt"
	"github.com/golang-jwt/jwt/v4"
	log "github.com/sirupsen/logrus"
	"gopkg.in/tomb.v2"

	"github.com/crowdsecurity/go-cs-lib/ptr"
	"github.com/crowdsecurity/go-cs-lib/trace"

	"github.com/crowdsecurity/crowdsec/pkg/apiclient"
	"github.com/crowdsecurity/crowdsec/pkg/csconfig"
	"github.com/crowdsecurity/crowdsec/pkg/database"
	"github.com/crowdsecurity/crowdsec/pkg/database/ent"
	"github.com/crowdsecurity/crowdsec/pkg/database/ent/alert"
	"github.com/crowdsecurity/crowdsec/pkg/database/ent/decision"
	"github.com/crowdsecurity/crowdsec/pkg/models"
	"github.com/crowdsecurity/crowdsec/pkg/modelscapi"
	"github.com/crowdsecurity/crowdsec/pkg/types"
)

const (
	// delta values must be smaller than the interval
	pullIntervalDefault       = time.Hour * 2
	pullIntervalDelta         = time.Minute * 5
	pushIntervalDefault       = time.Second * 10
	pushIntervalDelta         = time.Second * 7
	metricsIntervalDefault    = time.Minute * 30
	metricsIntervalDelta      = time.Minute * 15
	usageMetricsInterval      = time.Minute * 30
	usageMetricsIntervalDelta = time.Minute * 15
)

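// Editorial note, derived from the constants above: with randomDuration
// applied, the first-run intervals land in these ranges: pull 1h55m-2h05m,
// push 3s-17s, metrics and usage metrics 15m-45m.
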
type apic struct {
	// when changing the intervals in tests, always set *First too,
	// or they can be negative
	pullInterval              time.Duration
	pullIntervalFirst         time.Duration
	pushInterval              time.Duration
	pushIntervalFirst         time.Duration
	metricsInterval           time.Duration
	metricsIntervalFirst      time.Duration
	usageMetricsInterval      time.Duration
	usageMetricsIntervalFirst time.Duration
	dbClient                  *database.Client
	apiClient                 *apiclient.ApiClient
	AlertsAddChan             chan []*models.Alert

	mu            sync.Mutex
	pushTomb      tomb.Tomb
	pullTomb      tomb.Tomb
	metricsTomb   tomb.Tomb
	startup       bool
	credentials   *csconfig.ApiCredentialsCfg
	scenarioList  []string
	consoleConfig *csconfig.ConsoleConfig
	isPulling     chan bool
	whitelists    *csconfig.CapiWhitelist

	pullBlocklists bool
	pullCommunity  bool
	shareSignals   bool
}

// randomDuration returns a duration value between d-delta and d+delta
func randomDuration(d time.Duration, delta time.Duration) time.Duration {
	ret := d + time.Duration(rand.Int63n(int64(2*delta))) - delta
	// ticker interval must be > 0 (nanoseconds)
	if ret <= 0 {
		return 1
	}

	return ret
}

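// Usage sketch (illustrative; this mirrors how NewAPIC seeds the *First
// intervals and how Pull/Push then fall back to the steady interval):
//
//	first := randomDuration(pullIntervalDefault, pullIntervalDelta) // 2h ± 5m
//	ticker := time.NewTicker(first)
//	// ...after the first tick: ticker.Reset(a.pullInterval)
//
// The jitter spreads the first CAPI calls so that many instances started at
// the same time don't hit the Central API at the same instant.
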
func (a *apic) FetchScenariosListFromDB(ctx context.Context) ([]string, error) {
	scenarios := make([]string, 0)

	machines, err := a.dbClient.ListMachines(ctx)
	if err != nil {
		return nil, fmt.Errorf("while listing machines: %w", err)
	}
	// merge all scenarios together
	for _, v := range machines {
		machineScenarios := strings.Split(v.Scenarios, ",")
		log.Debugf("%d scenarios for machine %d", len(machineScenarios), v.ID)

		for _, sv := range machineScenarios {
			if !slices.Contains(scenarios, sv) && sv != "" {
				scenarios = append(scenarios, sv)
			}
		}
	}

	log.Debugf("Returning list of scenarios : %+v", scenarios)

	return scenarios, nil
}

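// Example (hypothetical machine rows): if two machines report
// "crowdsecurity/ssh-bf,crowdsecurity/http-probing" and "crowdsecurity/ssh-bf"
// in their Scenarios field, the merged result is
// ["crowdsecurity/ssh-bf", "crowdsecurity/http-probing"]: empty entries and
// duplicates are dropped, and order follows first appearance.
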
func decisionsToApiDecisions(decisions []*models.Decision) models.AddSignalsRequestItemDecisions {
	apiDecisions := models.AddSignalsRequestItemDecisions{}

	for _, decision := range decisions {
		x := &models.AddSignalsRequestItemDecisionsItem{
			Duration: ptr.Of(*decision.Duration),
			ID:       new(int64),
			Origin:   ptr.Of(*decision.Origin),
			Scenario: ptr.Of(*decision.Scenario),
			Scope:    ptr.Of(*decision.Scope),
			// Simulated: *decision.Simulated,
			Type:  ptr.Of(*decision.Type),
			Until: decision.Until,
			Value: ptr.Of(*decision.Value),
			UUID:  decision.UUID,
		}
		*x.ID = decision.ID

		if decision.Simulated != nil {
			x.Simulated = *decision.Simulated
		}

		apiDecisions = append(apiDecisions, x)
	}

	return apiDecisions
}

func alertToSignal(alert *models.Alert, scenarioTrust string, shareContext bool) *models.AddSignalsRequestItem {
	signal := &models.AddSignalsRequestItem{
		Message:         alert.Message,
		Scenario:        alert.Scenario,
		ScenarioHash:    alert.ScenarioHash,
		ScenarioVersion: alert.ScenarioVersion,
		Source: &models.AddSignalsRequestItemSource{
			AsName:    alert.Source.AsName,
			AsNumber:  alert.Source.AsNumber,
			Cn:        alert.Source.Cn,
			IP:        alert.Source.IP,
			Latitude:  alert.Source.Latitude,
			Longitude: alert.Source.Longitude,
			Range:     alert.Source.Range,
			Scope:     alert.Source.Scope,
			Value:     alert.Source.Value,
		},
		StartAt:       alert.StartAt,
		StopAt:        alert.StopAt,
		CreatedAt:     alert.CreatedAt,
		MachineID:     alert.MachineID,
		ScenarioTrust: scenarioTrust,
		Decisions:     decisionsToApiDecisions(alert.Decisions),
		UUID:          alert.UUID,
	}
	if shareContext {
		signal.Context = make([]*models.AddSignalsRequestItemContextItems0, 0)

		for _, meta := range alert.Meta {
			contextItem := models.AddSignalsRequestItemContextItems0{
				Key:   meta.Key,
				Value: meta.Value,
			}
			signal.Context = append(signal.Context, &contextItem)
		}
	}

	return signal
}

func NewAPIC(ctx context.Context, config *csconfig.OnlineApiClientCfg, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig, apicWhitelist *csconfig.CapiWhitelist) (*apic, error) {
	var err error

	if apicWhitelist == nil {
		apicWhitelist = &csconfig.CapiWhitelist{}
	}

	ret := &apic{
		AlertsAddChan:             make(chan []*models.Alert),
		dbClient:                  dbClient,
		mu:                        sync.Mutex{},
		startup:                   true,
		credentials:               config.Credentials,
		pullTomb:                  tomb.Tomb{},
		pushTomb:                  tomb.Tomb{},
		metricsTomb:               tomb.Tomb{},
		scenarioList:              make([]string, 0),
		consoleConfig:             consoleConfig,
		pullInterval:              pullIntervalDefault,
		pullIntervalFirst:         randomDuration(pullIntervalDefault, pullIntervalDelta),
		pushInterval:              pushIntervalDefault,
		pushIntervalFirst:         randomDuration(pushIntervalDefault, pushIntervalDelta),
		metricsInterval:           metricsIntervalDefault,
		metricsIntervalFirst:      randomDuration(metricsIntervalDefault, metricsIntervalDelta),
		usageMetricsInterval:      usageMetricsInterval,
		usageMetricsIntervalFirst: randomDuration(usageMetricsInterval, usageMetricsIntervalDelta),
		isPulling:                 make(chan bool, 1),
		whitelists:                apicWhitelist,
		pullBlocklists:            *config.PullConfig.Blocklists,
		pullCommunity:             *config.PullConfig.Community,
		shareSignals:              *config.Sharing,
	}

	apiURL, err := url.Parse(config.Credentials.URL)
	if err != nil {
		return nil, fmt.Errorf("while parsing '%s': %w", config.Credentials.URL, err)
	}

	papiURL, err := url.Parse(config.Credentials.PapiURL)
	if err != nil {
		return nil, fmt.Errorf("while parsing '%s': %w", config.Credentials.PapiURL, err)
	}

	ret.scenarioList, err = ret.FetchScenariosListFromDB(ctx)
	if err != nil {
		return nil, fmt.Errorf("while fetching scenarios from db: %w", err)
	}

	ret.apiClient, err = apiclient.NewClient(&apiclient.Config{
		MachineID:      config.Credentials.Login,
		Password:       strfmt.Password(config.Credentials.Password),
		URL:            apiURL,
		PapiURL:        papiURL,
		VersionPrefix:  "v3",
		Scenarios:      ret.scenarioList,
		UpdateScenario: ret.FetchScenariosListFromDB,
	})
	if err != nil {
		return nil, fmt.Errorf("while creating api client: %w", err)
	}

	err = ret.Authenticate(ctx, config)

	return ret, err
}

// loadAPICToken attempts to retrieve and validate a JWT token from the local database.
// It returns the token string, its expiration time, and a boolean indicating whether the token is valid.
//
// A token is considered valid if:
//   - it exists in the database,
//   - it is a properly formatted JWT with an "exp" claim,
//   - it is not expired or near expiry.
func loadAPICToken(ctx context.Context, db *database.Client) (string, time.Time, bool) {
	token, err := db.GetConfigItem(ctx, "apic_token")
	if err != nil {
		log.Debugf("error fetching token from DB: %s", err)
		return "", time.Time{}, false
	}

	if token == nil {
		log.Debug("no token found in DB")
		return "", time.Time{}, false
	}

	parser := new(jwt.Parser)
	tok, _, err := parser.ParseUnverified(*token, jwt.MapClaims{})
	if err != nil {
		log.Debugf("error parsing token: %s", err)
		return "", time.Time{}, false
	}

	claims, ok := tok.Claims.(jwt.MapClaims)
	if !ok {
		// no parse error here, the claims simply have an unexpected type
		log.Debug("error parsing token claims")
		return "", time.Time{}, false
	}

	expFloat, ok := claims["exp"].(float64)
	if !ok {
		log.Debug("token missing 'exp' claim")
		return "", time.Time{}, false
	}

	exp := time.Unix(int64(expFloat), 0)
	if time.Now().UTC().After(exp.Add(-1 * time.Minute)) {
		log.Debug("auth token expired")
		return "", time.Time{}, false
	}

	return *token, exp, true
}

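// Note on the expiry check above: the token is rejected one minute before its
// actual "exp", so a request issued right at the boundary can't race an
// expiring token. For example (hypothetical timestamp), a token with
// exp = 12:00:00 UTC is only reused until 11:59:00 UTC.
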
// saveAPICToken stores the given JWT token in the local database under the "apic_token" config item.
func saveAPICToken(ctx context.Context, db *database.Client, token string) error {
	if err := db.SetConfigItem(ctx, "apic_token", token); err != nil {
		return fmt.Errorf("saving token to db: %w", err)
	}

	return nil
}

// Authenticate ensures the API client is authorized to communicate with the CAPI.
// It attempts to reuse a previously saved JWT token from the database, falling back to
// an authentication request if the token is missing, invalid, or expired.
//
// If a new token is obtained, it is saved back to the database for caching.
func (a *apic) Authenticate(ctx context.Context, config *csconfig.OnlineApiClientCfg) error {
	if token, exp, valid := loadAPICToken(ctx, a.dbClient); valid {
		log.Debug("using valid token from DB")

		a.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Token = token
		a.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Expiration = exp

		// the cached token is still valid, no need to authenticate again
		return nil
	}

	log.Debug("No token found, authenticating")

	scenarios, err := a.FetchScenariosListFromDB(ctx)
	if err != nil {
		return fmt.Errorf("get scenario in db: %w", err)
	}

	password := strfmt.Password(config.Credentials.Password)

	authResp, _, err := a.apiClient.Auth.AuthenticateWatcher(ctx, models.WatcherAuthRequest{
		MachineID: &config.Credentials.Login,
		Password:  &password,
		Scenarios: scenarios,
	})
	if err != nil {
		return fmt.Errorf("authenticate watcher (%s): %w", config.Credentials.Login, err)
	}

	if err = a.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Expiration.UnmarshalText([]byte(authResp.Expire)); err != nil {
		return fmt.Errorf("unable to parse jwt expiration: %w", err)
	}

	a.apiClient.GetClient().Transport.(*apiclient.JWTTransport).Token = authResp.Token

	return saveAPICToken(ctx, a.dbClient, authResp.Token)
}

// keep track of all alerts in cache and push them to CAPI every pushInterval.
func (a *apic) Push(ctx context.Context) error {
	defer trace.CatchPanic("lapi/pushToAPIC")

	var cache models.AddSignalsRequest

	ticker := time.NewTicker(a.pushIntervalFirst)

	log.Infof("Start push to CrowdSec Central API (interval: %s once, then %s)", a.pushIntervalFirst.Round(time.Second), a.pushInterval)

	for {
		select {
		case <-a.pushTomb.Dying(): // if one apic routine is dying, do we kill the others?
			a.pullTomb.Kill(nil)
			a.metricsTomb.Kill(nil)
			log.Infof("push tomb is dying, sending cache (%d elements) before exiting", len(cache))

			if len(cache) == 0 {
				return nil
			}

			go a.Send(ctx, &cache)

			return nil
		case <-ticker.C:
			ticker.Reset(a.pushInterval)

			if len(cache) > 0 {
				a.mu.Lock()
				cacheCopy := cache
				cache = make(models.AddSignalsRequest, 0)
				a.mu.Unlock()
				log.Infof("Signal push: %d signals to push", len(cacheCopy))

				go a.Send(ctx, &cacheCopy)
			}
		case alerts := <-a.AlertsAddChan:
			var signals []*models.AddSignalsRequestItem

			for _, alert := range alerts {
				if ok := shouldShareAlert(alert, a.consoleConfig, a.shareSignals); ok {
					signals = append(signals, alertToSignal(alert, getScenarioTrustOfAlert(alert), *a.consoleConfig.ShareContext))
				}
			}

			a.mu.Lock()
			cache = append(cache, signals...)
			a.mu.Unlock()
		}
	}
}

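// Note on the ticker branch above: the cache is swapped under the mutex and
// the network call runs in its own goroutine, so a slow CAPI request never
// blocks the goroutine that accumulates incoming alerts. The pattern, in short:
//
//	a.mu.Lock()
//	cacheCopy := cache                        // hand the current batch to the sender
//	cache = make(models.AddSignalsRequest, 0) // start a fresh cache
//	a.mu.Unlock()
//	go a.Send(ctx, &cacheCopy)                // network I/O happens unlocked
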
func getScenarioTrustOfAlert(alert *models.Alert) string {
	scenarioTrust := "certified"
	if alert.ScenarioHash == nil || *alert.ScenarioHash == "" {
		scenarioTrust = "custom"
	} else if alert.ScenarioVersion == nil || *alert.ScenarioVersion == "" || *alert.ScenarioVersion == "?" {
		scenarioTrust = "tainted"
	}

	if len(alert.Decisions) > 0 {
		if *alert.Decisions[0].Origin == types.CscliOrigin {
			scenarioTrust = "manual"
		}
	}

	return scenarioTrust
}

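// Trust levels as computed above: "certified" (scenario hash and clean version
// present), "custom" (no scenario hash), "tainted" (hash present but version
// missing or "?"), and "manual" (first decision originates from cscli).
// E.g. an alert with ScenarioHash "abc123" (hypothetical value) and
// ScenarioVersion "?" is "tainted".
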
func shouldShareAlert(alert *models.Alert, consoleConfig *csconfig.ConsoleConfig, shareSignals bool) bool {
	if !shareSignals {
		log.Debugf("sharing signals is disabled")
		return false
	}

	if *alert.Simulated {
		log.Debugf("simulation enabled for alert (id:%d), will not be sent to CAPI", alert.ID)
		return false
	}

	switch scenarioTrust := getScenarioTrustOfAlert(alert); scenarioTrust {
	case "manual":
		if !*consoleConfig.ShareManualDecisions {
			log.Debugf("manual decision generated an alert, not sending it to CAPI because the option is disabled")
			return false
		}
	case "tainted":
		if !*consoleConfig.ShareTaintedScenarios {
			log.Debugf("tainted scenario generated an alert, not sending it to CAPI because the option is disabled")
			return false
		}
	case "custom":
		if !*consoleConfig.ShareCustomScenarios {
			log.Debugf("custom scenario generated an alert, not sending it to CAPI because the option is disabled")
			return false
		}
	}

	return true
}

func (a *apic) sendBatch(ctx context.Context, signals []*models.AddSignalsRequestItem) error {
	ctxBatch, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	_, _, err := a.apiClient.Signal.Add(ctxBatch, (*models.AddSignalsRequest)(&signals))

	return err
}

func (a *apic) Send(ctx context.Context, cacheOrig *models.AddSignalsRequest) {
	/* We do have a problem with this:
	the apic.Push background routine reads from the alertToPush chan.
	This chan is filled by Controller.CreateAlert.

	If apic.Send hangs, the alertToPush chan will become full,
	which means that Controller.CreateAlert is going to hang, blocking API worker(s).

	So instead, we prefer to cancel the write.

	I don't know enough about gin to tell how much of an issue it can be.
	*/
	var cache []*models.AddSignalsRequestItem = *cacheOrig

	batchSize := 50

	for start := 0; start < len(cache); start += batchSize {
		end := min(start+batchSize, len(cache))

		if err := a.sendBatch(ctx, cache[start:end]); err != nil {
			log.Errorf("sending signal to central API: %s", err)
			return
		}
	}
}

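// Batching sketch: with batchSize = 50, a cache of 123 signals (hypothetical
// count) is sent as cache[0:50], cache[50:100] and cache[100:123]. Each batch
// gets its own 5-second timeout (see sendBatch), and the first failing batch
// aborts the remaining ones.
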
func (a *apic) CAPIPullIsOld(ctx context.Context) (bool, error) {
	/* only pull community blocklist if it's older than 1h30 */
	alerts := a.dbClient.Ent.Alert.Query()
	alerts = alerts.Where(alert.HasDecisionsWith(decision.OriginEQ(database.CapiMachineID)))
	alerts = alerts.Where(alert.CreatedAtGTE(time.Now().UTC().Add(-time.Duration(1*time.Hour + 30*time.Minute)))) //nolint:unconvert

	count, err := alerts.Count(ctx)
	if err != nil {
		return false, fmt.Errorf("while looking for CAPI alert: %w", err)
	}

	if count > 0 {
		log.Printf("last CAPI pull is newer than 1h30, skip.")
		return false, nil
	}

	return true, nil
}

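// Note: the 1h30 freshness window is shorter than the minimum jittered pull
// interval (1h55m, see the constants at the top of this file), so a regularly
// scheduled pull is never skipped, while a pull shortly after a restart is.
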
func (a *apic) HandleDeletedDecisionsV3(ctx context.Context, deletedDecisions []*modelscapi.GetDecisionsStreamResponseDeletedItem, deleteCounters map[string]map[string]int) (int, error) {
	var nbDeleted int

	for _, decisions := range deletedDecisions {
		scope := decisions.Scope

		for _, decision := range decisions.Decisions {
			filter := map[string][]string{
				"value":  {decision},
				"origin": {types.CAPIOrigin},
			}
			if strings.ToLower(*scope) != "ip" {
				filter["scopes"] = []string{*scope}
			}

			dbCliDel, _, err := a.dbClient.ExpireDecisionsWithFilter(ctx, filter)
			if err != nil {
				return 0, fmt.Errorf("expiring decisions error: %w", err)
			}

			updateCounterForDecision(deleteCounters, ptr.Of(types.CAPIOrigin), nil, dbCliDel)
			nbDeleted += dbCliDel
		}
	}

	return nbDeleted, nil
}

func createAlertsForDecisions(decisions []*models.Decision) []*models.Alert {
	newAlerts := make([]*models.Alert, 0)

	for _, decision := range decisions {
		found := false

		for _, sub := range newAlerts {
			if sub.Source.Scope == nil {
				log.Warningf("nil scope in %+v", sub)
				continue
			}

			if *decision.Origin == types.CAPIOrigin {
				if *sub.Source.Scope == types.CAPIOrigin {
					found = true
					break
				}
			} else if *decision.Origin == types.ListOrigin {
				if *sub.Source.Scope == *decision.Origin {
					if sub.Scenario == nil {
						log.Warningf("nil scenario in %+v", sub)
					}

					if *sub.Scenario == *decision.Scenario {
						found = true
						break
					}
				}
			} else {
				log.Warningf("unknown origin %s : %+v", *decision.Origin, decision)
			}
		}

		if !found {
			log.Debugf("Create entry for origin:%s scenario:%s", *decision.Origin, *decision.Scenario)
			newAlerts = append(newAlerts, createAlertForDecision(decision))
		}
	}

	return newAlerts
}

func createAlertForDecision(decision *models.Decision) *models.Alert {
	var (
		scenario string
		scope    string
	)

	switch *decision.Origin {
	case types.CAPIOrigin:
		scenario = types.CAPIOrigin
		scope = types.CAPIOrigin
	case types.ListOrigin:
		scenario = *decision.Scenario
		scope = types.ListOrigin
	default:
		scenario = ""
		scope = ""

		log.Warningf("unknown origin %s", *decision.Origin)
	}

	return &models.Alert{
		Source: &models.Source{
			Scope: ptr.Of(scope),
			Value: ptr.Of(""),
		},
		Scenario:        ptr.Of(scenario),
		Message:         ptr.Of(""),
		StartAt:         ptr.Of(time.Now().UTC().Format(time.RFC3339)),
		StopAt:          ptr.Of(time.Now().UTC().Format(time.RFC3339)),
		Capacity:        ptr.Of(int32(0)),
		Simulated:       ptr.Of(false),
		EventsCount:     ptr.Of(int32(0)),
		Leakspeed:       ptr.Of(""),
		ScenarioHash:    ptr.Of(""),
		ScenarioVersion: ptr.Of(""),
		MachineID:       database.CapiMachineID,
	}
}

// This function takes a list of parent alerts and decisions, and pairs each decision with the matching alert.
func fillAlertsWithDecisions(alerts []*models.Alert, decisions []*models.Decision, addCounters map[string]map[string]int) []*models.Alert {
	for _, decision := range decisions {
		// count and create separate alerts for each list
		updateCounterForDecision(addCounters, decision.Origin, decision.Scenario, 1)

		/* CAPI might send lowercase scopes, unify them. */
		switch strings.ToLower(*decision.Scope) {
		case "ip":
			*decision.Scope = types.Ip
		case "range":
			*decision.Scope = types.Range
		}

		found := false
		// add the individual decisions to the right list
		for idx, alert := range alerts {
			if *decision.Origin == types.CAPIOrigin {
				if *alert.Source.Scope == types.CAPIOrigin {
					alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
					found = true

					break
				}
			} else if *decision.Origin == types.ListOrigin {
				if *alert.Source.Scope == types.ListOrigin && *alert.Scenario == *decision.Scenario {
					alerts[idx].Decisions = append(alerts[idx].Decisions, decision)
					found = true

					break
				}
			} else {
				log.Warningf("unknown origin %s", *decision.Origin)
			}
		}

		if !found {
			log.Warningf("Orphaned decision for %s - %s", *decision.Origin, *decision.Scenario)
		}
	}

	return alerts
}

// we receive a list of decisions and links to blocklists, and we need to create a list of alerts:
// one alert for the "community blocklist",
// one alert per list we're subscribed to
func (a *apic) PullTop(ctx context.Context, forcePull bool) error {
	var err error

	hasPulledAllowlists := false

	// A mutex with TryLock would be a bit simpler,
	// but Go does not guarantee that TryLock will acquire the lock even if it is available
	select {
	case a.isPulling <- true:
		defer func() {
			<-a.isPulling
		}()
	default:
		return errors.New("pull already in progress")
	}

	if !forcePull {
		if lastPullIsOld, err := a.CAPIPullIsOld(ctx); err != nil {
			return err
		} else if !lastPullIsOld {
			return nil
		}
	}

	log.Debug("Acquiring lock for pullCAPI")

	err = a.dbClient.AcquirePullCAPILock(ctx)
	if a.dbClient.IsLocked(err) {
		log.Info("PullCAPI is already running, skipping")
		return nil
	}

	/* defer lock release */
	defer func() {
		log.Debug("Releasing lock for pullCAPI")

		if err := a.dbClient.ReleasePullCAPILock(ctx); err != nil {
			log.Errorf("while releasing lock: %v", err)
		}
	}()

	log.Infof("Starting community-blocklist update")

	log.Debugf("Community pull: %t | Blocklist pull: %t", a.pullCommunity, a.pullBlocklists)

	data, _, err := a.apiClient.Decisions.GetStreamV3(ctx, apiclient.DecisionsStreamOpts{Startup: a.startup, CommunityPull: a.pullCommunity, AdditionalPull: a.pullBlocklists})
	if err != nil {
		return fmt.Errorf("get stream: %w", err)
	}

	a.startup = false

	log.Debugf("Received %d new decisions", len(data.New))
	log.Debugf("Received %d deleted decisions", len(data.Deleted))

	if data.Links != nil {
		log.Debugf("Received %d blocklists links", len(data.Links.Blocklists))
		log.Debugf("Received %d allowlists links", len(data.Links.Allowlists))
	}

	/* to count additions/deletions across lists */
	addCounters, deleteCounters := makeAddAndDeleteCounters()

	// process deleted decisions
	nbDeleted, err := a.HandleDeletedDecisionsV3(ctx, data.Deleted, deleteCounters)
	if err != nil {
		log.Errorf("could not delete decisions from CAPI: %s", err)
	}

	log.Printf("capi/community-blocklist : %d explicit deletions", nbDeleted)

	// Update allowlists before processing decisions
	if data.Links != nil {
		if len(data.Links.Allowlists) > 0 {
			hasPulledAllowlists = true

			if err := a.UpdateAllowlists(ctx, data.Links.Allowlists, forcePull); err != nil {
				log.Errorf("could not update allowlists from CAPI: %s", err)
			}
		}
	}

	if len(data.New) > 0 {
		// create one alert for community blocklist using the first decision
		decisions := a.apiClient.Decisions.GetDecisionsFromGroups(data.New)
		// apply APIC specific whitelists
		decisions = a.ApplyApicWhitelists(ctx, decisions)

		alert := createAlertForDecision(decisions[0])
		alertsFromCapi := []*models.Alert{alert}
		alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, addCounters)

		err = a.SaveAlerts(ctx, alertsFromCapi, addCounters, deleteCounters)
		if err != nil {
			log.Errorf("could not save alert for CAPI pull: %s", err)
		}
	} else {
		if a.pullCommunity {
			log.Info("capi/community-blocklist : received 0 new entries (expected if you just installed crowdsec)")
		} else {
			log.Debug("capi/community-blocklist : community blocklist pull is disabled")
		}
	}

	// update blocklists (allowlists were handled above)
	if data.Links != nil {
		if len(data.Links.Blocklists) > 0 {
			if err := a.UpdateBlocklists(ctx, data.Links.Blocklists, addCounters, forcePull); err != nil {
				log.Errorf("could not update blocklists from CAPI: %s", err)
			}
		}
	}

	if hasPulledAllowlists {
		deleted, err := a.dbClient.ApplyAllowlistsToExistingDecisions(ctx)
		if err != nil {
			log.Errorf("could not apply allowlists to existing decisions: %s", err)
		}

		if deleted > 0 {
			log.Infof("deleted %d decisions from allowlists", deleted)
		}
	}

	return nil
}

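// The isPulling channel used above is a non-blocking try-lock: a buffered
// channel of capacity 1 where a successful send acquires the lock and the
// deferred receive releases it. A minimal standalone sketch of the idiom
// (names are illustrative, not part of this package):
//
//	sem := make(chan bool, 1)
//	select {
//	case sem <- true:
//		defer func() { <-sem }()
//	default:
//		return errors.New("already running")
//	}
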
// we receive a link to a blocklist; we pull its content and create one alert from it
func (a *apic) PullBlocklist(ctx context.Context, blocklist *modelscapi.BlocklistLink, forcePull bool) error {
	addCounters, _ := makeAddAndDeleteCounters()
	if err := a.UpdateBlocklists(ctx, []*modelscapi.BlocklistLink{blocklist}, addCounters, forcePull); err != nil {
		return fmt.Errorf("while pulling blocklist: %w", err)
	}

	return nil
}

func (a *apic) PullAllowlist(ctx context.Context, allowlist *modelscapi.AllowlistLink, forcePull bool) error {
	if err := a.UpdateAllowlists(ctx, []*modelscapi.AllowlistLink{allowlist}, forcePull); err != nil {
		return fmt.Errorf("while pulling allowlist: %w", err)
	}

	return nil
}

func (a *apic) UpdateAllowlists(ctx context.Context, allowlistsLinks []*modelscapi.AllowlistLink, forcePull bool) error {
	if len(allowlistsLinks) == 0 {
		return nil
	}

	defaultClient, err := apiclient.NewDefaultClient(a.apiClient.BaseURL, "", "", nil)
	if err != nil {
		return fmt.Errorf("while creating default client: %w", err)
	}

	for _, link := range allowlistsLinks {
		if log.IsLevelEnabled(log.TraceLevel) {
			log.Tracef("allowlist body: %+v", spew.Sdump(link))
		}

		if link.Name == nil {
			log.Warningf("allowlist has no name")
			continue
		}

		if link.URL == nil {
			log.Warningf("allowlist %s has no URL", *link.Name)
			continue
		}

		if link.ID == nil {
			log.Warningf("allowlist %s has no ID", *link.Name)
			continue
		}

		description := ""
		if link.Description != nil {
			description = *link.Description
		}

		req, err := http.NewRequestWithContext(ctx, http.MethodGet, *link.URL, http.NoBody)
		if err != nil {
			log.Errorf("while pulling allowlist: %s", err)
			continue
		}

		resp, err := defaultClient.GetClient().Do(req)
		if err != nil {
			log.Errorf("while pulling allowlist: %s", err)
			continue
		}
		defer resp.Body.Close()

		scanner := bufio.NewScanner(resp.Body)
		items := make([]*models.AllowlistItem, 0)

		for scanner.Scan() {
			item := scanner.Text()
			j := &models.AllowlistItem{}

			if err := json.Unmarshal([]byte(item), j); err != nil {
				log.Errorf("while unmarshalling allowlist item: %s", err)
				continue
			}

			items = append(items, j)
		}

		list, err := a.dbClient.GetAllowListByID(ctx, *link.ID, false)
		if err != nil {
			if !ent.IsNotFound(err) {
				log.Errorf("while getting allowlist %s: %s", *link.Name, err)
				continue
			}
		}

		if list == nil {
			list, err = a.dbClient.CreateAllowList(ctx, *link.Name, description, *link.ID, true)
			if err != nil {
				log.Errorf("while creating allowlist %s: %s", *link.Name, err)
				continue
			}
		}

		added, err := a.dbClient.ReplaceAllowlist(ctx, list, items, true)
		if err != nil {
			log.Errorf("while replacing allowlist %s: %s", *link.Name, err)
			continue
		}

		log.Infof("added %d values to allowlist %s", added, list.Name)

		if list.Name != *link.Name || list.Description != description {
			err = a.dbClient.UpdateAllowlistMeta(ctx, *link.ID, *link.Name, description)
			if err != nil {
				log.Errorf("while updating allowlist meta %s: %s", *link.Name, err)
				continue
			}
		}

		log.Infof("Allowlist %s updated", *link.Name)
	}

	return nil
}

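// The allowlist body above is consumed line by line, each line being a JSON
// object unmarshalled into models.AllowlistItem. A hypothetical two-line
// payload could look like this (field names are illustrative; the
// authoritative schema is models.AllowlistItem):
//
//	{"value": "192.0.2.1", "description": "trusted scanner"}
//	{"value": "198.51.100.0/24", "expiration": "2030-01-01T00:00:00Z"}
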
// if the decision is whitelisted: return a representation of the whitelisting ip or cidr
// if not whitelisted: empty string
func (a *apic) whitelistedBy(decision *models.Decision, additionalIPs []net.IP, additionalRanges []*net.IPNet) string {
	if decision.Value == nil {
		return ""
	}

	ipval := net.ParseIP(*decision.Value)
	for _, cidr := range a.whitelists.Cidrs {
		if cidr.Contains(ipval) {
			return cidr.String()
		}
	}

	for _, ip := range a.whitelists.Ips {
		if ip != nil && ip.Equal(ipval) {
			return ip.String()
		}
	}

	for _, ip := range additionalIPs {
		if ip.Equal(ipval) {
			return ip.String()
		}
	}

	for _, cidr := range additionalRanges {
		if cidr.Contains(ipval) {
			return cidr.String()
		}
	}

	return ""
}

func (a *apic) ApplyApicWhitelists(ctx context.Context, decisions []*models.Decision) []*models.Decision {
	allowlisted_ips, allowlisted_cidrs, err := a.dbClient.GetAllowlistsContentForAPIC(ctx)
	if err != nil {
		log.Errorf("while getting allowlists content: %s", err)
	}

	if a.whitelists != nil && (len(a.whitelists.Cidrs) > 0 || len(a.whitelists.Ips) > 0) {
		log.Warn("capi_whitelists_path is deprecated, please use centralized allowlists instead. See https://docs.crowdsec.net/docs/next/local_api/centralized_allowlists.")
	}

	if (a.whitelists == nil || len(a.whitelists.Cidrs) == 0 && len(a.whitelists.Ips) == 0) && len(allowlisted_ips) == 0 && len(allowlisted_cidrs) == 0 {
		return decisions
	}
	// deal with CAPI whitelists for fire. We want to avoid having a second list, so we shrink in place
	outIdx := 0

	for _, decision := range decisions {
		whitelister := a.whitelistedBy(decision, allowlisted_ips, allowlisted_cidrs)
		if whitelister != "" {
			log.Infof("%s from %s is whitelisted by %s", *decision.Value, *decision.Scenario, whitelister)
			continue
		}

		decisions[outIdx] = decision
		outIdx++
	}
	// shrink the list, those are deleted items
	return decisions[:outIdx]
}

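// The loop above uses the classic in-place filter idiom: kept decisions are
// compacted to the front of the slice and the result is re-sliced to outIdx,
// avoiding a second allocation for what is typically a large community
// blocklist.
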
func (a *apic) SaveAlerts(ctx context.Context, alertsFromCapi []*models.Alert, addCounters map[string]map[string]int, deleteCounters map[string]map[string]int) error {
	for _, alert := range alertsFromCapi {
		setAlertScenario(alert, addCounters, deleteCounters)
		log.Debugf("%s has %d decisions", *alert.Source.Scope, len(alert.Decisions))

		if a.dbClient.Type == "sqlite" && (a.dbClient.WalMode == nil || !*a.dbClient.WalMode) {
			log.Warningf("sqlite is not using WAL mode, LAPI might become unresponsive when inserting the community blocklist")
		}

		alertID, inserted, deleted, err := a.dbClient.UpdateCommunityBlocklist(ctx, alert)
		if err != nil {
			return fmt.Errorf("while saving alert from %s: %w", *alert.Source.Scope, err)
		}

		log.Printf("%s : added %d entries, deleted %d entries (alert:%d)", *alert.Source.Scope, inserted, deleted, alertID)
	}

	return nil
}

func (a *apic) ShouldForcePullBlocklist(ctx context.Context, blocklist *modelscapi.BlocklistLink) (bool, error) {
	// we should force pull if the blocklist decisions are about to expire or there's no decision in the db
	alertQuery := a.dbClient.Ent.Alert.Query()
	alertQuery.Where(alert.SourceScopeEQ(fmt.Sprintf("%s:%s", types.ListOrigin, *blocklist.Name)))
	alertQuery.Order(ent.Desc(alert.FieldCreatedAt))

	alertInstance, err := alertQuery.First(ctx)
	if err != nil {
		if ent.IsNotFound(err) {
			log.Debugf("no alert found for %s, force refresh", *blocklist.Name)
			return true, nil
		}

		return false, fmt.Errorf("while getting alert: %w", err)
	}

	decisionQuery := a.dbClient.Ent.Decision.Query()
	decisionQuery.Where(decision.HasOwnerWith(alert.IDEQ(alertInstance.ID)))

	firstDecision, err := decisionQuery.First(ctx)
	if err != nil {
		if ent.IsNotFound(err) {
			log.Debugf("no decision found for %s, force refresh", *blocklist.Name)
			return true, nil
		}

		return false, fmt.Errorf("while getting decision: %w", err)
	}

	if firstDecision == nil || firstDecision.Until == nil || firstDecision.Until.Sub(time.Now().UTC()) < (a.pullInterval+15*time.Minute) {
		log.Debugf("at least one decision found for %s, expiring soon, force refresh", *blocklist.Name)
		return true, nil
	}

	return false, nil
}

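// Expiry-margin example (hypothetical times): with pullInterval = 2h, a
// decision whose Until is less than 2h15m away would lapse before or right
// around the next scheduled pull, so the blocklist is re-pulled now rather
// than leaving a gap in coverage.
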
func (a *apic) updateBlocklist(ctx context.Context, client *apiclient.ApiClient, blocklist *modelscapi.BlocklistLink, addCounters map[string]map[string]int, forcePull bool) error {
	if blocklist.Scope == nil {
		log.Warningf("blocklist has no scope")
		return nil
	}

	if blocklist.Duration == nil {
		log.Warningf("blocklist has no duration")
		return nil
	}

	if !forcePull {
		_forcePull, err := a.ShouldForcePullBlocklist(ctx, blocklist)
		if err != nil {
			return fmt.Errorf("while checking if we should force pull blocklist %s: %w", *blocklist.Name, err)
		}

		forcePull = _forcePull
	}

	blocklistConfigItemName := fmt.Sprintf("blocklist:%s:last_pull", *blocklist.Name)

	var (
		lastPullTimestamp *string
		err               error
	)

	if !forcePull {
		lastPullTimestamp, err = a.dbClient.GetConfigItem(ctx, blocklistConfigItemName)
		if err != nil {
			return fmt.Errorf("while getting last pull timestamp for blocklist %s: %w", *blocklist.Name, err)
		}
	}

	decisions, hasChanged, err := client.Decisions.GetDecisionsFromBlocklist(ctx, blocklist, lastPullTimestamp)
	if err != nil {
		return fmt.Errorf("while getting decisions from blocklist %s: %w", *blocklist.Name, err)
	}

	if !hasChanged {
		if lastPullTimestamp == nil {
			log.Infof("blocklist %s hasn't been modified or there was an error reading it, skipping", *blocklist.Name)
		} else {
			log.Infof("blocklist %s hasn't been modified since %s, skipping", *blocklist.Name, *lastPullTimestamp)
		}

		return nil
	}

	err = a.dbClient.SetConfigItem(ctx, blocklistConfigItemName, time.Now().UTC().Format(http.TimeFormat))
	if err != nil {
		return fmt.Errorf("while setting last pull timestamp for blocklist %s: %w", *blocklist.Name, err)
	}

	if len(decisions) == 0 {
		log.Infof("blocklist %s has no decisions", *blocklist.Name)
		return nil
	}
	// apply APIC specific whitelists
	decisions = a.ApplyApicWhitelists(ctx, decisions)
	alert := createAlertForDecision(decisions[0])
	alertsFromCapi := []*models.Alert{alert}
	alertsFromCapi = fillAlertsWithDecisions(alertsFromCapi, decisions, addCounters)

	err = a.SaveAlerts(ctx, alertsFromCapi, addCounters, nil)
	if err != nil {
		return fmt.Errorf("while saving alert from blocklist %s: %w", *blocklist.Name, err)
	}

	return nil
}

func (a *apic) UpdateBlocklists(ctx context.Context, blocklists []*modelscapi.BlocklistLink, addCounters map[string]map[string]int, forcePull bool) error {
	if len(blocklists) == 0 {
		return nil
	}

	// we must use a different http client than apiClient's, because apiClient's transport is a jwtTransport
	// and here we are calling signed apis, which are incompatible with it.
	// we can use the same baseUrl, as the urls are absolute and the parser will take care of it
	defaultClient, err := apiclient.NewDefaultClient(a.apiClient.BaseURL, "", "", nil)
	if err != nil {
		return fmt.Errorf("while creating default client: %w", err)
	}

	for _, blocklist := range blocklists {
		if err := a.updateBlocklist(ctx, defaultClient, blocklist, addCounters, forcePull); err != nil {
			return err
		}
	}

	return nil
}

func setAlertScenario(alert *models.Alert, addCounters map[string]map[string]int, deleteCounters map[string]map[string]int) {
	switch *alert.Source.Scope {
	case types.CAPIOrigin:
		*alert.Source.Scope = types.CommunityBlocklistPullSourceScope
		alert.Scenario = ptr.Of(fmt.Sprintf("update : +%d/-%d IPs",
			addCounters[types.CAPIOrigin]["all"],
			deleteCounters[types.CAPIOrigin]["all"]))
	case types.ListOrigin:
		*alert.Source.Scope = fmt.Sprintf("%s:%s", types.ListOrigin, *alert.Scenario)
		alert.Scenario = ptr.Of(fmt.Sprintf("update : +%d/-%d IPs",
			addCounters[types.ListOrigin][*alert.Scenario],
			deleteCounters[types.ListOrigin][*alert.Scenario]))
	}
}

func (a *apic) Pull(ctx context.Context) error {
	defer trace.CatchPanic("lapi/pullFromAPIC")

	toldOnce := false

	for {
		scenario, err := a.FetchScenariosListFromDB(ctx)
		if err != nil {
			log.Errorf("unable to fetch scenarios from db: %s", err)
		}

		if len(scenario) > 0 {
			break
		}

		if !toldOnce {
			log.Warning("scenario list is empty, will not pull yet")

			toldOnce = true
		}

		time.Sleep(1 * time.Second)
	}

	if err := a.PullTop(ctx, false); err != nil {
		log.Errorf("capi pull top: %s", err)
	}

	log.Infof("Start pull from CrowdSec Central API (interval: %s once, then %s)", a.pullIntervalFirst.Round(time.Second), a.pullInterval)
	ticker := time.NewTicker(a.pullIntervalFirst)

	for {
		select {
		case <-ticker.C:
			ticker.Reset(a.pullInterval)

			if err := a.PullTop(ctx, false); err != nil {
				log.Errorf("capi pull top: %s", err)
				continue
			}
		case <-a.pullTomb.Dying(): // if one apic routine is dying, do we kill the others?
			a.metricsTomb.Kill(nil)
			a.pushTomb.Kill(nil)

			return nil
		}
	}
}

func (a *apic) Shutdown() {
	a.pushTomb.Kill(nil)
	a.pullTomb.Kill(nil)
	a.metricsTomb.Kill(nil)
}

func makeAddAndDeleteCounters() (map[string]map[string]int, map[string]map[string]int) {
	addCounters := make(map[string]map[string]int)
	addCounters[types.CAPIOrigin] = make(map[string]int)
	addCounters[types.ListOrigin] = make(map[string]int)

	deleteCounters := make(map[string]map[string]int)
	deleteCounters[types.CAPIOrigin] = make(map[string]int)
	deleteCounters[types.ListOrigin] = make(map[string]int)

	return addCounters, deleteCounters
}

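// Shape of the counters, with hypothetical values after a pull:
//
//	addCounters[types.CAPIOrigin]["all"] == 1200                 // community blocklist, aggregated
//	addCounters[types.ListOrigin]["example/list-scenario"] == 35 // per-list scenario
//
// CAPI decisions are counted under the single "all" key, while list decisions
// are counted per scenario (see updateCounterForDecision below).
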
func updateCounterForDecision(counter map[string]map[string]int, origin *string, scenario *string, totalDecisions int) {
	switch *origin {
	case types.CAPIOrigin:
		counter[*origin]["all"] += totalDecisions
	case types.ListOrigin:
		counter[*origin][*scenario] += totalDecisions
	default:
		log.Warningf("Unknown origin %s", *origin)
	}
}