add HTTP datasource (#3294)

This commit is contained in:
he2ss 2024-11-05 14:15:04 +01:00 committed by GitHub
parent 57521114bd
commit 19b70f10be
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
25 changed files with 1419 additions and 100 deletions

View file

@ -134,6 +134,7 @@ COMPONENTS := \
datasource_cloudwatch \
datasource_docker \
datasource_file \
datasource_http \
datasource_k8saudit \
datasource_kafka \
datasource_journalctl \

View file

@ -337,6 +337,20 @@ func GetMetrics(sources []DataSource, aggregated bool) error {
return nil
}
// There's no need for an actual deep copy
// The event is almost empty, we are mostly interested in allocating new maps for Parsed/Meta/...
func copyEvent(evt types.Event, line string) types.Event {
evtCopy := types.MakeEvent(evt.ExpectMode == types.TIMEMACHINE, evt.Type, evt.Process)
evtCopy.Line = evt.Line
evtCopy.Line.Raw = line
evtCopy.Line.Labels = make(map[string]string)
for k, v := range evt.Line.Labels {
evtCopy.Line.Labels[k] = v
}
return evtCopy
}
func transform(transformChan chan types.Event, output chan types.Event, AcquisTomb *tomb.Tomb, transformRuntime *vm.Program, logger *log.Entry) {
defer trace.CatchPanic("crowdsec/acquis")
logger.Infof("transformer started")
@ -363,8 +377,7 @@ func transform(transformChan chan types.Event, output chan types.Event, AcquisTo
switch v := out.(type) {
case string:
logger.Tracef("transform expression returned %s", v)
evt.Line.Raw = v
output <- evt
output <- copyEvent(evt, v)
case []interface{}:
logger.Tracef("transform expression returned %v", v) //nolint:asasalint // We actually want to log the slice content
@ -373,19 +386,16 @@ func transform(transformChan chan types.Event, output chan types.Event, AcquisTo
if !ok {
logger.Errorf("transform expression returned []interface{}, but cannot assert an element to string")
output <- evt
continue
}
evt.Line.Raw = l
output <- evt
output <- copyEvent(evt, l)
}
case []string:
logger.Tracef("transform expression returned %v", v)
for _, line := range v {
evt.Line.Raw = line
output <- evt
output <- copyEvent(evt, line)
}
default:
logger.Errorf("transform expression returned an invalid type %T, sending event as-is", out)

12
pkg/acquisition/http.go Normal file
View file

@ -0,0 +1,12 @@
//go:build !no_datasource_http
package acquisition
import (
httpacquisition "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules/http"
)
//nolint:gochecknoinits
func init() {
registerDataSource("http", func() DataSource { return &httpacquisition.HTTPSource{} })
}

View file

@ -116,10 +116,7 @@ func AppsecEventGeneration(inEvt types.Event, request *http.Request) (*types.Eve
}
func EventFromRequest(r *appsec.ParsedRequest, labels map[string]string) (types.Event, error) {
evt := types.Event{}
// we might want to change this based on in-band vs out-of-band ?
evt.Type = types.LOG
evt.ExpectMode = types.LIVE
evt := types.MakeEvent(false, types.LOG, true)
// def needs fixing
evt.Stage = "s00-raw"
evt.Parsed = map[string]string{

View file

@ -710,7 +710,7 @@ func (cw *CloudwatchSource) CatLogStream(ctx context.Context, cfg *LogStreamTail
func cwLogToEvent(log *cloudwatchlogs.OutputLogEvent, cfg *LogStreamTailConfig) (types.Event, error) {
l := types.Line{}
evt := types.Event{}
evt := types.MakeEvent(cfg.ExpectMode == types.TIMEMACHINE, types.LOG, true)
if log.Message == nil {
return evt, errors.New("nil message")
}
@ -726,9 +726,6 @@ func cwLogToEvent(log *cloudwatchlogs.OutputLogEvent, cfg *LogStreamTailConfig)
l.Process = true
l.Module = "cloudwatch"
evt.Line = l
evt.Process = true
evt.Type = types.LOG
evt.ExpectMode = cfg.ExpectMode
cfg.logger.Debugf("returned event labels : %+v", evt.Line.Labels)
return evt, nil
}

View file

@ -334,7 +334,10 @@ func (d *DockerSource) OneShotAcquisition(ctx context.Context, out chan types.Ev
if d.metricsLevel != configuration.METRICS_NONE {
linesRead.With(prometheus.Labels{"source": containerConfig.Name}).Inc()
}
evt := types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
evt := types.MakeEvent(true, types.LOG, true)
evt.Line = l
evt.Process = true
evt.Type = types.LOG
out <- evt
d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
}
@ -579,12 +582,8 @@ func (d *DockerSource) TailDocker(ctx context.Context, container *ContainerConfi
l.Src = container.Name
l.Process = true
l.Module = d.GetName()
var evt types.Event
if !d.Config.UseTimeMachine {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
} else {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
}
evt := types.MakeEvent(d.Config.UseTimeMachine, types.LOG, true)
evt.Line = l
linesRead.With(prometheus.Labels{"source": container.Name}).Inc()
outChan <- evt
d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)

View file

@ -621,11 +621,9 @@ func (f *FileSource) tailFile(out chan types.Event, t *tomb.Tomb, tail *tail.Tai
// we're tailing, it must be real time logs
logger.Debugf("pushing %+v", l)
expectMode := types.LIVE
if f.config.UseTimeMachine {
expectMode = types.TIMEMACHINE
}
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: expectMode}
evt := types.MakeEvent(f.config.UseTimeMachine, types.LOG, true)
evt.Line = l
out <- evt
}
}
}
@ -684,7 +682,7 @@ func (f *FileSource) readFile(filename string, out chan types.Event, t *tomb.Tom
linesRead.With(prometheus.Labels{"source": filename}).Inc()
// we're reading logs at once, it must be time-machine buckets
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE, Unmarshaled: make(map[string]interface{})}
}
}

View file

@ -0,0 +1,416 @@
package httpacquisition
import (
"compress/gzip"
"context"
"crypto/tls"
"crypto/x509"
"encoding/json"
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
"time"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
"gopkg.in/yaml.v3"
"github.com/crowdsecurity/go-cs-lib/trace"
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
"github.com/crowdsecurity/crowdsec/pkg/types"
)
var (
dataSourceName = "http"
)
var linesRead = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "cs_httpsource_hits_total",
Help: "Total lines that were read from http source",
},
[]string{"path", "src"})
type HttpConfiguration struct {
//IPFilter []string `yaml:"ip_filter"`
//ChunkSize *int64 `yaml:"chunk_size"`
ListenAddr string `yaml:"listen_addr"`
Path string `yaml:"path"`
AuthType string `yaml:"auth_type"`
BasicAuth *BasicAuthConfig `yaml:"basic_auth"`
Headers *map[string]string `yaml:"headers"`
TLS *TLSConfig `yaml:"tls"`
CustomStatusCode *int `yaml:"custom_status_code"`
CustomHeaders *map[string]string `yaml:"custom_headers"`
MaxBodySize *int64 `yaml:"max_body_size"`
Timeout *time.Duration `yaml:"timeout"`
configuration.DataSourceCommonCfg `yaml:",inline"`
}
type BasicAuthConfig struct {
Username string `yaml:"username"`
Password string `yaml:"password"`
}
type TLSConfig struct {
InsecureSkipVerify bool `yaml:"insecure_skip_verify"`
ServerCert string `yaml:"server_cert"`
ServerKey string `yaml:"server_key"`
CaCert string `yaml:"ca_cert"`
}
type HTTPSource struct {
metricsLevel int
Config HttpConfiguration
logger *log.Entry
Server *http.Server
}
func (h *HTTPSource) GetUuid() string {
return h.Config.UniqueId
}
func (h *HTTPSource) UnmarshalConfig(yamlConfig []byte) error {
h.Config = HttpConfiguration{}
err := yaml.Unmarshal(yamlConfig, &h.Config)
if err != nil {
return fmt.Errorf("cannot parse %s datasource configuration: %w", dataSourceName, err)
}
if h.Config.Mode == "" {
h.Config.Mode = configuration.TAIL_MODE
}
return nil
}
func (hc *HttpConfiguration) Validate() error {
if hc.ListenAddr == "" {
return errors.New("listen_addr is required")
}
if hc.Path == "" {
hc.Path = "/"
}
if hc.Path[0] != '/' {
return errors.New("path must start with /")
}
switch hc.AuthType {
case "basic_auth":
baseErr := "basic_auth is selected, but"
if hc.BasicAuth == nil {
return errors.New(baseErr + " basic_auth is not provided")
}
if hc.BasicAuth.Username == "" {
return errors.New(baseErr + " username is not provided")
}
if hc.BasicAuth.Password == "" {
return errors.New(baseErr + " password is not provided")
}
case "headers":
if hc.Headers == nil {
return errors.New("headers is selected, but headers is not provided")
}
case "mtls":
if hc.TLS == nil || hc.TLS.CaCert == "" {
return errors.New("mtls is selected, but ca_cert is not provided")
}
default:
return errors.New("invalid auth_type: must be one of basic_auth, headers, mtls")
}
if hc.TLS != nil {
if hc.TLS.ServerCert == "" {
return errors.New("server_cert is required")
}
if hc.TLS.ServerKey == "" {
return errors.New("server_key is required")
}
}
if hc.MaxBodySize != nil && *hc.MaxBodySize <= 0 {
return errors.New("max_body_size must be positive")
}
/*
if hc.ChunkSize != nil && *hc.ChunkSize <= 0 {
return errors.New("chunk_size must be positive")
}
*/
if hc.CustomStatusCode != nil {
statusText := http.StatusText(*hc.CustomStatusCode)
if statusText == "" {
return errors.New("invalid HTTP status code")
}
}
return nil
}
func (h *HTTPSource) Configure(yamlConfig []byte, logger *log.Entry, MetricsLevel int) error {
h.logger = logger
h.metricsLevel = MetricsLevel
err := h.UnmarshalConfig(yamlConfig)
if err != nil {
return err
}
if err := h.Config.Validate(); err != nil {
return fmt.Errorf("invalid configuration: %w", err)
}
return nil
}
func (h *HTTPSource) ConfigureByDSN(string, map[string]string, *log.Entry, string) error {
return fmt.Errorf("%s datasource does not support command-line acquisition", dataSourceName)
}
func (h *HTTPSource) GetMode() string {
return h.Config.Mode
}
func (h *HTTPSource) GetName() string {
return dataSourceName
}
func (h *HTTPSource) OneShotAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
return fmt.Errorf("%s datasource does not support one-shot acquisition", dataSourceName)
}
func (h *HTTPSource) CanRun() error {
return nil
}
func (h *HTTPSource) GetMetrics() []prometheus.Collector {
return []prometheus.Collector{linesRead}
}
func (h *HTTPSource) GetAggregMetrics() []prometheus.Collector {
return []prometheus.Collector{linesRead}
}
func (h *HTTPSource) Dump() interface{} {
return h
}
func (hc *HttpConfiguration) NewTLSConfig() (*tls.Config, error) {
tlsConfig := tls.Config{
InsecureSkipVerify: hc.TLS.InsecureSkipVerify,
}
if hc.TLS.ServerCert != "" && hc.TLS.ServerKey != "" {
cert, err := tls.LoadX509KeyPair(hc.TLS.ServerCert, hc.TLS.ServerKey)
if err != nil {
return nil, fmt.Errorf("failed to load server cert/key: %w", err)
}
tlsConfig.Certificates = []tls.Certificate{cert}
}
if hc.AuthType == "mtls" && hc.TLS.CaCert != "" {
caCert, err := os.ReadFile(hc.TLS.CaCert)
if err != nil {
return nil, fmt.Errorf("failed to read ca cert: %w", err)
}
caCertPool, err := x509.SystemCertPool()
if err != nil {
return nil, fmt.Errorf("failed to load system cert pool: %w", err)
}
if caCertPool == nil {
caCertPool = x509.NewCertPool()
}
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.ClientCAs = caCertPool
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
}
return &tlsConfig, nil
}
func authorizeRequest(r *http.Request, hc *HttpConfiguration) error {
if hc.AuthType == "basic_auth" {
username, password, ok := r.BasicAuth()
if !ok {
return errors.New("missing basic auth")
}
if username != hc.BasicAuth.Username || password != hc.BasicAuth.Password {
return errors.New("invalid basic auth")
}
}
if hc.AuthType == "headers" {
for key, value := range *hc.Headers {
if r.Header.Get(key) != value {
return errors.New("invalid headers")
}
}
}
return nil
}
func (h *HTTPSource) processRequest(w http.ResponseWriter, r *http.Request, hc *HttpConfiguration, out chan types.Event) error {
if hc.MaxBodySize != nil && r.ContentLength > *hc.MaxBodySize {
w.WriteHeader(http.StatusRequestEntityTooLarge)
return fmt.Errorf("body size exceeds max body size: %d > %d", r.ContentLength, *hc.MaxBodySize)
}
srcHost, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
return err
}
defer r.Body.Close()
reader := r.Body
if r.Header.Get("Content-Encoding") == "gzip" {
reader, err = gzip.NewReader(r.Body)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return fmt.Errorf("failed to create gzip reader: %w", err)
}
defer reader.Close()
}
decoder := json.NewDecoder(reader)
for {
var message json.RawMessage
if err := decoder.Decode(&message); err != nil {
if err == io.EOF {
break
}
w.WriteHeader(http.StatusBadRequest)
return fmt.Errorf("failed to decode: %w", err)
}
line := types.Line{
Raw: string(message),
Src: srcHost,
Time: time.Now().UTC(),
Labels: hc.Labels,
Process: true,
Module: h.GetName(),
}
if h.metricsLevel == configuration.METRICS_AGGREGATE {
line.Src = hc.Path
}
evt := types.MakeEvent(h.Config.UseTimeMachine, types.LOG, true)
evt.Line = line
if h.metricsLevel == configuration.METRICS_AGGREGATE {
linesRead.With(prometheus.Labels{"path": hc.Path, "src": ""}).Inc()
} else if h.metricsLevel == configuration.METRICS_FULL {
linesRead.With(prometheus.Labels{"path": hc.Path, "src": srcHost}).Inc()
}
h.logger.Tracef("line to send: %+v", line)
out <- evt
}
return nil
}
func (h *HTTPSource) RunServer(out chan types.Event, t *tomb.Tomb) error {
mux := http.NewServeMux()
mux.HandleFunc(h.Config.Path, func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
h.logger.Errorf("method not allowed: %s", r.Method)
http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
return
}
if err := authorizeRequest(r, &h.Config); err != nil {
h.logger.Errorf("failed to authorize request from '%s': %s", r.RemoteAddr, err)
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return
}
err := h.processRequest(w, r, &h.Config, out)
if err != nil {
h.logger.Errorf("failed to process request from '%s': %s", r.RemoteAddr, err)
return
}
if h.Config.CustomHeaders != nil {
for key, value := range *h.Config.CustomHeaders {
w.Header().Set(key, value)
}
}
if h.Config.CustomStatusCode != nil {
w.WriteHeader(*h.Config.CustomStatusCode)
} else {
w.WriteHeader(http.StatusOK)
}
w.Write([]byte("OK"))
})
h.Server = &http.Server{
Addr: h.Config.ListenAddr,
Handler: mux,
}
if h.Config.Timeout != nil {
h.Server.ReadTimeout = *h.Config.Timeout
}
if h.Config.TLS != nil {
tlsConfig, err := h.Config.NewTLSConfig()
if err != nil {
return fmt.Errorf("failed to create tls config: %w", err)
}
h.logger.Tracef("tls config: %+v", tlsConfig)
h.Server.TLSConfig = tlsConfig
}
t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/http/server")
if h.Config.TLS != nil {
h.logger.Infof("start https server on %s", h.Config.ListenAddr)
err := h.Server.ListenAndServeTLS(h.Config.TLS.ServerCert, h.Config.TLS.ServerKey)
if err != nil && err != http.ErrServerClosed {
return fmt.Errorf("https server failed: %w", err)
}
} else {
h.logger.Infof("start http server on %s", h.Config.ListenAddr)
err := h.Server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
return fmt.Errorf("http server failed: %w", err)
}
}
return nil
})
//nolint //fp
for {
select {
case <-t.Dying():
h.logger.Infof("%s datasource stopping", dataSourceName)
if err := h.Server.Close(); err != nil {
return fmt.Errorf("while closing %s server: %w", dataSourceName, err)
}
return nil
}
}
}
func (h *HTTPSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
h.logger.Debugf("start http server on %s", h.Config.ListenAddr)
t.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis/http/live")
return h.RunServer(out, t)
})
return nil
}

