From 010166525e145abfd6cd2bf61f1a92ed8042c528 Mon Sep 17 00:00:00 2001 From: Borys Date: Mon, 20 Jan 2025 15:52:38 +0200 Subject: [PATCH] feat: allow finish and start migration in the same config (#4486) --- src/server/cluster/cluster_family.cc | 18 ++++---- tests/dragonfly/cluster_test.py | 63 ++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 8 deletions(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 5caa6cd3b..5d70cf9b4 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -896,11 +896,12 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) { if (auto err = parser.Error(); err) return builder->SendError(err->MakeReply()); + SlotRanges slot_ranges(std::move(slots)); + const auto& incoming_migrations = cluster_config()->GetIncomingMigrations(); bool found = any_of(incoming_migrations.begin(), incoming_migrations.end(), - [source_id = source_id](const MigrationInfo& info) { - // TODO: also compare slot ranges (in an order-agnostic way) - return info.node_info.id == source_id; + [&source_id, &slot_ranges](const MigrationInfo& info) { + return info.node_info.id == source_id && info.slot_ranges == slot_ranges; }); if (!found) { VLOG(1) << "Unrecognized incoming migration from " << source_id; @@ -914,7 +915,7 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) { LOG_IF(WARNING, was_removed) << "Reinit issued for migration from:" << source_id; incoming_migrations_jobs_.emplace_back(make_shared( - string(source_id), &server_family_->service(), SlotRanges(std::move(slots)), flows_num)); + string(source_id), &server_family_->service(), std::move(slot_ranges), flows_num)); return builder->SendOk(); } @@ -966,16 +967,17 @@ void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id, bool is_migration_valid = false; if (is_incoming) { for (const auto& mj : incoming_migrations_jobs_) { - if (mj->GetSourceID() == node_id) { - // TODO add compare for slots + if (mj->GetSourceID() == node_id && slots == mj->GetSlots()) { is_migration_valid = true; + break; } } } else { for (const auto& mj : outgoing_migration_jobs_) { - if (mj->GetMigrationInfo().node_info.id == node_id) { - // TODO add compare for slots + if (mj->GetMigrationInfo().node_info.id == node_id && + mj->GetMigrationInfo().slot_ranges == slots) { is_migration_valid = true; + break; } } } diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index e06529e48..9f0cbbd46 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -2140,6 +2140,69 @@ async def test_cluster_migration_while_seeding( assert extract_int_after_prefix("buckets on_db_update ", line) > 0 +@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"}) +@pytest.mark.asyncio +async def test_cluster_migrations_sequence( + df_factory: DflyInstanceFactory, df_seeder_factory: DflySeederFactory +): + instances = [ + df_factory.create(port=next(next_port), admin_port=next(next_port)) for _ in range(2) + ] + df_factory.start_all(instances) + + nodes = [await create_node_info(instance) for instance in instances] + nodes[0].slots = [(0, 16383)] + nodes[1].slots = [] + + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + logging.debug("Seeding cluster") + seeder = df_seeder_factory.create( + keys=10_000, port=instances[0].port, cluster_mode=True, mirror_to_fake_redis=True + ) + await seeder.run(target_deviation=0.1) + + seed = asyncio.create_task(seeder.run()) + await asyncio.sleep(1) + + slot_step = 500 + nodes[0].migrations = [ + MigrationInfo("127.0.0.1", instances[1].admin_port, [(0, slot_step - 1)], nodes[1].id) + ] + logging.debug("Migrating slots") + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + for i in range(slot_step, 16301, slot_step): + logging.debug("Waiting for migration to finish") + await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", timeout=10) + + nodes[0].slots = [(i, 16383)] + nodes[1].slots = [(0, i - 1)] + end_slot = min(i + slot_step - 1, 16383) + nodes[0].migrations = [ + MigrationInfo("127.0.0.1", instances[1].admin_port, [(i, end_slot)], nodes[1].id) + ] + + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + logging.debug("Waiting for migration to finish") + await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", timeout=10) + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + logging.debug("Finalizing migration") + nodes[0].slots = [] + nodes[1].slots = [(0, 16383)] + nodes[0].migrations = [] + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + logging.debug("stop seeding") + seeder.stop() + await seed + + capture = await seeder.capture_fake_redis() + assert await seeder.compare(capture, instances[1].port) + + def parse_lag(replication_info: str): lags = re.findall("lag=([0-9]+)\r\n", replication_info) assert len(lags) == 1