This commit is contained in:
mkaruza 2025-05-10 07:28:25 +02:00 committed by GitHub
commit 0f996b0378
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 159 additions and 23 deletions

View file

@ -172,12 +172,11 @@ class ClusterShardInfos {
};
// MigrationState constants are ordered in state changing order
enum class MigrationState : uint8_t {
C_CONNECTING,
C_SYNC,
C_ERROR,
C_FINISHED,
};
enum class MigrationState : uint8_t { C_CONNECTING, C_SYNC, C_ERROR, C_FINISHED, C_FATAL };
// Errors during slot migration
static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION";
static constexpr std::string_view kIncomingMigrationOOM = "INCOMING_MIGRATION_OOM";
// return error message if slot doesn't belong to this node
facade::ErrorReply SlotOwnershipError(SlotId slot_id);

View file

@ -726,6 +726,8 @@ static string_view StateToStr(MigrationState state) {
return "ERROR"sv;
case MigrationState::C_FINISHED:
return "FINISHED"sv;
case MigrationState::C_FATAL:
return "FATAL"sv;
}
DCHECK(false) << "Unknown State value " << static_cast<underlying_type_t<MigrationState>>(state);
return "UNDEFINED_STATE"sv;
@ -765,7 +767,6 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, SinkReplyBuilder* b
};
for (const auto& m : incoming_migrations_jobs_) {
// TODO add error status
append_answer("in", m->GetSourceID(), node_id, m->GetState(), m->GetKeyCount(),
m->GetErrorStr());
}
@ -834,7 +835,7 @@ ClusterFamily::TakeOutOutgoingMigrations(shared_ptr<ClusterConfig> new_config,
removed_slots.Merge(slots);
LOG(INFO) << "Outgoing migration cancelled: slots " << slots.ToString() << " to "
<< migration.GetHostIp() << ":" << migration.GetPort();
migration.Finish();
migration.Finish(MigrationState::C_FINISHED);
res.migrations.push_back(std::move(*it));
outgoing_migration_jobs_.erase(it);
}
@ -925,7 +926,7 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {
if (!migration) {
VLOG(1) << "Unrecognized incoming migration from " << source_id;
return builder->SendSimpleString(OutgoingMigration::kUnknownMigration);
return builder->SendSimpleString(kUnknownMigration);
}
if (migration->GetState() != MigrationState::C_CONNECTING) {
@ -936,6 +937,10 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {
DeleteSlots(slots);
}
if (migration->GetState() == MigrationState::C_FATAL) {
return builder->SendError(absl::StrCat("-", kIncomingMigrationOOM));
}
migration->Init(flows_num);
return builder->SendOk();
@ -955,6 +960,7 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder,
cntx->conn()->SetName(absl::StrCat("migration_flow_", source_id));
auto migration = GetIncomingMigration(source_id);
if (!migration) {
return builder->SendError(kIdNotFound);
}
@ -1033,7 +1039,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
[source_id = source_id](const auto& m) { return m.node_info.id == source_id; });
if (m_it == in_migrations.end()) {
LOG(WARNING) << "migration isn't in config";
return builder->SendError(OutgoingMigration::kUnknownMigration);
return builder->SendError(kUnknownMigration);
}
auto migration = GetIncomingMigration(source_id);
@ -1041,7 +1047,11 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
return builder->SendError(kIdNotFound);
if (!migration->Join(attempt)) {
return builder->SendError("Join timeout happened");
if (migration->GetState() == MigrationState::C_FATAL) {
return builder->SendError(absl::StrCat("-", kIncomingMigrationOOM));
} else {
return builder->SendError("Join timeout happened");
}
}
ApplyMigrationSlotRangeToConfig(migration->GetSourceID(), migration->GetSlots(), true);

View file

@ -70,6 +70,14 @@ class ClusterShardMigration {
break;
}
auto used_mem = used_mem_current.load(memory_order_relaxed);
// If aplying transaction data will reach 90% of max_memory_limit we end migration.
if ((used_mem + tx_data->command.cmd_len) > (0.9 * max_memory_limit)) {
cntx->ReportError(std::string{kIncomingMigrationOOM});
in_migration_->ReportFatalError(std::string{kIncomingMigrationOOM});
break;
}
while (tx_data->opcode == journal::Op::LSN) {
VLOG(2) << "Attempt to finalize flow " << source_shard_id_ << " attempt " << tx_data->lsn;
last_attempt_.store(tx_data->lsn);
@ -79,6 +87,11 @@ class ClusterShardMigration {
VLOG(1) << "Finalized flow " << source_shard_id_;
return;
}
if (in_migration_->GetState() == MigrationState::C_FATAL) {
VLOG(1) << "Flow finalization " << source_shard_id_
<< " canceled due memory limit reached";
return;
}
if (!tx_data->command.cmd_args.empty()) {
VLOG(1) << "Flow finalization failed " << source_shard_id_ << " by "
<< tx_data->command.cmd_args[0];
@ -181,6 +194,11 @@ bool IncomingSlotMigration::Join(long attempt) {
return false;
}
// If any of migration shards reported ERROR (OOM) we can return error
if (GetState() == MigrationState::C_FATAL) {
return false;
}
// if data was sent after LSN, WaitFor() always returns false so to reduce wait time
// we check current state and if WaitFor false but GetLastAttempt() == attempt
// the Join is failed and we can return false
@ -218,6 +236,11 @@ void IncomingSlotMigration::Stop() {
}
}
// Don't wait if we reached FATAL state
if (state_ == MigrationState::C_FATAL) {
return;
}
// we need to Join the migration process to prevent data corruption
const absl::Time start = absl::Now();
const absl::Duration timeout =
@ -251,7 +274,12 @@ void IncomingSlotMigration::Init(uint32_t shards_num) {
void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* source) {
shard_flows_[shard]->Start(&cntx_, source);
VLOG(1) << "Incoming flow " << shard << " finished for " << source_id_;
VLOG(1) << "Incoming flow " << shard
<< (GetState() == MigrationState::C_FINISHED ? " finished " : " cancelled ") << "for "
<< source_id_;
if (GetState() == MigrationState::C_FATAL) {
Stop();
}
}
size_t IncomingSlotMigration::GetKeyCount() const {

View file

@ -50,10 +50,20 @@ class IncomingSlotMigration {
return source_id_;
}
// Switch to FATAL state and store error message
void ReportFatalError(dfly::GenericError err) ABSL_LOCKS_EXCLUDED(state_mu_, error_mu_) {
errors_count_.fetch_add(1, std::memory_order_relaxed);
util::fb2::LockGuard lk_state(state_mu_);
util::fb2::LockGuard lk_error(error_mu_);
state_ = MigrationState::C_FATAL;
last_error_ = std::move(err);
}
void ReportError(dfly::GenericError err) ABSL_LOCKS_EXCLUDED(error_mu_) {
errors_count_.fetch_add(1, std::memory_order_relaxed);
util::fb2::LockGuard lk(error_mu_);
last_error_ = std::move(err);
if (GetState() != MigrationState::C_FATAL)
last_error_ = std::move(err);
}
std::string GetErrorStr() const ABSL_LOCKS_EXCLUDED(error_mu_) {
@ -75,6 +85,7 @@ class IncomingSlotMigration {
std::vector<std::unique_ptr<ClusterShardMigration>> shard_flows_;
SlotRanges slots_;
ExecutionState cntx_;
mutable util::fb2::Mutex error_mu_;
dfly::GenericError last_error_ ABSL_GUARDED_BY(error_mu_);
std::atomic<size_t> errors_count_ = 0;

View file

@ -36,7 +36,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots,
journal::Journal* journal, OutgoingMigration* om)
: ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &exec_st_) {
exec_st_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); });
exec_st_.SwitchErrorHandler(
[om](auto ge) { om->Finish(MigrationState::C_ERROR, std::move(ge)); });
}
~SliceSlotMigration() {
@ -138,10 +139,8 @@ void OutgoingMigration::OnAllShards(
});
}
void OutgoingMigration::Finish(GenericError error) {
auto next_state = MigrationState::C_FINISHED;
void OutgoingMigration::Finish(MigrationState next_state, GenericError error) {
if (error) {
next_state = MigrationState::C_ERROR;
LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": "
<< migration_info_.node_info.id << " with error: " << error.Format();
exec_st_.ReportError(std::move(error));
@ -164,6 +163,7 @@ void OutgoingMigration::Finish(GenericError error) {
case MigrationState::C_SYNC:
case MigrationState::C_ERROR:
case MigrationState::C_FATAL:
should_cancel_flows = true;
break;
}
@ -221,6 +221,14 @@ void OutgoingMigration::SyncFb() {
continue;
}
// Break outgoing migration if INIT from incoming node responded with OOM. Usually this will
// happen on second iteration after first failed with OOM. Sending second INIT is required to
// cleanup slots on incoming slot migration node.
if (CheckRespSimpleError(kIncomingMigrationOOM)) {
ChangeState(MigrationState::C_FATAL);
break;
}
if (!CheckRespIsSimpleReply("OK")) {
if (CheckRespIsSimpleReply(kUnknownMigration)) {
const absl::Duration passed = absl::Now() - start_time;
@ -272,7 +280,11 @@ void OutgoingMigration::SyncFb() {
long attempt = 0;
while (GetState() != MigrationState::C_FINISHED && !FinalizeMigration(++attempt)) {
// process commands that were on pause and try again
// Break loop and don't sleep in case of C_FATAL
if (GetState() == MigrationState::C_FATAL) {
break;
}
// Process commands that were on pause and try again
VLOG(1) << "Waiting for migration to finalize...";
ThisFiber::SleepFor(500ms);
}
@ -355,6 +367,12 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
return false;
}
// Check OOM from incoming slot migration on ACK request
if (CheckRespSimpleError(kIncomingMigrationOOM)) {
Finish(MigrationState::C_FATAL, std::string(kIncomingMigrationOOM));
return false;
}
if (!CheckRespFirstTypes({RespExpr::INT64})) {
LOG(WARNING) << "Incorrect response type for " << cf_->MyID() << " : "
<< migration_info_.node_info.id << " attempt " << attempt
@ -371,7 +389,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
}
if (!exec_st_.GetError()) {
Finish();
Finish(MigrationState::C_FINISHED);
keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges);
cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges,
false);

View file

@ -30,7 +30,7 @@ class OutgoingMigration : private ProtocolClient {
// if is_error = false mark migration as FINISHED and cancel migration if it's not finished yet
// can be called from any thread, but only after Start()
// if is_error = true and migration is in progress it will be restarted otherwise nothing happens
void Finish(GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_);
void Finish(MigrationState next_state, GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_);
MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_);
@ -76,8 +76,6 @@ class OutgoingMigration : private ProtocolClient {
size_t GetKeyCount() const ABSL_LOCKS_EXCLUDED(state_mu_);
static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION";
private:
// should be run for all shards
void StartFlow(journal::Journal* journal, io::Sink* dest);

View file

@ -33,7 +33,7 @@ template <typename... Ts> journal::ParsedEntry::CmdData BuildFromParts(Ts... par
start += part.size();
}
return {std::move(buf), std::move(slice_parts)};
return {std::move(buf), std::move(slice_parts), cmd_str.size()};
}
} // namespace

View file

@ -162,6 +162,7 @@ std::error_code JournalReader::ReadCommand(journal::ParsedEntry::CmdData* data)
size_t cmd_size = 0;
SET_OR_RETURN(ReadUInt<uint64_t>(), cmd_size);
data->cmd_len = cmd_size;
// Read all strings consecutively.
data->command_buf = make_unique<uint8_t[]>(cmd_size);
@ -174,6 +175,9 @@ std::error_code JournalReader::ReadCommand(journal::ParsedEntry::CmdData* data)
ptr += size;
cmd_size -= size;
}
data->cmd_len -= cmd_size;
return {};
}

View file

@ -73,6 +73,7 @@ struct ParsedEntry : public EntryBase {
struct CmdData {
std::unique_ptr<uint8_t[]> command_buf;
CmdArgVec cmd_args; // represents the parsed command.
size_t cmd_len{0};
};
CmdData cmd;

View file

@ -339,6 +339,11 @@ bool ProtocolClient::CheckRespIsSimpleReply(string_view reply) const {
ToSV(resp_args_.front().GetBuf()) == reply;
}
bool ProtocolClient::CheckRespSimpleError(string_view error) const {
return resp_args_.size() == 1 && resp_args_.front().type == RespExpr::ERROR &&
ToSV(resp_args_.front().GetBuf()) == error;
}
bool ProtocolClient::CheckRespFirstTypes(initializer_list<RespExpr::Type> types) const {
unsigned i = 0;
for (RespExpr::Type type : types) {

View file

@ -85,6 +85,9 @@ class ProtocolClient {
// Check if reps_args contains a simple reply.
bool CheckRespIsSimpleReply(std::string_view reply) const;
// Check if resp_args contains a simple error
bool CheckRespSimpleError(std::string_view error) const;
// Check resp_args contains the following types at front.
bool CheckRespFirstTypes(std::initializer_list<facade::RespExpr::Type> types) const;

View file

@ -3236,3 +3236,60 @@ async def test_cancel_blocking_cmd_during_mygration_finalization(df_factory: Dfl
await push_config(json.dumps(generate_config(nodes)), [node.client for node in nodes])
assert await c_nodes[1].type("list") == "none"
@dfly_args({"cluster_mode": "yes"})
async def test_slot_migration_oom(df_factory):
instances = [
df_factory.create(
port=next(next_port),
admin_port=next(next_port),
proactor_threads=4,
maxmemory="1024MB",
),
df_factory.create(
port=next(next_port),
admin_port=next(next_port),
proactor_threads=2,
maxmemory="512MB",
),
]
df_factory.start_all(instances)
nodes = [(await create_node_info(instance)) for instance in instances]
nodes[0].slots = [(0, 16383)]
nodes[1].slots = []
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
await nodes[0].client.execute_command("DEBUG POPULATE 100 test 10000000")
nodes[0].migrations.append(
MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16383)], nodes[1].id)
)
logging.info("Start migration")
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
# Wait for FATAL status
await wait_for_status(nodes[0].admin_client, nodes[1].id, "FATAL", 300)
await wait_for_status(nodes[1].admin_client, nodes[0].id, "FATAL")
# Node_0 slot-migration-status
status = await nodes[0].admin_client.execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[1].id
)
# Direction
assert status[0][0] == "out"
# Error message
assert status[0][4] == "INCOMING_MIGRATION_OOM"
# Node_1 slot-migration-status
status = await nodes[1].admin_client.execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id
)
# Direction
assert status[0][0] == "in"
# Error message
assert status[0][4] == "INCOMING_MIGRATION_OOM"

View file

@ -449,6 +449,8 @@ def migrate(args):
continue
if len(sync_status) != 1:
die_with_err(f"Unexpected number of migrations {len(sync_status)}: {sync_status}")
if "FATAL" in sync_status[0]:
die_with_err(f"Error in migration {len(sync_status)}: {sync_status}")
if "FINISHED" in sync_status[0]:
print(f"Migration finished: {sync_status[0]}")
break