View file

@ -0,0 +1,785 @@
package httpacquisition
import (
"compress/gzip"
"context"
"crypto/tls"
"crypto/x509"
"errors"
"fmt"
"io"
"net/http"
"os"
"strings"
"testing"
"time"
"github.com/crowdsecurity/crowdsec/pkg/types"
"github.com/crowdsecurity/go-cs-lib/cstest"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"gopkg.in/tomb.v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
const (
testHTTPServerAddr = "http://127.0.0.1:8080"
testHTTPServerAddrTLS = "https://127.0.0.1:8080"
)
func TestConfigure(t *testing.T) {
tests := []struct {
config string
expectedErr string
}{
{
config: `
foobar: bla`,
expectedErr: "invalid configuration: listen_addr is required",
},
{
config: `
source: http
listen_addr: 127.0.0.1:8080
path: wrongpath`,
expectedErr: "invalid configuration: path must start with /",
},
{
config: `
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: basic_auth`,
expectedErr: "invalid configuration: basic_auth is selected, but basic_auth is not provided",
},
{
config: `
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: headers`,
expectedErr: "invalid configuration: headers is selected, but headers is not provided",
},
{
config: `
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: basic_auth
basic_auth:
username: 132`,
expectedErr: "invalid configuration: basic_auth is selected, but password is not provided",
},
{
config: `
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: basic_auth
basic_auth:
password: 132`,
expectedErr: "invalid configuration: basic_auth is selected, but username is not provided",
},
{
config: `
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: headers
headers:`,
expectedErr: "invalid configuration: headers is selected, but headers is not provided",
},
{
config: `
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: toto`,
expectedErr: "invalid configuration: invalid auth_type: must be one of basic_auth, headers, mtls",
},
{
config: `
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: headers
headers:
key: value
tls:
server_key: key`,
expectedErr: "invalid configuration: server_cert is required",
},
{
config: `
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: headers
headers:
key: value
tls:
server_cert: cert`,
expectedErr: "invalid configuration: server_key is required",
},
{
config: `
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: mtls
tls:
server_cert: cert
server_key: key`,
expectedErr: "invalid configuration: mtls is selected, but ca_cert is not provided",
},
{
config: `
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: headers
headers:
key: value
max_body_size: 0`,
expectedErr: "invalid configuration: max_body_size must be positive",
},
{
config: `
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: headers
headers:
key: value
timeout: toto`,
expectedErr: "cannot parse http datasource configuration: yaml: unmarshal errors:\n line 8: cannot unmarshal !!str `toto` into time.Duration",
},
{
config: `
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: headers
headers:
key: value
custom_status_code: 999`,
expectedErr: "invalid configuration: invalid HTTP status code",
},
}
subLogger := log.WithFields(log.Fields{
"type": "http",
})
for _, test := range tests {
h := HTTPSource{}
err := h.Configure([]byte(test.config), subLogger, 0)
cstest.AssertErrorContains(t, err, test.expectedErr)
}
}
func TestGetUuid(t *testing.T) {
h := HTTPSource{}
h.Config.UniqueId = "test"
assert.Equal(t, "test", h.GetUuid())
}
func TestUnmarshalConfig(t *testing.T) {
h := HTTPSource{}
err := h.UnmarshalConfig([]byte(`
source: http
listen_addr: 127.0.0.1:8080
path: 15
auth_type: headers`))
cstest.AssertErrorMessage(t, err, "cannot parse http datasource configuration: yaml: line 4: found a tab character that violates indentation")
}
func TestConfigureByDSN(t *testing.T) {
h := HTTPSource{}
err := h.ConfigureByDSN("http://localhost:8080/test", map[string]string{}, log.WithFields(log.Fields{
"type": "http",
}), "test")
cstest.AssertErrorMessage(
t,
err,
"http datasource does not support command-line acquisition",
)
}
func TestGetMode(t *testing.T) {
h := HTTPSource{}
h.Config.Mode = "test"
assert.Equal(t, "test", h.GetMode())
}
func TestGetName(t *testing.T) {
h := HTTPSource{}
assert.Equal(t, "http", h.GetName())
}
func SetupAndRunHTTPSource(t *testing.T, h *HTTPSource, config []byte, metricLevel int) (chan types.Event, *tomb.Tomb) {
ctx := context.Background()
subLogger := log.WithFields(log.Fields{
"type": "http",
})
err := h.Configure(config, subLogger, metricLevel)
require.NoError(t, err)
tomb := tomb.Tomb{}
out := make(chan types.Event)
err = h.StreamingAcquisition(ctx, out, &tomb)
require.NoError(t, err)
for _, metric := range h.GetMetrics() {
prometheus.Register(metric)
}
return out, &tomb
}
func TestStreamingAcquisitionWrongHTTPMethod(t *testing.T) {
h := &HTTPSource{}
_, tomb := SetupAndRunHTTPSource(t, h, []byte(`
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: basic_auth
basic_auth:
username: test
password: test`), 0)
time.Sleep(1 * time.Second)
res, err := http.Get(fmt.Sprintf("%s/test", testHTTPServerAddr))
require.NoError(t, err)
assert.Equal(t, http.StatusMethodNotAllowed, res.StatusCode)
h.Server.Close()
tomb.Kill(nil)
tomb.Wait()
}
func TestStreamingAcquisitionUnknownPath(t *testing.T) {
h := &HTTPSource{}
_, tomb := SetupAndRunHTTPSource(t, h, []byte(`
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: basic_auth
basic_auth:
username: test
password: test`), 0)
time.Sleep(1 * time.Second)
res, err := http.Get(fmt.Sprintf("%s/unknown", testHTTPServerAddr))
require.NoError(t, err)
assert.Equal(t, http.StatusNotFound, res.StatusCode)
h.Server.Close()
tomb.Kill(nil)
tomb.Wait()
}
func TestStreamingAcquisitionBasicAuth(t *testing.T) {
h := &HTTPSource{}
_, tomb := SetupAndRunHTTPSource(t, h, []byte(`
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: basic_auth
basic_auth:
username: test
password: test`), 0)
time.Sleep(1 * time.Second)
client := &http.Client{}
resp, err := http.Post(fmt.Sprintf("%s/test", testHTTPServerAddr), "application/json", strings.NewReader("test"))
require.NoError(t, err)
assert.Equal(t, http.StatusUnauthorized, resp.StatusCode)
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader("test"))
require.NoError(t, err)
req.SetBasicAuth("test", "WrongPassword")
resp, err = client.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusUnauthorized, resp.StatusCode)
h.Server.Close()
tomb.Kill(nil)
tomb.Wait()
}
func TestStreamingAcquisitionBadHeaders(t *testing.T) {
h := &HTTPSource{}
_, tomb := SetupAndRunHTTPSource(t, h, []byte(`
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: headers
headers:
key: test`), 0)
time.Sleep(1 * time.Second)
client := &http.Client{}
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader("test"))
require.NoError(t, err)
req.Header.Add("Key", "wrong")
resp, err := client.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusUnauthorized, resp.StatusCode)
h.Server.Close()
tomb.Kill(nil)
tomb.Wait()
}
func TestStreamingAcquisitionMaxBodySize(t *testing.T) {
h := &HTTPSource{}
_, tomb := SetupAndRunHTTPSource(t, h, []byte(`
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: headers
headers:
key: test
max_body_size: 5`), 0)
time.Sleep(1 * time.Second)
client := &http.Client{}
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader("testtest"))
require.NoError(t, err)
req.Header.Add("Key", "test")
resp, err := client.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusRequestEntityTooLarge, resp.StatusCode)
h.Server.Close()
tomb.Kill(nil)
tomb.Wait()
}
func TestStreamingAcquisitionSuccess(t *testing.T) {
h := &HTTPSource{}
out, tomb := SetupAndRunHTTPSource(t, h, []byte(`
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: headers
headers:
key: test`), 2)
time.Sleep(1 * time.Second)
rawEvt := `{"test": "test"}`
errChan := make(chan error)
go assertEvents(out, []string{rawEvt}, errChan)
client := &http.Client{}
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(rawEvt))
require.NoError(t, err)
req.Header.Add("Key", "test")
resp, err := client.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
err = <-errChan
require.NoError(t, err)
assertMetrics(t, h.GetMetrics(), 1)
h.Server.Close()
tomb.Kill(nil)
tomb.Wait()
}
func TestStreamingAcquisitionCustomStatusCodeAndCustomHeaders(t *testing.T) {
h := &HTTPSource{}
out, tomb := SetupAndRunHTTPSource(t, h, []byte(`
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: headers
headers:
key: test
custom_status_code: 201
custom_headers:
success: true`), 2)
time.Sleep(1 * time.Second)
rawEvt := `{"test": "test"}`
errChan := make(chan error)
go assertEvents(out, []string{rawEvt}, errChan)
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(rawEvt))
require.NoError(t, err)
req.Header.Add("Key", "test")
resp, err := http.DefaultClient.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusCreated, resp.StatusCode)
assert.Equal(t, "true", resp.Header.Get("Success"))
err = <-errChan
require.NoError(t, err)
assertMetrics(t, h.GetMetrics(), 1)
h.Server.Close()
tomb.Kill(nil)
tomb.Wait()
}
type slowReader struct {
delay time.Duration
body []byte
index int
}
func (sr *slowReader) Read(p []byte) (int, error) {
if sr.index >= len(sr.body) {
return 0, io.EOF
}
time.Sleep(sr.delay) // Simulate a delay in reading
n := copy(p, sr.body[sr.index:])
sr.index += n
return n, nil
}
func assertEvents(out chan types.Event, expected []string, errChan chan error) {
readLines := []types.Event{}
for i := 0; i < len(expected); i++ {
select {
case event := <-out:
readLines = append(readLines, event)
case <-time.After(2 * time.Second):
errChan <- errors.New("timeout waiting for event")
return
}
}
if len(readLines) != len(expected) {
errChan <- fmt.Errorf("expected %d lines, got %d", len(expected), len(readLines))
return
}
for i, evt := range readLines {
if evt.Line.Raw != expected[i] {
errChan <- fmt.Errorf(`expected %s, got '%+v'`, expected, evt.Line.Raw)
return
}
if evt.Line.Src != "127.0.0.1" {
errChan <- fmt.Errorf("expected '127.0.0.1', got '%s'", evt.Line.Src)
return
}
if evt.Line.Module != "http" {
errChan <- fmt.Errorf("expected 'http', got '%s'", evt.Line.Module)
return
}
}
errChan <- nil
}
func TestStreamingAcquisitionTimeout(t *testing.T) {
h := &HTTPSource{}
_, tomb := SetupAndRunHTTPSource(t, h, []byte(`
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: headers
headers:
key: test
timeout: 1s`), 0)
time.Sleep(1 * time.Second)
slow := &slowReader{
delay: 2 * time.Second,
body: []byte(`{"test": "delayed_payload"}`),
}
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), slow)
require.NoError(t, err)
req.Header.Add("Key", "test")
req.Header.Set("Content-Type", "application/json")
client := &http.Client{}
resp, err := client.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
h.Server.Close()
tomb.Kill(nil)
tomb.Wait()
}
func TestStreamingAcquisitionTLSHTTPRequest(t *testing.T) {
h := &HTTPSource{}
_, tomb := SetupAndRunHTTPSource(t, h, []byte(`
source: http
listen_addr: 127.0.0.1:8080
auth_type: mtls
path: /test
tls:
server_cert: testdata/server.crt
server_key: testdata/server.key
ca_cert: testdata/ca.crt`), 0)
time.Sleep(1 * time.Second)
resp, err := http.Post(fmt.Sprintf("%s/test", testHTTPServerAddr), "application/json", strings.NewReader("test"))
require.NoError(t, err)
assert.Equal(t, http.StatusBadRequest, resp.StatusCode)
h.Server.Close()
tomb.Kill(nil)
tomb.Wait()
}
func TestStreamingAcquisitionTLSWithHeadersAuthSuccess(t *testing.T) {
h := &HTTPSource{}
out, tomb := SetupAndRunHTTPSource(t, h, []byte(`
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: headers
headers:
key: test
tls:
server_cert: testdata/server.crt
server_key: testdata/server.key
`), 0)
time.Sleep(1 * time.Second)
caCert, err := os.ReadFile("testdata/server.crt")
require.NoError(t, err)
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
RootCAs: caCertPool,
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}
rawEvt := `{"test": "test"}`
errChan := make(chan error)
go assertEvents(out, []string{rawEvt}, errChan)
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddrTLS), strings.NewReader(rawEvt))
require.NoError(t, err)
req.Header.Add("Key", "test")
resp, err := client.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
err = <-errChan
require.NoError(t, err)
assertMetrics(t, h.GetMetrics(), 0)
h.Server.Close()
tomb.Kill(nil)
tomb.Wait()
}
func TestStreamingAcquisitionMTLS(t *testing.T) {
h := &HTTPSource{}
out, tomb := SetupAndRunHTTPSource(t, h, []byte(`
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: mtls
tls:
server_cert: testdata/server.crt
server_key: testdata/server.key
ca_cert: testdata/ca.crt`), 0)
time.Sleep(1 * time.Second)
// init client cert
cert, err := tls.LoadX509KeyPair("testdata/client.crt", "testdata/client.key")
require.NoError(t, err)
caCert, err := os.ReadFile("testdata/ca.crt")
require.NoError(t, err)
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
client := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}
rawEvt := `{"test": "test"}`
errChan := make(chan error)
go assertEvents(out, []string{rawEvt}, errChan)
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddrTLS), strings.NewReader(rawEvt))
require.NoError(t, err)
resp, err := client.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
err = <-errChan
require.NoError(t, err)
assertMetrics(t, h.GetMetrics(), 0)
h.Server.Close()
tomb.Kill(nil)
tomb.Wait()
}
func TestStreamingAcquisitionGzipData(t *testing.T) {
h := &HTTPSource{}
out, tomb := SetupAndRunHTTPSource(t, h, []byte(`
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: headers
headers:
key: test`), 2)
time.Sleep(1 * time.Second)
rawEvt := `{"test": "test"}`
errChan := make(chan error)
go assertEvents(out, []string{rawEvt, rawEvt}, errChan)
var b strings.Builder
gz := gzip.NewWriter(&b)
_, err := gz.Write([]byte(rawEvt))
require.NoError(t, err)
_, err = gz.Write([]byte(rawEvt))
require.NoError(t, err)
err = gz.Close()
require.NoError(t, err)
// send gzipped compressed data
client := &http.Client{}
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(b.String()))
require.NoError(t, err)
req.Header.Add("Key", "test")
req.Header.Add("Content-Encoding", "gzip")
req.Header.Add("Content-Type", "application/json")
resp, err := client.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
err = <-errChan
require.NoError(t, err)
assertMetrics(t, h.GetMetrics(), 2)
h.Server.Close()
tomb.Kill(nil)
tomb.Wait()
}
func TestStreamingAcquisitionNDJson(t *testing.T) {
h := &HTTPSource{}
out, tomb := SetupAndRunHTTPSource(t, h, []byte(`
source: http
listen_addr: 127.0.0.1:8080
path: /test
auth_type: headers
headers:
key: test`), 2)
time.Sleep(1 * time.Second)
rawEvt := `{"test": "test"}`
errChan := make(chan error)
go assertEvents(out, []string{rawEvt, rawEvt}, errChan)
client := &http.Client{}
req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/test", testHTTPServerAddr), strings.NewReader(fmt.Sprintf("%s\n%s\n", rawEvt, rawEvt)))
require.NoError(t, err)
req.Header.Add("Key", "test")
req.Header.Add("Content-Type", "application/x-ndjson")
resp, err := client.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
err = <-errChan
require.NoError(t, err)
assertMetrics(t, h.GetMetrics(), 2)
h.Server.Close()
tomb.Kill(nil)
tomb.Wait()
}
func assertMetrics(t *testing.T, metrics []prometheus.Collector, expected int) {
promMetrics, err := prometheus.DefaultGatherer.Gather()
require.NoError(t, err)
isExist := false
for _, metricFamily := range promMetrics {
if metricFamily.GetName() == "cs_httpsource_hits_total" {
isExist = true
assert.Len(t, metricFamily.GetMetric(), 1)
for _, metric := range metricFamily.GetMetric() {
assert.InDelta(t, float64(expected), metric.GetCounter().GetValue(), 0.000001)
labels := metric.GetLabel()
assert.Len(t, labels, 2)
assert.Equal(t, "path", labels[0].GetName())
assert.Equal(t, "/test", labels[0].GetValue())
assert.Equal(t, "src", labels[1].GetName())
assert.Equal(t, "127.0.0.1", labels[1].GetValue())
}
}
}
if !isExist && expected > 0 {
t.Fatalf("expected metric cs_httpsource_hits_total not found")
}
for _, metric := range metrics {
metric.(*prometheus.CounterVec).Reset()
}
}

