diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index ba86f46be..fbde4ae22 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -827,7 +827,7 @@ void ClusterFamily::UpdateConfig(const std::vector& slots, bool enabl void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) { CmdArgParser parser{args}; - auto source_id = parser.Next(); + auto [source_id, attempt] = parser.Next(); if (auto err = parser.Error(); err) { return cntx->SendError(err->MakeReply()); @@ -848,7 +848,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) { UpdateConfig(migration->GetSlots(), true); - cntx->SendOk(); + cntx->SendLong(attempt); } using EngineFunc = void (ClusterFamily::*)(CmdArgList args, ConnectionContext* cntx); diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 645da31d9..9ef64ac49 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -81,14 +81,6 @@ OutgoingMigration::~OutgoingMigration() { main_sync_fb_.JoinIfNeeded(); } -void OutgoingMigration::Finalize(uint32_t shard_id) { - slot_migrations_[shard_id]->Finalize(); -} - -void OutgoingMigration::Cancel(uint32_t shard_id) { - slot_migrations_[shard_id]->Cancel(); -} - MigrationState OutgoingMigration::GetState() const { return state_.load(); } @@ -115,6 +107,14 @@ void OutgoingMigration::SyncFb() { // TODO implement blocking on migrated slots only + long attempt = 0; + while (!FinishMigration(++attempt)) { + // process commands that were on pause and try again + ThisFiber::SleepFor(500ms); + } +} + +bool OutgoingMigration::FinishMigration(long attempt) { bool is_block_active = true; auto is_pause_in_progress = [&is_block_active] { return is_block_active; }; auto pause_fb_opt = Pause(server_family_->GetNonPriviligedListeners(), nullptr, @@ -133,42 +133,50 @@ void 
OutgoingMigration::SyncFb() { if (const auto* shard = EngineShard::tlocal(); shard) { // TODO add error processing to move back into STABLE_SYNC state VLOG(1) << "FINALIZE outgoing migration" << shard->shard_id(); - Finalize(shard->shard_id()); + slot_migrations_[shard->shard_id()]->Finalize(); } }; shard_set->pool()->AwaitFiberOnAll(std::move(cb)); - auto cmd = absl::StrCat("DFLYMIGRATE ACK ", cf_->MyID()); + auto cmd = absl::StrCat("DFLYMIGRATE ACK ", cf_->MyID(), " ", attempt); VLOG(1) << "send " << cmd; - auto err = SendCommandAndReadResponse(cmd); + auto err = SendCommand(cmd); LOG_IF(WARNING, err) << err; if (!err) { - LOG_IF(WARNING, !CheckRespIsSimpleReply("OK")) << ToSV(LastResponseArgs().front().GetBuf()); + long attempt_res = -1; + do { // we can have response from previous time so we need to read until get response for the + // last attempt + auto resp = ReadRespReply(absl::GetFlag(FLAGS_source_connect_timeout_ms)); + + if (!resp) { + LOG(WARNING) << resp.error(); + // TODO implement connection issue error processing + return false; + } + + if (!CheckRespFirstTypes({RespExpr::INT64})) { + LOG(WARNING) << "Incorrect response type: " + << facade::ToSV(LastResponseArgs().front().GetBuf()); + return false; + } + attempt_res = get(LastResponseArgs().front().u); + } while (attempt_res != attempt); shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) { if (const auto* shard = EngineShard::tlocal(); shard) - Cancel(shard->shard_id()); + slot_migrations_[shard->shard_id()]->Cancel(); }); state_.store(MigrationState::C_FINISHED); + cf_->UpdateConfig(migration_info_.slot_ranges, false); + return true; + } else { + // TODO implement connection issue error processing } - - cf_->UpdateConfig(migration_info_.slot_ranges, false); -} - -void OutgoingMigration::Ack() { - auto cb = [this](util::ProactorBase* pb) { - if (const auto* shard = EngineShard::tlocal(); shard) { - Cancel(shard->shard_id()); - } - }; - - 
shard_set->pool()->AwaitFiberOnAll(std::move(cb)); - - state_.store(MigrationState::C_FINISHED); + return false; } std::error_code OutgoingMigration::Start(ConnectionContext* cntx) { diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index 6f7c3955d..048bbb046 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -31,15 +31,8 @@ class OutgoingMigration : private ProtocolClient { // should be run for all shards void StartFlow(journal::Journal* journal, io::Sink* dest); - void Finalize(uint32_t shard_id); - void Cancel(uint32_t shard_id); - MigrationState GetState() const; - // Temporary method, will be removed in one of the PR - // This method stop migration connections - void Ack(); - const std::string& GetHostIp() const { return server().host; }; @@ -62,6 +55,7 @@ class OutgoingMigration : private ProtocolClient { class SliceSlotMigration; void SyncFb(); + bool FinishMigration(long attempt); private: MigrationInfo migration_info_; diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index f741a9451..fe57fe410 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -67,7 +67,7 @@ RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, journal::Journal } void RestoreStreamer::Start(io::Sink* dest, bool send_lsn) { - VLOG(2) << "RestoreStreamer start"; + VLOG(1) << "RestoreStreamer start"; auto db_cb = absl::bind_front(&RestoreStreamer::OnDbChange, this); snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb)); @@ -100,8 +100,7 @@ void RestoreStreamer::Start(io::Sink* dest, bool send_lsn) { } void RestoreStreamer::SendFinalize() { - VLOG(2) << "DFLYMIGRATE FINALIZE for " - << " : " << db_slice_->shard_id(); + VLOG(1) << "RestoreStreamer FIN opcode for : " << db_slice_->shard_id(); journal::Entry entry(journal::Op::FIN, 0 /*db_id*/, 0 /*slot_id*/); JournalWriter writer{this}; diff --git 
a/src/server/protocol_client.cc b/src/server/protocol_client.cc index 7cbb47ce1..15af6cd05 100644 --- a/src/server/protocol_client.cc +++ b/src/server/protocol_client.cc @@ -321,6 +321,14 @@ io::Result ProtocolClient::ReadRespReply(base::IoBu return nonstd::make_unexpected(ec); } +io::Result ProtocolClient::ReadRespReply(uint32_t timeout) { + auto prev_timeout = sock_->timeout(); + sock_->set_timeout(timeout); + auto res = ReadRespReply(); + sock_->set_timeout(prev_timeout); + return res; +} + error_code ProtocolClient::ReadLine(base::IoBuf* io_buf, string_view* line) { size_t eol_pos; std::string_view input_str = ToSV(io_buf->InputBuffer()); diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h index 0b49c526a..448af4727 100644 --- a/src/server/protocol_client.h +++ b/src/server/protocol_client.h @@ -83,6 +83,7 @@ class ProtocolClient { // is done with the result of the call; Calling ConsumeInput may invalidate the data in the result // if the buffer relocates. io::Result ReadRespReply(base::IoBuf* buffer = nullptr, bool copy_msg = true); + io::Result ReadRespReply(uint32_t timeout); std::error_code ReadLine(base::IoBuf* io_buf, std::string_view* line);