diff --git a/src/server/journal/cmd_serializer.cc b/src/server/journal/cmd_serializer.cc
index f9c98b99f..bcddcc862 100644
--- a/src/server/journal/cmd_serializer.cc
+++ b/src/server/journal/cmd_serializer.cc
@@ -26,13 +26,18 @@ class CommandAggregator {
   }
 
   enum class CommitMode { kAuto, kNoCommit };
-  void AddArg(string arg, CommitMode commit_mode = CommitMode::kAuto) {
+
+  // Returns whether CommitPending() was called
+  bool AddArg(string arg, CommitMode commit_mode = CommitMode::kAuto) {
     agg_bytes_ += arg.size();
     members_.push_back(std::move(arg));
 
     if (commit_mode != CommitMode::kNoCommit && agg_bytes_ >= max_aggragation_bytes_) {
       CommitPending();
+      return true;
     }
+
+    return false;
   }
 
  private:
@@ -65,26 +70,27 @@ CmdSerializer::CmdSerializer(FlushSerialized cb, size_t max_serialization_buffer
     : cb_(std::move(cb)), max_serialization_buffer_size_(max_serialization_buffer_size) {
 }
 
-void CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
-                                   uint64_t expire_ms) {
+size_t CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
+                                     uint64_t expire_ms) {
   // We send RESTORE commands for small objects, or objects we don't support breaking.
   bool use_restore_serialization = true;
+  size_t commands = 1;
   if (max_serialization_buffer_size_ > 0 && pv.MallocUsed() > max_serialization_buffer_size_) {
     switch (pv.ObjType()) {
       case OBJ_SET:
-        SerializeSet(key, pv);
+        commands = SerializeSet(key, pv);
         use_restore_serialization = false;
         break;
       case OBJ_ZSET:
-        SerializeZSet(key, pv);
+        commands = SerializeZSet(key, pv);
         use_restore_serialization = false;
         break;
      case OBJ_HASH:
-        SerializeHash(key, pv);
+        commands = SerializeHash(key, pv);
         use_restore_serialization = false;
         break;
      case OBJ_LIST:
-        SerializeList(key, pv);
+        commands = SerializeList(key, pv);
         use_restore_serialization = false;
         break;
      case OBJ_STRING:
@@ -105,6 +111,7 @@ void CmdSerializer::SerializeEntry(string_view key, const PrimeValue& pk, const
     SerializeStickIfNeeded(key, pk);
     SerializeExpireIfNeeded(key, expire_ms);
   }
+  return commands;
 }
 
 void CmdSerializer::SerializeCommand(string_view cmd, absl::Span<const string_view> args) {
@@ -139,54 +146,62 @@ void CmdSerializer::SerializeExpireIfNeeded(string_view key, uint64_t expire_ms)
   SerializeCommand("PEXIRE", {key, absl::StrCat(expire_ms)});
 }
 
-void CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) {
+size_t CmdSerializer::SerializeSet(string_view key, const PrimeValue& pv) {
   CommandAggregator aggregator(
       key, [&](absl::Span<const string_view> args) { SerializeCommand("SADD", args); },
       max_serialization_buffer_size_);
 
+  size_t commands = 0;
   container_utils::IterateSet(pv, [&](container_utils::ContainerEntry ce) {
-    aggregator.AddArg(ce.ToString());
+    commands += aggregator.AddArg(ce.ToString());
     return true;
   });
+  return commands;
 }
 
-void CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) {
+size_t CmdSerializer::SerializeZSet(string_view key, const PrimeValue& pv) {
   CommandAggregator aggregator(
       key, [&](absl::Span<const string_view> args) { SerializeCommand("ZADD", args); },
       max_serialization_buffer_size_);
 
+  size_t commands = 0;
   container_utils::IterateSortedSet(
       pv.GetRobjWrapper(),
       [&](container_utils::ContainerEntry ce, double score) {
         aggregator.AddArg(absl::StrCat(score), CommandAggregator::CommitMode::kNoCommit);
-        aggregator.AddArg(ce.ToString());
+        commands += aggregator.AddArg(ce.ToString());
         return true;
       },
       /*start=*/0, /*end=*/-1, /*reverse=*/false, /*use_score=*/true);
+  return commands;
 }
 
-void CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) {
+size_t CmdSerializer::SerializeHash(string_view key, const PrimeValue& pv) {
   CommandAggregator aggregator(
       key, [&](absl::Span<const string_view> args) { SerializeCommand("HSET", args); },
       max_serialization_buffer_size_);
 
+  size_t commands = 0;
   container_utils::IterateMap(
       pv, [&](container_utils::ContainerEntry k, container_utils::ContainerEntry v) {
        aggregator.AddArg(k.ToString(), CommandAggregator::CommitMode::kNoCommit);
-        aggregator.AddArg(v.ToString());
+        commands += aggregator.AddArg(v.ToString());
         return true;
       });
+  return commands;
 }
 
-void CmdSerializer::SerializeList(string_view key, const PrimeValue& pv) {
+size_t CmdSerializer::SerializeList(string_view key, const PrimeValue& pv) {
  CommandAggregator aggregator(
      key, [&](absl::Span<const string_view> args) { SerializeCommand("RPUSH", args); },
      max_serialization_buffer_size_);
 
+  size_t commands = 0;
   container_utils::IterateList(pv, [&](container_utils::ContainerEntry ce) {
-    aggregator.AddArg(ce.ToString());
+    commands += aggregator.AddArg(ce.ToString());
     return true;
   });
+  return commands;
 }
 
 void CmdSerializer::SerializeRestore(string_view key, const PrimeValue& pk, const PrimeValue& pv,
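Aside: the counting contract above is compact but easy to misread, so here is a toy restatement in Python (Aggregator and all its names are invented stand-ins, not the real CommandAggregator). add_arg() returns True exactly when it flushed the pending members as one command, so summing its results over a container yields the command count that SerializeSet() and friends now return.

    # Illustrative sketch of the AddArg/counting contract, not the real class.
    class Aggregator:
        def __init__(self, max_bytes, flush):
            self.max_bytes, self.flush = max_bytes, flush
            self.members, self.bytes = [], 0

        def add_arg(self, arg):
            """Returns True iff the pending members were flushed as one command."""
            self.members.append(arg)
            self.bytes += len(arg)
            if self.bytes >= self.max_bytes:
                self.flush(self.members)
                self.members, self.bytes = [], 0
                return True
            return False

    agg = Aggregator(8, lambda args: print("SADD key", *args))
    commands = sum(agg.add_arg(m) for m in ["alpha", "beta", "gamma", "delta"])
    print(commands, "commands")  # 2: flushed after 'beta' and after 'delta'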
diff --git a/src/server/journal/cmd_serializer.h b/src/server/journal/cmd_serializer.h
index 6c9e2a51d..5963cd7f7 100644
--- a/src/server/journal/cmd_serializer.h
+++ b/src/server/journal/cmd_serializer.h
@@ -23,18 +23,19 @@ class CmdSerializer {
 
   explicit CmdSerializer(FlushSerialized cb, size_t max_serialization_buffer_size);
 
-  void SerializeEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
-                      uint64_t expire_ms);
+  // Returns how many commands this entry was broken into (e.g. multiple HSETs)
+  size_t SerializeEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
+                        uint64_t expire_ms);
 
  private:
   void SerializeCommand(std::string_view cmd, absl::Span<const std::string_view> args);
   void SerializeStickIfNeeded(std::string_view key, const PrimeValue& pk);
   void SerializeExpireIfNeeded(std::string_view key, uint64_t expire_ms);
 
-  void SerializeSet(std::string_view key, const PrimeValue& pv);
-  void SerializeZSet(std::string_view key, const PrimeValue& pv);
-  void SerializeHash(std::string_view key, const PrimeValue& pv);
-  void SerializeList(std::string_view key, const PrimeValue& pv);
+  size_t SerializeSet(std::string_view key, const PrimeValue& pv);
+  size_t SerializeZSet(std::string_view key, const PrimeValue& pv);
+  size_t SerializeHash(std::string_view key, const PrimeValue& pv);
+  size_t SerializeList(std::string_view key, const PrimeValue& pv);
 
   void SerializeRestore(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
                         uint64_t expire_ms);
diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index e2b04116b..7cc3e038e 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -224,7 +224,7 @@ void RestoreStreamer::Run() {
       auto* blocking_counter = db_slice_->BlockingCounter();
       std::lock_guard blocking_counter_guard(*blocking_counter);
 
-      WriteBucket(it);
+      stats_.buckets_loop += WriteBucket(it);
     });
 
     if (++last_yield >= 100) {
@@ -232,10 +232,19 @@
       last_yield = 0;
     }
   } while (cursor);
+
+  VLOG(1) << "RestoreStreamer finished loop of " << my_slots_.ToSlotRanges().ToString()
+          << ", shard " << db_slice_->shard_id() << ". Buckets looped " << stats_.buckets_loop;
 }
 
 void RestoreStreamer::SendFinalize(long attempt) {
-  VLOG(1) << "RestoreStreamer LSN opcode for : " << db_slice_->shard_id() << " attempt " << attempt;
+  VLOG(1) << "RestoreStreamer LSN of " << my_slots_.ToSlotRanges().ToString() << ", shard "
+          << db_slice_->shard_id() << " attempt " << attempt << " with " << stats_.commands
+          << " commands. Buckets looped " << stats_.buckets_loop << ", buckets on_db_update "
+          << stats_.buckets_on_db_update << ", buckets skipped " << stats_.buckets_skipped
+          << ", buckets written " << stats_.buckets_written << ". Keys skipped "
+          << stats_.keys_skipped << ", keys written " << stats_.keys_written;
+
   journal::Entry entry(journal::Op::LSN, attempt);
 
   io::StringSink sink;
@@ -285,14 +294,19 @@ bool RestoreStreamer::ShouldWrite(SlotId slot_id) const {
   return my_slots_.Contains(slot_id);
 }
 
-void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
+bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
+  bool written = false;
+
   if (it.GetVersion() < snapshot_version_) {
+    stats_.buckets_written++;
+
     it.SetVersion(snapshot_version_);
     string key_buffer;  // we can reuse it
     for (; !it.is_done(); ++it) {
       const auto& pv = it->second;
       string_view key = it->first.GetSlice(&key_buffer);
       if (ShouldWrite(key)) {
+        stats_.keys_written++;
         uint64_t expire = 0;
         if (pv.HasExpire()) {
           auto eit = db_slice_->databases()[0]->expire.Find(it->first);
@@ -300,10 +314,17 @@ void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
         }
 
         WriteEntry(key, it->first, pv, expire);
+        written = true;
+      } else {
+        stats_.keys_skipped++;
       }
     }
+  } else {
+    stats_.buckets_skipped++;
   }
 
   ThrottleIfNeeded();
+
+  return written;
 }
 
 void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
@@ -313,12 +334,12 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req
   PrimeTable* table = db_slice_->GetTables(0).first;
 
   if (const PrimeTable::bucket_iterator* bit = req.update()) {
-    WriteBucket(*bit);
+    stats_.buckets_on_db_update += WriteBucket(*bit);
   } else {
     string_view key = get<string_view>(req.change);
     table->CVCUponInsert(snapshot_version_, key, [this](PrimeTable::bucket_iterator it) {
       DCHECK_LT(it.GetVersion(), snapshot_version_);
-      WriteBucket(it);
+      stats_.buckets_on_db_update += WriteBucket(it);
     });
   }
 }
@@ -331,7 +352,7 @@ void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const Pr
         ThrottleIfNeeded();
       },
       ServerState::tlocal()->serialization_max_chunk_size);
-  serializer.SerializeEntry(key, pk, pv, expire_ms);
+  stats_.commands += serializer.SerializeEntry(key, pk, pv, expire_ms);
 }
 
 }  // namespace dfly
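Aside: the call sites add WriteBucket()'s bool straight into size_t counters, so a skipped bucket contributes 0 and a written one contributes 1. The sketch below (Python, with an invented write_bucket() stand-in, not the real method) mirrors that bookkeeping and how the Stats counters relate:

    # Hypothetical stand-in for RestoreStreamer::WriteBucket().
    def write_bucket(bucket, snapshot_version, stats):
        if bucket["version"] >= snapshot_version:
            stats["buckets_skipped"] += 1
            return False  # nothing written
        bucket["version"] = snapshot_version
        stats["buckets_written"] += 1
        written = False
        for key, owned in bucket["keys"]:  # 'owned' plays the role of ShouldWrite(key)
            if owned:
                stats["keys_written"] += 1
                written = True
            else:
                stats["keys_skipped"] += 1
        return written

    stats = {k: 0 for k in ("buckets_loop", "buckets_written", "buckets_skipped",
                            "keys_written", "keys_skipped")}
    bucket = {"version": 0, "keys": [("a", True), ("b", False)]}
    stats["buckets_loop"] += write_bucket(bucket, 1, stats)  # writes 'a', skips 'b'
    stats["buckets_loop"] += write_bucket(bucket, 1, stats)  # version already bumped: bucket skipped
    print(stats)  # buckets_loop=1, buckets_written=1, buckets_skipped=1, keys 1/1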
diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h
index 6677bdcb0..e46713dd3 100644
--- a/src/server/journal/streamer.h
+++ b/src/server/journal/streamer.h
@@ -101,11 +101,22 @@ class RestoreStreamer : public JournalStreamer {
   bool ShouldWrite(std::string_view key) const;
   bool ShouldWrite(SlotId slot_id) const;
 
-  // Returns whether anything was written
-  void WriteBucket(PrimeTable::bucket_iterator it);
+  // Returns true if any entry was actually written
+  bool WriteBucket(PrimeTable::bucket_iterator it);
+
   void WriteEntry(std::string_view key, const PrimeValue& pk, const PrimeValue& pv,
                   uint64_t expire_ms);
 
+  struct Stats {
+    size_t buckets_skipped = 0;
+    size_t buckets_written = 0;
+    size_t buckets_loop = 0;
+    size_t buckets_on_db_update = 0;
+    size_t keys_written = 0;
+    size_t keys_skipped = 0;
+    size_t commands = 0;
+  };
+
   DbSlice* db_slice_;
   DbTableArray db_array_;
 
   uint64_t snapshot_version_ = 0;
@@ -113,6 +124,7 @@ class RestoreStreamer : public JournalStreamer {
   bool fiber_cancelled_ = false;
   bool snapshot_finished_ = false;
   ThreadLocalMutex big_value_mu_;
+  Stats stats_;
 };
 
 }  // namespace dfly
diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py
index effa56b52..0d7d1b044 100644
--- a/tests/dragonfly/cluster_test.py
+++ b/tests/dragonfly/cluster_test.py
@@ -214,6 +214,21 @@ async def get_node_id(connection):
     return id
 
 
+def stop_and_get_restore_log(instance):
+    instance.stop()
+    lines = instance.find_in_logs("RestoreStreamer LSN")
+    assert len(lines) == 1
+    line = lines[0]
+    logging.debug(f"Streamer log line: {line}")
+    return line
+
+
+def extract_int_after_prefix(prefix, line):
+    match = re.search(prefix + "(\\d+)", line)
+    assert match
+    return int(match.group(1))
+
+
 @dfly_args({})
 class TestNotEmulated:
     async def test_cluster_commands_fails_when_not_emulate(self, async_client: aioredis.Redis):
@@ -2035,6 +2050,18 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
     logging.debug(f"Memory before {mem_before} after {mem_after}")
     assert mem_after < mem_before * 1.1
 
+    line = stop_and_get_restore_log(nodes[0].instance)
+
+    # 'with X commands' - how many commands the keys were broken into
+    assert extract_int_after_prefix("with ", line) > 500_000
+
+    assert extract_int_after_prefix("Keys skipped ", line) == 0
+    assert extract_int_after_prefix("buckets skipped ", line) == 0
+    assert extract_int_after_prefix("keys written ", line) > 90
+
+    # We don't send updates during the migration
+    assert extract_int_after_prefix("buckets on_db_update ", line) == 0
+
 
 @dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
 @pytest.mark.parametrize("chunk_size", [1_000_000, 30])
@@ -2056,7 +2083,6 @@ async def test_cluster_migration_while_seeding(
     nodes[0].slots = [(0, 16383)]
     nodes[1].slots = []
     client0 = nodes[0].client
-    client1 = nodes[1].client
 
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
@@ -2098,6 +2124,12 @@ async def test_cluster_migration_while_seeding(
     capture = await seeder.capture_fake_redis()
     assert await seeder.compare(capture, instances[1].port)
 
+    line = stop_and_get_restore_log(nodes[0].instance)
+    assert extract_int_after_prefix("Keys skipped ", line) == 0
+    assert extract_int_after_prefix("buckets skipped ", line) > 0
+    assert extract_int_after_prefix("keys written ", line) >= 9_000
+    assert extract_int_after_prefix("buckets on_db_update ", line) > 0
+
 
 def parse_lag(replication_info: str):
     lags = re.findall("lag=([0-9]+)\r\n", replication_info)
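Aside: the two helpers above turn the SendFinalize() summary line into assertable integers. Here is a self-contained illustration; the log line is hand-written in the shape the VLOG produces, and every number in it is an invented placeholder, not real output.

    import re

    def extract_int_after_prefix(prefix, line):
        match = re.search(prefix + "(\\d+)", line)
        assert match
        return int(match.group(1))

    # Placeholder line in the shape SendFinalize() logs; all numbers invented.
    line = ("RestoreStreamer LSN of 0-16383, shard 0 attempt 0 with 512345 commands. "
            "Buckets looped 800, buckets on_db_update 0, buckets skipped 0, "
            "buckets written 800. Keys skipped 0, keys written 95")
    assert extract_int_after_prefix("with ", line) == 512345
    assert extract_int_after_prefix("buckets written ", line) == 800
    assert extract_int_after_prefix("keys written ", line) == 95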
"dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1,cluster_family=1,proactor_pool=1,dflycmd=1,snapshot=1" + vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1,cluster_family=1,proactor_pool=1,dflycmd=1,snapshot=1,streamer=1" args.setdefault("vmodule", vmod) args.setdefault("jsonpathv2")