Merge branch 'main' into kpr2

This commit is contained in:
kostas 2025-05-06 12:05:08 +03:00
commit 0adaf119bf
No known key found for this signature in database
GPG key ID: 1860AC7B1177CACB
4 changed files with 113 additions and 89 deletions

View file

@ -72,6 +72,9 @@ ABSL_FLAG(bool, cluster_skip_tags, true,
"If true, skips tags (compatible with memtier benchmark) in cluster mode, " "If true, skips tags (compatible with memtier benchmark) in cluster mode, "
"othewise adds hash tags to keys"); "othewise adds hash tags to keys");
ABSL_FLAG(bool, ascii, true, "If true, use ascii characters for values"); ABSL_FLAG(bool, ascii, true, "If true, use ascii characters for values");
ABSL_FLAG(bool, connect_only, false,
"If true, will only connect to the server, without sending "
"loadtest commands");
using namespace std; using namespace std;
using namespace util; using namespace util;
@ -231,7 +234,7 @@ class KeyGenerator {
class CommandGenerator { class CommandGenerator {
public: public:
CommandGenerator(KeyGenerator* keygen); explicit CommandGenerator(KeyGenerator* keygen);
string Next(SlotRange range); string Next(SlotRange range);
@ -403,6 +406,7 @@ class Driver {
void Connect(unsigned index, const tcp::endpoint& ep); void Connect(unsigned index, const tcp::endpoint& ep);
void Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen); void Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen);
void Shutdown();
float done() const { float done() const {
if (time_limit_ > 0) if (time_limit_ > 0)
@ -448,7 +452,9 @@ class TLocalClient {
TLocalClient(const TLocalClient&) = delete; TLocalClient(const TLocalClient&) = delete;
void Connect(tcp::endpoint ep, const vector<tcp::endpoint>& shard_endpoints); void Connect(const tcp::endpoint& ep, const vector<tcp::endpoint>& shard_endpoints);
void Disconnect();
void Start(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns); void Start(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns);
void Join(); void Join();
@ -691,7 +697,10 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
while (!reqs_.empty()) { while (!reqs_.empty()) {
ThisFiber::SleepFor(1ms); ThisFiber::SleepFor(1ms);
} }
Shutdown();
}
void Driver::Shutdown() {
std::ignore = socket_->Shutdown(SHUT_RDWR); // breaks the receive fiber. std::ignore = socket_->Shutdown(SHUT_RDWR); // breaks the receive fiber.
receive_fb_.Join(); receive_fb_.Join();
std::ignore = socket_->Close(); std::ignore = socket_->Close();
@ -832,8 +841,8 @@ void Driver::ParseMC() {
} }
} }
void TLocalClient::Connect(tcp::endpoint ep, const vector<tcp::endpoint>& endpoints) { void TLocalClient::Connect(const tcp::endpoint& ep, const vector<tcp::endpoint>& endpoints) {
VLOG(2) << "Connecting client..."; VLOG(2) << "Connecting client..." << ep;
unsigned conn_per_shard = GetFlag(FLAGS_c); unsigned conn_per_shard = GetFlag(FLAGS_c);
if (shard_slots_->Empty()) { if (shard_slots_->Empty()) {
@ -854,16 +863,20 @@ void TLocalClient::Connect(tcp::endpoint ep, const vector<tcp::endpoint>& endpoi
size_t shard = i / conn_per_shard; size_t shard = i / conn_per_shard;
shard_ep = endpoints[shard]; shard_ep = endpoints[shard];
} }
fbs[i] = MakeFiber([&, shard_ep, i] { fbs[i] =
ThisFiber::SetName(StrCat("connect/", i)); fb2::Fiber(StrCat("connect/", i), [&, shard_ep, i] { drivers_[i]->Connect(i, shard_ep); });
drivers_[i]->Connect(i, shard_ep);
});
} }
for (auto& fb : fbs) for (auto& fb : fbs)
fb.Join(); fb.Join();
} }
void TLocalClient::Disconnect() {
for (size_t i = 0; i < drivers_.size(); ++i) {
drivers_[i]->Shutdown();
}
}
void TLocalClient::Start(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns) { void TLocalClient::Start(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns) {
key_gen_.emplace(key_min, key_max); key_gen_.emplace(key_min, key_max);
cmd_gen_.emplace(&key_gen_.value()); cmd_gen_.emplace(&key_gen_.value());
@ -1115,71 +1128,78 @@ int main(int argc, char* argv[]) {
ShardSlots shard_slots; ShardSlots shard_slots;
shard_slots.SetClusterSlotRanges(shards); shard_slots.SetClusterSlotRanges(shards);
std::vector<tcp::endpoint> shard_endpoints = shard_slots.Endpoints(); std::vector<tcp::endpoint> shard_endpoints = shard_slots.Endpoints();
pp->AwaitBrief([&](unsigned index, auto* p) {
pp->AwaitFiberOnAll([&](unsigned index, auto* p) {
base::SplitMix64 seed_mix(GetFlag(FLAGS_seed) + index * 0x6a45554a264d72bULL); base::SplitMix64 seed_mix(GetFlag(FLAGS_seed) + index * 0x6a45554a264d72bULL);
auto seed = seed_mix(); auto seed = seed_mix();
VLOG(1) << "Seeding bitgen with seed " << seed; VLOG(1) << "Seeding bitgen with seed " << seed;
bit_gen.seed(seed); bit_gen.seed(seed);
});
pp->AwaitFiberOnAll([&](unsigned index, auto* p) {
client = make_unique<TLocalClient>(p, &shard_slots); client = make_unique<TLocalClient>(p, &shard_slots);
client->Connect(ep, shard_endpoints); client->Connect(ep, shard_endpoints);
}); });
const uint32_t key_minimum = GetFlag(FLAGS_key_minimum); absl::Duration duration;
const uint32_t key_maximum = GetFlag(FLAGS_key_maximum); if (absl::GetFlag(FLAGS_connect_only)) {
CHECK_LE(key_minimum, key_maximum); pp->AwaitFiberOnAll([&](unsigned index, auto* p) { client->Disconnect(); });
uint32_t thread_key_step = 0;
uint32_t qps = abs(GetFlag(FLAGS_qps));
bool throttle = GetFlag(FLAGS_qps) > 0;
const int64_t interval = qps ? 1'000'000'000LL / qps : 0;
uint64_t num_reqs = GetFlag(FLAGS_n);
uint64_t total_conn_num = GetFlag(FLAGS_c) * pp->size();
uint64_t total_requests = num_reqs * total_conn_num;
uint32_t time_limit = GetFlag(FLAGS_test_time);
if (dist_type == SEQUENTIAL) {
thread_key_step = std::max(1UL, (key_maximum - key_minimum + 1) / pp->size());
if (total_requests > (key_maximum - key_minimum)) {
CONSOLE_INFO << "Warning: only " << key_maximum - key_minimum
<< " unique entries will be accessed with " << total_requests
<< " total requests";
}
}
if (!time_limit) {
CONSOLE_INFO << "Running " << pp->size() << " threads, sending " << num_reqs
<< " requests per each connection, or " << total_requests << " requests overall "
<< (throttle ? "with" : "without") << " throttling";
}
if (interval) {
CONSOLE_INFO << "At a rate of " << qps << " rps per connection, i.e. request every "
<< interval / 1000 << "us";
CONSOLE_INFO << "Overall scheduled RPS: " << qps * total_conn_num;
} else { } else {
CONSOLE_INFO << "Coordinated omission mode - the rate is determined by the server"; const uint32_t key_minimum = GetFlag(FLAGS_key_minimum);
const uint32_t key_maximum = GetFlag(FLAGS_key_maximum);
CHECK_LE(key_minimum, key_maximum);
uint32_t thread_key_step = 0;
uint32_t qps = abs(GetFlag(FLAGS_qps));
bool throttle = GetFlag(FLAGS_qps) > 0;
const int64_t interval = qps ? 1'000'000'000LL / qps : 0;
uint64_t num_reqs = GetFlag(FLAGS_n);
uint64_t total_conn_num = GetFlag(FLAGS_c) * pp->size();
uint64_t total_requests = num_reqs * total_conn_num;
uint32_t time_limit = GetFlag(FLAGS_test_time);
if (dist_type == SEQUENTIAL) {
thread_key_step = std::max(1UL, (key_maximum - key_minimum + 1) / pp->size());
if (total_requests > (key_maximum - key_minimum)) {
CONSOLE_INFO << "Warning: only " << key_maximum - key_minimum
<< " unique entries will be accessed with " << total_requests
<< " total requests";
}
}
if (!time_limit) {
CONSOLE_INFO << "Running " << pp->size() << " threads, sending " << num_reqs
<< " requests per each connection, or " << total_requests << " requests overall "
<< (throttle ? "with" : "without") << " throttling";
}
if (interval) {
CONSOLE_INFO << "At a rate of " << qps << " rps per connection, i.e. request every "
<< interval / 1000 << "us";
CONSOLE_INFO << "Overall scheduled RPS: " << qps * total_conn_num;
} else {
CONSOLE_INFO << "Coordinated omission mode - the rate is determined by the server";
}
atomic_bool finish{false};
pp->AwaitBrief([&](unsigned index, auto* p) {
uint32_t key_max = (thread_key_step > 0 && index + 1 < pp->size())
? key_minimum + (index + 1) * thread_key_step - 1
: key_maximum;
client->Start(key_minimum + index * thread_key_step, key_max, interval);
});
auto watch_fb =
pp->GetNextProactor()->LaunchFiber([&] { WatchFiber(shards.size(), &finish, pp.get()); });
const absl::Time start_time = absl::Now();
// The actual run.
pp->AwaitFiberOnAll([&](unsigned index, auto* p) { client->Join(); });
duration = absl::Now() - start_time;
finish.store(true);
watch_fb.Join();
} }
atomic_bool finish{false};
pp->AwaitBrief([&](unsigned index, auto* p) {
uint32_t key_max = (thread_key_step > 0 && index + 1 < pp->size())
? key_minimum + (index + 1) * thread_key_step - 1
: key_maximum;
client->Start(key_minimum + index * thread_key_step, key_max, interval);
});
auto watch_fb =
pp->GetNextProactor()->LaunchFiber([&] { WatchFiber(shards.size(), &finish, pp.get()); });
const absl::Time start_time = absl::Now();
// The actual run.
pp->AwaitFiberOnAll([&](unsigned index, auto* p) { client->Join(); });
absl::Duration duration = absl::Now() - start_time;
finish.store(true);
watch_fb.Join();
fb2::Mutex mutex; fb2::Mutex mutex;
LOG(INFO) << "Resetting all threads"; LOG(INFO) << "Resetting all threads";

View file

@ -82,8 +82,8 @@ std::string MallocStatsCb(bool backing, unsigned tid) {
return str; return str;
} }
size_t MemoryUsage(PrimeIterator it) { size_t MemoryUsage(PrimeIterator it, bool account_key_memory_usage) {
size_t key_size = it->first.MallocUsed(); size_t key_size = account_key_memory_usage ? it->first.MallocUsed() : 0;
return key_size + it->second.MallocUsed(true); return key_size + it->second.MallocUsed(true);
} }
@ -95,9 +95,9 @@ MemoryCmd::MemoryCmd(ServerFamily* owner, facade::SinkReplyBuilder* builder,
} }
void MemoryCmd::Run(CmdArgList args) { void MemoryCmd::Run(CmdArgList args) {
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); CmdArgParser parser(args);
if (sub_cmd == "HELP") { if (parser.Check("HELP")) {
string_view help_arr[] = { string_view help_arr[] = {
"MEMORY <subcommand> [<arg> ...]. Subcommands are:", "MEMORY <subcommand> [<arg> ...]. Subcommands are:",
"STATS", "STATS",
@ -110,8 +110,9 @@ void MemoryCmd::Run(CmdArgList args) {
"ARENA SHOW", "ARENA SHOW",
" Prints the arena summary report for the entire process.", " Prints the arena summary report for the entire process.",
" Requires MIMALLOC_VERBOSE=1 environment to be set. The output goes to stdout", " Requires MIMALLOC_VERBOSE=1 environment to be set. The output goes to stdout",
"USAGE <key>", "USAGE <key> [WITHOUTKEY]",
" Show memory usage of a key.", " Show memory usage of a key.",
" If WITHOUTKEY is specified, the key itself is not accounted.",
"DECOMMIT", "DECOMMIT",
" Force decommit the memory freed by the server back to OS.", " Force decommit the memory freed by the server back to OS.",
"TRACK", "TRACK",
@ -137,35 +138,36 @@ void MemoryCmd::Run(CmdArgList args) {
return rb->SendSimpleStrArr(help_arr); return rb->SendSimpleStrArr(help_arr);
}; };
if (sub_cmd == "STATS") { if (parser.Check("STATS")) {
return Stats(); return Stats();
} }
if (sub_cmd == "USAGE" && args.size() > 1) { if (parser.Check("USAGE") && args.size() > 1) {
string_view key = ArgS(args, 1); string_view key = parser.Next();
return Usage(key); bool account_key_memory_usage = !parser.Check("WITHOUTKEY");
return Usage(key, account_key_memory_usage);
} }
if (sub_cmd == "DECOMMIT") { if (parser.Check("DECOMMIT")) {
shard_set->pool()->AwaitBrief( shard_set->pool()->AwaitBrief(
[](unsigned, auto* pb) { ServerState::tlocal()->DecommitMemory(ServerState::kAllMemory); }); [](unsigned, auto* pb) { ServerState::tlocal()->DecommitMemory(ServerState::kAllMemory); });
return builder_->SendSimpleString("OK"); return builder_->SendSimpleString("OK");
} }
if (sub_cmd == "MALLOC-STATS") { if (parser.Check("MALLOC-STATS")) {
return MallocStats(); return MallocStats();
} }
if (sub_cmd == "ARENA") { if (parser.Check("ARENA")) {
return ArenaStats(args); return ArenaStats(args);
} }
if (sub_cmd == "TRACK") { if (parser.Check("TRACK")) {
args.remove_prefix(1); args.remove_prefix(1);
return Track(args); return Track(args);
} }
if (sub_cmd == "DEFRAGMENT") { if (parser.Check("DEFRAGMENT")) {
shard_set->pool()->DispatchOnAll([](util::ProactorBase*) { shard_set->pool()->DispatchOnAll([](util::ProactorBase*) {
if (auto* shard = EngineShard::tlocal(); shard) if (auto* shard = EngineShard::tlocal(); shard)
shard->ForceDefrag(); shard->ForceDefrag();
@ -173,7 +175,7 @@ void MemoryCmd::Run(CmdArgList args) {
return builder_->SendSimpleString("OK"); return builder_->SendSimpleString("OK");
} }
string err = UnknownSubCmd(sub_cmd, "MEMORY"); string err = UnknownSubCmd(parser.Next(), "MEMORY");
return builder_->SendError(err, kSyntaxErrType); return builder_->SendError(err, kSyntaxErrType);
} }
@ -346,18 +348,19 @@ void MemoryCmd::ArenaStats(CmdArgList args) {
return rb->SendVerbatimString(mi_malloc_info); return rb->SendVerbatimString(mi_malloc_info);
} }
void MemoryCmd::Usage(std::string_view key) { void MemoryCmd::Usage(std::string_view key, bool account_key_memory_usage) {
ShardId sid = Shard(key, shard_set->size()); ShardId sid = Shard(key, shard_set->size());
ssize_t memory_usage = shard_set->pool()->at(sid)->AwaitBrief([key, this, sid]() -> ssize_t { ssize_t memory_usage = shard_set->pool()->at(sid)->AwaitBrief(
auto& db_slice = cntx_->ns->GetDbSlice(sid); [key, account_key_memory_usage, this, sid]() -> ssize_t {
auto [pt, exp_t] = db_slice.GetTables(cntx_->db_index()); auto& db_slice = cntx_->ns->GetDbSlice(sid);
PrimeIterator it = pt->Find(key); auto [pt, exp_t] = db_slice.GetTables(cntx_->db_index());
if (IsValid(it)) { PrimeIterator it = pt->Find(key);
return MemoryUsage(it); if (IsValid(it)) {
} else { return MemoryUsage(it, account_key_memory_usage);
return -1; } else {
} return -1;
}); }
});
auto* rb = static_cast<RedisReplyBuilder*>(builder_); auto* rb = static_cast<RedisReplyBuilder*>(builder_);
if (memory_usage < 0) if (memory_usage < 0)

View file

@ -20,7 +20,7 @@ class MemoryCmd {
void Stats(); void Stats();
void MallocStats(); void MallocStats();
void ArenaStats(CmdArgList args); void ArenaStats(CmdArgList args);
void Usage(std::string_view key); void Usage(std::string_view key, bool account_key_memory_usage);
void Track(CmdArgList args); void Track(CmdArgList args);
ConnectionContext* cntx_; ConnectionContext* cntx_;

View file

@ -2959,6 +2959,7 @@ async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFa
await fill_task await fill_task
@pytest.mark.skip("Flaky test")
async def test_bug_in_json_memory_tracking(df_factory: DflyInstanceFactory): async def test_bug_in_json_memory_tracking(df_factory: DflyInstanceFactory):
""" """
This test reproduces a bug in the JSON memory tracking. This test reproduces a bug in the JSON memory tracking.