Changes after review

This commit is contained in:
mkaruza 2025-05-07 13:29:55 +02:00
parent 6cddc4385e
commit f080a5208a
No known key found for this signature in database
GPG key ID: EC66B3EC3BF9F1A3
6 changed files with 52 additions and 68 deletions

View file

@ -174,6 +174,10 @@ class ClusterShardInfos {
// MigrationState constants are ordered in state changing order // MigrationState constants are ordered in state changing order
enum class MigrationState : uint8_t { C_CONNECTING, C_SYNC, C_ERROR, C_FINISHED, C_FATAL }; enum class MigrationState : uint8_t { C_CONNECTING, C_SYNC, C_ERROR, C_FINISHED, C_FATAL };
// Errors during slot migration
static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION";
static constexpr std::string_view kIncomingMigrationOOM = "INCOMING_MIGRATION_OOM";
// return error message if slot doesn't belong to this node // return error message if slot doesn't belong to this node
facade::ErrorReply SlotOwnershipError(SlotId slot_id); facade::ErrorReply SlotOwnershipError(SlotId slot_id);

View file

@ -9,7 +9,6 @@
#include <string> #include <string>
#include "absl/cleanup/cleanup.h" #include "absl/cleanup/cleanup.h"
#include "absl/strings/str_cat.h"
#include "base/flags.h" #include "base/flags.h"
#include "base/logging.h" #include "base/logging.h"
#include "facade/cmd_arg_parser.h" #include "facade/cmd_arg_parser.h"
@ -17,9 +16,6 @@
#include "facade/error.h" #include "facade/error.h"
#include "server/acl/acl_commands_def.h" #include "server/acl/acl_commands_def.h"
#include "server/channel_store.h" #include "server/channel_store.h"
#include "server/cluster/cluster_defs.h"
#include "server/cluster/incoming_slot_migration.h"
#include "server/cluster/outgoing_slot_migration.h"
#include "server/command_registry.h" #include "server/command_registry.h"
#include "server/conn_context.h" #include "server/conn_context.h"
#include "server/dflycmd.h" #include "server/dflycmd.h"
@ -839,7 +835,7 @@ ClusterFamily::TakeOutOutgoingMigrations(shared_ptr<ClusterConfig> new_config,
removed_slots.Merge(slots); removed_slots.Merge(slots);
LOG(INFO) << "Outgoing migration cancelled: slots " << slots.ToString() << " to " LOG(INFO) << "Outgoing migration cancelled: slots " << slots.ToString() << " to "
<< migration.GetHostIp() << ":" << migration.GetPort(); << migration.GetHostIp() << ":" << migration.GetPort();
migration.Finish(MigrationState::C_FINISHED); migration.Finish();
res.migrations.push_back(std::move(*it)); res.migrations.push_back(std::move(*it));
outgoing_migration_jobs_.erase(it); outgoing_migration_jobs_.erase(it);
} }
@ -930,7 +926,7 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {
if (!migration) { if (!migration) {
VLOG(1) << "Unrecognized incoming migration from " << source_id; VLOG(1) << "Unrecognized incoming migration from " << source_id;
return builder->SendSimpleString(OutgoingMigration::kUnknownMigration); return builder->SendSimpleString(kUnknownMigration);
} }
if (migration->GetState() != MigrationState::C_CONNECTING) { if (migration->GetState() != MigrationState::C_CONNECTING) {
@ -942,7 +938,7 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {
} }
if (migration->GetState() == MigrationState::C_FATAL) { if (migration->GetState() == MigrationState::C_FATAL) {
return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM)); return builder->SendError(absl::StrCat("-", kIncomingMigrationOOM));
} }
migration->Init(flows_num); migration->Init(flows_num);
@ -969,10 +965,6 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder,
return builder->SendError(kIdNotFound); return builder->SendError(kIdNotFound);
} }
if (migration->GetState() == MigrationState::C_FATAL) {
return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM));
}
DCHECK(cntx->sync_dispatch); DCHECK(cntx->sync_dispatch);
// we do this to be ignored by the dispatch tracker // we do this to be ignored by the dispatch tracker
// TODO provide a more clear approach // TODO provide a more clear approach
@ -981,10 +973,6 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder,
builder->SendOk(); builder->SendOk();
migration->StartFlow(shard_id, cntx->conn()->socket()); migration->StartFlow(shard_id, cntx->conn()->socket());
if (migration->GetState() == MigrationState::C_FATAL) {
migration->Stop();
}
} }
void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id, void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id,
@ -1051,7 +1039,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
[source_id = source_id](const auto& m) { return m.node_info.id == source_id; }); [source_id = source_id](const auto& m) { return m.node_info.id == source_id; });
if (m_it == in_migrations.end()) { if (m_it == in_migrations.end()) {
LOG(WARNING) << "migration isn't in config"; LOG(WARNING) << "migration isn't in config";
return builder->SendError(OutgoingMigration::kUnknownMigration); return builder->SendError(kUnknownMigration);
} }
auto migration = GetIncomingMigration(source_id); auto migration = GetIncomingMigration(source_id);
@ -1060,7 +1048,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
if (!migration->Join(attempt)) { if (!migration->Join(attempt)) {
if (migration->GetState() == MigrationState::C_FATAL) { if (migration->GetState() == MigrationState::C_FATAL) {
return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM)); return builder->SendError(absl::StrCat("-", kIncomingMigrationOOM));
} else { } else {
return builder->SendError("Join timeout happened"); return builder->SendError("Join timeout happened");
} }

