chore: ignore applying the same cluster config twice (#3932)

This commit is contained in:
Borys 2024-10-16 09:07:01 +03:00 committed by GitHub
parent 98bb5da67d
commit ef814f6670
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 188 additions and 23 deletions

View file

@ -221,7 +221,7 @@ optional<std::vector<MigrationInfo>> ParseMigrations(const JsonType& json) {
}
optional<ClusterShardInfos> BuildClusterConfigFromJson(const JsonType& json) {
ClusterShardInfos config;
std::vector<ClusterShardInfo> config;
if (!json.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "not an array " << json;
@ -271,7 +271,7 @@ optional<ClusterShardInfos> BuildClusterConfigFromJson(const JsonType& json) {
config.push_back(std::move(shard));
}
return config;
return ClusterShardInfos(config);
}
} // namespace

View file

@ -105,14 +105,15 @@ TEST_F(ClusterConfigTest, ConfigSetInvalidMissingSlots) {
TEST_F(ClusterConfigTest, ConfigSetInvalidDoubleBookedSlot) {
EXPECT_EQ(ClusterConfig::CreateFromConfig(
kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}),
.master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
.replicas = {},
.migrations = {}},
{.slot_ranges = SlotRanges({{.start = 0, .end = 0}}),
.master = {.id = "other2", .ip = "192.168.0.101", .port = 7001},
.replicas = {},
.migrations = {}}}),
kMyId,
ClusterShardInfos({{.slot_ranges = SlotRanges({{.start = 0, .end = 0x3FFF}}),
.master = {.id = "other", .ip = "192.168.0.100", .port = 7000},
.replicas = {},
.migrations = {}},
{.slot_ranges = SlotRanges({{.start = 0, .end = 0}}),
.master = {.id = "other2", .ip = "192.168.0.101", .port = 7001},
.replicas = {},
.migrations = {}}})),
nullptr);
}
@ -150,18 +151,19 @@ TEST_F(ClusterConfigTest, ConfigSetOkWithReplica) {
TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) {
auto config = ClusterConfig::CreateFromConfig(
kMyId, {{.slot_ranges = SlotRanges({{.start = 0, .end = 5'000}}),
.master = {.id = "other-master", .ip = "192.168.0.100", .port = 7000},
.replicas = {{.id = "other-replica", .ip = "192.168.0.101", .port = 7001}},
.migrations = {}},
{.slot_ranges = SlotRanges({{.start = 5'001, .end = 10'000}}),
.master = {.id = kMyId, .ip = "192.168.0.102", .port = 7002},
.replicas = {{.id = "other-replica2", .ip = "192.168.0.103", .port = 7003}},
.migrations = {}},
{.slot_ranges = SlotRanges({{.start = 10'001, .end = 0x3FFF}}),
.master = {.id = "other-master3", .ip = "192.168.0.104", .port = 7004},
.replicas = {{.id = "other-replica3", .ip = "192.168.0.105", .port = 7005}},
.migrations = {}}});
kMyId, ClusterShardInfos(
{{.slot_ranges = SlotRanges({{.start = 0, .end = 5'000}}),
.master = {.id = "other-master", .ip = "192.168.0.100", .port = 7000},
.replicas = {{.id = "other-replica", .ip = "192.168.0.101", .port = 7001}},
.migrations = {}},
{.slot_ranges = SlotRanges({{.start = 5'001, .end = 10'000}}),
.master = {.id = kMyId, .ip = "192.168.0.102", .port = 7002},
.replicas = {{.id = "other-replica2", .ip = "192.168.0.103", .port = 7003}},
.migrations = {}},
{.slot_ranges = SlotRanges({{.start = 10'001, .end = 0x3FFF}}),
.master = {.id = "other-master3", .ip = "192.168.0.104", .port = 7004},
.replicas = {{.id = "other-replica3", .ip = "192.168.0.105", .port = 7005}},
.migrations = {}}}));
EXPECT_NE(config, nullptr);
SlotSet owned_slots = config->GetOwnedSlots();
EXPECT_EQ(owned_slots.ToSlotRanges().Size(), 1);
@ -609,4 +611,98 @@ TEST_F(ClusterConfigTest, SlotSetAPI) {
}
}
TEST_F(ClusterConfigTest, ConfigComparison) {
auto config1 = ClusterConfig::CreateFromConfig("id0", R"json(
[
{
"slot_ranges": [ { "start": 0, "end": 8000 } ],
"master": { "id": "id0", "ip": "localhost", "port": 3000 },
"replicas": [],
"migrations": [{ "slot_ranges": [ { "start": 7000, "end": 8000 } ]
, "ip": "127.0.0.1", "port" : 9001, "node_id": "id1" }]
},
{
"slot_ranges": [ { "start": 8001, "end": 16383 } ],
"master": { "id": "id1", "ip": "localhost", "port": 3001 },
"replicas": []
}
])json");
EXPECT_EQ(config1->GetConfig(), config1->GetConfig());
auto config2 = ClusterConfig::CreateFromConfig("id0", R"json(
[
{
"slot_ranges": [ { "start": 0, "end": 16383 } ],
"master": { "id": "id0", "ip": "localhost", "port": 3000 },
"replicas": [],
"migrations": [{ "slot_ranges": [ { "start": 7000, "end": 8000 } ]
, "ip": "127.0.0.1", "port" : 9001, "node_id": "id1" }]
}
])json");
EXPECT_NE(config1->GetConfig(), config2->GetConfig());
EXPECT_EQ(config2->GetConfig(), config2->GetConfig());
auto config3 = ClusterConfig::CreateFromConfig("id0", R"json(
[
{
"slot_ranges": [ { "start": 0, "end": 8000 } ],
"master": { "id": "id0", "ip": "localhost", "port": 3000 },
"replicas": [],
"migrations": [{ "slot_ranges": [ { "start": 7000, "end": 8000 } ]
, "ip": "127.0.0.1", "port" : 9002, "node_id": "id1" }]
},
{
"slot_ranges": [ { "start": 8001, "end": 16383 } ],
"master": { "id": "id1", "ip": "localhost", "port": 3001 },
"replicas": []
}
])json");
EXPECT_NE(config1->GetConfig(), config3->GetConfig());
EXPECT_NE(config2->GetConfig(), config3->GetConfig());
EXPECT_EQ(config3->GetConfig(), config3->GetConfig());
auto config4 = ClusterConfig::CreateFromConfig("id0", R"json(
[
{
"slot_ranges": [ { "start": 0, "end": 8000 } ],
"master": { "id": "id0", "ip": "localhost", "port": 3000 },
"replicas": [],
"migrations": [{ "slot_ranges": [ { "start": 7000, "end": 8000 } ]
, "ip": "127.0.0.1", "port" : 9001, "node_id": "id2" }]
},
{
"slot_ranges": [ { "start": 8001, "end": 16383 } ],
"master": { "id": "id1", "ip": "localhost", "port": 3001 },
"replicas": []
}
])json");
EXPECT_NE(config1->GetConfig(), config4->GetConfig());
EXPECT_NE(config2->GetConfig(), config4->GetConfig());
EXPECT_NE(config3->GetConfig(), config4->GetConfig());
EXPECT_EQ(config4->GetConfig(), config4->GetConfig());
auto config5 = ClusterConfig::CreateFromConfig("id0", R"json(
[
{
"slot_ranges": [ { "start": 0, "end": 8000 } ],
"master": { "id": "id2", "ip": "localhost", "port": 3000 },
"replicas": [],
"migrations": [{ "slot_ranges": [ { "start": 7000, "end": 8000 } ]
, "ip": "127.0.0.1", "port" : 9001, "node_id": "id1" }]
},
{
"slot_ranges": [ { "start": 8001, "end": 16383 } ],
"master": { "id": "id1", "ip": "localhost", "port": 3001 },
"replicas": []
}
])json");
EXPECT_NE(config1->GetConfig(), config5->GetConfig());
EXPECT_NE(config2->GetConfig(), config5->GetConfig());
EXPECT_NE(config3->GetConfig(), config5->GetConfig());
EXPECT_NE(config4->GetConfig(), config5->GetConfig());
EXPECT_EQ(config5->GetConfig(), config5->GetConfig());
}
} // namespace dfly::cluster

View file

@ -49,6 +49,26 @@ std::string MigrationInfo::ToString() const {
slot_ranges.ToString(), ")");
}
bool ClusterShardInfo::operator==(const ClusterShardInfo& r) const {
if (slot_ranges == r.slot_ranges && master == r.master) {
auto lreplicas = replicas;
auto lmigrations = migrations;
auto rreplicas = r.replicas;
auto rmigrations = r.migrations;
std::sort(lreplicas.begin(), lreplicas.end());
std::sort(lmigrations.begin(), lmigrations.end());
std::sort(rreplicas.begin(), rreplicas.end());
std::sort(rmigrations.begin(), rmigrations.end());
return lreplicas == rreplicas && lmigrations == rmigrations;
}
return false;
}
ClusterShardInfos::ClusterShardInfos(std::vector<ClusterShardInfo> infos)
: infos_(std::move(infos)) {
std::sort(infos_.begin(), infos_.end());
}
namespace {
enum class ClusterMode {
kUninitialized,

View file

@ -90,6 +90,10 @@ struct ClusterNodeInfo {
bool operator==(const ClusterNodeInfo& r) const noexcept {
return port == r.port && ip == r.ip && id == r.id;
}
bool operator<(const ClusterNodeInfo& r) const noexcept {
return id < r.id;
}
};
struct MigrationInfo {
@ -100,6 +104,10 @@ struct MigrationInfo {
return node_info == r.node_info && slot_ranges == r.slot_ranges;
}
bool operator<(const MigrationInfo& r) const noexcept {
return node_info < r.node_info;
}
std::string ToString() const;
};
@ -108,9 +116,48 @@ struct ClusterShardInfo {
ClusterNodeInfo master;
std::vector<ClusterNodeInfo> replicas;
std::vector<MigrationInfo> migrations;
bool operator==(const ClusterShardInfo& r) const;
bool operator<(const ClusterShardInfo& r) const noexcept {
return master < r.master;
}
};
using ClusterShardInfos = std::vector<ClusterShardInfo>;
class ClusterShardInfos {
public:
ClusterShardInfos() = default;
ClusterShardInfos(std::vector<ClusterShardInfo> infos);
ClusterShardInfos(ClusterShardInfo info) : infos_({info}) {
}
auto begin() const noexcept {
return infos_.cbegin();
}
auto end() const noexcept {
return infos_.cend();
}
auto size() const noexcept {
return infos_.size();
}
bool empty() const noexcept {
return infos_.empty();
}
bool operator==(const ClusterShardInfos& r) const noexcept {
return infos_ == r.infos_;
}
bool operator!=(const ClusterShardInfos& r) const noexcept {
return infos_ != r.infos_;
}
private:
std::vector<ClusterShardInfo> infos_;
};
// MigrationState constants are ordered in state changing order
enum class MigrationState : uint8_t {

View file

@ -530,6 +530,8 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
if (new_config == nullptr) {
LOG(WARNING) << "Can't set cluster config";
return cntx->SendError("Invalid cluster configuration.");
} else if (tl_cluster_config && tl_cluster_config->GetConfig() == new_config->GetConfig()) {
return cntx->SendOk();
}
PreparedToRemoveOutgoingMigrations outgoing_migrations; // should be removed without mutex lock