diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index a8be6ac6b..14412c906 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -52,7 +52,7 @@ add_library(dragonfly_lib bloom_family.cc engine_shard_set.cc channel_store.cc zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc cluster/cluster_family.cc cluster/incoming_slot_migration.cc - cluster/outgoing_slot_migration.cc + cluster/outgoing_slot_migration.cc cluster/cluster_defs.cc acl/user.cc acl/user_registry.cc acl/acl_family.cc acl/validator.cc acl/helpers.cc) diff --git a/src/server/cluster/cluster_config.cc b/src/server/cluster/cluster_config.cc index d1f5d71e1..0be5c4727 100644 --- a/src/server/cluster/cluster_config.cc +++ b/src/server/cluster/cluster_config.cc @@ -1,64 +1,17 @@ -#include - -extern "C" { -#include "redis/crc16.h" -} +#include "cluster_config.h" #include #include -#include +#include #include -#include "absl/strings/match.h" -#include "base/flags.h" #include "base/logging.h" -#include "cluster_config.h" #include "core/json/json_object.h" using namespace std; -ABSL_FLAG(string, cluster_mode, "", "Cluster mode supported. Default: \"\""); - -namespace dfly { -namespace { -enum class ClusterMode { - kUninitialized, - kNoCluster, - kEmulatedCluster, - kRealCluster, -}; - -ClusterMode cluster_mode = ClusterMode::kUninitialized; -} // namespace - -void ClusterConfig::Initialize() { - string cluster_mode_str = absl::GetFlag(FLAGS_cluster_mode); - - if (cluster_mode_str == "emulated") { - cluster_mode = ClusterMode::kEmulatedCluster; - } else if (cluster_mode_str == "yes") { - cluster_mode = ClusterMode::kRealCluster; - } else if (cluster_mode_str.empty()) { - cluster_mode = ClusterMode::kNoCluster; - } else { - LOG(ERROR) << "Invalid value for flag --cluster_mode. Exiting..."; - exit(1); - } -} - -bool ClusterConfig::IsEnabled() { - return cluster_mode == ClusterMode::kRealCluster; -} - -bool ClusterConfig::IsEmulated() { - return cluster_mode == ClusterMode::kEmulatedCluster; -} - -SlotId ClusterConfig::KeySlot(string_view key) { - string_view tag = LockTagOptions::instance().Tag(key); - return crc16(tag.data(), tag.length()) & kMaxSlotNum; -} +namespace dfly::cluster { namespace { bool HasValidNodeIds(const ClusterShardInfos& new_config) { @@ -87,7 +40,7 @@ bool HasValidNodeIds(const ClusterShardInfos& new_config) { bool IsConfigValid(const ClusterShardInfos& new_config) { // Make sure that all slots are set exactly once. - array slots_found = {}; + array slots_found = {}; if (!HasValidNodeIds(new_config)) { return false; @@ -347,7 +300,7 @@ std::shared_ptr ClusterConfig::CloneWithChanges( } bool ClusterConfig::IsMySlot(SlotId id) const { - if (id > ClusterConfig::kMaxSlotNum) { + if (id > cluster::kMaxSlotNum) { DCHECK(false) << "Requesting a non-existing slot id " << id; return false; } @@ -360,7 +313,7 @@ bool ClusterConfig::IsMySlot(std::string_view key) const { } ClusterNodeInfo ClusterConfig::GetMasterNodeForSlot(SlotId id) const { - CHECK_LE(id, ClusterConfig::kMaxSlotNum) << "Requesting a non-existing slot id " << id; + CHECK_LE(id, cluster::kMaxSlotNum) << "Requesting a non-existing slot id " << id; for (const auto& shard : config_) { for (const auto& range : shard.slot_ranges) { @@ -417,4 +370,4 @@ std::vector ClusterConfig::GetFinishedIncomingMigrations( : std::vector(); } -} // namespace dfly +} // namespace dfly::cluster diff --git a/src/server/cluster/cluster_config.h b/src/server/cluster/cluster_config.h index 4c055d011..4099852c5 100644 --- a/src/server/cluster/cluster_config.h +++ b/src/server/cluster/cluster_config.h @@ -11,63 +11,10 @@ #include "src/server/cluster/slot_set.h" #include "src/server/common.h" -namespace dfly { - -// MigrationState constants are ordered in state changing order -enum class MigrationState : uint8_t { - C_NO_STATE, - C_CONNECTING, - C_SYNC, - C_FINISHED, - C_CANCELLED, - C_MAX_INVALID = std::numeric_limits::max() -}; - -struct ClusterNodeInfo { - std::string id; - std::string ip; - uint16_t port = 0; -}; - -struct MigrationInfo { - std::vector slot_ranges; - std::string node_id; - std::string ip; - uint16_t port = 0; - - bool operator==(const MigrationInfo& r) const { - return ip == r.ip && port == r.port && slot_ranges == r.slot_ranges && node_id == r.node_id; - } -}; - -struct ClusterShardInfo { - SlotRanges slot_ranges; - ClusterNodeInfo master; - std::vector replicas; - std::vector migrations; -}; - -using ClusterShardInfos = std::vector; +namespace dfly::cluster { class ClusterConfig { public: - static constexpr SlotId kMaxSlotNum = 0x3FFF; - static constexpr SlotId kInvalidSlotId = kMaxSlotNum + 1; - - static SlotId KeySlot(std::string_view key); - - static void Initialize(); - static bool IsEnabled(); - static bool IsEmulated(); - - static bool IsEnabledOrEmulated() { - return IsEnabled() || IsEmulated(); - } - - static bool IsShardedByTag() { - return IsEnabledOrEmulated() || LockTagOptions::instance().enabled; - } - // Returns an instance with `config` if it is valid. // Returns heap-allocated object as it is too big for a stack frame. static std::shared_ptr CreateFromConfig(std::string_view my_id, @@ -119,4 +66,4 @@ class ClusterConfig { std::vector my_incoming_migrations_; }; -} // namespace dfly +} // namespace dfly::cluster diff --git a/src/server/cluster/cluster_config_test.cc b/src/server/cluster/cluster_config_test.cc index 7b37f7f0d..3aad3b648 100644 --- a/src/server/cluster/cluster_config_test.cc +++ b/src/server/cluster/cluster_config_test.cc @@ -14,9 +14,9 @@ using namespace std; using namespace testing; -using Node = dfly::ClusterNodeInfo; +using Node = dfly::cluster::ClusterNodeInfo; -namespace dfly { +namespace dfly::cluster { MATCHER_P(NodeMatches, expected, "") { return arg.id == expected.id && arg.ip == expected.ip && arg.port == expected.port; @@ -558,4 +558,4 @@ TEST_F(ClusterConfigTest, InvalidConfigMigrationsWithoutIP) { EXPECT_EQ(config, nullptr); } -} // namespace dfly +} // namespace dfly::cluster diff --git a/src/server/cluster/cluster_defs.cc b/src/server/cluster/cluster_defs.cc new file mode 100644 index 000000000..35cc88211 --- /dev/null +++ b/src/server/cluster/cluster_defs.cc @@ -0,0 +1,63 @@ + +extern "C" { +#include "redis/crc16.h" +} + +#include "base/flags.h" +#include "base/logging.h" +#include "cluster_defs.h" +#include "src/server/common.h" + +using namespace std; + +ABSL_FLAG(string, cluster_mode, "", "Cluster mode supported. Default: \"\""); + +namespace dfly::cluster { +namespace { +enum class ClusterMode { + kUninitialized, + kNoCluster, + kEmulatedCluster, + kRealCluster, +}; + +ClusterMode cluster_mode = ClusterMode::kUninitialized; +} // namespace + +void InitializeCluster() { + string cluster_mode_str = absl::GetFlag(FLAGS_cluster_mode); + + if (cluster_mode_str == "emulated") { + cluster_mode = ClusterMode::kEmulatedCluster; + } else if (cluster_mode_str == "yes") { + cluster_mode = ClusterMode::kRealCluster; + } else if (cluster_mode_str.empty()) { + cluster_mode = ClusterMode::kNoCluster; + } else { + LOG(ERROR) << "Invalid value for flag --cluster_mode. Exiting..."; + exit(1); + } +} + +bool IsClusterEnabled() { + return cluster_mode == ClusterMode::kRealCluster; +} + +bool IsClusterEmulated() { + return cluster_mode == ClusterMode::kEmulatedCluster; +} + +SlotId KeySlot(std::string_view key) { + string_view tag = LockTagOptions::instance().Tag(key); + return crc16(tag.data(), tag.length()) & kMaxSlotNum; +} + +bool IsClusterEnabledOrEmulated() { + return IsClusterEnabled() || IsClusterEmulated(); +} + +bool IsClusterShardedByTag() { + return IsClusterEnabledOrEmulated() || LockTagOptions::instance().enabled; +} + +} // namespace dfly::cluster diff --git a/src/server/cluster/cluster_defs.h b/src/server/cluster/cluster_defs.h new file mode 100644 index 000000000..6c6630eee --- /dev/null +++ b/src/server/cluster/cluster_defs.h @@ -0,0 +1,90 @@ +// Copyright 2024, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include +#include + +#include +#include +#include +#include + +namespace dfly::cluster { + +using SlotId = uint16_t; + +constexpr SlotId kMaxSlotNum = 0x3FFF; +constexpr SlotId kInvalidSlotId = kMaxSlotNum + 1; + +struct SlotRange { + static constexpr SlotId kMaxSlotId = 0x3FFF; + SlotId start = 0; + SlotId end = 0; + + bool operator==(const SlotRange& r) const { + return start == r.start && end == r.end; + } + bool IsValid() { + return start <= end && start <= kMaxSlotId && end <= kMaxSlotId; + } + + std::string ToString() const { + return absl::StrCat("[", start, ", ", end, "]"); + } + + static std::string ToString(const std::vector& ranges) { + return absl::StrJoin(ranges, ", ", [](std::string* out, SlotRange range) { + absl::StrAppend(out, range.ToString()); + }); + } +}; + +using SlotRanges = std::vector; + +struct ClusterNodeInfo { + std::string id; + std::string ip; + uint16_t port = 0; +}; + +struct MigrationInfo { + std::vector slot_ranges; + std::string node_id; + std::string ip; + uint16_t port = 0; + + bool operator==(const MigrationInfo& r) const { + return ip == r.ip && port == r.port && slot_ranges == r.slot_ranges && node_id == r.node_id; + } +}; + +struct ClusterShardInfo { + SlotRanges slot_ranges; + ClusterNodeInfo master; + std::vector replicas; + std::vector migrations; +}; + +using ClusterShardInfos = std::vector; + +// MigrationState constants are ordered in state changing order +enum class MigrationState : uint8_t { + C_NO_STATE, + C_CONNECTING, + C_SYNC, + C_FINISHED, + C_MAX_INVALID = std::numeric_limits::max() +}; + +SlotId KeySlot(std::string_view key); + +void InitializeCluster(); +bool IsClusterEnabled(); +bool IsClusterEmulated(); +bool IsClusterEnabledOrEmulated(); +bool IsClusterShardedByTag(); + +} // namespace dfly::cluster diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index cc907d65d..ce9f3a45e 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -32,6 +32,17 @@ ABSL_FLAG(std::string, cluster_node_id, "", ABSL_DECLARE_FLAG(int32_t, port); namespace dfly { +namespace acl { +constexpr uint32_t kCluster = SLOW; +// Reconsider to maybe more sensible defaults +constexpr uint32_t kDflyCluster = ADMIN | SLOW; +constexpr uint32_t kReadOnly = FAST | CONNECTION; +constexpr uint32_t kReadWrite = FAST | CONNECTION; +constexpr uint32_t kDflyMigrate = ADMIN | SLOW | DANGEROUS; +} // namespace acl +} // namespace dfly + +namespace dfly::cluster { namespace { using namespace std; @@ -52,12 +63,12 @@ thread_local shared_ptr tl_cluster_config; ClusterFamily::ClusterFamily(ServerFamily* server_family) : server_family_(server_family) { CHECK_NOTNULL(server_family_); - ClusterConfig::Initialize(); + InitializeCluster(); id_ = absl::GetFlag(FLAGS_cluster_node_id); if (id_.empty()) { id_ = server_family_->master_replid(); - } else if (ClusterConfig::IsEmulated()) { + } else if (IsClusterEmulated()) { LOG(ERROR) << "Setting --cluster_node_id in emulated mode is unsupported"; exit(1); } @@ -68,7 +79,7 @@ ClusterConfig* ClusterFamily::cluster_config() { } ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const { - ClusterShardInfo info{.slot_ranges = {{.start = 0, .end = ClusterConfig::kMaxSlotNum}}, + ClusterShardInfo info{.slot_ranges = {{.start = 0, .end = kMaxSlotNum}}, .master = {}, .replicas = {}, .migrations = {}}; @@ -166,7 +177,7 @@ void ClusterShardsImpl(const ClusterShardInfos& config, ConnectionContext* cntx) } // namespace void ClusterFamily::ClusterShards(ConnectionContext* cntx) { - if (ClusterConfig::IsEmulated()) { + if (IsClusterEmulated()) { return ClusterShardsImpl({GetEmulatedShardInfo(cntx)}, cntx); } else if (tl_cluster_config != nullptr) { return ClusterShardsImpl(tl_cluster_config->GetConfig(), cntx); @@ -210,7 +221,7 @@ void ClusterSlotsImpl(const ClusterShardInfos& config, ConnectionContext* cntx) } // namespace void ClusterFamily::ClusterSlots(ConnectionContext* cntx) { - if (ClusterConfig::IsEmulated()) { + if (IsClusterEmulated()) { return ClusterSlotsImpl({GetEmulatedShardInfo(cntx)}, cntx); } else if (tl_cluster_config != nullptr) { return ClusterSlotsImpl(tl_cluster_config->GetConfig(), cntx); @@ -265,7 +276,7 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id, Connec } // namespace void ClusterFamily::ClusterNodes(ConnectionContext* cntx) { - if (ClusterConfig::IsEmulated()) { + if (IsClusterEmulated()) { return ClusterNodesImpl({GetEmulatedShardInfo(cntx)}, id_, cntx); } else if (tl_cluster_config != nullptr) { return ClusterNodesImpl(tl_cluster_config->GetConfig(), id_, cntx); @@ -284,7 +295,7 @@ void ClusterInfoImpl(const ClusterShardInfos& config, ConnectionContext* cntx) { // Initialize response variables to emulated mode. string_view state = "ok"sv; - SlotId slots_assigned = ClusterConfig::kMaxSlotNum + 1; + SlotId slots_assigned = kMaxSlotNum + 1; size_t known_nodes = 1; long epoch = 1; size_t cluster_size = 1; @@ -329,7 +340,7 @@ void ClusterInfoImpl(const ClusterShardInfos& config, ConnectionContext* cntx) { } // namespace void ClusterFamily::ClusterInfo(ConnectionContext* cntx) { - if (ClusterConfig::IsEmulated()) { + if (IsClusterEmulated()) { return ClusterInfoImpl({GetEmulatedShardInfo(cntx)}, cntx); } else if (tl_cluster_config != nullptr) { return ClusterInfoImpl(tl_cluster_config->GetConfig(), cntx); @@ -343,7 +354,7 @@ void ClusterFamily::KeySlot(CmdArgList args, ConnectionContext* cntx) { return cntx->SendError(WrongNumArgsError("CLUSTER KEYSLOT")); } - SlotId id = ClusterConfig::KeySlot(ArgS(args, 1)); + SlotId id = cluster::KeySlot(ArgS(args, 1)); return cntx->SendLong(id); } @@ -354,7 +365,7 @@ void ClusterFamily::Cluster(CmdArgList args, ConnectionContext* cntx) { ToUpper(&args[0]); string_view sub_cmd = ArgS(args, 0); - if (!ClusterConfig::IsEnabledOrEmulated()) { + if (!IsClusterEnabledOrEmulated()) { return cntx->SendError(kClusterDisabled); } @@ -376,21 +387,21 @@ void ClusterFamily::Cluster(CmdArgList args, ConnectionContext* cntx) { } void ClusterFamily::ReadOnly(CmdArgList args, ConnectionContext* cntx) { - if (!ClusterConfig::IsEmulated()) { + if (!IsClusterEmulated()) { return cntx->SendError(kClusterDisabled); } cntx->SendOk(); } void ClusterFamily::ReadWrite(CmdArgList args, ConnectionContext* cntx) { - if (!ClusterConfig::IsEmulated()) { + if (!IsClusterEmulated()) { return cntx->SendError(kClusterDisabled); } cntx->SendOk(); } void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) { - if (!ClusterConfig::IsEnabledOrEmulated()) { + if (!IsClusterEnabledOrEmulated()) { return cntx->SendError(kClusterDisabled); } @@ -570,7 +581,7 @@ void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* c vector> slots_stats; do { auto sid = parser.Next(); - if (sid > ClusterConfig::kMaxSlotNum) + if (sid > kMaxSlotNum) return rb->SendError("Invalid slot id"); slots_stats.emplace_back(sid, SlotStats{}); } while (parser.HasNext()); @@ -646,8 +657,6 @@ static string_view StateToStr(MigrationState state) { return "SYNC"sv; case MigrationState::C_FINISHED: return "FINISHED"sv; - case MigrationState::C_CANCELLED: - return "CANCELLED"sv; case MigrationState::C_MAX_INVALID: break; } @@ -924,15 +933,6 @@ inline CommandId::Handler HandlerFunc(ClusterFamily* se, EngineFunc f) { #define HFUNC(x) SetHandler(HandlerFunc(this, &ClusterFamily::x)) -namespace acl { -constexpr uint32_t kCluster = SLOW; -// Reconsider to maybe more sensible defaults -constexpr uint32_t kDflyCluster = ADMIN | SLOW; -constexpr uint32_t kReadOnly = FAST | CONNECTION; -constexpr uint32_t kReadWrite = FAST | CONNECTION; -constexpr uint32_t kDflyMigrate = ADMIN | SLOW | DANGEROUS; -} // namespace acl - void ClusterFamily::Register(CommandRegistry* registry) { registry->StartFamily(); *registry << CI{"CLUSTER", CO::READONLY, -2, 0, 0, acl::kCluster}.HFUNC(Cluster) @@ -945,4 +945,4 @@ void ClusterFamily::Register(CommandRegistry* registry) { DflyMigrate); } -} // namespace dfly +} // namespace dfly::cluster diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index 45052ba9a..4dde1c092 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -15,9 +15,12 @@ #include "server/common.h" namespace dfly { +class ServerFamily; class CommandRegistry; class ConnectionContext; -class ServerFamily; +} // namespace dfly + +namespace dfly::cluster { class ClusterFamily { public: @@ -105,4 +108,4 @@ class ClusterFamily { ServerFamily* server_family_ = nullptr; }; -} // namespace dfly +} // namespace dfly::cluster diff --git a/src/server/cluster/cluster_family_test.cc b/src/server/cluster/cluster_family_test.cc index fc99bb3a3..868ecea6a 100644 --- a/src/server/cluster/cluster_family_test.cc +++ b/src/server/cluster/cluster_family_test.cc @@ -18,7 +18,7 @@ #include "facade/facade_test.h" #include "server/test_utils.h" -namespace dfly { +namespace dfly::cluster { namespace { using namespace std; @@ -395,7 +395,7 @@ TEST_F(ClusterFamilyTest, ClusterConfigFullMultipleInstances) { absl::InsecureBitGen eng; while (true) { string random_key = GetRandomHex(eng, 40); - SlotId slot = ClusterConfig::KeySlot(random_key); + SlotId slot = KeySlot(random_key); if (slot > 10'000) { continue; } @@ -407,7 +407,7 @@ TEST_F(ClusterFamilyTest, ClusterConfigFullMultipleInstances) { while (true) { string random_key = GetRandomHex(eng, 40); - SlotId slot = ClusterConfig::KeySlot(random_key); + SlotId slot = KeySlot(random_key); if (slot <= 10'000) { continue; } @@ -429,7 +429,7 @@ TEST_F(ClusterFamilyTest, ClusterGetSlotInfo) { ConfigSingleNodeCluster(GetMyId()); constexpr string_view kKey = "some-key"; - const SlotId slot = ClusterConfig::KeySlot(kKey); + const SlotId slot = KeySlot(kKey); EXPECT_NE(slot, 0) << "We need to choose another key"; const string value(1'000, '#'); // Long string - to use heap @@ -740,4 +740,4 @@ TEST_F(ClusterFamilyEmulatedTest, ClusterNodes) { } } // namespace -} // namespace dfly +} // namespace dfly::cluster diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc index 8c591f736..cee46f422 100644 --- a/src/server/cluster/incoming_slot_migration.cc +++ b/src/server/cluster/incoming_slot_migration.cc @@ -11,7 +11,7 @@ #include "server/journal/tx_executor.h" #include "server/main_service.h" -namespace dfly { +namespace dfly::cluster { using namespace std; using namespace util; @@ -140,4 +140,4 @@ void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* sou shard_flows_[shard]->Start(&cntx_, source, bc_); } -} // namespace dfly +} // namespace dfly::cluster diff --git a/src/server/cluster/incoming_slot_migration.h b/src/server/cluster/incoming_slot_migration.h index 952154444..470bf4d53 100644 --- a/src/server/cluster/incoming_slot_migration.h +++ b/src/server/cluster/incoming_slot_migration.h @@ -5,12 +5,15 @@ #include "helio/io/io.h" #include "helio/util/fiber_socket_base.h" -#include "server/cluster/cluster_config.h" +#include "server/cluster/cluster_defs.h" +#include "server/common.h" namespace dfly { -class ClusterShardMigration; - class Service; +} + +namespace dfly::cluster { +class ClusterShardMigration; // The main entity on the target side that manage slots migration process // Manage connections between the target and source node, @@ -55,4 +58,4 @@ class IncomingSlotMigration { util::fb2::BlockingCounter bc_; }; -} // namespace dfly +} // namespace dfly::cluster diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 9c97199fd..95d8cb336 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -23,7 +23,7 @@ using namespace std; using namespace facade; using namespace util; -namespace dfly { +namespace dfly::cluster { class OutgoingMigration::SliceSlotMigration : private ProtocolClient { public: @@ -221,4 +221,4 @@ std::error_code OutgoingMigration::Start(ConnectionContext* cntx) { return {}; } -} // namespace dfly +} // namespace dfly::cluster diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index 2d9eddfb2..f583bd095 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -4,18 +4,18 @@ #pragma once #include "io/io.h" -#include "server/cluster/cluster_config.h" -#include "server/common.h" +#include "server/cluster/cluster_defs.h" #include "server/protocol_client.h" namespace dfly { +class DbSlice; +class ServerFamily; namespace journal { class Journal; } - -class DbSlice; -class ServerFamily; +} // namespace dfly +namespace dfly::cluster { class ClusterFamily; // Whole outgoing slots migration manager @@ -76,4 +76,4 @@ class OutgoingMigration : private ProtocolClient { std::atomic state_ = MigrationState::C_NO_STATE; }; -} // namespace dfly +} // namespace dfly::cluster diff --git a/src/server/cluster/slot_set.h b/src/server/cluster/slot_set.h index 3379540bd..b9167330e 100644 --- a/src/server/cluster/slot_set.h +++ b/src/server/cluster/slot_set.h @@ -6,40 +6,11 @@ #include #include -#include #include -#include "absl/strings/str_cat.h" -#include "absl/strings/str_join.h" +#include "cluster_defs.h" -namespace dfly { - -using SlotId = uint16_t; - -struct SlotRange { - static constexpr SlotId kMaxSlotId = 0x3FFF; - SlotId start = 0; - SlotId end = 0; - - bool operator==(const SlotRange& r) const { - return start == r.start && end == r.end; - } - bool IsValid() { - return start <= end && start <= kMaxSlotId && end <= kMaxSlotId; - } - - std::string ToString() const { - return absl::StrCat("[", start, ", ", end, "]"); - } - - static std::string ToString(const std::vector& ranges) { - return absl::StrJoin(ranges, ", ", [](std::string* out, SlotRange range) { - absl::StrAppend(out, range.ToString()); - }); - } -}; - -using SlotRanges = std::vector; +namespace dfly::cluster { class SlotSet { public: @@ -117,4 +88,4 @@ class SlotSet { std::unique_ptr slots_{std::make_unique()}; }; -} // namespace dfly +} // namespace dfly::cluster diff --git a/src/server/cluster/unique_slot_checker.cc b/src/server/cluster/unique_slot_checker.cc index 9ff978f48..af15bbbe1 100644 --- a/src/server/cluster/unique_slot_checker.cc +++ b/src/server/cluster/unique_slot_checker.cc @@ -1,21 +1,21 @@ #include "server/cluster/unique_slot_checker.h" -#include "server/cluster/cluster_config.h" +#include "server/cluster/cluster_defs.h" using namespace std; -namespace dfly { +namespace dfly::cluster { void UniqueSlotChecker::Add(std::string_view key) { - if (!ClusterConfig::IsEnabled()) { + if (!IsClusterEnabled()) { return; } - Add(ClusterConfig::KeySlot(key)); + Add(KeySlot(key)); } void UniqueSlotChecker::Add(SlotId slot_id) { - if (!ClusterConfig::IsEnabled()) { + if (!IsClusterEnabled()) { return; } @@ -25,16 +25,16 @@ void UniqueSlotChecker::Add(SlotId slot_id) { } if (*slot_id_ != slot_id) { - slot_id_ = ClusterConfig::kInvalidSlotId; + slot_id_ = kInvalidSlotId; } } optional UniqueSlotChecker::GetUniqueSlotId() const { - if (slot_id_.has_value() && *slot_id_ == ClusterConfig::kInvalidSlotId) { + if (slot_id_.has_value() && *slot_id_ == kInvalidSlotId) { return nullopt; } return slot_id_; } -} // namespace dfly +} // namespace dfly::cluster diff --git a/src/server/cluster/unique_slot_checker.h b/src/server/cluster/unique_slot_checker.h index f505d029b..7f2bb63cc 100644 --- a/src/server/cluster/unique_slot_checker.h +++ b/src/server/cluster/unique_slot_checker.h @@ -7,9 +7,9 @@ #include #include -#include "server/cluster/slot_set.h" +#include "server/cluster/cluster_defs.h" -namespace dfly { +namespace dfly::cluster { // A simple utility class that "aggregates" SlotId-s and can tell whether all inputs were the same. // Only works when cluster is enabled. @@ -24,4 +24,4 @@ class UniqueSlotChecker { std::optional slot_id_; }; -} // namespace dfly +} // namespace dfly::cluster diff --git a/src/server/common.cc b/src/server/common.cc index 358093457..cc96d6f64 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -18,7 +18,7 @@ extern "C" { #include "base/flags.h" #include "base/logging.h" #include "core/compact_object.h" -#include "server/cluster/cluster_config.h" +#include "server/cluster/cluster_defs.h" #include "server/engine_shard_set.h" #include "server/error.h" #include "server/journal/journal.h" @@ -269,7 +269,7 @@ void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt) { void RecordExpiry(DbIndex dbid, string_view key) { auto journal = EngineShard::tlocal()->journal(); CHECK(journal); - journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, ClusterConfig::KeySlot(key), + journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, cluster::KeySlot(key), make_pair("DEL", ArgSlice{key}), false); } diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 91a38a64d..ef22d5c89 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -9,7 +9,7 @@ #include "base/flags.h" #include "base/logging.h" #include "generic_family.h" -#include "server/cluster/cluster_config.h" +#include "server/cluster/cluster_defs.h" #include "server/engine_shard_set.h" #include "server/error.h" #include "server/journal/journal.h" @@ -60,8 +60,8 @@ void AccountObjectMemory(string_view key, unsigned type, int64_t size, DbTable* stats.AddTypeMemoryUsage(type, size); - if (ClusterConfig::IsEnabled()) { - db->slots_stats[ClusterConfig::KeySlot(key)].memory_bytes += size; + if (cluster::IsClusterEnabled()) { + db->slots_stats[cluster::KeySlot(key)].memory_bytes += size; } } @@ -204,7 +204,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT // log the evicted keys to journal. if (auto journal = db_slice_->shard_owner()->journal(); journal) { ArgSlice delete_args(&key, 1); - journal->RecordEntry(0, journal::Op::EXPIRED, cntx_.db_index, 1, ClusterConfig::KeySlot(key), + journal->RecordEntry(0, journal::Op::EXPIRED, cntx_.db_index, 1, cluster::KeySlot(key), make_pair("DEL", delete_args), false); } @@ -301,7 +301,7 @@ auto DbSlice::GetStats() const -> Stats { return s; } -SlotStats DbSlice::GetSlotStats(SlotId sid) const { +SlotStats DbSlice::GetSlotStats(cluster::SlotId sid) const { CHECK(db_arr_[0]); return db_arr_[0]->slots_stats[sid]; } @@ -528,8 +528,8 @@ OpResult DbSlice::FindInternal(const Context& cntx, std: break; case UpdateStatsMode::kReadStats: events_.hits++; - if (ClusterConfig::IsEnabled()) { - db.slots_stats[ClusterConfig::KeySlot(key)].total_reads++; + if (cluster::IsClusterEnabled()) { + db.slots_stats[cluster::KeySlot(key)].total_reads++; } break; } @@ -661,8 +661,8 @@ OpResult DbSlice::AddOrFindInternal(const Context& cnt events_.garbage_checked += evp.checked(); memory_budget_ = evp.mem_budget() + evicted_obj_bytes; - if (ClusterConfig::IsEnabled()) { - SlotId sid = ClusterConfig::KeySlot(key); + if (cluster::IsClusterEnabled()) { + cluster::SlotId sid = cluster::KeySlot(key); db.slots_stats[sid].key_count += 1; } @@ -703,7 +703,7 @@ bool DbSlice::Del(DbIndex db_ind, Iterator it) { return true; } -void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) { +void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) { // Slot deletion can take time as it traverses all the database, hence it runs in fiber. // We want to flush all the data of a slot that was added till the time the call to FlushSlotsFb // was made. Therefore we delete slots entries with version < next_version @@ -712,7 +712,7 @@ void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) { std::string tmp; auto del_entry_cb = [&](PrimeTable::iterator it) { std::string_view key = it->first.GetSlice(&tmp); - SlotId sid = ClusterConfig::KeySlot(key); + cluster::SlotId sid = cluster::KeySlot(key); if (slot_ids.Contains(sid) && it.GetVersion() < next_version) { PerformDeletion(Iterator::FromPrime(it), db_arr_[0].get()); } @@ -765,8 +765,8 @@ void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) { etl.DecommitMemory(ServerState::kDataHeap); } -void DbSlice::FlushSlots(SlotRanges slot_ranges) { - SlotSet slot_set(slot_ranges); +void DbSlice::FlushSlots(cluster::SlotRanges slot_ranges) { + cluster::SlotSet slot_set(slot_ranges); InvalidateSlotWatches(slot_set); fb2::Fiber("flush_slots", [this, slot_set = std::move(slot_set)]() mutable { FlushSlotsFb(slot_set); @@ -1075,8 +1075,8 @@ void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size ++events_.update; - if (ClusterConfig::IsEnabled()) { - db.slots_stats[ClusterConfig::KeySlot(key)].total_writes += 1; + if (cluster::IsClusterEnabled()) { + db.slots_stats[cluster::KeySlot(key)].total_writes += 1; } SendInvalidationTrackingMessage(key); @@ -1333,7 +1333,7 @@ finish: if (auto journal = owner_->journal(); journal) { for (string_view key : keys_to_journal) { ArgSlice delete_args(&key, 1); - journal->RecordEntry(0, journal::Op::EXPIRED, db_ind, 1, ClusterConfig::KeySlot(key), + journal->RecordEntry(0, journal::Op::EXPIRED, db_ind, 1, cluster::KeySlot(key), make_pair("DEL", delete_args), false); } } @@ -1466,9 +1466,9 @@ void DbSlice::InvalidateDbWatches(DbIndex db_indx) { } } -void DbSlice::InvalidateSlotWatches(const SlotSet& slot_ids) { +void DbSlice::InvalidateSlotWatches(const cluster::SlotSet& slot_ids) { for (const auto& [key, conn_list] : db_arr_[0]->watched_keys) { - SlotId sid = ClusterConfig::KeySlot(key); + cluster::SlotId sid = cluster::KeySlot(key); if (!slot_ids.Contains(sid)) { continue; } @@ -1583,8 +1583,8 @@ void DbSlice::PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* tabl --stats.listpack_blob_cnt; } - if (ClusterConfig::IsEnabled()) { - SlotId sid = ClusterConfig::KeySlot(del_it.key()); + if (cluster::IsClusterEnabled()) { + cluster::SlotId sid = cluster::KeySlot(del_it.key()); table->slots_stats[sid].key_count -= 1; } diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 780a199b7..00a9b2454 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -229,7 +229,7 @@ class DbSlice { Stats GetStats() const; // Returns slot statistics for db 0. - SlotStats GetSlotStats(SlotId sid) const; + SlotStats GetSlotStats(cluster::SlotId sid) const; void UpdateExpireBase(uint64_t now, unsigned generation) { expire_base_[generation & 1] = now; @@ -349,7 +349,7 @@ class DbSlice { void FlushDb(DbIndex db_ind); // Flushes the data of given slot ranges. - void FlushSlots(SlotRanges slot_ranges); + void FlushSlots(cluster::SlotRanges slot_ranges); EngineShard* shard_owner() const { return owner_; @@ -487,14 +487,14 @@ class DbSlice { PrimeValue obj, uint64_t expire_at_ms, bool force_update); - void FlushSlotsFb(const SlotSet& slot_ids); + void FlushSlotsFb(const cluster::SlotSet& slot_ids); void FlushDbIndexes(const std::vector& indexes); // Invalidate all watched keys in database. Used on FLUSH. void InvalidateDbWatches(DbIndex db_indx); // Invalidate all watched keys for given slots. Used on FlushSlots. - void InvalidateSlotWatches(const SlotSet& slot_ids); + void InvalidateSlotWatches(const cluster::SlotSet& slot_ids); void PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* table); diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 2b66913d9..a1436bdad 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -627,7 +627,7 @@ optional DebugCmd::ParsePopulateArgs(CmdArgList args) if (!absl::SimpleAtoi(slot_str, &slot_id)) { return facade::OpStatus::INVALID_INT; } - if (slot_id > ClusterConfig::kMaxSlotNum) { + if (slot_id > cluster::kMaxSlotNum) { return facade::OpStatus::INVALID_VALUE; } return slot_id; @@ -643,8 +643,8 @@ optional DebugCmd::ParsePopulateArgs(CmdArgList args) cntx_->SendError(end.status()); return nullopt; } - options.slot_range = SlotRange{.start = static_cast(start.value()), - .end = static_cast(end.value())}; + options.slot_range = cluster::SlotRange{.start = static_cast(start.value()), + .end = static_cast(end.value())}; } else { cntx_->SendError(kSyntaxErr); @@ -716,7 +716,7 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t num_of_keys, // : and continue until num_of_keys are added. // Add keys only in slot range. - SlotId sid = ClusterConfig::KeySlot(key); + cluster::SlotId sid = cluster::KeySlot(key); if (sid < options.slot_range->start || sid > options.slot_range->end) { ++index; continue; diff --git a/src/server/debugcmd.h b/src/server/debugcmd.h index 97cccd560..1743fd67a 100644 --- a/src/server/debugcmd.h +++ b/src/server/debugcmd.h @@ -4,7 +4,7 @@ #pragma once -#include "server/cluster/slot_set.h" +#include "server/cluster/cluster_defs.h" #include "server/conn_context.h" namespace dfly { @@ -22,7 +22,7 @@ class DebugCmd { std::string_view type{"STRING"}; uint32_t elements = 1; - std::optional slot_range; + std::optional slot_range; }; public: diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 6d28d3b42..014233d63 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -17,7 +17,7 @@ extern "C" { #include "base/logging.h" #include "io/proc_reader.h" #include "server/blocking_controller.h" -#include "server/cluster/cluster_config.h" +#include "server/cluster/cluster_defs.h" #include "server/search/doc_index.h" #include "server/server_state.h" #include "server/tiered_storage.h" @@ -865,7 +865,7 @@ void EngineShardSet::TEST_EnableCacheMode() { } ShardId Shard(string_view v, ShardId shard_num) { - if (ClusterConfig::IsShardedByTag()) { + if (cluster::IsClusterShardedByTag()) { v = LockTagOptions::instance().Tag(v); } diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index fcc0c3a27..a0e12f8b2 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -18,7 +18,7 @@ extern "C" { #include "redis/rdb.h" #include "server/acl/acl_commands_def.h" #include "server/blocking_controller.h" -#include "server/cluster/cluster_config.h" +#include "server/cluster/cluster_defs.h" #include "server/command_registry.h" #include "server/conn_context.h" #include "server/container_utils.h" @@ -1231,7 +1231,7 @@ void GenericFamily::Select(CmdArgList args, ConnectionContext* cntx) { if (!absl::SimpleAtoi(key, &index)) { return cntx->SendError(kInvalidDbIndErr); } - if (ClusterConfig::IsEnabled() && index != 0) { + if (cluster::IsClusterEnabled() && index != 0) { return cntx->SendError("SELECT is not allowed in cluster mode"); } if (index < 0 || index >= absl::GetFlag(FLAGS_dbnum)) { diff --git a/src/server/journal/executor.cc b/src/server/journal/executor.cc index b3f039fa6..5f5fff3dd 100644 --- a/src/server/journal/executor.cc +++ b/src/server/journal/executor.cc @@ -67,7 +67,7 @@ void JournalExecutor::FlushAll() { Execute(cmd); } -void JournalExecutor::FlushSlots(const SlotRange& slot_range) { +void JournalExecutor::FlushSlots(const cluster::SlotRange& slot_range) { auto cmd = BuildFromParts("DFLYCLUSTER", "FLUSHSLOTS", slot_range.start, slot_range.end); Execute(cmd); } diff --git a/src/server/journal/executor.h b/src/server/journal/executor.h index ccd8fbca6..e48f14a68 100644 --- a/src/server/journal/executor.h +++ b/src/server/journal/executor.h @@ -7,7 +7,7 @@ #include #include "facade/reply_capture.h" -#include "server/cluster/slot_set.h" +#include "server/cluster/cluster_defs.h" #include "server/journal/types.h" namespace dfly { @@ -26,7 +26,7 @@ class JournalExecutor { void Execute(DbIndex dbid, journal::ParsedEntry::CmdData& cmd); void FlushAll(); // Execute FLUSHALL. - void FlushSlots(const SlotRange& slot_range); + void FlushSlots(const cluster::SlotRange& slot_range); ConnectionContext* connection_context() { return &conn_context_; diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 09cd66839..e1e2f994e 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -84,7 +84,7 @@ LSN Journal::GetLsn() const { } void Journal::RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, - std::optional slot, Entry::Payload payload, bool await) { + std::optional slot, Entry::Payload payload, bool await) { journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, slot, std::move(payload)}, await); } diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index 0ffec04e3..51674e3f3 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -35,7 +35,7 @@ class Journal { LSN GetLsn() const; void RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, - std::optional slot, Entry::Payload payload, bool await); + std::optional slot, Entry::Payload payload, bool await); private: mutable util::fb2::Mutex state_mu_; diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index dd925beb6..8543cfc9e 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -7,7 +7,7 @@ #include #include "base/logging.h" -#include "server/cluster/cluster_config.h" +#include "server/cluster/cluster_defs.h" namespace dfly { using namespace util; @@ -60,7 +60,7 @@ void JournalStreamer::WriterFb(io::Sink* dest) { } } -RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, journal::Journal* journal, +RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal, Context* cntx) : JournalStreamer(journal, cntx), db_slice_(slice), my_slots_(std::move(slots)) { DCHECK(slice != nullptr); @@ -130,10 +130,10 @@ bool RestoreStreamer::ShouldWrite(const journal::JournalItem& item) const { } bool RestoreStreamer::ShouldWrite(std::string_view key) const { - return ShouldWrite(ClusterConfig::KeySlot(key)); + return ShouldWrite(cluster::KeySlot(key)); } -bool RestoreStreamer::ShouldWrite(SlotId slot_id) const { +bool RestoreStreamer::ShouldWrite(cluster::SlotId slot_id) const { return my_slots_.Contains(slot_id); } diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index 1d7a1c76c..9652a0344 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -53,7 +53,7 @@ class JournalStreamer : protected BufferedStreamerBase { // Only handles relevant slots, while ignoring all others. class RestoreStreamer : public JournalStreamer { public: - RestoreStreamer(DbSlice* slice, SlotSet slots, journal::Journal* journal, Context* cntx); + RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal, Context* cntx); ~RestoreStreamer() override; void Start(io::Sink* dest, bool send_lsn = false) override; @@ -70,7 +70,7 @@ class RestoreStreamer : public JournalStreamer { void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req); bool ShouldWrite(const journal::JournalItem& item) const override; bool ShouldWrite(std::string_view key) const; - bool ShouldWrite(SlotId slot_id) const; + bool ShouldWrite(cluster::SlotId slot_id) const; // Returns whether anything was written bool WriteBucket(PrimeTable::bucket_iterator it); @@ -79,7 +79,7 @@ class RestoreStreamer : public JournalStreamer { DbSlice* db_slice_; uint64_t snapshot_version_ = 0; - SlotSet my_slots_; + cluster::SlotSet my_slots_; Cancellation fiber_cancellation_; bool snapshot_finished_ = false; }; diff --git a/src/server/journal/types.cc b/src/server/journal/types.cc index 7dbb8c42e..e0a30b1e1 100644 --- a/src/server/journal/types.cc +++ b/src/server/journal/types.cc @@ -4,7 +4,7 @@ #include "server/journal/types.h" -#include "server/cluster/cluster_config.h" +#include "server/cluster/cluster_defs.h" namespace dfly::journal { diff --git a/src/server/journal/types.h b/src/server/journal/types.h index befcf1323..6671c88a6 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -7,7 +7,7 @@ #include #include -#include "server/cluster/slot_set.h" +#include "server/cluster/cluster_defs.h" #include "server/common.h" #include "server/table.h" @@ -31,7 +31,7 @@ struct EntryBase { Op opcode; DbIndex dbid; uint32_t shard_cnt; - std::optional slot; + std::optional slot; LSN lsn{0}; }; @@ -45,12 +45,12 @@ struct Entry : public EntryBase { std::pair // Command and its shard parts. >; - Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt, std::optional slot_id, - Payload pl) + Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt, + std::optional slot_id, Payload pl) : EntryBase{txid, opcode, dbid, shard_cnt, slot_id}, payload{pl} { } - Entry(journal::Op opcode, DbIndex dbid, std::optional slot_id) + Entry(journal::Op opcode, DbIndex dbid, std::optional slot_id) : EntryBase{0, opcode, dbid, 0, slot_id, 0} { } @@ -58,7 +58,7 @@ struct Entry : public EntryBase { } Entry(TxId txid, journal::Op opcode, DbIndex dbid, uint32_t shard_cnt, - std::optional slot_id) + std::optional slot_id) : EntryBase{txid, opcode, dbid, shard_cnt, slot_id, 0} { } @@ -85,7 +85,7 @@ struct JournalItem { LSN lsn; Op opcode; std::string data; - std::optional slot; + std::optional slot; }; using ChangeCallback = std::function; diff --git a/src/server/main_service.cc b/src/server/main_service.cc index f768f3426..4ad682f1c 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -575,7 +575,8 @@ void TxTable(const http::QueryArgs& args, HttpContext* send) { send->Invoke(std::move(resp)); } -void ClusterHtmlPage(const http::QueryArgs& args, HttpContext* send, ClusterFamily* cluster) { +void ClusterHtmlPage(const http::QueryArgs& args, HttpContext* send, + cluster::ClusterFamily* cluster_family) { http::StringResponse resp = http::MakeStringResponse(h2::status::ok); resp.body() = R"( @@ -616,19 +617,19 @@ void ClusterHtmlPage(const http::QueryArgs& args, HttpContext* send, ClusterFami auto print_kb = [&](string_view k, bool v) { print_kv(k, v ? "True" : "False"); }; - print_kv("Mode", ClusterConfig::IsEmulated() ? "Emulated" - : ClusterConfig::IsEnabled() ? "Enabled" - : "Disabled"); + print_kv("Mode", cluster::IsClusterEmulated() ? "Emulated" + : cluster::IsClusterEnabled() ? "Enabled" + : "Disabled"); - if (ClusterConfig::IsEnabledOrEmulated()) { + if (cluster::IsClusterEnabledOrEmulated()) { print_kb("Lock on hashtags", LockTagOptions::instance().enabled); } - if (ClusterConfig::IsEnabled()) { - if (cluster->cluster_config() == nullptr) { + if (cluster::IsClusterEnabled()) { + if (cluster_family->cluster_config() == nullptr) { resp.body() += "

