diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 61fbf0605..ba86f46be 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -406,7 +406,7 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) { } else if (sub_cmd == "FLUSHSLOTS") { return DflyClusterFlushSlots(args, cntx); } else if (sub_cmd == "SLOT-MIGRATION-STATUS") { - return DflyIncomingSlotMigrationStatus(args, cntx); + return DflySlotMigrationStatus(args, cntx); } return cntx->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType); @@ -609,7 +609,7 @@ bool ClusterFamily::StartSlotMigrations(std::vector migrations, return true; } -static std::string_view state_to_str(MigrationState state) { +static string_view StateToStr(MigrationState state) { switch (state) { case MigrationState::C_NO_STATE: return "NO_STATE"sv; @@ -626,44 +626,67 @@ static std::string_view state_to_str(MigrationState state) { return "UNDEFINED_STATE"sv; } -void ClusterFamily::DflyIncomingSlotMigrationStatus(CmdArgList args, ConnectionContext* cntx) { - CmdArgParser parser(args); +static uint64_t GetKeyCount(const SlotRanges& slots) { + atomic_uint64_t keys = 0; + + shard_set->pool()->Await([&](auto*) { + EngineShard* shard = EngineShard::tlocal(); + if (shard == nullptr) + return; + + uint64_t shard_keys = 0; + for (const SlotRange& range : slots) { + for (SlotId slot = range.start; slot <= range.end; slot++) { + shard_keys += shard->db_slice().GetSlotStats(slot).key_count; + } + } + keys.fetch_add(shard_keys); + }); + + return keys.load(); +} + +void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext* cntx) { auto* rb = static_cast(cntx->reply_builder()); + CmdArgParser parser(args); + lock_guard lk(migration_mu_); + + string_view node_id; if (parser.HasNext()) { - auto node_id = parser.Next(); - if (auto err = parser.Error(); err) + node_id = parser.Next(); + if (auto err = parser.Error(); err) { return rb->SendError(err->MakeReply()); - - lock_guard lk(migration_mu_); - // find incoming slot migration - for (const auto& m : incoming_migrations_jobs_) { - if (m->GetSourceID() == node_id) - return rb->SendSimpleString(state_to_str(m->GetState())); } - // find outgoing slot migration - for (const auto& migration : outgoing_migration_jobs_) { - if (migration->GetMigrationInfo().node_id == node_id) - return rb->SendSimpleString(state_to_str(migration->GetState())); - } - } else if (auto arr_size = incoming_migrations_jobs_.size() + outgoing_migration_jobs_.size(); - arr_size != 0) { - rb->StartArray(arr_size); - const auto& send_answer = [rb](std::string_view direction, std::string_view node_id, - auto state) { - auto str = absl::StrCat(direction, " ", node_id, " ", state_to_str(state)); - rb->SendSimpleString(str); - }; - lock_guard lk(migration_mu_); - for (const auto& m : incoming_migrations_jobs_) { - send_answer("in", m->GetSourceID(), m->GetState()); - } - for (const auto& migration : outgoing_migration_jobs_) { - send_answer("out", migration->GetMigrationInfo().node_id, migration->GetState()); - } - return; } - return rb->SendSimpleString(state_to_str(MigrationState::C_NO_STATE)); + + vector reply; + reply.reserve(incoming_migrations_jobs_.size() + outgoing_migration_jobs_.size()); + + auto append_answer = [rb, &reply](string_view direction, string_view node_id, string_view filter, + MigrationState state, const SlotRanges& slots) { + if (filter.empty() || filter == node_id) { + reply.push_back(absl::StrCat(direction, " ", node_id, " ", StateToStr(state), " ", + "keys:", GetKeyCount(slots))); + } + }; + + for (const auto& m : incoming_migrations_jobs_) { + append_answer("in", m->GetSourceID(), node_id, m->GetState(), m->GetSlots()); + } + for (const auto& migration : outgoing_migration_jobs_) { + append_answer("out", migration->GetMigrationInfo().node_id, node_id, migration->GetState(), + migration->GetSlots()); + } + + if (reply.empty()) { + rb->SendSimpleString(StateToStr(MigrationState::C_NO_STATE)); + } else if (!node_id.empty()) { + DCHECK_EQ(reply.size(), 1UL); + rb->SendSimpleString(reply[0]); + } else { + rb->SendStringArr(reply); + } } void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) { diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index cb88e214e..4d4ac3a5b 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -56,7 +56,7 @@ class ClusterFamily { void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx); private: // Slots migration section - void DflyIncomingSlotMigrationStatus(CmdArgList args, ConnectionContext* cntx); + void DflySlotMigrationStatus(CmdArgList args, ConnectionContext* cntx); // DFLYMIGRATE is internal command defines several steps in slots migrations process void DflyMigrate(CmdArgList args, ConnectionContext* cntx); diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index c3cec1541..95ef38934 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1042,9 +1042,8 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory): await asyncio.sleep(0.5) - while ( - await c_nodes_admin[1].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[0]) - != "FINISHED" + while "FINISHED" not in await c_nodes_admin[1].execute_command( + "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[0] ): await asyncio.sleep(0.05) @@ -1245,7 +1244,7 @@ async def test_cluster_fuzzymigration( for node in nodes: states = await node.admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") print(states) - if not all(s.endswith("FINISHED") for s in states) and not states == "NO_STATE": + if not all("FINISHED" in s for s in states) and not states == "NO_STATE": break else: break diff --git a/tools/cluster_mgr.py b/tools/cluster_mgr.py index 86c759b14..fd99d57f3 100755 --- a/tools/cluster_mgr.py +++ b/tools/cluster_mgr.py @@ -343,7 +343,7 @@ def migrate(args): while True: sync_status = send_command(target_node, ["DFLYCLUSTER", "SLOT-MIGRATION-STATUS"]) assert len(sync_status) == 1 - if sync_status[0].endswith("STABLE_SYNC"): + if "STABLE_SYNC" in sync_status[0]: break print("Reached stable sync: ", sync_status)