fix: deadlock in the cluster migration process (#3653)

This commit is contained in:
Borys 2024-09-05 21:55:15 +03:00 committed by GitHub
parent a1e9ee1b6d
commit 2cc2a23247
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 97 additions and 71 deletions

View file

@ -85,17 +85,20 @@ ClusterConfig* ClusterFamily::cluster_config() {
void ClusterFamily::Shutdown() {
shard_set->pool()->at(0)->Await([this]() ABSL_LOCKS_EXCLUDED(set_config_mu) {
util::fb2::LockGuard lk(set_config_mu);
if (!tl_cluster_config)
return;
PreparedToRemoveOutgoingMigrations outgoing_migrations; // should be removed without mutex lock
{
util::fb2::LockGuard lk(set_config_mu);
if (!tl_cluster_config)
return;
auto empty_config = tl_cluster_config->CloneWithoutMigrations();
RemoveOutgoingMigrations(empty_config, tl_cluster_config);
RemoveIncomingMigrations(empty_config->GetFinishedIncomingMigrations(tl_cluster_config));
auto empty_config = tl_cluster_config->CloneWithoutMigrations();
outgoing_migrations = TakeOutOutgoingMigrations(empty_config, tl_cluster_config);
RemoveIncomingMigrations(empty_config->GetFinishedIncomingMigrations(tl_cluster_config));
util::fb2::LockGuard migration_lk(migration_mu_);
DCHECK(outgoing_migration_jobs_.empty());
DCHECK(incoming_migrations_jobs_.empty());
util::fb2::LockGuard migration_lk(migration_mu_);
DCHECK(outgoing_migration_jobs_.empty());
DCHECK(incoming_migrations_jobs_.empty());
}
});
}
@ -532,66 +535,71 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
return cntx->SendError("Invalid cluster configuration.");
}
util::fb2::LockGuard gu(set_config_mu);
VLOG(1) << "Setting new cluster config: " << json_str;
auto out_migrations_slots = RemoveOutgoingMigrations(new_config, tl_cluster_config);
RemoveIncomingMigrations(new_config->GetFinishedIncomingMigrations(tl_cluster_config));
SlotRanges enable_slots, disable_slots;
PreparedToRemoveOutgoingMigrations outgoing_migrations; // should be removed without mutex lock
{
util::fb2::LockGuard lk(migration_mu_);
// If migration state is changed simultaneously, the changes to config will be applied after
// set_config_mu is unlocked and even if we apply the same changes 2 times it's not a problem
for (const auto& m : incoming_migrations_jobs_) {
if (m->GetState() == MigrationState::C_FINISHED) {
enable_slots.Merge(m->GetSlots());
VLOG(1) << "Setting new cluster config: " << json_str;
util::fb2::LockGuard gu(set_config_mu);
outgoing_migrations = TakeOutOutgoingMigrations(new_config, tl_cluster_config);
RemoveIncomingMigrations(new_config->GetFinishedIncomingMigrations(tl_cluster_config));
SlotRanges enable_slots, disable_slots;
{
util::fb2::LockGuard lk(migration_mu_);
// If migration state is changed simultaneously, the changes to config will be applied after
// set_config_mu is unlocked and even if we apply the same changes 2 times it's not a problem
for (const auto& m : incoming_migrations_jobs_) {
if (m->GetState() == MigrationState::C_FINISHED) {
enable_slots.Merge(m->GetSlots());
}
}
for (const auto& m : outgoing_migration_jobs_) {
if (m->GetState() == MigrationState::C_FINISHED) {
disable_slots.Merge(m->GetSlots());
}
}
}
for (const auto& m : outgoing_migration_jobs_) {
if (m->GetState() == MigrationState::C_FINISHED) {
disable_slots.Merge(m->GetSlots());
}
new_config = new_config->CloneWithChanges(enable_slots, disable_slots);
StartSlotMigrations(new_config->GetNewOutgoingMigrations(tl_cluster_config));
SlotSet before = tl_cluster_config ? tl_cluster_config->GetOwnedSlots() : SlotSet(true);
// Ignore blocked commands because we filter them with CancelBlockingOnThread
DispatchTracker tracker{server_family_->GetNonPriviligedListeners(), cntx->conn(),
true /* ignore paused */, true /* ignore blocked */};
auto blocking_filter = [&new_config](ArgSlice keys) {
bool moved =
any_of(keys.begin(), keys.end(), [&](auto k) { return !new_config->IsMySlot(k); });
return moved ? OpStatus::KEY_MOVED : OpStatus::OK;
};
auto cb = [this, &tracker, &new_config, blocking_filter](util::ProactorBase*) {
server_family_->CancelBlockingOnThread(blocking_filter);
tl_cluster_config = new_config;
tracker.TrackOnThread();
};
server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb));
DCHECK(tl_cluster_config != nullptr);
if (!tracker.Wait(absl::Seconds(1))) {
LOG(WARNING) << "Cluster config change timed for: " << MyID();
}
}
new_config = new_config->CloneWithChanges(enable_slots, disable_slots);
StartSlotMigrations(new_config->GetNewOutgoingMigrations(tl_cluster_config));
SlotSet before = tl_cluster_config ? tl_cluster_config->GetOwnedSlots() : SlotSet(true);
// Ignore blocked commands because we filter them with CancelBlockingOnThread
DispatchTracker tracker{server_family_->GetNonPriviligedListeners(), cntx->conn(),
true /* ignore paused */, true /* ignore blocked */};
auto blocking_filter = [&new_config](ArgSlice keys) {
bool moved = any_of(keys.begin(), keys.end(), [&](auto k) { return !new_config->IsMySlot(k); });
return moved ? OpStatus::KEY_MOVED : OpStatus::OK;
};
auto cb = [this, &tracker, &new_config, blocking_filter](util::ProactorBase*) {
server_family_->CancelBlockingOnThread(blocking_filter);
tl_cluster_config = new_config;
tracker.TrackOnThread();
};
server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb));
DCHECK(tl_cluster_config != nullptr);
if (!tracker.Wait(absl::Seconds(1))) {
LOG(WARNING) << "Cluster config change timed for: " << MyID();
}
SlotSet after = tl_cluster_config->GetOwnedSlots();
if (ServerState::tlocal()->is_master) {
auto deleted_slots = (before.GetRemovedSlots(after)).ToSlotRanges();
deleted_slots.Merge(out_migrations_slots);
DeleteSlots(deleted_slots);
LOG_IF(INFO, !deleted_slots.Empty())
<< "Flushing newly unowned slots: " << deleted_slots.ToString();
WriteFlushSlotsToJournal(deleted_slots);
SlotSet after = tl_cluster_config->GetOwnedSlots();
if (ServerState::tlocal()->is_master) {
auto deleted_slots = (before.GetRemovedSlots(after)).ToSlotRanges();
deleted_slots.Merge(outgoing_migrations.slot_ranges);
DeleteSlots(deleted_slots);
LOG_IF(INFO, !deleted_slots.Empty())
<< "Flushing newly unowned slots: " << deleted_slots.ToString();
WriteFlushSlotsToJournal(deleted_slots);
}
}
return cntx->SendOk();
@ -755,11 +763,15 @@ std::shared_ptr<IncomingSlotMigration> ClusterFamily::GetIncomingMigration(
return nullptr;
}
SlotRanges ClusterFamily::RemoveOutgoingMigrations(shared_ptr<ClusterConfig> new_config,
shared_ptr<ClusterConfig> old_config) {
ClusterFamily::PreparedToRemoveOutgoingMigrations::~PreparedToRemoveOutgoingMigrations() = default;
[[nodiscard]] ClusterFamily::PreparedToRemoveOutgoingMigrations
ClusterFamily::TakeOutOutgoingMigrations(shared_ptr<ClusterConfig> new_config,
shared_ptr<ClusterConfig> old_config) {
auto migrations = new_config->GetFinishedOutgoingMigrations(old_config);
util::fb2::LockGuard lk(migration_mu_);
SlotRanges removed_slots;
PreparedToRemoveOutgoingMigrations res;
for (const auto& m : migrations) {
auto it = std::find_if(outgoing_migration_jobs_.begin(), outgoing_migration_jobs_.end(),
[&m](const auto& om) {
@ -774,15 +786,16 @@ SlotRanges ClusterFamily::RemoveOutgoingMigrations(shared_ptr<ClusterConfig> new
LOG(INFO) << "Outgoing migration cancelled: slots " << slots.ToString() << " to "
<< migration.GetHostIp() << ":" << migration.GetPort();
migration.Finish();
res.migrations.push_back(std::move(*it));
outgoing_migration_jobs_.erase(it);
}
// Flush non-owned migrations
SlotSet migration_slots(removed_slots);
SlotSet removed = migration_slots.GetRemovedSlots(new_config->GetOwnedSlots());
res.slot_ranges = migration_slots.GetRemovedSlots(new_config->GetOwnedSlots()).ToSlotRanges();
// Flushing of removed slots is done outside this function.
return removed.ToSlotRanges();
return res;
}
namespace {

View file

@ -85,8 +85,16 @@ class ClusterFamily {
ABSL_LOCKS_EXCLUDED(migration_mu_);
void StartSlotMigrations(std::vector<MigrationInfo> migrations);
SlotRanges RemoveOutgoingMigrations(std::shared_ptr<ClusterConfig> new_config,
std::shared_ptr<ClusterConfig> old_config)
// must be destroyed excluded set_config_mu and migration_mu_ locks
struct PreparedToRemoveOutgoingMigrations {
std::vector<std::shared_ptr<OutgoingMigration>> migrations;
SlotRanges slot_ranges;
~PreparedToRemoveOutgoingMigrations() ABSL_LOCKS_EXCLUDED(migration_mu_, set_config_mu);
};
[[nodiscard]] PreparedToRemoveOutgoingMigrations TakeOutOutgoingMigrations(
std::shared_ptr<ClusterConfig> new_config, std::shared_ptr<ClusterConfig> old_config)
ABSL_LOCKS_EXCLUDED(migration_mu_);
void RemoveIncomingMigrations(const std::vector<MigrationInfo>& migrations)
ABSL_LOCKS_EXCLUDED(migration_mu_);

View file

@ -1088,7 +1088,7 @@ async def test_cluster_flushall_during_migration(
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
vmodule="cluster_family=9,cluster_slot_migration=9,outgoing_slot_migration=9,incoming_slot_migration=9",
vmodule="cluster_family=9,outgoing_slot_migration=9,incoming_slot_migration=9",
logtostdout=True,
)
for i in range(2)
@ -1142,7 +1142,12 @@ async def test_cluster_flushall_during_migration(
async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt: bool):
# Check data migration from one node to another
instances = [
df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
df_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
)
for i in range(2)
]
df_factory.start_all(instances)