View file

@ -0,0 +1,23 @@
-----BEGIN CERTIFICATE-----
MIIDvzCCAqegAwIBAgIUHQfsFpWkCy7gAmDa3A6O+y5CvAswDQYJKoZIhvcNAQEL
BQAwbzELMAkGA1UEBhMCRlIxFjAUBgNVBAgTDUlsZS1kZS1GcmFuY2UxDjAMBgNV
BAcTBVBhcmlzMREwDwYDVQQKEwhDcm93ZHNlYzERMA8GA1UECxMIQ3Jvd2RzZWMx
EjAQBgNVBAMTCWxvY2FsaG9zdDAeFw0yNDEwMjMxMDAxMDBaFw0yOTEwMjIxMDAx
MDBaMG8xCzAJBgNVBAYTAkZSMRYwFAYDVQQIEw1JbGUtZGUtRnJhbmNlMQ4wDAYD
VQQHEwVQYXJpczERMA8GA1UEChMIQ3Jvd2RzZWMxETAPBgNVBAsTCENyb3dkc2Vj
MRIwEAYDVQQDEwlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
AoIBAQCZSR2/A24bpVHSiEeSlelfdA32uhk9wHkauwy2qxos/G/UmKG/dgWrHzRh
LawlFVHtVn4u7Hjqz2y2EsH3bX42jC5NMVARgXIOBr1dE6F5/bPqA6SoVgkDm9wh
ZBigyAMxYsR4+3ahuf0pQflBShKrLZ1UYoe6tQXob7l3x5vThEhNkBawBkLfWpj7
7Imm1tGyEZdxCMkT400KRtSmJRrnpiOCUosnacwgp7MCbKWOIOng07Eh16cVUiuI
BthWU/LycIuac2xaD9PFpeK/MpwASRRPXZgPUhiZuaa7vttD0phCdDaS46Oln5/7
tFRZH0alBZmkpVZJCWAP4ujIA3vLAgMBAAGjUzBRMA4GA1UdDwEB/wQEAwIBBjAP
BgNVHRMBAf8EBTADAQH/MB0GA1UdDgQWBBTwpg+WN1nZJs4gj5hfoa+fMSZjGTAP
BgNVHREECDAGhwR/AAABMA0GCSqGSIb3DQEBCwUAA4IBAQAZuOWT8zHcwbWvC6Jm
/ccgB/U7SbeIYFJrCZd9mTyqsgnkFNH8yJ5F4dXXtPXr+SO/uWWa3G5hams3qVFf
zWzzPDQdyhUhfh5fjUHR2RsSGBmCxcapYHpVvAP5aY1/ujYrXMvAJV0hfDO2tGHb
rveuJxhe8ymQ1Yb2u9NcmI1HG9IVt3Airz4gAIUJWbFvRigky0bukfddOkfiUiaF
DMPJQO6HAj8d8ctSHHVZWzhAInZ1pDg6HIHYF44m1tT27pSQoi0ZFocskDi/fC2f
EIF0nu5fRLUS6BZEfpnDi9U0lbJ/kUrgT5IFHMFqXdRpDqcnXpJZhYtp5l6GoqjM
gT33
-----END CERTIFICATE-----

