feat: add node health status for CLUSTER SLOTS and SHARDS (#4767)

* feat: add node health status for CLUSTER SLOTS and SHARDS
This commit is contained in:
Borys 2025-03-17 12:01:11 +02:00 committed by GitHub
parent d75cfe8c9f
commit 151e40e2c0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 146 additions and 57 deletions

View file

@ -31,12 +31,12 @@ bool HasValidNodeIds(const ClusterShardInfos& new_config) {
for (const auto& shard : new_config) {
if (!CheckAndInsertNode(shard.master.id)) {
LOG(WARNING) << "Master " << shard.master.id << " appears more than once";
LOG(ERROR) << "Master " << shard.master.id << " appears more than once";
return false;
}
for (const auto& replica : shard.replicas) {
if (!CheckAndInsertNode(replica.id)) {
LOG(WARNING) << "Replica " << replica.id << " appears more than once";
LOG(ERROR) << "Replica " << replica.id << " appears more than once";
return false;
}
}
@ -56,21 +56,21 @@ bool IsConfigValid(const ClusterShardInfos& new_config) {
for (const auto& shard : new_config) {
for (const auto& slot_range : shard.slot_ranges) {
if (slot_range.start > slot_range.end) {
LOG(WARNING) << "Invalid cluster config: start=" << slot_range.start
<< " is larger than end=" << slot_range.end;
LOG(ERROR) << "Invalid cluster config: start=" << slot_range.start
<< " is larger than end=" << slot_range.end;
return false;
}
for (SlotId slot = slot_range.start; slot <= slot_range.end; ++slot) {
if (slot >= slots_found.size()) {
LOG(WARNING) << "Invalid cluster config: slot=" << slot
<< " is bigger than allowed max=" << slots_found.size();
LOG(ERROR) << "Invalid cluster config: slot=" << slot
<< " is bigger than allowed max=" << slots_found.size();
return false;
}
if (slots_found[slot]) {
LOG(WARNING) << "Invalid cluster config: slot=" << slot
<< " was already configured by another slot range.";
LOG(ERROR) << "Invalid cluster config: slot=" << slot
<< " was already configured by another slot range.";
return false;
}
@ -80,7 +80,7 @@ bool IsConfigValid(const ClusterShardInfos& new_config) {
}
if (!all_of(slots_found.begin(), slots_found.end(), [](bool b) { return b; }) > 0UL) {
LOG(WARNING) << "Invalid cluster config: some slots were missing.";
LOG(ERROR) << "Invalid cluster config: some slots were missing.";
return false;
}
@ -129,7 +129,7 @@ constexpr string_view kInvalidConfigPrefix = "Invalid JSON cluster config: "sv;
template <typename T> optional<T> ReadNumeric(const JsonType& obj) {
if (!obj.is_number()) {
LOG(WARNING) << kInvalidConfigPrefix << "object is not a number " << obj;
LOG(ERROR) << kInvalidConfigPrefix << "object is not a number " << obj;
return nullopt;
}
@ -138,7 +138,7 @@ template <typename T> optional<T> ReadNumeric(const JsonType& obj) {
optional<SlotRanges> GetClusterSlotRanges(const JsonType& slots) {
if (!slots.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges is not an array " << slots;
LOG(ERROR) << kInvalidConfigPrefix << "slot_ranges is not an array " << slots;
return nullopt;
}
@ -146,7 +146,7 @@ optional<SlotRanges> GetClusterSlotRanges(const JsonType& slots) {
for (const auto& range : slots.array_range()) {
if (!range.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges element is not an object " << range;
LOG(ERROR) << kInvalidConfigPrefix << "slot_ranges element is not an object " << range;
return nullopt;
}
@ -164,7 +164,7 @@ optional<SlotRanges> GetClusterSlotRanges(const JsonType& slots) {
optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
if (!json.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "node config is not an object " << json;
LOG(ERROR) << kInvalidConfigPrefix << "node config is not an object " << json;
return nullopt;
}
@ -173,7 +173,7 @@ optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
{
auto id = json.at_or_null("id");
if (!id.is_string()) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid id for node " << json;
LOG(ERROR) << kInvalidConfigPrefix << "invalid id for node " << json;
return nullopt;
}
node.id = std::move(id).as_string();
@ -182,7 +182,7 @@ optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
{
auto ip = json.at_or_null("ip");
if (!ip.is_string()) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid ip for node " << json;
LOG(ERROR) << kInvalidConfigPrefix << "invalid ip for node " << json;
return nullopt;
}
node.ip = std::move(ip).as_string();
@ -200,7 +200,7 @@ optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
auto health = json.at_or_null("health");
if (!health.is_null()) {
if (!health.is_string()) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid health status for node " << json;
LOG(ERROR) << kInvalidConfigPrefix << "invalid health status for node " << json;
} else {
auto health_str = std::move(health).as_string();
if (absl::EqualsIgnoreCase(health_str, "FAIL")) {
@ -209,8 +209,10 @@ optional<ClusterExtendedNodeInfo> ParseClusterNode(const JsonType& json) {
node.health = NodeHealth::LOADING;
} else if (absl::EqualsIgnoreCase(health_str, "ONLINE")) {
node.health = NodeHealth::ONLINE;
} else if (absl::EqualsIgnoreCase(health_str, "HIDDEN")) {
node.health = NodeHealth::HIDDEN;
} else {
LOG(WARNING) << kInvalidConfigPrefix << "invalid health status for node: " << health_str;
LOG(ERROR) << kInvalidConfigPrefix << "invalid health status for node: " << health_str;
}
}
}
@ -237,7 +239,7 @@ optional<std::vector<MigrationInfo>> ParseMigrations(const JsonType& json) {
auto slots = GetClusterSlotRanges(element.at_or_null("slot_ranges"));
if (!node_id.is_string() || !ip.is_string() || !port || !slots) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid migration json " << json;
LOG(ERROR) << kInvalidConfigPrefix << "invalid migration json " << json;
return nullopt;
}
@ -253,7 +255,7 @@ optional<ClusterShardInfos> BuildClusterConfigFromJson(const JsonType& json) {
std::vector<ClusterShardInfo> config;
if (!json.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "not an array " << json;
LOG(ERROR) << kInvalidConfigPrefix << "not an array " << json;
return nullopt;
}
@ -261,7 +263,7 @@ optional<ClusterShardInfos> BuildClusterConfigFromJson(const JsonType& json) {
ClusterShardInfo shard;
if (!element.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "shard element is not an object " << element;
LOG(ERROR) << kInvalidConfigPrefix << "shard element is not an object " << element;
return nullopt;
}
@ -279,7 +281,7 @@ optional<ClusterShardInfos> BuildClusterConfigFromJson(const JsonType& json) {
auto replicas = element.at_or_null("replicas");
if (!replicas.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "replicas is not an array " << replicas;
LOG(ERROR) << kInvalidConfigPrefix << "replicas is not an array " << replicas;
return nullopt;
}
@ -309,7 +311,7 @@ shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
std::string_view json_str) {
optional<JsonType> json_config = JsonFromString(json_str, PMR_NS::get_default_resource());
if (!json_config.has_value()) {
LOG(WARNING) << "Can't parse JSON for ClusterConfig " << json_str;
LOG(ERROR) << "Can't parse JSON for ClusterConfig " << json_str;
return nullptr;
}

View file

@ -7,6 +7,7 @@
#include <absl/strings/str_cat.h>
#include <absl/strings/str_join.h>
#include "base/logging.h"
#include "cluster_config.h"
#include "facade/error.h"
#include "slot_set.h"
@ -73,4 +74,21 @@ facade::ErrorReply SlotOwnershipError(SlotId slot_id) {
}
return facade::ErrorReply{facade::OpStatus::OK};
}
std::string_view ToString(NodeHealth nh) {
switch (nh) {
case NodeHealth::FAIL:
return "fail";
case NodeHealth::LOADING:
return "loading";
case NodeHealth::ONLINE:
return "online";
case NodeHealth::HIDDEN:
DCHECK(false); // shouldn't be used
return "hidden";
}
DCHECK(false);
return "undefined_health";
}
} // namespace dfly::cluster

View file

@ -94,10 +94,11 @@ struct ClusterNodeInfo {
}
};
enum class NodeHealth : std::uint8_t { NONE, FAIL, LOADING, ONLINE };
enum class NodeHealth : std::uint8_t { FAIL, LOADING, ONLINE, HIDDEN };
std::string_view ToString(NodeHealth nh);
struct ClusterExtendedNodeInfo : ClusterNodeInfo {
NodeHealth health = NodeHealth::NONE;
NodeHealth health = NodeHealth::ONLINE;
};
struct MigrationInfo {
@ -159,6 +160,10 @@ class ClusterShardInfos {
return infos_ != r.infos_;
}
auto Unwrap() const {
return infos_;
}
private:
std::vector<ClusterShardInfo> infos_;
};

View file

@ -72,16 +72,12 @@ ClusterShardInfos GetConfigForStats(ConnectionContext* cntx) {
return config;
}
// We can't mutate `config` so we copy it over
std::vector<ClusterShardInfo> infos;
infos.reserve(config.size());
for (auto& node : config) {
infos.push_back(node);
infos.rbegin()->replicas.clear();
auto shards_info = config.Unwrap();
for (auto& node : shards_info) {
node.replicas.clear();
}
return ClusterShardInfos{std::move(infos)};
return shards_info;
}
} // namespace
@ -149,14 +145,15 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) co
? static_cast<uint16_t>(absl::GetFlag(FLAGS_port))
: cluster_announce_port;
info.master = {{.id = id_, .ip = preferred_endpoint, .port = preferred_port}, NodeHealth::NONE};
info.master = {{.id = id_, .ip = preferred_endpoint, .port = preferred_port},
NodeHealth::ONLINE};
if (cntx->conn()->IsPrivileged() || !absl::GetFlag(FLAGS_managed_service_info)) {
for (const auto& replica : server_family_->GetDflyCmd()->GetReplicasRoleInfo()) {
info.replicas.push_back({{.id = replica.id,
.ip = replica.address,
.port = static_cast<uint16_t>(replica.listening_port)},
NodeHealth::NONE});
NodeHealth::ONLINE});
}
}
} else {
@ -166,7 +163,7 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) co
info.replicas.push_back({{.id = id_,
.ip = cntx->conn()->LocalBindAddress(),
.port = static_cast<uint16_t>(absl::GetFlag(FLAGS_port))},
NodeHealth::NONE});
NodeHealth::ONLINE});
}
return info;
@ -196,7 +193,7 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builde
constexpr unsigned int kEntrySize = 4;
auto* rb = static_cast<RedisReplyBuilder*>(builder);
auto WriteNode = [&](const ClusterNodeInfo& node, string_view role) {
auto WriteNode = [&](const ClusterExtendedNodeInfo& node, string_view role) {
constexpr unsigned int kNodeSize = 14;
rb->StartArray(kNodeSize);
rb->SendBulkString("id");
@ -212,7 +209,7 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builde
rb->SendBulkString("replication-offset");
rb->SendLong(0);
rb->SendBulkString("health");
rb->SendBulkString("online");
rb->SendBulkString(ToString(node.health));
};
rb->StartArray(config.size());
@ -237,15 +234,22 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builde
} // namespace
void ClusterFamily::ClusterShards(SinkReplyBuilder* builder, ConnectionContext* cntx) {
auto shard_infos = GetShardInfos(cntx);
if (shard_infos) {
return ClusterShardsImpl(*shard_infos, builder);
auto config = GetShardInfos(cntx);
if (config) {
// we need to remove hiden replicas
auto shards_info = config->Unwrap();
for (auto& shard : shards_info) {
auto new_end = std::remove_if(shard.replicas.begin(), shard.replicas.end(),
[](const auto& r) { return r.health == NodeHealth::HIDDEN; });
shard.replicas.erase(new_end, shard.replicas.end());
}
return ClusterShardsImpl({shards_info}, builder);
}
return builder->SendError(kClusterNotConfigured);
}
namespace {
void ClusterSlotsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder) {
void ClusterSlotsImpl(ClusterShardInfos config, SinkReplyBuilder* builder) {
// For more details https://redis.io/commands/cluster-slots/
auto* rb = static_cast<RedisReplyBuilder*>(builder);
@ -258,10 +262,20 @@ void ClusterSlotsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder
};
unsigned int slot_ranges = 0;
for (const auto& shard : config) {
// we need to remove hiden and fail replicas
auto shards_info = config.Unwrap();
for (auto& shard : shards_info) {
slot_ranges += shard.slot_ranges.Size();
auto new_end = std::remove_if(shard.replicas.begin(), shard.replicas.end(), [](const auto& r) {
return r.health == NodeHealth::HIDDEN || r.health == NodeHealth::FAIL ||
r.health == NodeHealth::LOADING;
});
shard.replicas.erase(new_end, shard.replicas.end());
}
config = {shards_info};
rb->StartArray(slot_ranges);
for (const auto& shard : config) {
for (const auto& slot_range : shard.slot_ranges) {
@ -294,7 +308,7 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id,
string result;
auto WriteNode = [&](const ClusterNodeInfo& node, string_view role, string_view master_id,
auto WriteNode = [&](const ClusterExtendedNodeInfo& node, string_view role, string_view master_id,
const SlotRanges& ranges) {
absl::StrAppend(&result, node.id, " ");
@ -307,7 +321,8 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id,
absl::StrAppend(&result, master_id, " ");
absl::StrAppend(&result, "0 0 0 connected");
absl::StrAppend(&result,
node.health != NodeHealth::FAIL ? "0 0 0 connected" : "0 0 0 disconnected");
for (const auto& range : ranges) {
absl::StrAppend(&result, " ", range.start);
@ -324,7 +339,9 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id,
WriteNode(shard.master, "master", "-", shard.slot_ranges);
for (const auto& replica : shard.replicas) {
// Only the master prints ranges, so we send an empty set for replicas.
WriteNode(replica, "slave", shard.master.id, {});
if (replica.health != NodeHealth::HIDDEN) {
WriteNode(replica, "slave", shard.master.id, {});
}
}
}

View file

@ -50,7 +50,8 @@ class ClusterFamilyTest : public BaseFamilyTest {
"master": {
"id": "$0",
"ip": "10.0.0.1",
"port": 7000
"port": 7000,
"health": "online"
},
"replicas": []
}
@ -208,13 +209,15 @@ TEST_F(ClusterFamilyTest, ClusterConfigFull) {
"master": {
"id": "abcd1234",
"ip": "10.0.0.1",
"port": 7000
"port": 7000,
"health": "online"
},
"replicas": [
{
"id": "wxyz",
"ip": "10.0.0.10",
"port": 8000
"port": 8000,
"health": "online"
}
]
}
@ -280,13 +283,15 @@ TEST_F(ClusterFamilyTest, ClusterConfigFullMultipleInstances) {
"master": {
"id": "abcd1234",
"ip": "10.0.0.1",
"port": 7000
"port": 7000,
"health": "fail"
},
"replicas": [
{
"id": "wxyz",
"ip": "10.0.0.10",
"port": 8000
"port": 8000,
"health": "online"
}
]
},
@ -300,13 +305,33 @@ TEST_F(ClusterFamilyTest, ClusterConfigFullMultipleInstances) {
"master": {
"id": "efgh7890",
"ip": "10.0.0.2",
"port": 7001
"port": 7001,
"health": "online"
},
"replicas": [
{
"id": "qwerty",
"ip": "10.0.0.11",
"port": 8001
"port": 8001,
"health": "online"
},
{
"id": "qwerty1",
"ip": "10.0.0.12",
"port": 8002,
"health": "loading"
},
{
"id": "qwerty2",
"ip": "10.0.0.13",
"port": 8003,
"health": "fail"
},
{
"id": "qwerty3",
"ip": "10.0.0.14",
"port": 8004,
"health": "hidden"
}
]
}
@ -317,7 +342,7 @@ TEST_F(ClusterFamilyTest, ClusterConfigFullMultipleInstances) {
EXPECT_THAT(cluster_info, HasSubstr("cluster_state:ok"));
EXPECT_THAT(cluster_info, HasSubstr("cluster_slots_assigned:16384"));
EXPECT_THAT(cluster_info, HasSubstr("cluster_slots_ok:16384"));
EXPECT_THAT(cluster_info, HasSubstr("cluster_known_nodes:4"));
EXPECT_THAT(cluster_info, HasSubstr("cluster_known_nodes:7"));
EXPECT_THAT(cluster_info, HasSubstr("cluster_size:2"));
EXPECT_THAT(Run({"cluster", "shards"}),
@ -333,7 +358,7 @@ TEST_F(ClusterFamilyTest, ClusterConfigFullMultipleInstances) {
"port", IntArg(7000), //
"role", "master", //
"replication-offset", IntArg(0), //
"health", "online")), //
"health", "fail")), //
RespArray(ElementsAre( //
"id", "wxyz", //
"endpoint", "10.0.0.10", //
@ -361,7 +386,23 @@ TEST_F(ClusterFamilyTest, ClusterConfigFullMultipleInstances) {
"port", IntArg(8001), //
"role", "replica", //
"replication-offset", IntArg(0), //
"health", "online")))))))));
"health", "online")), //
RespArray(ElementsAre( //
"id", "qwerty1", //
"endpoint", "10.0.0.12", //
"ip", "10.0.0.12", //
"port", IntArg(8002), //
"role", "replica", //
"replication-offset", IntArg(0), //
"health", "loading")), //
RespArray(ElementsAre( //
"id", "qwerty2", //
"endpoint", "10.0.0.13", //
"ip", "10.0.0.13", //
"port", IntArg(8003), //
"role", "replica", //
"replication-offset", IntArg(0), //
"health", "fail")))))))));
EXPECT_THAT(Run({"cluster", "slots"}),
RespArray(ElementsAre( //
@ -387,10 +428,12 @@ TEST_F(ClusterFamilyTest, ClusterConfigFullMultipleInstances) {
"qwerty")))))));
EXPECT_THAT(Run({"cluster", "nodes"}),
"abcd1234 10.0.0.1:7000@7000 master - 0 0 0 connected 0-10000\n"
"abcd1234 10.0.0.1:7000@7000 master - 0 0 0 disconnected 0-10000\n"
"wxyz 10.0.0.10:8000@8000 slave abcd1234 0 0 0 connected\n"
"efgh7890 10.0.0.2:7001@7001 master - 0 0 0 connected 10001-16383\n"
"qwerty 10.0.0.11:8001@8001 slave efgh7890 0 0 0 connected\n");
"qwerty 10.0.0.11:8001@8001 slave efgh7890 0 0 0 connected\n"
"qwerty1 10.0.0.12:8002@8002 slave efgh7890 0 0 0 connected\n"
"qwerty2 10.0.0.13:8003@8003 slave efgh7890 0 0 0 disconnected\n");
absl::InsecureBitGen eng;
while (true) {

View file

@ -108,6 +108,7 @@ class NodeInfo:
slots: list
migrations: list
replicas: list
health: str
async def create_node_info(instance) -> NodeInfo:
@ -121,6 +122,7 @@ async def create_node_info(instance) -> NodeInfo:
slots=[],
migrations=[],
replicas=[],
health="online",
)
return ninfo
@ -133,12 +135,14 @@ def generate_config(nodes):
"id": node.id,
"ip": "127.0.0.1",
"port": node.instance.port,
"health": node.health,
},
"replicas": [
{
"id": replica.id,
"ip": "127.0.0.1",
"port": replica.instance.port,
"health": node.health,
}
for replica in node.replicas
],