feat(dfly_bench): introduce connect only flow (#5052)

In this flow dfly_bench only connects to the server, reproducing high connection rate scenario.
Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2025-05-06 08:36:33 +03:00 committed by GitHub
parent 6a84ad0208
commit d1e5049f64
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -72,6 +72,9 @@ ABSL_FLAG(bool, cluster_skip_tags, true,
"If true, skips tags (compatible with memtier benchmark) in cluster mode, "
"othewise adds hash tags to keys");
ABSL_FLAG(bool, ascii, true, "If true, use ascii characters for values");
ABSL_FLAG(bool, connect_only, false,
"If true, will only connect to the server, without sending "
"loadtest commands");
using namespace std;
using namespace util;
@ -231,7 +234,7 @@ class KeyGenerator {
class CommandGenerator {
public:
CommandGenerator(KeyGenerator* keygen);
explicit CommandGenerator(KeyGenerator* keygen);
string Next(SlotRange range);
@ -403,6 +406,7 @@ class Driver {
void Connect(unsigned index, const tcp::endpoint& ep);
void Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen);
void Shutdown();
float done() const {
if (time_limit_ > 0)
@ -448,7 +452,9 @@ class TLocalClient {
TLocalClient(const TLocalClient&) = delete;
void Connect(tcp::endpoint ep, const vector<tcp::endpoint>& shard_endpoints);
void Connect(const tcp::endpoint& ep, const vector<tcp::endpoint>& shard_endpoints);
void Disconnect();
void Start(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns);
void Join();
@ -691,7 +697,10 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
while (!reqs_.empty()) {
ThisFiber::SleepFor(1ms);
}
Shutdown();
}
void Driver::Shutdown() {
std::ignore = socket_->Shutdown(SHUT_RDWR); // breaks the receive fiber.
receive_fb_.Join();
std::ignore = socket_->Close();
@ -832,8 +841,8 @@ void Driver::ParseMC() {
}
}
void TLocalClient::Connect(tcp::endpoint ep, const vector<tcp::endpoint>& endpoints) {
VLOG(2) << "Connecting client...";
void TLocalClient::Connect(const tcp::endpoint& ep, const vector<tcp::endpoint>& endpoints) {
VLOG(2) << "Connecting client..." << ep;
unsigned conn_per_shard = GetFlag(FLAGS_c);
if (shard_slots_->Empty()) {
@ -854,16 +863,20 @@ void TLocalClient::Connect(tcp::endpoint ep, const vector<tcp::endpoint>& endpoi
size_t shard = i / conn_per_shard;
shard_ep = endpoints[shard];
}
fbs[i] = MakeFiber([&, shard_ep, i] {
ThisFiber::SetName(StrCat("connect/", i));
drivers_[i]->Connect(i, shard_ep);
});
fbs[i] =
fb2::Fiber(StrCat("connect/", i), [&, shard_ep, i] { drivers_[i]->Connect(i, shard_ep); });
}
for (auto& fb : fbs)
fb.Join();
}
void TLocalClient::Disconnect() {
for (size_t i = 0; i < drivers_.size(); ++i) {
drivers_[i]->Shutdown();
}
}
void TLocalClient::Start(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns) {
key_gen_.emplace(key_min, key_max);
cmd_gen_.emplace(&key_gen_.value());
@ -1115,16 +1128,22 @@ int main(int argc, char* argv[]) {
ShardSlots shard_slots;
shard_slots.SetClusterSlotRanges(shards);
std::vector<tcp::endpoint> shard_endpoints = shard_slots.Endpoints();
pp->AwaitFiberOnAll([&](unsigned index, auto* p) {
pp->AwaitBrief([&](unsigned index, auto* p) {
base::SplitMix64 seed_mix(GetFlag(FLAGS_seed) + index * 0x6a45554a264d72bULL);
auto seed = seed_mix();
VLOG(1) << "Seeding bitgen with seed " << seed;
bit_gen.seed(seed);
});
pp->AwaitFiberOnAll([&](unsigned index, auto* p) {
client = make_unique<TLocalClient>(p, &shard_slots);
client->Connect(ep, shard_endpoints);
});
absl::Duration duration;
if (absl::GetFlag(FLAGS_connect_only)) {
pp->AwaitFiberOnAll([&](unsigned index, auto* p) { client->Disconnect(); });
} else {
const uint32_t key_minimum = GetFlag(FLAGS_key_minimum);
const uint32_t key_maximum = GetFlag(FLAGS_key_maximum);
CHECK_LE(key_minimum, key_maximum);
@ -1176,9 +1195,10 @@ int main(int argc, char* argv[]) {
// The actual run.
pp->AwaitFiberOnAll([&](unsigned index, auto* p) { client->Join(); });
absl::Duration duration = absl::Now() - start_time;
duration = absl::Now() - start_time;
finish.store(true);
watch_fb.Join();
}
fb2::Mutex mutex;