fix: fix deadlock in DFLYCLUSTER CONFIG command and outgoing migration finalization (#3239)

fix: fix deadlock in DFLYCLUSTER CONFIG command and outgoing migration finalization
This commit is contained in:
Borys 2024-06-29 11:41:23 +03:00 committed by GitHub
parent 0662b50578
commit 13169461ef
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 37 additions and 17 deletions

View file

@ -434,8 +434,6 @@ void ClusterFamily::DflyClusterMyId(CmdArgList args, ConnectionContext* cntx) {
}
namespace {
// Guards set configuration, so that we won't handle 2 in parallel.
util::fb2::Mutex set_config_mu;
void DeleteSlots(const SlotRanges& slots_ranges) {
if (slots_ranges.empty()) {
@ -511,9 +509,6 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
auto out_migrations_slots = RemoveOutgoingMigrations(new_config, tl_cluster_config);
RemoveIncomingMigrations(new_config->GetFinishedIncomingMigrations(tl_cluster_config));
// Prevent simultaneous config update from outgoing migration
lock_guard config_update_lk(config_update_mu_);
SlotRanges enable_slots, disable_slots;
{
@ -542,14 +537,14 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
// Ignore blocked commands because we filter them with CancelBlockingOnThread
DispatchTracker tracker{server_family_->GetNonPriviligedListeners(), cntx->conn(),
false /* ignore paused */, true /* ignore blocked */};
true /* ignore paused */, true /* ignore blocked */};
auto blocking_filter = [&new_config](ArgSlice keys) {
bool moved = any_of(keys.begin(), keys.end(), [&](auto k) { return !new_config->IsMySlot(k); });
return moved ? OpStatus::KEY_MOVED : OpStatus::OK;
};
auto cb = [this, &tracker, &new_config, blocking_filter](util::ProactorBase* pb) {
auto cb = [this, &tracker, &new_config, blocking_filter](util::ProactorBase*) {
server_family_->CancelBlockingOnThread(blocking_filter);
tl_cluster_config = new_config;
tracker.TrackOnThread();
@ -884,14 +879,36 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
migration->StartFlow(shard_id, cntx->conn()->socket());
}
void ClusterFamily::UpdateConfig(const std::vector<SlotRange>& slots, bool enable) {
lock_guard gu(config_update_mu_);
void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id,
const SlotRanges& slots, bool is_incoming) {
lock_guard gu(set_config_mu);
lock_guard lk(migration_mu_);
bool is_migration_valid = false;
if (is_incoming) {
for (const auto& mj : incoming_migrations_jobs_) {
if (mj->GetSourceID() == node_id) {
// TODO add compare for slots
is_migration_valid = true;
}
}
} else {
for (const auto& mj : outgoing_migration_jobs_) {
if (mj->GetMigrationInfo().node_id == node_id) {
// TODO add compare for slots
is_migration_valid = true;
}
}
}
if (!is_migration_valid)
return;
auto new_config = enable ? tl_cluster_config->CloneWithChanges(slots, {})
: tl_cluster_config->CloneWithChanges({}, slots);
auto new_config = is_incoming ? tl_cluster_config->CloneWithChanges(slots, {})
: tl_cluster_config->CloneWithChanges({}, slots);
shard_set->pool()->AwaitFiberOnAll(
[&new_config](util::ProactorBase* pb) { tl_cluster_config = new_config; });
// we don't need to use DispatchTracker here because for IncomingMigration we don't have
// connections that should be tracked and for Outgoing migration we do it under Pause
server_family_->service().proactor_pool().AwaitFiberOnAll(
[&new_config](util::ProactorBase*) { tl_cluster_config = new_config; });
DCHECK(tl_cluster_config != nullptr);
}
@ -922,7 +939,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Migration is joined for " << source_id;
UpdateConfig(migration->GetSlots(), true);
ApplyMigrationSlotRangeToConfig(migration->GetSourceID(), migration->GetSlots(), true);
VLOG(1) << "Config is updated for " << MyID();
return cntx->SendLong(attempt);

View file

@ -31,7 +31,8 @@ class ClusterFamily {
// Returns a thread-local pointer.
ClusterConfig* cluster_config();
void UpdateConfig(const std::vector<SlotRange>& slots, bool enable);
// Applies the slot ranges of a migration to the thread-local cluster config on every
// proactor thread.
//
// node_id is matched against GetSourceID() of the incoming migration jobs when
// is_incoming is true, and against GetMigrationInfo().node_id of the outgoing migration
// jobs otherwise. If no job matches, the config is left unchanged.
//
// is_incoming == true  -> `slots` are enabled on this node (incoming migration acked).
// is_incoming == false -> `slots` are disabled on this node (outgoing migration finalized).
//
// NOTE: the flag is named is_incoming to agree with the definition and with the callers,
// which pass `true` for incoming migrations and `false` for outgoing ones.
void ApplyMigrationSlotRangeToConfig(std::string_view node_id, const SlotRanges& slots,
                                     bool is_incoming);
const std::string& MyID() const {
return id_;
@ -97,7 +98,8 @@ class ClusterFamily {
private:
ClusterShardInfo GetEmulatedShardInfo(ConnectionContext* cntx) const;
mutable util::fb2::Mutex config_update_mu_;
// Guards set configuration, so that we won't handle 2 in parallel.
mutable util::fb2::Mutex set_config_mu;
std::string id_;

View file

@ -309,7 +309,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
Finish(is_error);
if (!is_error) {
keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges);
cf_->UpdateConfig(migration_info_.slot_ranges, false);
cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_id, migration_info_.slot_ranges,
false);
VLOG(1) << "Config is updated for " << cf_->MyID();
}
return true;