View file

@ -0,0 +1,24 @@
-----BEGIN CERTIFICATE-----
MIID7jCCAtagAwIBAgIUJMTPh3oPJLPgsnb9T85ieb4EuOQwDQYJKoZIhvcNAQEL
BQAwbzELMAkGA1UEBhMCRlIxFjAUBgNVBAgTDUlsZS1kZS1GcmFuY2UxDjAMBgNV
BAcTBVBhcmlzMREwDwYDVQQKEwhDcm93ZHNlYzERMA8GA1UECxMIQ3Jvd2RzZWMx
EjAQBgNVBAMTCWxvY2FsaG9zdDAeFw0yNDEwMjMxMDQ2MDBaFw0yNTEwMjMxMDQ2
MDBaMHIxCzAJBgNVBAYTAkZSMRYwFAYDVQQIEw1JbGUtZGUtRnJhbmNlMQ4wDAYD
VQQHEwVQYXJpczERMA8GA1UEChMIQ3Jvd2RzZWMxFzAVBgNVBAsTDlRlc3Rpbmcg
Y2xpZW50MQ8wDQYDVQQDEwZjbGllbnQwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAw
ggEKAoIBAQDAUOdpRieRrrH6krUjgcjLgJg6TzoWAb/iv6rfcioX1L9bj9fZSkwu
GqKzXX/PceIXElzQgiGJZErbJtnTzhGS80QgtAB8BwWQIT2zgoGcYJf7pPFvmcMM
qMGFwK0dMC+LHPk+ePtFz8dskI2XJ8jgBdtuZcnDblMuVGtjYT6n0rszvRdo118+
mlGCLPzOfsO1JdOqLWAR88yZfqCFt1TrwmzpRT1crJQeM6i7muw4aO0L7uSek9QM
6APHz0QexSq7/zHOtRjA4jnJbDzZJHRlwOdlsNU9cmTz6uWIQXlg+2ovD55YurNy
+jYfmfDYpimhoeGf54zaETp1fTuTJYpxAgMBAAGjfzB9MA4GA1UdDwEB/wQEAwIF
oDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwDAYDVR0TAQH/BAIwADAd
BgNVHQ4EFgQUmH0/7RuKnoW7sEK4Cr8eVNGbb8swHwYDVR0jBBgwFoAU8KYPljdZ
2SbOII+YX6GvnzEmYxkwDQYJKoZIhvcNAQELBQADggEBAHVn9Zuoyxu9iTFoyJ50
e/XKcmt2uK2M1x+ap2Av7Wb/Omikx/R2YPq7994BfiUCAezY2YtreZzkE6Io1wNM
qApijEJnlqEmOXiYJqlF89QrCcsAsz6lfaqitYBZSL3o4KT+7/uUDVxgNEjEksRz
9qy6DFBLvyhxbOM2zDEV+MVfemBWSvNiojHqXzDBkZnBHHclJLuIKsXDZDGhKbNd
hsoGU00RLevvcUpUJ3a68ekgwiYFJifm0uyfmao9lmiB3i+8ZW3Q4rbwHtD+U7U2
3n+U5PkhiUAveuMfrvUMzsTolZiop9ZLtcALDUFaqyr4tjfVOf5+CGjiluio7oE1
UYg=
-----END CERTIFICATE-----

