From f536f8afbd68836a0a7c7bd7c32fb36ee9dac060 Mon Sep 17 00:00:00 2001
From: Vladislav
Date: Tue, 30 Jul 2024 12:47:58 +0300
Subject: [PATCH] chore: cancel slot migrations on shutdown (#3405)

---
Reviewer notes (free-form text between the "---" line and the first
"diff --git" is not part of the commit):

* ClusterConfig::CloneWithoutMigrations() copies the current config and
  clears my_incoming_migrations_ and my_outgoing_migrations_ on the copy.
* ClusterFamily::Shutdown() runs a callback on shard_set->pool()->at(0)
  via Await(). The callback takes set_config_mu, returns early when
  tl_cluster_config is null, otherwise hands the migration-free clone to
  RemoveOutgoingMigrations() / RemoveIncomingMigrations() and DCHECKs that
  both job containers are empty afterwards.
* Service::Shutdown() now calls cluster_family_.Shutdown() before
  server_family_.Shutdown().
* test_cluster_data_migration is parametrized on `interrupt`; when it is
  true the test stops one randomly chosen node after a random sleep and
  checks CLUSTER SLOTS on the remaining node.
* NOTE(review): this copy had been collapsed onto a few lines and had its
  "<...>" spans removed. Line breaks and indentation below are
  reconstructed, and the <ClusterConfig> template arguments are restored;
  verify against the upstream commit before applying.
* NOTE(review): the author's e-mail address is missing from the "From:"
  header (presumably lost with the other "<...>" spans) - restore it
  before feeding this file to `git am`.

 src/server/cluster/cluster_config.cc |  7 +++++++
 src/server/cluster/cluster_config.h  |  2 ++
 src/server/cluster/cluster_family.cc | 15 +++++++++++++++
 src/server/cluster/cluster_family.h  |  2 ++
 src/server/main_service.cc           |  4 +++-
 tests/dragonfly/cluster_test.py      | 20 +++++++++++++++++++-
 6 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/src/server/cluster/cluster_config.cc b/src/server/cluster/cluster_config.cc
index 1719adaf7..9f76be02e 100644
--- a/src/server/cluster/cluster_config.cc
+++ b/src/server/cluster/cluster_config.cc
@@ -300,6 +300,13 @@ std::shared_ptr<ClusterConfig> ClusterConfig::CloneWithChanges(
   return new_config;
 }
 
+std::shared_ptr<ClusterConfig> ClusterConfig::CloneWithoutMigrations() const {
+  auto new_config = std::make_shared<ClusterConfig>(*this);
+  new_config->my_incoming_migrations_.clear();
+  new_config->my_outgoing_migrations_.clear();
+  return new_config;
+}
+
 bool ClusterConfig::IsMySlot(SlotId id) const {
   if (id > cluster::kMaxSlotNum) {
     DCHECK(false) << "Requesting a non-existing slot id " << id;
diff --git a/src/server/cluster/cluster_config.h b/src/server/cluster/cluster_config.h
index d00ee9d47..01d9828e9 100644
--- a/src/server/cluster/cluster_config.h
+++ b/src/server/cluster/cluster_config.h
@@ -26,6 +26,8 @@ class ClusterConfig {
   std::shared_ptr<ClusterConfig> CloneWithChanges(const SlotRanges& enable_slots,
                                                   const SlotRanges& disable_slots) const;
 
+  std::shared_ptr<ClusterConfig> CloneWithoutMigrations() const;
+
   // If key is in my slots ownership return true
   bool IsMySlot(SlotId id) const;
   bool IsMySlot(std::string_view key) const;
diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc
index f441e5970..6d58f8398 100644
--- a/src/server/cluster/cluster_family.cc
+++ b/src/server/cluster/cluster_family.cc
@@ -79,6 +79,21 @@ ClusterConfig* ClusterFamily::cluster_config() {
   return tl_cluster_config.get();
 }
 
+void ClusterFamily::Shutdown() {
+  shard_set->pool()->at(0)->Await([this] {
+    lock_guard lk(set_config_mu);
+    if (!tl_cluster_config)
+      return;
+
+    auto empty_config = tl_cluster_config->CloneWithoutMigrations();
+    RemoveOutgoingMigrations(empty_config, tl_cluster_config);
+    RemoveIncomingMigrations(empty_config->GetFinishedIncomingMigrations(tl_cluster_config));
+
+    DCHECK(outgoing_migration_jobs_.empty());
+    DCHECK(incoming_migrations_jobs_.empty());
+  });
+}
+
 ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const {
   ClusterShardInfo info{.slot_ranges = SlotRanges({{.start = 0, .end = kMaxSlotNum}}),
                         .master = {},
diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h
index 86b288686..9557cd162 100644
--- a/src/server/cluster/cluster_family.h
+++ b/src/server/cluster/cluster_family.h
@@ -28,6 +28,8 @@ class ClusterFamily {
 
   void Register(CommandRegistry* registry);
 
+  void Shutdown();
+
   // Returns a thread-local pointer.
   static ClusterConfig* cluster_config();
 
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index 367a0ba3a..f76dbbca2 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -910,7 +910,9 @@ void Service::Shutdown() {
 
   config_registry.Reset();
 
-  // to shutdown all the runtime components that depend on EngineShard.
+  // to shutdown all the runtime components that depend on EngineShard
+  cluster_family_.Shutdown();
+
   server_family_.Shutdown();
   StringFamily::Shutdown();
   GenericFamily::Shutdown();
diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py
index 1baef681d..0e17ecf27 100644
--- a/tests/dragonfly/cluster_test.py
+++ b/tests/dragonfly/cluster_test.py
@@ -1117,8 +1117,9 @@ async def test_cluster_flushall_during_migration(
     await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
 
 
+@pytest.mark.parametrize("interrupt", [False, True])
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
-async def test_cluster_data_migration(df_factory: DflyInstanceFactory):
+async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt: bool):
     # Check data migration from one node to another
     instances = [
         df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
@@ -1145,6 +1146,23 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory):
     logging.debug("Start migration")
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
+    if interrupt:  # Test nodes properly shut down with pending migration
+        await asyncio.sleep(random.random())
+
+        # random instance
+        stop = random.getrandbits(1)
+        keep = 1 - stop
+
+        nodes[stop].instance.stop()
+
+        slots = await nodes[keep].admin_client.execute_command("CLUSTER SLOTS")
+        slots.sort(key=lambda cfg: cfg[0])
+        assert 0 in slots[0] and 9000 in slots[0]
+        assert 9001 in slots[1] and 16383 in slots[1]
+
+        await close_clients(*[n.client for n in nodes], *[n.admin_client for n in nodes])
+        return
+
     await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED")
 
     for i in range(20, 22):