From 3ec43afd309c0a9345760534663aec2e03bc96e3 Mon Sep 17 00:00:00 2001 From: Borys Date: Mon, 1 Apr 2024 12:29:17 +0300 Subject: [PATCH] DFLYMIGRATE ACK refactoring (#2790) * refactor: #2743 send dflymigrate flow from source * refactor: DFLYMIGRATE ACK is sent from source node #2744 --- src/server/cluster/cluster_family.cc | 98 ++++++------- src/server/cluster/cluster_family.h | 8 +- src/server/cluster/cluster_shard_migration.cc | 56 ++------ src/server/cluster/cluster_shard_migration.h | 28 +--- src/server/cluster/cluster_slot_migration.cc | 132 +++--------------- src/server/cluster/cluster_slot_migration.h | 29 +--- src/server/cluster/outgoing_slot_migration.cc | 131 ++++++++++------- src/server/cluster/outgoing_slot_migration.h | 20 ++- src/server/journal/streamer.cc | 12 +- src/server/journal/streamer.h | 4 +- 10 files changed, 184 insertions(+), 334 deletions(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index de4d21f0a..2e8fd7710 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -692,7 +692,8 @@ void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) { } ClusterSlotMigration* ClusterFamily::CreateIncomingMigration(std::string host_ip, uint16_t port, - SlotRanges slots) { + SlotRanges slots, + uint32_t shards_num) { lock_guard lk(migration_mu_); for (const auto& mj : incoming_migrations_jobs_) { if (auto info = mj->GetInfo(); info.host == host_ip && info.port == port) { @@ -700,11 +701,22 @@ ClusterSlotMigration* ClusterFamily::CreateIncomingMigration(std::string host_ip } } return incoming_migrations_jobs_ - .emplace_back(make_unique(this, std::string(host_ip), port, - &server_family_->service(), std::move(slots))) + .emplace_back(make_shared( + std::string(host_ip), port, &server_family_->service(), std::move(slots), shards_num)) .get(); } +std::shared_ptr ClusterFamily::GetIncomingMigration(std::string host_ip, + uint16_t port) { + lock_guard lk(migration_mu_); + for (const auto& mj : incoming_migrations_jobs_) { + if (auto info = mj->GetInfo(); info.host == host_ip && info.port == port) { + return mj; + } + } + return nullptr; +} + void ClusterFamily::RemoveFinishedMigrations() { lock_guard lk(migration_mu_); auto removed_items_it = @@ -714,6 +726,7 @@ void ClusterFamily::RemoveFinishedMigrations() { for (auto it = outgoing_migration_jobs_.begin(); it != outgoing_migration_jobs_.end();) { if (it->second->GetState() == MigrationState::C_FINISHED) { + VLOG(1) << "erase finished migration " << it->first; it = outgoing_migration_jobs_.erase(it); } else { ++it; @@ -724,7 +737,7 @@ void ClusterFamily::RemoveFinishedMigrations() { void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) { VLOG(1) << "Create incoming migration, args: " << args; CmdArgParser parser{args}; - auto [sync_id, port, flows_num] = parser.Next(); + auto [port, flows_num] = parser.Next(); SlotRanges slots; do { @@ -735,11 +748,9 @@ void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) { if (auto err = parser.Error(); err) return cntx->SendError(err->MakeReply()); - auto* node = - CreateIncomingMigration(cntx->conn()->RemoteEndpointAddress(), port, std::move(slots)); - VLOG(1) << "Init migration " << cntx->conn()->RemoteEndpointAddress() << ":" << port; - node->Init(sync_id, flows_num); + + CreateIncomingMigration(cntx->conn()->RemoteEndpointAddress(), port, std::move(slots), flows_num); return cntx->SendOk(); } @@ -755,8 +766,8 @@ std::shared_ptr ClusterFamily::CreateOutgoingMigration(std::s // Todo add error processing, stop migration process // fb2::Fiber("stop_Migration", &ClusterFamily::StopMigration, this, sync_id).Detach(); }; - auto migration = make_shared(host, port, std::move(slots), sync_id, - err_handler, server_family_); + auto migration = make_shared(host, port, std::move(slots), this, err_handler, + server_family_); auto [it, inserted] = outgoing_migration_jobs_.emplace(sync_id, migration); CHECK(inserted); return migration; @@ -764,83 +775,62 @@ std::shared_ptr ClusterFamily::CreateOutgoingMigration(std::s void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) { CmdArgParser parser{args}; - auto [sync_id, shard_id] = parser.Next(); + auto [port, shard_id] = parser.Next(); if (auto err = parser.Error(); err) { return cntx->SendError(err->MakeReply()); } - VLOG(1) << "Create flow sync_id: " << sync_id << " shard_id: " << shard_id; + auto host_ip = cntx->conn()->RemoteEndpointAddress(); - cntx->conn()->SetName(absl::StrCat("migration_flow_", sync_id)); + VLOG(1) << "Create flow " << host_ip << ":" << port << " shard_id: " << shard_id; - auto migration = GetOutgoingMigration(sync_id); + cntx->conn()->SetName(absl::StrCat("migration_flow_", host_ip, ":", port)); + + auto migration = GetIncomingMigration(cntx->conn()->RemoteEndpointAddress(), port); if (!migration) return cntx->SendError(kIdNotFound); - cntx->conn()->Migrate(shard_set->pool()->at(shard_id)); - server_family_->journal()->StartInThread(); + DCHECK(cntx->sync_dispatch); + // we do this to be ignored by the dispatch tracker + // TODO provide a more clear approach + cntx->sync_dispatch = false; cntx->SendOk(); - EngineShard* shard = EngineShard::tlocal(); - DCHECK(shard->shard_id() == shard_id); - - migration->StartFlow(server_family_->journal(), cntx->conn()->socket()); + migration->StartFlow(shard_id, cntx->conn()->socket()); } -void ClusterFamily::FinalizeIncomingMigration(uint32_t local_sync_id) { - lock_guard lk(migration_mu_); - auto it = - find_if(incoming_migrations_jobs_.cbegin(), incoming_migrations_jobs_.cend(), - [local_sync_id](const auto& el) { return el->GetLocalSyncId() == local_sync_id; }); - DCHECK(it != incoming_migrations_jobs_.cend()); +void ClusterFamily::UpdateConfig(const std::vector& slots, bool enable) { + lock_guard gu(set_config_mu); - { - lock_guard gu(set_config_mu); + auto new_config = tl_cluster_config->CloneWithChanges(slots, enable); - auto new_config = tl_cluster_config->CloneWithChanges((*it)->GetSlots(), true); - - shard_set->pool()->AwaitFiberOnAll( - [&new_config](util::ProactorBase* pb) { tl_cluster_config = new_config; }); - DCHECK(tl_cluster_config != nullptr); - } + shard_set->pool()->AwaitFiberOnAll( + [&new_config](util::ProactorBase* pb) { tl_cluster_config = new_config; }); + DCHECK(tl_cluster_config != nullptr); } void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) { CmdArgParser parser{args}; - auto sync_id = parser.Next(); + auto port = parser.Next(); if (auto err = parser.Error(); err) { return cntx->SendError(err->MakeReply()); } - auto migration = GetOutgoingMigration(sync_id); + auto host_ip = cntx->conn()->RemoteEndpointAddress(); + auto migration = GetIncomingMigration(host_ip, port); if (!migration) return cntx->SendError(kIdNotFound); + migration->Join(); + if (migration->GetState() != MigrationState::C_FINISHED) { return cntx->SendError("Migration process is not in C_FINISHED state"); } - lock_guard lk(migration_mu_); - auto it = outgoing_migration_jobs_.find(sync_id); - DCHECK(it != outgoing_migration_jobs_.end()); - - { - lock_guard gu(set_config_mu); - - auto new_config = tl_cluster_config->CloneWithChanges(it->second->GetSlots(), false); - - shard_set->pool()->AwaitFiberOnAll( - [&new_config](util::ProactorBase* pb) { tl_cluster_config = new_config; }); - DCHECK(tl_cluster_config != nullptr); - } - - shard_set->pool()->AwaitFiberOnAll([&migration](auto*) mutable { - if (const auto* shard = EngineShard::tlocal(); shard) - migration->Cancel(shard->shard_id()); - }); + UpdateConfig(migration->GetSlots(), true); cntx->SendOk(); } diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index 58c2b3032..ceb22f000 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -28,7 +28,7 @@ class ClusterFamily { // Returns a thread-local pointer. ClusterConfig* cluster_config(); - void FinalizeIncomingMigration(uint32_t local_sync_id); + void UpdateConfig(const std::vector& slots, bool enable); private: // Cluster commands compatible with Redis @@ -70,7 +70,9 @@ class ClusterFamily { // create a ClusterSlotMigration entity which will execute migration ClusterSlotMigration* CreateIncomingMigration(std::string host_ip, uint16_t port, - SlotRanges slots); + SlotRanges slots, uint32_t shards_num); + + std::shared_ptr GetIncomingMigration(std::string host_ip, uint16_t port); bool StartSlotMigrations(const std::vector& migrations, ConnectionContext* cntx); @@ -84,7 +86,7 @@ class ClusterFamily { mutable util::fb2::Mutex migration_mu_; // guard migrations operations // holds all incoming slots migrations that are currently in progress. - std::vector> incoming_migrations_jobs_ + std::vector> incoming_migrations_jobs_ ABSL_GUARDED_BY(migration_mu_); uint32_t next_sync_id_ ABSL_GUARDED_BY(migration_mu_) = 1; diff --git a/src/server/cluster/cluster_shard_migration.cc b/src/server/cluster/cluster_shard_migration.cc index 8f3f19e93..bcc15bf1f 100644 --- a/src/server/cluster/cluster_shard_migration.cc +++ b/src/server/cluster/cluster_shard_migration.cc @@ -21,48 +21,19 @@ using namespace facade; using namespace util; using absl::GetFlag; -ClusterShardMigration::ClusterShardMigration(ServerContext server_context, uint32_t local_sync_id, - uint32_t shard_id, uint32_t sync_id, Service* service) - : ProtocolClient(server_context), source_shard_id_(shard_id), sync_id_(sync_id) { +ClusterShardMigration::ClusterShardMigration(uint32_t local_sync_id, uint32_t shard_id, + Service* service) + : source_shard_id_(shard_id) { executor_ = std::make_unique(service); + // Check why do we need this executor_->connection_context()->slot_migration_id = local_sync_id; } ClusterShardMigration::~ClusterShardMigration() { - JoinFlow(); } -std::error_code ClusterShardMigration::StartSyncFlow(Context* cntx) { - RETURN_ON_ERR(ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_)); - - leftover_buf_.emplace(128); - ResetParser(/*server_mode=*/false); - - std::string cmd = absl::StrCat("DFLYMIGRATE FLOW ", sync_id_, " ", source_shard_id_); - VLOG(1) << "cmd: " << cmd; - - RETURN_ON_ERR(SendCommand(cmd)); - - auto read_resp = ReadRespReply(&*leftover_buf_); - if (!read_resp.has_value()) { - return read_resp.error(); - } - - PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK")); - - leftover_buf_->ConsumeInput(read_resp->left_in_buffer); - - sync_fb_ = - fb2::Fiber("shard_migration_full_sync", &ClusterShardMigration::FullSyncShardFb, this, cntx); - - return {}; -} - -void ClusterShardMigration::FullSyncShardFb(Context* cntx) { - DCHECK(leftover_buf_); - io::PrefixSource ps{leftover_buf_->InputBuffer(), Sock()}; - - JournalReader reader{&ps, 0}; +void ClusterShardMigration::Start(Context* cntx, io::Source* source) { + JournalReader reader{source, 0}; TransactionReader tx_reader{false}; while (!cntx->IsCancelled()) { @@ -70,14 +41,13 @@ void ClusterShardMigration::FullSyncShardFb(Context* cntx) { break; auto tx_data = tx_reader.NextTxData(&reader, cntx); - if (!tx_data) + if (!tx_data) { + VLOG(1) << "No tx data"; break; - - TouchIoTime(); + } if (tx_data->opcode == journal::Op::FIN) { VLOG(2) << "Flow " << source_shard_id_ << " is finalized"; - is_finalized_ = true; break; } else if (tx_data->opcode == journal::Op::PING) { // TODO check about ping logic @@ -102,12 +72,4 @@ void ClusterShardMigration::ExecuteTxWithNoShardSync(TransactionData&& tx_data, } } -void ClusterShardMigration::Cancel() { - CloseSocket(); -} - -void ClusterShardMigration::JoinFlow() { - sync_fb_.JoinIfNeeded(); -} - } // namespace dfly diff --git a/src/server/cluster/cluster_shard_migration.h b/src/server/cluster/cluster_shard_migration.h index ee74fedd2..f353e666e 100644 --- a/src/server/cluster/cluster_shard_migration.h +++ b/src/server/cluster/cluster_shard_migration.h @@ -5,7 +5,6 @@ #include "base/io_buf.h" #include "server/journal/executor.h" -#include "server/protocol_client.h" namespace dfly { @@ -15,42 +14,23 @@ class MultiShardExecution; // ClusterShardMigration manage data receiving in slots migration process. // It is created per shard on the target node to initiate FLOW step. -class ClusterShardMigration : public ProtocolClient { +class ClusterShardMigration { public: - ClusterShardMigration(ServerContext server_context, uint32_t local_sync_id, uint32_t shard_id, - uint32_t sync_id, Service* service); + ClusterShardMigration(uint32_t local_sync_id, uint32_t shard_id, Service* service); ~ClusterShardMigration(); - std::error_code StartSyncFlow(Context* cntx); - void Cancel(); - - void SetStableSync() { - is_stable_sync_.store(true); - } - bool IsStableSync() { - return is_stable_sync_.load(); - } - - bool IsFinalized() { - return is_finalized_; - } - - void JoinFlow(); + void Start(Context* cntx, io::Source* source); private: - void FullSyncShardFb(Context* cntx); + void FullSyncShardFb(Context* cntx, io::Source* source); void ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx); void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx); private: uint32_t source_shard_id_; - uint32_t sync_id_; std::optional leftover_buf_; std::unique_ptr executor_; - util::fb2::Fiber sync_fb_; - std::atomic_bool is_stable_sync_ = false; - bool is_finalized_ = false; }; } // namespace dfly diff --git a/src/server/cluster/cluster_slot_migration.cc b/src/server/cluster/cluster_slot_migration.cc index e4a838ac8..a48dbfe92 100644 --- a/src/server/cluster/cluster_slot_migration.cc +++ b/src/server/cluster/cluster_slot_migration.cc @@ -8,7 +8,6 @@ #include #include "base/logging.h" -#include "server/cluster/cluster_family.h" #include "server/cluster/cluster_shard_migration.h" #include "server/error.h" #include "server/journal/tx_executor.h" @@ -40,136 +39,41 @@ atomic_uint32_t next_local_sync_id{1}; } // namespace -ClusterSlotMigration::ClusterSlotMigration(ClusterFamily* cl_fm, string host_ip, uint16_t port, - Service* se, SlotRanges slots) +ClusterSlotMigration::ClusterSlotMigration(string host_ip, uint16_t port, Service* se, + SlotRanges slots, uint32_t shards_num) : ProtocolClient(std::move(host_ip), port), - cluster_family_(cl_fm), service_(*se), - slots_(std::move(slots)) { + slots_(std::move(slots)), + state_(MigrationState::C_CONNECTING), + partitions_(Partition(shards_num)), + bc_(shards_num) { local_sync_id_ = next_local_sync_id.fetch_add(1); + + shard_flows_.resize(shards_num); + for (unsigned i = 0; i < shards_num; ++i) { + shard_flows_[i].reset(new ClusterShardMigration(local_sync_id_, i, &service_)); + } } ClusterSlotMigration::~ClusterSlotMigration() { sync_fb_.JoinIfNeeded(); } -error_code ClusterSlotMigration::Init(uint32_t sync_id, uint32_t shards_num) { - VLOG(1) << "Init slot migration"; - - state_ = MigrationState::C_CONNECTING; - - sync_id_ = sync_id; - source_shards_num_ = shards_num; - - VLOG(1) << "Resolving host DNS"; - error_code ec = ResolveHostDns(); - if (ec) - return ec; - - VLOG(1) << "Connecting to source"; - ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_); - if (ec) - return ec; - - ResetParser(false); - - sync_fb_ = fb2::Fiber("main_migration", &ClusterSlotMigration::MainMigrationFb, this); - - return ec; -} - ClusterSlotMigration::Info ClusterSlotMigration::GetInfo() const { const auto& ctx = server(); return {ctx.host, ctx.port}; } -bool ClusterSlotMigration::IsFinalized() const { - return std::all_of(shard_flows_.begin(), shard_flows_.end(), - [](const auto& el) { return el->IsFinalized(); }); +void ClusterSlotMigration::Join() { + bc_->Wait(); + state_ = MigrationState::C_FINISHED; } -void ClusterSlotMigration::Stop() { - for (auto& flow : shard_flows_) { - flow->Cancel(); - } -} +void ClusterSlotMigration::StartFlow(uint32_t shard, io::Source* source) { + VLOG(1) << "Start flow for shard: " << shard; -void ClusterSlotMigration::MainMigrationFb() { - VLOG(1) << "Main migration fiber started " << sync_id_; - - state_ = MigrationState::C_SYNC; - - // TODO add reconnection code - if (auto ec = InitiateSlotsMigration(); ec) { - LOG(WARNING) << "Error syncing with " << server().Description() << " " << ec << " " - << ec.message(); - } - - if (IsFinalized()) { - state_ = MigrationState::C_FINISHED; - - auto cmd = absl::StrCat("DFLYMIGRATE ACK ", sync_id_); - VLOG(1) << "send " << cmd; - - auto err = SendCommandAndReadResponse(cmd); - auto success = !err && CheckRespIsSimpleReply("OK"); - - LOG_IF(WARNING, !success) << ToSV(LastResponseArgs().front().GetBuf()); - - cluster_family_->FinalizeIncomingMigration(local_sync_id_); - } -} - -std::error_code ClusterSlotMigration::InitiateSlotsMigration() { - shard_flows_.resize(source_shards_num_); - for (unsigned i = 0; i < source_shards_num_; ++i) { - shard_flows_[i].reset( - new ClusterShardMigration(server(), local_sync_id_, i, sync_id_, &service_)); - } - - absl::Cleanup cleanup = [this]() { - // We do the following operations regardless of outcome. - for (auto& flow : shard_flows_) { - flow->JoinFlow(); - } - }; - - // Switch to new error handler that closes flow sockets. - auto err_handler = [this](const auto& ge) mutable { - // Make sure the flows are not in a state transition - lock_guard lk{flows_op_mu_}; - - // Unblock all sockets. - DefaultErrorHandler(ge); - for (auto& flow : shard_flows_) - flow->Cancel(); - }; - RETURN_ON_ERR(cntx_.SwitchErrorHandler(std::move(err_handler))); - - std::atomic_uint32_t synced_shards = 0; - auto partition = Partition(source_shards_num_); - auto shard_cb = [&](unsigned index, auto*) { - for (auto id : partition[index]) { - auto ec = shard_flows_[id]->StartSyncFlow(&cntx_); - if (!ec) { - ++synced_shards; - } else { - cntx_.ReportError(ec); - } - } - }; - // Lock to prevent the error handler from running instantly - // while the flows are in a mixed state. - lock_guard lk{flows_op_mu_}; - shard_set->pool()->AwaitFiberOnAll(std::move(shard_cb)); - - VLOG(1) << synced_shards << " from " << source_shards_num_ << " shards were set flow"; - if (synced_shards != source_shards_num_) { - cntx_.ReportError(std::make_error_code(errc::state_not_recoverable), - "incorrect shards num, only for tests"); - } - - return cntx_.GetError(); + shard_flows_[shard]->Start(&cntx_, source); + bc_->Dec(); } } // namespace dfly diff --git a/src/server/cluster/cluster_slot_migration.h b/src/server/cluster/cluster_slot_migration.h index 564a92ded..699303f84 100644 --- a/src/server/cluster/cluster_slot_migration.h +++ b/src/server/cluster/cluster_slot_migration.h @@ -9,7 +9,6 @@ namespace dfly { class ClusterShardMigration; class Service; -class ClusterFamily; // The main entity on the target side that manage slots migration process // Creates initial connection between the target and source node, @@ -21,18 +20,14 @@ class ClusterSlotMigration : private ProtocolClient { uint16_t port; }; - ClusterSlotMigration(ClusterFamily* cl_fm, std::string host_ip, uint16_t port, Service* se, - SlotRanges slots); + // TODO refactor instead of host_ip and port we can use NODE_ID + ClusterSlotMigration(std::string host_ip, uint16_t port, Service* se, SlotRanges slots, + uint32_t shards_num); ~ClusterSlotMigration(); - // Initiate connection with source node and create migration fiber - // will be refactored in the future - std::error_code Init(uint32_t sync_id, uint32_t shards_num); + void StartFlow(uint32_t shard, io::Source* source); Info GetInfo() const; - uint32_t GetSyncId() const { - return sync_id_; - } // Remote sync ids uniquely identifies a sync *remotely*. However, multiple remote sources can // use the same id, so we need a local id as well. @@ -44,31 +39,21 @@ class ClusterSlotMigration : private ProtocolClient { return state_; } - void Stop(); + void Join(); const SlotRanges& GetSlots() const { return slots_; } private: - void MainMigrationFb(); - // Creates flows, one per shard on the source node and manage migration process - std::error_code InitiateSlotsMigration(); - - // may be called after we finish all flows - bool IsFinalized() const; - - private: - ClusterFamily* cluster_family_; Service& service_; - util::fb2::Mutex flows_op_mu_; std::vector> shard_flows_; SlotRanges slots_; - uint32_t source_shards_num_ = 0; - uint32_t sync_id_ = 0; uint32_t local_sync_id_ = 0; MigrationState state_ = MigrationState::C_NO_STATE; + std::vector> partitions_; + util::fb2::BlockingCounter bc_; util::fb2::Fiber sync_fb_; }; diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 6befa475a..422c6aa3d 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -10,6 +10,7 @@ #include "absl/cleanup/cleanup.h" #include "base/logging.h" +#include "cluster_family.h" #include "server/db_slice.h" #include "server/engine_shard_set.h" #include "server/error.h" @@ -21,17 +22,34 @@ ABSL_DECLARE_FLAG(int, source_connect_timeout_ms); ABSL_DECLARE_FLAG(uint16_t, admin_port); using namespace std; +using namespace facade; using namespace util; namespace dfly { -class OutgoingMigration::SliceSlotMigration { +class OutgoingMigration::SliceSlotMigration : private ProtocolClient { public: - SliceSlotMigration(DbSlice* slice, SlotSet slots, uint32_t sync_id, journal::Journal* journal, - Context* cntx, io::Sink* dest) - : streamer_(slice, std::move(slots), sync_id, journal, cntx) { - state_.store(MigrationState::C_SYNC, memory_order_relaxed); - sync_fb_ = fb2::Fiber("slot-snapshot", [this, dest] { streamer_.Start(dest); }); + SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots, + journal::Journal* journal, Context* cntx) + : ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, cntx) { + } + ~SliceSlotMigration() { + sync_fb_.JoinIfNeeded(); + } + + std::error_code Start(uint32_t shard_id) { + RETURN_ON_ERR(ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_)); + ResetParser(/*server_mode=*/false); + + auto port = absl::GetFlag(FLAGS_admin_port); + std::string cmd = absl::StrCat("DFLYMIGRATE FLOW ", port, " ", shard_id); + VLOG(1) << "cmd: " << cmd; + + RETURN_ON_ERR(SendCommandAndReadResponse(cmd)); + LOG_IF(WARNING, !CheckRespIsSimpleReply("OK")) << ToSV(LastResponseArgs().front().GetBuf()); + + sync_fb_ = fb2::Fiber("slot-snapshot", [this] { streamer_.Start(Sock()); }); + return {}; } void Cancel() { @@ -44,56 +62,28 @@ class OutgoingMigration::SliceSlotMigration { void Finalize() { streamer_.SendFinalize(); - state_.store(MigrationState::C_FINISHED, memory_order_relaxed); - } - - MigrationState GetState() const { - return state_.load(memory_order_relaxed); } private: RestoreStreamer streamer_; - // Atomic only for simple read operation, writes - from the same thread, reads - from any thread - atomic state_ = MigrationState::C_CONNECTING; fb2::Fiber sync_fb_; }; OutgoingMigration::OutgoingMigration(std::string ip, uint16_t port, SlotRanges slots, - uint32_t sync_id, Context::ErrHandler err_handler, + ClusterFamily* cf, Context::ErrHandler err_handler, ServerFamily* sf) : ProtocolClient(ip, port), - host_ip_(ip), - port_(port), - sync_id_(sync_id), slots_(slots), cntx_(err_handler), slot_migrations_(shard_set->size()), - server_family_(sf) { + server_family_(sf), + cf_(cf) { } OutgoingMigration::~OutgoingMigration() { main_sync_fb_.JoinIfNeeded(); } -void OutgoingMigration::StartFlow(journal::Journal* journal, io::Sink* dest) { - EngineShard* shard = EngineShard::tlocal(); - DbSlice* slice = &shard->db_slice(); - - const auto shard_id = slice->shard_id(); - - MigrationState state = MigrationState::C_NO_STATE; - { - std::lock_guard lck(flows_mu_); - slot_migrations_[shard_id] = - std::make_unique(slice, slots_, sync_id_, journal, &cntx_, dest); - state = GetStateImpl(); - } - - if (state == MigrationState::C_SYNC) { - main_sync_fb_ = fb2::Fiber("outgoing_migration", &OutgoingMigration::SyncFb, this); - } -} - void OutgoingMigration::Finalize(uint32_t shard_id) { slot_migrations_[shard_id]->Finalize(); } @@ -103,23 +93,23 @@ void OutgoingMigration::Cancel(uint32_t shard_id) { } MigrationState OutgoingMigration::GetState() const { - std::lock_guard lck(flows_mu_); - return GetStateImpl(); -} - -MigrationState OutgoingMigration::GetStateImpl() const { - MigrationState min_state = MigrationState::C_MAX_INVALID; - for (const auto& slot_migration : slot_migrations_) { - if (slot_migration) { - min_state = std::min(min_state, slot_migration->GetState()); - } else { - min_state = MigrationState::C_NO_STATE; - } - } - return min_state; + return state_.load(); } void OutgoingMigration::SyncFb() { + auto start_cb = [this](util::ProactorBase* pb) { + if (auto* shard = EngineShard::tlocal(); shard) { + server_family_->journal()->StartInThread(); + slot_migrations_[shard->shard_id()] = std::make_unique( + &shard->db_slice(), server(), slots_, server_family_->journal(), &cntx_); + slot_migrations_[shard->shard_id()]->Start(shard->shard_id()); + } + }; + + state_.store(MigrationState::C_SYNC); + + shard_set->pool()->AwaitFiberOnAll(std::move(start_cb)); + for (auto& migration : slot_migrations_) { migration->WaitForSnapshotFinished(); } @@ -144,18 +134,51 @@ void OutgoingMigration::SyncFb() { auto cb = [this](util::ProactorBase* pb) { if (const auto* shard = EngineShard::tlocal(); shard) { // TODO add error processing to move back into STABLE_SYNC state + VLOG(1) << "FINALIZE outgoing migration" << shard->shard_id(); Finalize(shard->shard_id()); } }; shard_set->pool()->AwaitFiberOnAll(std::move(cb)); - // TODO add ACK here and config update + auto port = absl::GetFlag(FLAGS_admin_port); + auto cmd = absl::StrCat("DFLYMIGRATE ACK ", port); + VLOG(1) << "send " << cmd; + + auto err = SendCommandAndReadResponse(cmd); + LOG_IF(WARNING, err) << err; + + if (!err) { + LOG_IF(WARNING, !CheckRespIsSimpleReply("OK")) << ToSV(LastResponseArgs().front().GetBuf()); + + shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) { + if (const auto* shard = EngineShard::tlocal(); shard) + Cancel(shard->shard_id()); + }); + + state_.store(MigrationState::C_FINISHED); + } + + cf_->UpdateConfig(slots_, false); +} + +void OutgoingMigration::Ack() { + auto cb = [this](util::ProactorBase* pb) { + if (const auto* shard = EngineShard::tlocal(); shard) { + Cancel(shard->shard_id()); + } + }; + + shard_set->pool()->AwaitFiberOnAll(std::move(cb)); + + state_.store(MigrationState::C_FINISHED); } std::error_code OutgoingMigration::Start(ConnectionContext* cntx) { VLOG(1) << "Starting outgoing migration"; + state_.store(MigrationState::C_CONNECTING); + auto check_connection_error = [&cntx](error_code ec, const char* msg) -> error_code { if (ec) { cntx->SendError(absl::StrCat(msg, ec.message())); @@ -174,13 +197,15 @@ std::error_code OutgoingMigration::Start(ConnectionContext* cntx) { VLOG(1) << "Migration initiating"; ResetParser(false); auto port = absl::GetFlag(FLAGS_admin_port); - auto cmd = absl::StrCat("DFLYMIGRATE INIT ", sync_id_, " ", port, " ", slot_migrations_.size()); + auto cmd = absl::StrCat("DFLYMIGRATE INIT ", port, " ", slot_migrations_.size()); for (const auto& s : slots_) { absl::StrAppend(&cmd, " ", s.start, " ", s.end); } RETURN_ON_ERR(SendCommandAndReadResponse(cmd)); LOG_IF(ERROR, !CheckRespIsSimpleReply("OK")) << facade::ToSV(LastResponseArgs().front().GetBuf()); + main_sync_fb_ = fb2::Fiber("outgoing_migration", &OutgoingMigration::SyncFb, this); + return {}; } diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index b6a4ccf26..366194c37 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -16,12 +16,13 @@ class Journal; class DbSlice; class ServerFamily; +class ClusterFamily; // Whole outgoing slots migration manager class OutgoingMigration : private ProtocolClient { public: - OutgoingMigration(std::string ip, uint16_t port, SlotRanges slots, uint32_t sync_id, - Context::ErrHandler, ServerFamily* sf); + OutgoingMigration(std::string ip, uint16_t port, SlotRanges slots, ClusterFamily* cf, + Context::ErrHandler err_handler, ServerFamily* sf); ~OutgoingMigration(); // start migration process, sends INIT command to the target node @@ -35,12 +36,16 @@ class OutgoingMigration : private ProtocolClient { MigrationState GetState() const; + // Temporary method, will be removed in one of the PR + // This method stop migration connections + void Ack(); + const std::string& GetHostIp() const { - return host_ip_; + return server().host; }; uint16_t GetPort() const { - return port_; + return server().port; }; const SlotRanges& GetSlots() const { @@ -55,16 +60,17 @@ class OutgoingMigration : private ProtocolClient { void SyncFb(); private: - std::string host_ip_; - uint16_t port_; - uint32_t sync_id_; SlotRanges slots_; Context cntx_; mutable util::fb2::Mutex flows_mu_; std::vector> slot_migrations_ ABSL_GUARDED_BY(flows_mu_); ServerFamily* server_family_; + ClusterFamily* cf_; util::fb2::Fiber main_sync_fb_; + + // Atomic only for simple read operation, writes - from the same thread, reads - from any thread + std::atomic state_ = MigrationState::C_NO_STATE; }; } // namespace dfly diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index f9e80eea2..d35706246 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -49,12 +49,9 @@ void JournalStreamer::WriterFb(io::Sink* dest) { } } -RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, uint32_t sync_id, - journal::Journal* journal, Context* cntx) - : JournalStreamer(journal, cntx), - db_slice_(slice), - my_slots_(std::move(slots)), - sync_id_(sync_id) { +RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, journal::Journal* journal, + Context* cntx) + : JournalStreamer(journal, cntx), db_slice_(slice), my_slots_(std::move(slots)) { DCHECK(slice != nullptr); } @@ -92,7 +89,8 @@ void RestoreStreamer::Start(io::Sink* dest) { } void RestoreStreamer::SendFinalize() { - VLOG(2) << "DFLYMIGRATE FINALIZE for " << sync_id_ << " : " << db_slice_->shard_id(); + VLOG(2) << "DFLYMIGRATE FINALIZE for " + << " : " << db_slice_->shard_id(); journal::Entry entry(journal::Op::FIN, 0 /*db_id*/, 0 /*slot_id*/); JournalWriter writer{this}; diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index f0016ea20..611f62501 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -53,8 +53,7 @@ class JournalStreamer : protected BufferedStreamerBase { // Only handles relevant slots, while ignoring all others. class RestoreStreamer : public JournalStreamer { public: - RestoreStreamer(DbSlice* slice, SlotSet slots, uint32_t sync_id, journal::Journal* journal, - Context* cntx); + RestoreStreamer(DbSlice* slice, SlotSet slots, journal::Journal* journal, Context* cntx); ~RestoreStreamer() override; void Start(io::Sink* dest) override; @@ -81,7 +80,6 @@ class RestoreStreamer : public JournalStreamer { DbSlice* db_slice_; uint64_t snapshot_version_ = 0; SlotSet my_slots_; - uint32_t sync_id_; Cancellation fiber_cancellation_; bool snapshot_finished_ = false; };