Cluster slot migration: tag every finalization attempt with its attempt id.

The source now sends journal::Op::LSN carrying the attempt number instead of
journal::Op::FIN. Each incoming flow records the last attempt it received, and
IncomingSlotMigration::Join(attempt) succeeds only once all flows are joinable
and report that same attempt. The outgoing side treats any acknowledged attempt
other than the one it sent as a failure.

diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc
index e7ba2f4e0..633bddcce 100644
--- a/src/server/cluster/cluster_family.cc
+++ b/src/server/cluster/cluster_family.cc
@@ -938,7 +938,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
   if (!migration)
     return cntx->SendError(kIdNotFound);
 
-  if (!migration->Join()) {
+  if (!migration->Join(attempt)) {
     return cntx->SendError("Join timeout happened");
   }
 
diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc
index 06938c669..bd1627d2c 100644
--- a/src/server/cluster/incoming_slot_migration.cc
+++ b/src/server/cluster/incoming_slot_migration.cc
@@ -53,14 +53,16 @@ class ClusterShardMigration {
         break;
       }
 
-      while (tx_data->opcode == journal::Op::FIN) {
-        VLOG(2) << "Attempt to finalize flow " << source_shard_id_;
+      while (tx_data->opcode == journal::Op::LSN) {
+        VLOG(2) << "Attempt to finalize flow " << source_shard_id_ << " attempt " << tx_data->lsn;
+        last_attempt_.store(tx_data->lsn);
         bc->Dec();  // we can Join the flow now
         // if we get new data, attempt is failed
         if (tx_data = tx_reader.NextTxData(&reader, cntx); !tx_data) {
           VLOG(1) << "Finalized flow " << source_shard_id_;
           return;
         }
+        VLOG(2) << "Attempt failed to finalize flow " << source_shard_id_;
         bc->Add();  // the flow isn't finished so we lock it again
       }
       if (tx_data->opcode == journal::Op::PING) {
@@ -70,6 +72,7 @@ class ClusterShardMigration {
       }
     }
 
+    VLOG(2) << "Flow " << source_shard_id_ << " canceled";
     bc->Dec();  // we should provide ability to join the flow
   }
 
@@ -86,6 +89,11 @@ class ClusterShardMigration {
     return {};
   }
 
+  // Attempt id from the last LSN opcode this flow received; -1 until one arrives.
+  long GetLastAttempt() const {
+    return last_attempt_.load();
+  }
+
  private:
   void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx) {
     if (cntx->IsCancelled()) {
@@ -93,7 +101,8 @@ class ClusterShardMigration {
     }
     CHECK(tx_data.shard_cnt <= 1);  // we don't support sync for multishard execution
     if (!tx_data.IsGlobalCmd()) {
-      VLOG(3) << "Execute cmd without sync between shards. txid: " << tx_data.txid;
+      VLOG(3) << "Execute cmd without sync between shards. cmd: "
+              << CmdArgList(tx_data.command.cmd_args);
       executor_.Execute(tx_data.dbid, tx_data.command);
     } else {
       // TODO check which global commands should be supported
@@ -112,6 +121,7 @@ class ClusterShardMigration {
   util::FiberSocketBase* socket_ ABSL_GUARDED_BY(mu_);
   JournalExecutor executor_;
   IncomingSlotMigration* in_migration_;
+  atomic_long last_attempt_{-1};
 };
 
 IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, SlotRanges slots,
@@ -130,16 +140,33 @@ IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, Slot
 IncomingSlotMigration::~IncomingSlotMigration() {
 }
 
-bool IncomingSlotMigration::Join() {
-  auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
-  if (bc_->WaitFor(timeout)) {
-    state_.store(MigrationState::C_FINISHED);
-    keys_number_ = cluster::GetKeyCount(slots_);
-    return true;
+bool IncomingSlotMigration::Join(long attempt) {
+  const absl::Time start = absl::Now();
+  const absl::Duration timeout =
+      absl::Milliseconds(absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms));
+
+  while (true) {
+    const absl::Time now = absl::Now();
+    const absl::Duration passed = now - start;
+    VLOG(1) << "Checking whether to continue with join " << passed << " vs " << timeout;
+    if (passed >= timeout) {
+      LOG(WARNING) << "Can't join migration in time";
+      ReportError(GenericError("Can't join migration in time"));
+      return false;
+    }
+
+    if ((bc_->WaitFor(absl::ToInt64Milliseconds(timeout - passed) * 1ms)) &&
+        (std::all_of(shard_flows_.begin(), shard_flows_.end(),
+                     [&](const auto& flow) { return flow->GetLastAttempt() == attempt; }))) {
+      state_.store(MigrationState::C_FINISHED);
+      keys_number_ = cluster::GetKeyCount(slots_);
+      return true;
+    }
+
+    // If the counter is already released but a flow still reports an older attempt, WaitFor()
+    // is expected to return immediately; pause so the loop doesn't spin until the timeout.
+    util::ThisFiber::SleepFor(1ms);
   }
-  LOG(WARNING) << "Can't join migration in time";
-  ReportError(GenericError("Can't join migration in time"));
-  return false;
 }
 
 void IncomingSlotMigration::Stop() {
@@ -159,7 +186,7 @@ void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* sou
   state_.store(MigrationState::C_SYNC);
 
   shard_flows_[shard]->Start(&cntx_, source, bc_);
-  VLOG(1) << "Incoming flow: " << shard << " finished for " << source_id_;
+  VLOG(1) << "Incoming flow " << shard << " finished for " << source_id_;
 }
 
 size_t IncomingSlotMigration::GetKeyCount() const {
diff --git a/src/server/cluster/incoming_slot_migration.h b/src/server/cluster/incoming_slot_migration.h
index d2e412130..059240253 100644
--- a/src/server/cluster/incoming_slot_migration.h
+++ b/src/server/cluster/incoming_slot_migration.h
@@ -30,7 +30,7 @@ class IncomingSlotMigration {
-  // Waits until all flows got FIN opcode.
-  // returns true if we joined false if timeout is readed
+  // Waits until all flows got the LSN opcode carrying the given attempt.
+  // Returns true if we joined, false if the timeout is reached.
   // After Join we still can get data due to error situation
-  [[nodiscard]] bool Join();
+  [[nodiscard]] bool Join(long attempt);
 
   // Stop migrations, can be called even after migration is finished
   void Stop();
diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc
index fa3e2c1b2..9014a90ac 100644
--- a/src/server/cluster/outgoing_slot_migration.cc
+++ b/src/server/cluster/outgoing_slot_migration.cc
@@ -65,8 +65,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
     streamer_.Cancel();
   }
 
-  void Finalize() {
-    streamer_.SendFinalize();
+  void Finalize(long attempt) {
+    streamer_.SendFinalize(attempt);
   }
 
   const dfly::GenericError GetError() const {
@@ -270,9 +270,9 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
     pause_fb_opt->JoinIfNeeded();
   });
 
-  auto cb = [this](util::ProactorBase* pb) {
+  auto cb = [this, attempt](util::ProactorBase* pb) {
     if (const auto* shard = EngineShard::tlocal(); shard) {
-      slot_migrations_[shard->shard_id()]->Finalize();
+      slot_migrations_[shard->shard_id()]->Finalize(attempt);
     }
   };
 
@@ -302,7 +302,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
   }
 
   const auto attempt_res = get(LastResponseArgs().front().u);
-  if (attempt_res == kInvalidAttempt) {
+  if (attempt_res != attempt) {
+    LOG(WARNING) << "Incorrect attempt payload, sent " << attempt << " received " << attempt_res;
     return false;
   }
 
diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index 6c6e00669..4812c6f80 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -231,9 +231,9 @@ void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
   } while (cursor);
 }
 
-void RestoreStreamer::SendFinalize() {
-  VLOG(1) << "RestoreStreamer FIN opcode for : " << db_slice_->shard_id();
-  journal::Entry entry(journal::Op::FIN, 0 /*db_id*/, 0 /*slot_id*/);
+void RestoreStreamer::SendFinalize(long attempt) {
+  VLOG(1) << "RestoreStreamer LSN opcode for : " << db_slice_->shard_id() << " attempt " << attempt;
+  journal::Entry entry(journal::Op::LSN, attempt);
 
   io::StringSink sink;
   JournalWriter writer{&sink};
diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h
index 0ba178f3b..6773dc2af 100644
--- a/src/server/journal/streamer.h
+++ b/src/server/journal/streamer.h
@@ -80,7 +80,7 @@ class RestoreStreamer : public JournalStreamer {
   // Cancel() must be called if Start() is called
   void Cancel() override;
 
-  void SendFinalize();
+  void SendFinalize(long attempt);
 
   bool IsSnapshotFinished() const {
     return snapshot_finished_;