fix(cluster-migration): Support cancelling migration right after starting it (#2992)

* fix(cluster-migration): Support cancelling migration right after starting it

This fixes a few small issues, but most importantly it no longer allows a
migration to start before both the outgoing and incoming sides have received
the updated config. This resolves a few edge cases.

Fixes #2968
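
Concretely, the incoming side now validates DFLYMIGRATE INIT against its own cluster config and answers UNKNOWN_MIGRATION until the updated config has reached it, while the outgoing side keeps retrying from the CONNECTING state. A minimal sketch of that gating; HandleInit is a hypothetical stand-in for the real ClusterFamily::InitMigration shown in the diff below:

#include <algorithm>
#include <string>
#include <string_view>
#include <vector>

struct MigrationInfo {
  std::string node_id;  // id of the peer node for this migration
};

constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION";

// Target side (sketch): accept DFLYMIGRATE INIT only if our own cluster config
// already lists an incoming migration from this source node; otherwise the
// source must keep retrying until the config arrives here as well.
std::string_view HandleInit(const std::vector<MigrationInfo>& incoming_migrations,
                            const std::string& source_id) {
  bool found = std::any_of(incoming_migrations.begin(), incoming_migrations.end(),
                           [&](const MigrationInfo& info) { return info.node_id == source_id; });
  return found ? std::string_view{"OK"} : kUnknownMigration;
}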

* add TODO

* fix test

* address GitHub comments and fixes

* add comment
Shahar Mike 2024-05-02 15:50:42 +03:00 committed by GitHub
parent a95419b0c4
commit 082aba02ef
5 changed files with 74 additions and 52 deletions

src/server/cluster/cluster_family.cc

@@ -814,8 +814,7 @@ bool RemoveIncomingMigrationImpl(std::vector<std::shared_ptr<IncomingSlotMigrati
 void ClusterFamily::RemoveIncomingMigrations(const std::vector<MigrationInfo>& migrations) {
   lock_guard lk(migration_mu_);
   for (const auto& m : migrations) {
-    auto was_removed = RemoveIncomingMigrationImpl(incoming_migrations_jobs_, m.node_id);
-    DCHECK(was_removed);
+    RemoveIncomingMigrationImpl(incoming_migrations_jobs_, m.node_id);
     VLOG(1) << "Migration was canceled from: " << m.node_id;
   }
 }
@@ -835,6 +834,17 @@ void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) {
   if (auto err = parser.Error(); err)
     return cntx->SendError(err->MakeReply());
 
+  const auto& incoming_migrations = cluster_config()->GetIncomingMigrations();
+  bool found = any_of(incoming_migrations.begin(), incoming_migrations.end(),
+                      [&](const MigrationInfo& info) {
+                        // TODO: also compare slot ranges (in an order-agnostic way)
+                        return info.node_id == source_id;
+                      });
+  if (!found) {
+    VLOG(1) << "Unrecognized incoming migration from " << source_id;
+    return cntx->SendError(OutgoingMigration::kUnknownMigration);
+  }
+
   VLOG(1) << "Init migration " << source_id;
 
   lock_guard lk(migration_mu_);
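
The TODO above is left for a follow-up; one straightforward way to make the comparison order-agnostic, sketched here with hypothetical SlotRange/SameSlotRanges names, is to sort both range lists before comparing:

#include <algorithm>
#include <cstdint>
#include <vector>

struct SlotRange {
  uint16_t start;
  uint16_t end;
  auto operator<=>(const SlotRange&) const = default;  // also generates ==
};

// Hypothetical helper: true if both lists contain the same ranges, in any order.
bool SameSlotRanges(std::vector<SlotRange> a, std::vector<SlotRange> b) {
  // Sorting local copies makes the element-wise comparison order-agnostic.
  std::sort(a.begin(), a.end());
  std::sort(b.begin(), b.end());
  return a == b;
}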

src/server/cluster/incoming_slot_migration.cc

@@ -109,7 +109,7 @@ IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, Slot
       service_(*se),
       slots_(std::move(slots)),
       state_(MigrationState::C_CONNECTING),
-      bc_(shards_num) {
+      bc_(0) {
   shard_flows_.resize(shards_num);
   for (unsigned i = 0; i < shards_num; ++i) {
     shard_flows_[i].reset(new ClusterShardMigration(i, &service_));
@@ -137,6 +137,7 @@ void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* sou
   VLOG(1) << "Start flow for shard: " << shard;
 
   state_.store(MigrationState::C_SYNC);
+  bc_->Add();
   shard_flows_[shard]->Start(&cntx_, source, bc_);
 }
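
The counter change is the heart of the cancellation fix: with bc_(shards_num), a migration cancelled before every shard flow has started would leave the counter permanently above zero and hang whoever joins on it. Starting at zero and calling Add() in StartFlow means the join only waits for flows that actually started. A stand-alone sketch of these semantics using standard primitives (helio's util::fb2::BlockingCounter plays this role for fibers; this is not its actual implementation):

#include <condition_variable>
#include <mutex>

class BlockingCounter {
 public:
  explicit BlockingCounter(unsigned start) : count_(start) {}

  // Called once per flow that actually starts.
  void Add(unsigned n = 1) {
    std::lock_guard lk(mu_);
    count_ += n;
  }

  // Called when a flow completes.
  void Dec() {
    std::lock_guard lk(mu_);
    if (--count_ == 0)
      cv_.notify_all();
  }

  // Blocks until the counter is zero. With count_ starting at 0, a migration
  // cancelled before any flow started returns immediately instead of hanging.
  void Wait() {
    std::unique_lock lk(mu_);
    cv_.wait(lk, [this] { return count_ == 0; });
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  unsigned count_;
};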

src/server/cluster/outgoing_slot_migration.cc

@@ -57,11 +57,17 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
       return;
     }
 
+    // Check if migration was cancelled while we yielded so far.
+    if (cancelled_) {
+      return;
+    }
+
     streamer_.Start(Sock());
   }
 
   void Cancel() {
     streamer_.Cancel();
+    cancelled_ = true;
   }
 
   void Finalize() {
@@ -74,6 +80,7 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
  private:
   RestoreStreamer streamer_;
+  bool cancelled_ = false;
 };
 
 OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, ServerFamily* sf)
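
The cancelled_ flag covers the window where the flow's fiber is suspended in a blocking call (e.g. while connecting) and Cancel() runs in the meantime; since both appear to run on the same shard thread in the cooperative fiber model, a plain bool suffices. The pattern in miniature, with hypothetical names:

// Sketch only: Connect() may suspend the fiber; Cancel() can run during that
// suspension, so the flag must be re-checked after every potentially-yielding
// call before touching the streamer.
class Flow {
 public:
  void Prepare() {
    Connect();       // may yield to other fibers on this thread
    if (cancelled_)  // re-check after resuming
      return;
    StartStreaming();
  }

  void Cancel() { cancelled_ = true; }

 private:
  void Connect() {}
  void StartStreaming() {}
  bool cancelled_ = false;  // plain bool: both fibers share one thread
};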
@@ -88,26 +95,42 @@ OutgoingMigration::~OutgoingMigration() {
   main_sync_fb_.JoinIfNeeded();
 }
 
+bool OutgoingMigration::ChangeState(MigrationState new_state) {
+  std::lock_guard lk(state_mu_);
+  if (state_ == MigrationState::C_FINISHED) {
+    return false;
+  }
+
+  state_ = new_state;
+  return true;
+}
+
 void OutgoingMigration::Finish(bool is_error) {
-  std::lock_guard lk(finish_mu_);
-  if (state_.load() != MigrationState::C_FINISHED) {
-    const auto new_state = is_error ? MigrationState::C_ERROR : MigrationState::C_FINISHED;
-    state_.store(new_state);
+  const auto new_state = is_error ? MigrationState::C_ERROR : MigrationState::C_FINISHED;
+  if (ChangeState(new_state)) {
     shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
-      if (const auto* shard = EngineShard::tlocal(); shard)
-        slot_migrations_[shard->shard_id()]->Cancel();
+      if (const auto* shard = EngineShard::tlocal(); shard) {
+        auto& flow = slot_migrations_[shard->shard_id()];
+        if (flow != nullptr) {
+          flow->Cancel();
+        }
+      }
     });
   }
 }
 
 MigrationState OutgoingMigration::GetState() const {
-  return state_.load();
+  std::lock_guard lk(state_mu_);
+  return state_;
 }
 
 void OutgoingMigration::SyncFb() {
   // we retry starting migration until "cancel" is happened
-  while (state_.load() != MigrationState::C_FINISHED) {
-    state_.store(MigrationState::C_CONNECTING);
+  while (GetState() != MigrationState::C_FINISHED) {
+    if (!ChangeState(MigrationState::C_CONNECTING)) {
+      break;
+    }
 
     last_error_ = cntx_.GetError();
     cntx_.Reset(nullptr);
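
ChangeState makes C_FINISHED a terminal state: once Finish() has run, a concurrent SyncFb() can no longer flip the migration back to CONNECTING or SYNC. A condensed sketch of the rule, substituting std::mutex for util::fb2::Mutex:

#include <mutex>

enum class MigrationState { C_NO_STATE, C_CONNECTING, C_SYNC, C_ERROR, C_FINISHED };

class StateMachine {
 public:
  // Returns false if the migration already finished; callers must then bail out
  // instead of resurrecting a cancelled migration.
  bool ChangeState(MigrationState next) {
    std::lock_guard lk(mu_);
    if (state_ == MigrationState::C_FINISHED)
      return false;
    state_ = next;
    return true;
  }

  MigrationState Get() const {
    std::lock_guard lk(mu_);
    return state_;
  }

 private:
  mutable std::mutex mu_;
  MigrationState state_ = MigrationState::C_NO_STATE;
};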
@@ -137,11 +160,15 @@ void OutgoingMigration::SyncFb() {
     }
 
     if (!CheckRespIsSimpleReply("OK")) {
-      cntx_.ReportError(GenericError(std::string(ToSV(LastResponseArgs().front().GetBuf()))));
+      if (!CheckRespIsSimpleReply(kUnknownMigration)) {
+        cntx_.ReportError(GenericError(std::string(ToSV(LastResponseArgs().front().GetBuf()))));
+      }
       continue;
     }
 
-    state_.store(MigrationState::C_SYNC);
+    if (!ChangeState(MigrationState::C_SYNC)) {
+      break;
+    }
 
     shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
       if (auto* shard = EngineShard::tlocal(); shard) {
@@ -163,7 +190,7 @@ void OutgoingMigration::SyncFb() {
     VLOG(1) << "Migrations snapshot is finished";
 
     long attempt = 0;
-    while (state_.load() != MigrationState::C_FINISHED && !FinalyzeMigration(++attempt)) {
+    while (GetState() != MigrationState::C_FINISHED && !FinalizeMigration(++attempt)) {
       // process commands that were on pause and try again
       ThisFiber::SleepFor(500ms);
     }
@@ -174,7 +201,7 @@ void OutgoingMigration::SyncFb() {
   }
 }
 
-bool OutgoingMigration::FinalyzeMigration(long attempt) {
+bool OutgoingMigration::FinalizeMigration(long attempt) {
   // if it's not the 1st attempt and flows are work correctly we try to reconnect and ACK one more
   // time
   if (attempt > 1) {

src/server/cluster/outgoing_slot_migration.h

@@ -54,6 +54,7 @@ class OutgoingMigration : private ProtocolClient {
   }
 
   static constexpr long kInvalidAttempt = -1;
+  static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION";
 
  private:
  // should be run for all shards
@@ -68,11 +69,12 @@ class OutgoingMigration : private ProtocolClient {
   void SyncFb();
 
   // return true if migration is finalized even with C_ERROR state
-  bool FinalyzeMigration(long attempt);
+  bool FinalizeMigration(long attempt);
+
+  bool ChangeState(MigrationState new_state) ABSL_LOCKS_EXCLUDED(state_mu_);
 
  private:
   MigrationInfo migration_info_;
-  mutable util::fb2::Mutex finish_mu_;
   std::vector<std::unique_ptr<SliceSlotMigration>> slot_migrations_;
   ServerFamily* server_family_;
   ClusterFamily* cf_;
@@ -80,8 +82,8 @@ class OutgoingMigration : private ProtocolClient {
   util::fb2::Fiber main_sync_fb_;
 
-  // Atomic only for simple read operation, writes - from the same thread, reads - from any thread
-  std::atomic<MigrationState> state_ = MigrationState::C_NO_STATE;
+  mutable util::fb2::Mutex state_mu_;
+  MigrationState state_ ABSL_GUARDED_BY(state_mu_) = MigrationState::C_NO_STATE;
 };
 
 }  // namespace dfly::cluster

tests/dragonfly/cluster_test.py

@@ -77,11 +77,15 @@ async def push_config(config, admin_connections):
 
 
 async def wait_for_status(admin_client, node_id, status):
-    while status not in await admin_client.execute_command(
-        "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_id
-    ):
-        logging.debug("SLOT-MIGRATION-STATUS is not %s", status)
-        await asyncio.sleep(0.05)
+    while True:
+        response = await admin_client.execute_command(
+            "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_id
+        )
+        if status in response:
+            break
+        else:
+            logging.debug(f"SLOT-MIGRATION-STATUS is {response}, not {status}")
+            await asyncio.sleep(0.05)
 
 
 async def get_node_id(admin_connection):
@@ -955,42 +959,21 @@ async def test_config_consistency(df_local_factory: DflyInstanceFactory):
       ]
     """
 
-    # push config only to source node
+    # Push config to source node. Migration will not start until target node gets the config as well.
    await push_config(
         migation_config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
         [c_nodes_admin[0]],
     )
 
-    while "SYNC" not in await c_nodes_admin[0].execute_command(
-        "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[1]
-    ):
-        logging.debug("source SLOT-MIGRATION-STATUS is not SYNC")
-        await asyncio.sleep(0.05)
-
-    while "SYNC" not in await c_nodes_admin[1].execute_command(
-        "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[0]
-    ):
-        logging.debug("target SLOT-MIGRATION-STATUS is not SYNC")
-        await asyncio.sleep(0.05)
-
-    # migration shouldn't be finished until we set the same config to target node
-    await asyncio.sleep(0.5)
+    await wait_for_status(c_nodes_admin[0], node_ids[1], "CONNECTING")
+    await wait_for_status(c_nodes_admin[1], node_ids[0], "NO_STATE")
 
     # push config to target node
     await push_config(
         migation_config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
         [c_nodes_admin[1]],
     )
 
-    while "FINISHED" not in await c_nodes_admin[1].execute_command(
-        "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[0]
-    ):
-        logging.debug("SLOT-MIGRATION-STATUS is not FINISHED")
-        await asyncio.sleep(0.05)
-
-    assert await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == [
-        f"""out {node_ids[1]} FINISHED keys:0 errors: 0"""
-    ]
+    await wait_for_status(c_nodes_admin[1], node_ids[0], "FINISHED")
+    await wait_for_status(c_nodes_admin[0], node_ids[1], "FINISHED")
 
     # remove finished migrations
     await push_config(
@@ -1416,7 +1399,6 @@ async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory):
     ]
 
     logging.debug("Migrating slots 6000-8000")
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
-    await asyncio.sleep(0.5)
 
     logging.debug("Cancelling migration")
     nodes[0].migrations = []