Not yet configured.

\n"; } else { - auto config = cluster->cluster_config()->GetConfig(); + auto config = cluster_family->cluster_config()->GetConfig(); for (const auto& shard : config) { resp.body() += "
\n"; resp.body() += "

Master

\n"; @@ -922,12 +923,12 @@ optional Service::CheckKeysOwnership(const CommandId* cid, CmdArgLis } const auto& key_index = *key_index_res; - optional keys_slot; + optional keys_slot; bool cross_slot = false; // Iterate keys and check to which slot they belong. for (unsigned i = key_index.start; i < key_index.end; i += key_index.step) { string_view key = ArgS(args, i); - SlotId slot = ClusterConfig::KeySlot(key); + cluster::SlotId slot = cluster::KeySlot(key); if (keys_slot && slot != *keys_slot) { cross_slot = true; // keys belong to different slots break; @@ -941,14 +942,14 @@ optional Service::CheckKeysOwnership(const CommandId* cid, CmdArgLis } // Check keys slot is in my ownership - const ClusterConfig* cluster_config = cluster_family_.cluster_config(); + const cluster::ClusterConfig* cluster_config = cluster_family_.cluster_config(); if (cluster_config == nullptr) { return ErrorReply{kClusterNotConfigured}; } if (keys_slot.has_value() && !cluster_config->IsMySlot(*keys_slot)) { // See more details here: https://redis.io/docs/reference/cluster-spec/#moved-redirection - ClusterNodeInfo master = cluster_config->GetMasterNodeForSlot(*keys_slot); + cluster::ClusterNodeInfo master = cluster_config->GetMasterNodeForSlot(*keys_slot); return ErrorReply{absl::StrCat("-MOVED ", *keys_slot, " ", master.ip, ":", master.port)}; } @@ -1094,7 +1095,7 @@ std::optional Service::VerifyCommandState(const CommandId* cid, CmdA return ErrorReply{absl::StrCat("'", cmd_name, "' inside MULTI is not allowed")}; } - if (ClusterConfig::IsEnabled()) { + if (cluster::IsClusterEnabled()) { if (auto err = CheckKeysOwnership(cid, tail_args, dfly_cntx); err) return err; } @@ -1897,7 +1898,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret optional sid; - UniqueSlotChecker slot_checker; + cluster::UniqueSlotChecker slot_checker; for (size_t i = 0; i < eval_args.keys.size(); ++i) { string_view key = ArgS(eval_args.keys, i); slot_checker.Add(key); diff --git a/src/server/main_service.h b/src/server/main_service.h index 075acb622..0a645ceaa 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -181,7 +181,7 @@ class Service : public facade::ServiceInterface { acl::UserRegistry user_registry_; acl::AclFamily acl_family_; ServerFamily server_family_; - ClusterFamily cluster_family_; + cluster::ClusterFamily cluster_family_; CommandRegistry registry_; absl::flat_hash_map unknown_cmds_; diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index 63c41063b..c69060208 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -53,7 +53,7 @@ MultiCommandSquasher::MultiCommandSquasher(absl::Span cmds, Connectio } MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo( - ShardId sid, optional slot_id) { + ShardId sid, optional slot_id) { if (sharded_.empty()) sharded_.resize(shard_set->size()); @@ -89,7 +89,7 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm // Check if all commands belong to one shard bool found_more = false; - UniqueSlotChecker slot_checker; + cluster::UniqueSlotChecker slot_checker; ShardId last_sid = kInvalidSid; IterateKeys(args, *keys, [&last_sid, &found_more, &slot_checker](MutableSlice key) { if (found_more) diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index be15699ef..83f01d68c 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -48,7 +48,7 @@ class MultiCommandSquasher { bool verify_commands, bool error_abort); // Lazy initialize shard info. - ShardExecInfo& PrepareShardInfo(ShardId sid, std::optional slot_id); + ShardExecInfo& PrepareShardInfo(ShardId sid, std::optional slot_id); // Retrun squash flags SquashResult TrySquash(StoredCmd* cmd); diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h index 448af4727..2ff8b9cb6 100644 --- a/src/server/protocol_client.h +++ b/src/server/protocol_client.h @@ -3,10 +3,8 @@ // #pragma once -#include #include -#include #include #include diff --git a/src/server/replica.cc b/src/server/replica.cc index 70afa5038..720c6cf77 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -70,7 +70,7 @@ vector> Partition(unsigned num_flows) { } // namespace Replica::Replica(string host, uint16_t port, Service* se, std::string_view id, - std::optional slot_range) + std::optional slot_range) : ProtocolClient(std::move(host), port), service_(*se), id_{id}, slot_range_(slot_range) { proactor_ = ProactorBase::me(); } diff --git a/src/server/replica.h b/src/server/replica.h index db44b3c78..d82e18c6d 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -12,7 +12,7 @@ #include "base/io_buf.h" #include "facade/facade_types.h" #include "facade/redis_parser.h" -#include "server/cluster/slot_set.h" +#include "server/cluster/cluster_defs.h" #include "server/common.h" #include "server/journal/tx_executor.h" #include "server/journal/types.h" @@ -55,7 +55,7 @@ class Replica : ProtocolClient { public: Replica(std::string master_host, uint16_t port, Service* se, std::string_view id, - std::optional slot_range); + std::optional slot_range); ~Replica(); // Spawns a fiber that runs until link with master is broken or the replication is stopped. @@ -173,7 +173,7 @@ class Replica : ProtocolClient { bool is_paused_ = false; std::string id_; - std::optional slot_range_; + std::optional slot_range_; }; // This class implements a single shard replication flow from a Dragonfly master instance. diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 4f61b30ad..3f7301280 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -549,13 +549,13 @@ std::string_view GetOSString() { } string_view GetRedisMode() { - return ClusterConfig::IsEnabledOrEmulated() ? "cluster"sv : "standalone"sv; + return cluster::IsClusterEnabledOrEmulated() ? "cluster"sv : "standalone"sv; } struct ReplicaOfArgs { string host; uint16_t port; - std::optional slot_range; + std::optional slot_range; static optional FromCmdArgs(CmdArgList args, ConnectionContext* cntx); bool IsReplicaOfNoOne() const { return port == 0; @@ -588,8 +588,8 @@ optional ReplicaOfArgs::FromCmdArgs(CmdArgList args, ConnectionCo return nullopt; } if (parser.HasNext()) { - auto [slot_start, slot_end] = parser.Next(); - replicaof_args.slot_range = SlotRange{slot_start, slot_end}; + auto [slot_start, slot_end] = parser.Next(); + replicaof_args.slot_range = cluster::SlotRange{slot_start, slot_end}; if (auto err = parser.Error(); err || !replicaof_args.slot_range->IsValid()) { cntx->SendError("Invalid slot range"); return nullopt; @@ -2259,7 +2259,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { #endif if (should_enter("CLUSTER")) { - append("cluster_enabled", ClusterConfig::IsEnabledOrEmulated()); + append("cluster_enabled", cluster::IsClusterEnabledOrEmulated()); } auto* rb = static_cast(cntx->reply_builder()); rb->SendVerbatimString(info); diff --git a/src/server/server_family.h b/src/server/server_family.h index 912d71647..80fb0adc8 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -43,8 +43,10 @@ std::string GetPassword(); namespace journal { class Journal; } // namespace journal - +namespace cluster { class ClusterFamily; +} + class ConnectionContext; class CommandRegistry; class DflyCmd; diff --git a/src/server/table.cc b/src/server/table.cc index f0bd09476..28a6ffb19 100644 --- a/src/server/table.cc +++ b/src/server/table.cc @@ -6,7 +6,7 @@ #include "base/flags.h" #include "base/logging.h" -#include "server/cluster/cluster_config.h" +#include "server/cluster/cluster_defs.h" #include "server/server_state.h" ABSL_FLAG(bool, enable_top_keys_tracking, false, @@ -86,8 +86,8 @@ DbTable::DbTable(PMR_NS::memory_resource* mr, DbIndex db_index) mcflag(0, detail::ExpireTablePolicy{}, mr), top_keys({.enabled = absl::GetFlag(FLAGS_enable_top_keys_tracking)}), index(db_index) { - if (ClusterConfig::IsEnabled()) { - slots_stats.resize(ClusterConfig::kMaxSlotNum + 1); + if (cluster::IsClusterEnabled()) { + slots_stats.resize(cluster::kMaxSlotNum + 1); } thread_index = ServerState::tlocal()->thread_index(); } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index a850ba136..7fcbd2d60 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -139,7 +139,8 @@ Transaction::Transaction(const CommandId* cid) : cid_{cid} { } } -Transaction::Transaction(const Transaction* parent, ShardId shard_id, std::optional slot_id) +Transaction::Transaction(const Transaction* parent, ShardId shard_id, + std::optional slot_id) : multi_{make_unique()}, txid_{parent->txid()}, unique_shard_cnt_{1}, @@ -1008,7 +1009,7 @@ ShardId Transaction::GetUniqueShard() const { return unique_shard_id_; } -optional Transaction::GetUniqueSlotId() const { +optional Transaction::GetUniqueSlotId() const { return unique_slot_checker_.GetUniqueSlotId(); } diff --git a/src/server/transaction.h b/src/server/transaction.h index c22bb4763..0462b3646 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -169,7 +169,8 @@ class Transaction { explicit Transaction(const CommandId* cid); // Initialize transaction for squashing placed on a specific shard with a given parent tx - explicit Transaction(const Transaction* parent, ShardId shard_id, std::optional slot_id); + explicit Transaction(const Transaction* parent, ShardId shard_id, + std::optional slot_id); // Initialize from command (args) on specific db. OpStatus InitByArgs(DbIndex index, CmdArgList args); @@ -280,7 +281,7 @@ class Transaction { // This method is meaningless if GetUniqueShardCnt() != 1. ShardId GetUniqueShard() const; - std::optional GetUniqueSlotId() const; + std::optional GetUniqueSlotId() const; bool IsMulti() const { return bool(multi_); @@ -616,7 +617,7 @@ class Transaction { uint32_t unique_shard_cnt_{0}; // Number of unique shards active ShardId unique_shard_id_{kInvalidSid}; // Set if unique_shard_cnt_ = 1 - UniqueSlotChecker unique_slot_checker_; + cluster::UniqueSlotChecker unique_slot_checker_; // Barrier for waking blocking transactions that ensures exclusivity of waking operation. BatonBarrier blocking_barrier_{};