From bc717a037d77f7b027de910c5ca5c64bf34e2af9 Mon Sep 17 00:00:00 2001 From: Chaka Date: Wed, 24 May 2023 15:53:19 +0300 Subject: [PATCH] feat(cluster): Implement `CLUSTER SHARDS`. (#1284) Implementation includes support for both emulated mode and real cluster mode. Fixes #1276. --- src/facade/facade_test.h | 6 ++ src/server/cluster/cluster_family.cc | 101 ++++++++++++++++++++++++- src/server/cluster/cluster_family.h | 3 + src/server/cluster_family_test.cc | 109 +++++++++++++++++++++++++-- src/server/test_utils.cc | 8 ++ src/server/test_utils.h | 2 + 6 files changed, 219 insertions(+), 10 deletions(-) diff --git a/src/facade/facade_test.h b/src/facade/facade_test.h index 0f910468a..c5a94a6b5 100644 --- a/src/facade/facade_test.h +++ b/src/facade/facade_test.h @@ -73,6 +73,12 @@ inline ::testing::PolymorphicMatcher ArgType(RespExpr::Type t) return ::testing::MakePolymorphicMatcher(RespTypeMatcher(t)); } +MATCHER_P(RespArray, value, "") { + return ExplainMatchResult(testing::AllOf(testing::Field(&RespExpr::type, RespExpr::ARRAY), + testing::Property(&RespExpr::GetVec, value)), + arg, result_listener); +} + inline bool operator==(const RespExpr& left, std::string_view s) { return left.type == RespExpr::STRING && ToSV(left.GetBuf()) == s; } diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index efd3410a6..785895db6 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -65,6 +65,42 @@ bool ClusterFamily::IsEnabledOrEmulated() const { return is_emulated_cluster_ || ClusterConfig::IsClusterEnabled(); } +// TODO: Extend this method to accommodate the needs of `CLUSTER NODES` and `CLUSTER SLOTS`. +// TODO: Also make this function safe in that it will read the state atomically. +ClusterConfig::ClusterShard ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const { + ClusterConfig::ClusterShard info{ + .slot_ranges = {{.start = 0, .end = ClusterConfig::kMaxSlotNum}}, + .master = {}, + .replicas = {}, + }; + + ServerState& etl = *ServerState::tlocal(); + if (etl.is_master) { + std::string cluster_announce_ip = absl::GetFlag(FLAGS_cluster_announce_ip); + std::string preferred_endpoint = + cluster_announce_ip.empty() ? cntx->owner()->LocalBindAddress() : cluster_announce_ip; + + info.master = {.id = server_family_->master_id(), + .ip = preferred_endpoint, + .port = static_cast(absl::GetFlag(FLAGS_port))}; + + for (const auto& replica : server_family_->GetDflyCmd()->GetReplicasRoleInfo()) { + info.replicas.push_back({.id = etl.remote_client_id_, + .ip = replica.address, + .port = static_cast(replica.listening_port)}); + } + } else { + Replica::Info replication_info = server_family_->GetReplicaInfo(); + info.master = { + .id = etl.remote_client_id_, .ip = replication_info.host, .port = replication_info.port}; + info.replicas.push_back({.id = server_family_->master_id(), + .ip = cntx->owner()->LocalBindAddress(), + .port = static_cast(absl::GetFlag(FLAGS_port))}); + } + + return info; +} + string ClusterFamily::BuildClusterNodeReply(ConnectionContext* cntx) const { ServerState& etl = *ServerState::tlocal(); auto epoch_master_time = std::time(nullptr) * 1000; @@ -117,6 +153,62 @@ void ClusterFamily::ClusterHelp(ConnectionContext* cntx) { return (*cntx)->SendSimpleStrArr(help_arr); } +namespace { +void ClusterShardsImpl(const ClusterConfig::ClusterShards& config, ConnectionContext* cntx) { + // For more details https://redis.io/commands/cluster-shards/ + constexpr unsigned int kEntrySize = 4; + + auto WriteNode = [&](const ClusterConfig::Node& node, string_view role) { + constexpr unsigned int kNodeSize = 14; + (*cntx)->StartArray(kNodeSize); + (*cntx)->SendBulkString("id"); + (*cntx)->SendBulkString(node.id); + (*cntx)->SendBulkString("endpoint"); + (*cntx)->SendBulkString(node.ip); + (*cntx)->SendBulkString("ip"); + (*cntx)->SendBulkString(node.ip); + (*cntx)->SendBulkString("port"); + (*cntx)->SendLong(node.port); + (*cntx)->SendBulkString("role"); + (*cntx)->SendBulkString(role); + (*cntx)->SendBulkString("replication-offset"); + (*cntx)->SendLong(0); + (*cntx)->SendBulkString("health"); + (*cntx)->SendBulkString("online"); + }; + + (*cntx)->StartArray(config.size()); + for (const auto& shard : config) { + (*cntx)->StartArray(kEntrySize); + (*cntx)->SendBulkString("slots"); + + (*cntx)->StartArray(shard.slot_ranges.size() * 2); + for (const auto& slot_range : shard.slot_ranges) { + (*cntx)->SendLong(slot_range.start); + (*cntx)->SendLong(slot_range.end); + } + + (*cntx)->SendBulkString("nodes"); + (*cntx)->StartArray(1 + shard.replicas.size()); + WriteNode(shard.master, "master"); + for (const auto& replica : shard.replicas) { + WriteNode(replica, "replica"); + } + } +} +} // namespace + +void ClusterFamily::ClusterShards(ConnectionContext* cntx) { + if (is_emulated_cluster_) { + ClusterConfig::ClusterShards config{GetEmulatedShardInfo(cntx)}; + return ClusterShardsImpl(config, cntx); + } else if (cluster_config_->IsConfigured()) { + return ClusterShardsImpl(cluster_config_->GetConfig(), cntx); + } else { + return (*cntx)->SendError("Cluster is not yet configured"); + } +} + void ClusterFamily::ClusterSlots(ConnectionContext* cntx) { // For more details https://redis.io/commands/cluster-slots/ constexpr unsigned int kClustersShardingCount = 1; @@ -251,6 +343,8 @@ void ClusterFamily::Cluster(CmdArgList args, ConnectionContext* cntx) { if (sub_cmd == "HELP") { return ClusterHelp(cntx); + } else if (sub_cmd == "SHARDS") { + return ClusterShards(cntx); } else if (sub_cmd == "SLOTS") { return ClusterSlots(cntx); } else if (sub_cmd == "NODES") { @@ -277,13 +371,16 @@ void ClusterFamily::ReadWrite(CmdArgList args, ConnectionContext* cntx) { } void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) { - if (!ClusterConfig::IsClusterEnabled()) { + if (!is_emulated_cluster_ && !ClusterConfig::IsClusterEnabled()) { return (*cntx)->SendError("DFLYCLUSTER commands requires --cluster_mode=yes"); } - CHECK_NE(cluster_config_.get(), nullptr); + if (!cntx->owner()->IsAdmin()) { return (*cntx)->SendError("DFLYCLUSTER commands requires admin port"); } + + CHECK(is_emulated_cluster_ || cluster_config_.get() != nullptr); + ToUpper(&args[0]); string_view sub_cmd = ArgS(args, 0); if (sub_cmd == "GETSLOTINFO") { diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index 3410bf713..aa2f1139b 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -33,6 +33,7 @@ class ClusterFamily { // Cluster commands compatible with Redis void Cluster(CmdArgList args, ConnectionContext* cntx); void ClusterHelp(ConnectionContext* cntx); + void ClusterShards(ConnectionContext* cntx); void ClusterSlots(ConnectionContext* cntx); void ClusterNodes(ConnectionContext* cntx); void ClusterInfo(ConnectionContext* cntx); @@ -48,6 +49,8 @@ class ClusterFamily { std::string BuildClusterNodeReply(ConnectionContext* cntx) const; + ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const; + bool is_emulated_cluster_ = false; ServerFamily* server_family_ = nullptr; diff --git a/src/server/cluster_family_test.cc b/src/server/cluster_family_test.cc index 73190d905..c154de571 100644 --- a/src/server/cluster_family_test.cc +++ b/src/server/cluster_family_test.cc @@ -24,10 +24,7 @@ using namespace testing; class ClusterFamilyTest : public BaseFamilyTest { public: ClusterFamilyTest() { - auto* flag = absl::FindCommandLineFlag("cluster_mode"); - CHECK_NE(flag, nullptr); - string error; - CHECK(flag->ParseFrom("yes", &error)); + SetTestFlag("cluster_mode", "yes"); } protected: @@ -67,6 +64,8 @@ TEST_F(ClusterFamilyTest, ClusterConfigInvalidJSON) { EXPECT_THAT(cluster_info, HasSubstr("cluster_slots_ok:0")); EXPECT_THAT(cluster_info, HasSubstr("cluster_known_nodes:0")); EXPECT_THAT(cluster_info, HasSubstr("cluster_size:0")); + + EXPECT_THAT(Run({"cluster", "shards"}), ErrArg("Cluster is not yet configured")); } TEST_F(ClusterFamilyTest, ClusterConfigInvalidConfig) { @@ -177,6 +176,20 @@ TEST_F(ClusterFamilyTest, ClusterConfigNoReplicas) { EXPECT_THAT(cluster_info, HasSubstr("cluster_known_nodes:1")); EXPECT_THAT(cluster_info, HasSubstr("cluster_size:1")); + EXPECT_THAT(Run({"cluster", "shards"}), + RespArray(ElementsAre("slots", // + RespArray(ElementsAre(IntArg(0), IntArg(16'383))), // + "nodes", // + RespArray(ElementsAre( // + RespArray(ElementsAre( // + "id", "abcd1234", // + "endpoint", "10.0.0.1", // + "ip", "10.0.0.1", // + "port", IntArg(7000), // + "role", "master", // + "replication-offset", IntArg(0), // + "health", "online"))))))); + EXPECT_THAT(Run({"get", "x"}).GetString(), testing::MatchesRegex(R"(MOVED [0-9]+ 10.0.0.1:7000)")); @@ -216,6 +229,29 @@ TEST_F(ClusterFamilyTest, ClusterConfigFull) { EXPECT_THAT(cluster_info, HasSubstr("cluster_slots_ok:16384")); EXPECT_THAT(cluster_info, HasSubstr("cluster_known_nodes:2")); EXPECT_THAT(cluster_info, HasSubstr("cluster_size:1")); + + EXPECT_THAT(Run({"cluster", "shards"}), + RespArray(ElementsAre("slots", // + RespArray(ElementsAre(IntArg(0), IntArg(16'383))), // + "nodes", // + RespArray(ElementsAre( // + RespArray(ElementsAre( // + "id", "abcd1234", // + "endpoint", "10.0.0.1", // + "ip", "10.0.0.1", // + "port", IntArg(7000), // + "role", "master", // + "replication-offset", IntArg(0), // + "health", "online")), // + RespArray(ElementsAre( // + "id", "wxyz", // + "endpoint", "10.0.0.10", // + "ip", "10.0.0.10", // + "port", IntArg(8000), // + "role", "replica", // + "replication-offset", IntArg(0), // + "health", "online"))))))); + // TODO: Use "CLUSTER SLOTS" and "CLUSTER SHARDS" once implemented to verify new configuration // takes effect. } @@ -273,6 +309,49 @@ TEST_F(ClusterFamilyTest, ClusterConfigFullMultipleInstances) { EXPECT_THAT(cluster_info, HasSubstr("cluster_known_nodes:4")); EXPECT_THAT(cluster_info, HasSubstr("cluster_size:2")); + EXPECT_THAT(Run({"cluster", "shards"}), + RespArray(ElementsAre( + RespArray(ElementsAre("slots", // + RespArray(ElementsAre(IntArg(0), IntArg(10'000))), // + "nodes", // + RespArray(ElementsAre( // + RespArray(ElementsAre( // + "id", "abcd1234", // + "endpoint", "10.0.0.1", // + "ip", "10.0.0.1", // + "port", IntArg(7000), // + "role", "master", // + "replication-offset", IntArg(0), // + "health", "online")), // + RespArray(ElementsAre( // + "id", "wxyz", // + "endpoint", "10.0.0.10", // + "ip", "10.0.0.10", // + "port", IntArg(8000), // + "role", "replica", // + "replication-offset", IntArg(0), // + "health", "online")))))), // + RespArray(ElementsAre("slots", // + RespArray(ElementsAre(IntArg(10'001), IntArg(16'383))), // + "nodes", // + RespArray(ElementsAre( // + RespArray(ElementsAre( // + "id", "efgh7890", // + "endpoint", "10.0.0.2", // + "ip", "10.0.0.2", // + "port", IntArg(7001), // + "role", "master", // + "replication-offset", IntArg(0), // + "health", "online")), // + RespArray(ElementsAre( // + "id", "qwerty", // + "endpoint", "10.0.0.11", // + "ip", "10.0.0.11", // + "port", IntArg(8001), // + "role", "replica", // + "replication-offset", IntArg(0), // + "health", "online"))))))))); + absl::InsecureBitGen eng; while (true) { string random_key = GetRandomHex(eng, 40); @@ -382,10 +461,8 @@ TEST_F(ClusterFamilyTest, ClusterConfigDeleteSlots) { class ClusterFamilyEmulatedTest : public BaseFamilyTest { public: ClusterFamilyEmulatedTest() { - auto* flag = absl::FindCommandLineFlag("cluster_mode"); - CHECK_NE(flag, nullptr); - string error; - CHECK(flag->ParseFrom("emulated", &error)); + SetTestFlag("cluster_mode", "emulated"); + SetTestFlag("cluster_announce_ip", "fake-host"); } }; @@ -398,5 +475,21 @@ TEST_F(ClusterFamilyEmulatedTest, ClusterInfo) { EXPECT_THAT(cluster_info, HasSubstr("cluster_size:1")); } +TEST_F(ClusterFamilyEmulatedTest, ClusterShards) { + EXPECT_THAT(Run({"cluster", "shards"}), + RespArray(ElementsAre("slots", // + RespArray(ElementsAre(IntArg(0), IntArg(16383))), // + "nodes", // + RespArray(ElementsAre( // + RespArray(ElementsAre( // + "id", RunAdmin({"dflycluster", "myid"}).GetString(), // + "endpoint", "fake-host", // + "ip", "fake-host", // + "port", IntArg(6379), // + "role", "master", // + "replication-offset", IntArg(0), // + "health", "online"))))))); +} + } // namespace } // namespace dfly diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index 2ed2ff98c..6fdd8f9e5 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -8,6 +8,7 @@ extern "C" { #include "redis/zmalloc.h" } +#include #include #include #include @@ -455,4 +456,11 @@ vector BaseFamilyTest::StrArray(const RespExpr& expr) { return res; } +void BaseFamilyTest::SetTestFlag(string_view flag_name, string_view new_value) { + auto* flag = absl::FindCommandLineFlag(flag_name); + CHECK_NE(flag, nullptr); + string error; + CHECK(flag->ParseFrom(new_value, &error)) << "Error: " << error; +} + } // namespace dfly diff --git a/src/server/test_utils.h b/src/server/test_utils.h index 1c89f80be..4d8fdf43c 100644 --- a/src/server/test_utils.h +++ b/src/server/test_utils.h @@ -104,6 +104,8 @@ class BaseFamilyTest : public ::testing::Test { static unsigned NumLocked(); + void SetTestFlag(std::string_view flag_name, std::string_view new_value); + std::unique_ptr pp_; std::unique_ptr service_; unsigned num_threads_ = 3;