mirror of
https://github.com/crowdsecurity/crowdsec.git
synced 2025-05-15 13:53:58 +02:00
* Add support for loki datasource --------- Co-authored-by: Mathieu Lecarme <mathieu@garambrogne.net> Co-authored-by: Sebastien Blot <sebastien@crowdsec.net> Co-authored-by: Thibault "bui" Koechlin <thibault@crowdsec.net>
370 lines
9.8 KiB
Go
370 lines
9.8 KiB
Go
package loki
|
|
|
|
/*
|
|
https://grafana.com/docs/loki/latest/api/#get-lokiapiv1tail
|
|
*/
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net/url"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/pkg/errors"
|
|
"github.com/prometheus/client_golang/prometheus"
|
|
log "github.com/sirupsen/logrus"
|
|
tomb "gopkg.in/tomb.v2"
|
|
yaml "gopkg.in/yaml.v2"
|
|
|
|
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
|
|
lokiclient "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/loki/internal/lokiclient"
|
|
"github.com/crowdsecurity/crowdsec/pkg/types"
|
|
)
|
|
|
|
const (
|
|
readyTimeout time.Duration = 3 * time.Second
|
|
readyLoop int = 3
|
|
readySleep time.Duration = 10 * time.Second
|
|
lokiLimit int = 100
|
|
)
|
|
|
|
var linesRead = prometheus.NewCounterVec(
|
|
prometheus.CounterOpts{
|
|
Name: "cs_lokisource_hits_total",
|
|
Help: "Total lines that were read.",
|
|
},
|
|
[]string{"source"})
|
|
|
|
type LokiAuthConfiguration struct {
|
|
Username string `yaml:"username"`
|
|
Password string `yaml:"password"`
|
|
}
|
|
|
|
type LokiConfiguration struct {
|
|
URL string `yaml:"url"` // Loki url
|
|
Prefix string `yaml:"prefix"` // Loki prefix
|
|
Query string `yaml:"query"` // LogQL query
|
|
Limit int `yaml:"limit"` // Limit of logs to read
|
|
DelayFor time.Duration `yaml:"delay_for"`
|
|
Since time.Duration `yaml:"since"`
|
|
Headers map[string]string `yaml:"headers"` // HTTP headers for talking to Loki
|
|
WaitForReady time.Duration `yaml:"wait_for_ready"` // Retry interval, default is 10 seconds
|
|
Auth LokiAuthConfiguration `yaml:"auth"`
|
|
MaxFailureDuration time.Duration `yaml:"max_failure_duration"` // Max duration of failure before stopping the source
|
|
configuration.DataSourceCommonCfg `yaml:",inline"`
|
|
}
|
|
|
|
type LokiSource struct {
|
|
Config LokiConfiguration
|
|
|
|
Client *lokiclient.LokiClient
|
|
|
|
logger *log.Entry
|
|
lokiWebsocket string
|
|
}
|
|
|
|
func (l *LokiSource) GetMetrics() []prometheus.Collector {
|
|
return []prometheus.Collector{linesRead}
|
|
}
|
|
|
|
func (l *LokiSource) GetAggregMetrics() []prometheus.Collector {
|
|
return []prometheus.Collector{linesRead}
|
|
}
|
|
|
|
func (l *LokiSource) UnmarshalConfig(yamlConfig []byte) error {
|
|
err := yaml.UnmarshalStrict(yamlConfig, &l.Config)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot parse loki acquisition configuration: %w", err)
|
|
}
|
|
|
|
if l.Config.Query == "" {
|
|
return errors.New("loki query is mandatory")
|
|
}
|
|
|
|
if l.Config.WaitForReady == 0 {
|
|
l.Config.WaitForReady = 10 * time.Second
|
|
}
|
|
|
|
if l.Config.DelayFor < 0*time.Second || l.Config.DelayFor > 5*time.Second {
|
|
return errors.New("delay_for should be a value between 1s and 5s")
|
|
}
|
|
|
|
if l.Config.Mode == "" {
|
|
l.Config.Mode = configuration.TAIL_MODE
|
|
}
|
|
if l.Config.Prefix == "" {
|
|
l.Config.Prefix = "/"
|
|
}
|
|
|
|
if !strings.HasSuffix(l.Config.Prefix, "/") {
|
|
l.Config.Prefix += "/"
|
|
}
|
|
|
|
if l.Config.Limit == 0 {
|
|
l.Config.Limit = lokiLimit
|
|
}
|
|
|
|
if l.Config.Mode == configuration.TAIL_MODE {
|
|
l.logger.Infof("Resetting since")
|
|
l.Config.Since = 0
|
|
}
|
|
|
|
if l.Config.MaxFailureDuration == 0 {
|
|
l.Config.MaxFailureDuration = 30 * time.Second
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (l *LokiSource) Configure(config []byte, logger *log.Entry) error {
|
|
l.Config = LokiConfiguration{}
|
|
l.logger = logger
|
|
err := l.UnmarshalConfig(config)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
l.logger.Infof("Since value: %s", l.Config.Since.String())
|
|
|
|
clientConfig := lokiclient.Config{
|
|
LokiURL: l.Config.URL,
|
|
Headers: l.Config.Headers,
|
|
Limit: l.Config.Limit,
|
|
Query: l.Config.Query,
|
|
Since: l.Config.Since,
|
|
Username: l.Config.Auth.Username,
|
|
Password: l.Config.Auth.Password,
|
|
FailMaxDuration: l.Config.MaxFailureDuration,
|
|
}
|
|
|
|
l.Client = lokiclient.NewLokiClient(clientConfig)
|
|
l.Client.Logger = logger.WithFields(log.Fields{"component": "lokiclient", "source": l.Config.URL})
|
|
return nil
|
|
}
|
|
|
|
func (l *LokiSource) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
|
|
l.logger = logger
|
|
l.Config = LokiConfiguration{}
|
|
l.Config.Mode = configuration.CAT_MODE
|
|
l.Config.Labels = labels
|
|
l.Config.UniqueId = uuid
|
|
|
|
u, err := url.Parse(dsn)
|
|
if err != nil {
|
|
return fmt.Errorf("while parsing dsn '%s': %w", dsn, err)
|
|
}
|
|
if u.Scheme != "loki" {
|
|
return fmt.Errorf("invalid DSN %s for loki source, must start with loki://", dsn)
|
|
}
|
|
if u.Host == "" {
|
|
return errors.New("empty loki host")
|
|
}
|
|
scheme := "http"
|
|
|
|
params := u.Query()
|
|
if q := params.Get("ssl"); q != "" {
|
|
scheme = "https"
|
|
}
|
|
if q := params.Get("query"); q != "" {
|
|
l.Config.Query = q
|
|
}
|
|
if w := params.Get("wait_for_ready"); w != "" {
|
|
l.Config.WaitForReady, err = time.ParseDuration(w)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
} else {
|
|
l.Config.WaitForReady = 10 * time.Second
|
|
}
|
|
|
|
if d := params.Get("delay_for"); d != "" {
|
|
l.Config.DelayFor, err = time.ParseDuration(d)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid duration: %w", err)
|
|
}
|
|
if l.Config.DelayFor < 0*time.Second || l.Config.DelayFor > 5*time.Second {
|
|
return errors.New("delay_for should be a value between 1s and 5s")
|
|
}
|
|
} else {
|
|
l.Config.DelayFor = 0 * time.Second
|
|
}
|
|
|
|
if s := params.Get("since"); s != "" {
|
|
l.Config.Since, err = time.ParseDuration(s)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid since in dsn: %w", err)
|
|
}
|
|
}
|
|
|
|
if max_failure_duration := params.Get("max_failure_duration"); max_failure_duration != "" {
|
|
duration, err := time.ParseDuration(max_failure_duration)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid max_failure_duration in dsn: %w", err)
|
|
}
|
|
l.Config.MaxFailureDuration = duration
|
|
} else {
|
|
l.Config.MaxFailureDuration = 5 * time.Second // for OneShot mode it doesn't make sense to have longer duration
|
|
}
|
|
|
|
if limit := params.Get("limit"); limit != "" {
|
|
limit, err := strconv.Atoi(limit)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid limit in dsn: %w", err)
|
|
}
|
|
l.Config.Limit = limit
|
|
} else {
|
|
l.Config.Limit = 5000 // max limit allowed by loki
|
|
}
|
|
|
|
if logLevel := params.Get("log_level"); logLevel != "" {
|
|
level, err := log.ParseLevel(logLevel)
|
|
if err != nil {
|
|
return fmt.Errorf("invalid log_level in dsn: %w", err)
|
|
}
|
|
l.Config.LogLevel = &level
|
|
l.logger.Logger.SetLevel(level)
|
|
}
|
|
|
|
l.Config.URL = fmt.Sprintf("%s://%s", scheme, u.Host)
|
|
if u.User != nil {
|
|
l.Config.Auth.Username = u.User.Username()
|
|
l.Config.Auth.Password, _ = u.User.Password()
|
|
}
|
|
|
|
clientConfig := lokiclient.Config{
|
|
LokiURL: l.Config.URL,
|
|
Headers: l.Config.Headers,
|
|
Limit: l.Config.Limit,
|
|
Query: l.Config.Query,
|
|
Since: l.Config.Since,
|
|
Username: l.Config.Auth.Username,
|
|
Password: l.Config.Auth.Password,
|
|
DelayFor: int(l.Config.DelayFor / time.Second),
|
|
}
|
|
|
|
l.Client = lokiclient.NewLokiClient(clientConfig)
|
|
l.Client.Logger = logger.WithFields(log.Fields{"component": "lokiclient", "source": l.Config.URL})
|
|
|
|
return nil
|
|
}
|
|
|
|
func (l *LokiSource) GetMode() string {
|
|
return l.Config.Mode
|
|
}
|
|
|
|
func (l *LokiSource) GetName() string {
|
|
return "loki"
|
|
}
|
|
|
|
// OneShotAcquisition reads a set of file and returns when done
|
|
func (l *LokiSource) OneShotAcquisition(out chan types.Event, t *tomb.Tomb) error {
|
|
l.logger.Debug("Loki one shot acquisition")
|
|
l.Client.SetTomb(t)
|
|
readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady)
|
|
defer cancel()
|
|
err := l.Client.Ready(readyCtx)
|
|
if err != nil {
|
|
return fmt.Errorf("loki is not ready: %w", err)
|
|
}
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
c := l.Client.QueryRange(ctx, false)
|
|
|
|
for {
|
|
select {
|
|
case <-t.Dying():
|
|
l.logger.Debug("Loki one shot acquisition stopped")
|
|
cancel()
|
|
return nil
|
|
case resp, ok := <-c:
|
|
if !ok {
|
|
l.logger.Info("Loki acquisition done, chan closed")
|
|
cancel()
|
|
return nil
|
|
}
|
|
for _, stream := range resp.Data.Result {
|
|
for _, entry := range stream.Entries {
|
|
l.readOneEntry(entry, l.Config.Labels, out)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (l *LokiSource) readOneEntry(entry lokiclient.Entry, labels map[string]string, out chan types.Event) {
|
|
ll := types.Line{}
|
|
ll.Raw = entry.Line
|
|
ll.Time = entry.Timestamp
|
|
ll.Src = l.Config.URL
|
|
ll.Labels = labels
|
|
ll.Process = true
|
|
ll.Module = l.GetName()
|
|
|
|
linesRead.With(prometheus.Labels{"source": l.Config.URL}).Inc()
|
|
expectMode := types.LIVE
|
|
if l.Config.UseTimeMachine {
|
|
expectMode = types.TIMEMACHINE
|
|
}
|
|
out <- types.Event{
|
|
Line: ll,
|
|
Process: true,
|
|
Type: types.LOG,
|
|
ExpectMode: expectMode,
|
|
}
|
|
}
|
|
|
|
func (l *LokiSource) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
|
|
l.Client.SetTomb(t)
|
|
readyCtx, cancel := context.WithTimeout(context.Background(), l.Config.WaitForReady)
|
|
defer cancel()
|
|
err := l.Client.Ready(readyCtx)
|
|
if err != nil {
|
|
return fmt.Errorf("loki is not ready: %w", err)
|
|
}
|
|
ll := l.logger.WithField("websocket_url", l.lokiWebsocket)
|
|
t.Go(func() error {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
respChan := l.Client.QueryRange(ctx, true)
|
|
if err != nil {
|
|
ll.Errorf("could not start loki tail: %s", err)
|
|
return fmt.Errorf("while starting loki tail: %w", err)
|
|
}
|
|
for {
|
|
select {
|
|
case resp, ok := <-respChan:
|
|
if !ok {
|
|
ll.Warnf("loki channel closed")
|
|
return err
|
|
}
|
|
for _, stream := range resp.Data.Result {
|
|
for _, entry := range stream.Entries {
|
|
l.readOneEntry(entry, l.Config.Labels, out)
|
|
}
|
|
}
|
|
case <-t.Dying():
|
|
return nil
|
|
}
|
|
}
|
|
})
|
|
return nil
|
|
}
|
|
|
|
func (l *LokiSource) CanRun() error {
|
|
return nil
|
|
}
|
|
|
|
func (l *LokiSource) GetUuid() string {
|
|
return l.Config.UniqueId
|
|
}
|
|
|
|
func (l *LokiSource) Dump() interface{} {
|
|
return l
|
|
}
|
|
|
|
// SupportedModes returns the supported modes by the acquisition module
|
|
func (l *LokiSource) SupportedModes() []string {
|
|
return []string{configuration.TAIL_MODE, configuration.CAT_MODE}
|
|
}
|