fix: migration finalization process (#4576)

* fix: migration finalization process
This commit is contained in:
Borys 2025-02-10 15:43:32 +02:00 committed by GitHub
parent 386ab3902b
commit d0087aaa6a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 35 additions and 25 deletions

View file

@ -192,12 +192,26 @@ bool IncomingSlotMigration::Join(long attempt) {
return false;
}
if ((bc_->WaitFor(absl::ToInt64Milliseconds(timeout - passed) * 1ms)) &&
(std::all_of(shard_flows_.begin(), shard_flows_.end(),
[&](const auto& flow) { return flow->GetLastAttempt() == attempt; }))) {
state_.store(MigrationState::C_FINISHED);
keys_number_ = cluster::GetKeyCount(slots_);
return true;
// if data was sent after LSN, WaitFor() always returns false so to reduce wait time
// we check current state and if WaitFor false but GetLastAttempt() == attempt
// the Join is failed and we can return false
const auto remaining_time = absl::ToInt64Milliseconds(timeout - passed);
const auto wait_time = (remaining_time > 100 ? 100 : remaining_time) * 1ms;
const auto is_attempt_correct =
std::all_of(shard_flows_.begin(), shard_flows_.end(),
[attempt](const auto& flow) { return flow->GetLastAttempt() == attempt; });
auto wait_res = bc_->WaitFor(wait_time);
if (is_attempt_correct) {
if (wait_res) {
state_.store(MigrationState::C_FINISHED);
keys_number_ = cluster::GetKeyCount(slots_);
} else {
LOG(WARNING) << "Can't join migration because of data after LSN for " << source_id_;
ReportError(GenericError("Can't join migration in time"));
}
return wait_res;
}
}
}

View file

@ -143,12 +143,12 @@ void OutgoingMigration::Finish(GenericError error) {
auto next_state = MigrationState::C_FINISHED;
if (error) {
next_state = MigrationState::C_ERROR;
VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << ": "
<< migration_info_.node_info.id << " with error: " << error.Format();
LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": "
<< migration_info_.node_info.id << " with error: " << error.Format();
cntx_.ReportError(std::move(error));
} else {
VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << ": "
<< migration_info_.node_info.id;
LOG(INFO) << "Finish outgoing migration for " << cf_->MyID() << ": "
<< migration_info_.node_info.id;
}
bool should_cancel_flows = false;
@ -285,14 +285,12 @@ void OutgoingMigration::SyncFb() {
bool OutgoingMigration::FinalizeMigration(long attempt) {
// if it's not the 1st attempt and flows are work correctly we try to
// reconnect and ACK one more time
VLOG(1) << "FinalizeMigration for " << cf_->MyID() << " : " << migration_info_.node_info.id
<< " attempt " << attempt;
LOG(INFO) << "Finalize migration for " << cf_->MyID() << " : " << migration_info_.node_info.id
<< " attempt " << attempt;
if (attempt > 1) {
if (cntx_.GetError()) {
return true;
}
VLOG(1) << "Reconnecting " << cf_->MyID() << " : " << migration_info_.node_info.id
<< " attempt " << attempt;
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
LOG(WARNING) << "Couldn't connect " << cf_->MyID() << " : " << migration_info_.node_info.id
@ -316,20 +314,19 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
}
absl::Cleanup cleanup([&is_block_active, &pause_fb_opt]() {
is_block_active = false;
pause_fb_opt->JoinIfNeeded();
if (pause_fb_opt) {
is_block_active = false;
pause_fb_opt->JoinIfNeeded();
}
});
VLOG(1) << "FINALIZE flows for " << cf_->MyID() << " : " << migration_info_.node_info.id;
LOG(INFO) << "FINALIZE flows for " << cf_->MyID() << " : " << migration_info_.node_info.id;
OnAllShards([attempt](auto& migration) { migration->Finalize(attempt); });
auto cmd = absl::StrCat("DFLYMIGRATE ACK ", cf_->MyID(), " ", attempt);
VLOG(1) << "send " << cmd;
auto err = SendCommand(cmd);
LOG_IF(WARNING, err) << err;
if (err) {
if (auto err = SendCommand(cmd); err) {
LOG(WARNING) << "Error during sending DFLYMIGRATE ACK: " << err.message();
return false;
}

View file

@ -252,8 +252,7 @@ using AggregateGenericError = AggregateValue<GenericError>;
// Then a special error handler is run, if present, and the ExecutionState is cancelled. The error
// handler is run in a separate handler to free up the caller.
//
// Manual cancellation with `Cancel` is simulated by reporting an `errc::operation_canceled` error.
// This allows running the error handler and representing this scenario as an error.
// ReportCancelError() reporting an `errc::operation_canceled` error.
class ExecutionState : protected Cancellation {
public:
using ErrHandler = std::function<void(const GenericError&)>;
@ -265,8 +264,8 @@ class ExecutionState : protected Cancellation {
~ExecutionState();
void
ReportCancelError(); // Cancels the context by submitting an `errc::operation_canceled` error.
// Cancels the context by submitting an `errc::operation_canceled` error.
void ReportCancelError();
using Cancellation::IsCancelled;
const Cancellation* GetCancellation() const;