feat(cluster): Implement DFLY CLUSTER CONFIG command. (#1223)

* fix: Lock before accessing slots_

* Implement `DFLY CLUSTER CONFIG` command.

* Move JSON parsing logic to ClusterConfig

* clang-tidy

* Rename test fixture class
This commit is contained in:
Chaka 2023-05-17 13:02:47 +03:00 committed by GitHub
parent f96a083473
commit f80afca9c2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 551 additions and 2 deletions

View file

@ -52,6 +52,7 @@ cxx_test(top_keys_test dfly_test_lib LABELS DFLY)
cxx_test(hll_family_test dfly_test_lib LABELS DFLY)
cxx_test(search_family_test dfly_test_lib LABELS DFLY)
cxx_test(cluster_config_test dfly_test_lib LABELS DFLY)
cxx_test(dflycmd_test dfly_test_lib LABELS DFLY)

View file

@ -2,6 +2,7 @@ extern "C" {
#include "redis/crc16.h"
}
#include <jsoncons/json.hpp>
#include <shared_mutex>
#include <string_view>
@ -95,6 +96,139 @@ bool ClusterConfig::SetConfig(const vector<ClusterShard>& new_config) {
return true;
}
namespace {
constexpr string_view kInvalidConfigPrefix = "Invalid JSON cluster config: "sv;
template <typename T> optional<T> ReadNumeric(const JsonType& obj) {
if (!obj.is_number()) {
LOG(WARNING) << kInvalidConfigPrefix << "object is not a number " << obj;
return nullopt;
}
return obj.as<T>();
}
optional<vector<ClusterConfig::SlotRange>> GetClusterSlotRanges(const JsonType& slots) {
if (!slots.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges is not an array " << slots;
return nullopt;
}
vector<ClusterConfig::SlotRange> ranges;
for (const auto& range : slots.array_value()) {
if (!range.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges element is not an object " << range;
return nullopt;
}
optional<SlotId> start = ReadNumeric<SlotId>(range.at_or_null("start"));
optional<SlotId> end = ReadNumeric<SlotId>(range.at_or_null("end"));
if (!start.has_value() || !end.has_value()) {
return nullopt;
}
ranges.push_back({.start = start.value(), .end = end.value()});
}
return ranges;
}
optional<ClusterConfig::Node> ParseClusterNode(const JsonType& json) {
if (!json.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "node config is not an object " << json;
return nullopt;
}
ClusterConfig::Node node;
{
auto id = json.at_or_null("id");
if (!id.is_string()) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid id for node " << json;
return nullopt;
}
node.id = std::move(id).as_string();
}
{
auto ip = json.at_or_null("ip");
if (!ip.is_string()) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid ip for node " << json;
return nullopt;
}
node.ip = std::move(ip).as_string();
}
{
auto port = ReadNumeric<uint16_t>(json.at_or_null("port"));
if (!port.has_value()) {
return nullopt;
}
node.port = port.value();
}
return node;
}
optional<ClusterConfig::ClusterShards> BuildClusterConfigFromJson(const JsonType& json) {
ClusterConfig::ClusterShards config;
if (!json.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "not an array " << json;
return nullopt;
}
for (const auto& element : json.array_value()) {
ClusterConfig::ClusterShard shard;
if (!element.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "shard element is not an object " << element;
return nullopt;
}
auto slots = GetClusterSlotRanges(element.at_or_null("slot_ranges"));
if (!slots.has_value()) {
return nullopt;
}
shard.slot_ranges = std::move(slots).value();
auto master = ParseClusterNode(element.at_or_null("master"));
if (!master.has_value()) {
return nullopt;
}
shard.master = std::move(master).value();
auto replicas = element.at_or_null("replicas");
if (!replicas.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "replicas is not an array " << replicas;
return nullopt;
}
for (const auto& replica : replicas.array_value()) {
auto node = ParseClusterNode(replica);
if (!node.has_value()) {
return nullopt;
}
shard.replicas.push_back(std::move(node).value());
}
config.push_back(std::move(shard));
}
return config;
}
} // namespace
bool ClusterConfig::SetConfig(const JsonType& json) {
optional<ClusterShards> config = BuildClusterConfigFromJson(json);
if (!config.has_value()) {
return false;
}
return SetConfig(config.value());
}
bool ClusterConfig::IsMySlot(SlotId id) const {
if (id >= slots_.size()) {
DCHECK(false) << "Requesting a non-existing slot id " << id;

View file

@ -11,6 +11,7 @@
#include <string_view>
#include <vector>
#include "core/json_object.h"
#include "src/core/fibers.h"
namespace dfly {
@ -19,6 +20,8 @@ using SlotId = uint16_t;
class ClusterConfig {
public:
static constexpr SlotId kMaxSlotNum = 0x3FFF;
struct Node {
std::string id;
std::string ip;
@ -61,7 +64,9 @@ class ClusterConfig {
// Returns true if `new_config` is valid and internal state was changed. Returns false and changes
// nothing otherwise.
bool SetConfig(const ClusterShards& new_config);
static constexpr SlotId kMaxSlotNum = 0x3FFF;
// Parses `json` into `ClusterShards` and calls the above overload.
bool SetConfig(const JsonType& json);
private:
struct SlotEntry {

View file

@ -6,6 +6,8 @@
#include <gmock/gmock-matchers.h>
#include <jsoncons/json.hpp>
#include "base/gtest.h"
#include "base/logging.h"
@ -20,6 +22,12 @@ MATCHER_P(NodeMatches, expected, "") {
class ClusterConfigTest : public ::testing::Test {
protected:
JsonType ParseJson(string_view json_str) {
optional<JsonType> opt_json = JsonFromString(json_str);
CHECK(opt_json.has_value());
return opt_json.value();
}
const string kMyId = "my-id";
ClusterConfig config_{kMyId};
};
@ -49,7 +57,7 @@ TEST_F(ClusterConfigTest, ConfigEmpty) {
}
TEST_F(ClusterConfigTest, ConfigSetInvalidEmpty) {
EXPECT_FALSE(config_.SetConfig({}));
EXPECT_FALSE(config_.SetConfig(ClusterConfig::ClusterShards{}));
}
TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) {
@ -118,4 +126,169 @@ TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) {
}
}
TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRanges) {
// Note that slot_ranges is not an object
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": "0,16383",
"master": {
"id": "abcd1234",
"ip": "10.0.0.1",
"port": 7000
},
"replicas": []
}
])json")));
}
TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRangeStart) {
// Note that slot_ranges.start is not a number
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": "0",
"end": 16383
}
],
"master": {
"id": "abcd1234",
"ip": "10.0.0.1",
"port": 7000
},
"replicas": []
}
])json")));
}
TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRangeEnd) {
// Note that slot_ranges.end is not a number
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": "16383"
}
],
"master": {
"id": "abcd1234",
"ip": "10.0.0.1",
"port": 7000
},
"replicas": []
}
])json")));
}
TEST_F(ClusterConfigTest, ConfigSetInvalidMissingMaster) {
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
]
}
])json")));
}
TEST_F(ClusterConfigTest, ConfigSetInvalidMasterNotObject) {
// Note that master is not an object
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
],
"master": 123,
"replicas": []
}
])json")));
}
TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingId) {
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
],
"master": {
"ip": "10.0.0.0",
"port": 8000
},
"replicas": []
}
])json")));
}
TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingIp) {
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
],
"master": {
"id": "abcdefg",
"port": 8000
},
"replicas": []
}
])json")));
}
TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingPort) {
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
],
"master": {
"id": "abcdefg",
"ip": "10.0.0.0"
},
"replicas": []
}
])json")));
}
TEST_F(ClusterConfigTest, ConfigSetInvalidMissingReplicas) {
EXPECT_FALSE(config_.SetConfig(ParseJson(R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
],
"master": {
"id": "abcdefg",
"ip": "10.0.0.0",
"port": 8000
}
}
])json")));
}
} // namespace dfly

