chore: Add stats print for slot migrations (#4456)

* chore: Add stats print for slot migrations

Fixes #4415
This commit is contained in:
Shahar Mike 2025-01-15 13:06:09 +02:00 committed by GitHub
parent 0eff6c93f2
commit 5ba608b58d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 116 additions and 34 deletions

View file

@ -26,13 +26,18 @@ class CommandAggregator {
} }
enum class CommitMode { kAuto, kNoCommit }; enum class CommitMode { kAuto, kNoCommit };
void AddArg(string arg, CommitMode commit_mode = CommitMode::kAuto) {
// Returns whether CommitPending() was called
bool AddArg(string arg, CommitMode commit_mode = CommitMode::kAuto) {
agg_bytes_ += arg.size(); agg_bytes_ += arg.size();
members_.push_back(std::move(arg)); members_.push_back(std::move(arg));
if (commit_mode != CommitMode::kNoCommit && agg_bytes_ >= max_aggragation_bytes_) { if (commit_mode != CommitMode::kNoCommit && agg_bytes_ >= max_aggragation_bytes_) {
CommitPending(); CommitPending();
return true;
} }
return false;
} }
private: private:
@ -65,26 +70,27 @@ CmdSerializer::CmdSerializer(FlushSerialized cb, size_t max_serialization_buffer
: cb_(std::move(cb)), max_serialization_buffer_size_(max_serialization_buffer_size) { : cb_(std::move(cb)), max_serialization_buffer_size_(max_serialization_buffer_size) {
} }
void CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv, size_t CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms) { uint64_t expire_ms) {
// We send RESTORE commands for small objects, or objects we don't support breaking. // We send RESTORE commands for small objects, or objects we don't support breaking.
bool use_restore_serialization = true; bool use_restore_serialization = true;
size_t commands = 1;
if (max_serialization_buffer_size_ > 0 && pv.MallocUsed() > max_serialization_buffer_size_) { if (max_serialization_buffer_size_ > 0 && pv.MallocUsed() > max_serialization_buffer_size_) {
switch (pv.ObjType()) { switch (pv.ObjType()) {
case OBJ_SET: case OBJ_SET:
SerializeSet(key, pv); commands = SerializeSet(key, pv);
use_restore_serialization = false; use_restore_serialization = false;
break; break;
case OBJ_ZSET: case OBJ_ZSET:
SerializeZSet(key, pv); commands = SerializeZSet(key, pv);
use_restore_serialization = false; use_restore_serialization = false;
break; break;
case OBJ_HASH: case OBJ_HASH:
SerializeHash(key, pv); commands = SerializeHash(key, pv);
use_restore_serialization = false; use_restore_serialization = false;
break; break;
case OBJ_LIST: case OBJ_LIST:
SerializeList(key, pv); commands = SerializeList(key, pv);
use_restore_serialization = false; use_restore_serialization = false;
break; break;
case OBJ_STRING: case OBJ_STRING:
@ -105,6 +111,7 @@ void CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const
SerializeStickIfNeeded(key, pk); SerializeStickIfNeeded(key, pk);
SerializeExpireIfNeeded(key, expire_ms); SerializeExpireIfNeeded(key, expire_ms);
} }
return commands;
} }
void CmdSerializer::SerializeCommand(string_view cmd, absl::Span<const string_view> args) { void CmdSerializer::SerializeCommand(string_view cmd, absl::Span<const string_view> args) {
@ -139,54 +146,62 @@ void CmdSerializer::SerializeExpireIfNeeded(string_view key, uint64_t expire_ms)
SerializeCommand("PEXIRE", {key, absl::StrCat(expire_ms)}); SerializeCommand("PEXIRE", {key, absl::StrCat(expire_ms)});
} }
void CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) { size_t CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator( CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("SADD", args); }, key, [&](absl::Span<const string_view> args) { SerializeCommand("SADD", args); },
max_serialization_buffer_size_); max_serialization_buffer_size_);
size_t commands = 0;
container_utils::IterateSet(pv, [&](container_utils::ContainerEntry ce) { container_utils::IterateSet(pv, [&](container_utils::ContainerEntry ce) {
aggregator.AddArg(ce.ToString()); commands += aggregator.AddArg(ce.ToString());
return true; return true;
}); });
return commands;
} }
void CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) { size_t CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator( CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("ZADD", args); }, key, [&](absl::Span<const string_view> args) { SerializeCommand("ZADD", args); },
max_serialization_buffer_size_); max_serialization_buffer_size_);
size_t commands = 0;
container_utils::IterateSortedSet( container_utils::IterateSortedSet(
pv.GetRobjWrapper(), pv.GetRobjWrapper(),
[&](container_utils::ContainerEntry ce, double score) { [&](container_utils::ContainerEntry ce, double score) {
aggregator.AddArg(absl::StrCat(score), CommandAggregator::CommitMode::kNoCommit); aggregator.AddArg(absl::StrCat(score), CommandAggregator::CommitMode::kNoCommit);
aggregator.AddArg(ce.ToString()); commands += aggregator.AddArg(ce.ToString());
return true; return true;
}, },
/*start=*/0, /*end=*/-1, /*reverse=*/false, /*use_score=*/true); /*start=*/0, /*end=*/-1, /*reverse=*/false, /*use_score=*/true);
return commands;
} }
void CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) { size_t CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator( CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("HSET", args); }, key, [&](absl::Span<const string_view> args) { SerializeCommand("HSET", args); },
max_serialization_buffer_size_); max_serialization_buffer_size_);
size_t commands = 0;
container_utils::IterateMap( container_utils::IterateMap(
pv, [&](container_utils::ContainerEntry k, container_utils::ContainerEntry v) { pv, [&](container_utils::ContainerEntry k, container_utils::ContainerEntry v) {
aggregator.AddArg(k.ToString(), CommandAggregator::CommitMode::kNoCommit); aggregator.AddArg(k.ToString(), CommandAggregator::CommitMode::kNoCommit);
aggregator.AddArg(v.ToString()); commands += aggregator.AddArg(v.ToString());
return true; return true;
}); });
return commands;
} }
void CmdSerializer::SerializeList(string_view key, const PrimeValue& pv) { size_t CmdSerializer::SerializeList(string_view key, const PrimeValue& pv) {
CommandAggregator aggregator( CommandAggregator aggregator(
key, [&](absl::Span<const string_view> args) { SerializeCommand("RPUSH", args); }, key, [&](absl::Span<const string_view> args) { SerializeCommand("RPUSH", args); },
max_serialization_buffer_size_); max_serialization_buffer_size_);
size_t commands = 0;
container_utils::IterateList(pv, [&](container_utils::ContainerEntry ce) { container_utils::IterateList(pv, [&](container_utils::ContainerEntry ce) {
aggregator.AddArg(ce.ToString()); commands += aggregator.AddArg(ce.ToString());
return true; return true;
}); });
return commands;
} }
void CmdSerializer::SerializeRestore(string_view key, const PrimeValue& pk, const PrimeValue& pv, void CmdSerializer::SerializeRestore(string_view key, const PrimeValue& pk, const PrimeValue& pv,

View file

@ -23,18 +23,19 @@ class CmdSerializer {
explicit CmdSerializer(FlushSerialized cb, size_t max_serialization_buffer_size); explicit CmdSerializer(FlushSerialized cb, size_t max_serialization_buffer_size);
void SerializeEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv, // Returns how many commands we broke this entry into (like multiple HSETs etc)
uint64_t expire_ms); size_t SerializeEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms);
private: private:
void SerializeCommand(std::string_view cmd, absl::Span<const std::string_view> args); void SerializeCommand(std::string_view cmd, absl::Span<const std::string_view> args);
void SerializeStickIfNeeded(std::string_view key, const PrimeValue& pk); void SerializeStickIfNeeded(std::string_view key, const PrimeValue& pk);
void SerializeExpireIfNeeded(std::string_view key, uint64_t expire_ms); void SerializeExpireIfNeeded(std::string_view key, uint64_t expire_ms);
void SerializeSet(std::string_view key, const PrimeValue& pv); size_t SerializeSet(std::string_view key, const PrimeValue& pv);
void SerializeZSet(std::string_view key, const PrimeValue& pv); size_t SerializeZSet(std::string_view key, const PrimeValue& pv);
void SerializeHash(std::string_view key, const PrimeValue& pv); size_t SerializeHash(std::string_view key, const PrimeValue& pv);
void SerializeList(std::string_view key, const PrimeValue& pv); size_t SerializeList(std::string_view key, const PrimeValue& pv);
void SerializeRestore(std::string_view key, const PrimeValue& pk, const PrimeValue& pv, void SerializeRestore(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms); uint64_t expire_ms);

View file

@ -224,7 +224,7 @@ void RestoreStreamer::Run() {
auto* blocking_counter = db_slice_->BlockingCounter(); auto* blocking_counter = db_slice_->BlockingCounter();
std::lock_guard blocking_counter_guard(*blocking_counter); std::lock_guard blocking_counter_guard(*blocking_counter);
WriteBucket(it); stats_.buckets_loop += WriteBucket(it);
}); });
if (++last_yield >= 100) { if (++last_yield >= 100) {
@ -232,10 +232,19 @@ void RestoreStreamer::Run() {
last_yield = 0; last_yield = 0;
} }
} while (cursor); } while (cursor);
VLOG(1) << "RestoreStreamer finished loop of " << my_slots_.ToSlotRanges().ToString()
<< ", shard " << db_slice_->shard_id() << ". Buckets looped " << stats_.buckets_loop;
} }
void RestoreStreamer::SendFinalize(long attempt) { void RestoreStreamer::SendFinalize(long attempt) {
VLOG(1) << "RestoreStreamer LSN opcode for : " << db_slice_->shard_id() << " attempt " << attempt; VLOG(1) << "RestoreStreamer LSN of " << my_slots_.ToSlotRanges().ToString() << ", shard "
<< db_slice_->shard_id() << " attempt " << attempt << " with " << stats_.commands
<< " commands. Buckets looped " << stats_.buckets_loop << ", buckets on_db_update "
<< stats_.buckets_on_db_update << ", buckets skipped " << stats_.buckets_skipped
<< ", buckets written " << stats_.buckets_written << ". Keys skipped "
<< stats_.keys_skipped << ", keys written " << stats_.keys_written;
journal::Entry entry(journal::Op::LSN, attempt); journal::Entry entry(journal::Op::LSN, attempt);
io::StringSink sink; io::StringSink sink;
@ -285,14 +294,19 @@ bool RestoreStreamer::ShouldWrite(SlotId slot_id) const {
return my_slots_.Contains(slot_id); return my_slots_.Contains(slot_id);
} }
void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) { bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
bool written = false;
if (it.GetVersion() < snapshot_version_) { if (it.GetVersion() < snapshot_version_) {
stats_.buckets_written++;
it.SetVersion(snapshot_version_); it.SetVersion(snapshot_version_);
string key_buffer; // we can reuse it string key_buffer; // we can reuse it
for (; !it.is_done(); ++it) { for (; !it.is_done(); ++it) {
const auto& pv = it->second; const auto& pv = it->second;
string_view key = it->first.GetSlice(&key_buffer); string_view key = it->first.GetSlice(&key_buffer);
if (ShouldWrite(key)) { if (ShouldWrite(key)) {
stats_.keys_written++;
uint64_t expire = 0; uint64_t expire = 0;
if (pv.HasExpire()) { if (pv.HasExpire()) {
auto eit = db_slice_->databases()[0]->expire.Find(it->first); auto eit = db_slice_->databases()[0]->expire.Find(it->first);
@ -300,10 +314,17 @@ void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
} }
WriteEntry(key, it->first, pv, expire); WriteEntry(key, it->first, pv, expire);
written = true;
} else {
stats_.keys_skipped++;
} }
} }
} else {
stats_.buckets_skipped++;
} }
ThrottleIfNeeded(); ThrottleIfNeeded();
return written;
} }
void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
@ -313,12 +334,12 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req
PrimeTable* table = db_slice_->GetTables(0).first; PrimeTable* table = db_slice_->GetTables(0).first;
if (const PrimeTable::bucket_iterator* bit = req.update()) { if (const PrimeTable::bucket_iterator* bit = req.update()) {
WriteBucket(*bit); stats_.buckets_on_db_update += WriteBucket(*bit);
} else { } else {
string_view key = get<string_view>(req.change); string_view key = get<string_view>(req.change);
table->CVCUponInsert(snapshot_version_, key, [this](PrimeTable::bucket_iterator it) { table->CVCUponInsert(snapshot_version_, key, [this](PrimeTable::bucket_iterator it) {
DCHECK_LT(it.GetVersion(), snapshot_version_); DCHECK_LT(it.GetVersion(), snapshot_version_);
WriteBucket(it); stats_.buckets_on_db_update += WriteBucket(it);
}); });
} }
} }
@ -331,7 +352,7 @@ void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const Pr
ThrottleIfNeeded(); ThrottleIfNeeded();
}, },
ServerState::tlocal()->serialization_max_chunk_size); ServerState::tlocal()->serialization_max_chunk_size);
serializer.SerializeEntry(key, pk, pv, expire_ms); stats_.commands += serializer.SerializeEntry(key, pk, pv, expire_ms);
} }
} // namespace dfly } // namespace dfly