View file

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEowIBAAKCAQEAwFDnaUYnka6x+pK1I4HIy4CYOk86FgG/4r+q33IqF9S/W4/X
2UpMLhqis11/z3HiFxJc0IIhiWRK2ybZ084RkvNEILQAfAcFkCE9s4KBnGCX+6Tx
b5nDDKjBhcCtHTAvixz5Pnj7Rc/HbJCNlyfI4AXbbmXJw25TLlRrY2E+p9K7M70X
aNdfPppRgiz8zn7DtSXTqi1gEfPMmX6ghbdU68Js6UU9XKyUHjOou5rsOGjtC+7k
npPUDOgDx89EHsUqu/8xzrUYwOI5yWw82SR0ZcDnZbDVPXJk8+rliEF5YPtqLw+e
WLqzcvo2H5nw2KYpoaHhn+eM2hE6dX07kyWKcQIDAQABAoIBAQChriKuza0MfBri
9x3UCRN/is/wDZVe1P+2KL8F9ZvPxytNVeP4qM7c38WzF8MQ6sRR8z0WiqCZOjj4
f3QX7iG2MlAvUkUqAFk778ZIuUov5sE/bU8RLOrfJKz1vqOLa2w8/xHH5LwS1/jn
m6t9zZPCSwpMiMSUSZci1xQlS6b6POZMjeqLPqv9cP8PJNv9UNrHFcQnQi1iwKJH
MJ7CQI3R8FSeGad3P7tB9YDaBm7hHmd/TevuFkymcKNT44XBSgddPDfgKui6sHTY
QQWgWI9VGVO350ZBLRLkrk8wboY4vc15qbBzYFG66WiR/tNdLt3rDYxpcXaDvcQy
e47mYNVxAoGBAMFsUmPDssqzmOkmZxHDM+VmgHYPXjDqQdE299FtuClobUW4iU4g
By7o84aCIBQz2sp9f1KM+10lr+Bqw3s7QBbR5M67PA8Zm45DL9t70NR/NZHGzFRD
BR/NMbwzCqNtY2UGDhYQLGhW8heAwsYwir8ZqmOfKTd9aY1pu/S8m9AlAoGBAP6I
483EIN8R5y+beGcGynYeIrH5Gc+W2FxWIW9jh/G7vRbhMlW4z0GxV3uEAYmOlBH2
AqUkV6+uzU0P4B/m3vCYqLycBVDwifJazDj9nskVL5kGMxia62iwDMXs5nqNS4WJ
ZM5Gl2xIiwmgWnYnujM3eKF2wbm439wj4na80SldAoGANdIqatA9o+GtntKsw2iJ
vD91Z2SHVR0aC1k8Q+4/3GXOYiQjMLYAybDQcpEq0/RJ4SZik1nfZ9/gvJV4p4Wp
I7Br9opq/9ikTEWtv2kIhtiO02151ciAWIUEXdXmE+uQSMASk1kUwkPPQXL2v6cq
NFqz6tyS33nqMQtG3abNxHECgYA4AEA2nmcpDRRTSh50dG8JC9pQU+EU5jhWIHEc
w8Y+LjMNHKDpcU7QQkdgGowICsGTLhAo61ULhycORGboPfBg+QVu8djNlQ6Urttt
0ocj8LBXN6D4UeVnVAyLY3LWFc4+5Bq0s51PKqrEhG5Cvrzd1d+JjspSpVVDZvXF
cAeI1QKBgC/cMN3+2Sc+2biu46DnkdYpdF/N0VGMOgzz+unSVD4RA2mEJ9UdwGga
feshtrtcroHtEmc+WDYgTTnAq1MbsVFQYIwZ5fL/GJ1R8ccaWiPuX2HrKALKG4Y3
CMFpDUWhRgtaBsmuOpUq3FeS5cyPNMHk6axL1KyFoJk9AgfhqhTp
-----END RSA PRIVATE KEY-----

