feat: add 'testing_time' limit option to dfly_bench (#4487)

* feat: add 'testing_time' limit option to dfly_bench
---------

Signed-off-by: Roman Gershman <romange@gmail.com>
Co-authored-by: Shahar Mike <chakaz@users.noreply.github.com>
This commit is contained in:
Roman Gershman 2025-01-22 10:26:22 +02:00 committed by GitHub
parent 20bc3188fe
commit 4b8fa90a67
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -30,6 +30,7 @@ ABSL_FLAG(uint16_t, p, 6379, "Server port");
ABSL_FLAG(uint32_t, c, 20, "Number of connections per thread"); ABSL_FLAG(uint32_t, c, 20, "Number of connections per thread");
ABSL_FLAG(uint32_t, qps, 20, "QPS schedule at which the generator sends requests to the server"); ABSL_FLAG(uint32_t, qps, 20, "QPS schedule at which the generator sends requests to the server");
ABSL_FLAG(uint32_t, n, 1000, "Number of requests to send per connection"); ABSL_FLAG(uint32_t, n, 1000, "Number of requests to send per connection");
ABSL_FLAG(uint32_t, test_time, 0, "Testing time in seconds");
ABSL_FLAG(uint32_t, d, 16, "Value size in bytes "); ABSL_FLAG(uint32_t, d, 16, "Value size in bytes ");
ABSL_FLAG(string, h, "localhost", "server hostname/ip"); ABSL_FLAG(string, h, "localhost", "server hostname/ip");
ABSL_FLAG(uint64_t, key_minimum, 0, "Min value for keys used"); ABSL_FLAG(uint64_t, key_minimum, 0, "Min value for keys used");
@ -242,9 +243,11 @@ struct ClientStats {
// Per connection driver. // Per connection driver.
class Driver { class Driver {
public: public:
explicit Driver(uint32_t num_reqs, ClientStats* stats, ProactorBase* p) explicit Driver(uint32_t num_reqs, uint32_t time_limit, ClientStats* stats, ProactorBase* p)
: num_reqs_(num_reqs), stats_(*stats) { : num_reqs_(num_reqs), time_limit_(time_limit), stats_(*stats) {
socket_.reset(p->CreateSocket()); socket_.reset(p->CreateSocket());
if (time_limit_ > 0)
num_reqs_ = UINT32_MAX;
} }
Driver(const Driver&) = delete; Driver(const Driver&) = delete;
@ -255,6 +258,8 @@ class Driver {
void Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen); void Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen);
float done() const { float done() const {
if (time_limit_ > 0)
return double(absl::GetCurrentTimeNanos() - start_ns_) / (time_limit_ * 1e9);
return double(received_) / num_reqs_; return double(received_) / num_reqs_;
} }
@ -273,7 +278,8 @@ class Driver {
bool might_hit; bool might_hit;
}; };
uint32_t num_reqs_, received_ = 0; uint32_t num_reqs_, time_limit_, received_ = 0;
int64_t start_ns_ = 0;
ClientStats& stats_; ClientStats& stats_;
unique_ptr<FiberSocketBase> socket_; unique_ptr<FiberSocketBase> socket_;
@ -291,7 +297,7 @@ class TLocalClient {
explicit TLocalClient(ProactorBase* p) : p_(p) { explicit TLocalClient(ProactorBase* p) : p_(p) {
drivers_.resize(GetFlag(FLAGS_c)); drivers_.resize(GetFlag(FLAGS_c));
for (auto& driver : drivers_) { for (auto& driver : drivers_) {
driver.reset(new Driver{GetFlag(FLAGS_n), &stats, p_}); driver.reset(new Driver{GetFlag(FLAGS_n), GetFlag(FLAGS_test_time), &stats, p_});
} }
} }
@ -415,16 +421,20 @@ void Driver::Connect(unsigned index, const tcp::endpoint& ep) {
} }
void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) { void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
const int64_t start = absl::GetCurrentTimeNanos(); start_ns_ = absl::GetCurrentTimeNanos();
unsigned pipeline = GetFlag(FLAGS_pipeline); unsigned pipeline = GetFlag(FLAGS_pipeline);
stats_.num_clients++; stats_.num_clients++;
int64_t time_limit_ns =
time_limit_ > 0 ? int64_t(time_limit_) * 1'000'000'000 + start_ns_ : INT64_MAX;
for (unsigned i = 0; i < num_reqs_; ++i) { for (unsigned i = 0; i < num_reqs_; ++i) {
int64_t now = absl::GetCurrentTimeNanos(); int64_t now = absl::GetCurrentTimeNanos();
if (now > time_limit_ns) {
break;
}
if (cycle_ns) { if (cycle_ns) {
int64_t target_ts = start + i * (*cycle_ns); int64_t target_ts = start_ns_ + i * (*cycle_ns);
int64_t sleep_ns = target_ts - now; int64_t sleep_ns = target_ts - now;
if (reqs_.size() > 10 && sleep_ns <= 0) { if (reqs_.size() > 10 && sleep_ns <= 0) {
sleep_ns = 10'000; sleep_ns = 10'000;
@ -468,7 +478,7 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
int64_t finish = absl::GetCurrentTimeNanos(); int64_t finish = absl::GetCurrentTimeNanos();
VLOG(1) << "Done queuing " << num_reqs_ << " requests, which took " VLOG(1) << "Done queuing " << num_reqs_ << " requests, which took "
<< StrFormat("%.1fs", double(finish - start) / 1000000000) << StrFormat("%.1fs", double(finish - start_ns_) / 1000'000'000)
<< ". Waiting for server processing"; << ". Waiting for server processing";
// TODO: to change to a condvar or something. // TODO: to change to a condvar or something.
@ -662,6 +672,7 @@ void WatchFiber(atomic_bool* finish_signal, ProactorPool* pp) {
uint64_t num_last_resp_cnt = 0; uint64_t num_last_resp_cnt = 0;
uint64_t resp_goal = GetFlag(FLAGS_c) * pp->size() * GetFlag(FLAGS_n); uint64_t resp_goal = GetFlag(FLAGS_c) * pp->size() * GetFlag(FLAGS_n);
uint32_t time_limit = GetFlag(FLAGS_test_time);
while (*finish_signal == false) { while (*finish_signal == false) {
// we sleep with resolution of 1s but print with lower frequency to be more responsive // we sleep with resolution of 1s but print with lower frequency to be more responsive
@ -692,7 +703,8 @@ void WatchFiber(atomic_bool* finish_signal, ProactorPool* pp) {
uint64_t total_ms = (now - start_time) / 1'000'000; uint64_t total_ms = (now - start_time) / 1'000'000;
uint64_t period_ms = (now - last_print) / 1'000'000; uint64_t period_ms = (now - last_print) / 1'000'000;
uint64_t period_resp_cnt = stats.num_responses - num_last_resp_cnt; uint64_t period_resp_cnt = stats.num_responses - num_last_resp_cnt;
double done_perc = double(stats.num_responses) * 100 / resp_goal; double done_perc = time_limit > 0 ? double(total_ms) / (10 * time_limit)
: double(stats.num_responses) * 100 / resp_goal;
double hitrate = stats.hit_opportunities > 0 double hitrate = stats.hit_opportunities > 0
? 100 * double(stats.hit_count) / double(stats.hit_opportunities) ? 100 * double(stats.hit_count) / double(stats.hit_opportunities)
: 0; : 0;
@ -767,10 +779,11 @@ int main(int argc, char* argv[]) {
uint32_t thread_key_step = 0; uint32_t thread_key_step = 0;
const uint32_t qps = GetFlag(FLAGS_qps); const uint32_t qps = GetFlag(FLAGS_qps);
const int64_t interval = qps ? 1000000000LL / qps : 0; const int64_t interval = qps ? 1'000'000'000LL / qps : 0;
uint64_t num_reqs = GetFlag(FLAGS_n); uint64_t num_reqs = GetFlag(FLAGS_n);
uint64_t total_conn_num = GetFlag(FLAGS_c) * pp->size(); uint64_t total_conn_num = GetFlag(FLAGS_c) * pp->size();
uint64_t total_requests = num_reqs * total_conn_num; uint64_t total_requests = num_reqs * total_conn_num;
uint32_t time_limit = GetFlag(FLAGS_test_time);
if (dist_type == SEQUENTIAL) { if (dist_type == SEQUENTIAL) {
thread_key_step = std::max(1UL, (key_maximum - key_minimum + 1) / pp->size()); thread_key_step = std::max(1UL, (key_maximum - key_minimum + 1) / pp->size());
@ -781,9 +794,10 @@ int main(int argc, char* argv[]) {
} }
} }
CONSOLE_INFO << "Running " << pp->size() << " threads, sending " << num_reqs if (!time_limit) {
<< " requests per each connection, or " << total_requests << " requests overall"; CONSOLE_INFO << "Running " << pp->size() << " threads, sending " << num_reqs
<< " requests per each connection, or " << total_requests << " requests overall";
}
if (interval) { if (interval) {
CONSOLE_INFO << "At a rate of " << GetFlag(FLAGS_qps) CONSOLE_INFO << "At a rate of " << GetFlag(FLAGS_qps)
<< " rps per connection, i.e. request every " << interval / 1000 << "us"; << " rps per connection, i.e. request every " << interval / 1000 << "us";
@ -826,7 +840,8 @@ int main(int argc, char* argv[]) {
CONSOLE_INFO << "\nTotal time: " << duration CONSOLE_INFO << "\nTotal time: " << duration
<< ". Overall number of requests: " << summary.num_responses << ". Overall number of requests: " << summary.num_responses
<< ", QPS: " << (dur_sec ? StrCat(summary.num_responses / dur_sec) : "nan"); << ", QPS: " << (dur_sec ? StrCat(summary.num_responses / dur_sec) : "nan")
<< ", P99 lat: " << summary.hist.Percentile(99) << "us";
if (summary.num_errors) { if (summary.num_errors) {
CONSOLE_INFO << "Got " << summary.num_errors << " error responses!"; CONSOLE_INFO << "Got " << summary.num_errors << " error responses!";