feat(cluster): Implement CLUSTER SHARDS. (#1284)

Implementation includes support for both emulated mode and real cluster
mode.

Fixes #1276.
This commit is contained in:
Chaka 2023-05-24 15:53:19 +03:00 committed by GitHub
parent f746dc4f9f
commit bc717a037d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 219 additions and 10 deletions

View file

@ -73,6 +73,12 @@ inline ::testing::PolymorphicMatcher<RespTypeMatcher> ArgType(RespExpr::Type t)
return ::testing::MakePolymorphicMatcher(RespTypeMatcher(t));
}
MATCHER_P(RespArray, value, "") {
return ExplainMatchResult(testing::AllOf(testing::Field(&RespExpr::type, RespExpr::ARRAY),
testing::Property(&RespExpr::GetVec, value)),
arg, result_listener);
}
inline bool operator==(const RespExpr& left, std::string_view s) {
return left.type == RespExpr::STRING && ToSV(left.GetBuf()) == s;
}

View file

@ -65,6 +65,42 @@ bool ClusterFamily::IsEnabledOrEmulated() const {
return is_emulated_cluster_ || ClusterConfig::IsClusterEnabled();
}
// TODO: Extend this method to accommodate the needs of `CLUSTER NODES` and `CLUSTER SLOTS`.
// TODO: Also make this function safe in that it will read the state atomically.
ClusterConfig::ClusterShard ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const {
ClusterConfig::ClusterShard info{
.slot_ranges = {{.start = 0, .end = ClusterConfig::kMaxSlotNum}},
.master = {},
.replicas = {},
};
ServerState& etl = *ServerState::tlocal();
if (etl.is_master) {
std::string cluster_announce_ip = absl::GetFlag(FLAGS_cluster_announce_ip);
std::string preferred_endpoint =
cluster_announce_ip.empty() ? cntx->owner()->LocalBindAddress() : cluster_announce_ip;
info.master = {.id = server_family_->master_id(),
.ip = preferred_endpoint,
.port = static_cast<uint16_t>(absl::GetFlag(FLAGS_port))};
for (const auto& replica : server_family_->GetDflyCmd()->GetReplicasRoleInfo()) {
info.replicas.push_back({.id = etl.remote_client_id_,
.ip = replica.address,
.port = static_cast<uint16_t>(replica.listening_port)});
}
} else {
Replica::Info replication_info = server_family_->GetReplicaInfo();
info.master = {
.id = etl.remote_client_id_, .ip = replication_info.host, .port = replication_info.port};
info.replicas.push_back({.id = server_family_->master_id(),
.ip = cntx->owner()->LocalBindAddress(),
.port = static_cast<uint16_t>(absl::GetFlag(FLAGS_port))});
}
return info;
}
string ClusterFamily::BuildClusterNodeReply(ConnectionContext* cntx) const {
ServerState& etl = *ServerState::tlocal();
auto epoch_master_time = std::time(nullptr) * 1000;
@ -117,6 +153,62 @@ void ClusterFamily::ClusterHelp(ConnectionContext* cntx) {
return (*cntx)->SendSimpleStrArr(help_arr);
}
namespace {
void ClusterShardsImpl(const ClusterConfig::ClusterShards& config, ConnectionContext* cntx) {
// For more details https://redis.io/commands/cluster-shards/
constexpr unsigned int kEntrySize = 4;
auto WriteNode = [&](const ClusterConfig::Node& node, string_view role) {
constexpr unsigned int kNodeSize = 14;
(*cntx)->StartArray(kNodeSize);
(*cntx)->SendBulkString("id");
(*cntx)->SendBulkString(node.id);
(*cntx)->SendBulkString("endpoint");
(*cntx)->SendBulkString(node.ip);
(*cntx)->SendBulkString("ip");
(*cntx)->SendBulkString(node.ip);
(*cntx)->SendBulkString("port");
(*cntx)->SendLong(node.port);
(*cntx)->SendBulkString("role");
(*cntx)->SendBulkString(role);
(*cntx)->SendBulkString("replication-offset");
(*cntx)->SendLong(0);
(*cntx)->SendBulkString("health");
(*cntx)->SendBulkString("online");
};
(*cntx)->StartArray(config.size());
for (const auto& shard : config) {
(*cntx)->StartArray(kEntrySize);
(*cntx)->SendBulkString("slots");
(*cntx)->StartArray(shard.slot_ranges.size() * 2);
for (const auto& slot_range : shard.slot_ranges) {
(*cntx)->SendLong(slot_range.start);
(*cntx)->SendLong(slot_range.end);
}
(*cntx)->SendBulkString("nodes");
(*cntx)->StartArray(1 + shard.replicas.size());
WriteNode(shard.master, "master");
for (const auto& replica : shard.replicas) {
WriteNode(replica, "replica");
}
}
}
} // namespace
void ClusterFamily::ClusterShards(ConnectionContext* cntx) {
if (is_emulated_cluster_) {
ClusterConfig::ClusterShards config{GetEmulatedShardInfo(cntx)};
return ClusterShardsImpl(config, cntx);
} else if (cluster_config_->IsConfigured()) {
return ClusterShardsImpl(cluster_config_->GetConfig(), cntx);
} else {
return (*cntx)->SendError("Cluster is not yet configured");
}
}
void ClusterFamily::ClusterSlots(ConnectionContext* cntx) {
// For more details https://redis.io/commands/cluster-slots/
constexpr unsigned int kClustersShardingCount = 1;
@ -251,6 +343,8 @@ void ClusterFamily::Cluster(CmdArgList args, ConnectionContext* cntx) {
if (sub_cmd == "HELP") {
return ClusterHelp(cntx);
} else if (sub_cmd == "SHARDS") {
return ClusterShards(cntx);
} else if (sub_cmd == "SLOTS") {
return ClusterSlots(cntx);
} else if (sub_cmd == "NODES") {
@ -277,13 +371,16 @@ void ClusterFamily::ReadWrite(CmdArgList args, ConnectionContext* cntx) {
}
void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
if (!ClusterConfig::IsClusterEnabled()) {
if (!is_emulated_cluster_ && !ClusterConfig::IsClusterEnabled()) {
return (*cntx)->SendError("DFLYCLUSTER commands requires --cluster_mode=yes");
}
CHECK_NE(cluster_config_.get(), nullptr);
if (!cntx->owner()->IsAdmin()) {
return (*cntx)->SendError("DFLYCLUSTER commands requires admin port");
}
CHECK(is_emulated_cluster_ || cluster_config_.get() != nullptr);
ToUpper(&args[0]);
string_view sub_cmd = ArgS(args, 0);
if (sub_cmd == "GETSLOTINFO") {

View file

@ -33,6 +33,7 @@ class ClusterFamily {
// Cluster commands compatible with Redis
void Cluster(CmdArgList args, ConnectionContext* cntx);
void ClusterHelp(ConnectionContext* cntx);
void ClusterShards(ConnectionContext* cntx);
void ClusterSlots(ConnectionContext* cntx);
void ClusterNodes(ConnectionContext* cntx);
void ClusterInfo(ConnectionContext* cntx);
@ -48,6 +49,8 @@ class ClusterFamily {
std::string BuildClusterNodeReply(ConnectionContext* cntx) const;
ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const;
bool is_emulated_cluster_ = false;
ServerFamily* server_family_ = nullptr;

View file

@ -24,10 +24,7 @@ using namespace testing;
class ClusterFamilyTest : public BaseFamilyTest {
public:
ClusterFamilyTest() {
auto* flag = absl::FindCommandLineFlag("cluster_mode");
CHECK_NE(flag, nullptr);
string error;
CHECK(flag->ParseFrom("yes", &error));
SetTestFlag("cluster_mode", "yes");
}
protected:
@ -67,6 +64,8 @@ TEST_F(ClusterFamilyTest, ClusterConfigInvalidJSON) {
EXPECT_THAT(cluster_info, HasSubstr("cluster_slots_ok:0"));
EXPECT_THAT(cluster_info, HasSubstr("cluster_known_nodes:0"));
EXPECT_THAT(cluster_info, HasSubstr("cluster_size:0"));
EXPECT_THAT(Run({"cluster", "shards"}), ErrArg("Cluster is not yet configured"));
}
TEST_F(ClusterFamilyTest, ClusterConfigInvalidConfig) {
@ -177,6 +176,20 @@ TEST_F(ClusterFamilyTest, ClusterConfigNoReplicas) {
EXPECT_THAT(cluster_info, HasSubstr("cluster_known_nodes:1"));
EXPECT_THAT(cluster_info, HasSubstr("cluster_size:1"));
EXPECT_THAT(Run({"cluster", "shards"}),
RespArray(ElementsAre("slots", //
RespArray(ElementsAre(IntArg(0), IntArg(16'383))), //
"nodes", //
RespArray(ElementsAre( //
RespArray(ElementsAre( //
"id", "abcd1234", //
"endpoint", "10.0.0.1", //
"ip", "10.0.0.1", //
"port", IntArg(7000), //
"role", "master", //
"replication-offset", IntArg(0), //
"health", "online")))))));
EXPECT_THAT(Run({"get", "x"}).GetString(),
testing::MatchesRegex(R"(MOVED [0-9]+ 10.0.0.1:7000)"));
@ -216,6 +229,29 @@ TEST_F(ClusterFamilyTest, ClusterConfigFull) {
EXPECT_THAT(cluster_info, HasSubstr("cluster_slots_ok:16384"));
EXPECT_THAT(cluster_info, HasSubstr("cluster_known_nodes:2"));
EXPECT_THAT(cluster_info, HasSubstr("cluster_size:1"));
EXPECT_THAT(Run({"cluster", "shards"}),
RespArray(ElementsAre("slots", //
RespArray(ElementsAre(IntArg(0), IntArg(16'383))), //
"nodes", //
RespArray(ElementsAre( //
RespArray(ElementsAre( //
"id", "abcd1234", //
"endpoint", "10.0.0.1", //
"ip", "10.0.0.1", //
"port", IntArg(7000), //
"role", "master", //
"replication-offset", IntArg(0), //
"health", "online")), //
RespArray(ElementsAre( //
"id", "wxyz", //
"endpoint", "10.0.0.10", //
"ip", "10.0.0.10", //
"port", IntArg(8000), //
"role", "replica", //
"replication-offset", IntArg(0), //
"health", "online")))))));
// TODO: Use "CLUSTER SLOTS" and "CLUSTER SHARDS" once implemented to verify new configuration
// takes effect.
}
@ -273,6 +309,49 @@ TEST_F(ClusterFamilyTest, ClusterConfigFullMultipleInstances) {
EXPECT_THAT(cluster_info, HasSubstr("cluster_known_nodes:4"));
EXPECT_THAT(cluster_info, HasSubstr("cluster_size:2"));
EXPECT_THAT(Run({"cluster", "shards"}),
RespArray(ElementsAre(
RespArray(ElementsAre("slots", //
RespArray(ElementsAre(IntArg(0), IntArg(10'000))), //
"nodes", //
RespArray(ElementsAre( //
RespArray(ElementsAre( //
"id", "abcd1234", //
"endpoint", "10.0.0.1", //
"ip", "10.0.0.1", //
"port", IntArg(7000), //
"role", "master", //
"replication-offset", IntArg(0), //
"health", "online")), //
RespArray(ElementsAre( //
"id", "wxyz", //
"endpoint", "10.0.0.10", //
"ip", "10.0.0.10", //
"port", IntArg(8000), //
"role", "replica", //
"replication-offset", IntArg(0), //
"health", "online")))))), //
RespArray(ElementsAre("slots", //
RespArray(ElementsAre(IntArg(10'001), IntArg(16'383))), //
"nodes", //
RespArray(ElementsAre( //
RespArray(ElementsAre( //
"id", "efgh7890", //
"endpoint", "10.0.0.2", //
"ip", "10.0.0.2", //
"port", IntArg(7001), //
"role", "master", //
"replication-offset", IntArg(0), //
"health", "online")), //
RespArray(ElementsAre( //
"id", "qwerty", //
"endpoint", "10.0.0.11", //
"ip", "10.0.0.11", //
"port", IntArg(8001), //
"role", "replica", //
"replication-offset", IntArg(0), //
"health", "online")))))))));
absl::InsecureBitGen eng;
while (true) {
string random_key = GetRandomHex(eng, 40);
@ -382,10 +461,8 @@ TEST_F(ClusterFamilyTest, ClusterConfigDeleteSlots) {
class ClusterFamilyEmulatedTest : public BaseFamilyTest {
public:
ClusterFamilyEmulatedTest() {
auto* flag = absl::FindCommandLineFlag("cluster_mode");
CHECK_NE(flag, nullptr);
string error;
CHECK(flag->ParseFrom("emulated", &error));
SetTestFlag("cluster_mode", "emulated");
SetTestFlag("cluster_announce_ip", "fake-host");
}
};
@ -398,5 +475,21 @@ TEST_F(ClusterFamilyEmulatedTest, ClusterInfo) {
EXPECT_THAT(cluster_info, HasSubstr("cluster_size:1"));
}
TEST_F(ClusterFamilyEmulatedTest, ClusterShards) {
EXPECT_THAT(Run({"cluster", "shards"}),
RespArray(ElementsAre("slots", //
RespArray(ElementsAre(IntArg(0), IntArg(16383))), //
"nodes", //
RespArray(ElementsAre( //
RespArray(ElementsAre( //
"id", RunAdmin({"dflycluster", "myid"}).GetString(), //
"endpoint", "fake-host", //
"ip", "fake-host", //
"port", IntArg(6379), //
"role", "master", //
"replication-offset", IntArg(0), //
"health", "online")))))));
}
} // namespace
} // namespace dfly

View file

@ -8,6 +8,7 @@ extern "C" {
#include "redis/zmalloc.h"
}
#include <absl/flags/reflection.h>
#include <absl/strings/match.h>
#include <absl/strings/str_split.h>
#include <mimalloc.h>
@ -455,4 +456,11 @@ vector<string> BaseFamilyTest::StrArray(const RespExpr& expr) {
return res;
}
void BaseFamilyTest::SetTestFlag(string_view flag_name, string_view new_value) {
auto* flag = absl::FindCommandLineFlag(flag_name);
CHECK_NE(flag, nullptr);
string error;
CHECK(flag->ParseFrom(new_value, &error)) << "Error: " << error;
}
} // namespace dfly

View file

@ -104,6 +104,8 @@ class BaseFamilyTest : public ::testing::Test {
static unsigned NumLocked();
void SetTestFlag(std::string_view flag_name, std::string_view new_value);
std::unique_ptr<util::ProactorPool> pp_;
std::unique_ptr<Service> service_;
unsigned num_threads_ = 3;