View file

@ -0,0 +1,23 @@
-----BEGIN CERTIFICATE-----
MIID5jCCAs6gAwIBAgIUU3F6URi0oTe9ontkf7JqXOo89QYwDQYJKoZIhvcNAQEL
BQAwbzELMAkGA1UEBhMCRlIxFjAUBgNVBAgTDUlsZS1kZS1GcmFuY2UxDjAMBgNV
BAcTBVBhcmlzMREwDwYDVQQKEwhDcm93ZHNlYzERMA8GA1UECxMIQ3Jvd2RzZWMx
EjAQBgNVBAMTCWxvY2FsaG9zdDAeFw0yNDEwMjMxMDAzMDBaFw0yNTEwMjMxMDAz
MDBaMG8xCzAJBgNVBAYTAkZSMRYwFAYDVQQIEw1JbGUtZGUtRnJhbmNlMQ4wDAYD
VQQHEwVQYXJpczERMA8GA1UEChMIQ3Jvd2RzZWMxETAPBgNVBAsTCENyb3dkc2Vj
MRIwEAYDVQQDEwlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEK
AoIBAQC/lnUubjBGe5x0LgIE5GeG52LRzj99iLWuvey4qbSwFZ07ECgv+JttVwDm
AjEeakj2ZR46WHvHAR9eBNkRCORyWX0iKVIzm09PXYi80KtwGLaA8YMEio9/08Cc
+LS0TuP0yiOcw+btrhmvvauDzcQhA6u55q8anCZiF2BlHfX9Sh6QKewA3NhOkzbU
VTxqrOqfcRsGNub7dheqfP5bfrPkF6Y6l/0Fhyx0NMsu1zaQ0hCls2hkTf0Y3XGt
IQNWoN22seexR3qRmPf0j3jBa0qOmGgd6kAd+YpsjDblgCNUIJZiVj51fVb0sGRx
ShkfKGU6t0eznTWPCqswujO/sn+pAgMBAAGjejB4MA4GA1UdDwEB/wQEAwIFoDAd
BgNVHSUEFjAUBggrBgEFBQcDAQYIKwYBBQUHAwIwDAYDVR0TAQH/BAIwADAdBgNV
HQ4EFgQUOiIF+7Wzx1J8Ki3DiBfx+E6zlSUwGgYDVR0RBBMwEYIJbG9jYWxob3N0
hwR/AAABMA0GCSqGSIb3DQEBCwUAA4IBAQA0dzlhBr/0wXPyj/iWxMOXxZ1FNJ9f
lxBMhLAgX0WrT2ys+284J7Hcn0lJeqelluYpmeKn9vmCAEj3MmUmHzZyf//lhuUJ
0DlYWIHUsGaJHJ7A+1hQqrcXHhkcRy5WGIM9VoddKbBbg2b6qzTSvxn8EnuD7H4h
28wLyGLCzsSXoVcAB8u+svYt29TPuy6xmMAokyIShV8FsE77fjVTgtCuxmx1PKv3
zd6+uEae7bbZ+GJH1zKF0vokejQvmByt+YuIXlNbMseaMUeDdpy+6qlRvbbN1dyp
rkQXfWvidMfSue5nH/akAn83v/CdKxG6tfW83d9Rud3naabUkywALDng
-----END CERTIFICATE-----