View file

@ -7,9 +7,15 @@
#include <absl/strings/str_cat.h>
#include <absl/strings/strip.h>
#include <jsoncons/json.hpp>
#include <limits>
#include <optional>
#include "base/flags.h"
#include "base/logging.h"
#include "core/json_object.h"
#include "facade/dragonfly_connection.h"
#include "server/cluster/cluster_config.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
@ -370,15 +376,40 @@ void DflyCmd::ReplicaOffset(CmdArgList args, ConnectionContext* cntx) {
}
}
void DflyCmd::ClusterConfig(CmdArgList args, ConnectionContext* cntx) {
SinkReplyBuilder* rb = cntx->reply_builder();
if (args.size() != 3) {
return rb->SendError(WrongNumArgsError("dfly cluster config"));
}
string_view json_str = ArgS(args, 2);
optional<JsonType> json = JsonFromString(json_str);
if (!json.has_value()) {
LOG(WARNING) << "Can't parse JSON for ClusterConfig " << json_str;
return rb->SendError("Invalid JSON cluster config", kSyntaxErrType);
}
if (!sf_->cluster_config()->SetConfig(json.value())) {
return rb->SendError("Invalid cluster configuration.");
}
return rb->SendOk();
}
void DflyCmd::ClusterManagmentCmd(CmdArgList args, ConnectionContext* cntx) {
if (!ClusterConfig::IsClusterEnabled()) {
return (*cntx)->SendError("DFLY CLUSTER commands requires --cluster_mode=yes");
}
CHECK_NE(sf_->cluster_config(), nullptr);
// TODO check admin port
ToUpper(&args[1]);
string_view sub_cmd = ArgS(args, 1);
if (sub_cmd == "GETSLOTINFO") {
return ClusterGetSlotInfo(args, cntx);
} else if (sub_cmd == "CONFIG") {
return ClusterConfig(args, cntx);
}
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "DFLY CLUSTER"), kSyntaxErrType);

