From 39dd73fc7117671cf9150b9879c6794b1bd232ac Mon Sep 17 00:00:00 2001 From: Borys Date: Fri, 7 Jun 2024 14:31:11 +0300 Subject: [PATCH] fix: fix bug in cluster/slot_set (#3143) * fix: fix bug in cluster/slot_set * fix: fix slot flushes --- src/server/cluster/cluster_config_test.cc | 51 +++++++++++++++++++++++ src/server/cluster/cluster_family.cc | 21 ++++++++-- src/server/cluster/cluster_family.h | 3 +- src/server/cluster/slot_set.h | 2 +- tests/dragonfly/cluster_test.py | 38 +++++++---------- 5 files changed, 87 insertions(+), 28 deletions(-) diff --git a/src/server/cluster/cluster_config_test.cc b/src/server/cluster/cluster_config_test.cc index 3aad3b648..5f72fb4d3 100644 --- a/src/server/cluster/cluster_config_test.cc +++ b/src/server/cluster/cluster_config_test.cc @@ -558,4 +558,55 @@ TEST_F(ClusterConfigTest, InvalidConfigMigrationsWithoutIP) { EXPECT_EQ(config, nullptr); } +TEST_F(ClusterConfigTest, SlotSetAPI) { + { + SlotSet ss(false); + EXPECT_EQ(ss.ToSlotRanges(), SlotRanges()); + EXPECT_FALSE(ss.All()); + EXPECT_TRUE(ss.Empty()); + } + { + SlotSet ss(true); + EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, SlotRange::kMaxSlotId}})); + EXPECT_TRUE(ss.All()); + EXPECT_FALSE(ss.Empty()); + } + { + SlotSet ss(SlotRanges({{0, 1000}, {1001, 2000}})); + EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({SlotRange{0, 2000}})); + EXPECT_EQ(ss.Count(), 2001); + + for (uint16_t i = 0; i < 2000; ++i) { + EXPECT_TRUE(ss.Contains(i)); + } + for (uint16_t i = 2001; i <= SlotRange::kMaxSlotId; ++i) { + EXPECT_FALSE(ss.Contains(i)); + } + + EXPECT_FALSE(ss.All()); + EXPECT_FALSE(ss.Empty()); + + ss.Set(5010, true); + EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5010, 5010}})); + + ss.Set({SlotRange{5000, 5100}}, true); + EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5000, 5100}})); + + ss.Set(5050, false); + EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5000, 5049}, {5051, 5100}})); + + ss.Set(5500, false); + EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5000, 5049}, {5051, 5100}})); + + ss.Set({SlotRange{5090, 5100}}, false); + EXPECT_EQ(ss.ToSlotRanges(), SlotRanges({{0, 2000}, {5000, 5049}, {5051, 5089}})); + + SlotSet ss1(SlotRanges({{1001, 2000}})); + + EXPECT_EQ(ss.GetRemovedSlots(ss1).ToSlotRanges(), + SlotRanges({{0, 1000}, {5000, 5049}, {5051, 5089}})); + EXPECT_EQ(ss1.GetRemovedSlots(ss).ToSlotRanges(), SlotRanges()); + } +} + } // namespace dfly::cluster diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 96678f183..e0ce17a23 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -508,7 +508,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) lock_guard gu(set_config_mu); VLOG(1) << "Setting new cluster config: " << json_str; - RemoveOutgoingMigrations(new_config->GetFinishedOutgoingMigrations(tl_cluster_config)); + auto out_migrations_slots = RemoveOutgoingMigrations(new_config, tl_cluster_config); RemoveIncomingMigrations(new_config->GetFinishedIncomingMigrations(tl_cluster_config)); // Prevent simultaneous config update from outgoing migration @@ -565,6 +565,8 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) SlotSet after = tl_cluster_config->GetOwnedSlots(); if (ServerState::tlocal()->is_master) { auto deleted_slots = (before.GetRemovedSlots(after)).ToSlotRanges(); + deleted_slots.insert(deleted_slots.end(), out_migrations_slots.begin(), + out_migrations_slots.end()); LOG_IF(INFO, !deleted_slots.empty()) << "Flushing newly unowned slots: " << SlotRange::ToString(deleted_slots); DeleteSlots(deleted_slots); @@ -736,8 +738,11 @@ std::shared_ptr ClusterFamily::GetIncomingMigration( return nullptr; } -void ClusterFamily::RemoveOutgoingMigrations(const std::vector& migrations) { +SlotRanges ClusterFamily::RemoveOutgoingMigrations(shared_ptr new_config, + shared_ptr old_config) { + auto migrations = new_config->GetFinishedOutgoingMigrations(old_config); lock_guard lk(migration_mu_); + SlotRanges removed_slots; for (const auto& m : migrations) { auto it = std::find_if(outgoing_migration_jobs_.begin(), outgoing_migration_jobs_.end(), [&m](const auto& om) { @@ -747,13 +752,20 @@ void ClusterFamily::RemoveOutgoingMigrations(const std::vector& m DCHECK(it != outgoing_migration_jobs_.end()); DCHECK(it->get() != nullptr); OutgoingMigration& migration = *it->get(); - LOG(INFO) << "Outgoing migration cancelled: slots " << SlotRange::ToString(migration.GetSlots()) - << " to " << migration.GetHostIp() << ":" << migration.GetPort(); + const auto& slots = migration.GetSlots(); + removed_slots.insert(removed_slots.end(), slots.begin(), slots.end()); + LOG(INFO) << "Outgoing migration cancelled: slots " << SlotRange::ToString(slots) << " to " + << migration.GetHostIp() << ":" << migration.GetPort(); migration.Finish(); outgoing_migration_jobs_.erase(it); } + // Flush non-owned migrations + SlotSet migration_slots(removed_slots); + SlotSet removed = migration_slots.GetRemovedSlots(new_config->GetOwnedSlots()); + // Flushing of removed slots is done outside this function. + return removed.ToSlotRanges(); } namespace { @@ -779,6 +791,7 @@ bool RemoveIncomingMigrationImpl(std::vector(removed.ToSlotRanges()); LOG_IF(WARNING, migration->GetState() == MigrationState::C_FINISHED) diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index 5a036f844..b49688dc1 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -78,7 +78,8 @@ class ClusterFamily { std::shared_ptr GetIncomingMigration(std::string_view source_id); void StartSlotMigrations(std::vector migrations); - void RemoveOutgoingMigrations(const std::vector& migrations); + SlotRanges RemoveOutgoingMigrations(std::shared_ptr new_config, + std::shared_ptr old_config); void RemoveIncomingMigrations(const std::vector& migrations); // store info about migration and create unique session id diff --git a/src/server/cluster/slot_set.h b/src/server/cluster/slot_set.h index b9167330e..aaaebe047 100644 --- a/src/server/cluster/slot_set.h +++ b/src/server/cluster/slot_set.h @@ -41,7 +41,7 @@ class SlotSet { void Set(const SlotRanges& slot_ranges, bool value) { for (const auto& slot_range : slot_ranges) { for (auto i = slot_range.start; i <= slot_range.end; ++i) { - slots_->set(i); + slots_->set(i, value); } } } diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 1c1512513..aa2f65fc8 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -144,7 +144,7 @@ async def wait_for_status(admin_client, node_id, status, timeout=10): return else: logging.debug(f"SLOT-MIGRATION-STATUS is {response}, not {status}") - await asyncio.sleep(0.05) + await asyncio.sleep(0.1) raise RuntimeError("Timeout to achieve migrations status") @@ -1303,21 +1303,15 @@ async def test_cluster_config_reapply(df_local_factory: DflyInstanceFactory): logging.debug("Migrating slots 6000-8000") await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) - while "FINISHED" not in await nodes[1].admin_client.execute_command( - "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id - ): - logging.debug("SLOT-MIGRATION-STATUS is not FINISHED") - await asyncio.sleep(0.05) + await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED") - assert await nodes[0].admin_client.dbsize() == SIZE - assert await nodes[1].admin_client.dbsize() == SIZE + assert [SIZE, SIZE] == [await node.client.dbsize() for node in nodes] logging.debug("Reapply config with migration") await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) await asyncio.sleep(0.1) - assert await nodes[0].admin_client.dbsize() == SIZE - assert await nodes[1].admin_client.dbsize() == SIZE + assert [SIZE, SIZE] == [await node.client.dbsize() for node in nodes] logging.debug("Finalizing migration") nodes[0].migrations = [] @@ -1326,15 +1320,15 @@ async def test_cluster_config_reapply(df_local_factory: DflyInstanceFactory): await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) logging.debug("Migration finalized") - await asyncio.sleep(0.1) - assert [0, SIZE] == [await node.admin_client.dbsize() for node in nodes] + await asyncio.sleep(1) + assert [0, SIZE] == [await node.client.dbsize() for node in nodes] + for i in range(SIZE): - assert str(i) == await nodes[1].admin_client.get(f"{{key50}}:{i}") + assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}") await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes]) -@pytest.mark.skip(reason="The test is broken") @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory): """Check data migration from one node to another.""" @@ -1352,8 +1346,8 @@ async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory): SIZE = 10_000 await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) for i in range(SIZE): - assert await nodes[0].admin_client.set(f"{{key50}}:{i}", i) # key50 belongs to slot 6686 - assert [SIZE, 0] == [await node.admin_client.dbsize() for node in nodes] + assert await nodes[0].client.set(f"{{key50}}:{i}", i) # key50 belongs to slot 6686 + assert [SIZE, 0] == [await node.client.dbsize() for node in nodes] nodes[0].migrations = [ MigrationInfo("127.0.0.1", instances[1].admin_port, [(6000, 8000)], nodes[1].id) @@ -1364,13 +1358,13 @@ async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory): logging.debug("Cancelling migration") nodes[0].migrations = [] await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) - assert SIZE == await nodes[0].admin_client.dbsize() + assert SIZE == await nodes[0].client.dbsize() while True: - db_size = await nodes[1].admin_client.dbsize() + db_size = await nodes[1].client.dbsize() if 0 == db_size: break logging.debug(f"target dbsize is {db_size}") - logging.debug(await nodes[1].admin_client.execute_command("KEYS", "*")) + logging.debug(await nodes[1].client.execute_command("KEYS", "*")) await asyncio.sleep(0.1) logging.debug("Reissuing migration") @@ -1379,7 +1373,7 @@ async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory): ) await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED") - assert [SIZE, SIZE] == [await node.admin_client.dbsize() for node in nodes] + assert [SIZE, SIZE] == [await node.client.dbsize() for node in nodes] logging.debug("Finalizing migration") nodes[0].migrations = [] @@ -1388,12 +1382,12 @@ async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory): await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) logging.debug("Migration finalized") - while 0 != await nodes[0].admin_client.dbsize(): + while 0 != await nodes[0].client.dbsize(): logging.debug(f"wait until source dbsize is empty") await asyncio.sleep(0.1) for i in range(SIZE): - assert str(i) == await nodes[1].admin_client.get(f"{{key50}}:{i}") + assert str(i) == await nodes[1].client.get(f"{{key50}}:{i}") await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])