View file

@ -7,15 +7,10 @@
#include <absl/cleanup/cleanup.h> #include <absl/cleanup/cleanup.h>
#include <absl/strings/str_cat.h> #include <absl/strings/str_cat.h>
#include <utility>
#include "base/flags.h" #include "base/flags.h"
#include "base/logging.h" #include "base/logging.h"
#include "cluster_utility.h" #include "cluster_utility.h"
#include "facade/error.h"
#include "server/cluster/cluster_defs.h" #include "server/cluster/cluster_defs.h"
#include "server/cluster/outgoing_slot_migration.h"
#include "server/common.h"
#include "server/error.h" #include "server/error.h"
#include "server/journal/executor.h" #include "server/journal/executor.h"
#include "server/journal/tx_executor.h" #include "server/journal/tx_executor.h"
@ -76,17 +71,11 @@ class ClusterShardMigration {
break; break;
} }
auto oom_check = [&]() -> bool { auto used_mem = used_mem_current.load(memory_order_relaxed);
auto used_mem = used_mem_current.load(memory_order_relaxed); // If aplying transaction data will reach 90% of max_memory_limit we end migration.
if ((used_mem + tx_data->command.cmd_len) > max_memory_limit) { if ((used_mem + tx_data->command.cmd_len) > (0.9 * max_memory_limit)) {
cntx->ReportError(IncomingSlotMigration::kMigrationOOM); cntx->ReportError(std::string{kIncomingMigrationOOM});
in_migration_->ReportFatalError(GenericError(IncomingSlotMigration::kMigrationOOM)); in_migration_->ReportFatalError(std::string{kIncomingMigrationOOM});
return true;
}
return false;
};
if (oom_check()) {
break; break;
} }
@ -99,8 +88,8 @@ class ClusterShardMigration {
VLOG(1) << "Finalized flow " << source_shard_id_; VLOG(1) << "Finalized flow " << source_shard_id_;
return; return;
} }
if (oom_check()) { if (in_migration_->GetState() == MigrationState::C_FATAL) {
VLOG(2) << "Flow finalization " << source_shard_id_ VLOG(1) << "Flow finalization " << source_shard_id_
<< " canceled due memory limit reached"; << " canceled due memory limit reached";
return; return;
} }
@ -248,6 +237,11 @@ void IncomingSlotMigration::Stop() {
} }
} }
// Don't wait if we reached FATAL state
if (state_ == MigrationState::C_FATAL) {
return;
}
// we need to Join the migration process to prevent data corruption // we need to Join the migration process to prevent data corruption
const absl::Time start = absl::Now(); const absl::Time start = absl::Now();
const absl::Duration timeout = const absl::Duration timeout =
@ -284,6 +278,9 @@ void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* sou
VLOG(1) << "Incoming flow " << shard VLOG(1) << "Incoming flow " << shard
<< (GetState() == MigrationState::C_FINISHED ? " finished " : " cancelled ") << "for " << (GetState() == MigrationState::C_FINISHED ? " finished " : " cancelled ") << "for "
<< source_id_; << source_id_;
if (GetState() == MigrationState::C_FATAL) {
Stop();
}
} }
size_t IncomingSlotMigration::GetKeyCount() const { size_t IncomingSlotMigration::GetKeyCount() const {

View file

@ -3,9 +3,6 @@
// //
#pragma once #pragma once
#include <string>
#include "absl/base/thread_annotations.h"
#include "helio/util/fiber_socket_base.h" #include "helio/util/fiber_socket_base.h"
#include "server/cluster/cluster_defs.h" #include "server/cluster/cluster_defs.h"
#include "server/common.h" #include "server/common.h"
@ -82,8 +79,6 @@ class IncomingSlotMigration {
void Pause(bool pause); void Pause(bool pause);
static constexpr char kMigrationOOM[] = "INCOMING_MIGRATION_OOM";
private: private:
std::string source_id_; std::string source_id_;
Service& service_; Service& service_;

View file

@ -12,10 +12,7 @@
#include "base/logging.h" #include "base/logging.h"
#include "cluster_family.h" #include "cluster_family.h"
#include "cluster_utility.h" #include "cluster_utility.h"
#include "facade/resp_expr.h"
#include "server/cluster/cluster_defs.h" #include "server/cluster/cluster_defs.h"
#include "server/cluster/incoming_slot_migration.h"
#include "server/common.h"
#include "server/db_slice.h" #include "server/db_slice.h"
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
#include "server/error.h" #include "server/error.h"
@ -40,8 +37,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots, SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots,
journal::Journal* journal, OutgoingMigration* om) journal::Journal* journal, OutgoingMigration* om)
: ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &exec_st_) { : ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &exec_st_) {
exec_st_.SwitchErrorHandler( exec_st_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); });
[om](auto ge) { om->Finish(MigrationState::C_ERROR, std::move(ge)); });
} }
~SliceSlotMigration() { ~SliceSlotMigration() {
@ -143,8 +139,14 @@ void OutgoingMigration::OnAllShards(
}); });
} }
void OutgoingMigration::Finish(MigrationState next_state, GenericError error) { void OutgoingMigration::Finish(GenericError error) {
auto next_state = MigrationState::C_FINISHED;
if (error) { if (error) {
if (error.Format() == kIncomingMigrationOOM) {
next_state = MigrationState::C_FATAL;
} else {
next_state = MigrationState::C_ERROR;
}
LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": " LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": "
<< migration_info_.node_info.id << " with error: " << error.Format(); << migration_info_.node_info.id << " with error: " << error.Format();
exec_st_.ReportError(std::move(error)); exec_st_.ReportError(std::move(error));
@ -225,6 +227,15 @@ void OutgoingMigration::SyncFb() {
continue; continue;
} }
// Break outgoing migration if INIT from incoming node responded with OOM. Usually this will
// happen on second iteration after first failed with OOM. Sending second INIT is required to
// cleanup slots on incoming slot migration node.
if (CheckRespFirstTypes({RespExpr::ERROR}) &&
facade::ToSV(LastResponseArgs().front().GetBuf()) == kIncomingMigrationOOM) {
ChangeState(MigrationState::C_FATAL);
break;
}
if (!CheckRespIsSimpleReply("OK")) { if (!CheckRespIsSimpleReply("OK")) {
if (CheckRespIsSimpleReply(kUnknownMigration)) { if (CheckRespIsSimpleReply(kUnknownMigration)) {
const absl::Duration passed = absl::Now() - start_time; const absl::Duration passed = absl::Now() - start_time;
@ -275,20 +286,16 @@ void OutgoingMigration::SyncFb() {
} }
long attempt = 0; long attempt = 0;
bool fatal_state = false;
while (GetState() != MigrationState::C_FINISHED && !FinalizeMigration(++attempt)) { while (GetState() != MigrationState::C_FINISHED && !FinalizeMigration(++attempt)) {
// Don't sleep if we ended up in FATAL state // Break loop and don't sleep in case of C_FATAL
if (GetState() == MigrationState::C_FATAL) { if (GetState() == MigrationState::C_FATAL) {
fatal_state = true;
break; break;
} else {
// Process commands that were on pause and try again
VLOG(1) << "Waiting for migration to finalize...";
ThisFiber::SleepFor(500ms);
} }
// Process commands that were on pause and try again
VLOG(1) << "Waiting for migration to finalize...";
ThisFiber::SleepFor(500ms);
} }
// End outgoing slot migration if we are FINISHED or are in FATAL state if (!exec_st_.IsRunning()) {
if (!exec_st_.IsRunning() && !fatal_state) {
continue; continue;
} }
break; break;
@ -369,16 +376,11 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
if (CheckRespFirstTypes({RespExpr::ERROR})) { if (CheckRespFirstTypes({RespExpr::ERROR})) {
auto error = facade::ToSV(LastResponseArgs().front().GetBuf()); auto error = facade::ToSV(LastResponseArgs().front().GetBuf());
LOG(WARNING) << "Error response for " << cf_->MyID() << " : " << migration_info_.node_info.id // Check if returned incoming slot OOM and finish migration
<< " attempt " << attempt << " msg: " << error; if (error == kIncomingMigrationOOM) {
auto next_state = MigrationState::C_ERROR; Finish(std::string(error));
// Check if there is OOM response from incoming slot migration return false;
if (error == IncomingSlotMigration::kMigrationOOM) {
SetLastError(GenericError(IncomingSlotMigration::kMigrationOOM));
next_state = MigrationState::C_FATAL;
} }
Finish(next_state, std::string(error));
return false;
} }
if (!CheckRespFirstTypes({RespExpr::INT64})) { if (!CheckRespFirstTypes({RespExpr::INT64})) {
@ -397,7 +399,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
} }
if (!exec_st_.GetError()) { if (!exec_st_.GetError()) {
Finish(MigrationState::C_FINISHED); Finish();
keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges); keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges);
cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges, cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges,
false); false);

View file

@ -30,7 +30,7 @@ class OutgoingMigration : private ProtocolClient {
// if is_error = false mark migration as FINISHED and cancel migration if it's not finished yet // if is_error = false mark migration as FINISHED and cancel migration if it's not finished yet
// can be called from any thread, but only after Start() // can be called from any thread, but only after Start()
// if is_error = true and migration is in progress it will be restarted otherwise nothing happens // if is_error = true and migration is in progress it will be restarted otherwise nothing happens
void Finish(MigrationState next_state, GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_); void Finish(GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_);
MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_); MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_);
@ -76,8 +76,6 @@ class OutgoingMigration : private ProtocolClient {
size_t GetKeyCount() const ABSL_LOCKS_EXCLUDED(state_mu_); size_t GetKeyCount() const ABSL_LOCKS_EXCLUDED(state_mu_);
static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION";
private: private:
// should be run for all shards // should be run for all shards
void StartFlow(journal::Journal* journal, io::Sink* dest); void StartFlow(journal::Journal* journal, io::Sink* dest);