fix(cluster_family): Cancel slot migration from incoming node on OOM

If applying a command on the incoming node would result in OOM (we would
overflow max_memory_limit), we close the migration and switch its state to FATAL.

Signed-off-by: mkaruza <mario@dragonflydb.io>
mkaruza 2025-04-25 13:14:31 +02:00
parent ad5aa66350
commit 6cddc4385e
11 changed files with 177 additions and 22 deletions
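
At its core, the change is a bounds check on the incoming node before each
replicated command is applied. A condensed sketch of that check (my paraphrase
of the diff below; used_mem_current and max_memory_limit are existing server
globals, cmd_len is the new field carrying the serialized size of a journal
command):

#include <atomic>
#include <cstddef>

// Sketch only: these globals exist in the server as used by the diff below.
extern std::atomic<size_t> used_mem_current;  // current memory usage
extern size_t max_memory_limit;               // configured maxmemory

// True if applying a command of serialized size cmd_len on the incoming
// node would push memory usage past the limit.
bool WouldOverflowMemory(size_t cmd_len) {
  size_t used = used_mem_current.load(std::memory_order_relaxed);
  return used + cmd_len > max_memory_limit;
}

When the check trips, the incoming migration reports INCOMING_MIGRATION_OOM,
switches to the new terminal C_FATAL state, and the outgoing node cancels once
it sees that error in a reply.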


@@ -172,12 +172,7 @@ class ClusterShardInfos {
};
// MigrationState constants are listed in state-transition order
enum class MigrationState : uint8_t {
C_CONNECTING,
C_SYNC,
C_ERROR,
C_FINISHED,
};
enum class MigrationState : uint8_t { C_CONNECTING, C_SYNC, C_ERROR, C_FINISHED, C_FATAL };
// return error message if slot doesn't belong to this node
facade::ErrorReply SlotOwnershipError(SlotId slot_id);
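
The new C_FATAL constant extends the terminal states: unlike C_ERROR, which the
outgoing side treats as transient and retries, C_FATAL ends the migration
outright. Hypothetical helpers (not part of the commit) summarizing the
resulting transitions:

// Hypothetical helpers illustrating which states end a migration.
bool IsTerminal(MigrationState s) {
  return s == MigrationState::C_FINISHED || s == MigrationState::C_FATAL;
}
bool MayRetryFinalization(MigrationState s) {
  return s == MigrationState::C_ERROR;  // transient: finalization is retried
}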


@@ -9,6 +9,7 @@
#include <string>
#include "absl/cleanup/cleanup.h"
#include "absl/strings/str_cat.h"
#include "base/flags.h"
#include "base/logging.h"
#include "facade/cmd_arg_parser.h"
@@ -16,6 +17,9 @@
#include "facade/error.h"
#include "server/acl/acl_commands_def.h"
#include "server/channel_store.h"
#include "server/cluster/cluster_defs.h"
#include "server/cluster/incoming_slot_migration.h"
#include "server/cluster/outgoing_slot_migration.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/dflycmd.h"
@@ -726,6 +730,8 @@ static string_view StateToStr(MigrationState state) {
return "ERROR"sv;
case MigrationState::C_FINISHED:
return "FINISHED"sv;
case MigrationState::C_FATAL:
return "FATAL"sv;
}
DCHECK(false) << "Unknown State value " << static_cast<underlying_type_t<MigrationState>>(state);
return "UNDEFINED_STATE"sv;
@@ -765,7 +771,6 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, SinkReplyBuilder* b
};
for (const auto& m : incoming_migrations_jobs_) {
// TODO add error status
append_answer("in", m->GetSourceID(), node_id, m->GetState(), m->GetKeyCount(),
m->GetErrorStr());
}
@@ -834,7 +839,7 @@ ClusterFamily::TakeOutOutgoingMigrations(shared_ptr<ClusterConfig> new_config,
removed_slots.Merge(slots);
LOG(INFO) << "Outgoing migration cancelled: slots " << slots.ToString() << " to "
<< migration.GetHostIp() << ":" << migration.GetPort();
migration.Finish();
migration.Finish(MigrationState::C_FINISHED);
res.migrations.push_back(std::move(*it));
outgoing_migration_jobs_.erase(it);
}
@@ -936,6 +941,10 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {
DeleteSlots(slots);
}
if (migration->GetState() == MigrationState::C_FATAL) {
return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM));
}
migration->Init(flows_num);
return builder->SendOk();
@@ -955,10 +964,15 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder,
cntx->conn()->SetName(absl::StrCat("migration_flow_", source_id));
auto migration = GetIncomingMigration(source_id);
if (!migration) {
return builder->SendError(kIdNotFound);
}
if (migration->GetState() == MigrationState::C_FATAL) {
return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM));
}
DCHECK(cntx->sync_dispatch);
// We do this so that the dispatch tracker ignores this connection.
// TODO: provide a clearer approach
@@ -967,6 +981,10 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder,
builder->SendOk();
migration->StartFlow(shard_id, cntx->conn()->socket());
if (migration->GetState() == MigrationState::C_FATAL) {
migration->Stop();
}
}
void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id,
@@ -1041,7 +1059,11 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
return builder->SendError(kIdNotFound);
if (!migration->Join(attempt)) {
return builder->SendError("Join timeout happened");
if (migration->GetState() == MigrationState::C_FATAL) {
return builder->SendError(absl::StrCat("-", IncomingSlotMigration::kMigrationOOM));
} else {
return builder->SendError("Join timeout happened");
}
}
ApplyMigrationSlotRangeToConfig(migration->GetSourceID(), migration->GetSlots(), true);
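
Each of these handlers surfaces the fatal state to the peer with the same error
string. The leading "-" in absl::StrCat is significant: the assumption here
(following the Redis protocol convention) is that an error string starting with
'-' is sent verbatim, without a generic prefix, so the outgoing node can match
it byte-for-byte in FinalizeMigration() below.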


@@ -7,9 +7,15 @@
#include <absl/cleanup/cleanup.h>
#include <absl/strings/str_cat.h>
#include <utility>
#include "base/flags.h"
#include "base/logging.h"
#include "cluster_utility.h"
#include "facade/error.h"
#include "server/cluster/cluster_defs.h"
#include "server/cluster/outgoing_slot_migration.h"
#include "server/common.h"
#include "server/error.h"
#include "server/journal/executor.h"
#include "server/journal/tx_executor.h"
@@ -70,6 +76,20 @@ class ClusterShardMigration {
break;
}
auto oom_check = [&]() -> bool {
auto used_mem = used_mem_current.load(memory_order_relaxed);
if ((used_mem + tx_data->command.cmd_len) > max_memory_limit) {
cntx->ReportError(IncomingSlotMigration::kMigrationOOM);
in_migration_->ReportFatalError(GenericError(IncomingSlotMigration::kMigrationOOM));
return true;
}
return false;
};
if (oom_check()) {
break;
}
while (tx_data->opcode == journal::Op::LSN) {
VLOG(2) << "Attempt to finalize flow " << source_shard_id_ << " attempt " << tx_data->lsn;
last_attempt_.store(tx_data->lsn);
@@ -79,6 +99,11 @@
VLOG(1) << "Finalized flow " << source_shard_id_;
return;
}
if (oom_check()) {
VLOG(2) << "Flow finalization " << source_shard_id_
<< " canceled due memory limit reached";
return;
}
if (!tx_data->command.cmd_args.empty()) {
VLOG(1) << "Flow finalization failed " << source_shard_id_ << " by "
<< tx_data->command.cmd_args[0];
@@ -181,6 +206,11 @@ bool IncomingSlotMigration::Join(long attempt) {
return false;
}
// If any of the migration shards reported a fatal error (OOM), return false
if (GetState() == MigrationState::C_FATAL) {
return false;
}
// If data was sent after the LSN, WaitFor() always returns false. To reduce the
// wait time we check the current state: if WaitFor() returned false but
// GetLastAttempt() == attempt, the Join has failed and we can return false.
@@ -251,7 +281,9 @@ void IncomingSlotMigration::Init(uint32_t shards_num) {
void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* source) {
shard_flows_[shard]->Start(&cntx_, source);
VLOG(1) << "Incoming flow " << shard << " finished for " << source_id_;
VLOG(1) << "Incoming flow " << shard
<< (GetState() == MigrationState::C_FINISHED ? " finished " : " cancelled ") << "for "
<< source_id_;
}
size_t IncomingSlotMigration::GetKeyCount() const {
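
Putting this file's pieces together, the per-flow apply loop now looks roughly
like this (condensed paraphrase; ReadNextTx and ExecuteTx are stand-ins for the
actual helpers):

// Condensed paraphrase of the apply loop with the new OOM guard.
while (auto tx_data = ReadNextTx()) {
  size_t used = used_mem_current.load(std::memory_order_relaxed);
  if (used + tx_data->command.cmd_len > max_memory_limit) {
    in_migration_->ReportFatalError(
        GenericError(IncomingSlotMigration::kMigrationOOM));
    break;  // stop applying; the migration is now C_FATAL
  }
  ExecuteTx(std::move(*tx_data));
}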


@@ -3,6 +3,9 @@
//
#pragma once
#include <string>
#include "absl/base/thread_annotations.h"
#include "helio/util/fiber_socket_base.h"
#include "server/cluster/cluster_defs.h"
#include "server/common.h"
@@ -50,10 +53,20 @@ class IncomingSlotMigration {
return source_id_;
}
// Switch to FATAL state and store error message
void ReportFatalError(dfly::GenericError err) ABSL_LOCKS_EXCLUDED(state_mu_, error_mu_) {
errors_count_.fetch_add(1, std::memory_order_relaxed);
util::fb2::LockGuard lk_state(state_mu_);
util::fb2::LockGuard lk_error(error_mu_);
state_ = MigrationState::C_FATAL;
last_error_ = std::move(err);
}
void ReportError(dfly::GenericError err) ABSL_LOCKS_EXCLUDED(error_mu_) {
errors_count_.fetch_add(1, std::memory_order_relaxed);
util::fb2::LockGuard lk(error_mu_);
last_error_ = std::move(err);
if (GetState() != MigrationState::C_FATAL)
last_error_ = std::move(err);
}
std::string GetErrorStr() const ABSL_LOCKS_EXCLUDED(error_mu_) {
@@ -69,12 +82,15 @@
void Pause(bool pause);
static constexpr char kMigrationOOM[] = "INCOMING_MIGRATION_OOM";
private:
std::string source_id_;
Service& service_;
std::vector<std::unique_ptr<ClusterShardMigration>> shard_flows_;
SlotRanges slots_;
ExecutionState cntx_;
mutable util::fb2::Mutex error_mu_;
dfly::GenericError last_error_ ABSL_GUARDED_BY(error_mu_);
std::atomic<size_t> errors_count_ = 0;
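
A note on the synchronization here: ReportFatalError acquires state_mu_ and then
error_mu_, while ReportError re-checks the state under error_mu_ so that a later
transient error cannot overwrite the fatal message. Hypothetical usage:

// Hypothetical usage, not from the commit.
migration->ReportFatalError(GenericError(IncomingSlotMigration::kMigrationOOM));
migration->ReportError(GenericError("connection reset"));
// The second error is counted, but GetErrorStr() still returns
// INCOMING_MIGRATION_OOM and GetState() stays C_FATAL.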


@@ -12,6 +12,10 @@
#include "base/logging.h"
#include "cluster_family.h"
#include "cluster_utility.h"
#include "facade/resp_expr.h"
#include "server/cluster/cluster_defs.h"
#include "server/cluster/incoming_slot_migration.h"
#include "server/common.h"
#include "server/db_slice.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
@@ -36,7 +40,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots,
journal::Journal* journal, OutgoingMigration* om)
: ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &exec_st_) {
exec_st_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); });
exec_st_.SwitchErrorHandler(
[om](auto ge) { om->Finish(MigrationState::C_ERROR, std::move(ge)); });
}
~SliceSlotMigration() {
@@ -138,10 +143,8 @@ void OutgoingMigration::OnAllShards(
});
}
void OutgoingMigration::Finish(GenericError error) {
auto next_state = MigrationState::C_FINISHED;
void OutgoingMigration::Finish(MigrationState next_state, GenericError error) {
if (error) {
next_state = MigrationState::C_ERROR;
LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": "
<< migration_info_.node_info.id << " with error: " << error.Format();
exec_st_.ReportError(std::move(error));
@@ -164,6 +167,7 @@ void OutgoingMigration::Finish(GenericError error) {
case MigrationState::C_SYNC:
case MigrationState::C_ERROR:
case MigrationState::C_FATAL:
should_cancel_flows = true;
break;
}
@@ -271,12 +275,20 @@ void OutgoingMigration::SyncFb() {
}
long attempt = 0;
bool fatal_state = false;
while (GetState() != MigrationState::C_FINISHED && !FinalizeMigration(++attempt)) {
// process commands that were on pause and try again
VLOG(1) << "Waiting for migration to finalize...";
ThisFiber::SleepFor(500ms);
// Don't sleep if we ended up in FATAL state
if (GetState() == MigrationState::C_FATAL) {
fatal_state = true;
break;
} else {
// Process commands that were on pause and try again
VLOG(1) << "Waiting for migration to finalize...";
ThisFiber::SleepFor(500ms);
}
}
if (!exec_st_.IsRunning()) {
// Restart the sync on a transient error; otherwise fall through and end the
// migration (FINISHED or FATAL state)
if (!exec_st_.IsRunning() && !fatal_state) {
continue;
}
break;
@@ -355,6 +367,20 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
return false;
}
if (CheckRespFirstTypes({RespExpr::ERROR})) {
auto error = facade::ToSV(LastResponseArgs().front().GetBuf());
LOG(WARNING) << "Error response for " << cf_->MyID() << " : " << migration_info_.node_info.id
<< " attempt " << attempt << " msg: " << error;
auto next_state = MigrationState::C_ERROR;
// Check for an OOM response from the incoming slot migration
if (error == IncomingSlotMigration::kMigrationOOM) {
SetLastError(GenericError(IncomingSlotMigration::kMigrationOOM));
next_state = MigrationState::C_FATAL;
}
Finish(next_state, std::string(error));
return false;
}
if (!CheckRespFirstTypes({RespExpr::INT64})) {
LOG(WARNING) << "Incorrect response type for " << cf_->MyID() << " : "
<< migration_info_.node_info.id << " attempt " << attempt
@@ -371,7 +397,7 @@
}
if (!exec_st_.GetError()) {
Finish();
Finish(MigrationState::C_FINISHED);
keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges);
cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges,
false);
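
The behavioral core of this file's change is the finalization loop: C_ERROR
still sleeps and retries, while C_FATAL breaks out immediately. Reduced to its
skeleton (paraphrase of the hunk above):

// Skeleton of the retry loop after this change.
while (GetState() != MigrationState::C_FINISHED && !FinalizeMigration(++attempt)) {
  if (GetState() == MigrationState::C_FATAL)
    break;                     // terminal: no retry, no sleep
  ThisFiber::SleepFor(500ms);  // transient error: pause, then retry
}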


@@ -30,7 +30,7 @@ class OutgoingMigration : private ProtocolClient {
// if is_error = false mark migration as FINISHED and cancel migration if it's not finished yet
// can be called from any thread, but only after Start()
// if is_error = true and migration is in progress it will be restarted otherwise nothing happens
void Finish(GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_);
void Finish(MigrationState next_state, GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_);
MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_);


@@ -33,7 +33,7 @@ template <typename... Ts> journal::ParsedEntry::CmdData BuildFromParts(Ts... par
start += part.size();
}
return {std::move(buf), std::move(slice_parts)};
return {std::move(buf), std::move(slice_parts), cmd_str.size()};
}
} // namespace


@@ -162,6 +162,7 @@ std::error_code JournalReader::ReadCommand(journal::ParsedEntry::CmdData* data)
size_t cmd_size = 0;
SET_OR_RETURN(ReadUInt<uint64_t>(), cmd_size);
data->cmd_len = cmd_size;
// Read all strings consecutively.
data->command_buf = make_unique<uint8_t[]>(cmd_size);
@@ -174,6 +175,9 @@ std::error_code JournalReader::ReadCommand(journal::ParsedEntry::CmdData* data)
ptr += size;
cmd_size -= size;
}
data->cmd_len -= cmd_size;
return {};
}
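
The bookkeeping is subtle: cmd_size starts as the total announced size and is
decremented by each argument string that is read, so whatever remains after the
loop was never consumed. Subtracting that remainder leaves cmd_len equal to the
bytes the parsed arguments actually occupy, which is the figure the incoming
node's oom_check() adds to used_mem_current when comparing against
max_memory_limit.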


@@ -73,6 +73,7 @@ struct ParsedEntry : public EntryBase {
struct CmdData {
std::unique_ptr<uint8_t[]> command_buf;
CmdArgVec cmd_args; // represents the parsed command.
size_t cmd_len{0};
};
CmdData cmd;


@@ -3236,3 +3236,60 @@ async def test_cancel_blocking_cmd_during_mygration_finalization(df_factory: Dfl
await push_config(json.dumps(generate_config(nodes)), [node.client for node in nodes])
assert await c_nodes[1].type("list") == "none"
@dfly_args({"cluster_mode": "yes"})
async def test_slot_migration_oom(df_factory):
instances = [
df_factory.create(
port=next(next_port),
admin_port=next(next_port),
proactor_threads=4,
maxmemory="1024MB",
),
df_factory.create(
port=next(next_port),
admin_port=next(next_port),
proactor_threads=2,
maxmemory="512MB",
),
]
df_factory.start_all(instances)
nodes = [(await create_node_info(instance)) for instance in instances]
nodes[0].slots = [(0, 16383)]
nodes[1].slots = []
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
await nodes[0].client.execute_command("DEBUG POPULATE 100 test 10000000")
nodes[0].migrations.append(
MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16383)], nodes[1].id)
)
logging.info("Start migration")
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
# Wait for FATAL status
await wait_for_status(nodes[0].admin_client, nodes[1].id, "FATAL", 300)
await wait_for_status(nodes[1].admin_client, nodes[0].id, "FATAL")
# Node_0 slot-migration-status
status = await nodes[0].admin_client.execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[1].id
)
# Direction
assert status[0][0] == "out"
# Error message
assert status[0][4] == "INCOMING_MIGRATION_OOM"
# Node_1 slot-migration-status
status = await nodes[1].admin_client.execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id
)
# Direction
assert status[0][0] == "in"
# Error message
assert status[0][4] == "INCOMING_MIGRATION_OOM"
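
For scale: DEBUG POPULATE 100 test 10000000 writes 100 values of roughly 10 MB
each, about 1 GB of payload on the 1024 MB source, so migrating all slots into
the 512 MB target node is guaranteed to trip the OOM check.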


@@ -449,6 +449,8 @@ def migrate(args):
continue
if len(sync_status) != 1:
die_with_err(f"Unexpected number of migrations {len(sync_status)}: {sync_status}")
if "FATAL" in sync_status[0]:
die_with_err(f"Error in migration {len(sync_status)}: {sync_status}")
if "FINISHED" in sync_status[0]:
print(f"Migration finished: {sync_status[0]}")
break