From 22756eeb8192485a3b3086fd3ae344eb5106a4aa Mon Sep 17 00:00:00 2001 From: Vladislav Date: Tue, 16 Jul 2024 13:06:34 +0200 Subject: [PATCH] fix(migration): Use transactions! (#3266) Signed-off-by: Vladislav Oleshko --- src/server/cluster/outgoing_slot_migration.cc | 100 +++++++++++------- src/server/cluster/outgoing_slot_migration.h | 7 ++ src/server/db_slice.cc | 26 +---- src/server/db_slice.h | 14 --- src/server/detail/save_stages_controller.cc | 8 -- src/server/dflycmd.cc | 30 +----- src/server/journal/streamer.cc | 4 + src/server/journal/streamer.h | 3 + src/server/transaction.cc | 18 ++-- src/server/transaction.h | 8 ++ 10 files changed, 101 insertions(+), 117 deletions(-) diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 9014a90ac..b2b7cc5a6 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -16,6 +16,7 @@ #include "server/engine_shard_set.h" #include "server/error.h" #include "server/journal/streamer.h" +#include "server/main_service.h" #include "server/server_family.h" ABSL_FLAG(int, slot_migration_connection_timeout_ms, 2000, "Timeout for network operations"); @@ -33,7 +34,10 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient { : ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &cntx_) { } - void Sync(const std::string& node_id, uint32_t shard_id) { + // Send DFLYMIGRATE FLOW + void PrepareFlow(const std::string& node_id) { + uint32_t shard_id = EngineShard::tlocal()->shard_id(); + VLOG(1) << "Connecting to source node_id " << node_id << " shard_id " << shard_id; auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms; if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) { @@ -57,10 +61,18 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient { cntx_.ReportError("Incorrect response for FLOW cmd"); return; } + } + // Register db_slice and journal change listeners + void PrepareSync() { streamer_.Start(Sock()); } + // Run restore streamer + void RunSync() { + streamer_.Run(); + } + void Cancel() { streamer_.Cancel(); } @@ -82,18 +94,17 @@ OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, Serv migration_info_(std::move(info)), slot_migrations_(shard_set->size()), server_family_(sf), - cf_(cf) { + cf_(cf), + tx_(new Transaction{sf->service().FindCmd("DFLYCLUSTER")}) { + tx_->InitByArgs(0, {}); } OutgoingMigration::~OutgoingMigration() { main_sync_fb_.JoinIfNeeded(); - // Destroy each flow in its dedicated thread, because we could be the last owner of the db tables - shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) { - if (const auto* shard = EngineShard::tlocal(); shard) { - slot_migrations_[shard->shard_id()].reset(); - } - }); + // Destroy each flow in its dedicated thread, because we could be the last + // owner of the db tables + OnAllShards([](auto& migration) { migration.reset(); }); } bool OutgoingMigration::ChangeState(MigrationState new_state) { @@ -106,6 +117,15 @@ bool OutgoingMigration::ChangeState(MigrationState new_state) { return true; } +void OutgoingMigration::OnAllShards( + std::function&)> func) { + shard_set->pool()->AwaitFiberOnAll([this, &func](util::ProactorBase* pb) { + if (const auto* shard = EngineShard::tlocal(); shard) { + func(slot_migrations_[shard->shard_id()]); + } + }); +} + void OutgoingMigration::Finish(bool is_error) { VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << " : " << migration_info_.node_info.id; @@ -132,12 +152,9 @@ void OutgoingMigration::Finish(bool is_error) { } if (should_cancel_flows) { - shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) { - if (const auto* shard = EngineShard::tlocal(); shard) { - auto& flow = slot_migrations_[shard->shard_id()]; - CHECK(flow != nullptr); - flow->Cancel(); - } + OnAllShards([](auto& migration) { + CHECK(migration != nullptr); + migration->Cancel(); }); } } @@ -161,8 +178,7 @@ void OutgoingMigration::SyncFb() { if (last_error_) { LOG(ERROR) << last_error_.Format(); - // if error is happened on the previous attempt we wait for some time and try again - ThisFiber::SleepFor(1000ms); + ThisFiber::SleepFor(1000ms); // wait some time before next retry } VLOG(2) << "Connecting to source"; @@ -195,27 +211,34 @@ void OutgoingMigration::SyncFb() { continue; } - shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) { - if (auto* shard = EngineShard::tlocal(); shard) { - server_family_->journal()->StartInThread(); - slot_migrations_[shard->shard_id()] = std::make_unique( - &shard->db_slice(), server(), migration_info_.slot_ranges, server_family_->journal()); - } + OnAllShards([this](auto& migration) { + auto* shard = EngineShard::tlocal(); + server_family_->journal()->StartInThread(); + migration = std::make_unique( + &shard->db_slice(), server(), migration_info_.slot_ranges, server_family_->journal()); }); if (!ChangeState(MigrationState::C_SYNC)) { break; } - shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) { - if (auto* shard = EngineShard::tlocal(); shard) { - auto& migration = slot_migrations_[shard->shard_id()]; - CHECK(migration != nullptr); - migration->Sync(cf_->MyID(), shard->shard_id()); - if (migration->GetError()) { - Finish(true); - } - } + OnAllShards([this](auto& migration) { migration->PrepareFlow(cf_->MyID()); }); + if (CheckFlowsForErrors()) { + LOG(WARNING) << "Preparation error detected, retrying outgoing migration"; + continue; + } + + // Global transactional cut for migration to register db_slice and journal + // listeners + { + Transaction::Guard tg{tx_.get()}; + OnAllShards([](auto& migration) { migration->PrepareSync(); }); + } + + OnAllShards([this](auto& migration) { + migration->RunSync(); + if (migration->GetError()) + Finish(true); }); if (CheckFlowsForErrors()) { @@ -240,8 +263,8 @@ void OutgoingMigration::SyncFb() { } bool OutgoingMigration::FinalizeMigration(long attempt) { - // if it's not the 1st attempt and flows are work correctly we try to reconnect and ACK one more - // time + // if it's not the 1st attempt and flows are work correctly we try to + // reconnect and ACK one more time VLOG(1) << "FinalizeMigration for " << cf_->MyID() << " : " << migration_info_.node_info.id; if (attempt > 1) { if (CheckFlowsForErrors()) { @@ -255,6 +278,9 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { return false; } } + + // Migration finalization has to be done via client pause because commands need to + // be blocked on coordinator level to avoid intializing transactions with stale cluster slot info // TODO implement blocking on migrated slots only bool is_block_active = true; auto is_pause_in_progress = [&is_block_active] { return is_block_active; }; @@ -270,14 +296,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { pause_fb_opt->JoinIfNeeded(); }); - auto cb = [this, attempt](util::ProactorBase* pb) { - if (const auto* shard = EngineShard::tlocal(); shard) { - slot_migrations_[shard->shard_id()]->Finalize(attempt); - } - }; - VLOG(1) << "FINALIZE flows for " << cf_->MyID() << " : " << migration_info_.node_info.id; - shard_set->pool()->AwaitFiberOnAll(std::move(cb)); + OnAllShards([attempt](auto& migration) { migration->Finalize(attempt); }); auto cmd = absl::StrCat("DFLYMIGRATE ACK ", cf_->MyID(), " ", attempt); VLOG(1) << "send " << cmd; diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index 749ef4ebf..83dddd557 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -3,9 +3,12 @@ // #pragma once +#include + #include "io/io.h" #include "server/cluster/cluster_defs.h" #include "server/protocol_client.h" +#include "server/transaction.h" namespace dfly { class DbSlice; @@ -75,6 +78,8 @@ class OutgoingMigration : private ProtocolClient { bool ChangeState(MigrationState new_state) ABSL_LOCKS_EXCLUDED(state_mu_); + void OnAllShards(std::function&)>); + private: MigrationInfo migration_info_; std::vector> slot_migrations_; @@ -87,6 +92,8 @@ class OutgoingMigration : private ProtocolClient { mutable util::fb2::Mutex state_mu_; MigrationState state_ ABSL_GUARDED_BY(state_mu_) = MigrationState::C_NO_STATE; + boost::intrusive_ptr tx_; + // when migration is finished we need to store number of migrated keys // because new request can add or remove keys and we get incorrect statistic size_t keys_number_ = 0; diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index cd67ffad7..8858a7f2f 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -1089,19 +1089,7 @@ void DbSlice::ExpireAllIfNeeded() { } uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) { - // TODO rewrite this logic to be more clear - // this mutex lock is needed to check that this method is not called simultaneously with - // change_cb_ calls and journal_slice::change_cb_arr_ calls. - // It can be unlocked anytime because DbSlice::RegisterOnChange - // and journal_slice::RegisterOnChange calls without preemption - std::lock_guard lk(cb_mu_); - - uint64_t ver = NextVersion(); - change_cb_.emplace_back(ver, std::move(cb)); - DCHECK(std::is_sorted(change_cb_.begin(), change_cb_.end(), - [](auto& a, auto& b) { return a.first < b.first; })); - - return ver; + return change_cb_.emplace_back(NextVersion(), std::move(cb)).first; } void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound) { @@ -1125,14 +1113,10 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_ //! Unregisters the callback. void DbSlice::UnregisterOnChange(uint64_t id) { - lock_guard lk(cb_mu_); // we need to wait until callback is finished before remove it - for (auto it = change_cb_.begin(); it != change_cb_.end(); ++it) { - if (it->first == id) { - change_cb_.erase(it); - return; - } - } - LOG(DFATAL) << "Could not find " << id << " to unregister"; + auto it = find_if(change_cb_.begin(), change_cb_.end(), + [id](const auto& cb) { return cb.first == id; }); + CHECK(it != change_cb_.end()); + change_cb_.erase(it); } auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteExpiredStats { diff --git a/src/server/db_slice.h b/src/server/db_slice.h index caf96eff3..2c032f3ea 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -469,14 +469,6 @@ class DbSlice { void PerformDeletion(Iterator del_it, DbTable* table); void PerformDeletion(PrimeIterator del_it, DbTable* table); - void LockChangeCb() const { - return cb_mu_.lock_shared(); - } - - void UnlockChangeCb() const { - return cb_mu_.unlock_shared(); - } - private: void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key); void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size); @@ -552,12 +544,6 @@ class DbSlice { // Used in temporary computations in Acquire/Release. mutable absl::flat_hash_set uniq_fps_; - // To ensure correct data replication, we must serialize the buckets that each running command - // will modify, followed by serializing the command to the journal. We use a mutex to prevent - // interleaving between bucket and journal registrations, and the command execution with its - // journaling. LockChangeCb is called before the callback, and UnlockChangeCb is called after - // journaling is completed. Register to bucket and journal changes is also does without preemption - mutable util::fb2::SharedMutex cb_mu_; // ordered from the smallest to largest version. std::vector> change_cb_; diff --git a/src/server/detail/save_stages_controller.cc b/src/server/detail/save_stages_controller.cc index 2d748144c..fc823074b 100644 --- a/src/server/detail/save_stages_controller.cc +++ b/src/server/detail/save_stages_controller.cc @@ -254,11 +254,7 @@ void SaveStagesController::SaveDfs() { // Save shard files. auto cb = [this](Transaction* t, EngineShard* shard) { - auto& db_slice = shard->db_slice(); - // a hack to avoid deadlock in Transaction::RunCallback(...) - db_slice.UnlockChangeCb(); SaveDfsSingle(shard); - db_slice.LockChangeCb(); return OpStatus::OK; }; trans_->ScheduleSingleHop(std::move(cb)); @@ -298,11 +294,7 @@ void SaveStagesController::SaveRdb() { } auto cb = [snapshot = snapshot.get()](Transaction* t, EngineShard* shard) { - // a hack to avoid deadlock in Transaction::RunCallback(...) - auto& db_slice = shard->db_slice(); - db_slice.UnlockChangeCb(); snapshot->StartInShard(shard); - db_slice.LockChangeCb(); return OpStatus::OK; }; trans_->ScheduleSingleHop(std::move(cb)); diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 0d4d4fc0f..d611d030d 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -71,32 +71,6 @@ std::string_view SyncStateName(DflyCmd::SyncState sync_state) { return "unsupported"; } -struct TransactionGuard { - static OpStatus ExitGuardCb(Transaction* t, EngineShard* shard) { - t->GetDbSlice(shard->shard_id()).SetExpireAllowed(true); - return OpStatus::OK; - }; - - explicit TransactionGuard(Transaction* t, bool disable_expirations = false) : t(t) { - t->Execute( - [disable_expirations](Transaction* t, EngineShard* shard) { - if (disable_expirations) { - t->GetDbSlice(shard->shard_id()).SetExpireAllowed(!disable_expirations); - } - return OpStatus::OK; - }, - false); - VLOG(2) << "Transaction guard engaged"; - } - - ~TransactionGuard() { - VLOG(2) << "Releasing transaction guard"; - t->Execute(ExitGuardCb, true); - } - - Transaction* t; -}; - OpStatus WaitReplicaFlowToCatchup(absl::Time end_time, shared_ptr replica, EngineShard* shard) { // We don't want any writes to the journal after we send the `PING`, @@ -299,7 +273,7 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) { // Start full sync. { - TransactionGuard tg{cntx->transaction}; + Transaction::Guard tg{cntx->transaction}; AggregateStatus status; // Use explicit assignment for replica_ptr, because capturing structured bindings is C++20. @@ -337,7 +311,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) { return; { - TransactionGuard tg{cntx->transaction}; + Transaction::Guard tg{cntx->transaction}; AggregateStatus status; auto cb = [this, &status, replica_ptr = replica_ptr](EngineShard* shard) { diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 4812c6f80..824a37ba1 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -201,6 +201,10 @@ void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) { snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb)); JournalStreamer::Start(dest, send_lsn); +} + +void RestoreStreamer::Run() { + VLOG(1) << "RestoreStreamer run"; PrimeTable::Cursor cursor; uint64_t last_yield = 0; diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index 6773dc2af..2837fae83 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -77,6 +77,9 @@ class RestoreStreamer : public JournalStreamer { ~RestoreStreamer() override; void Start(util::FiberSocketBase* dest, bool send_lsn = false) override; + + void Run(); + // Cancel() must be called if Start() is called void Cancel() override; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 64fa7b48d..27e222d75 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -127,6 +127,16 @@ cv_status Transaction::BatonBarrier::Wait(time_point tp) { return cv_status::no_timeout; } +Transaction::Guard::Guard(Transaction* tx) : tx(tx) { + DCHECK(tx->cid_->opt_mask() & CO::GLOBAL_TRANS); + tx->Execute([](auto*, auto*) { return OpStatus::OK; }, false); +} + +Transaction::Guard::~Guard() { + tx->Conclude(); + tx->Refurbish(); +} + Transaction::Transaction(const CommandId* cid) : cid_{cid} { InitTxTime(); string_view cmd_name(cid_->name()); @@ -620,7 +630,6 @@ void Transaction::RunCallback(EngineShard* shard) { RunnableResult result; auto& db_slice = GetDbSlice(shard->shard_id()); - db_slice.LockChangeCb(); try { result = (*cb_ptr_)(this, shard); @@ -658,10 +667,7 @@ void Transaction::RunCallback(EngineShard* shard) { // Log to journal only once the command finished running if ((coordinator_state_ & COORD_CONCLUDING) || (multi_ && multi_->concluding)) { LogAutoJournalOnShard(shard, result); - db_slice.UnlockChangeCb(); MaybeInvokeTrackingCb(); - } else { - db_slice.UnlockChangeCb(); } } @@ -1223,11 +1229,11 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) { auto* shard = EngineShard::tlocal(); auto& db_slice = GetDbSlice(shard->shard_id()); - db_slice.LockChangeCb(); + auto result = cb(this, shard); db_slice.OnCbFinish(); + LogAutoJournalOnShard(shard, result); - db_slice.UnlockChangeCb(); MaybeInvokeTrackingCb(); DCHECK_EQ(result.flags, 0); // if it's sophisticated, we shouldn't squash it diff --git a/src/server/transaction.h b/src/server/transaction.h index b4933d690..4c5b8e9d7 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -170,6 +170,14 @@ class Transaction { RAN_IMMEDIATELY = 1 << 7, // Whether the shard executed immediately (during schedule) }; + struct Guard { + explicit Guard(Transaction* tx); + ~Guard(); + + private: + Transaction* tx; + }; + explicit Transaction(const CommandId* cid); // Initialize transaction for squashing placed on a specific shard with a given parent tx