feat: introduce more options for traffic logger (#4571)

1. Provide clear usage instructions
2. Add "pace" option, which, when false, sends traffic as quickly as possible (default true).
3. Add a "skip" option, which can sometimes be useful for removing unneeded noise

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2025-02-07 11:10:13 +02:00 committed by GitHub
parent 5337711976
commit b0b9a72dbd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 39 additions and 6 deletions

View file

@ -3,6 +3,7 @@ package main
import (
"flag"
"fmt"
"os"
"sort"
"strings"
"sync"
@ -14,6 +15,8 @@ import (
var fHost = flag.String("host", "127.0.0.1:6379", "Redis host")
var fClientBuffer = flag.Int("buffer", 100, "How many records to buffer per client")
var fPace = flag.Bool("pace", true, "whether to pace the traffic according to the original timings.false - to pace as fast as possible")
var fSkip = flag.Uint("skip", 0, "skip N records")
func RenderTable(area *pterm.AreaPrinter, files []string, workers []FileWorker) {
tableData := pterm.TableData{{"file", "parsed", "processed", "delayed", "clients"}}
@ -163,7 +166,28 @@ func Analyze(files []string) {
}
func main() {
flag.Usage = func() {
binaryName := os.Args[0]
fmt.Fprintf(os.Stderr, "Usage: %s [options] <command> <files...>\n", binaryName)
fmt.Fprintln(os.Stderr, "\nOptions:")
flag.PrintDefaults()
fmt.Fprintln(os.Stderr, "\nCommands:")
fmt.Fprintln(os.Stderr, " run - replays the traffic")
fmt.Fprintln(os.Stderr, " print - prints the command")
fmt.Fprintln(os.Stderr, " analyze - analyzes the traffic")
fmt.Fprintln(os.Stderr, "\nExamples:")
fmt.Fprintf(os.Stderr, " %s -host 192.168.1.10:6379 -buffer 50 run *.bin\n", binaryName)
fmt.Fprintf(os.Stderr, " %s print *.bin\n", binaryName)
}
flag.Parse()
if flag.NArg() < 2 {
flag.Usage()
os.Exit(1)
}
cmd := flag.Arg(0)
files := flag.Args()[1:]

View file

@ -5,6 +5,7 @@ import (
"fmt"
"log"
"math"
"strings"
"sync"
"sync/atomic"
"time"
@ -57,7 +58,7 @@ type FileWorker struct {
clients uint64
}
func (c *ClientWorker) Run(worker *FileWorker) {
func (c *ClientWorker) Run(pace bool, worker *FileWorker) {
for msg := range c.incoming {
if c.processed == 0 && msg.DbIndex != 0 {
// There is no easy way to switch, we rely on connection pool consisting only of one connection
@ -68,7 +69,10 @@ func (c *ClientWorker) Run(worker *FileWorker) {
if lag < 0 {
atomic.AddUint64(&worker.delayed, 1)
}
time.Sleep(lag)
if pace {
time.Sleep(lag)
}
c.pipe.Do(context.Background(), msg.values...).Result()
atomic.AddUint64(&worker.processed, 1)
@ -88,7 +92,7 @@ func (c *ClientWorker) Run(worker *FileWorker) {
worker.clientGroup.Done()
}
func NewClient(w *FileWorker) *ClientWorker {
func NewClient(w *FileWorker, pace bool) *ClientWorker {
client := &ClientWorker{
redis: redis.NewClient(&redis.Options{Addr: *fHost, PoolSize: 1, DisableIndentity: true}),
incoming: make(chan Record, *fClientBuffer),
@ -97,20 +101,25 @@ func NewClient(w *FileWorker) *ClientWorker {
atomic.AddUint64(&w.clients, 1)
w.clientGroup.Add(1)
go client.Run(w)
go client.Run(pace, w)
return client
}
func (w *FileWorker) Run(file string, wg *sync.WaitGroup) {
clients := make(map[uint32]*ClientWorker, 0)
recordId := uint64(0)
err := parseRecords(file, func(r Record) bool {
client, ok := clients[r.Client]
if !ok {
client = NewClient(w)
client = NewClient(w, *fPace)
clients[r.Client] = client
}
cmdName := strings.ToLower(r.values[0].(string))
recordId += 1
if cmdName != "eval" && recordId < uint64(*fSkip) {
return true
}
atomic.AddUint64(&w.parsed, 1)
client.incoming <- r
return true
})