feat(cluster): add repeated ACK if an error is happened (#2892)

This commit is contained in:
Borys 2024-04-12 16:20:19 +03:00 committed by GitHub
parent c2f13993d9
commit 9bed3390d7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 49 additions and 39 deletions

View file

@ -827,7 +827,7 @@ void ClusterFamily::UpdateConfig(const std::vector<SlotRange>& slots, bool enabl
void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
CmdArgParser parser{args};
auto source_id = parser.Next<std::string_view>();
auto [source_id, attempt] = parser.Next<std::string_view, long>();
if (auto err = parser.Error(); err) {
return cntx->SendError(err->MakeReply());
@ -848,7 +848,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
UpdateConfig(migration->GetSlots(), true);
cntx->SendOk();
cntx->SendLong(attempt);
}
using EngineFunc = void (ClusterFamily::*)(CmdArgList args, ConnectionContext* cntx);

View file

@ -81,14 +81,6 @@ OutgoingMigration::~OutgoingMigration() {
main_sync_fb_.JoinIfNeeded();
}
void OutgoingMigration::Finalize(uint32_t shard_id) {
slot_migrations_[shard_id]->Finalize();
}
void OutgoingMigration::Cancel(uint32_t shard_id) {
slot_migrations_[shard_id]->Cancel();
}
MigrationState OutgoingMigration::GetState() const {
return state_.load();
}
@ -115,6 +107,14 @@ void OutgoingMigration::SyncFb() {
// TODO implement blocking on migrated slots only
long attempt = 0;
while (!FinishMigration(++attempt)) {
// process commands that were on pause and try again
ThisFiber::SleepFor(500ms);
}
}
bool OutgoingMigration::FinishMigration(long attempt) {
bool is_block_active = true;
auto is_pause_in_progress = [&is_block_active] { return is_block_active; };
auto pause_fb_opt = Pause(server_family_->GetNonPriviligedListeners(), nullptr,
@ -133,42 +133,50 @@ void OutgoingMigration::SyncFb() {
if (const auto* shard = EngineShard::tlocal(); shard) {
// TODO add error processing to move back into STABLE_SYNC state
VLOG(1) << "FINALIZE outgoing migration" << shard->shard_id();
Finalize(shard->shard_id());
slot_migrations_[shard->shard_id()]->Finalize();
}
};
shard_set->pool()->AwaitFiberOnAll(std::move(cb));
auto cmd = absl::StrCat("DFLYMIGRATE ACK ", cf_->MyID());
auto cmd = absl::StrCat("DFLYMIGRATE ACK ", cf_->MyID(), " ", attempt);
VLOG(1) << "send " << cmd;
auto err = SendCommandAndReadResponse(cmd);
auto err = SendCommand(cmd);
LOG_IF(WARNING, err) << err;
if (!err) {
LOG_IF(WARNING, !CheckRespIsSimpleReply("OK")) << ToSV(LastResponseArgs().front().GetBuf());
long attempt_res = -1;
do { // we can have response from previos time so we need to read until get response for the
// last attempt
auto resp = ReadRespReply(absl::GetFlag(FLAGS_source_connect_timeout_ms));
if (!resp) {
LOG(WARNING) << resp.error();
// TODO implement connection issue error processing
return false;
}
if (!CheckRespFirstTypes({RespExpr::INT64})) {
LOG(WARNING) << "Incorrect response type: "
<< facade::ToSV(LastResponseArgs().front().GetBuf());
return false;
}
attempt_res = get<long>(LastResponseArgs().front().u);
} while (attempt_res != attempt);
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard)
Cancel(shard->shard_id());
slot_migrations_[shard->shard_id()]->Cancel();
});
state_.store(MigrationState::C_FINISHED);
cf_->UpdateConfig(migration_info_.slot_ranges, false);
return true;
} else {
// TODO implement connection issue error processing
}
cf_->UpdateConfig(migration_info_.slot_ranges, false);
}
void OutgoingMigration::Ack() {
auto cb = [this](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
Cancel(shard->shard_id());
}
};
shard_set->pool()->AwaitFiberOnAll(std::move(cb));
state_.store(MigrationState::C_FINISHED);
return false;
}
std::error_code OutgoingMigration::Start(ConnectionContext* cntx) {

View file

@ -31,15 +31,8 @@ class OutgoingMigration : private ProtocolClient {
// should be run for all shards
void StartFlow(journal::Journal* journal, io::Sink* dest);
void Finalize(uint32_t shard_id);
void Cancel(uint32_t shard_id);
MigrationState GetState() const;
// Temporary method, will be removed in one of the PR
// This method stop migration connections
void Ack();
const std::string& GetHostIp() const {
return server().host;
};
@ -62,6 +55,7 @@ class OutgoingMigration : private ProtocolClient {
class SliceSlotMigration;
void SyncFb();
bool FinishMigration(long attempt);
private:
MigrationInfo migration_info_;

View file

@ -67,7 +67,7 @@ RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, journal::Journal
}
void RestoreStreamer::Start(io::Sink* dest, bool send_lsn) {
VLOG(2) << "RestoreStreamer start";
VLOG(1) << "RestoreStreamer start";
auto db_cb = absl::bind_front(&RestoreStreamer::OnDbChange, this);
snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb));
@ -100,8 +100,7 @@ void RestoreStreamer::Start(io::Sink* dest, bool send_lsn) {
}
void RestoreStreamer::SendFinalize() {
VLOG(2) << "DFLYMIGRATE FINALIZE for "
<< " : " << db_slice_->shard_id();
VLOG(1) << "RestoreStreamer FIN opcode for : " << db_slice_->shard_id();
journal::Entry entry(journal::Op::FIN, 0 /*db_id*/, 0 /*slot_id*/);
JournalWriter writer{this};

View file

@ -321,6 +321,14 @@ io::Result<ProtocolClient::ReadRespRes> ProtocolClient::ReadRespReply(base::IoBu
return nonstd::make_unexpected(ec);
}
io::Result<ProtocolClient::ReadRespRes> ProtocolClient::ReadRespReply(uint32_t timeout) {
auto prev_timeout = sock_->timeout();
sock_->set_timeout(timeout);
auto res = ReadRespReply();
sock_->set_timeout(prev_timeout);
return res;
}
error_code ProtocolClient::ReadLine(base::IoBuf* io_buf, string_view* line) {
size_t eol_pos;
std::string_view input_str = ToSV(io_buf->InputBuffer());

View file

@ -83,6 +83,7 @@ class ProtocolClient {
// is done with the result of the call; Calling ConsumeInput may invalidate the data in the result
// if the buffer relocates.
io::Result<ReadRespRes> ReadRespReply(base::IoBuf* buffer = nullptr, bool copy_msg = true);
io::Result<ReadRespRes> ReadRespReply(uint32_t timeout);
std::error_code ReadLine(base::IoBuf* io_buf, std::string_view* line);