diff --git a/src/server/cluster/cluster_config.cc b/src/server/cluster/cluster_config.cc index a44989424..76a6f71bc 100644 --- a/src/server/cluster/cluster_config.cc +++ b/src/server/cluster/cluster_config.cc @@ -13,6 +13,7 @@ extern "C" { #include "base/flags.h" #include "base/logging.h" #include "cluster_config.h" +#include "core/json/json_object.h" using namespace std; @@ -71,7 +72,7 @@ SlotId ClusterConfig::KeySlot(string_view key) { } namespace { -bool HasValidNodeIds(const ClusterConfig::ClusterShards& new_config) { +bool HasValidNodeIds(const ClusterShardInfos& new_config) { absl::flat_hash_set nodes; auto CheckAndInsertNode = [&](string_view node) { @@ -95,7 +96,7 @@ bool HasValidNodeIds(const ClusterConfig::ClusterShards& new_config) { return true; } -bool IsConfigValid(const ClusterConfig::ClusterShards& new_config) { +bool IsConfigValid(const ClusterShardInfos& new_config) { // Make sure that all slots are set exactly once. array slots_found = {}; @@ -140,7 +141,7 @@ bool IsConfigValid(const ClusterConfig::ClusterShards& new_config) { /* static */ shared_ptr ClusterConfig::CreateFromConfig(string_view my_id, - const ClusterShards& config) { + const ClusterShardInfos& config) { if (!IsConfigValid(config)) { return nullptr; } @@ -150,9 +151,9 @@ shared_ptr ClusterConfig::CreateFromConfig(string_view my_id, result->config_ = config; for (const auto& shard : result->config_) { - bool owned_by_me = - shard.master.id == my_id || any_of(shard.replicas.begin(), shard.replicas.end(), - [&](const Node& node) { return node.id == my_id; }); + bool owned_by_me = shard.master.id == my_id || + any_of(shard.replicas.begin(), shard.replicas.end(), + [&](const ClusterNodeInfo& node) { return node.id == my_id; }); if (owned_by_me) { result->my_slots_.Set(shard.slot_ranges, true); result->my_outgoing_migrations_ = shard.migrations; @@ -206,13 +207,13 @@ optional GetClusterSlotRanges(const JsonType& slots) { return ranges; } -optional ParseClusterNode(const JsonType& json) { +optional ParseClusterNode(const JsonType& json) { if (!json.is_object()) { LOG(WARNING) << kInvalidConfigPrefix << "node config is not an object " << json; return nullopt; } - ClusterConfig::Node node; + ClusterNodeInfo node; { auto id = json.at_or_null("id"); @@ -243,8 +244,8 @@ optional ParseClusterNode(const JsonType& json) { return node; } -optional> ParseMigrations(const JsonType& json) { - std::vector res; +optional> ParseMigrations(const JsonType& json) { + std::vector res; if (json.is_null()) { return res; } @@ -265,16 +266,16 @@ optional> ParseMigrations(const JsonTy return nullopt; } - res.emplace_back(ClusterConfig::MigrationInfo{.slot_ranges = std::move(*slots), - .target_id = target_id.as_string(), - .ip = ip.as_string(), - .port = *port}); + res.emplace_back(MigrationInfo{.slot_ranges = std::move(*slots), + .target_id = target_id.as_string(), + .ip = ip.as_string(), + .port = *port}); } return res; } -optional BuildClusterConfigFromJson(const JsonType& json) { - ClusterConfig::ClusterShards config; +optional BuildClusterConfigFromJson(const JsonType& json) { + ClusterShardInfos config; if (!json.is_array()) { LOG(WARNING) << kInvalidConfigPrefix << "not an array " << json; @@ -282,7 +283,7 @@ optional BuildClusterConfigFromJson(const JsonType } for (const auto& element : json.array_range()) { - ClusterConfig::ClusterShard shard; + ClusterShardInfo shard; if (!element.is_object()) { LOG(WARNING) << kInvalidConfigPrefix << "shard element is not an object " << element; @@ -330,8 +331,14 @@ optional BuildClusterConfigFromJson(const JsonType /* static */ shared_ptr ClusterConfig::CreateFromConfig(string_view my_id, - const JsonType& json_config) { - optional config = BuildClusterConfigFromJson(json_config); + std::string_view json_str) { + optional json_config = JsonFromString(json_str, PMR_NS::get_default_resource()); + if (!json_config.has_value()) { + LOG(WARNING) << "Can't parse JSON for ClusterConfig " << json_str; + return nullptr; + } + + optional config = BuildClusterConfigFromJson(json_config); if (!config.has_value()) { return nullptr; } @@ -359,7 +366,7 @@ bool ClusterConfig::IsMySlot(std::string_view key) const { return IsMySlot(KeySlot(key)); } -ClusterConfig::Node ClusterConfig::GetMasterNodeForSlot(SlotId id) const { +ClusterNodeInfo ClusterConfig::GetMasterNodeForSlot(SlotId id) const { CHECK_LE(id, ClusterConfig::kMaxSlotNum) << "Requesting a non-existing slot id " << id; for (const auto& shard : config_) { @@ -374,7 +381,7 @@ ClusterConfig::Node ClusterConfig::GetMasterNodeForSlot(SlotId id) const { return {}; } -ClusterConfig::ClusterShards ClusterConfig::GetConfig() const { +ClusterShardInfos ClusterConfig::GetConfig() const { return config_; } @@ -382,4 +389,39 @@ const SlotSet& ClusterConfig::GetOwnedSlots() const { return my_slots_; } +static std::vector GetMissingMigrations(const std::vector& haystack, + const std::vector& needle) { + std::vector res; + for (const auto& h : haystack) { + if (find(needle.begin(), needle.end(), h) == needle.end()) { + res.push_back(h); + } + } + return res; +} + +std::vector ClusterConfig::GetNewOutgoingMigrations( + const std::shared_ptr& prev) const { + return prev ? GetMissingMigrations(my_outgoing_migrations_, prev->my_outgoing_migrations_) + : my_outgoing_migrations_; +} + +std::vector ClusterConfig::GetNewIncomingMigrations( + const std::shared_ptr& prev) const { + return prev ? GetMissingMigrations(my_incoming_migrations_, prev->my_incoming_migrations_) + : my_incoming_migrations_; +} + +std::vector ClusterConfig::GetFinishedOutgoingMigrations( + const std::shared_ptr& prev) const { + return prev ? GetMissingMigrations(prev->my_outgoing_migrations_, my_outgoing_migrations_) + : std::vector(); +} + +std::vector ClusterConfig::GetFinishedIncomingMigrations( + const std::shared_ptr& prev) const { + return prev ? GetMissingMigrations(prev->my_incoming_migrations_, my_incoming_migrations_) + : std::vector(); +} + } // namespace dfly diff --git a/src/server/cluster/cluster_config.h b/src/server/cluster/cluster_config.h index f53166cb6..aec9dbf00 100644 --- a/src/server/cluster/cluster_config.h +++ b/src/server/cluster/cluster_config.h @@ -9,7 +9,6 @@ #include #include -#include "core/json/json_object.h" #include "src/server/cluster/slot_set.h" #include "src/server/common.h" @@ -24,38 +23,37 @@ enum class MigrationState : uint8_t { C_MAX_INVALID = std::numeric_limits::max() }; +struct ClusterNodeInfo { + std::string id; + std::string ip; + uint16_t port = 0; +}; + +struct MigrationInfo { + std::vector slot_ranges; + std::string target_id; + std::string ip; + uint16_t port = 0; + + bool operator==(const MigrationInfo& r) const { + return ip == r.ip && port == r.port && slot_ranges == r.slot_ranges && target_id == r.target_id; + } +}; + +struct ClusterShardInfo { + SlotRanges slot_ranges; + ClusterNodeInfo master; + std::vector replicas; + std::vector migrations; +}; + +using ClusterShardInfos = std::vector; + class ClusterConfig { public: static constexpr SlotId kMaxSlotNum = 0x3FFF; static constexpr SlotId kInvalidSlotId = kMaxSlotNum + 1; - struct Node { - std::string id; - std::string ip; - uint16_t port = 0; - }; - - struct MigrationInfo { - std::vector slot_ranges; - std::string target_id; - std::string ip; - uint16_t port = 0; - - bool operator==(const ClusterConfig::MigrationInfo& r) const { - return ip == r.ip && port == r.port && slot_ranges == r.slot_ranges && - target_id == r.target_id; - } - }; - - struct ClusterShard { - SlotRanges slot_ranges; - Node master; - std::vector replicas; - std::vector migrations; - }; - - using ClusterShards = std::vector; - static SlotId KeySlot(std::string_view key); static void Initialize(); @@ -76,11 +74,11 @@ class ClusterConfig { // Returns an instance with `config` if it is valid. // Returns heap-allocated object as it is too big for a stack frame. static std::shared_ptr CreateFromConfig(std::string_view my_id, - const ClusterShards& config); + const ClusterShardInfos& config); - // Parses `json_config` into `ClusterShards` and calls the above overload. + // Parses `json_config` into `ClusterShardInfos` and calls the above overload. static std::shared_ptr CreateFromConfig(std::string_view my_id, - const JsonType& json_config); + std::string_view json_config); std::shared_ptr CloneWithChanges(const std::vector& slots, bool enable) const; @@ -90,29 +88,30 @@ class ClusterConfig { bool IsMySlot(std::string_view key) const; // Returns the master configured for `id`. - Node GetMasterNodeForSlot(SlotId id) const; + ClusterNodeInfo GetMasterNodeForSlot(SlotId id) const; - ClusterShards GetConfig() const; + ClusterShardInfos GetConfig() const; const SlotSet& GetOwnedSlots() const; - const std::vector& GetOutgoingMigrations() const { - return my_outgoing_migrations_; - } - - const std::vector& GetIncomingMigrations() const { - return my_incoming_migrations_; - } + std::vector GetNewOutgoingMigrations( + const std::shared_ptr& prev) const; + std::vector GetNewIncomingMigrations( + const std::shared_ptr& prev) const; + std::vector GetFinishedOutgoingMigrations( + const std::shared_ptr& prev) const; + std::vector GetFinishedIncomingMigrations( + const std::shared_ptr& prev) const; private: struct SlotEntry { - const ClusterShard* shard = nullptr; + const ClusterShardInfo* shard = nullptr; bool owned_by_me = false; }; ClusterConfig() = default; - ClusterShards config_; + ClusterShardInfos config_; SlotSet my_slots_; std::vector my_outgoing_migrations_; diff --git a/src/server/cluster/cluster_config_test.cc b/src/server/cluster/cluster_config_test.cc index 2cd46d26f..b3685864a 100644 --- a/src/server/cluster/cluster_config_test.cc +++ b/src/server/cluster/cluster_config_test.cc @@ -13,7 +13,7 @@ using namespace std; using namespace testing; -using Node = dfly::ClusterConfig::Node; +using Node = dfly::ClusterNodeInfo; namespace dfly { @@ -23,12 +23,6 @@ MATCHER_P(NodeMatches, expected, "") { class ClusterConfigTest : public ::testing::Test { protected: - JsonType ParseJson(string_view json_str) { - optional opt_json = JsonFromString(json_str, PMR_NS::get_default_resource()); - CHECK(opt_json.has_value()); - return opt_json.value(); - } - const string kMyId = "my-id"; }; @@ -50,7 +44,7 @@ TEST_F(ClusterConfigTest, KeyTagTest) { } TEST_F(ClusterConfigTest, ConfigSetInvalidEmpty) { - EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ClusterConfig::ClusterShards{}), nullptr); + EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ClusterShardInfos{}), nullptr); } TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) { @@ -154,7 +148,7 @@ TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) { TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRanges) { // Note that slot_ranges is not an object - EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( + EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json( [ { "slot_ranges": "0,16383", @@ -165,13 +159,13 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRanges) { }, "replicas": [] } - ])json")), + ])json"), nullptr); } TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRangeStart) { // Note that slot_ranges.start is not a number - EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( + EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json( [ { "slot_ranges": [ @@ -187,13 +181,13 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRangeStart) { }, "replicas": [] } - ])json")), + ])json"), nullptr); } TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRangeEnd) { // Note that slot_ranges.end is not a number - EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( + EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json( [ { "slot_ranges": [ @@ -209,12 +203,12 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRangeEnd) { }, "replicas": [] } - ])json")), + ])json"), nullptr); } TEST_F(ClusterConfigTest, ConfigSetInvalidMissingMaster) { - EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( + EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json( [ { "slot_ranges": [ @@ -224,13 +218,13 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidMissingMaster) { } ] } - ])json")), + ])json"), nullptr); } TEST_F(ClusterConfigTest, ConfigSetInvalidMasterNotObject) { // Note that master is not an object - EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( + EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json( [ { "slot_ranges": [ @@ -242,12 +236,12 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidMasterNotObject) { "master": 123, "replicas": [] } - ])json")), + ])json"), nullptr); } TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingId) { - EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( + EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json( [ { "slot_ranges": [ @@ -262,12 +256,12 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingId) { }, "replicas": [] } - ])json")), + ])json"), nullptr); } TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingIp) { - EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( + EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json( [ { "slot_ranges": [ @@ -282,12 +276,12 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingIp) { }, "replicas": [] } - ])json")), + ])json"), nullptr); } TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingPort) { - EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( + EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json( [ { "slot_ranges": [ @@ -302,12 +296,12 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingPort) { }, "replicas": [] } - ])json")), + ])json"), nullptr); } TEST_F(ClusterConfigTest, ConfigSetInvalidMissingReplicas) { - EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( + EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json( [ { "slot_ranges": [ @@ -322,12 +316,12 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidMissingReplicas) { "port": 8000 } } - ])json")), + ])json"), nullptr); } TEST_F(ClusterConfigTest, ConfigSetInvalidRepeatingMasterId) { - EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( + EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json( [ { "slot_ranges": [ @@ -357,12 +351,12 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidRepeatingMasterId) { }, "replicas": [] } - ])json")), + ])json"), nullptr); } TEST_F(ClusterConfigTest, ConfigSetInvalidRepeatingReplicaId) { - EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( + EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json( [ { "slot_ranges": [ @@ -389,12 +383,12 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidRepeatingReplicaId) { } ] } - ])json")), + ])json"), nullptr); } TEST_F(ClusterConfigTest, ConfigSetInvalidRepeatingMasterAndReplicaId) { - EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( + EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json( [ { "slot_ranges": [ @@ -416,7 +410,7 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidRepeatingMasterAndReplicaId) { } ] } - ])json")), + ])json"), nullptr); } @@ -437,29 +431,68 @@ TEST_F(ClusterConfigTest, ConfigSetMigrations) { } ])json"; - auto config1 = ClusterConfig::CreateFromConfig("id0", ParseJson(config_str)); + auto config1 = ClusterConfig::CreateFromConfig("id0", config_str); EXPECT_EQ( - config1->GetOutgoingMigrations(), - (std::vector{ + config1->GetNewOutgoingMigrations(nullptr), + (std::vector{ {.slot_ranges = {{7000, 8000}}, .target_id = "id1", .ip = "127.0.0.1", .port = 9001}})); - EXPECT_TRUE(config1->GetIncomingMigrations().empty()); + EXPECT_TRUE(config1->GetFinishedOutgoingMigrations(nullptr).empty()); + EXPECT_TRUE(config1->GetNewIncomingMigrations(nullptr).empty()); + EXPECT_TRUE(config1->GetFinishedIncomingMigrations(nullptr).empty()); - auto config2 = ClusterConfig::CreateFromConfig("id1", ParseJson(config_str)); + auto config2 = ClusterConfig::CreateFromConfig("id1", config_str); EXPECT_EQ( - config2->GetIncomingMigrations(), - (std::vector{ + config2->GetNewIncomingMigrations(nullptr), + (std::vector{ {.slot_ranges = {{7000, 8000}}, .target_id = "id1", .ip = "127.0.0.1", .port = 9001}})); - EXPECT_TRUE(config2->GetOutgoingMigrations().empty()); + EXPECT_TRUE(config2->GetFinishedOutgoingMigrations(nullptr).empty()); + EXPECT_TRUE(config2->GetNewOutgoingMigrations(nullptr).empty()); + EXPECT_TRUE(config2->GetFinishedIncomingMigrations(nullptr).empty()); - auto config3 = ClusterConfig::CreateFromConfig("id2", ParseJson(config_str)); - EXPECT_TRUE(config3->GetIncomingMigrations().empty()); - EXPECT_TRUE(config3->GetOutgoingMigrations().empty()); + auto config3 = ClusterConfig::CreateFromConfig("id2", config_str); + EXPECT_TRUE(config3->GetFinishedOutgoingMigrations(nullptr).empty()); + EXPECT_TRUE(config3->GetNewIncomingMigrations(nullptr).empty()); + EXPECT_TRUE(config3->GetFinishedIncomingMigrations(nullptr).empty()); + EXPECT_TRUE(config3->GetNewOutgoingMigrations(nullptr).empty()); + + const auto* config_str2 = R"json( + [ + { + "slot_ranges": [ { "start": 0, "end": 6999 } ], + "master": { "id": "id0", "ip": "localhost", "port": 3000 }, + "replicas": [] + }, + { + "slot_ranges": [ { "start": 7000, "end": 16383 } ], + "master": { "id": "id1", "ip": "localhost", "port": 3001 }, + "replicas": [] + } + ])json"; + + auto config4 = ClusterConfig::CreateFromConfig("id0", config_str2); + auto config5 = ClusterConfig::CreateFromConfig("id1", config_str2); + + EXPECT_EQ( + config4->GetFinishedOutgoingMigrations(config1), + (std::vector{ + {.slot_ranges = {{7000, 8000}}, .target_id = "id1", .ip = "127.0.0.1", .port = 9001}})); + EXPECT_TRUE(config4->GetNewIncomingMigrations(config1).empty()); + EXPECT_TRUE(config4->GetFinishedIncomingMigrations(config1).empty()); + EXPECT_TRUE(config4->GetNewOutgoingMigrations(config1).empty()); + + EXPECT_EQ( + config5->GetFinishedIncomingMigrations(config2), + (std::vector{ + {.slot_ranges = {{7000, 8000}}, .target_id = "id1", .ip = "127.0.0.1", .port = 9001}})); + EXPECT_TRUE(config5->GetNewIncomingMigrations(config2).empty()); + EXPECT_TRUE(config5->GetFinishedOutgoingMigrations(config2).empty()); + EXPECT_TRUE(config5->GetNewOutgoingMigrations(config2).empty()); } TEST_F(ClusterConfigTest, InvalidConfigMigrationsWithoutIP) { - auto config = ClusterConfig::CreateFromConfig("id0", ParseJson(R"json( + auto config = ClusterConfig::CreateFromConfig("id0", R"json( [ { "slot_ranges": [ { "start": 0, "end": 8000 } ], @@ -473,7 +506,7 @@ TEST_F(ClusterConfigTest, InvalidConfigMigrationsWithoutIP) { "master": { "id": "id1", "ip": "localhost", "port": 3001 }, "replicas": [] } - ])json")); + ])json"); EXPECT_EQ(config, nullptr); } diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 2e8fd7710..023e74672 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -39,9 +39,6 @@ using namespace facade; using namespace util; using CI = CommandId; -using ClusterShard = ClusterConfig::ClusterShard; -using ClusterShards = ClusterConfig::ClusterShards; -using Node = ClusterConfig::Node; constexpr char kIdNotFound[] = "syncid not found"; @@ -70,11 +67,11 @@ ClusterConfig* ClusterFamily::cluster_config() { return tl_cluster_config.get(); } -ClusterShard ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const { - ClusterShard info{.slot_ranges = {{.start = 0, .end = ClusterConfig::kMaxSlotNum}}, - .master = {}, - .replicas = {}, - .migrations = {}}; +ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const { + ClusterShardInfo info{.slot_ranges = {{.start = 0, .end = ClusterConfig::kMaxSlotNum}}, + .master = {}, + .replicas = {}, + .migrations = {}}; optional replication_info = server_family_->GetReplicaInfo(); ServerState& etl = *ServerState::tlocal(); @@ -123,12 +120,12 @@ void ClusterFamily::ClusterHelp(ConnectionContext* cntx) { } namespace { -void ClusterShardsImpl(const ClusterShards& config, ConnectionContext* cntx) { +void ClusterShardsImpl(const ClusterShardInfos& config, ConnectionContext* cntx) { // For more details https://redis.io/commands/cluster-shards/ constexpr unsigned int kEntrySize = 4; auto* rb = static_cast(cntx->reply_builder()); - auto WriteNode = [&](const Node& node, string_view role) { + auto WriteNode = [&](const ClusterNodeInfo& node, string_view role) { constexpr unsigned int kNodeSize = 14; rb->StartArray(kNodeSize); rb->SendBulkString("id"); @@ -179,10 +176,10 @@ void ClusterFamily::ClusterShards(ConnectionContext* cntx) { } namespace { -void ClusterSlotsImpl(const ClusterShards& config, ConnectionContext* cntx) { +void ClusterSlotsImpl(const ClusterShardInfos& config, ConnectionContext* cntx) { // For more details https://redis.io/commands/cluster-slots/ auto* rb = static_cast(cntx->reply_builder()); - auto WriteNode = [&](const Node& node) { + auto WriteNode = [&](const ClusterNodeInfo& node) { constexpr unsigned int kNodeSize = 3; rb->StartArray(kNodeSize); rb->SendBulkString(node.ip); @@ -223,12 +220,12 @@ void ClusterFamily::ClusterSlots(ConnectionContext* cntx) { } namespace { -void ClusterNodesImpl(const ClusterShards& config, string_view my_id, ConnectionContext* cntx) { +void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id, ConnectionContext* cntx) { // For more details https://redis.io/commands/cluster-nodes/ string result; - auto WriteNode = [&](const Node& node, string_view role, string_view master_id, + auto WriteNode = [&](const ClusterNodeInfo& node, string_view role, string_view master_id, const vector& ranges) { absl::StrAppend(&result, node.id, " "); @@ -278,7 +275,7 @@ void ClusterFamily::ClusterNodes(ConnectionContext* cntx) { } namespace { -void ClusterInfoImpl(const ClusterShards& config, ConnectionContext* cntx) { +void ClusterInfoImpl(const ClusterShardInfos& config, ConnectionContext* cntx) { std::string msg; auto append = [&msg](absl::AlphaNum a1, absl::AlphaNum a2) { // Separate lines with \r\n, not \n, see #2726 @@ -489,13 +486,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) } string_view json_str = ArgS(args, 0); - optional json = JsonFromString(json_str, PMR_NS::get_default_resource()); - if (!json.has_value()) { - LOG(WARNING) << "Can't parse JSON for ClusterConfig " << json_str; - return cntx->SendError("Invalid JSON cluster config", kSyntaxErrType); - } - - shared_ptr new_config = ClusterConfig::CreateFromConfig(id_, json.value()); + shared_ptr new_config = ClusterConfig::CreateFromConfig(id_, json_str); if (new_config == nullptr) { LOG(WARNING) << "Can't set cluster config"; return cntx->SendError("Invalid cluster configuration."); @@ -503,7 +494,8 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) lock_guard gu(set_config_mu); - SlotSet before = tl_cluster_config ? tl_cluster_config->GetOwnedSlots() : SlotSet(true); + auto prev_config = tl_cluster_config; + SlotSet before = prev_config ? prev_config->GetOwnedSlots() : SlotSet(true); // Ignore blocked commands because we filter them with CancelBlockingOnThread DispatchTracker tracker{server_family_->GetListeners(), cntx->conn(), false /* ignore paused */, @@ -523,8 +515,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb)); DCHECK(tl_cluster_config != nullptr); - // TODO rewrite with outgoing migrations - if (!StartSlotMigrations(new_config->GetOutgoingMigrations(), cntx)) { + if (!StartSlotMigrations(new_config->GetNewOutgoingMigrations(prev_config), cntx)) { return cntx->SendError("Can't start the migration"); } RemoveFinishedMigrations(); @@ -607,11 +598,12 @@ void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cn return cntx->SendOk(); } -bool ClusterFamily::StartSlotMigrations(const std::vector& migrations, +bool ClusterFamily::StartSlotMigrations(std::vector migrations, ConnectionContext* cntx) { // Add validating and error processing for (auto m : migrations) { - auto outgoing_migration = CreateOutgoingMigration(m.ip, m.port, m.slot_ranges); + auto outgoing_migration = + CreateOutgoingMigration(std::move(m.ip), m.port, std::move(m.slot_ranges)); outgoing_migration->Start(cntx); } return true; diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index ceb22f000..3d69f17a5 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -74,8 +74,7 @@ class ClusterFamily { std::shared_ptr GetIncomingMigration(std::string host_ip, uint16_t port); - bool StartSlotMigrations(const std::vector& migrations, - ConnectionContext* cntx); + bool StartSlotMigrations(std::vector migrations, ConnectionContext* cntx); void RemoveFinishedMigrations(); // store info about migration and create unique session id @@ -95,7 +94,7 @@ class ClusterFamily { OutgoingMigrationMap outgoing_migration_jobs_ ABSL_GUARDED_BY(migration_mu_); private: - ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const; + ClusterShardInfo GetEmulatedShardInfo(ConnectionContext* cntx) const; std::string id_; diff --git a/src/server/cluster/cluster_family_test.cc b/src/server/cluster/cluster_family_test.cc index 62c20825e..fc99bb3a3 100644 --- a/src/server/cluster/cluster_family_test.cc +++ b/src/server/cluster/cluster_family_test.cc @@ -62,7 +62,7 @@ class ClusterFamilyTest : public BaseFamilyTest { TEST_F(ClusterFamilyTest, ClusterConfigInvalidJSON) { EXPECT_THAT(RunPrivileged({"dflycluster", "config", "invalid JSON"}), - ErrArg("Invalid JSON cluster config")); + ErrArg("Invalid cluster configuration.")); string cluster_info = Run({"cluster", "info"}).GetString(); EXPECT_THAT(cluster_info, HasSubstr("cluster_state:fail")); @@ -708,7 +708,7 @@ TEST_F(ClusterFamilyEmulatedTest, ClusterInfo) { EXPECT_THAT(cluster_info, HasSubstr("cluster_size:1")); } -TEST_F(ClusterFamilyEmulatedTest, ClusterShards) { +TEST_F(ClusterFamilyEmulatedTest, ClusterShardInfos) { EXPECT_THAT(Run({"cluster", "shards"}), RespArray(ElementsAre("slots", // RespArray(ElementsAre(IntArg(0), IntArg(16383))), // diff --git a/src/server/cluster/cluster_slot_migration.h b/src/server/cluster/cluster_slot_migration.h index 699303f84..bf2fd6988 100644 --- a/src/server/cluster/cluster_slot_migration.h +++ b/src/server/cluster/cluster_slot_migration.h @@ -3,6 +3,7 @@ // #pragma once +#include "server/cluster/cluster_config.h" #include "server/protocol_client.h" namespace dfly { diff --git a/src/server/cluster/unique_slot_checker.cc b/src/server/cluster/unique_slot_checker.cc index 156df23d3..9ff978f48 100644 --- a/src/server/cluster/unique_slot_checker.cc +++ b/src/server/cluster/unique_slot_checker.cc @@ -1,5 +1,7 @@ #include "server/cluster/unique_slot_checker.h" +#include "server/cluster/cluster_config.h" + using namespace std; namespace dfly { diff --git a/src/server/cluster/unique_slot_checker.h b/src/server/cluster/unique_slot_checker.h index 1a8740b0f..f505d029b 100644 --- a/src/server/cluster/unique_slot_checker.h +++ b/src/server/cluster/unique_slot_checker.h @@ -7,7 +7,7 @@ #include #include -#include "server/cluster/cluster_config.h" +#include "server/cluster/slot_set.h" namespace dfly { diff --git a/src/server/common.cc b/src/server/common.cc index c2fbc5179..284cf5d70 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -18,6 +18,7 @@ extern "C" { #include "base/flags.h" #include "base/logging.h" #include "core/compact_object.h" +#include "server/cluster/cluster_config.h" #include "server/engine_shard_set.h" #include "server/error.h" #include "server/journal/journal.h" diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 388dc771d..8305d632b 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -9,6 +9,7 @@ #include "base/flags.h" #include "base/logging.h" #include "generic_family.h" +#include "server/cluster/cluster_config.h" #include "server/engine_shard_set.h" #include "server/error.h" #include "server/journal/journal.h" diff --git a/src/server/debugcmd.h b/src/server/debugcmd.h index d4ad161e9..97cccd560 100644 --- a/src/server/debugcmd.h +++ b/src/server/debugcmd.h @@ -4,7 +4,7 @@ #pragma once -#include "server/cluster/cluster_config.h" +#include "server/cluster/slot_set.h" #include "server/conn_context.h" namespace dfly { diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index ed0a02440..bec550c1e 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -17,6 +17,7 @@ extern "C" { #include "base/logging.h" #include "io/proc_reader.h" #include "server/blocking_controller.h" +#include "server/cluster/cluster_config.h" #include "server/search/doc_index.h" #include "server/server_state.h" #include "server/tiered_storage.h" diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index e2a9e058d..8a2ec7194 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -21,7 +21,6 @@ extern "C" { #include "core/mi_memory_resource.h" #include "core/task_queue.h" #include "core/tx_queue.h" -#include "server/cluster/cluster_config.h" #include "server/db_slice.h" namespace dfly { diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 8c375f400..b46fa6ad4 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -18,6 +18,7 @@ extern "C" { #include "redis/rdb.h" #include "server/acl/acl_commands_def.h" #include "server/blocking_controller.h" +#include "server/cluster/cluster_config.h" #include "server/command_registry.h" #include "server/conn_context.h" #include "server/container_utils.h" diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index e5708d45a..f741a9451 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -7,6 +7,7 @@ #include #include "base/logging.h" +#include "server/cluster/cluster_config.h" namespace dfly { using namespace util; diff --git a/src/server/journal/types.cc b/src/server/journal/types.cc index babd6595e..7dbb8c42e 100644 --- a/src/server/journal/types.cc +++ b/src/server/journal/types.cc @@ -4,6 +4,8 @@ #include "server/journal/types.h" +#include "server/cluster/cluster_config.h" + namespace dfly::journal { std::string Entry::ToString() const { diff --git a/src/server/journal/types.h b/src/server/journal/types.h index dfe74b614..befcf1323 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -7,7 +7,7 @@ #include #include -#include "server/cluster/cluster_config.h" +#include "server/cluster/slot_set.h" #include "server/common.h" #include "server/table.h" diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 55c73fbc9..5ea3d2d1b 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -914,7 +914,7 @@ optional Service::CheckKeysOwnership(const CommandId* cid, CmdArgLis if (keys_slot.has_value() && !cluster_config->IsMySlot(*keys_slot)) { // See more details here: https://redis.io/docs/reference/cluster-spec/#moved-redirection - ClusterConfig::Node master = cluster_config->GetMasterNodeForSlot(*keys_slot); + ClusterNodeInfo master = cluster_config->GetMasterNodeForSlot(*keys_slot); return ErrorReply{absl::StrCat("-MOVED ", *keys_slot, " ", master.ip, ":", master.port)}; } diff --git a/src/server/table.cc b/src/server/table.cc index 64de53de9..b5ede72b8 100644 --- a/src/server/table.cc +++ b/src/server/table.cc @@ -6,6 +6,7 @@ #include "base/flags.h" #include "base/logging.h" +#include "server/cluster/cluster_config.h" #include "server/server_state.h" ABSL_FLAG(bool, enable_top_keys_tracking, false, diff --git a/src/server/table.h b/src/server/table.h index 0eb0846ce..696feb1b7 100644 --- a/src/server/table.h +++ b/src/server/table.h @@ -11,7 +11,7 @@ #include "core/expire_period.h" #include "core/intent_lock.h" -#include "server/cluster/cluster_config.h" +#include "server/cluster/slot_set.h" #include "server/conn_context.h" #include "server/detail/table.h" #include "server/top_keys.h"