diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 57e751451..833942dbe 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -28,7 +28,7 @@ endif() add_library(dfly_transaction db_slice.cc malloc_stats.cc blocking_controller.cc - command_registry.cc cluster/unique_slot_checker.cc + command_registry.cc cluster/cluster_utility.cc journal/tx_executor.cc common.cc journal/journal.cc journal/types.cc journal/journal_slice.cc server_state.cc table.cc top_keys.cc transaction.cc tx_base.cc diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 87d456a6f..b4732d253 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -660,26 +660,6 @@ static string_view StateToStr(MigrationState state) { return "UNDEFINED_STATE"sv; } -static uint64_t GetKeyCount(const SlotRanges& slots) { - atomic_uint64_t keys = 0; - - shard_set->pool()->AwaitFiberOnAll([&](auto*) { - EngineShard* shard = EngineShard::tlocal(); - if (shard == nullptr) - return; - - uint64_t shard_keys = 0; - for (const SlotRange& range : slots) { - for (SlotId slot = range.start; slot <= range.end; slot++) { - shard_keys += shard->db_slice().GetSlotStats(slot).key_count; - } - } - keys.fetch_add(shard_keys); - }); - - return keys.load(); -} - void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext* cntx) { auto* rb = static_cast(cntx->reply_builder()); CmdArgParser parser(args); @@ -698,22 +678,21 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext* reply.reserve(incoming_migrations_jobs_.size() + outgoing_migration_jobs_.size()); auto append_answer = [rb, &reply](string_view direction, string_view node_id, string_view filter, - MigrationState state, const SlotRanges& slots, - string_view error) { + MigrationState state, size_t keys_number, string_view error) { if (filter.empty() || filter == node_id) { error = error.empty() ? "0" : error; reply.push_back(absl::StrCat(direction, " ", node_id, " ", StateToStr(state), - " keys:", GetKeyCount(slots), " errors: ", error)); + " keys:", keys_number, " errors: ", error)); } }; for (const auto& m : incoming_migrations_jobs_) { // TODO add error status - append_answer("in", m->GetSourceID(), node_id, m->GetState(), m->GetSlots(), ""); + append_answer("in", m->GetSourceID(), node_id, m->GetState(), m->GetKeyCount(), ""); } for (const auto& migration : outgoing_migration_jobs_) { append_answer("out", migration->GetMigrationInfo().node_id, node_id, migration->GetState(), - migration->GetSlots(), migration->GetErrorStr()); + migration->GetKeyCount(), migration->GetErrorStr()); } if (reply.empty()) { @@ -927,6 +906,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) { if (!migration) return cntx->SendError(kIdNotFound); + // TODO add timeout for join because it can fail migration->Join(); VLOG(1) << "Migration is joined for " << source_id; diff --git a/src/server/cluster/unique_slot_checker.cc b/src/server/cluster/cluster_utility.cc similarity index 52% rename from src/server/cluster/unique_slot_checker.cc rename to src/server/cluster/cluster_utility.cc index af15bbbe1..c624425b7 100644 --- a/src/server/cluster/unique_slot_checker.cc +++ b/src/server/cluster/cluster_utility.cc @@ -1,6 +1,7 @@ -#include "server/cluster/unique_slot_checker.h" +#include "server/cluster/cluster_utility.h" #include "server/cluster/cluster_defs.h" +#include "server/engine_shard_set.h" using namespace std; @@ -37,4 +38,24 @@ optional UniqueSlotChecker::GetUniqueSlotId() const { return slot_id_; } +uint64_t GetKeyCount(const SlotRanges& slots) { + std::atomic_uint64_t keys = 0; + + shard_set->pool()->AwaitFiberOnAll([&](auto*) { + EngineShard* shard = EngineShard::tlocal(); + if (shard == nullptr) + return; + + uint64_t shard_keys = 0; + for (const SlotRange& range : slots) { + for (SlotId slot = range.start; slot <= range.end; slot++) { + shard_keys += shard->db_slice().GetSlotStats(slot).key_count; + } + } + keys.fetch_add(shard_keys); + }); + + return keys.load(); +} + } // namespace dfly::cluster diff --git a/src/server/cluster/unique_slot_checker.h b/src/server/cluster/cluster_utility.h similarity index 92% rename from src/server/cluster/unique_slot_checker.h rename to src/server/cluster/cluster_utility.h index 7f2bb63cc..31ef71d8c 100644 --- a/src/server/cluster/unique_slot_checker.h +++ b/src/server/cluster/cluster_utility.h @@ -24,4 +24,6 @@ class UniqueSlotChecker { std::optional slot_id_; }; +uint64_t GetKeyCount(const SlotRanges& slots); + } // namespace dfly::cluster diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc index c4b959b6d..d4ea48a9a 100644 --- a/src/server/cluster/incoming_slot_migration.cc +++ b/src/server/cluster/incoming_slot_migration.cc @@ -6,6 +6,7 @@ #include "absl/cleanup/cleanup.h" #include "base/logging.h" +#include "cluster_utility.h" #include "server/error.h" #include "server/journal/executor.h" #include "server/journal/tx_executor.h" @@ -50,7 +51,7 @@ class ClusterShardMigration { while (tx_data->opcode == journal::Op::FIN) { VLOG(2) << "Attempt to finalize flow " << source_shard_id_; bc->Dec(); // we can Join the flow now - // if we get new data attempt is failed + // if we get new data, attempt is failed if (tx_data = tx_reader.NextTxData(&reader, cntx); !tx_data) { VLOG(1) << "Finalized flow " << source_shard_id_; return; @@ -120,8 +121,10 @@ IncomingSlotMigration::~IncomingSlotMigration() { } void IncomingSlotMigration::Join() { + // TODO add timeout bc_->Wait(); - state_ = MigrationState::C_FINISHED; + state_.store(MigrationState::C_FINISHED); + keys_number_ = cluster::GetKeyCount(slots_); } void IncomingSlotMigration::Cancel() { @@ -141,4 +144,11 @@ void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* sou shard_flows_[shard]->Start(&cntx_, source, bc_); } +size_t IncomingSlotMigration::GetKeyCount() const { + if (state_.load() == MigrationState::C_FINISHED) { + return keys_number_; + } + return cluster::GetKeyCount(slots_); +} + } // namespace dfly::cluster diff --git a/src/server/cluster/incoming_slot_migration.h b/src/server/cluster/incoming_slot_migration.h index 470bf4d53..ab27c94e6 100644 --- a/src/server/cluster/incoming_slot_migration.h +++ b/src/server/cluster/incoming_slot_migration.h @@ -47,6 +47,8 @@ class IncomingSlotMigration { return source_id_; } + size_t GetKeyCount() const; + private: std::string source_id_; Service& service_; @@ -54,6 +56,9 @@ class IncomingSlotMigration { SlotRanges slots_; std::atomic state_ = MigrationState::C_NO_STATE; Context cntx_; + // when migration is finished we need to store number of migrated keys + // because new request can add or remove keys and we get incorrect statistic + size_t keys_number_ = 0; util::fb2::BlockingCounter bc_; }; diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 14efc8de2..89d5fe483 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -11,6 +11,7 @@ #include "absl/cleanup/cleanup.h" #include "base/logging.h" #include "cluster_family.h" +#include "cluster_utility.h" #include "server/db_slice.h" #include "server/engine_shard_set.h" #include "server/error.h" @@ -270,6 +271,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { auto is_error = CheckFlowsForErrors(); Finish(is_error); if (!is_error) { + keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges); cf_->UpdateConfig(migration_info_.slot_ranges, false); VLOG(1) << "Config is updated for " << cf_->MyID(); } @@ -295,4 +297,11 @@ bool OutgoingMigration::CheckFlowsForErrors() { } return false; } + +size_t OutgoingMigration::GetKeyCount() const { + if (state_ == MigrationState::C_FINISHED) { + return keys_number_; + } + return cluster::GetKeyCount(migration_info_.slot_ranges); +} } // namespace dfly::cluster diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index aa2a1b0d3..7b1462abe 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -53,6 +53,8 @@ class OutgoingMigration : private ProtocolClient { return last_error_.Format(); } + size_t GetKeyCount() const; + static constexpr long kInvalidAttempt = -1; static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION"; @@ -84,6 +86,10 @@ class OutgoingMigration : private ProtocolClient { mutable util::fb2::Mutex state_mu_; MigrationState state_ ABSL_GUARDED_BY(state_mu_) = MigrationState::C_NO_STATE; + + // when migration is finished we need to store number of migrated keys + // because new request can add or remove keys and we get incorrect statistic + size_t keys_number_ = 0; }; } // namespace dfly::cluster diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 0dfda23a9..1e85c631e 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -35,7 +35,7 @@ extern "C" { #include "server/bitops_family.h" #include "server/bloom_family.h" #include "server/cluster/cluster_family.h" -#include "server/cluster/unique_slot_checker.h" +#include "server/cluster/cluster_utility.h" #include "server/conn_context.h" #include "server/error.h" #include "server/generic_family.h" diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index c69060208..e06ec81fa 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -8,7 +8,7 @@ #include "base/logging.h" #include "facade/dragonfly_connection.h" -#include "server/cluster/unique_slot_checker.h" +#include "server/cluster/cluster_utility.h" #include "server/command_registry.h" #include "server/conn_context.h" #include "server/engine_shard_set.h" diff --git a/src/server/transaction.h b/src/server/transaction.h index 07ffc6260..20347e28c 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -18,7 +18,7 @@ #include "core/intent_lock.h" #include "core/tx_queue.h" #include "facade/op_status.h" -#include "server/cluster/unique_slot_checker.h" +#include "server/cluster/cluster_utility.h" #include "server/common.h" #include "server/journal/types.h" #include "server/table.h" diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index cd25d9314..51f3b35d9 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1067,12 +1067,17 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory): c_nodes_admin, ) - await asyncio.sleep(0.5) + await wait_for_status(c_nodes_admin[1], node_ids[0], "FINISHED") - while "FINISHED" not in await c_nodes_admin[1].execute_command( - "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[0] - ): - await asyncio.sleep(0.05) + assert await c_nodes[1].set("KEY20", "value") + assert await c_nodes[1].set("KEY21", "value") + + assert ( + await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[1]) + ).startswith(f"""out {node_ids[1]} FINISHED keys:7""") + assert ( + await c_nodes_admin[1].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[0]) + ).startswith(f"""in {node_ids[0]} FINISHED keys:7""") await push_config( config.replace("LAST_SLOT_CUTOFF", "2999").replace("NEXT_SLOT_CUTOFF", "3000"), @@ -1099,7 +1104,9 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory): assert await c_nodes[1].get("KEY17") == "value" assert await c_nodes[1].get("KEY18") == "value" assert await c_nodes[1].get("KEY19") == "value" - assert await c_nodes[1].execute_command("DBSIZE") == 17 + assert await c_nodes[1].get("KEY20") == "value" + assert await c_nodes[1].get("KEY21") == "value" + assert await c_nodes[1].execute_command("DBSIZE") == 19 assert ( await c_nodes_admin[1].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == "NO_STATE"