From 5189dae118f29594b29ea2d95e94919a20bce963 Mon Sep 17 00:00:00 2001 From: Borys Date: Thu, 1 Feb 2024 17:24:54 +0200 Subject: [PATCH] feat(cluster): add migration finalization (#2507) * feat(cluster): add migration finalization --- src/server/cluster/cluster_config.h | 8 ++- src/server/cluster/cluster_family.cc | 62 +++++++++++++++++-- src/server/cluster/cluster_family.h | 6 +- src/server/cluster/cluster_shard_migration.cc | 10 ++- src/server/cluster/cluster_shard_migration.h | 8 ++- src/server/cluster/cluster_slot_migration.cc | 16 +++++ src/server/cluster/cluster_slot_migration.h | 4 ++ src/server/cluster/outgoing_slot_migration.cc | 18 +++++- src/server/cluster/outgoing_slot_migration.h | 6 +- src/server/journal/serializer.cc | 2 +- src/server/journal/streamer.cc | 9 +++ src/server/journal/streamer.h | 2 + src/server/journal/tx_executor.cc | 6 +- src/server/journal/tx_executor.h | 2 +- src/server/journal/types.h | 1 + src/server/replica.cc | 2 +- src/server/transaction.cc | 4 +- tests/dragonfly/cluster_test.py | 33 +++++++--- 18 files changed, 174 insertions(+), 25 deletions(-) diff --git a/src/server/cluster/cluster_config.h b/src/server/cluster/cluster_config.h index 1a501a209..cfd8fd15d 100644 --- a/src/server/cluster/cluster_config.h +++ b/src/server/cluster/cluster_config.h @@ -23,7 +23,13 @@ using SlotId = uint16_t; using SlotSet = absl::flat_hash_set; // MigrationState constants are ordered in state changing order -enum class MigrationState : uint8_t { C_NO_STATE, C_CONNECTING, C_FULL_SYNC, C_STABLE_SYNC }; +enum class MigrationState : uint8_t { + C_NO_STATE, + C_CONNECTING, + C_FULL_SYNC, + C_STABLE_SYNC, + C_FINISHED +}; class ClusterConfig { public: diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index e7d770aec..1c84974ff 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -403,7 +403,9 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) { } else if (sub_cmd == "START-SLOT-MIGRATION") { return DflyClusterStartSlotMigration(args, cntx); } else if (sub_cmd == "SLOT-MIGRATION-STATUS") { - return DflySlotMigrationStatus(args, cntx); + return DflyClusterSlotMigrationStatus(args, cntx); + } else if (sub_cmd == "SLOT-MIGRATION-FINALIZE") { + return DflyClusterMigrationFinalize(args, cntx); } return cntx->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType); @@ -531,6 +533,9 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) tracker.TrackOnThread(); }; + // TODO think about another place for it + RemoveFinishedIncomingMigrations(); + server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb)); DCHECK(tl_cluster_config != nullptr); @@ -629,7 +634,7 @@ void ClusterFamily::DflyClusterStartSlotMigration(CmdArgList args, ConnectionCon } node->Start(cntx); - return cntx->SendOk(); + return cntx->SendLong(node->GetSyncId()); } static std::string_view state_to_str(MigrationState state) { @@ -642,12 +647,14 @@ static std::string_view state_to_str(MigrationState state) { return "FULL_SYNC"sv; case MigrationState::C_STABLE_SYNC: return "STABLE_SYNC"sv; + case MigrationState::C_FINISHED: + return "FINISHED"sv; } DCHECK(false) << "Unknown State value " << static_cast>(state); return "UNDEFINED_STATE"sv; } -void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext* cntx) { +void ClusterFamily::DflyClusterSlotMigrationStatus(CmdArgList args, ConnectionContext* cntx) { CmdArgParser parser(args); auto* rb = static_cast(cntx->reply_builder()); @@ -689,6 +696,40 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext* return rb->SendSimpleString(state_to_str(MigrationState::C_NO_STATE)); } +void ClusterFamily::DflyClusterMigrationFinalize(CmdArgList args, ConnectionContext* cntx) { + CmdArgParser parser{args}; + auto sync_id = parser.Next(); + + if (auto err = parser.Error(); err) { + return cntx->SendError(err->MakeReply()); + } + + auto migration = GetOutgoingMigration(sync_id); + if (!migration) + return cntx->SendError(kIdNotFound); + + if (migration->GetState() != MigrationState::C_STABLE_SYNC) { + return cntx->SendError("Migration process is not in STABLE_SYNC state"); + } + + shard_set->pool()->AwaitFiberOnAll([migration](auto*) { + if (const auto* shard = EngineShard::tlocal(); shard) + migration->Finalize(shard->shard_id()); + }); + + // TODO do next after ACK + util::ThisFiber::SleepFor(500ms); + + shard_set->pool()->AwaitFiberOnAll([migration](auto*) { + if (const auto* shard = EngineShard::tlocal(); shard) + migration->Cancel(shard->shard_id()); + }); + + RemoveOutgoingMigration(sync_id); + + return cntx->SendOk(); +} + void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) { ToUpper(&args[0]); string_view sub_cmd = ArgS(args, 0); @@ -718,6 +759,19 @@ ClusterSlotMigration* ClusterFamily::AddMigration(std::string host_ip, uint16_t .get(); } +void ClusterFamily::RemoveFinishedIncomingMigrations() { + lock_guard lk(migration_mu_); + auto removed_items_it = + std::remove_if(incoming_migrations_jobs_.begin(), incoming_migrations_jobs_.end(), + [](const auto& m) { return m->GetState() == MigrationState::C_FINISHED; }); + incoming_migrations_jobs_.erase(removed_items_it, incoming_migrations_jobs_.end()); +} + +void ClusterFamily::RemoveOutgoingMigration(uint32_t sync_id) { + lock_guard lk(migration_mu_); + outgoing_migration_jobs_.erase(sync_id); +} + void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) { VLOG(1) << "Create slot migration config"; CmdArgParser parser{args}; @@ -793,6 +847,7 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) { return cntx->SendError(kIdNotFound); cntx->conn()->Migrate(shard_set->pool()->at(shard_id)); + server_family_->journal()->StartInThread(); cntx->SendOk(); @@ -825,7 +880,6 @@ void ClusterFamily::DflyMigrateFullSyncCut(CmdArgList args, ConnectionContext* c (*migration_it)->SetStableSyncForFlow(shard_id); if ((*migration_it)->GetState() == MigrationState::C_STABLE_SYNC) { - (*migration_it)->Stop(); LOG(INFO) << "STABLE-SYNC state is set for sync_id " << sync_id; } diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index 615b932b4..687d56cb5 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -51,7 +51,8 @@ class ClusterFamily { private: // Slots migration section void DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx); - void DflySlotMigrationStatus(CmdArgList args, ConnectionContext* cntx); + void DflyClusterSlotMigrationStatus(CmdArgList args, ConnectionContext* cntx); + void DflyClusterMigrationFinalize(CmdArgList args, ConnectionContext* cntx); // DFLYMIGRATE is internal command defines several steps in slots migrations process void DflyMigrate(CmdArgList args, ConnectionContext* cntx); @@ -74,6 +75,9 @@ class ClusterFamily { ClusterSlotMigration* AddMigration(std::string host_ip, uint16_t port, std::vector slots); + void RemoveFinishedIncomingMigrations(); + void RemoveOutgoingMigration(uint32_t sync_id); + // store info about migration and create unique session id uint32_t CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port, std::vector slots); diff --git a/src/server/cluster/cluster_shard_migration.cc b/src/server/cluster/cluster_shard_migration.cc index 90ec26fb6..55cec97d4 100644 --- a/src/server/cluster/cluster_shard_migration.cc +++ b/src/server/cluster/cluster_shard_migration.cc @@ -74,10 +74,14 @@ void ClusterShardMigration::FullSyncShardFb(Context* cntx) { TouchIoTime(); - if (!tx_data->is_ping) { - ExecuteTxWithNoShardSync(std::move(*tx_data), cntx); - } else { + if (tx_data->opcode == journal::Op::FIN) { + VLOG(2) << "Flow " << source_shard_id_ << " is finalized"; + is_finalized_ = true; + break; + } else if (tx_data->opcode == journal::Op::PING) { // TODO check about ping logic + } else { + ExecuteTxWithNoShardSync(std::move(*tx_data), cntx); } } } diff --git a/src/server/cluster/cluster_shard_migration.h b/src/server/cluster/cluster_shard_migration.h index 6f01e448d..f2e94ac0c 100644 --- a/src/server/cluster/cluster_shard_migration.h +++ b/src/server/cluster/cluster_shard_migration.h @@ -31,9 +31,14 @@ class ClusterShardMigration : public ProtocolClient { return is_stable_sync_.load(); } + bool IsFinalized() { + return is_finalized_; + } + + void JoinFlow(); + private: void FullSyncShardFb(Context* cntx); - void JoinFlow(); void ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx); void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx); @@ -45,6 +50,7 @@ class ClusterShardMigration : public ProtocolClient { std::unique_ptr executor_; Fiber sync_fb_; std::atomic_bool is_stable_sync_ = false; + bool is_finalized_ = false; }; } // namespace dfly diff --git a/src/server/cluster/cluster_slot_migration.cc b/src/server/cluster/cluster_slot_migration.cc index 6ed08eac6..497794966 100644 --- a/src/server/cluster/cluster_slot_migration.cc +++ b/src/server/cluster/cluster_slot_migration.cc @@ -113,6 +113,11 @@ void ClusterSlotMigration::SetStableSyncForFlow(uint32_t flow) { } } +bool ClusterSlotMigration::IsFinalized() const { + return std::all_of(shard_flows_.begin(), shard_flows_.end(), + [](const auto& el) { return el->IsFinalized(); }); +} + void ClusterSlotMigration::Stop() { for (auto& flow : shard_flows_) { flow->Cancel(); @@ -129,6 +134,10 @@ void ClusterSlotMigration::MainMigrationFb() { LOG(WARNING) << "Error syncing with " << server().Description() << " " << ec << " " << ec.message(); } + + if (IsFinalized()) { + state_ = MigrationState::C_FINISHED; + } } std::error_code ClusterSlotMigration::InitiateSlotsMigration() { @@ -137,6 +146,13 @@ std::error_code ClusterSlotMigration::InitiateSlotsMigration() { shard_flows_[i].reset(new ClusterShardMigration(server(), i, sync_id_, &service_)); } + absl::Cleanup cleanup = [this]() { + // We do the following operations regardless of outcome. + for (auto& flow : shard_flows_) { + flow->JoinFlow(); + } + }; + // Switch to new error handler that closes flow sockets. auto err_handler = [this](const auto& ge) mutable { // Make sure the flows are not in a state transition diff --git a/src/server/cluster/cluster_slot_migration.h b/src/server/cluster/cluster_slot_migration.h index a6605963b..6065df6fb 100644 --- a/src/server/cluster/cluster_slot_migration.h +++ b/src/server/cluster/cluster_slot_migration.h @@ -36,6 +36,7 @@ class ClusterSlotMigration : ProtocolClient { } void SetStableSyncForFlow(uint32_t flow); + void Stop(); private: @@ -45,6 +46,9 @@ class ClusterSlotMigration : ProtocolClient { // Creates flows, one per shard on the source node and manage migration process std::error_code InitiateSlotsMigration(); + // may be called after we finish all flows + bool IsFinalized() const; + private: Service& service_; Mutex flows_op_mu_; diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 799abd5b7..a88125e86 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -18,10 +18,14 @@ class OutgoingMigration::SliceSlotMigration { state_ = MigrationState::C_FULL_SYNC; } - ~SliceSlotMigration() { + void Cancel() { streamer_.Cancel(); } + void Finalize() { + streamer_.SendFinalize(); + } + MigrationState GetState() const { return state_ == MigrationState::C_FULL_SYNC && streamer_.IsSnapshotFinished() ? MigrationState::C_STABLE_SYNC @@ -56,8 +60,20 @@ void OutgoingMigration::StartFlow(DbSlice* slice, uint32_t sync_id, journal::Jou std::make_unique(slice, std::move(sset), sync_id, journal, &cntx_, dest); } +void OutgoingMigration::Finalize(uint32_t shard_id) { + slot_migrations_[shard_id]->Finalize(); +} + +void OutgoingMigration::Cancel(uint32_t shard_id) { + slot_migrations_[shard_id]->Cancel(); +} + MigrationState OutgoingMigration::GetState() const { std::lock_guard lck(flows_mu_); + return GetStateImpl(); +} + +MigrationState OutgoingMigration::GetStateImpl() const { MigrationState min_state = MigrationState::C_STABLE_SYNC; for (const auto& slot_migration : slot_migrations_) { if (slot_migration) diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index ca812682c..770238bad 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -15,7 +15,7 @@ class Journal; class DbSlice; -// Whole slots migration process information +// Whole outgoing slots migration manager class OutgoingMigration { public: OutgoingMigration() = default; @@ -25,6 +25,9 @@ class OutgoingMigration { void StartFlow(DbSlice* slice, uint32_t sync_id, journal::Journal* journal, io::Sink* dest); + void Finalize(uint32_t shard_id); + void Cancel(uint32_t shard_id); + MigrationState GetState() const; const std::string& GetHostIp() const { @@ -35,6 +38,7 @@ class OutgoingMigration { }; private: + MigrationState GetStateImpl() const; // SliceSlotMigration manages state and data transfering for the corresponding shard class SliceSlotMigration; diff --git a/src/server/journal/serializer.cc b/src/server/journal/serializer.cc index 658e8a7e5..9130de7ba 100644 --- a/src/server/journal/serializer.cc +++ b/src/server/journal/serializer.cc @@ -195,7 +195,7 @@ io::Result JournalReader::ReadEntry() { entry.dbid = dbid_; entry.opcode = opcode; - if (opcode == journal::Op::PING) { + if (opcode == journal::Op::PING || opcode == journal::Op::FIN) { return entry; } diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index d2ae854f8..e9b2c4ab2 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -92,6 +92,15 @@ void RestoreStreamer::Start(io::Sink* dest) { }); } +void RestoreStreamer::SendFinalize() { + VLOG(2) << "DFLYMIGRATE FINALIZE for " << sync_id_ << " : " << db_slice_->shard_id(); + journal::Entry entry(journal::Op::FIN, 0 /*db_id*/, 0 /*slot_id*/); + + JournalWriter writer{this}; + writer.Write(entry); + NotifyWritten(true); +} + void RestoreStreamer::Cancel() { fiber_cancellation_.Cancel(); snapshot_fb_.JoinIfNeeded(); diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index dcf5a88a5..667620faf 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -60,6 +60,8 @@ class RestoreStreamer : public JournalStreamer { // Cancel() must be called if Start() is called void Cancel() override; + void SendFinalize(); + bool IsSnapshotFinished() const { return snapshot_finished_; } diff --git a/src/server/journal/tx_executor.cc b/src/server/journal/tx_executor.cc index c9edd53a6..58f92e1ed 100644 --- a/src/server/journal/tx_executor.cc +++ b/src/server/journal/tx_executor.cc @@ -49,10 +49,12 @@ void MultiShardExecution::CancelAllBlockingEntities() { bool TransactionData::AddEntry(journal::ParsedEntry&& entry) { ++journal_rec_count; + opcode = entry.opcode; switch (entry.opcode) { case journal::Op::PING: - is_ping = true; + return true; + case journal::Op::FIN: return true; case journal::Op::EXPIRED: case journal::Op::COMMAND: @@ -112,7 +114,7 @@ std::optional TransactionReader::NextTxData(JournalReader* read // Check if journal command can be executed right away. // Expiration checks lock on master, so it never conflicts with running multi transactions. if (res->opcode == journal::Op::EXPIRED || res->opcode == journal::Op::COMMAND || - res->opcode == journal::Op::PING) + res->opcode == journal::Op::PING || res->opcode == journal::Op::FIN) return TransactionData::FromSingle(std::move(res.value())); // Otherwise, continue building multi command. diff --git a/src/server/journal/tx_executor.h b/src/server/journal/tx_executor.h index 15ba2398f..9d0e6d2ea 100644 --- a/src/server/journal/tx_executor.h +++ b/src/server/journal/tx_executor.h @@ -51,7 +51,7 @@ struct TransactionData { uint32_t shard_cnt{0}; absl::InlinedVector commands{0}; uint32_t journal_rec_count{0}; // Count number of source entries to check offset. - bool is_ping = false; // For Op::PING entries. + journal::Op opcode = journal::Op::NOOP; }; // Utility for reading TransactionData from a journal reader. diff --git a/src/server/journal/types.h b/src/server/journal/types.h index 0a21ddc99..82677a62c 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -22,6 +22,7 @@ enum class Op : uint8_t { MULTI_COMMAND = 11, EXEC = 12, PING = 13, + FIN = 14 }; struct EntryBase { diff --git a/src/server/replica.cc b/src/server/replica.cc index 2bf547392..545f71151 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -803,7 +803,7 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) { last_io_time_ = Proactor()->GetMonotonicTimeNs(); - if (!tx_data->is_ping) { + if (tx_data->opcode != journal::Op::PING) { if (use_multi_shard_exe_sync_) { InsertTxDataToShardResource(std::move(*tx_data)); } else { diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 874970ef3..a56ba42b7 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -307,8 +307,10 @@ void Transaction::InitByKeys(const KeyIndex& key_index) { unique_shard_cnt_ = 1; if (is_stub) // stub transactions don't migrate DCHECK_EQ(unique_shard_id_, Shard(kv_args_.front(), shard_set->size())); - else + else { + unique_slot_checker_.Add(kv_args_.front()); unique_shard_id_ = Shard(kv_args_.front(), shard_set->size()); + } // Multi transactions that execute commands on their own (not stubs) can't shrink the backing // array, as it still might be read by leftover callbacks. diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 53fc37bc9..5bc4dfe98 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -787,7 +787,7 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory): res = await c_nodes_admin[1].execute_command( "DFLYCLUSTER", "START-SLOT-MIGRATION", "127.0.0.1", str(nodes[0].admin_port), "5200", "5259" ) - assert "OK" == res + assert 1 == res while ( await c_nodes_admin[1].execute_command( @@ -862,12 +862,9 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory): c_nodes_admin, ) - assert await c_nodes[0].set("KEY0", "value") - assert await c_nodes[0].set("KEY1", "value") assert await c_nodes[1].set("KEY2", "value") assert await c_nodes[1].set("KEY3", "value") - assert await c_nodes[0].set("KEY4", "value") - assert await c_nodes[0].set("KEY5", "value") + assert await c_nodes[1].set("KEY6", "value") assert await c_nodes[1].set("KEY7", "value") assert await c_nodes[0].set("KEY8", "value") @@ -882,12 +879,14 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory): assert await c_nodes[0].set("KEY17", "value") assert await c_nodes[1].set("KEY18", "value") assert await c_nodes[1].set("KEY19", "value") - assert await c_nodes[0].execute_command("DBSIZE") == 10 res = await c_nodes_admin[1].execute_command( "DFLYCLUSTER", "START-SLOT-MIGRATION", "127.0.0.1", str(nodes[0].admin_port), "3000", "9000" ) - assert "OK" == res + assert 1 == res + + assert await c_nodes[0].set("KEY0", "value") + assert await c_nodes[0].set("KEY1", "value") while ( await c_nodes_admin[1].execute_command( @@ -897,6 +896,26 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory): ): await asyncio.sleep(0.05) + assert await c_nodes[0].set("KEY4", "value") + assert await c_nodes[0].set("KEY5", "value") + assert await c_nodes[0].execute_command("DBSIZE") == 10 + + # TODO remove when we add slot blocking + await asyncio.sleep(0.5) + + res = await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-FINALIZE", "1") + assert "OK" == res + + await asyncio.sleep(0.5) + + while ( + await c_nodes_admin[1].execute_command( + "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[0].admin_port) + ) + != "FINISHED" + ): + await asyncio.sleep(0.05) + await push_config( config.replace("LAST_SLOT_CUTOFF", "2999").replace("NEXT_SLOT_CUTOFF", "3000"), c_nodes_admin,