View file

@ -101,11 +101,22 @@ class RestoreStreamer : public JournalStreamer {
bool ShouldWrite(std::string_view key) const; bool ShouldWrite(std::string_view key) const;
bool ShouldWrite(SlotId slot_id) const; bool ShouldWrite(SlotId slot_id) const;
// Returns whether anything was written // Returns true if any entry was actually written
void WriteBucket(PrimeTable::bucket_iterator it); bool WriteBucket(PrimeTable::bucket_iterator it);
void WriteEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv, void WriteEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
uint64_t expire_ms); uint64_t expire_ms);
struct Stats {
size_t buckets_skipped = 0;
size_t buckets_written = 0;
size_t buckets_loop = 0;
size_t buckets_on_db_update = 0;
size_t keys_written = 0;
size_t keys_skipped = 0;
size_t commands = 0;
};
DbSlice* db_slice_; DbSlice* db_slice_;
DbTableArray db_array_; DbTableArray db_array_;
uint64_t snapshot_version_ = 0; uint64_t snapshot_version_ = 0;
@ -113,6 +124,7 @@ class RestoreStreamer : public JournalStreamer {
bool fiber_cancelled_ = false; bool fiber_cancelled_ = false;
bool snapshot_finished_ = false; bool snapshot_finished_ = false;
ThreadLocalMutex big_value_mu_; ThreadLocalMutex big_value_mu_;
Stats stats_;
}; };
} // namespace dfly } // namespace dfly

