From 9a6a9ec198ab11cf5e79bb0190a24ad13ea23cb1 Mon Sep 17 00:00:00 2001 From: Borys Date: Fri, 19 Apr 2024 16:21:54 +0300 Subject: [PATCH] feat: add ability to reapply config with migration #2924 (#2926) * feat: add ability to reapply config with migration #2924 --- src/server/cluster/cluster_config.cc | 7 +- src/server/cluster/cluster_config.h | 4 +- src/server/cluster/cluster_family.cc | 40 +++++++++-- tests/dragonfly/cluster_test.py | 103 ++++++++++++++++++++------- 4 files changed, 115 insertions(+), 39 deletions(-) diff --git a/src/server/cluster/cluster_config.cc b/src/server/cluster/cluster_config.cc index 0138c68ae..d1f5d71e1 100644 --- a/src/server/cluster/cluster_config.cc +++ b/src/server/cluster/cluster_config.cc @@ -338,10 +338,11 @@ shared_ptr ClusterConfig::CreateFromConfig(string_view my_id, return CreateFromConfig(my_id, config.value()); } -std::shared_ptr ClusterConfig::CloneWithChanges(const std::vector& slots, - bool enable) const { +std::shared_ptr ClusterConfig::CloneWithChanges( + const SlotRanges& enable_slots, const SlotRanges& disable_slots) const { auto new_config = std::make_shared(*this); - new_config->my_slots_.Set(slots, enable); + new_config->my_slots_.Set(enable_slots, true); + new_config->my_slots_.Set(disable_slots, false); return new_config; } diff --git a/src/server/cluster/cluster_config.h b/src/server/cluster/cluster_config.h index 764d03796..4c055d011 100644 --- a/src/server/cluster/cluster_config.h +++ b/src/server/cluster/cluster_config.h @@ -77,8 +77,8 @@ class ClusterConfig { static std::shared_ptr CreateFromConfig(std::string_view my_id, std::string_view json_config); - std::shared_ptr CloneWithChanges(const std::vector& slots, - bool enable) const; + std::shared_ptr CloneWithChanges(const SlotRanges& enable_slots, + const SlotRanges& disable_slots) const; // If key is in my slots ownership return true bool IsMySlot(SlotId id) const; diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 
59cca32dd..ecd5796d6 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -494,15 +494,40 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) lock_guard gu(set_config_mu); - lock_guard config_update_lk( - config_update_mu_); // to prevent simultaneous update config from outgoing migration - // TODO we shouldn't provide cntx into StartSlotMigrations - if (!StartSlotMigrations(new_config->GetNewOutgoingMigrations(tl_cluster_config), cntx)) { - return cntx->SendError("Can't start the migration"); - } RemoveOutgoingMigrations(new_config->GetFinishedOutgoingMigrations(tl_cluster_config)); RemoveIncomingMigrations(new_config->GetFinishedIncomingMigrations(tl_cluster_config)); + lock_guard config_update_lk( + config_update_mu_); // to prevent simultaneous update config from outgoing migration + + SlotRanges enable_slots, disable_slots; + + { + std::lock_guard lk(migration_mu_); + // If migration state is changed simultaneously, the changes to config will be applied after + // set_config_mu is unlocked and even if we apply the same changes 2 times it's not a problem + for (const auto& m : incoming_migrations_jobs_) { + if (m->GetState() == MigrationState::C_FINISHED) { + const auto& slots = m->GetSlots(); + enable_slots.insert(enable_slots.end(), slots.begin(), slots.end()); + } + } + for (const auto& m : outgoing_migration_jobs_) { + if (m->GetState() == MigrationState::C_FINISHED) { + const auto& slots = m->GetSlots(); + disable_slots.insert(disable_slots.end(), slots.begin(), slots.end()); + } + } + } + + new_config = new_config->CloneWithChanges(enable_slots, disable_slots); + + // TODO we shouldn't provide cntx into StartSlotMigrations + if (!StartSlotMigrations(new_config->GetNewOutgoingMigrations(tl_cluster_config), cntx)) { + // TODO it shouldn't be an error + return cntx->SendError("Can't start the migration"); + } + SlotSet before = tl_cluster_config ? 
tl_cluster_config->GetOwnedSlots() : SlotSet(true); // Ignore blocked commands because we filter them with CancelBlockingOnThread @@ -851,7 +876,8 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) { void ClusterFamily::UpdateConfig(const std::vector& slots, bool enable) { lock_guard gu(config_update_mu_); - auto new_config = tl_cluster_config->CloneWithChanges(slots, enable); + auto new_config = enable ? tl_cluster_config->CloneWithChanges(slots, {}) + : tl_cluster_config->CloneWithChanges({}, slots); shard_set->pool()->AwaitFiberOnAll( [&new_config](util::ProactorBase* pb) { tl_cluster_config = new_config; }); diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 7018c38f5..86b399763 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1135,6 +1135,20 @@ class NodeInfo: id: str +async def create_node_info(instance): + admin_client = instance.admin_client() + ninfo = NodeInfo( + instance=instance, + client=instance.client(), + admin_client=admin_client, + slots=[], + next_slots=[], + migrations=[], + id=await get_node_id(admin_client), + ) + return ninfo + + def generate_config(nodes): return [ { @@ -1185,18 +1199,7 @@ async def test_cluster_fuzzymigration( ] df_local_factory.start_all(instances) - nodes = [ - NodeInfo( - instance=instance, - client=instance.client(), - admin_client=instance.admin_client(), - slots=[], - next_slots=[], - migrations=[], - id=await get_node_id(instance.admin_client()), - ) - for instance in instances - ] + nodes = [(await create_node_info(instance)) for instance in instances] # Generate equally sized ranges and distribute by nodes step = 16400 // segments @@ -1321,7 +1324,7 @@ async def test_cluster_fuzzymigration( @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) -async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory): +async def test_cluster_config_reapply(df_local_factory: DflyInstanceFactory): """Check 
data migration from one node to another.""" instances = [ df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) @@ -1329,18 +1332,7 @@ async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory): ] df_local_factory.start_all(instances) - nodes = [ - NodeInfo( - instance=instance, - client=instance.client(), - admin_client=instance.admin_client(), - slots=[], - next_slots=[], - migrations=[], - id=await get_node_id(instance.admin_client()), - ) - for instance in instances - ] + nodes = [await create_node_info(instance) for instance in instances] nodes[0].slots = [(0, 8000)] nodes[1].slots = [(8001, 16383)] @@ -1352,7 +1344,64 @@ async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory): assert [SIZE, 0] == [await node.admin_client.dbsize() for node in nodes] nodes[0].migrations = [ - MigrationInfo("127.0.0.1", instances[1].port, [(6000, 8000)], nodes[1].id) + MigrationInfo("127.0.0.1", instances[1].admin_port, [(6000, 8000)], nodes[1].id) + ] + logging.debug("Migrating slots 6000-8000") + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + while "FINISHED" not in await nodes[1].admin_client.execute_command( + "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id + ): + logging.debug("SLOT-MIGRATION-STATUS is not FINISHED") + await asyncio.sleep(0.05) + + assert await nodes[0].admin_client.dbsize() == SIZE + assert await nodes[1].admin_client.dbsize() == SIZE + + logging.debug("Reapply config with migration") + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + await asyncio.sleep(0.1) + assert await nodes[0].admin_client.dbsize() == SIZE + assert await nodes[1].admin_client.dbsize() == SIZE + + logging.debug("Finalizing migration") + nodes[0].migrations = [] + nodes[0].slots = [(0, 6000)] + nodes[1].slots = [(6001, 16383)] + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) 
+ logging.debug("Migration finalized") + + await asyncio.sleep(0.1) + assert [0, SIZE] == [await node.admin_client.dbsize() for node in nodes] + for i in range(SIZE): + assert str(i) == await nodes[1].admin_client.get(f"{{key50}}:{i}") + + await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes]) + + +@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) +async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory): + """Check data migration from one node to another.""" + instances = [ + df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) + for i in range(2) + ] + df_local_factory.start_all(instances) + + nodes = [await create_node_info(instance) for instance in instances] + nodes[0].slots = [(0, 8000)] + nodes[1].slots = [(8001, 16383)] + + logging.debug("Pushing data to slot 6XXX") + SIZE = 10_000 + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + for i in range(SIZE): + assert await nodes[0].admin_client.set(f"{{key50}}:{i}", i) # key50 belongs to slot 6686 + assert [SIZE, 0] == [await node.admin_client.dbsize() for node in nodes] + + nodes[0].migrations = [ + MigrationInfo("127.0.0.1", instances[1].admin_port, [(6000, 8000)], nodes[1].id) ] logging.debug("Migrating slots 6000-8000") await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) @@ -1373,7 +1422,7 @@ async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory): logging.debug("Reissuing migration") nodes[0].migrations.append( - MigrationInfo("127.0.0.1", instances[1].port, [(6001, 8000)], nodes[1].id) + MigrationInfo("127.0.0.1", instances[1].admin_port, [(6001, 8000)], nodes[1].id) ) await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) await asyncio.sleep(0.1)