View file

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpQIBAAKCAQEAv5Z1Lm4wRnucdC4CBORnhudi0c4/fYi1rr3suKm0sBWdOxAo
L/ibbVcA5gIxHmpI9mUeOlh7xwEfXgTZEQjkcll9IilSM5tPT12IvNCrcBi2gPGD
BIqPf9PAnPi0tE7j9MojnMPm7a4Zr72rg83EIQOrueavGpwmYhdgZR31/UoekCns
ANzYTpM21FU8aqzqn3EbBjbm+3YXqnz+W36z5BemOpf9BYcsdDTLLtc2kNIQpbNo
ZE39GN1xrSEDVqDdtrHnsUd6kZj39I94wWtKjphoHepAHfmKbIw25YAjVCCWYlY+
dX1W9LBkcUoZHyhlOrdHs501jwqrMLozv7J/qQIDAQABAoIBAF1Vd/rJlV0Q5RQ4
QaWOe9zdpmedeZK3YgMh5UvE6RCLRxC5+0n7bASlSPvEf5dYofjfJA26g3pcUqKj
6/d/hIMsk2hsBu67L7TzVSTe51XxxB8nCPPSaLwWNZSDGM1qTWU4gIbjbQHHOh5C
YWcRfAW1WxhyiEWHYq+QwdYg9XCRrSg1UzvVvW1Yt2wDGcSZP5whbXipfw3BITDs
XU7ODYNkU1sjIzQZwzVGxOf9qKdhZFZ26Vhoz8OJNMLyJxY7EspuwR7HbDGt11Pb
CxOt/BV44LwdVYeqh57oIKtckQW33W/6EeaWr7GfMzyH5WSrsOJoK5IJVrZaPTcS
QiMYLA0CgYEA9vMVsGshBl3TeRGaU3XLHqooXD4kszbdnjfPrwGlfCO/iybhDqo5
WFypM/bYcIWzbTez/ihufHEHPSCUbFEcN4B+oczGcuxTcZjFyvJYvq2ycxPUiDIi
JnVUcVxgh1Yn39+CsQ/b6meP7MumTD2P3I87CeQGlWTO5Ys9mdw0BjcCgYEAxpv1
64l5UoFJGr4yElNKDIKnhEFbJZsLGKiiuVXcS1QVHW5Az5ar9fPxuepyHpz416l3
ppncuhJiUIP+jbu5e0s0LsN46mLS3wkHLgYJj06CNT3uOSLSg1iFl7DusdbyiaA7
wEJ/aotS1NZ4XaeryAWHwYJ6Kag3nz6NV3ZYuR8CgYEAxAFCuMj+6F+2RsTa+d1n
v8oMyNImLPyiQD9KHzyuTW7OTDMqtIoVg/Xf8re9KOpl9I0e1t7eevT3auQeCi8C
t2bMm7290V+UB3jbnO5n08hn+ADIUuV/x4ie4m8QyrpuYbm0sLbGtTFHwgoNzzuZ
oNUqZfpP42mk8fpnhWSLAlcCgYEAgpY7XRI4HkJ5ocbav2faMV2a7X/XgWNvKViA
HeJRhYoUlBRRMuz7xi0OjFKVlIFbsNlxna5fDk1WLWCMd/6tl168Qd8u2tX9lr6l
5OH9WSeiv4Un5JN73PbQaAvi9jXBpTIg92oBwzk2TlFyNQoxDcRtHZQ/5LIBWIhV
gOOEtLsCgYEA1wbGc4XlH+/nXVsvx7gmfK8pZG8XA4/ToeIEURwPYrxtQZLB4iZs
aqWGgIwiB4F4UkuKZIjMrgInU9y0fG6EL96Qty7Yjh7dGy1vJTZl6C+QU6o4sEwl
r5Id5BNLEaqISWQ0LvzfwdfABYlvFfBdaGbzUzLEitD79eyhxuNEOBw=
-----END RSA PRIVATE KEY-----

View file

