From f080a5208aefc8c283d86779fef20dc278033c64 Mon Sep 17 00:00:00 2001 From: mkaruza Date: Wed, 7 May 2025 13:29:55 +0200 Subject: [PATCH] Changes after review --- src/server/cluster/cluster_defs.h | 4 ++ src/server/cluster/cluster_family.cc | 22 ++------ src/server/cluster/incoming_slot_migration.cc | 33 ++++++------ src/server/cluster/incoming_slot_migration.h | 5 -- src/server/cluster/outgoing_slot_migration.cc | 52 ++++++++++--------- src/server/cluster/outgoing_slot_migration.h | 4 +- 6 files changed, 52 insertions(+), 68 deletions(-) diff --git a/src/server/cluster/cluster_defs.h b/src/server/cluster/cluster_defs.h index 47771ebc5..b130f20b0 100644 --- a/src/server/cluster/cluster_defs.h +++ b/src/server/cluster/cluster_defs.h @@ -174,6 +174,10 @@ class ClusterShardInfos { // MigrationState constants are ordered in state changing order enum class MigrationState : uint8_t { C_CONNECTING, C_SYNC, C_ERROR, C_FINISHED, C_FATAL }; +// Errors during slot migration +static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION"; +static constexpr std::string_view kIncomingMigrationOOM = "INCOMING_MIGRATION_OOM"; + // return error message if slot doesn't belong to this node facade::ErrorReply SlotOwnershipError(SlotId slot_id); diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index ff0a9922e..37419d1c3 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -9,7 +9,6 @@ #include #include "absl/cleanup/cleanup.h" -#include "absl/strings/str_cat.h" #include "base/flags.h" #include "base/logging.h" #include "facade/cmd_arg_parser.h" @@ -17,9 +16,6 @@ #include "facade/error.h" #include "server/acl/acl_commands_def.h" #include "server/channel_store.h" -#include "server/cluster/cluster_defs.h" -#include "server/cluster/incoming_slot_migration.h" -#include "server/cluster/outgoing_slot_migration.h" #include "server/command_registry.h" #include "server/conn_context.h" #include "server/dflycmd.h" @@ -839,7 +835,7 @@ ClusterFamily::TakeOutOutgoingMigrations(shared_ptr new_config, removed_slots.Merge(slots); LOG(INFO) << "Outgoing migration cancelled: slots " << slots.ToString() << " to " << migration.GetHostIp() << ":" << migration.GetPort(); - migration.Finish(MigrationState::C_FINISHED); + migration.Finish(); res.migrations.push_back(std::move(*it)); outgoing_migration_jobs_.erase(it); } @@ -930,7 +926,7 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) { if (!migration) { VLOG(1) << "Unrecognized incoming migration from " << source_id; - return builder->SendSimpleString(OutgoingMigration::kUnknownMigration); + return builder->SendSimpleString(kUnknownMigration); } if (migration->GetState() != MigrationState::C_CONNECTING) { @@ -942,7 +938,7 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) { } if (migration->GetState() == MigrationState::C_FATAL) { - return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM)); + return builder->SendError(absl::StrCat("-", kIncomingMigrationOOM)); } migration->Init(flows_num); @@ -969,10 +965,6 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder, return builder->SendError(kIdNotFound); } - if (migration->GetState() == MigrationState::C_FATAL) { - return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM)); - } - DCHECK(cntx->sync_dispatch); // we do this to be ignored by the dispatch tracker // TODO provide a more clear approach @@ -981,10 +973,6 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder, builder->SendOk(); migration->StartFlow(shard_id, cntx->conn()->socket()); - - if (migration->GetState() == MigrationState::C_FATAL) { - migration->Stop(); - } } void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id, @@ -1051,7 +1039,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) { [source_id = source_id](const auto& m) { return m.node_info.id == source_id; }); if (m_it == in_migrations.end()) { LOG(WARNING) << "migration isn't in config"; - return builder->SendError(OutgoingMigration::kUnknownMigration); + return builder->SendError(kUnknownMigration); } auto migration = GetIncomingMigration(source_id); @@ -1060,7 +1048,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) { if (!migration->Join(attempt)) { if (migration->GetState() == MigrationState::C_FATAL) { - return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM)); + return builder->SendError(absl::StrCat("-", kIncomingMigrationOOM)); } else { return builder->SendError("Join timeout happened"); } diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc index 78a9c39ea..bb3c2e9c5 100644 --- a/src/server/cluster/incoming_slot_migration.cc +++ b/src/server/cluster/incoming_slot_migration.cc @@ -7,15 +7,10 @@ #include #include -#include - #include "base/flags.h" #include "base/logging.h" #include "cluster_utility.h" -#include "facade/error.h" #include "server/cluster/cluster_defs.h" -#include "server/cluster/outgoing_slot_migration.h" -#include "server/common.h" #include "server/error.h" #include "server/journal/executor.h" #include "server/journal/tx_executor.h" @@ -76,17 +71,11 @@ class ClusterShardMigration { break; } - auto oom_check = [&]() -> bool { - auto used_mem = used_mem_current.load(memory_order_relaxed); - if ((used_mem + tx_data->command.cmd_len) > max_memory_limit) { - cntx->ReportError(IncomingSlotMigration::kMigrationOOM); - in_migration_->ReportFatalError(GenericError(IncomingSlotMigration::kMigrationOOM)); - return true; - } - return false; - }; - - if (oom_check()) { + auto used_mem = used_mem_current.load(memory_order_relaxed); + // If aplying transaction data will reach 90% of max_memory_limit we end migration. + if ((used_mem + tx_data->command.cmd_len) > (0.9 * max_memory_limit)) { + cntx->ReportError(std::string{kIncomingMigrationOOM}); + in_migration_->ReportFatalError(std::string{kIncomingMigrationOOM}); break; } @@ -99,8 +88,8 @@ class ClusterShardMigration { VLOG(1) << "Finalized flow " << source_shard_id_; return; } - if (oom_check()) { - VLOG(2) << "Flow finalization " << source_shard_id_ + if (in_migration_->GetState() == MigrationState::C_FATAL) { + VLOG(1) << "Flow finalization " << source_shard_id_ << " canceled due memory limit reached"; return; } @@ -248,6 +237,11 @@ void IncomingSlotMigration::Stop() { } } + // Don't wait if we reached FATAL state + if (state_ == MigrationState::C_FATAL) { + return; + } + // we need to Join the migration process to prevent data corruption const absl::Time start = absl::Now(); const absl::Duration timeout = @@ -284,6 +278,9 @@ void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* sou VLOG(1) << "Incoming flow " << shard << (GetState() == MigrationState::C_FINISHED ? " finished " : " cancelled ") << "for " << source_id_; + if (GetState() == MigrationState::C_FATAL) { + Stop(); + } } size_t IncomingSlotMigration::GetKeyCount() const { diff --git a/src/server/cluster/incoming_slot_migration.h b/src/server/cluster/incoming_slot_migration.h index 425de8706..481245c6d 100644 --- a/src/server/cluster/incoming_slot_migration.h +++ b/src/server/cluster/incoming_slot_migration.h @@ -3,9 +3,6 @@ // #pragma once -#include - -#include "absl/base/thread_annotations.h" #include "helio/util/fiber_socket_base.h" #include "server/cluster/cluster_defs.h" #include "server/common.h" @@ -82,8 +79,6 @@ class IncomingSlotMigration { void Pause(bool pause); - static constexpr char kMigrationOOM[] = "INCOMING_MIGRATION_OOM"; - private: std::string source_id_; Service& service_; diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index a813230db..58219a92c 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -12,10 +12,7 @@ #include "base/logging.h" #include "cluster_family.h" #include "cluster_utility.h" -#include "facade/resp_expr.h" #include "server/cluster/cluster_defs.h" -#include "server/cluster/incoming_slot_migration.h" -#include "server/common.h" #include "server/db_slice.h" #include "server/engine_shard_set.h" #include "server/error.h" @@ -40,8 +37,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient { SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots, journal::Journal* journal, OutgoingMigration* om) : ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &exec_st_) { - exec_st_.SwitchErrorHandler( - [om](auto ge) { om->Finish(MigrationState::C_ERROR, std::move(ge)); }); + exec_st_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); }); } ~SliceSlotMigration() { @@ -143,8 +139,14 @@ void OutgoingMigration::OnAllShards( }); } -void OutgoingMigration::Finish(MigrationState next_state, GenericError error) { +void OutgoingMigration::Finish(GenericError error) { + auto next_state = MigrationState::C_FINISHED; if (error) { + if (error.Format() == kIncomingMigrationOOM) { + next_state = MigrationState::C_FATAL; + } else { + next_state = MigrationState::C_ERROR; + } LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": " << migration_info_.node_info.id << " with error: " << error.Format(); exec_st_.ReportError(std::move(error)); @@ -225,6 +227,15 @@ void OutgoingMigration::SyncFb() { continue; } + // Break outgoing migration if INIT from incoming node responded with OOM. Usually this will + // happen on second iteration after first failed with OOM. Sending second INIT is required to + // cleanup slots on incoming slot migration node. + if (CheckRespFirstTypes({RespExpr::ERROR}) && + facade::ToSV(LastResponseArgs().front().GetBuf()) == kIncomingMigrationOOM) { + ChangeState(MigrationState::C_FATAL); + break; + } + if (!CheckRespIsSimpleReply("OK")) { if (CheckRespIsSimpleReply(kUnknownMigration)) { const absl::Duration passed = absl::Now() - start_time; @@ -275,20 +286,16 @@ void OutgoingMigration::SyncFb() { } long attempt = 0; - bool fatal_state = false; while (GetState() != MigrationState::C_FINISHED && !FinalizeMigration(++attempt)) { - // Don't sleep if we ended up in FATAL state + // Break loop and don't sleep in case of C_FATAL if (GetState() == MigrationState::C_FATAL) { - fatal_state = true; break; - } else { - // Process commands that were on pause and try again - VLOG(1) << "Waiting for migration to finalize..."; - ThisFiber::SleepFor(500ms); } + // Process commands that were on pause and try again + VLOG(1) << "Waiting for migration to finalize..."; + ThisFiber::SleepFor(500ms); } - // End outgoing slot migration if we are FINISHED or are in FATAL state - if (!exec_st_.IsRunning() && !fatal_state) { + if (!exec_st_.IsRunning()) { continue; } break; @@ -369,16 +376,11 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { if (CheckRespFirstTypes({RespExpr::ERROR})) { auto error = facade::ToSV(LastResponseArgs().front().GetBuf()); - LOG(WARNING) << "Error response for " << cf_->MyID() << " : " << migration_info_.node_info.id - << " attempt " << attempt << " msg: " << error; - auto next_state = MigrationState::C_ERROR; - // Check if there is OOM response from incoming slot migration - if (error == IncomingSlotMigration::kMigrationOOM) { - SetLastError(GenericError(IncomingSlotMigration::kMigrationOOM)); - next_state = MigrationState::C_FATAL; + // Check if returned incoming slot OOM and finish migration + if (error == kIncomingMigrationOOM) { + Finish(std::string(error)); + return false; } - Finish(next_state, std::string(error)); - return false; } if (!CheckRespFirstTypes({RespExpr::INT64})) { @@ -397,7 +399,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { } if (!exec_st_.GetError()) { - Finish(MigrationState::C_FINISHED); + Finish(); keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges); cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges, false); diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index 57cd8bb67..9ed9ccd4a 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -30,7 +30,7 @@ class OutgoingMigration : private ProtocolClient { // if is_error = false mark migration as FINISHED and cancel migration if it's not finished yet // can be called from any thread, but only after Start() // if is_error = true and migration is in progress it will be restarted otherwise nothing happens - void Finish(MigrationState next_state, GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_); + void Finish(GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_); MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_); @@ -76,8 +76,6 @@ class OutgoingMigration : private ProtocolClient { size_t GetKeyCount() const ABSL_LOCKS_EXCLUDED(state_mu_); - static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION"; - private: // should be run for all shards void StartFlow(journal::Journal* journal, io::Sink* dest);