wrap exec instead

This commit is contained in:
Tarun Pothulapati 2025-04-24 16:03:00 +05:30
parent da6df521c7
commit c335e56e03
No known key found for this signature in database
GPG key ID: 82B504CB6F6C9902

View file

@ -78,24 +78,28 @@ func (c *ClientWorker) Run(pace bool, worker *FileWorker) {
time.Sleep(lag)
}
start := time.Now()
c.pipe.Do(context.Background(), msg.values...).Result()
latency := float64(time.Since(start).Microseconds())
worker.latencyMu.Lock()
worker.latencyDigest.Add(latency, 1)
worker.latencyMu.Unlock()
atomic.AddUint64(&worker.processed, 1)
if msg.HasMore == 0 {
size := c.pipe.Len()
start := time.Now()
c.pipe.Exec(context.Background())
batchLatency := float64(time.Since(start).Microseconds())
worker.latencyMu.Lock()
worker.latencyDigest.Add(batchLatency, 1)
worker.latencyMu.Unlock()
c.processed += uint(size)
}
}
if size := c.pipe.Len(); size >= 0 {
start := time.Now()
c.pipe.Exec(context.Background())
batchLatency := float64(time.Since(start).Microseconds())
worker.latencyMu.Lock()
worker.latencyDigest.Add(batchLatency, 1)
worker.latencyMu.Unlock()
c.processed += uint(size)
}