@ -136,12 +136,9 @@ func (j *JournalCtlSource) runJournalCtl(ctx context.Context, out chan types.Eve
if j.metricsLevel != configuration.METRICS_NONE {
linesRead.With(prometheus.Labels{"source": j.src}).Inc()
}
var evt types.Event
if !j.config.UseTimeMachine {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
} else {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
}
evt := types.MakeEvent(j.config.UseTimeMachine, types.LOG, true)
evt.Line = l
out <- evt
case stderrLine := <-stderrChan:
logger.Warnf("Got stderr message : %s", stderrLine)

View file

@ -173,13 +173,8 @@ func (k *KafkaSource) ReadMessage(ctx context.Context, out chan types.Event) err
if k.metricsLevel != configuration.METRICS_NONE {
linesRead.With(prometheus.Labels{"topic": k.Config.Topic}).Inc()
}
var evt types.Event
if !k.Config.UseTimeMachine {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
} else {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
}
evt := types.MakeEvent(k.Config.UseTimeMachine, types.LOG, true)
evt.Line = l
out <- evt
}
}

View file

@ -322,12 +322,8 @@ func (k *KinesisSource) ParseAndPushRecords(records []*kinesis.Record, out chan
} else {
l.Src = k.Config.StreamName
}
var evt types.Event
if !k.Config.UseTimeMachine {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
} else {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
}
evt := types.MakeEvent(k.Config.UseTimeMachine, types.LOG, true)
evt.Line = l
out <- evt
}
}

View file

@ -207,11 +207,8 @@ func (ka *KubernetesAuditSource) webhookHandler(w http.ResponseWriter, r *http.R
Process: true,
Module: ka.GetName(),
}
ka.outChan <- types.Event{
Line: l,
Process: true,
Type: types.LOG,
ExpectMode: types.LIVE,
}
evt := types.MakeEvent(ka.config.UseTimeMachine, types.LOG, true)
evt.Line = l
ka.outChan <- evt
}
}

View file

@ -307,16 +307,9 @@ func (l *LokiSource) readOneEntry(entry lokiclient.Entry, labels map[string]stri
if l.metricsLevel != configuration.METRICS_NONE {
linesRead.With(prometheus.Labels{"source": l.Config.URL}).Inc()
}
expectMode := types.LIVE
if l.Config.UseTimeMachine {
expectMode = types.TIMEMACHINE
}
out <- types.Event{
Line: ll,
Process: true,
Type: types.LOG,
ExpectMode: expectMode,
}
evt := types.MakeEvent(l.Config.UseTimeMachine, types.LOG, true)
evt.Line = ll
out <- evt
}
func (l *LokiSource) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {

View file

@ -443,12 +443,8 @@ func (s *S3Source) readFile(bucket string, key string) error {
} else if s.MetricsLevel == configuration.METRICS_AGGREGATE {
l.Src = bucket
}
var evt types.Event
if !s.Config.UseTimeMachine {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
} else {
evt = types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
}
evt := types.MakeEvent(s.Config.UseTimeMachine, types.LOG, true)
evt.Line = l
s.out <- evt
}
}

View file

@ -235,11 +235,9 @@ func (s *SyslogSource) handleSyslogMsg(out chan types.Event, t *tomb.Tomb, c cha
l.Time = ts
l.Src = syslogLine.Client
l.Process = true
if !s.config.UseTimeMachine {
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
} else {
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
}
evt := types.MakeEvent(s.config.UseTimeMachine, types.LOG, true)
evt.Line = l
out <- evt
}
}
}

View file

@ -206,9 +206,9 @@ func (w *WinEventLogSource) getEvents(out chan types.Event, t *tomb.Tomb) error
l.Src = w.name
l.Process = true
if !w.config.UseTimeMachine {
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE}
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.LIVE, Unmarshaled: make(map[string]interface{})}
} else {
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE, Unmarshaled: make(map[string]interface{})}
}
}
}
@ -430,7 +430,9 @@ OUTER_LOOP:
l.Time = time.Now()
l.Src = w.name
l.Process = true
out <- types.Event{Line: l, Process: true, Type: types.LOG, ExpectMode: types.TIMEMACHINE}
csevt := types.MakeEvent(w.config.UseTimeMachine, types.LOG, true)
csevt.Line = l
out <- csevt
}
}
}

View file

@ -149,15 +149,12 @@ func TruncateContext(values []string, contextValueLen int) (string, error) {
return ret, nil
}
func EvalAlertContextRules(evt *types.Event, match *types.MatchedRule, request *http.Request, tmpContext map[string][]string) []error {
func EvalAlertContextRules(evt types.Event, match *types.MatchedRule, request *http.Request, tmpContext map[string][]string) []error {
var errors []error
//if we're evaluating context for appsec event, match and request will be present.
//otherwise, only evt will be.
if evt == nil {
evt = types.NewEvent()
}
if match == nil {
match = types.NewMatchedRule()
}
@ -220,8 +217,9 @@ func AppsecEventToContext(event types.AppsecEvent, request *http.Request) (model
tmpContext := make(map[string][]string)
evt := types.MakeEvent(false, types.LOG, false)
for _, matched_rule := range event.MatchedRules {
tmpErrors := EvalAlertContextRules(nil, &matched_rule, request, tmpContext)
tmpErrors := EvalAlertContextRules(evt, &matched_rule, request, tmpContext)
errors = append(errors, tmpErrors...)
}
@ -240,7 +238,7 @@ func EventToContext(events []types.Event) (models.Meta, []error) {
tmpContext := make(map[string][]string)
for _, evt := range events {
tmpErrors := EvalAlertContextRules(&evt, nil, nil, tmpContext)
tmpErrors := EvalAlertContextRules(evt, nil, nil, tmpContext)
errors = append(errors, tmpErrors...)
}

View file

@ -7,20 +7,21 @@ package component
// Built is a map of all the known components, and whether they are built-in or not.
// This is populated as soon as possible by the respective init() functions
var Built = map[string]bool {
"datasource_appsec": false,
"datasource_cloudwatch": false,
"datasource_docker": false,
"datasource_file": false,
"datasource_journalctl": false,
"datasource_k8s-audit": false,
"datasource_kafka": false,
"datasource_kinesis": false,
"datasource_loki": false,
"datasource_s3": false,
"datasource_syslog": false,
"datasource_wineventlog":false,
"cscli_setup": false,
var Built = map[string]bool{
"datasource_appsec": false,
"datasource_cloudwatch": false,
"datasource_docker": false,
"datasource_file": false,
"datasource_journalctl": false,
"datasource_k8s-audit": false,
"datasource_kafka": false,
"datasource_kinesis": false,
"datasource_loki": false,
"datasource_s3": false,
"datasource_syslog": false,
"datasource_wineventlog": false,
"datasource_http": false,
"cscli_setup": false,
}
func Register(name string) {

View file

@ -47,13 +47,20 @@ type Event struct {
Meta map[string]string `yaml:"Meta,omitempty" json:"Meta,omitempty"`
}
func NewEvent() *Event {
return &Event{Type: LOG,
func MakeEvent(timeMachine bool, evtType int, process bool) Event {
evt := Event{
Parsed: make(map[string]string),
Enriched: make(map[string]string),
Meta: make(map[string]string),
Unmarshaled: make(map[string]interface{}),
Enriched: make(map[string]string),
ExpectMode: LIVE,
Process: process,
Type: evtType,
}
if timeMachine {
evt.ExpectMode = TIMEMACHINE
}
return evt
}
func (e *Event) SetMeta(key string, value string) bool {