diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index e7dbdc9df..59cca32dd 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -494,6 +494,8 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) lock_guard gu(set_config_mu); + lock_guard config_update_lk( + config_update_mu_); // to prevent simultaneous update config from outgoing migration // TODO we shouldn't provide cntx into StartSlotMigrations if (!StartSlotMigrations(new_config->GetNewOutgoingMigrations(tl_cluster_config), cntx)) { return cntx->SendError("Can't start the migration"); @@ -706,19 +708,17 @@ void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) { } } -IncomingSlotMigration* ClusterFamily::CreateIncomingMigration(std::string source_id, - SlotRanges slots, - uint32_t shards_num) { +std::shared_ptr ClusterFamily::CreateIncomingMigration(std::string source_id, + SlotRanges slots, + uint32_t shards_num) { lock_guard lk(migration_mu_); for (const auto& mj : incoming_migrations_jobs_) { if (mj->GetSourceID() == source_id) { return nullptr; } } - return incoming_migrations_jobs_ - .emplace_back(make_shared( - std::move(source_id), &server_family_->service(), std::move(slots), shards_num)) - .get(); + return incoming_migrations_jobs_.emplace_back(make_shared( + std::move(source_id), &server_family_->service(), std::move(slots), shards_num)); } std::shared_ptr ClusterFamily::GetIncomingMigration( @@ -742,7 +742,7 @@ void ClusterFamily::RemoveOutgoingMigrations(const std::vector& m OutgoingMigration& migration = *it->get(); LOG(INFO) << "Outgoing migration cancelled: slots " << SlotRange::ToString(migration.GetSlots()) << " to " << migration.GetHostIp() << ":" << migration.GetPort(); - migration.Cancel(); + migration.Finish(); outgoing_migration_jobs_.erase(it); } @@ -833,8 +833,10 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) { cntx->conn()->SetName(absl::StrCat("migration_flow_", source_id)); auto migration = GetIncomingMigration(source_id); - if (!migration) + if (!migration) { + // TODO process error when migration is canceled return cntx->SendError(kIdNotFound); + } DCHECK(cntx->sync_dispatch); // we do this to be ignored by the dispatch tracker @@ -847,7 +849,7 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) { } void ClusterFamily::UpdateConfig(const std::vector& slots, bool enable) { - lock_guard gu(set_config_mu); + lock_guard gu(config_update_mu_); auto new_config = tl_cluster_config->CloneWithChanges(slots, enable); @@ -870,6 +872,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) { [source_id](const auto& m) { return m.node_id == source_id; }); if (m_it == in_migrations.end()) { LOG(WARNING) << "migration isn't in config"; + // TODO process error if migration was canceled return cntx->SendLong(OutgoingMigration::kInvalidAttempt); } diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index 4d4ac3a5b..45052ba9a 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -73,8 +73,9 @@ class ClusterFamily { void DflyMigrateAck(CmdArgList args, ConnectionContext* cntx); // create a IncomingSlotMigration entity which will execute migration - IncomingSlotMigration* CreateIncomingMigration(std::string source_id, SlotRanges slots, - uint32_t shards_num); + std::shared_ptr CreateIncomingMigration(std::string source_id, + SlotRanges slots, + uint32_t shards_num); std::shared_ptr GetIncomingMigration(std::string_view source_id); @@ -97,6 +98,8 @@ class ClusterFamily { private: ClusterShardInfo GetEmulatedShardInfo(ConnectionContext* cntx) const; + mutable util::fb2::Mutex config_update_mu_; + std::string id_; ServerFamily* server_family_ = nullptr; diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 6a593d65c..9c97199fd 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -81,15 +81,12 @@ OutgoingMigration::~OutgoingMigration() { main_sync_fb_.JoinIfNeeded(); } -void OutgoingMigration::Cancel() { - state_.store(MigrationState::C_CANCELLED); - - auto start_cb = [this](util::ProactorBase* pb) { - if (auto* shard = EngineShard::tlocal(); shard) { +void OutgoingMigration::Finish() { + shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) { + if (const auto* shard = EngineShard::tlocal(); shard) slot_migrations_[shard->shard_id()]->Cancel(); - } - }; - shard_set->pool()->AwaitFiberOnAll(std::move(start_cb)); + }); + state_.store(MigrationState::C_FINISHED); } MigrationState OutgoingMigration::GetState() const { @@ -108,8 +105,6 @@ void OutgoingMigration::SyncFb() { } }; - state_.store(MigrationState::C_SYNC); - shard_set->pool()->AwaitFiberOnAll(std::move(start_cb)); for (auto& migration : slot_migrations_) { @@ -121,13 +116,13 @@ void OutgoingMigration::SyncFb() { // TODO implement blocking on migrated slots only long attempt = 0; - while (state_.load() != MigrationState::C_CANCELLED && !FinishMigration(++attempt)) { + while (state_.load() != MigrationState::C_FINISHED && !FinalyzeMigration(++attempt)) { // process commands that were on pause and try again ThisFiber::SleepFor(500ms); } } -bool OutgoingMigration::FinishMigration(long attempt) { +bool OutgoingMigration::FinalyzeMigration(long attempt) { bool is_block_active = true; auto is_pause_in_progress = [&is_block_active] { return is_block_active; }; auto pause_fb_opt = Pause(server_family_->GetNonPriviligedListeners(), nullptr, @@ -181,12 +176,8 @@ bool OutgoingMigration::FinishMigration(long attempt) { } } while (attempt_res != attempt); - shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) { - if (const auto* shard = EngineShard::tlocal(); shard) - slot_migrations_[shard->shard_id()]->Cancel(); - }); + Finish(); - state_.store(MigrationState::C_FINISHED); cf_->UpdateConfig(migration_info_.slot_ranges, false); VLOG(1) << "Config is updated for " << cf_->MyID(); return true; diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index 5d2af19b8..2d9eddfb2 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -28,10 +28,8 @@ class OutgoingMigration : private ProtocolClient { // start migration process, sends INIT command to the target node std::error_code Start(ConnectionContext* cntx); - // should be run for all shards - void StartFlow(journal::Journal* journal, io::Sink* dest); - - void Cancel(); + // mark migration as FINISHED and cancel migration if it's not finished yet + void Finish(); MigrationState GetState() const; @@ -54,18 +52,21 @@ class OutgoingMigration : private ProtocolClient { static constexpr long kInvalidAttempt = -1; private: + // should be run for all shards + void StartFlow(journal::Journal* journal, io::Sink* dest); + MigrationState GetStateImpl() const; // SliceSlotMigration manages state and data transfering for the corresponding shard class SliceSlotMigration; void SyncFb(); - bool FinishMigration(long attempt); + bool FinalyzeMigration(long attempt); private: MigrationInfo migration_info_; Context cntx_; - mutable util::fb2::Mutex flows_mu_; - std::vector> slot_migrations_ ABSL_GUARDED_BY(flows_mu_); + mutable util::fb2::Mutex finish_mu_; + std::vector> slot_migrations_; ServerFamily* server_family_; ClusterFamily* cf_; diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index b9250a2c5..dd925beb6 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -112,11 +112,12 @@ RestoreStreamer::~RestoreStreamer() { } void RestoreStreamer::Cancel() { - if (snapshot_version_ != 0) { + auto sver = snapshot_version_; + snapshot_version_ = 0; // to prevent double cancel in another fiber + if (sver != 0) { fiber_cancellation_.Cancel(); - db_slice_->UnregisterOnChange(snapshot_version_); + db_slice_->UnregisterOnChange(sver); JournalStreamer::Cancel(); - snapshot_version_ = 0; } }