tests: log.Fatal -> return err (#3056)

* tests: log.Fatal -> return err

* lint
This commit is contained in:
mmetc 2024-06-05 11:04:54 +02:00 committed by GitHub
parent 2865b69855
commit 3dd17b9081
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 80 additions and 60 deletions

View file

@ -71,9 +71,8 @@ group_id: crowdsec`,
},
}
subLogger := log.WithFields(log.Fields{
"type": "kafka",
})
subLogger := log.WithField("type", "kafka")
for _, test := range tests {
k := KafkaSource{}
err := k.Configure([]byte(test.config), subLogger, configuration.METRICS_NONE)
@ -82,7 +81,6 @@ group_id: crowdsec`,
}
func writeToKafka(w *kafka.Writer, logs []string) {
for idx, log := range logs {
err := w.WriteMessages(context.Background(), kafka.Message{
Key: []byte(strconv.Itoa(idx)),
@ -106,7 +104,9 @@ func createTopic(topic string, broker string) {
if err != nil {
panic(err)
}
var controllerConn *kafka.Conn
controllerConn, err = kafka.Dial("tcp", net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
panic(err)
@ -131,6 +131,7 @@ func TestStreamingAcquisition(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Skipping test on windows")
}
tests := []struct {
name string
logs []string
@ -159,13 +160,14 @@ func TestStreamingAcquisition(t *testing.T) {
Topic: "crowdsecplaintext",
})
if w == nil {
log.Fatalf("Unable to setup a kafka producer")
t.Fatal("Unable to setup a kafka producer")
}
for _, ts := range tests {
ts := ts
t.Run(ts.name, func(t *testing.T) {
k := KafkaSource{}
err := k.Configure([]byte(`
source: kafka
brokers:
@ -174,12 +176,14 @@ topic: crowdsecplaintext`), subLogger, configuration.METRICS_NONE)
if err != nil {
t.Fatalf("could not configure kafka source : %s", err)
}
tomb := tomb.Tomb{}
out := make(chan types.Event)
err = k.StreamingAcquisition(out, &tomb)
cstest.AssertErrorContains(t, err, ts.expectedErr)
actualLines := 0
go writeToKafka(w, ts.logs)
READLOOP:
for {
@ -195,13 +199,13 @@ topic: crowdsecplaintext`), subLogger, configuration.METRICS_NONE)
tomb.Wait()
})
}
}
func TestStreamingAcquisitionWithSSL(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("Skipping test on windows")
}
tests := []struct {
name string
logs []string
@ -229,13 +233,14 @@ func TestStreamingAcquisitionWithSSL(t *testing.T) {
Topic: "crowdsecssl",
})
if w2 == nil {
log.Fatalf("Unable to setup a kafka producer")
t.Fatal("Unable to setup a kafka producer")
}
for _, ts := range tests {
ts := ts
t.Run(ts.name, func(t *testing.T) {
k := KafkaSource{}
err := k.Configure([]byte(`
source: kafka
brokers:
@ -250,12 +255,14 @@ tls:
if err != nil {
t.Fatalf("could not configure kafka source : %s", err)
}
tomb := tomb.Tomb{}
out := make(chan types.Event)
err = k.StreamingAcquisition(out, &tomb)
cstest.AssertErrorContains(t, err, ts.expectedErr)
actualLines := 0
go writeToKafka(w2, ts.logs)
READLOOP:
for {
@ -271,5 +278,4 @@ tls:
tomb.Wait()
})
}
}