mirror of
https://github.com/crowdsecurity/crowdsec.git
synced 2025-05-10 20:05:55 +02:00
372 lines
10 KiB
Go
372 lines
10 KiB
Go
package apiserver
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
"gopkg.in/tomb.v2"
|
|
|
|
"github.com/crowdsecurity/go-cs-lib/trace"
|
|
|
|
"github.com/crowdsecurity/crowdsec/pkg/apiclient"
|
|
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
|
|
"github.com/crowdsecurity/crowdsec/pkg/database"
|
|
"github.com/crowdsecurity/crowdsec/pkg/longpollclient"
|
|
"github.com/crowdsecurity/crowdsec/pkg/models"
|
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
|
)
|
|
|
|
const (
|
|
SyncInterval = time.Second * 10
|
|
PapiPullKey = "papi:last_pull"
|
|
)
|
|
|
|
var operationMap = map[string]func(*Message, *Papi, bool) error{
|
|
"decision": DecisionCmd,
|
|
"alert": AlertCmd,
|
|
"management": ManagementCmd,
|
|
}
|
|
|
|
type Header struct {
|
|
OperationType string `json:"operation_type"`
|
|
OperationCmd string `json:"operation_cmd"`
|
|
Timestamp time.Time `json:"timestamp"`
|
|
Message string `json:"message"`
|
|
UUID string `json:"uuid"`
|
|
Source *Source `json:"source"`
|
|
Destination string `json:"destination"`
|
|
}
|
|
|
|
type Source struct {
|
|
User string `json:"user"`
|
|
}
|
|
|
|
type Message struct {
|
|
Header *Header
|
|
Data any `json:"data"`
|
|
}
|
|
|
|
type OperationChannels struct {
|
|
AddAlertChannel chan []*models.Alert
|
|
DeleteDecisionChannel chan []*models.Decision
|
|
}
|
|
|
|
type Papi struct {
|
|
URL string
|
|
Client *longpollclient.LongPollClient
|
|
DBClient *database.Client
|
|
apiClient *apiclient.ApiClient
|
|
Channels *OperationChannels
|
|
mu sync.Mutex
|
|
pullTomb tomb.Tomb
|
|
syncTomb tomb.Tomb
|
|
SyncInterval time.Duration
|
|
consoleConfig *csconfig.ConsoleConfig
|
|
Logger *log.Entry
|
|
apic *apic
|
|
}
|
|
|
|
type PapiPermCheckError struct {
|
|
Error string `json:"error"`
|
|
}
|
|
|
|
type PapiPermCheckSuccess struct {
|
|
Status string `json:"status"`
|
|
Plan string `json:"plan"`
|
|
Categories []string `json:"categories"`
|
|
}
|
|
|
|
func NewPAPI(apic *apic, dbClient *database.Client, consoleConfig *csconfig.ConsoleConfig, logLevel log.Level) (*Papi, error) {
|
|
logger := log.New()
|
|
if err := types.ConfigureLogger(logger, &logLevel); err != nil {
|
|
return &Papi{}, fmt.Errorf("creating papi logger: %w", err)
|
|
}
|
|
|
|
papiUrl := *apic.apiClient.PapiURL
|
|
papiUrl.Path = fmt.Sprintf("%s%s", types.PAPIVersion, types.PAPIPollUrl)
|
|
|
|
longPollClient, err := longpollclient.NewLongPollClient(longpollclient.LongPollClientConfig{
|
|
Url: papiUrl,
|
|
Logger: logger,
|
|
HttpClient: apic.apiClient.GetClient(),
|
|
})
|
|
if err != nil {
|
|
return &Papi{}, fmt.Errorf("failed to create PAPI client: %w", err)
|
|
}
|
|
|
|
channels := &OperationChannels{
|
|
AddAlertChannel: apic.AlertsAddChan,
|
|
DeleteDecisionChannel: make(chan []*models.Decision),
|
|
}
|
|
|
|
papi := &Papi{
|
|
URL: apic.apiClient.PapiURL.String(),
|
|
Client: longPollClient,
|
|
DBClient: dbClient,
|
|
Channels: channels,
|
|
SyncInterval: SyncInterval,
|
|
mu: sync.Mutex{},
|
|
pullTomb: tomb.Tomb{},
|
|
syncTomb: tomb.Tomb{},
|
|
apiClient: apic.apiClient,
|
|
apic: apic,
|
|
consoleConfig: consoleConfig,
|
|
Logger: logger.WithFields(log.Fields{"interval": SyncInterval.Seconds(), "source": "papi"}),
|
|
}
|
|
|
|
return papi, nil
|
|
}
|
|
|
|
func (p *Papi) handleEvent(event longpollclient.Event, sync bool) error {
|
|
logger := p.Logger.WithField("request-id", event.RequestId)
|
|
logger.Debugf("message received: %+v", event.Data)
|
|
|
|
message := &Message{}
|
|
if err := json.Unmarshal([]byte(event.Data), message); err != nil {
|
|
return fmt.Errorf("polling papi message format is not compatible: %+v: %w", event.Data, err)
|
|
}
|
|
|
|
if message.Header == nil {
|
|
return errors.New("no header in message, skipping")
|
|
}
|
|
|
|
if message.Header.Source == nil {
|
|
return errors.New("no source user in header message, skipping")
|
|
}
|
|
|
|
operationFunc, ok := operationMap[message.Header.OperationType]
|
|
if !ok {
|
|
return fmt.Errorf("operation '%s' unknown, continue", message.Header.OperationType)
|
|
}
|
|
|
|
logger.Debugf("Calling operation '%s'", message.Header.OperationType)
|
|
|
|
err := operationFunc(message, p, sync)
|
|
if err != nil {
|
|
return fmt.Errorf("'%s %s failed: %w", message.Header.OperationType, message.Header.OperationCmd, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Papi) GetPermissions(ctx context.Context) (PapiPermCheckSuccess, error) {
|
|
httpClient := p.apiClient.GetClient()
|
|
papiCheckUrl := fmt.Sprintf("%s%s%s", p.URL, types.PAPIVersion, types.PAPIPermissionsUrl)
|
|
|
|
req, err := http.NewRequestWithContext(ctx, http.MethodGet, papiCheckUrl, http.NoBody)
|
|
if err != nil {
|
|
return PapiPermCheckSuccess{}, fmt.Errorf("failed to create request: %w", err)
|
|
}
|
|
|
|
resp, err := httpClient.Do(req)
|
|
if err != nil {
|
|
return PapiPermCheckSuccess{}, fmt.Errorf("failed to get response: %w", err)
|
|
}
|
|
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
errResp := PapiPermCheckError{}
|
|
|
|
err = json.NewDecoder(resp.Body).Decode(&errResp)
|
|
if err != nil {
|
|
return PapiPermCheckSuccess{}, fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
|
|
return PapiPermCheckSuccess{}, fmt.Errorf("unable to query PAPI : %s (%d)", errResp.Error, resp.StatusCode)
|
|
}
|
|
|
|
respBody := PapiPermCheckSuccess{}
|
|
|
|
err = json.NewDecoder(resp.Body).Decode(&respBody)
|
|
if err != nil {
|
|
return PapiPermCheckSuccess{}, fmt.Errorf("failed to decode response: %w", err)
|
|
}
|
|
|
|
return respBody, nil
|
|
}
|
|
|
|
func reverse(s []longpollclient.Event) []longpollclient.Event {
|
|
a := make([]longpollclient.Event, len(s))
|
|
copy(a, s)
|
|
|
|
for i := len(a)/2 - 1; i >= 0; i-- {
|
|
opp := len(a) - 1 - i
|
|
a[i], a[opp] = a[opp], a[i]
|
|
}
|
|
|
|
return a
|
|
}
|
|
|
|
func (p *Papi) PullOnce(ctx context.Context, since time.Time, sync bool) error {
|
|
events, err := p.Client.PullOnce(ctx, since)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
reversedEvents := reverse(events) // PAPI sends events in the reverse order, which is not an issue when pulling them in real time, but here we need the correct order
|
|
eventsCount := len(events)
|
|
p.Logger.Infof("received %d events", eventsCount)
|
|
|
|
for i, event := range reversedEvents {
|
|
if err := p.handleEvent(event, sync); err != nil {
|
|
p.Logger.WithField("request-id", event.RequestId).Errorf("failed to handle event: %s", err)
|
|
}
|
|
|
|
p.Logger.Debugf("handled event %d/%d", i, eventsCount)
|
|
}
|
|
|
|
p.Logger.Debugf("finished handling events")
|
|
// Don't update the timestamp in DB, as a "real" LAPI might be running
|
|
// Worst case, crowdsec will receive a few duplicated events and will discard them
|
|
return nil
|
|
}
|
|
|
|
// PullPAPI is the long polling client for real-time decisions from PAPI
|
|
func (p *Papi) Pull(ctx context.Context) error {
|
|
defer trace.CatchPanic("lapi/PullPAPI")
|
|
p.Logger.Infof("Starting Polling API Pull")
|
|
|
|
lastTimestamp := time.Time{}
|
|
|
|
lastTimestampStr, err := p.DBClient.GetConfigItem(ctx, PapiPullKey)
|
|
if err != nil {
|
|
p.Logger.Warningf("failed to get last timestamp for papi pull: %s", err)
|
|
}
|
|
|
|
// value doesn't exist, it's first time we're pulling
|
|
if lastTimestampStr == "" {
|
|
binTime, err := lastTimestamp.MarshalText()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to serialize last timestamp: %w", err)
|
|
}
|
|
|
|
if err := p.DBClient.SetConfigItem(ctx, PapiPullKey, string(binTime)); err != nil {
|
|
p.Logger.Errorf("error setting papi pull last key: %s", err)
|
|
} else {
|
|
p.Logger.Debugf("config item '%s' set in database with value '%s'", PapiPullKey, string(binTime))
|
|
}
|
|
} else {
|
|
if err := lastTimestamp.UnmarshalText([]byte(lastTimestampStr)); err != nil {
|
|
return fmt.Errorf("failed to parse last timestamp: %w", err)
|
|
}
|
|
}
|
|
|
|
p.Logger.Infof("Starting PAPI pull (since:%s)", lastTimestamp)
|
|
|
|
for event := range p.Client.Start(ctx, lastTimestamp) {
|
|
logger := p.Logger.WithField("request-id", event.RequestId)
|
|
// update last timestamp in database
|
|
newTime := time.Now().UTC()
|
|
|
|
binTime, err := newTime.MarshalText()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to serialize last timestamp: %w", err)
|
|
}
|
|
|
|
err = p.handleEvent(event, false)
|
|
if err != nil {
|
|
logger.Errorf("failed to handle event: %s", err)
|
|
continue
|
|
}
|
|
|
|
if err := p.DBClient.SetConfigItem(ctx, PapiPullKey, string(binTime)); err != nil {
|
|
return fmt.Errorf("failed to update last timestamp: %w", err)
|
|
}
|
|
|
|
logger.Debugf("set last timestamp to %s", newTime)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Papi) SyncDecisions(ctx context.Context) error {
|
|
defer trace.CatchPanic("lapi/syncDecisionsToCAPI")
|
|
|
|
var cache models.DecisionsDeleteRequest
|
|
|
|
ticker := time.NewTicker(p.SyncInterval)
|
|
p.Logger.Infof("Start decisions sync to CrowdSec Central API (interval: %s)", p.SyncInterval)
|
|
|
|
for {
|
|
select {
|
|
case <-p.syncTomb.Dying(): // if one apic routine is dying, do we kill the others?
|
|
p.Logger.Infof("sync decisions tomb is dying, sending cache (%d elements) before exiting", len(cache))
|
|
|
|
if len(cache) == 0 {
|
|
return nil
|
|
}
|
|
|
|
go p.SendDeletedDecisions(ctx, &cache)
|
|
|
|
return nil
|
|
case <-ticker.C:
|
|
if len(cache) > 0 {
|
|
p.mu.Lock()
|
|
cacheCopy := cache
|
|
cache = make([]models.DecisionsDeleteRequestItem, 0)
|
|
p.mu.Unlock()
|
|
p.Logger.Infof("sync decisions: %d deleted decisions to push", len(cacheCopy))
|
|
|
|
go p.SendDeletedDecisions(ctx, &cacheCopy)
|
|
}
|
|
case deletedDecisions := <-p.Channels.DeleteDecisionChannel:
|
|
if (p.consoleConfig.ShareManualDecisions != nil && *p.consoleConfig.ShareManualDecisions) || (p.consoleConfig.ConsoleManagement != nil && *p.consoleConfig.ConsoleManagement) {
|
|
var tmpDecisions []models.DecisionsDeleteRequestItem
|
|
|
|
p.Logger.Debugf("%d decisions deletion to add in cache", len(deletedDecisions))
|
|
|
|
for _, decision := range deletedDecisions {
|
|
tmpDecisions = append(tmpDecisions, models.DecisionsDeleteRequestItem(decision.UUID))
|
|
}
|
|
|
|
p.mu.Lock()
|
|
cache = append(cache, tmpDecisions...)
|
|
p.mu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *Papi) sendDeletedDecisionsBatch(ctx context.Context, decisions []models.DecisionsDeleteRequestItem) error {
|
|
ctxBatch, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
|
|
_, _, err := p.apiClient.DecisionDelete.Add(ctxBatch, (*models.DecisionsDeleteRequest)(&decisions))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (p *Papi) SendDeletedDecisions(ctx context.Context, cacheOrig *models.DecisionsDeleteRequest) {
|
|
var cache []models.DecisionsDeleteRequestItem = *cacheOrig
|
|
|
|
batchSize := 50
|
|
|
|
for start := 0; start < len(cache); start += batchSize {
|
|
end := start + batchSize
|
|
|
|
if end > len(cache) {
|
|
end = len(cache)
|
|
}
|
|
|
|
if err := p.sendDeletedDecisionsBatch(ctx, cache[start:end]); err != nil {
|
|
p.Logger.Errorf("sending deleted decisions to central API: %s", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (p *Papi) Shutdown() {
|
|
p.Logger.Infof("Shutting down PAPI")
|
|
p.syncTomb.Kill(nil)
|
|
p.Client.Stop()
|
|
}
|