refactor: move tl_cluster_config into cluster_config.cc (#4714)

This commit is contained in:
Borys 2025-03-07 12:30:48 +02:00 committed by GitHub
parent e01aec2a21
commit d8500c9686
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 49 additions and 40 deletions

View file

@ -17,6 +17,9 @@ using namespace std;
namespace dfly::cluster {
namespace {
thread_local shared_ptr<ClusterConfig> tl_cluster_config;
bool HasValidNodeIds(const ClusterShardInfos& new_config) {
absl::flat_hash_set<string_view> nodes;
@ -390,4 +393,12 @@ std::vector<MigrationInfo> ClusterConfig::GetFinishedIncomingMigrations(
: std::vector<MigrationInfo>();
}
std::shared_ptr<ClusterConfig> ClusterConfig::Current() {
return tl_cluster_config;
}
void ClusterConfig::SetCurrent(std::shared_ptr<ClusterConfig> config) {
tl_cluster_config = std::move(config);
}
} // namespace dfly::cluster

View file

@ -52,6 +52,12 @@ class ClusterConfig {
return my_incoming_migrations_;
}
// Returns a thread-local pointer.
static std::shared_ptr<ClusterConfig> Current();
// Set a thread-local pointer.
static void SetCurrent(std::shared_ptr<ClusterConfig> config);
private:
struct SlotEntry {
const ClusterShardInfo* shard = nullptr;

View file

@ -7,12 +7,10 @@
#include <absl/strings/str_cat.h>
#include <absl/strings/str_join.h>
#include "cluster_config.h"
#include "facade/error.h"
#include "slot_set.h"
// TODO remove when tl_cluster_config will be moved out from it
#include "server/cluster/cluster_family.h"
using namespace std;
namespace dfly::cluster {
@ -63,7 +61,7 @@ ClusterShardInfos::ClusterShardInfos(std::vector<ClusterShardInfo> infos)
}
facade::ErrorReply SlotOwnershipError(SlotId slot_id) {
const cluster::ClusterConfig* cluster_config = ClusterFamily::cluster_config();
const auto cluster_config = ClusterConfig::Current();
if (!cluster_config)
return facade::ErrorReply{facade::kClusterNotConfigured};

View file

@ -63,13 +63,11 @@ constexpr char kIdNotFound[] = "syncid not found";
constexpr string_view kClusterDisabled =
"Cluster is disabled. Enabled via passing --cluster_mode=emulated|yes";
thread_local shared_ptr<ClusterConfig> tl_cluster_config;
ClusterShardInfos GetConfigForStats(ConnectionContext* cntx) {
CHECK(!IsClusterEmulated());
CHECK(tl_cluster_config != nullptr);
CHECK(ClusterConfig::Current() != nullptr);
auto config = tl_cluster_config->GetConfig();
auto config = ClusterConfig::Current()->GetConfig();
if (cntx->conn()->IsPrivileged() || !absl::GetFlag(FLAGS_managed_service_info)) {
return config;
}
@ -102,21 +100,18 @@ ClusterFamily::ClusterFamily(ServerFamily* server_family) : server_family_(serve
}
}
ClusterConfig* ClusterFamily::cluster_config() {
return tl_cluster_config.get();
}
void ClusterFamily::Shutdown() {
shard_set->pool()->at(0)->Await([this]() ABSL_LOCKS_EXCLUDED(set_config_mu) {
PreparedToRemoveOutgoingMigrations outgoing_migrations; // should be removed without mutex lock
{
util::fb2::LockGuard lk(set_config_mu);
if (!tl_cluster_config)
if (!ClusterConfig::Current())
return;
auto empty_config = tl_cluster_config->CloneWithoutMigrations();
outgoing_migrations = TakeOutOutgoingMigrations(empty_config, tl_cluster_config);
RemoveIncomingMigrations(empty_config->GetFinishedIncomingMigrations(tl_cluster_config));
auto empty_config = ClusterConfig::Current()->CloneWithoutMigrations();
outgoing_migrations = TakeOutOutgoingMigrations(empty_config, ClusterConfig::Current());
RemoveIncomingMigrations(
empty_config->GetFinishedIncomingMigrations(ClusterConfig::Current()));
util::fb2::LockGuard migration_lk(migration_mu_);
DCHECK(outgoing_migration_jobs_.empty());
@ -130,7 +125,7 @@ std::optional<ClusterShardInfos> ClusterFamily::GetShardInfos(ConnectionContext*
return {GetEmulatedShardInfo(cntx)};
}
if (tl_cluster_config != nullptr) {
if (ClusterConfig::Current() != nullptr) {
return GetConfigForStats(cntx);
}
return nullopt;
@ -565,7 +560,8 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, SinkReplyBuilder* builder
if (new_config == nullptr) {
LOG(WARNING) << "Can't set cluster config";
return builder->SendError("Invalid cluster configuration.");
} else if (tl_cluster_config && tl_cluster_config->GetConfig() == new_config->GetConfig()) {
} else if (ClusterConfig::Current() &&
ClusterConfig::Current()->GetConfig() == new_config->GetConfig()) {
return builder->SendOk();
}
@ -575,8 +571,8 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, SinkReplyBuilder* builder
VLOG(1) << "Setting new cluster config: " << json_str;
util::fb2::LockGuard gu(set_config_mu);
outgoing_migrations = TakeOutOutgoingMigrations(new_config, tl_cluster_config);
RemoveIncomingMigrations(new_config->GetFinishedIncomingMigrations(tl_cluster_config));
outgoing_migrations = TakeOutOutgoingMigrations(new_config, ClusterConfig::Current());
RemoveIncomingMigrations(new_config->GetFinishedIncomingMigrations(ClusterConfig::Current()));
SlotRanges enable_slots, disable_slots;
@ -598,9 +594,10 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, SinkReplyBuilder* builder
new_config = new_config->CloneWithChanges(enable_slots, disable_slots);
StartSlotMigrations(new_config->GetNewOutgoingMigrations(tl_cluster_config));
StartSlotMigrations(new_config->GetNewOutgoingMigrations(ClusterConfig::Current()));
SlotSet before = tl_cluster_config ? tl_cluster_config->GetOwnedSlots() : SlotSet(true);
SlotSet before =
ClusterConfig::Current() ? ClusterConfig::Current()->GetOwnedSlots() : SlotSet(true);
// Ignore blocked commands because we filter them with CancelBlockingOnThread
DispatchTracker tracker{server_family_->GetNonPriviligedListeners(), cntx->conn(),
@ -614,18 +611,18 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, SinkReplyBuilder* builder
auto cb = [this, &tracker, &new_config, blocking_filter](util::ProactorBase*) {
server_family_->CancelBlockingOnThread(blocking_filter);
tl_cluster_config = new_config;
ClusterConfig::SetCurrent(new_config);
tracker.TrackOnThread();
};
server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb));
DCHECK(tl_cluster_config != nullptr);
DCHECK(ClusterConfig::Current() != nullptr);
if (!tracker.Wait(absl::Seconds(1))) {
LOG(WARNING) << "Cluster config change timed for: " << MyID();
}
SlotSet after = tl_cluster_config->GetOwnedSlots();
SlotSet after = ClusterConfig::Current()->GetOwnedSlots();
if (ServerState::tlocal()->is_master) {
auto deleted_slots = (before.GetRemovedSlots(after)).ToSlotRanges();
deleted_slots.Merge(outgoing_migrations.slot_ranges);
@ -859,7 +856,7 @@ bool RemoveIncomingMigrationImpl(std::vector<std::shared_ptr<IncomingSlotMigrati
// Flush non-owned migrations
SlotSet migration_slots(migration->GetSlots());
SlotSet removed = migration_slots.GetRemovedSlots(tl_cluster_config->GetOwnedSlots());
SlotSet removed = migration_slots.GetRemovedSlots(ClusterConfig::Current()->GetOwnedSlots());
migration->Stop();
// all migration fibers has migration shared_ptr so the object can be removed later
@ -903,7 +900,7 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {
SlotRanges slot_ranges(std::move(slots));
const auto& incoming_migrations = cluster_config()->GetIncomingMigrations();
const auto& incoming_migrations = ClusterConfig::Current()->GetIncomingMigrations();
bool found = any_of(incoming_migrations.begin(), incoming_migrations.end(),
[source_id = source_id, &slot_ranges](const MigrationInfo& info) {
return info.node_info.id == source_id && info.slot_ranges == slot_ranges;
@ -990,14 +987,14 @@ void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id,
return;
}
auto new_config = is_incoming ? tl_cluster_config->CloneWithChanges(slots, {})
: tl_cluster_config->CloneWithChanges({}, slots);
auto new_config = is_incoming ? ClusterConfig::Current()->CloneWithChanges(slots, {})
: ClusterConfig::Current()->CloneWithChanges({}, slots);
// we don't need to use DispatchTracker here because for IncomingMingration we don't have
// connectionas that should be tracked and for Outgoing migration we do it under Pause
server_family_->service().proactor_pool().AwaitFiberOnAll(
[&new_config](util::ProactorBase*) { tl_cluster_config = new_config; });
DCHECK(tl_cluster_config != nullptr);
[&new_config](util::ProactorBase*) { ClusterConfig::SetCurrent(new_config); });
DCHECK(ClusterConfig::Current() != nullptr);
VLOG(1) << "Config is updated for slots ranges: " << slots.ToString() << " for " << MyID()
<< " : " << node_id;
}
@ -1011,7 +1008,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
}
VLOG(1) << "DFLYMIGRATE ACK" << args;
auto in_migrations = tl_cluster_config->GetIncomingMigrations();
auto in_migrations = ClusterConfig::Current()->GetIncomingMigrations();
auto m_it =
std::find_if(in_migrations.begin(), in_migrations.end(),
[source_id = source_id](const auto& m) { return m.node_info.id == source_id; });

View file

@ -32,9 +32,6 @@ class ClusterFamily {
void Shutdown() ABSL_LOCKS_EXCLUDED(set_config_mu);
// Returns a thread-local pointer.
static ClusterConfig* cluster_config();
void ApplyMigrationSlotRangeToConfig(std::string_view node_id, const SlotRanges& slots,
bool is_outgoing);

View file

@ -551,10 +551,10 @@ void ClusterHtmlPage(const http::QueryArgs& args, HttpContext* send,
}
if (IsClusterEnabled()) {
if (cluster::ClusterFamily::cluster_config() == nullptr) {
if (cluster::ClusterConfig::Current() == nullptr) {
resp.body() += "<h2>Not yet configured.</h2>\n";
} else {
auto config = cluster::ClusterFamily::cluster_config()->GetConfig();
auto config = cluster::ClusterConfig::Current()->GetConfig();
for (const auto& shard : config) {
resp.body() += "<div class='master'>\n";
resp.body() += "<h3>Master</h3>\n";

View file

@ -34,7 +34,7 @@ extern "C" {
#include "core/sorted_map.h"
#include "core/string_map.h"
#include "core/string_set.h"
#include "server/cluster/cluster_family.h"
#include "server/cluster/cluster_config.h"
#include "server/container_utils.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
@ -2731,8 +2731,8 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
bool RdbLoader::ShouldDiscardKey(std::string_view key, ObjSettings* settings) const {
if (!load_unowned_slots_ && IsClusterEnabled()) {
const cluster::ClusterConfig* cluster_config = cluster::ClusterFamily::cluster_config();
if (cluster_config != nullptr && !cluster_config->IsMySlot(key)) {
const auto cluster_config = cluster::ClusterConfig::Current();
if (cluster_config && !cluster_config->IsMySlot(key)) {
return true;
}
}