fix: #2745 don't start migration process again after apply the same the same config is applied (#2822)

* fix: #2745 don't start a migration process again after the same config is applied
refactor: remove extra includes
This commit is contained in:
Borys 2024-04-03 10:21:27 +03:00 committed by GitHub
parent 3e71ab7bde
commit 84d451fbed
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
21 changed files with 221 additions and 146 deletions

View file

@ -13,6 +13,7 @@ extern "C" {
#include "base/flags.h" #include "base/flags.h"
#include "base/logging.h" #include "base/logging.h"
#include "cluster_config.h" #include "cluster_config.h"
#include "core/json/json_object.h"
using namespace std; using namespace std;
@ -71,7 +72,7 @@ SlotId ClusterConfig::KeySlot(string_view key) {
} }
namespace { namespace {
bool HasValidNodeIds(const ClusterConfig::ClusterShards& new_config) { bool HasValidNodeIds(const ClusterShardInfos& new_config) {
absl::flat_hash_set<string_view> nodes; absl::flat_hash_set<string_view> nodes;
auto CheckAndInsertNode = [&](string_view node) { auto CheckAndInsertNode = [&](string_view node) {
@ -95,7 +96,7 @@ bool HasValidNodeIds(const ClusterConfig::ClusterShards& new_config) {
return true; return true;
} }
bool IsConfigValid(const ClusterConfig::ClusterShards& new_config) { bool IsConfigValid(const ClusterShardInfos& new_config) {
// Make sure that all slots are set exactly once. // Make sure that all slots are set exactly once.
array<bool, ClusterConfig::kMaxSlotNum + 1> slots_found = {}; array<bool, ClusterConfig::kMaxSlotNum + 1> slots_found = {};
@ -140,7 +141,7 @@ bool IsConfigValid(const ClusterConfig::ClusterShards& new_config) {
/* static */ /* static */
shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id, shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
const ClusterShards& config) { const ClusterShardInfos& config) {
if (!IsConfigValid(config)) { if (!IsConfigValid(config)) {
return nullptr; return nullptr;
} }
@ -150,9 +151,9 @@ shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
result->config_ = config; result->config_ = config;
for (const auto& shard : result->config_) { for (const auto& shard : result->config_) {
bool owned_by_me = bool owned_by_me = shard.master.id == my_id ||
shard.master.id == my_id || any_of(shard.replicas.begin(), shard.replicas.end(), any_of(shard.replicas.begin(), shard.replicas.end(),
[&](const Node& node) { return node.id == my_id; }); [&](const ClusterNodeInfo& node) { return node.id == my_id; });
if (owned_by_me) { if (owned_by_me) {
result->my_slots_.Set(shard.slot_ranges, true); result->my_slots_.Set(shard.slot_ranges, true);
result->my_outgoing_migrations_ = shard.migrations; result->my_outgoing_migrations_ = shard.migrations;
@ -206,13 +207,13 @@ optional<SlotRanges> GetClusterSlotRanges(const JsonType& slots) {
return ranges; return ranges;
} }
optional<ClusterConfig::Node> ParseClusterNode(const JsonType& json) { optional<ClusterNodeInfo> ParseClusterNode(const JsonType& json) {
if (!json.is_object()) { if (!json.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "node config is not an object " << json; LOG(WARNING) << kInvalidConfigPrefix << "node config is not an object " << json;
return nullopt; return nullopt;
} }
ClusterConfig::Node node; ClusterNodeInfo node;
{ {
auto id = json.at_or_null("id"); auto id = json.at_or_null("id");
@ -243,8 +244,8 @@ optional<ClusterConfig::Node> ParseClusterNode(const JsonType& json) {
return node; return node;
} }
optional<std::vector<ClusterConfig::MigrationInfo>> ParseMigrations(const JsonType& json) { optional<std::vector<MigrationInfo>> ParseMigrations(const JsonType& json) {
std::vector<ClusterConfig::MigrationInfo> res; std::vector<MigrationInfo> res;
if (json.is_null()) { if (json.is_null()) {
return res; return res;
} }
@ -265,7 +266,7 @@ optional<std::vector<ClusterConfig::MigrationInfo>> ParseMigrations(const JsonTy
return nullopt; return nullopt;
} }
res.emplace_back(ClusterConfig::MigrationInfo{.slot_ranges = std::move(*slots), res.emplace_back(MigrationInfo{.slot_ranges = std::move(*slots),
.target_id = target_id.as_string(), .target_id = target_id.as_string(),
.ip = ip.as_string(), .ip = ip.as_string(),
.port = *port}); .port = *port});
@ -273,8 +274,8 @@ optional<std::vector<ClusterConfig::MigrationInfo>> ParseMigrations(const JsonTy
return res; return res;
} }
optional<ClusterConfig::ClusterShards> BuildClusterConfigFromJson(const JsonType& json) { optional<ClusterShardInfos> BuildClusterConfigFromJson(const JsonType& json) {
ClusterConfig::ClusterShards config; ClusterShardInfos config;
if (!json.is_array()) { if (!json.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "not an array " << json; LOG(WARNING) << kInvalidConfigPrefix << "not an array " << json;
@ -282,7 +283,7 @@ optional<ClusterConfig::ClusterShards> BuildClusterConfigFromJson(const JsonType
} }
for (const auto& element : json.array_range()) { for (const auto& element : json.array_range()) {
ClusterConfig::ClusterShard shard; ClusterShardInfo shard;
if (!element.is_object()) { if (!element.is_object()) {
LOG(WARNING) << kInvalidConfigPrefix << "shard element is not an object " << element; LOG(WARNING) << kInvalidConfigPrefix << "shard element is not an object " << element;
@ -330,8 +331,14 @@ optional<ClusterConfig::ClusterShards> BuildClusterConfigFromJson(const JsonType
/* static */ /* static */
shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id, shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
const JsonType& json_config) { std::string_view json_str) {
optional<ClusterShards> config = BuildClusterConfigFromJson(json_config); optional<JsonType> json_config = JsonFromString(json_str, PMR_NS::get_default_resource());
if (!json_config.has_value()) {
LOG(WARNING) << "Can't parse JSON for ClusterConfig " << json_str;
return nullptr;
}
optional<ClusterShardInfos> config = BuildClusterConfigFromJson(json_config);
if (!config.has_value()) { if (!config.has_value()) {
return nullptr; return nullptr;
} }
@ -359,7 +366,7 @@ bool ClusterConfig::IsMySlot(std::string_view key) const {
return IsMySlot(KeySlot(key)); return IsMySlot(KeySlot(key));
} }
ClusterConfig::Node ClusterConfig::GetMasterNodeForSlot(SlotId id) const { ClusterNodeInfo ClusterConfig::GetMasterNodeForSlot(SlotId id) const {
CHECK_LE(id, ClusterConfig::kMaxSlotNum) << "Requesting a non-existing slot id " << id; CHECK_LE(id, ClusterConfig::kMaxSlotNum) << "Requesting a non-existing slot id " << id;
for (const auto& shard : config_) { for (const auto& shard : config_) {
@ -374,7 +381,7 @@ ClusterConfig::Node ClusterConfig::GetMasterNodeForSlot(SlotId id) const {
return {}; return {};
} }
ClusterConfig::ClusterShards ClusterConfig::GetConfig() const { ClusterShardInfos ClusterConfig::GetConfig() const {
return config_; return config_;
} }
@ -382,4 +389,39 @@ const SlotSet& ClusterConfig::GetOwnedSlots() const {
return my_slots_; return my_slots_;
} }
static std::vector<MigrationInfo> GetMissingMigrations(const std::vector<MigrationInfo>& haystack,
const std::vector<MigrationInfo>& needle) {
std::vector<MigrationInfo> res;
for (const auto& h : haystack) {
if (find(needle.begin(), needle.end(), h) == needle.end()) {
res.push_back(h);
}
}
return res;
}
std::vector<MigrationInfo> ClusterConfig::GetNewOutgoingMigrations(
const std::shared_ptr<ClusterConfig>& prev) const {
return prev ? GetMissingMigrations(my_outgoing_migrations_, prev->my_outgoing_migrations_)
: my_outgoing_migrations_;
}
std::vector<MigrationInfo> ClusterConfig::GetNewIncomingMigrations(
const std::shared_ptr<ClusterConfig>& prev) const {
return prev ? GetMissingMigrations(my_incoming_migrations_, prev->my_incoming_migrations_)
: my_incoming_migrations_;
}
std::vector<MigrationInfo> ClusterConfig::GetFinishedOutgoingMigrations(
const std::shared_ptr<ClusterConfig>& prev) const {
return prev ? GetMissingMigrations(prev->my_outgoing_migrations_, my_outgoing_migrations_)
: std::vector<MigrationInfo>();
}
std::vector<MigrationInfo> ClusterConfig::GetFinishedIncomingMigrations(
const std::shared_ptr<ClusterConfig>& prev) const {
return prev ? GetMissingMigrations(prev->my_incoming_migrations_, my_incoming_migrations_)
: std::vector<MigrationInfo>();
}
} // namespace dfly } // namespace dfly

View file

@ -9,7 +9,6 @@
#include <string_view> #include <string_view>
#include <vector> #include <vector>
#include "core/json/json_object.h"
#include "src/server/cluster/slot_set.h" #include "src/server/cluster/slot_set.h"
#include "src/server/common.h" #include "src/server/common.h"
@ -24,12 +23,7 @@ enum class MigrationState : uint8_t {
C_MAX_INVALID = std::numeric_limits<uint8_t>::max() C_MAX_INVALID = std::numeric_limits<uint8_t>::max()
}; };
class ClusterConfig { struct ClusterNodeInfo {
public:
static constexpr SlotId kMaxSlotNum = 0x3FFF;
static constexpr SlotId kInvalidSlotId = kMaxSlotNum + 1;
struct Node {
std::string id; std::string id;
std::string ip; std::string ip;
uint16_t port = 0; uint16_t port = 0;
@ -41,20 +35,24 @@ class ClusterConfig {
std::string ip; std::string ip;
uint16_t port = 0; uint16_t port = 0;
bool operator==(const ClusterConfig::MigrationInfo& r) const { bool operator==(const MigrationInfo& r) const {
return ip == r.ip && port == r.port && slot_ranges == r.slot_ranges && return ip == r.ip && port == r.port && slot_ranges == r.slot_ranges && target_id == r.target_id;
target_id == r.target_id;
} }
}; };
struct ClusterShard { struct ClusterShardInfo {
SlotRanges slot_ranges; SlotRanges slot_ranges;
Node master; ClusterNodeInfo master;
std::vector<Node> replicas; std::vector<ClusterNodeInfo> replicas;
std::vector<MigrationInfo> migrations; std::vector<MigrationInfo> migrations;
}; };
using ClusterShards = std::vector<ClusterShard>; using ClusterShardInfos = std::vector<ClusterShardInfo>;
class ClusterConfig {
public:
static constexpr SlotId kMaxSlotNum = 0x3FFF;
static constexpr SlotId kInvalidSlotId = kMaxSlotNum + 1;
static SlotId KeySlot(std::string_view key); static SlotId KeySlot(std::string_view key);
@ -76,11 +74,11 @@ class ClusterConfig {
// Returns an instance with `config` if it is valid. // Returns an instance with `config` if it is valid.
// Returns heap-allocated object as it is too big for a stack frame. // Returns heap-allocated object as it is too big for a stack frame.
static std::shared_ptr<ClusterConfig> CreateFromConfig(std::string_view my_id, static std::shared_ptr<ClusterConfig> CreateFromConfig(std::string_view my_id,
const ClusterShards& config); const ClusterShardInfos& config);
// Parses `json_config` into `ClusterShards` and calls the above overload. // Parses `json_config` into `ClusterShardInfos` and calls the above overload.
static std::shared_ptr<ClusterConfig> CreateFromConfig(std::string_view my_id, static std::shared_ptr<ClusterConfig> CreateFromConfig(std::string_view my_id,
const JsonType& json_config); std::string_view json_config);
std::shared_ptr<ClusterConfig> CloneWithChanges(const std::vector<SlotRange>& slots, std::shared_ptr<ClusterConfig> CloneWithChanges(const std::vector<SlotRange>& slots,
bool enable) const; bool enable) const;
@ -90,29 +88,30 @@ class ClusterConfig {
bool IsMySlot(std::string_view key) const; bool IsMySlot(std::string_view key) const;
// Returns the master configured for `id`. // Returns the master configured for `id`.
Node GetMasterNodeForSlot(SlotId id) const; ClusterNodeInfo GetMasterNodeForSlot(SlotId id) const;
ClusterShards GetConfig() const; ClusterShardInfos GetConfig() const;
const SlotSet& GetOwnedSlots() const; const SlotSet& GetOwnedSlots() const;
const std::vector<MigrationInfo>& GetOutgoingMigrations() const { std::vector<MigrationInfo> GetNewOutgoingMigrations(
return my_outgoing_migrations_; const std::shared_ptr<ClusterConfig>& prev) const;
} std::vector<MigrationInfo> GetNewIncomingMigrations(
const std::shared_ptr<ClusterConfig>& prev) const;
const std::vector<MigrationInfo>& GetIncomingMigrations() const { std::vector<MigrationInfo> GetFinishedOutgoingMigrations(
return my_incoming_migrations_; const std::shared_ptr<ClusterConfig>& prev) const;
} std::vector<MigrationInfo> GetFinishedIncomingMigrations(
const std::shared_ptr<ClusterConfig>& prev) const;
private: private:
struct SlotEntry { struct SlotEntry {
const ClusterShard* shard = nullptr; const ClusterShardInfo* shard = nullptr;
bool owned_by_me = false; bool owned_by_me = false;
}; };
ClusterConfig() = default; ClusterConfig() = default;
ClusterShards config_; ClusterShardInfos config_;
SlotSet my_slots_; SlotSet my_slots_;
std::vector<MigrationInfo> my_outgoing_migrations_; std::vector<MigrationInfo> my_outgoing_migrations_;

View file

@ -13,7 +13,7 @@
using namespace std; using namespace std;
using namespace testing; using namespace testing;
using Node = dfly::ClusterConfig::Node; using Node = dfly::ClusterNodeInfo;
namespace dfly { namespace dfly {
@ -23,12 +23,6 @@ MATCHER_P(NodeMatches, expected, "") {
class ClusterConfigTest : public ::testing::Test { class ClusterConfigTest : public ::testing::Test {
protected: protected:
JsonType ParseJson(string_view json_str) {
optional<JsonType> opt_json = JsonFromString(json_str, PMR_NS::get_default_resource());
CHECK(opt_json.has_value());
return opt_json.value();
}
const string kMyId = "my-id"; const string kMyId = "my-id";
}; };
@ -50,7 +44,7 @@ TEST_F(ClusterConfigTest, KeyTagTest) {
} }
TEST_F(ClusterConfigTest, ConfigSetInvalidEmpty) { TEST_F(ClusterConfigTest, ConfigSetInvalidEmpty) {
EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ClusterConfig::ClusterShards{}), nullptr); EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ClusterShardInfos{}), nullptr);
} }
TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) { TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) {
@ -154,7 +148,7 @@ TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) {
TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRanges) { TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRanges) {
// Note that slot_ranges is not an object // Note that slot_ranges is not an object
EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json(
[ [
{ {
"slot_ranges": "0,16383", "slot_ranges": "0,16383",
@ -165,13 +159,13 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRanges) {
}, },
"replicas": [] "replicas": []
} }
])json")), ])json"),
nullptr); nullptr);
} }
TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRangeStart) { TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRangeStart) {
// Note that slot_ranges.start is not a number // Note that slot_ranges.start is not a number
EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json(
[ [
{ {
"slot_ranges": [ "slot_ranges": [
@ -187,13 +181,13 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRangeStart) {
}, },
"replicas": [] "replicas": []
} }
])json")), ])json"),
nullptr); nullptr);
} }
TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRangeEnd) { TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRangeEnd) {
// Note that slot_ranges.end is not a number // Note that slot_ranges.end is not a number
EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json(
[ [
{ {
"slot_ranges": [ "slot_ranges": [
@ -209,12 +203,12 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidSlotRangeEnd) {
}, },
"replicas": [] "replicas": []
} }
])json")), ])json"),
nullptr); nullptr);
} }
TEST_F(ClusterConfigTest, ConfigSetInvalidMissingMaster) { TEST_F(ClusterConfigTest, ConfigSetInvalidMissingMaster) {
EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json(
[ [
{ {
"slot_ranges": [ "slot_ranges": [
@ -224,13 +218,13 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidMissingMaster) {
} }
] ]
} }
])json")), ])json"),
nullptr); nullptr);
} }
TEST_F(ClusterConfigTest, ConfigSetInvalidMasterNotObject) { TEST_F(ClusterConfigTest, ConfigSetInvalidMasterNotObject) {
// Note that master is not an object // Note that master is not an object
EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json(
[ [
{ {
"slot_ranges": [ "slot_ranges": [
@ -242,12 +236,12 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidMasterNotObject) {
"master": 123, "master": 123,
"replicas": [] "replicas": []
} }
])json")), ])json"),
nullptr); nullptr);
} }
TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingId) { TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingId) {
EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json(
[ [
{ {
"slot_ranges": [ "slot_ranges": [
@ -262,12 +256,12 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingId) {
}, },
"replicas": [] "replicas": []
} }
])json")), ])json"),
nullptr); nullptr);
} }
TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingIp) { TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingIp) {
EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json(
[ [
{ {
"slot_ranges": [ "slot_ranges": [
@ -282,12 +276,12 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingIp) {
}, },
"replicas": [] "replicas": []
} }
])json")), ])json"),
nullptr); nullptr);
} }
TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingPort) { TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingPort) {
EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json(
[ [
{ {
"slot_ranges": [ "slot_ranges": [
@ -302,12 +296,12 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidMasterMissingPort) {
}, },
"replicas": [] "replicas": []
} }
])json")), ])json"),
nullptr); nullptr);
} }
TEST_F(ClusterConfigTest, ConfigSetInvalidMissingReplicas) { TEST_F(ClusterConfigTest, ConfigSetInvalidMissingReplicas) {
EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json(
[ [
{ {
"slot_ranges": [ "slot_ranges": [
@ -322,12 +316,12 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidMissingReplicas) {
"port": 8000 "port": 8000
} }
} }
])json")), ])json"),
nullptr); nullptr);
} }
TEST_F(ClusterConfigTest, ConfigSetInvalidRepeatingMasterId) { TEST_F(ClusterConfigTest, ConfigSetInvalidRepeatingMasterId) {
EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json(
[ [
{ {
"slot_ranges": [ "slot_ranges": [
@ -357,12 +351,12 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidRepeatingMasterId) {
}, },
"replicas": [] "replicas": []
} }
])json")), ])json"),
nullptr); nullptr);
} }
TEST_F(ClusterConfigTest, ConfigSetInvalidRepeatingReplicaId) { TEST_F(ClusterConfigTest, ConfigSetInvalidRepeatingReplicaId) {
EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json(
[ [
{ {
"slot_ranges": [ "slot_ranges": [
@ -389,12 +383,12 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidRepeatingReplicaId) {
} }
] ]
} }
])json")), ])json"),
nullptr); nullptr);
} }
TEST_F(ClusterConfigTest, ConfigSetInvalidRepeatingMasterAndReplicaId) { TEST_F(ClusterConfigTest, ConfigSetInvalidRepeatingMasterAndReplicaId) {
EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, ParseJson(R"json( EXPECT_EQ(ClusterConfig::CreateFromConfig(kMyId, R"json(
[ [
{ {
"slot_ranges": [ "slot_ranges": [
@ -416,7 +410,7 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidRepeatingMasterAndReplicaId) {
} }
] ]
} }
])json")), ])json"),
nullptr); nullptr);
} }
@ -437,29 +431,68 @@ TEST_F(ClusterConfigTest, ConfigSetMigrations) {
} }
])json"; ])json";
auto config1 = ClusterConfig::CreateFromConfig("id0", ParseJson(config_str)); auto config1 = ClusterConfig::CreateFromConfig("id0", config_str);
EXPECT_EQ( EXPECT_EQ(
config1->GetOutgoingMigrations(), config1->GetNewOutgoingMigrations(nullptr),
(std::vector<ClusterConfig::MigrationInfo>{ (std::vector<MigrationInfo>{
{.slot_ranges = {{7000, 8000}}, .target_id = "id1", .ip = "127.0.0.1", .port = 9001}})); {.slot_ranges = {{7000, 8000}}, .target_id = "id1", .ip = "127.0.0.1", .port = 9001}}));
EXPECT_TRUE(config1->GetIncomingMigrations().empty()); EXPECT_TRUE(config1->GetFinishedOutgoingMigrations(nullptr).empty());
EXPECT_TRUE(config1->GetNewIncomingMigrations(nullptr).empty());
EXPECT_TRUE(config1->GetFinishedIncomingMigrations(nullptr).empty());
auto config2 = ClusterConfig::CreateFromConfig("id1", ParseJson(config_str)); auto config2 = ClusterConfig::CreateFromConfig("id1", config_str);
EXPECT_EQ( EXPECT_EQ(
config2->GetIncomingMigrations(), config2->GetNewIncomingMigrations(nullptr),
(std::vector<ClusterConfig::MigrationInfo>{ (std::vector<MigrationInfo>{
{.slot_ranges = {{7000, 8000}}, .target_id = "id1", .ip = "127.0.0.1", .port = 9001}})); {.slot_ranges = {{7000, 8000}}, .target_id = "id1", .ip = "127.0.0.1", .port = 9001}}));
EXPECT_TRUE(config2->GetOutgoingMigrations().empty()); EXPECT_TRUE(config2->GetFinishedOutgoingMigrations(nullptr).empty());
EXPECT_TRUE(config2->GetNewOutgoingMigrations(nullptr).empty());
EXPECT_TRUE(config2->GetFinishedIncomingMigrations(nullptr).empty());
auto config3 = ClusterConfig::CreateFromConfig("id2", ParseJson(config_str)); auto config3 = ClusterConfig::CreateFromConfig("id2", config_str);
EXPECT_TRUE(config3->GetIncomingMigrations().empty()); EXPECT_TRUE(config3->GetFinishedOutgoingMigrations(nullptr).empty());
EXPECT_TRUE(config3->GetOutgoingMigrations().empty()); EXPECT_TRUE(config3->GetNewIncomingMigrations(nullptr).empty());
EXPECT_TRUE(config3->GetFinishedIncomingMigrations(nullptr).empty());
EXPECT_TRUE(config3->GetNewOutgoingMigrations(nullptr).empty());
const auto* config_str2 = R"json(
[
{
"slot_ranges": [ { "start": 0, "end": 6999 } ],
"master": { "id": "id0", "ip": "localhost", "port": 3000 },
"replicas": []
},
{
"slot_ranges": [ { "start": 7000, "end": 16383 } ],
"master": { "id": "id1", "ip": "localhost", "port": 3001 },
"replicas": []
}
])json";
auto config4 = ClusterConfig::CreateFromConfig("id0", config_str2);
auto config5 = ClusterConfig::CreateFromConfig("id1", config_str2);
EXPECT_EQ(
config4->GetFinishedOutgoingMigrations(config1),
(std::vector<MigrationInfo>{
{.slot_ranges = {{7000, 8000}}, .target_id = "id1", .ip = "127.0.0.1", .port = 9001}}));
EXPECT_TRUE(config4->GetNewIncomingMigrations(config1).empty());
EXPECT_TRUE(config4->GetFinishedIncomingMigrations(config1).empty());
EXPECT_TRUE(config4->GetNewOutgoingMigrations(config1).empty());
EXPECT_EQ(
config5->GetFinishedIncomingMigrations(config2),
(std::vector<MigrationInfo>{
{.slot_ranges = {{7000, 8000}}, .target_id = "id1", .ip = "127.0.0.1", .port = 9001}}));
EXPECT_TRUE(config5->GetNewIncomingMigrations(config2).empty());
EXPECT_TRUE(config5->GetFinishedOutgoingMigrations(config2).empty());
EXPECT_TRUE(config5->GetNewOutgoingMigrations(config2).empty());
} }
TEST_F(ClusterConfigTest, InvalidConfigMigrationsWithoutIP) { TEST_F(ClusterConfigTest, InvalidConfigMigrationsWithoutIP) {
auto config = ClusterConfig::CreateFromConfig("id0", ParseJson(R"json( auto config = ClusterConfig::CreateFromConfig("id0", R"json(
[ [
{ {
"slot_ranges": [ { "start": 0, "end": 8000 } ], "slot_ranges": [ { "start": 0, "end": 8000 } ],
@ -473,7 +506,7 @@ TEST_F(ClusterConfigTest, InvalidConfigMigrationsWithoutIP) {
"master": { "id": "id1", "ip": "localhost", "port": 3001 }, "master": { "id": "id1", "ip": "localhost", "port": 3001 },
"replicas": [] "replicas": []
} }
])json")); ])json");
EXPECT_EQ(config, nullptr); EXPECT_EQ(config, nullptr);
} }

View file

@ -39,9 +39,6 @@ using namespace facade;
using namespace util; using namespace util;
using CI = CommandId; using CI = CommandId;
using ClusterShard = ClusterConfig::ClusterShard;
using ClusterShards = ClusterConfig::ClusterShards;
using Node = ClusterConfig::Node;
constexpr char kIdNotFound[] = "syncid not found"; constexpr char kIdNotFound[] = "syncid not found";
@ -70,8 +67,8 @@ ClusterConfig* ClusterFamily::cluster_config() {
return tl_cluster_config.get(); return tl_cluster_config.get();
} }
ClusterShard ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const { ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const {
ClusterShard info{.slot_ranges = {{.start = 0, .end = ClusterConfig::kMaxSlotNum}}, ClusterShardInfo info{.slot_ranges = {{.start = 0, .end = ClusterConfig::kMaxSlotNum}},
.master = {}, .master = {},
.replicas = {}, .replicas = {},
.migrations = {}}; .migrations = {}};
@ -123,12 +120,12 @@ void ClusterFamily::ClusterHelp(ConnectionContext* cntx) {
} }
namespace { namespace {
void ClusterShardsImpl(const ClusterShards& config, ConnectionContext* cntx) { void ClusterShardsImpl(const ClusterShardInfos& config, ConnectionContext* cntx) {
// For more details https://redis.io/commands/cluster-shards/ // For more details https://redis.io/commands/cluster-shards/
constexpr unsigned int kEntrySize = 4; constexpr unsigned int kEntrySize = 4;
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto WriteNode = [&](const Node& node, string_view role) { auto WriteNode = [&](const ClusterNodeInfo& node, string_view role) {
constexpr unsigned int kNodeSize = 14; constexpr unsigned int kNodeSize = 14;
rb->StartArray(kNodeSize); rb->StartArray(kNodeSize);
rb->SendBulkString("id"); rb->SendBulkString("id");
@ -179,10 +176,10 @@ void ClusterFamily::ClusterShards(ConnectionContext* cntx) {
} }
namespace { namespace {
void ClusterSlotsImpl(const ClusterShards& config, ConnectionContext* cntx) { void ClusterSlotsImpl(const ClusterShardInfos& config, ConnectionContext* cntx) {
// For more details https://redis.io/commands/cluster-slots/ // For more details https://redis.io/commands/cluster-slots/
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto WriteNode = [&](const Node& node) { auto WriteNode = [&](const ClusterNodeInfo& node) {
constexpr unsigned int kNodeSize = 3; constexpr unsigned int kNodeSize = 3;
rb->StartArray(kNodeSize); rb->StartArray(kNodeSize);
rb->SendBulkString(node.ip); rb->SendBulkString(node.ip);
@ -223,12 +220,12 @@ void ClusterFamily::ClusterSlots(ConnectionContext* cntx) {
} }
namespace { namespace {
void ClusterNodesImpl(const ClusterShards& config, string_view my_id, ConnectionContext* cntx) { void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id, ConnectionContext* cntx) {
// For more details https://redis.io/commands/cluster-nodes/ // For more details https://redis.io/commands/cluster-nodes/
string result; string result;
auto WriteNode = [&](const Node& node, string_view role, string_view master_id, auto WriteNode = [&](const ClusterNodeInfo& node, string_view role, string_view master_id,
const vector<SlotRange>& ranges) { const vector<SlotRange>& ranges) {
absl::StrAppend(&result, node.id, " "); absl::StrAppend(&result, node.id, " ");
@ -278,7 +275,7 @@ void ClusterFamily::ClusterNodes(ConnectionContext* cntx) {
} }
namespace { namespace {
void ClusterInfoImpl(const ClusterShards& config, ConnectionContext* cntx) { void ClusterInfoImpl(const ClusterShardInfos& config, ConnectionContext* cntx) {
std::string msg; std::string msg;
auto append = [&msg](absl::AlphaNum a1, absl::AlphaNum a2) { auto append = [&msg](absl::AlphaNum a1, absl::AlphaNum a2) {
// Separate lines with \r\n, not \n, see #2726 // Separate lines with \r\n, not \n, see #2726
@ -489,13 +486,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
} }
string_view json_str = ArgS(args, 0); string_view json_str = ArgS(args, 0);
optional<JsonType> json = JsonFromString(json_str, PMR_NS::get_default_resource()); shared_ptr<ClusterConfig> new_config = ClusterConfig::CreateFromConfig(id_, json_str);
if (!json.has_value()) {
LOG(WARNING) << "Can't parse JSON for ClusterConfig " << json_str;
return cntx->SendError("Invalid JSON cluster config", kSyntaxErrType);
}
shared_ptr<ClusterConfig> new_config = ClusterConfig::CreateFromConfig(id_, json.value());
if (new_config == nullptr) { if (new_config == nullptr) {
LOG(WARNING) << "Can't set cluster config"; LOG(WARNING) << "Can't set cluster config";
return cntx->SendError("Invalid cluster configuration."); return cntx->SendError("Invalid cluster configuration.");
@ -503,7 +494,8 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
lock_guard gu(set_config_mu); lock_guard gu(set_config_mu);
SlotSet before = tl_cluster_config ? tl_cluster_config->GetOwnedSlots() : SlotSet(true); auto prev_config = tl_cluster_config;
SlotSet before = prev_config ? prev_config->GetOwnedSlots() : SlotSet(true);
// Ignore blocked commands because we filter them with CancelBlockingOnThread // Ignore blocked commands because we filter them with CancelBlockingOnThread
DispatchTracker tracker{server_family_->GetListeners(), cntx->conn(), false /* ignore paused */, DispatchTracker tracker{server_family_->GetListeners(), cntx->conn(), false /* ignore paused */,
@ -523,8 +515,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb)); server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb));
DCHECK(tl_cluster_config != nullptr); DCHECK(tl_cluster_config != nullptr);
// TODO rewrite with outgoing migrations if (!StartSlotMigrations(new_config->GetNewOutgoingMigrations(prev_config), cntx)) {
if (!StartSlotMigrations(new_config->GetOutgoingMigrations(), cntx)) {
return cntx->SendError("Can't start the migration"); return cntx->SendError("Can't start the migration");
} }
RemoveFinishedMigrations(); RemoveFinishedMigrations();
@ -607,11 +598,12 @@ void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cn
return cntx->SendOk(); return cntx->SendOk();
} }
bool ClusterFamily::StartSlotMigrations(const std::vector<ClusterConfig::MigrationInfo>& migrations, bool ClusterFamily::StartSlotMigrations(std::vector<MigrationInfo> migrations,
ConnectionContext* cntx) { ConnectionContext* cntx) {
// Add validating and error processing // Add validating and error processing
for (auto m : migrations) { for (auto m : migrations) {
auto outgoing_migration = CreateOutgoingMigration(m.ip, m.port, m.slot_ranges); auto outgoing_migration =
CreateOutgoingMigration(std::move(m.ip), m.port, std::move(m.slot_ranges));
outgoing_migration->Start(cntx); outgoing_migration->Start(cntx);
} }
return true; return true;

View file

@ -74,8 +74,7 @@ class ClusterFamily {
std::shared_ptr<ClusterSlotMigration> GetIncomingMigration(std::string host_ip, uint16_t port); std::shared_ptr<ClusterSlotMigration> GetIncomingMigration(std::string host_ip, uint16_t port);
bool StartSlotMigrations(const std::vector<ClusterConfig::MigrationInfo>& migrations, bool StartSlotMigrations(std::vector<MigrationInfo> migrations, ConnectionContext* cntx);
ConnectionContext* cntx);
void RemoveFinishedMigrations(); void RemoveFinishedMigrations();
// store info about migration and create unique session id // store info about migration and create unique session id
@ -95,7 +94,7 @@ class ClusterFamily {
OutgoingMigrationMap outgoing_migration_jobs_ ABSL_GUARDED_BY(migration_mu_); OutgoingMigrationMap outgoing_migration_jobs_ ABSL_GUARDED_BY(migration_mu_);
private: private:
ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const; ClusterShardInfo GetEmulatedShardInfo(ConnectionContext* cntx) const;
std::string id_; std::string id_;

View file

@ -62,7 +62,7 @@ class ClusterFamilyTest : public BaseFamilyTest {
TEST_F(ClusterFamilyTest, ClusterConfigInvalidJSON) { TEST_F(ClusterFamilyTest, ClusterConfigInvalidJSON) {
EXPECT_THAT(RunPrivileged({"dflycluster", "config", "invalid JSON"}), EXPECT_THAT(RunPrivileged({"dflycluster", "config", "invalid JSON"}),
ErrArg("Invalid JSON cluster config")); ErrArg("Invalid cluster configuration."));
string cluster_info = Run({"cluster", "info"}).GetString(); string cluster_info = Run({"cluster", "info"}).GetString();
EXPECT_THAT(cluster_info, HasSubstr("cluster_state:fail")); EXPECT_THAT(cluster_info, HasSubstr("cluster_state:fail"));
@ -708,7 +708,7 @@ TEST_F(ClusterFamilyEmulatedTest, ClusterInfo) {
EXPECT_THAT(cluster_info, HasSubstr("cluster_size:1")); EXPECT_THAT(cluster_info, HasSubstr("cluster_size:1"));
} }
TEST_F(ClusterFamilyEmulatedTest, ClusterShards) { TEST_F(ClusterFamilyEmulatedTest, ClusterShardInfos) {
EXPECT_THAT(Run({"cluster", "shards"}), EXPECT_THAT(Run({"cluster", "shards"}),
RespArray(ElementsAre("slots", // RespArray(ElementsAre("slots", //
RespArray(ElementsAre(IntArg(0), IntArg(16383))), // RespArray(ElementsAre(IntArg(0), IntArg(16383))), //

View file

@ -3,6 +3,7 @@
// //
#pragma once #pragma once
#include "server/cluster/cluster_config.h"
#include "server/protocol_client.h" #include "server/protocol_client.h"
namespace dfly { namespace dfly {

View file

@ -1,5 +1,7 @@
#include "server/cluster/unique_slot_checker.h" #include "server/cluster/unique_slot_checker.h"
#include "server/cluster/cluster_config.h"
using namespace std; using namespace std;
namespace dfly { namespace dfly {

View file

@ -7,7 +7,7 @@
#include <optional> #include <optional>
#include <string_view> #include <string_view>
#include "server/cluster/cluster_config.h" #include "server/cluster/slot_set.h"
namespace dfly { namespace dfly {

View file

@ -18,6 +18,7 @@ extern "C" {
#include "base/flags.h" #include "base/flags.h"
#include "base/logging.h" #include "base/logging.h"
#include "core/compact_object.h" #include "core/compact_object.h"
#include "server/cluster/cluster_config.h"
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
#include "server/error.h" #include "server/error.h"
#include "server/journal/journal.h" #include "server/journal/journal.h"

View file

@ -9,6 +9,7 @@
#include "base/flags.h" #include "base/flags.h"
#include "base/logging.h" #include "base/logging.h"
#include "generic_family.h" #include "generic_family.h"
#include "server/cluster/cluster_config.h"
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
#include "server/error.h" #include "server/error.h"
#include "server/journal/journal.h" #include "server/journal/journal.h"

View file

@ -4,7 +4,7 @@
#pragma once #pragma once
#include "server/cluster/cluster_config.h" #include "server/cluster/slot_set.h"
#include "server/conn_context.h" #include "server/conn_context.h"
namespace dfly { namespace dfly {

View file

@ -17,6 +17,7 @@ extern "C" {
#include "base/logging.h" #include "base/logging.h"
#include "io/proc_reader.h" #include "io/proc_reader.h"
#include "server/blocking_controller.h" #include "server/blocking_controller.h"
#include "server/cluster/cluster_config.h"
#include "server/search/doc_index.h" #include "server/search/doc_index.h"
#include "server/server_state.h" #include "server/server_state.h"
#include "server/tiered_storage.h" #include "server/tiered_storage.h"

View file

@ -21,7 +21,6 @@ extern "C" {
#include "core/mi_memory_resource.h" #include "core/mi_memory_resource.h"
#include "core/task_queue.h" #include "core/task_queue.h"
#include "core/tx_queue.h" #include "core/tx_queue.h"
#include "server/cluster/cluster_config.h"
#include "server/db_slice.h" #include "server/db_slice.h"
namespace dfly { namespace dfly {

View file

@ -18,6 +18,7 @@ extern "C" {
#include "redis/rdb.h" #include "redis/rdb.h"
#include "server/acl/acl_commands_def.h" #include "server/acl/acl_commands_def.h"
#include "server/blocking_controller.h" #include "server/blocking_controller.h"
#include "server/cluster/cluster_config.h"
#include "server/command_registry.h" #include "server/command_registry.h"
#include "server/conn_context.h" #include "server/conn_context.h"
#include "server/container_utils.h" #include "server/container_utils.h"

View file

@ -7,6 +7,7 @@
#include <absl/functional/bind_front.h> #include <absl/functional/bind_front.h>
#include "base/logging.h" #include "base/logging.h"
#include "server/cluster/cluster_config.h"
namespace dfly { namespace dfly {
using namespace util; using namespace util;

View file

@ -4,6 +4,8 @@
#include "server/journal/types.h" #include "server/journal/types.h"
#include "server/cluster/cluster_config.h"
namespace dfly::journal { namespace dfly::journal {
std::string Entry::ToString() const { std::string Entry::ToString() const {

View file

@ -7,7 +7,7 @@
#include <string> #include <string>
#include <variant> #include <variant>
#include "server/cluster/cluster_config.h" #include "server/cluster/slot_set.h"
#include "server/common.h" #include "server/common.h"
#include "server/table.h" #include "server/table.h"

View file

@ -914,7 +914,7 @@ optional<ErrorReply> Service::CheckKeysOwnership(const CommandId* cid, CmdArgLis
if (keys_slot.has_value() && !cluster_config->IsMySlot(*keys_slot)) { if (keys_slot.has_value() && !cluster_config->IsMySlot(*keys_slot)) {
// See more details here: https://redis.io/docs/reference/cluster-spec/#moved-redirection // See more details here: https://redis.io/docs/reference/cluster-spec/#moved-redirection
ClusterConfig::Node master = cluster_config->GetMasterNodeForSlot(*keys_slot); ClusterNodeInfo master = cluster_config->GetMasterNodeForSlot(*keys_slot);
return ErrorReply{absl::StrCat("-MOVED ", *keys_slot, " ", master.ip, ":", master.port)}; return ErrorReply{absl::StrCat("-MOVED ", *keys_slot, " ", master.ip, ":", master.port)};
} }

View file

@ -6,6 +6,7 @@
#include "base/flags.h" #include "base/flags.h"
#include "base/logging.h" #include "base/logging.h"
#include "server/cluster/cluster_config.h"
#include "server/server_state.h" #include "server/server_state.h"
ABSL_FLAG(bool, enable_top_keys_tracking, false, ABSL_FLAG(bool, enable_top_keys_tracking, false,

View file

@ -11,7 +11,7 @@
#include "core/expire_period.h" #include "core/expire_period.h"
#include "core/intent_lock.h" #include "core/intent_lock.h"
#include "server/cluster/cluster_config.h" #include "server/cluster/slot_set.h"
#include "server/conn_context.h" #include "server/conn_context.h"
#include "server/detail/table.h" #include "server/detail/table.h"
#include "server/top_keys.h" #include "server/top_keys.h"