Mirror of https://github.com/dragonflydb/dragonfly.git, synced 2025-05-11 18:35:46 +02:00
chore(traffic logger): use pipelining and print/analyze commands (#3527)
Add run, print and analyze commands to the traffic logger replay tool; add support for pipelined commands.
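For context, the tool is invoked with a subcommand followed by one or more trace files; the binary name and file names below are illustrative, not taken from the commit:

./replay run trace-0.bin trace-1.bin     # replay traces against -host
./replay print trace-0.bin trace-1.bin   # dump merged records in time order
./replay analyze trace-0.bin             # print command frequency stats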
Parent: f90ae4fcff
Commit: 84a697dd75
2 changed files with 248 additions and 112 deletions
@@ -1,126 +1,20 @@
package main

import (
    "context"
    "flag"
    "fmt"
    "log"
    "math"
    "sort"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/pterm/pterm"
    "github.com/redis/go-redis/v9"
)

var fHost = flag.String("host", "127.0.0.1:6379", "Redis host")
var fClientBuffer = flag.Int("buffer", 100, "How many records to buffer per client")

type RecordHeader struct {
    Client  uint32
    Time    uint64
    DbIndex uint32
    HasMore uint32
}

type Record struct {
    RecordHeader
    values []interface{} // instead of []string to unwrap into variadic
}

// Determine earliest time
func DetermineBaseTime(files []string) time.Time {
    var minTime uint64 = math.MaxUint64
    for _, file := range files {
        parseRecords(file, func(r Record) bool {
            if r.Time < minTime {
                minTime = r.Time
            }
            return false
        })
    }
    return time.Unix(0, int64(minTime))
}

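DetermineBaseTime (and everything below) leans on parseRecords, which lives in another file of the package and is not part of this diff. Note the callback contract: returning false stops iteration, so the function above reads only each file's first record, which yields the minimum only if records within a file are time-ordered. A hedged sketch of what that contract looks like — the on-disk layout (fixed header, then length-prefixed arguments) is invented for illustration and is not Dragonfly's actual trace format; imports assumed: bufio, encoding/binary, io, os:

// Hypothetical sketch of the parseRecords contract relied on above.
func parseRecordsSketch(file string, cb func(Record) bool) error {
    f, err := os.Open(file)
    if err != nil {
        return err
    }
    defer f.Close()

    r := bufio.NewReader(f)
    for {
        var rec Record
        err := binary.Read(r, binary.LittleEndian, &rec.RecordHeader)
        if err == io.EOF {
            return nil // clean end of file
        } else if err != nil {
            return err
        }

        var argc uint32 // assumed: argument count follows the header
        if err := binary.Read(r, binary.LittleEndian, &argc); err != nil {
            return err
        }
        for i := uint32(0); i < argc; i++ {
            var n uint32 // assumed: each argument is length-prefixed
            if err := binary.Read(r, binary.LittleEndian, &n); err != nil {
                return err
            }
            arg := make([]byte, n)
            if _, err := io.ReadFull(r, arg); err != nil {
                return err
            }
            rec.values = append(rec.values, string(arg))
        }

        if !cb(rec) { // callback returning false stops iteration
            return nil
        }
    }
}
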
// Handles a single connection/client
type ClientWorker struct {
    redis     *redis.Client
    incoming  chan Record
    processed uint
}

// Handles a single file and distributes messages to clients
type FileWorker struct {
    clientGroup sync.WaitGroup
    timeOffset  time.Duration
    // stats for output, updated by clients, read by rendering goroutine
    processed uint64
    delayed   uint64
    parsed    uint64
    clients   uint64
}

func (c ClientWorker) Run(worker *FileWorker) {
    for msg := range c.incoming {
        if c.processed == 0 && msg.DbIndex != 0 {
            // There is no easy way to switch databases, so we rely on the
            // connection pool consisting of only one connection.
            c.redis.Do(context.Background(), []interface{}{"SELECT", fmt.Sprint(msg.DbIndex)})
        }

        lag := time.Until(worker.HappensAt(time.Unix(0, int64(msg.Time))))
        if lag < 0 {
            atomic.AddUint64(&worker.delayed, 1)
        }
        time.Sleep(lag)

        c.redis.Do(context.Background(), msg.values...).Result()
        atomic.AddUint64(&worker.processed, 1)
        c.processed += 1
    }
    worker.clientGroup.Done()
}

func NewClient(w *FileWorker) *ClientWorker {
    client := &ClientWorker{
        // DisableIndentity (sic) is the option's actual spelling in go-redis v9.
        redis:    redis.NewClient(&redis.Options{Addr: *fHost, PoolSize: 1, DisableIndentity: true}),
        incoming: make(chan Record, *fClientBuffer),
    }
    atomic.AddUint64(&w.clients, 1)
    w.clientGroup.Add(1)
    go client.Run(w)
    return client
}

func (w *FileWorker) Run(file string, wg *sync.WaitGroup) {
    clients := make(map[uint32]*ClientWorker, 0)
    err := parseRecords(file, func(r Record) bool {
        client, ok := clients[r.Client]
        if !ok {
            client = NewClient(w)
            clients[r.Client] = client
        }
        atomic.AddUint64(&w.parsed, 1)

        client.incoming <- r
        return true
    })

    if err != nil {
        log.Fatalf("Could not parse records for file %s: %v", file, err)
    }

    for _, client := range clients {
        close(client.incoming)
    }
    w.clientGroup.Wait()
    wg.Done()
}

func (w *FileWorker) HappensAt(recordTime time.Time) time.Time {
    return recordTime.Add(w.timeOffset)
}

func RenderTable(area *pterm.AreaPrinter, files []string, workers []FileWorker) {
    tableData := pterm.TableData{{"file", "parsed", "processed", "delayed", "clients"}}
    for i := range workers {

@@ -136,10 +30,7 @@ func RenderTable(area *pterm.AreaPrinter, files []string, workers []FileWorker)
    area.Update(content)
}

func main() {
    flag.Parse()
    files := flag.Args()

func Run(files []string) {
    timeOffset := time.Now().Add(500 * time.Millisecond).Sub(DetermineBaseTime(files))
    fmt.Println("Offset -> ", timeOffset)

@@ -171,3 +62,117 @@ func main() {
    RenderTable(area, files, workers) // to show last stats
}

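The replay schedule in Run hangs off a single global offset: it is chosen so the earliest record across all traces fires 500ms after startup, and every record then keeps its original relative spacing via HappensAt. Worked through with illustrative values:

// Illustrative values only, not from a real trace:
// t0     = earliest Record.Time across all files
// offset = (now + 500ms) - t0            // computed once in Run
// a record stamped t0 + 2s is replayed at now + 2.5s,
// since HappensAt(t) = t + offset preserves relative timing.
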
func Print(files []string) {
    type StreamTop struct {
        record Record
        ch     chan Record
    }

    // Start file reader goroutines
    var wg sync.WaitGroup
    wg.Add(len(files))

    tops := make([]StreamTop, len(files))
    for i, file := range files {
        tops[i].ch = make(chan Record, 100)
        go func(ch chan Record, file string) {
            parseRecords(file, func(r Record) bool {
                ch <- r
                return true
            })
            close(ch)
            wg.Done()
        }(tops[i].ch, file)
    }

    // Pick the record with minimum time from each channel
    for {
        minTime := ^uint64(0)
        minIndex := -1
        for i := range tops {
            if tops[i].record.Time == 0 {
                if r, ok := <-tops[i].ch; ok {
                    tops[i].record = r
                }
            }

            if rt := tops[i].record.Time; rt > 0 && rt < minTime {
                minTime = rt
                minIndex = i
            }
        }

        if minIndex == -1 {
            break
        }

        fmt.Println(tops[minIndex].record.values...)
        tops[minIndex].record = Record{}
    }

    wg.Wait()
}

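Print is a k-way merge over the per-file streams: each pass refills any consumed slot from its channel, then emits the globally earliest record. Two details worth noting: record.Time == 0 doubles as the "slot consumed" sentinel, so a genuine zero timestamp would stall that stream (safe in practice, since times are wall-clock nanoseconds); and the scan costs O(number of files) per record, which is fine for a handful of traces but would call for a container/heap-based merge if many files were replayed at once.
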
func Analyze(files []string) {
    total := 0
    chained := 0
    clients := 0
    cmdCounts := make(map[string]uint)

    // count stats
    for _, file := range files {
        fileClients := make(map[uint32]bool)

        parseRecords(file, func(r Record) bool {
            total += 1
            if r.HasMore > 0 {
                chained += 1
            }

            fileClients[r.Client] = true
            cmdCounts[r.values[0].(string)] += 1

            return true
        })

        clients += len(fileClients)
    }

    // sort commands by frequencies
    type Freq struct {
        cmd   string
        count uint
    }
    var sortedCmds []Freq
    for cmd, count := range cmdCounts {
        sortedCmds = append(sortedCmds, Freq{cmd, count})
    }
    sort.Slice(sortedCmds, func(i, j int) bool {
        return sortedCmds[i].count > sortedCmds[j].count
    })

    // Print all the info
    fmt.Println("Total commands", total)
    fmt.Println("Has more%", 100*float32(chained)/float32(total))
    fmt.Println("Total clients", clients)

    for _, freq := range sortedCmds {
        fmt.Printf("%8d | %v \n", freq.count, freq.cmd)
    }
}

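The report Analyze prints is plain text: three summary lines, then one line per command, most frequent first. The shape of the output, with purely illustrative numbers:

Total commands 100000
Has more% 12.5
Total clients 42
   61250 | SET
   30000 | GET
    8750 | DEL
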
func main() {
    flag.Parse()
    cmd := flag.Arg(0)
    files := flag.Args()[1:]

    switch strings.ToLower(cmd) {
    case "run":
        Run(files)
    case "print":
        Print(files)
    case "analyze":
        Analyze(files)
    }
}

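Two sharp edges in main worth knowing when running this: flag.Args()[1:] panics when no subcommand is given, and an unrecognized subcommand falls through the switch and exits silently. A hypothetical hardening, not part of the commit:

// Hypothetical hardened entry point (not in the commit):
func main() {
    flag.Parse()
    if flag.NArg() < 2 { // need a subcommand and at least one trace file
        log.Fatal("usage: replay <run|print|analyze> <files...>")
    }
    cmd, files := flag.Arg(0), flag.Args()[1:]
    switch strings.ToLower(cmd) {
    case "run":
        Run(files)
    case "print":
        Print(files)
    case "analyze":
        Analyze(files)
    default:
        log.Fatalf("unknown command %q; expected run, print or analyze", cmd)
    }
}
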
tools/replay/workers.go — 131 lines (new file)
@@ -0,0 +1,131 @@
package main

import (
    "context"
    "fmt"
    "log"
    "math"
    "sync"
    "sync/atomic"
    "time"

    "github.com/redis/go-redis/v9"
)

type RecordHeader struct {
    Client  uint32
    Time    uint64
    DbIndex uint32
    HasMore uint32
}

type Record struct {
    RecordHeader
    values []interface{} // instead of []string to unwrap into variadic
}

// Determine earliest time
func DetermineBaseTime(files []string) time.Time {
    var minTime uint64 = math.MaxUint64
    for _, file := range files {
        parseRecords(file, func(r Record) bool {
            if r.Time < minTime {
                minTime = r.Time
            }
            return false
        })
    }
    return time.Unix(0, int64(minTime))
}

// Handles a single connection/client
type ClientWorker struct {
    redis     *redis.Client
    incoming  chan Record
    processed uint
    pipe      redis.Pipeliner
}

// Handles a single file and distributes messages to clients
type FileWorker struct {
    clientGroup sync.WaitGroup
    timeOffset  time.Duration
    // stats for output, updated by clients, read by rendering goroutine
    processed uint64
    delayed   uint64
    parsed    uint64
    clients   uint64
}

func (c *ClientWorker) Run(worker *FileWorker) {
    for msg := range c.incoming {
        if c.processed == 0 && msg.DbIndex != 0 {
            // There is no easy way to switch databases, so we rely on the
            // connection pool consisting of only one connection.
            c.redis.Do(context.Background(), []interface{}{"SELECT", fmt.Sprint(msg.DbIndex)})
        }

        lag := time.Until(worker.HappensAt(time.Unix(0, int64(msg.Time))))
        if lag < 0 {
            atomic.AddUint64(&worker.delayed, 1)
        }
        time.Sleep(lag)

        c.pipe.Do(context.Background(), msg.values...).Result()
        atomic.AddUint64(&worker.processed, 1)

        if msg.HasMore == 0 {
            size := c.pipe.Len()
            c.pipe.Exec(context.Background())
            c.processed += uint(size)
        }
    }

    // Flush whatever is still queued (size > 0, not >= 0, so an empty
    // pipeline is not flushed needlessly).
    if size := c.pipe.Len(); size > 0 {
        c.pipe.Exec(context.Background())
        c.processed += uint(size)
    }

    worker.clientGroup.Done()
}

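The core pattern here is go-redis pipelining: Do on a Pipeliner only queues a command, and Exec ships the whole batch in one round trip; the worker flushes whenever a record marks the end of a pipelined burst (HasMore == 0). A minimal standalone sketch of that pattern — the function name and keys are illustrative:

// Minimal go-redis pipelining sketch; rdb and the keys are illustrative.
func pipelineSketch(rdb *redis.Client) error {
    ctx := context.Background()
    pipe := rdb.Pipeline()
    pipe.Do(ctx, "SET", "k", "v") // queued, not yet sent
    pipe.Do(ctx, "GET", "k")      // queued, not yet sent
    _, err := pipe.Exec(ctx)      // both commands go out in one round trip
    return err
}
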
func NewClient(w *FileWorker) *ClientWorker {
    client := &ClientWorker{
        redis:    redis.NewClient(&redis.Options{Addr: *fHost, PoolSize: 1, DisableIndentity: true}),
        incoming: make(chan Record, *fClientBuffer),
    }
    client.pipe = client.redis.Pipeline()

    atomic.AddUint64(&w.clients, 1)
    w.clientGroup.Add(1)
    go client.Run(w)
    return client
}

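Why PoolSize: 1 matters here: SELECT is per-connection state in the Redis protocol, so the replay can only guarantee that later commands hit the chosen database if the pool never rotates connections. Pinning the pool to a single connection, with a single pipeline over it, preserves both the database selection and each client's original command ordering.
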
func (w *FileWorker) Run(file string, wg *sync.WaitGroup) {
    clients := make(map[uint32]*ClientWorker, 0)
    err := parseRecords(file, func(r Record) bool {
        client, ok := clients[r.Client]
        if !ok {
            client = NewClient(w)
            clients[r.Client] = client
        }
        atomic.AddUint64(&w.parsed, 1)

        client.incoming <- r
        return true
    })

    if err != nil {
        log.Fatalf("Could not parse records for file %s: %v", file, err)
    }

    for _, client := range clients {
        close(client.incoming)
    }
    w.clientGroup.Wait()
    wg.Done()
}

func (w *FileWorker) HappensAt(recordTime time.Time) time.Time {
    return recordTime.Add(w.timeOffset)
}