refactor(cluster): #2652 initiate migration process from CONFIG cmd (#2667)

* refactor(cluster): #2652 initiate migration process from CONFIG cmd
This commit is contained in:
Borys 2024-02-29 16:08:53 +02:00 committed by GitHub
parent f8f213e22f
commit e57067d2fb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 212 additions and 31 deletions

View file

@ -155,6 +155,13 @@ shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
[&](const Node& node) { return node.id == my_id; });
if (owned_by_me) {
result->my_slots_.Set(shard.slot_ranges, true);
result->my_outgoing_migrations_ = shard.migrations;
} else {
for (const auto& m : shard.migrations) {
if (my_id == m.target_id) {
result->my_incoming_migrations_.push_back(m);
}
}
}
}
@ -236,6 +243,36 @@ optional<ClusterConfig::Node> ParseClusterNode(const JsonType& json) {
return node;
}
optional<std::vector<ClusterConfig::MigrationInfo>> ParseMigrations(const JsonType& json) {
std::vector<ClusterConfig::MigrationInfo> res;
if (json.is_null()) {
return res;
}
if (!json.is_array()) {
LOG(INFO) << "no migrations found: " << json;
return nullopt;
}
for (const auto& element : json.array_range()) {
auto target_id = element.at_or_null("target_id");
auto ip = element.at_or_null("ip");
auto port = ReadNumeric<uint16_t>(element.at_or_null("port"));
auto slots = GetClusterSlotRanges(element.at_or_null("slot_ranges"));
if (!target_id.is_string() || !ip.is_string() || !port || !slots) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid migration json " << json;
return nullopt;
}
res.emplace_back(ClusterConfig::MigrationInfo{.slot_ranges = std::move(*slots),
.target_id = target_id.as_string(),
.ip = ip.as_string(),
.port = *port});
}
return res;
}
optional<ClusterConfig::ClusterShards> BuildClusterConfigFromJson(const JsonType& json) {
ClusterConfig::ClusterShards config;
@ -278,6 +315,12 @@ optional<ClusterConfig::ClusterShards> BuildClusterConfigFromJson(const JsonType
shard.replicas.push_back(std::move(node).value());
}
auto migrations = ParseMigrations(element.at_or_null("migrations"));
if (!migrations) {
return nullopt;
}
shard.migrations = std::move(*migrations);
config.push_back(std::move(shard));
}