View file

@ -169,6 +169,9 @@ class DflyCmd {
// CLUSTER GETSLOTINFO command
void ClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx);
// CLUSTER CONFIG command
void ClusterConfig(CmdArgList args, ConnectionContext* cntx);
// Start full sync in thread. Start FullSyncFb. Called for each flow.
facade::OpStatus StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);

202
src/server/dflycmd_test.cc Normal file
View file

@ -0,0 +1,202 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include <absl/flags/reflection.h>
#include <string>
#include <string_view>
#include "base/gtest.h"
#include "base/logging.h"
#include "facade/facade_test.h"
#include "server/test_utils.h"
namespace dfly {
namespace {
using namespace std;
class DflyFamilyTest : public BaseFamilyTest {};
class DflyClusterTest : public DflyFamilyTest {
public:
DflyClusterTest() {
auto* flag = absl::FindCommandLineFlag("cluster_mode");
CHECK_NE(flag, nullptr);
string error;
CHECK(flag->ParseFrom("yes", &error));
}
protected:
static constexpr string_view kInvalidConfiguration = "Invalid cluster configuration";
};
TEST_F(DflyClusterTest, ClusterConfigInvalid) {
EXPECT_THAT(Run({"dfly", "cluster", "config"}), ErrArg("syntax error"));
EXPECT_THAT(Run({"dfly", "cluster", "config", "invalid JSON"}),
ErrArg("Invalid JSON cluster config"));
EXPECT_THAT(Run({"dfly", "cluster", "config", "[]"}), ErrArg(kInvalidConfiguration));
}
TEST_F(DflyClusterTest, ClusterConfigInvalidMissingSlots) {
EXPECT_THAT(Run({"dfly", "cluster", "config", R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 100
}
],
"master": {
"id": "abcd1234",
"ip": "10.0.0.1",
"port": 7000
},
"replicas": []
}
])json"}),
ErrArg(kInvalidConfiguration));
}
TEST_F(DflyClusterTest, ClusterConfigInvalidOverlappingSlots) {
EXPECT_THAT(Run({"dfly", "cluster", "config", R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 1000
}
],
"master": {
"id": "abcd1234",
"ip": "10.0.0.1",
"port": 7000
},
"replicas": []
},
{
"slot_ranges": [
{
"start": 800,
"end": 16383
}
],
"master": {
"id": "abcd1234",
"ip": "10.0.0.1",
"port": 7000
},
"replicas": []
}
])json"}),
ErrArg(kInvalidConfiguration));
}
TEST_F(DflyClusterTest, ClusterConfigNoReplicas) {
EXPECT_EQ(Run({"dfly", "cluster", "config", R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
],
"master": {
"id": "abcd1234",
"ip": "10.0.0.1",
"port": 7000
},
"replicas": []
}
])json"}),
"OK");
// TODO: Use "CLUSTER SLOTS" and "CLUSTER SHARDS" once implemented to verify new configuration
// takes effect.
}
TEST_F(DflyClusterTest, ClusterConfigFull) {
EXPECT_EQ(Run({"dfly", "cluster", "config", R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 16383
}
],
"master": {
"id": "abcd1234",
"ip": "10.0.0.1",
"port": 7000
},
"replicas": [
{
"id": "wxyz",
"ip": "10.0.0.10",
"port": 8000
}
]
}
])json"}),
"OK");
// TODO: Use "CLUSTER SLOTS" and "CLUSTER SHARDS" once implemented to verify new configuration
// takes effect.
}
TEST_F(DflyClusterTest, ClusterConfigFullMultipleInstances) {
EXPECT_EQ(Run({"dfly", "cluster", "config", R"json(
[
{
"slot_ranges": [
{
"start": 0,
"end": 10000
}
],
"master": {
"id": "abcd1234",
"ip": "10.0.0.1",
"port": 7000
},
"replicas": [
{
"id": "wxyz",
"ip": "10.0.0.10",
"port": 8000
}
]
},
{
"slot_ranges": [
{
"start": 10001,
"end": 16383
}
],
"master": {
"id": "efgh7890",
"ip": "10.0.0.2",
"port": 7001
},
"replicas": [
{
"id": "qwerty",
"ip": "10.0.0.11",
"port": 8001
}
]
}
])json"}),
"OK");
// TODO: Use "CLUSTER SLOTS" and "CLUSTER SHARDS" once implemented to verify new configuration
// takes effect.
}
} // namespace
} // namespace dfly