diff --git a/src/server/cluster/cluster_config.cc b/src/server/cluster/cluster_config.cc index b651a04ca..00bd73102 100644 --- a/src/server/cluster/cluster_config.cc +++ b/src/server/cluster/cluster_config.cc @@ -31,12 +31,12 @@ bool HasValidNodeIds(const ClusterShardInfos& new_config) { for (const auto& shard : new_config) { if (!CheckAndInsertNode(shard.master.id)) { - LOG(WARNING) << "Master " << shard.master.id << " appears more than once"; + LOG(ERROR) << "Master " << shard.master.id << " appears more than once"; return false; } for (const auto& replica : shard.replicas) { if (!CheckAndInsertNode(replica.id)) { - LOG(WARNING) << "Replica " << replica.id << " appears more than once"; + LOG(ERROR) << "Replica " << replica.id << " appears more than once"; return false; } } @@ -56,21 +56,21 @@ bool IsConfigValid(const ClusterShardInfos& new_config) { for (const auto& shard : new_config) { for (const auto& slot_range : shard.slot_ranges) { if (slot_range.start > slot_range.end) { - LOG(WARNING) << "Invalid cluster config: start=" << slot_range.start - << " is larger than end=" << slot_range.end; + LOG(ERROR) << "Invalid cluster config: start=" << slot_range.start + << " is larger than end=" << slot_range.end; return false; } for (SlotId slot = slot_range.start; slot <= slot_range.end; ++slot) { if (slot >= slots_found.size()) { - LOG(WARNING) << "Invalid cluster config: slot=" << slot - << " is bigger than allowed max=" << slots_found.size(); + LOG(ERROR) << "Invalid cluster config: slot=" << slot + << " is bigger than allowed max=" << slots_found.size(); return false; } if (slots_found[slot]) { - LOG(WARNING) << "Invalid cluster config: slot=" << slot - << " was already configured by another slot range."; + LOG(ERROR) << "Invalid cluster config: slot=" << slot + << " was already configured by another slot range."; return false; } @@ -80,7 +80,7 @@ bool IsConfigValid(const ClusterShardInfos& new_config) { } if (!all_of(slots_found.begin(), slots_found.end(), [](bool b) { return b; }) > 0UL) { - LOG(WARNING) << "Invalid cluster config: some slots were missing."; + LOG(ERROR) << "Invalid cluster config: some slots were missing."; return false; } @@ -129,7 +129,7 @@ constexpr string_view kInvalidConfigPrefix = "Invalid JSON cluster config: "sv; template optional ReadNumeric(const JsonType& obj) { if (!obj.is_number()) { - LOG(WARNING) << kInvalidConfigPrefix << "object is not a number " << obj; + LOG(ERROR) << kInvalidConfigPrefix << "object is not a number " << obj; return nullopt; } @@ -138,7 +138,7 @@ template optional ReadNumeric(const JsonType& obj) { optional GetClusterSlotRanges(const JsonType& slots) { if (!slots.is_array()) { - LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges is not an array " << slots; + LOG(ERROR) << kInvalidConfigPrefix << "slot_ranges is not an array " << slots; return nullopt; } @@ -146,7 +146,7 @@ optional GetClusterSlotRanges(const JsonType& slots) { for (const auto& range : slots.array_range()) { if (!range.is_object()) { - LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges element is not an object " << range; + LOG(ERROR) << kInvalidConfigPrefix << "slot_ranges element is not an object " << range; return nullopt; } @@ -164,7 +164,7 @@ optional GetClusterSlotRanges(const JsonType& slots) { optional ParseClusterNode(const JsonType& json) { if (!json.is_object()) { - LOG(WARNING) << kInvalidConfigPrefix << "node config is not an object " << json; + LOG(ERROR) << kInvalidConfigPrefix << "node config is not an object " << json; return nullopt; } @@ -173,7 +173,7 @@ optional ParseClusterNode(const JsonType& json) { { auto id = json.at_or_null("id"); if (!id.is_string()) { - LOG(WARNING) << kInvalidConfigPrefix << "invalid id for node " << json; + LOG(ERROR) << kInvalidConfigPrefix << "invalid id for node " << json; return nullopt; } node.id = std::move(id).as_string(); @@ -182,7 +182,7 @@ optional ParseClusterNode(const JsonType& json) { { auto ip = json.at_or_null("ip"); if (!ip.is_string()) { - LOG(WARNING) << kInvalidConfigPrefix << "invalid ip for node " << json; + LOG(ERROR) << kInvalidConfigPrefix << "invalid ip for node " << json; return nullopt; } node.ip = std::move(ip).as_string(); @@ -200,7 +200,7 @@ optional ParseClusterNode(const JsonType& json) { auto health = json.at_or_null("health"); if (!health.is_null()) { if (!health.is_string()) { - LOG(WARNING) << kInvalidConfigPrefix << "invalid health status for node " << json; + LOG(ERROR) << kInvalidConfigPrefix << "invalid health status for node " << json; } else { auto health_str = std::move(health).as_string(); if (absl::EqualsIgnoreCase(health_str, "FAIL")) { @@ -209,8 +209,10 @@ optional ParseClusterNode(const JsonType& json) { node.health = NodeHealth::LOADING; } else if (absl::EqualsIgnoreCase(health_str, "ONLINE")) { node.health = NodeHealth::ONLINE; + } else if (absl::EqualsIgnoreCase(health_str, "HIDDEN")) { + node.health = NodeHealth::HIDDEN; } else { - LOG(WARNING) << kInvalidConfigPrefix << "invalid health status for node: " << health_str; + LOG(ERROR) << kInvalidConfigPrefix << "invalid health status for node: " << health_str; } } } @@ -237,7 +239,7 @@ optional> ParseMigrations(const JsonType& json) { auto slots = GetClusterSlotRanges(element.at_or_null("slot_ranges")); if (!node_id.is_string() || !ip.is_string() || !port || !slots) { - LOG(WARNING) << kInvalidConfigPrefix << "invalid migration json " << json; + LOG(ERROR) << kInvalidConfigPrefix << "invalid migration json " << json; return nullopt; } @@ -253,7 +255,7 @@ optional BuildClusterConfigFromJson(const JsonType& json) { std::vector config; if (!json.is_array()) { - LOG(WARNING) << kInvalidConfigPrefix << "not an array " << json; + LOG(ERROR) << kInvalidConfigPrefix << "not an array " << json; return nullopt; } @@ -261,7 +263,7 @@ optional BuildClusterConfigFromJson(const JsonType& json) { ClusterShardInfo shard; if (!element.is_object()) { - LOG(WARNING) << kInvalidConfigPrefix << "shard element is not an object " << element; + LOG(ERROR) << kInvalidConfigPrefix << "shard element is not an object " << element; return nullopt; } @@ -279,7 +281,7 @@ optional BuildClusterConfigFromJson(const JsonType& json) { auto replicas = element.at_or_null("replicas"); if (!replicas.is_array()) { - LOG(WARNING) << kInvalidConfigPrefix << "replicas is not an array " << replicas; + LOG(ERROR) << kInvalidConfigPrefix << "replicas is not an array " << replicas; return nullopt; } @@ -309,7 +311,7 @@ shared_ptr ClusterConfig::CreateFromConfig(string_view my_id, std::string_view json_str) { optional json_config = JsonFromString(json_str, PMR_NS::get_default_resource()); if (!json_config.has_value()) { - LOG(WARNING) << "Can't parse JSON for ClusterConfig " << json_str; + LOG(ERROR) << "Can't parse JSON for ClusterConfig " << json_str; return nullptr; } diff --git a/src/server/cluster/cluster_defs.cc b/src/server/cluster/cluster_defs.cc index 6a951a58d..9625d78c1 100644 --- a/src/server/cluster/cluster_defs.cc +++ b/src/server/cluster/cluster_defs.cc @@ -7,6 +7,7 @@ #include #include +#include "base/logging.h" #include "cluster_config.h" #include "facade/error.h" #include "slot_set.h" @@ -73,4 +74,21 @@ facade::ErrorReply SlotOwnershipError(SlotId slot_id) { } return facade::ErrorReply{facade::OpStatus::OK}; } + +std::string_view ToString(NodeHealth nh) { + switch (nh) { + case NodeHealth::FAIL: + return "fail"; + case NodeHealth::LOADING: + return "loading"; + case NodeHealth::ONLINE: + return "online"; + case NodeHealth::HIDDEN: + DCHECK(false); // shouldn't be used + return "hidden"; + } + DCHECK(false); + return "undefined_health"; +} + } // namespace dfly::cluster diff --git a/src/server/cluster/cluster_defs.h b/src/server/cluster/cluster_defs.h index 6bf4ef6dc..170fa3871 100644 --- a/src/server/cluster/cluster_defs.h +++ b/src/server/cluster/cluster_defs.h @@ -94,10 +94,11 @@ struct ClusterNodeInfo { } }; -enum class NodeHealth : std::uint8_t { NONE, FAIL, LOADING, ONLINE }; +enum class NodeHealth : std::uint8_t { FAIL, LOADING, ONLINE, HIDDEN }; +std::string_view ToString(NodeHealth nh); struct ClusterExtendedNodeInfo : ClusterNodeInfo { - NodeHealth health = NodeHealth::NONE; + NodeHealth health = NodeHealth::ONLINE; }; struct MigrationInfo { @@ -159,6 +160,10 @@ class ClusterShardInfos { return infos_ != r.infos_; } + auto Unwrap() const { + return infos_; + } + private: std::vector infos_; }; diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 854b3a698..079697067 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -72,16 +72,12 @@ ClusterShardInfos GetConfigForStats(ConnectionContext* cntx) { return config; } - // We can't mutate `config` so we copy it over - std::vector infos; - infos.reserve(config.size()); - - for (auto& node : config) { - infos.push_back(node); - infos.rbegin()->replicas.clear(); + auto shards_info = config.Unwrap(); + for (auto& node : shards_info) { + node.replicas.clear(); } - return ClusterShardInfos{std::move(infos)}; + return shards_info; } } // namespace @@ -149,14 +145,15 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) co ? static_cast(absl::GetFlag(FLAGS_port)) : cluster_announce_port; - info.master = {{.id = id_, .ip = preferred_endpoint, .port = preferred_port}, NodeHealth::NONE}; + info.master = {{.id = id_, .ip = preferred_endpoint, .port = preferred_port}, + NodeHealth::ONLINE}; if (cntx->conn()->IsPrivileged() || !absl::GetFlag(FLAGS_managed_service_info)) { for (const auto& replica : server_family_->GetDflyCmd()->GetReplicasRoleInfo()) { info.replicas.push_back({{.id = replica.id, .ip = replica.address, .port = static_cast(replica.listening_port)}, - NodeHealth::NONE}); + NodeHealth::ONLINE}); } } } else { @@ -166,7 +163,7 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) co info.replicas.push_back({{.id = id_, .ip = cntx->conn()->LocalBindAddress(), .port = static_cast(absl::GetFlag(FLAGS_port))}, - NodeHealth::NONE}); + NodeHealth::ONLINE}); } return info; @@ -196,7 +193,7 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builde constexpr unsigned int kEntrySize = 4; auto* rb = static_cast(builder); - auto WriteNode = [&](const ClusterNodeInfo& node, string_view role) { + auto WriteNode = [&](const ClusterExtendedNodeInfo& node, string_view role) { constexpr unsigned int kNodeSize = 14; rb->StartArray(kNodeSize); rb->SendBulkString("id"); @@ -212,7 +209,7 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builde rb->SendBulkString("replication-offset"); rb->SendLong(0); rb->SendBulkString("health"); - rb->SendBulkString("online"); + rb->SendBulkString(ToString(node.health)); }; rb->StartArray(config.size()); @@ -237,15 +234,22 @@ void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builde } // namespace void ClusterFamily::ClusterShards(SinkReplyBuilder* builder, ConnectionContext* cntx) { - auto shard_infos = GetShardInfos(cntx); - if (shard_infos) { - return ClusterShardsImpl(*shard_infos, builder); + auto config = GetShardInfos(cntx); + if (config) { + // we need to remove hiden replicas + auto shards_info = config->Unwrap(); + for (auto& shard : shards_info) { + auto new_end = std::remove_if(shard.replicas.begin(), shard.replicas.end(), + [](const auto& r) { return r.health == NodeHealth::HIDDEN; }); + shard.replicas.erase(new_end, shard.replicas.end()); + } + return ClusterShardsImpl({shards_info}, builder); } return builder->SendError(kClusterNotConfigured); } namespace { -void ClusterSlotsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder) { +void ClusterSlotsImpl(ClusterShardInfos config, SinkReplyBuilder* builder) { // For more details https://redis.io/commands/cluster-slots/ auto* rb = static_cast(builder); @@ -258,10 +262,20 @@ void ClusterSlotsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder }; unsigned int slot_ranges = 0; - for (const auto& shard : config) { + + // we need to remove hiden and fail replicas + auto shards_info = config.Unwrap(); + for (auto& shard : shards_info) { slot_ranges += shard.slot_ranges.Size(); + auto new_end = std::remove_if(shard.replicas.begin(), shard.replicas.end(), [](const auto& r) { + return r.health == NodeHealth::HIDDEN || r.health == NodeHealth::FAIL || + r.health == NodeHealth::LOADING; + }); + shard.replicas.erase(new_end, shard.replicas.end()); } + config = {shards_info}; + rb->StartArray(slot_ranges); for (const auto& shard : config) { for (const auto& slot_range : shard.slot_ranges) { @@ -294,7 +308,7 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id, string result; - auto WriteNode = [&](const ClusterNodeInfo& node, string_view role, string_view master_id, + auto WriteNode = [&](const ClusterExtendedNodeInfo& node, string_view role, string_view master_id, const SlotRanges& ranges) { absl::StrAppend(&result, node.id, " "); @@ -307,7 +321,8 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id, absl::StrAppend(&result, master_id, " "); - absl::StrAppend(&result, "0 0 0 connected"); + absl::StrAppend(&result, + node.health != NodeHealth::FAIL ? "0 0 0 connected" : "0 0 0 disconnected"); for (const auto& range : ranges) { absl::StrAppend(&result, " ", range.start); @@ -324,7 +339,9 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id, WriteNode(shard.master, "master", "-", shard.slot_ranges); for (const auto& replica : shard.replicas) { // Only the master prints ranges, so we send an empty set for replicas. - WriteNode(replica, "slave", shard.master.id, {}); + if (replica.health != NodeHealth::HIDDEN) { + WriteNode(replica, "slave", shard.master.id, {}); + } } } diff --git a/src/server/cluster/cluster_family_test.cc b/src/server/cluster/cluster_family_test.cc index 69cef1a68..9f32ec97f 100644 --- a/src/server/cluster/cluster_family_test.cc +++ b/src/server/cluster/cluster_family_test.cc @@ -50,7 +50,8 @@ class ClusterFamilyTest : public BaseFamilyTest { "master": { "id": "$0", "ip": "10.0.0.1", - "port": 7000 + "port": 7000, + "health": "online" }, "replicas": [] } @@ -208,13 +209,15 @@ TEST_F(ClusterFamilyTest, ClusterConfigFull) { "master": { "id": "abcd1234", "ip": "10.0.0.1", - "port": 7000 + "port": 7000, + "health": "online" }, "replicas": [ { "id": "wxyz", "ip": "10.0.0.10", - "port": 8000 + "port": 8000, + "health": "online" } ] } @@ -280,13 +283,15 @@ TEST_F(ClusterFamilyTest, ClusterConfigFullMultipleInstances) { "master": { "id": "abcd1234", "ip": "10.0.0.1", - "port": 7000 + "port": 7000, + "health": "fail" }, "replicas": [ { "id": "wxyz", "ip": "10.0.0.10", - "port": 8000 + "port": 8000, + "health": "online" } ] }, @@ -300,13 +305,33 @@ TEST_F(ClusterFamilyTest, ClusterConfigFullMultipleInstances) { "master": { "id": "efgh7890", "ip": "10.0.0.2", - "port": 7001 + "port": 7001, + "health": "online" }, "replicas": [ { "id": "qwerty", "ip": "10.0.0.11", - "port": 8001 + "port": 8001, + "health": "online" + }, + { + "id": "qwerty1", + "ip": "10.0.0.12", + "port": 8002, + "health": "loading" + }, + { + "id": "qwerty2", + "ip": "10.0.0.13", + "port": 8003, + "health": "fail" + }, + { + "id": "qwerty3", + "ip": "10.0.0.14", + "port": 8004, + "health": "hidden" } ] } @@ -317,7 +342,7 @@ TEST_F(ClusterFamilyTest, ClusterConfigFullMultipleInstances) { EXPECT_THAT(cluster_info, HasSubstr("cluster_state:ok")); EXPECT_THAT(cluster_info, HasSubstr("cluster_slots_assigned:16384")); EXPECT_THAT(cluster_info, HasSubstr("cluster_slots_ok:16384")); - EXPECT_THAT(cluster_info, HasSubstr("cluster_known_nodes:4")); + EXPECT_THAT(cluster_info, HasSubstr("cluster_known_nodes:7")); EXPECT_THAT(cluster_info, HasSubstr("cluster_size:2")); EXPECT_THAT(Run({"cluster", "shards"}), @@ -333,7 +358,7 @@ TEST_F(ClusterFamilyTest, ClusterConfigFullMultipleInstances) { "port", IntArg(7000), // "role", "master", // "replication-offset", IntArg(0), // - "health", "online")), // + "health", "fail")), // RespArray(ElementsAre( // "id", "wxyz", // "endpoint", "10.0.0.10", // @@ -361,7 +386,23 @@ TEST_F(ClusterFamilyTest, ClusterConfigFullMultipleInstances) { "port", IntArg(8001), // "role", "replica", // "replication-offset", IntArg(0), // - "health", "online"))))))))); + "health", "online")), // + RespArray(ElementsAre( // + "id", "qwerty1", // + "endpoint", "10.0.0.12", // + "ip", "10.0.0.12", // + "port", IntArg(8002), // + "role", "replica", // + "replication-offset", IntArg(0), // + "health", "loading")), // + RespArray(ElementsAre( // + "id", "qwerty2", // + "endpoint", "10.0.0.13", // + "ip", "10.0.0.13", // + "port", IntArg(8003), // + "role", "replica", // + "replication-offset", IntArg(0), // + "health", "fail"))))))))); EXPECT_THAT(Run({"cluster", "slots"}), RespArray(ElementsAre( // @@ -387,10 +428,12 @@ TEST_F(ClusterFamilyTest, ClusterConfigFullMultipleInstances) { "qwerty"))))))); EXPECT_THAT(Run({"cluster", "nodes"}), - "abcd1234 10.0.0.1:7000@7000 master - 0 0 0 connected 0-10000\n" + "abcd1234 10.0.0.1:7000@7000 master - 0 0 0 disconnected 0-10000\n" "wxyz 10.0.0.10:8000@8000 slave abcd1234 0 0 0 connected\n" "efgh7890 10.0.0.2:7001@7001 master - 0 0 0 connected 10001-16383\n" - "qwerty 10.0.0.11:8001@8001 slave efgh7890 0 0 0 connected\n"); + "qwerty 10.0.0.11:8001@8001 slave efgh7890 0 0 0 connected\n" + "qwerty1 10.0.0.12:8002@8002 slave efgh7890 0 0 0 connected\n" + "qwerty2 10.0.0.13:8003@8003 slave efgh7890 0 0 0 disconnected\n"); absl::InsecureBitGen eng; while (true) { diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index f4c625249..a2ccc56ec 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -108,6 +108,7 @@ class NodeInfo: slots: list migrations: list replicas: list + health: str async def create_node_info(instance) -> NodeInfo: @@ -121,6 +122,7 @@ async def create_node_info(instance) -> NodeInfo: slots=[], migrations=[], replicas=[], + health="online", ) return ninfo @@ -133,12 +135,14 @@ def generate_config(nodes): "id": node.id, "ip": "127.0.0.1", "port": node.instance.port, + "health": node.health, }, "replicas": [ { "id": replica.id, "ip": "127.0.0.1", "port": replica.instance.port, + "health": node.health, } for replica in node.replicas ],