View file

@ -214,6 +214,21 @@ async def get_node_id(connection):
return id return id
def stop_and_get_restore_log(instance):
instance.stop()
lines = instance.find_in_logs("RestoreStreamer LSN")
assert len(lines) == 1
line = lines[0]
logging.debug(f"Streamer log line: {line}")
return line
def extract_int_after_prefix(prefix, line):
match = re.search(prefix + "(\\d+)", line)
assert match
return int(match.group(1))
@dfly_args({}) @dfly_args({})
class TestNotEmulated: class TestNotEmulated:
async def test_cluster_commands_fails_when_not_emulate(self, async_client: aioredis.Redis): async def test_cluster_commands_fails_when_not_emulate(self, async_client: aioredis.Redis):
@ -2035,6 +2050,18 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
logging.debug(f"Memory before {mem_before} after {mem_after}") logging.debug(f"Memory before {mem_before} after {mem_after}")
assert mem_after < mem_before * 1.1 assert mem_after < mem_before * 1.1
line = stop_and_get_restore_log(nodes[0].instance)
# 'with X commands' - how many breakdowns we used for the keys
assert extract_int_after_prefix("with ", line) > 500_000
assert extract_int_after_prefix("Keys skipped ", line) == 0
assert extract_int_after_prefix("buckets skipped ", line) == 0
assert extract_int_after_prefix("keys written ", line) > 90
# We don't send updates during the migration
assert extract_int_after_prefix("buckets on_db_update ", line) == 0
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"}) @dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
@pytest.mark.parametrize("chunk_size", [1_000_000, 30]) @pytest.mark.parametrize("chunk_size", [1_000_000, 30])
@ -2056,7 +2083,6 @@ async def test_cluster_migration_while_seeding(
nodes[0].slots = [(0, 16383)] nodes[0].slots = [(0, 16383)]
nodes[1].slots = [] nodes[1].slots = []
client0 = nodes[0].client client0 = nodes[0].client
client1 = nodes[1].client
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
@ -2098,6 +2124,12 @@ async def test_cluster_migration_while_seeding(
capture = await seeder.capture_fake_redis() capture = await seeder.capture_fake_redis()
assert await seeder.compare(capture, instances[1].port) assert await seeder.compare(capture, instances[1].port)
line = stop_and_get_restore_log(nodes[0].instance)
assert extract_int_after_prefix("Keys skipped ", line) == 0
assert extract_int_after_prefix("buckets skipped ", line) > 0
assert extract_int_after_prefix("keys written ", line) >= 9_000
assert extract_int_after_prefix("buckets on_db_update ", line) > 0
def parse_lag(replication_info: str): def parse_lag(replication_info: str):
lags = re.findall("lag=([0-9]+)\r\n", replication_info) lags = re.findall("lag=([0-9]+)\r\n", replication_info)

View file

@ -380,16 +380,17 @@ class DflyInstance:
for metric_family in text_string_to_metric_families(data) for metric_family in text_string_to_metric_families(data)
} }
def is_in_logs(self, pattern): def find_in_logs(self, pattern):
if self.proc is not None: if self.proc is not None:
raise RuntimeError("Must close server first") raise RuntimeError("Must close server first")
results = []
matcher = re.compile(pattern) matcher = re.compile(pattern)
for path in self.log_files: for path in self.log_files:
for line in open(path): for line in open(path):
if matcher.search(line): if matcher.search(line):
return True results.append(line)
return False return results
@property @property
def rss(self): def rss(self):
@ -416,7 +417,7 @@ class DflyInstanceFactory:
args.setdefault("noversion_check", None) args.setdefault("noversion_check", None)
# MacOs does not set it automatically, so we need to set it manually # MacOs does not set it automatically, so we need to set it manually
args.setdefault("maxmemory", "8G") args.setdefault("maxmemory", "8G")
vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1,cluster_family=1,proactor_pool=1,dflycmd=1,snapshot=1" vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1,cluster_family=1,proactor_pool=1,dflycmd=1,snapshot=1,streamer=1"
args.setdefault("vmodule", vmod) args.setdefault("vmodule", vmod)
args.setdefault("jsonpathv2") args.setdefault("jsonpathv2")