diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index a8fb21db3..9367b8424 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -317,21 +317,33 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { return false; } - if (auto resp = ReadRespReply(absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms)); !resp) { - LOG(WARNING) << resp.error(); - return false; - } + const absl::Time start = absl::Now(); + const absl::Duration timeout = + absl::Milliseconds(absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms)); + while (true) { + const absl::Time now = absl::Now(); + const absl::Duration passed = now - start; + if (passed >= timeout) { + LOG(WARNING) << "Timeout fot ACK " << attempt; + return false; + } - if (!CheckRespFirstTypes({RespExpr::INT64})) { - LOG(WARNING) << "Incorrect response type: " - << facade::ToSV(LastResponseArgs().front().GetBuf()); - return false; - } + if (auto resp = ReadRespReply(absl::ToInt64Milliseconds(passed - timeout)); !resp) { + LOG(WARNING) << resp.error(); + return false; + } - const auto attempt_res = get(LastResponseArgs().front().u); - if (attempt_res != attempt) { - LOG(WARNING) << "Incorrect attempt payload, sent " << attempt << " received " << attempt_res; - return false; + if (!CheckRespFirstTypes({RespExpr::INT64})) { + LOG(WARNING) << "Incorrect response type: " + << facade::ToSV(LastResponseArgs().front().GetBuf()); + return false; + } + + if (const auto res = get(LastResponseArgs().front().u); res == attempt) { + break; + } else { + LOG(WARNING) << "Incorrect attempt payload, sent " << attempt << " received " << res; + } } auto is_error = CheckFlowsForErrors();