From d1e5049f64066740e53c1722748299a72691d4cc Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Tue, 6 May 2025 08:36:33 +0300 Subject: [PATCH 1/3] feat(dfly_bench): introduce connect only flow (#5052) In this flow dfly_bench only connects to the server, reproducing high connection rate scenario. Signed-off-by: Roman Gershman --- src/server/dfly_bench.cc | 144 ++++++++++++++++++++++----------------- 1 file changed, 82 insertions(+), 62 deletions(-) diff --git a/src/server/dfly_bench.cc b/src/server/dfly_bench.cc index 515900793..6102e8295 100644 --- a/src/server/dfly_bench.cc +++ b/src/server/dfly_bench.cc @@ -72,6 +72,9 @@ ABSL_FLAG(bool, cluster_skip_tags, true, "If true, skips tags (compatible with memtier benchmark) in cluster mode, " "othewise adds hash tags to keys"); ABSL_FLAG(bool, ascii, true, "If true, use ascii characters for values"); +ABSL_FLAG(bool, connect_only, false, + "If true, will only connect to the server, without sending " + "loadtest commands"); using namespace std; using namespace util; @@ -231,7 +234,7 @@ class KeyGenerator { class CommandGenerator { public: - CommandGenerator(KeyGenerator* keygen); + explicit CommandGenerator(KeyGenerator* keygen); string Next(SlotRange range); @@ -403,6 +406,7 @@ class Driver { void Connect(unsigned index, const tcp::endpoint& ep); void Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen); + void Shutdown(); float done() const { if (time_limit_ > 0) @@ -448,7 +452,9 @@ class TLocalClient { TLocalClient(const TLocalClient&) = delete; - void Connect(tcp::endpoint ep, const vector& shard_endpoints); + void Connect(const tcp::endpoint& ep, const vector& shard_endpoints); + void Disconnect(); + void Start(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns); void Join(); @@ -691,7 +697,10 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) { while (!reqs_.empty()) { ThisFiber::SleepFor(1ms); } + Shutdown(); +} +void Driver::Shutdown() { std::ignore = socket_->Shutdown(SHUT_RDWR); // breaks the receive fiber. receive_fb_.Join(); std::ignore = socket_->Close(); @@ -832,8 +841,8 @@ void Driver::ParseMC() { } } -void TLocalClient::Connect(tcp::endpoint ep, const vector& endpoints) { - VLOG(2) << "Connecting client..."; +void TLocalClient::Connect(const tcp::endpoint& ep, const vector& endpoints) { + VLOG(2) << "Connecting client..." << ep; unsigned conn_per_shard = GetFlag(FLAGS_c); if (shard_slots_->Empty()) { @@ -854,16 +863,20 @@ void TLocalClient::Connect(tcp::endpoint ep, const vector& endpoi size_t shard = i / conn_per_shard; shard_ep = endpoints[shard]; } - fbs[i] = MakeFiber([&, shard_ep, i] { - ThisFiber::SetName(StrCat("connect/", i)); - drivers_[i]->Connect(i, shard_ep); - }); + fbs[i] = + fb2::Fiber(StrCat("connect/", i), [&, shard_ep, i] { drivers_[i]->Connect(i, shard_ep); }); } for (auto& fb : fbs) fb.Join(); } +void TLocalClient::Disconnect() { + for (size_t i = 0; i < drivers_.size(); ++i) { + drivers_[i]->Shutdown(); + } +} + void TLocalClient::Start(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns) { key_gen_.emplace(key_min, key_max); cmd_gen_.emplace(&key_gen_.value()); @@ -1115,71 +1128,78 @@ int main(int argc, char* argv[]) { ShardSlots shard_slots; shard_slots.SetClusterSlotRanges(shards); std::vector shard_endpoints = shard_slots.Endpoints(); - - pp->AwaitFiberOnAll([&](unsigned index, auto* p) { + pp->AwaitBrief([&](unsigned index, auto* p) { base::SplitMix64 seed_mix(GetFlag(FLAGS_seed) + index * 0x6a45554a264d72bULL); auto seed = seed_mix(); VLOG(1) << "Seeding bitgen with seed " << seed; bit_gen.seed(seed); + }); + + pp->AwaitFiberOnAll([&](unsigned index, auto* p) { client = make_unique(p, &shard_slots); client->Connect(ep, shard_endpoints); }); - const uint32_t key_minimum = GetFlag(FLAGS_key_minimum); - const uint32_t key_maximum = GetFlag(FLAGS_key_maximum); - CHECK_LE(key_minimum, key_maximum); - - uint32_t thread_key_step = 0; - uint32_t qps = abs(GetFlag(FLAGS_qps)); - bool throttle = GetFlag(FLAGS_qps) > 0; - const int64_t interval = qps ? 1'000'000'000LL / qps : 0; - uint64_t num_reqs = GetFlag(FLAGS_n); - - uint64_t total_conn_num = GetFlag(FLAGS_c) * pp->size(); - uint64_t total_requests = num_reqs * total_conn_num; - uint32_t time_limit = GetFlag(FLAGS_test_time); - - if (dist_type == SEQUENTIAL) { - thread_key_step = std::max(1UL, (key_maximum - key_minimum + 1) / pp->size()); - if (total_requests > (key_maximum - key_minimum)) { - CONSOLE_INFO << "Warning: only " << key_maximum - key_minimum - << " unique entries will be accessed with " << total_requests - << " total requests"; - } - } - - if (!time_limit) { - CONSOLE_INFO << "Running " << pp->size() << " threads, sending " << num_reqs - << " requests per each connection, or " << total_requests << " requests overall " - << (throttle ? "with" : "without") << " throttling"; - } - if (interval) { - CONSOLE_INFO << "At a rate of " << qps << " rps per connection, i.e. request every " - << interval / 1000 << "us"; - CONSOLE_INFO << "Overall scheduled RPS: " << qps * total_conn_num; + absl::Duration duration; + if (absl::GetFlag(FLAGS_connect_only)) { + pp->AwaitFiberOnAll([&](unsigned index, auto* p) { client->Disconnect(); }); } else { - CONSOLE_INFO << "Coordinated omission mode - the rate is determined by the server"; + const uint32_t key_minimum = GetFlag(FLAGS_key_minimum); + const uint32_t key_maximum = GetFlag(FLAGS_key_maximum); + CHECK_LE(key_minimum, key_maximum); + + uint32_t thread_key_step = 0; + uint32_t qps = abs(GetFlag(FLAGS_qps)); + bool throttle = GetFlag(FLAGS_qps) > 0; + const int64_t interval = qps ? 1'000'000'000LL / qps : 0; + uint64_t num_reqs = GetFlag(FLAGS_n); + + uint64_t total_conn_num = GetFlag(FLAGS_c) * pp->size(); + uint64_t total_requests = num_reqs * total_conn_num; + uint32_t time_limit = GetFlag(FLAGS_test_time); + + if (dist_type == SEQUENTIAL) { + thread_key_step = std::max(1UL, (key_maximum - key_minimum + 1) / pp->size()); + if (total_requests > (key_maximum - key_minimum)) { + CONSOLE_INFO << "Warning: only " << key_maximum - key_minimum + << " unique entries will be accessed with " << total_requests + << " total requests"; + } + } + + if (!time_limit) { + CONSOLE_INFO << "Running " << pp->size() << " threads, sending " << num_reqs + << " requests per each connection, or " << total_requests << " requests overall " + << (throttle ? "with" : "without") << " throttling"; + } + if (interval) { + CONSOLE_INFO << "At a rate of " << qps << " rps per connection, i.e. request every " + << interval / 1000 << "us"; + CONSOLE_INFO << "Overall scheduled RPS: " << qps * total_conn_num; + } else { + CONSOLE_INFO << "Coordinated omission mode - the rate is determined by the server"; + } + + atomic_bool finish{false}; + pp->AwaitBrief([&](unsigned index, auto* p) { + uint32_t key_max = (thread_key_step > 0 && index + 1 < pp->size()) + ? key_minimum + (index + 1) * thread_key_step - 1 + : key_maximum; + client->Start(key_minimum + index * thread_key_step, key_max, interval); + }); + + auto watch_fb = + pp->GetNextProactor()->LaunchFiber([&] { WatchFiber(shards.size(), &finish, pp.get()); }); + const absl::Time start_time = absl::Now(); + + // The actual run. + pp->AwaitFiberOnAll([&](unsigned index, auto* p) { client->Join(); }); + + duration = absl::Now() - start_time; + finish.store(true); + watch_fb.Join(); } - atomic_bool finish{false}; - pp->AwaitBrief([&](unsigned index, auto* p) { - uint32_t key_max = (thread_key_step > 0 && index + 1 < pp->size()) - ? key_minimum + (index + 1) * thread_key_step - 1 - : key_maximum; - client->Start(key_minimum + index * thread_key_step, key_max, interval); - }); - - auto watch_fb = - pp->GetNextProactor()->LaunchFiber([&] { WatchFiber(shards.size(), &finish, pp.get()); }); - const absl::Time start_time = absl::Now(); - - // The actual run. - pp->AwaitFiberOnAll([&](unsigned index, auto* p) { client->Join(); }); - - absl::Duration duration = absl::Now() - start_time; - finish.store(true); - watch_fb.Join(); - fb2::Mutex mutex; LOG(INFO) << "Resetting all threads"; From a8b19c9b8812eb06204b8a248bb67a9a1e3b49c3 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Tue, 6 May 2025 10:38:02 +0300 Subject: [PATCH 2/3] chore: skip test_bug_in_json_memory_tracking (#5066) Signed-off-by: kostas --- tests/dragonfly/replication_test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index b3c41f679..3e1bba444 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2959,6 +2959,7 @@ async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFa await fill_task +@pytest.mark.skip("Flaky test") async def test_bug_in_json_memory_tracking(df_factory: DflyInstanceFactory): """ This test reproduces a bug in the JSON memory tracking. From 3da7e497125cdd9b9c15773fefe5295dc58e6a70 Mon Sep 17 00:00:00 2001 From: Stepan Bagritsevich <43710058+BagritsevichStepan@users.noreply.github.com> Date: Tue, 6 May 2025 10:44:16 +0200 Subject: [PATCH 3/3] feat(memory_cmd): Add WITHOUTKEY option for the MEMORY USAGE command. SECOND PR (#5068) feat(memory_cmd): Add WITHOUTKEY option for the MEMORY USAGE command Signed-off-by: Stepan Bagritsevich --- src/server/memory_cmd.cc | 55 +++++++++++++++++++++------------------- src/server/memory_cmd.h | 2 +- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/src/server/memory_cmd.cc b/src/server/memory_cmd.cc index 00d1cb88d..d29941907 100644 --- a/src/server/memory_cmd.cc +++ b/src/server/memory_cmd.cc @@ -82,8 +82,8 @@ std::string MallocStatsCb(bool backing, unsigned tid) { return str; } -size_t MemoryUsage(PrimeIterator it) { - size_t key_size = it->first.MallocUsed(); +size_t MemoryUsage(PrimeIterator it, bool account_key_memory_usage) { + size_t key_size = account_key_memory_usage ? it->first.MallocUsed() : 0; return key_size + it->second.MallocUsed(true); } @@ -95,9 +95,9 @@ MemoryCmd::MemoryCmd(ServerFamily* owner, facade::SinkReplyBuilder* builder, } void MemoryCmd::Run(CmdArgList args) { - string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); + CmdArgParser parser(args); - if (sub_cmd == "HELP") { + if (parser.Check("HELP")) { string_view help_arr[] = { "MEMORY [ ...]. Subcommands are:", "STATS", @@ -110,8 +110,9 @@ void MemoryCmd::Run(CmdArgList args) { "ARENA SHOW", " Prints the arena summary report for the entire process.", " Requires MIMALLOC_VERBOSE=1 environment to be set. The output goes to stdout", - "USAGE ", + "USAGE [WITHOUTKEY]", " Show memory usage of a key.", + " If WITHOUTKEY is specified, the key itself is not accounted.", "DECOMMIT", " Force decommit the memory freed by the server back to OS.", "TRACK", @@ -137,35 +138,36 @@ void MemoryCmd::Run(CmdArgList args) { return rb->SendSimpleStrArr(help_arr); }; - if (sub_cmd == "STATS") { + if (parser.Check("STATS")) { return Stats(); } - if (sub_cmd == "USAGE" && args.size() > 1) { - string_view key = ArgS(args, 1); - return Usage(key); + if (parser.Check("USAGE") && args.size() > 1) { + string_view key = parser.Next(); + bool account_key_memory_usage = !parser.Check("WITHOUTKEY"); + return Usage(key, account_key_memory_usage); } - if (sub_cmd == "DECOMMIT") { + if (parser.Check("DECOMMIT")) { shard_set->pool()->AwaitBrief( [](unsigned, auto* pb) { ServerState::tlocal()->DecommitMemory(ServerState::kAllMemory); }); return builder_->SendSimpleString("OK"); } - if (sub_cmd == "MALLOC-STATS") { + if (parser.Check("MALLOC-STATS")) { return MallocStats(); } - if (sub_cmd == "ARENA") { + if (parser.Check("ARENA")) { return ArenaStats(args); } - if (sub_cmd == "TRACK") { + if (parser.Check("TRACK")) { args.remove_prefix(1); return Track(args); } - if (sub_cmd == "DEFRAGMENT") { + if (parser.Check("DEFRAGMENT")) { shard_set->pool()->DispatchOnAll([](util::ProactorBase*) { if (auto* shard = EngineShard::tlocal(); shard) shard->ForceDefrag(); @@ -173,7 +175,7 @@ void MemoryCmd::Run(CmdArgList args) { return builder_->SendSimpleString("OK"); } - string err = UnknownSubCmd(sub_cmd, "MEMORY"); + string err = UnknownSubCmd(parser.Next(), "MEMORY"); return builder_->SendError(err, kSyntaxErrType); } @@ -346,18 +348,19 @@ void MemoryCmd::ArenaStats(CmdArgList args) { return rb->SendVerbatimString(mi_malloc_info); } -void MemoryCmd::Usage(std::string_view key) { +void MemoryCmd::Usage(std::string_view key, bool account_key_memory_usage) { ShardId sid = Shard(key, shard_set->size()); - ssize_t memory_usage = shard_set->pool()->at(sid)->AwaitBrief([key, this, sid]() -> ssize_t { - auto& db_slice = cntx_->ns->GetDbSlice(sid); - auto [pt, exp_t] = db_slice.GetTables(cntx_->db_index()); - PrimeIterator it = pt->Find(key); - if (IsValid(it)) { - return MemoryUsage(it); - } else { - return -1; - } - }); + ssize_t memory_usage = shard_set->pool()->at(sid)->AwaitBrief( + [key, account_key_memory_usage, this, sid]() -> ssize_t { + auto& db_slice = cntx_->ns->GetDbSlice(sid); + auto [pt, exp_t] = db_slice.GetTables(cntx_->db_index()); + PrimeIterator it = pt->Find(key); + if (IsValid(it)) { + return MemoryUsage(it, account_key_memory_usage); + } else { + return -1; + } + }); auto* rb = static_cast(builder_); if (memory_usage < 0) diff --git a/src/server/memory_cmd.h b/src/server/memory_cmd.h index 6986e0cae..5850d17d2 100644 --- a/src/server/memory_cmd.h +++ b/src/server/memory_cmd.h @@ -20,7 +20,7 @@ class MemoryCmd { void Stats(); void MallocStats(); void ArenaStats(CmdArgList args); - void Usage(std::string_view key); + void Usage(std::string_view key, bool account_key_memory_usage); void Track(CmdArgList args); ConnectionContext* cntx_;