From da6df521c734492331bee48834ece906df47c51a Mon Sep 17 00:00:00 2001 From: Tarun Pothulapati Date: Thu, 24 Apr 2025 15:09:44 +0530 Subject: [PATCH] feat(replay): add latency distributions --- tools/replay/go.mod | 1 + tools/replay/go.sum | 7 +++++++ tools/replay/main.go | 12 +++++++++++- tools/replay/workers.go | 11 +++++++++++ 4 files changed, 30 insertions(+), 1 deletion(-) diff --git a/tools/replay/go.mod b/tools/replay/go.mod index af46e0d70..fd415285c 100644 --- a/tools/replay/go.mod +++ b/tools/replay/go.mod @@ -13,6 +13,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/gookit/color v1.4.2 // indirect + github.com/influxdata/tdigest v0.0.1 // indirect github.com/mattn/go-runewidth v0.0.13 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rivo/uniseg v0.2.0 // indirect diff --git a/tools/replay/go.sum b/tools/replay/go.sum index ef19fb95b..11fa8447d 100644 --- a/tools/replay/go.sum +++ b/tools/replay/go.sum @@ -9,8 +9,11 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/gookit/color v1.4.2 h1:tXy44JFSFkKnELV6WaMo/lLfu/meqITX3iAV52do7lk= github.com/gookit/color v1.4.2/go.mod h1:fqRyamkC1W8uxl+lxCQxOT09l/vYfZ+QeiX3rKQHCoQ= +github.com/influxdata/tdigest v0.0.1 h1:XpFptwYmnEKUqmkcDjrzffswZ3nvNeevbUSLPP/ZzIY= +github.com/influxdata/tdigest v0.0.1/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y= github.com/mattn/go-runewidth v0.0.13 h1:lTGmDsbAYt5DmK6OnoV7EuIF1wEIFAcxld6ypU4OSgU= github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -27,11 +30,15 @@ github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5Cc github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778 h1:QldyIu/L63oPpyvQmHgvgickp1Yw510KJOqX7H24mg8= github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778/go.mod h1:2MuV+tbUrU1zIOPMxZ5EncGwgmMJsa+9ucAQZXxsObs= +golang.org/x/exp v0.0.0-20180321215751-8460e604b9de/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44 h1:Bli41pIlzTzf3KEY06n+xnzK/BESIg2ze4Pgfh/aI8c= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d h1:SZxvLBoTP5yHO3Frd4z4vrF+DBX9vMVanchswa69toE= golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= +gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tools/replay/main.go b/tools/replay/main.go index deaf4a3e6..ef7442e7d 100644 --- a/tools/replay/main.go +++ b/tools/replay/main.go @@ -19,14 +19,24 @@ var fPace = flag.Bool("pace", true, "whether to pace the traffic according to th var fSkip = flag.Uint("skip", 0, "skip N records") func RenderTable(area *pterm.AreaPrinter, files []string, workers []FileWorker) { - tableData := pterm.TableData{{"file", "parsed", "processed", "delayed", "clients"}} + tableData := pterm.TableData{{"file", "parsed", "processed", "delayed", "clients", "p50(us)", "p75(us)", "p90(us)", "p99(us)"}} for i := range workers { + workers[i].latencyMu.Lock() + p50 := workers[i].latencyDigest.Quantile(0.5) + p75 := workers[i].latencyDigest.Quantile(0.75) + p90 := workers[i].latencyDigest.Quantile(0.9) + p99 := workers[i].latencyDigest.Quantile(0.99) + workers[i].latencyMu.Unlock() tableData = append(tableData, []string{ files[i], fmt.Sprint(atomic.LoadUint64(&workers[i].parsed)), fmt.Sprint(atomic.LoadUint64(&workers[i].processed)), fmt.Sprint(atomic.LoadUint64(&workers[i].delayed)), fmt.Sprint(atomic.LoadUint64(&workers[i].clients)), + fmt.Sprintf("%.0f", p50), + fmt.Sprintf("%.0f", p75), + fmt.Sprintf("%.0f", p90), + fmt.Sprintf("%.0f", p99), }) } content, _ := pterm.DefaultTable.WithHasHeader().WithBoxed().WithData(tableData).Srender() diff --git a/tools/replay/workers.go b/tools/replay/workers.go index cca206cb3..7f46a7edf 100644 --- a/tools/replay/workers.go +++ b/tools/replay/workers.go @@ -10,6 +10,7 @@ import ( "sync/atomic" "time" + "github.com/influxdata/tdigest" "github.com/redis/go-redis/v9" ) @@ -56,6 +57,9 @@ type FileWorker struct { delayed uint64 parsed uint64 clients uint64 + + latencyDigest *tdigest.TDigest + latencyMu sync.Mutex } func (c *ClientWorker) Run(pace bool, worker *FileWorker) { @@ -74,7 +78,13 @@ func (c *ClientWorker) Run(pace bool, worker *FileWorker) { time.Sleep(lag) } + start := time.Now() c.pipe.Do(context.Background(), msg.values...).Result() + latency := float64(time.Since(start).Microseconds()) + worker.latencyMu.Lock() + worker.latencyDigest.Add(latency, 1) + worker.latencyMu.Unlock() + atomic.AddUint64(&worker.processed, 1) if msg.HasMore == 0 { @@ -106,6 +116,7 @@ func NewClient(w *FileWorker, pace bool) *ClientWorker { } func (w *FileWorker) Run(file string, wg *sync.WaitGroup) { + w.latencyDigest = tdigest.NewWithCompression(1000) clients := make(map[uint32]*ClientWorker, 0) recordId := uint64(0) err := parseRecords(file, func(r Record) bool {