feat(cluster): add migration removing by config #2835 (#2844)

This commit is contained in:
Borys 2024-04-05 11:03:54 +03:00 committed by GitHub
parent c8426cfd31
commit 482bd58787
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 58 additions and 42 deletions

View file

@ -159,8 +159,11 @@ shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
result->my_outgoing_migrations_ = shard.migrations;
} else {
for (const auto& m : shard.migrations) {
if (my_id == m.target_id) {
result->my_incoming_migrations_.push_back(m);
if (my_id == m.node_id) {
auto incoming_migration = m;
// for incoming migration we need the source node
incoming_migration.node_id = shard.master.id;
result->my_incoming_migrations_.push_back(std::move(incoming_migration));
}
}
}
@ -256,18 +259,18 @@ optional<std::vector<MigrationInfo>> ParseMigrations(const JsonType& json) {
}
for (const auto& element : json.array_range()) {
auto target_id = element.at_or_null("target_id");
auto node_id = element.at_or_null("node_id");
auto ip = element.at_or_null("ip");
auto port = ReadNumeric<uint16_t>(element.at_or_null("port"));
auto slots = GetClusterSlotRanges(element.at_or_null("slot_ranges"));
if (!target_id.is_string() || !ip.is_string() || !port || !slots) {
if (!node_id.is_string() || !ip.is_string() || !port || !slots) {
LOG(WARNING) << kInvalidConfigPrefix << "invalid migration json " << json;
return nullopt;
}
res.emplace_back(MigrationInfo{.slot_ranges = std::move(*slots),
.target_id = target_id.as_string(),
.node_id = node_id.as_string(),
.ip = ip.as_string(),
.port = *port});
}