diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 121ce2c5f..83796c453 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1045,7 +1045,6 @@ async def test_cluster_native_client( for i in range(3) ] df_factory.start_all(masters) - c_masters = [master.client() for master in masters] c_masters_admin = [master.admin_client() for master in masters] master_ids = await asyncio.gather(*(get_node_id(c) for c in c_masters_admin)) @@ -1566,8 +1565,10 @@ async def test_cluster_fuzzymigration( await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) # Fill instances with some data - seeder = df_seeder_factory.create(keys=keys, port=nodes[0].instance.port, cluster_mode=True) - await seeder.run(target_deviation=0.1) + seeder = df_seeder_factory.create( + keys=keys, port=nodes[0].instance.port, cluster_mode=True, mirror_to_fake_redis=True + ) + seed_task = asyncio.create_task(seeder.run()) # Counter that pushes values to a list async def list_counter(key, client: aioredis.RedisCluster): @@ -1585,9 +1586,6 @@ async def test_cluster_fuzzymigration( for key, conn in zip(counter_keys, counter_connections) ] - # Generate capture, capture ignores counter keys - capture = await seeder.capture() - # Generate migration plan for node_idx, node in enumerate(nodes): random.shuffle(node.slots) @@ -1671,8 +1669,12 @@ async def test_cluster_fuzzymigration( for i, j in zip(counter_list, counter_list[1:]): assert int(i) == int(j) + 1, f"Found inconsistent list in {key}: {counter_list}" - # Compare capture - assert await seeder.compare(capture, nodes[0].instance.port) + # Compare to fake redis, capture ignores counter keys + seeder.stop() + await seed_task + fake_capture = await seeder.capture_fake_redis() + + assert await seeder.compare(fake_capture, nodes[0].instance.port) await asyncio.gather(*[c.aclose() for c in counter_connections]) @@ -1759,8 +1761,10 @@ async def test_cluster_replication_migration( ) logging.debug("create data") - seeder = df_seeder_factory.create(keys=2000, port=m1_node.instance.port, cluster_mode=True) - await seeder.run(target_deviation=0.1) + seeder = df_seeder_factory.create( + keys=2000, port=m1_node.instance.port, cluster_mode=True, mirror_to_fake_redis=True + ) + seed = asyncio.create_task(seeder.run()) logging.debug("start replication") await r1_node.admin_client.execute_command(f"replicaof localhost {m1_node.instance.port}") @@ -1769,9 +1773,6 @@ async def test_cluster_replication_migration( await wait_available_async(r1_node.admin_client) await wait_available_async(r2_node.admin_client) - r1_capture = await seeder.capture(r1_node.instance.port) - r2_capture = await seeder.capture(r2_node.instance.port) - logging.debug("start migration") m1_node.migrations = [ MigrationInfo("127.0.0.1", m2_node.instance.admin_port, [(0, 8000)], m2_node.id) @@ -1800,8 +1801,10 @@ async def test_cluster_replication_migration( await asyncio.sleep(2) # ensure captures got exchanged - assert await seeder.compare(r2_capture, r1_node.instance.port) - assert await seeder.compare(r1_capture, r2_node.instance.port) + seeder.stop() + await seed + fake_capture = await seeder.capture_fake_redis() + assert await seeder.compare(fake_capture, r1_node.instance.port) @pytest.mark.skip("Flaky test") @@ -1835,8 +1838,10 @@ async def test_start_replication_during_migration( ) logging.debug("create data") - seeder = df_seeder_factory.create(keys=10000, port=nodes[0].instance.port, cluster_mode=True) - await seeder.run(target_deviation=0.1) + seeder = df_seeder_factory.create( + keys=10000, port=nodes[0].instance.port, cluster_mode=True, mirror_to_fake_redis=True + ) + seed = asyncio.create_task(seeder.run()) logging.debug("start migration") m1_node.migrations = [ @@ -1865,9 +1870,10 @@ async def test_start_replication_during_migration( await check_all_replicas_finished([r1_node.client], m1_node.client) - m1_capture = await seeder.capture(m1_node.instance.port) - - assert await seeder.compare(m1_capture, r1_node.instance.port) + seeder.stop() + await seed + fake_capture = await seeder.capture_fake_redis() + assert await seeder.compare(fake_capture, r1_node.instance.port) @pytest.mark.parametrize("migration_first", [False, True]) @@ -1903,10 +1909,10 @@ async def test_snapshoting_during_migration( await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) logging.debug("create data") - seeder = df_seeder_factory.create(keys=10000, port=nodes[0].instance.port, cluster_mode=True) - await seeder.run(target_deviation=0.1) - - capture_before_migration = await seeder.capture(nodes[0].instance.port) + seeder = df_seeder_factory.create( + keys=10000, port=nodes[0].instance.port, cluster_mode=True, mirror_to_fake_redis=True + ) + seed = asyncio.create_task(seeder.run()) nodes[0].migrations = [ MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16383)], nodes[1].id) @@ -1939,12 +1945,15 @@ async def test_snapshoting_during_migration( logging.debug("finish migration") nodes[0].migrations = [] nodes[0].slots = [] - nodes[0].migrations = [] - nodes[0].slots = [(0, 16383)] + nodes[1].migrations = [] + nodes[1].slots = [(0, 16383)] await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) - assert await seeder.compare(capture_before_migration, nodes[1].instance.port) + seeder.stop() + await seed + fake_capture = await seeder.capture_fake_redis() + assert await seeder.compare(fake_capture, nodes[1].instance.port) await nodes[1].client.execute_command( "DFLY", @@ -1952,7 +1961,8 @@ async def test_snapshoting_during_migration( f"{dbfilename}-summary.dfs", ) - assert await seeder.compare(capture_before_migration, nodes[1].instance.port) + # TODO: We can't compare the post-loaded data as is, because it might have changed by now. + # We can try to use FakeRedis with the StaticSeeder comparison here. @pytest.mark.exclude_epoll @@ -2269,7 +2279,9 @@ async def test_replicate_cluster(df_factory: DflyInstanceFactory, df_seeder_fact ) # Fill instances with some data - seeder = df_seeder_factory.create(keys=2000, port=cluster_nodes[0].port, cluster_mode=True) + seeder = df_seeder_factory.create( + keys=2000, port=cluster_nodes[0].port, cluster_mode=True, mirror_to_fake_redis=True + ) await seeder.run(target_deviation=0.1) fill_task = asyncio.create_task(seeder.run()) @@ -2293,6 +2305,8 @@ async def test_replicate_cluster(df_factory: DflyInstanceFactory, df_seeder_fact await c_replica.execute_command("REPLICAOF NO ONE") capture = await seeder.capture() assert await seeder.compare(capture, replica.port) + fake_capture = await seeder.capture_fake_redis() + assert await seeder.compare(fake_capture, replica.port) async def await_stable_sync(m_client: aioredis.Redis, replica_port, timeout=10): @@ -2356,7 +2370,9 @@ async def test_replicate_disconnect_cluster(df_factory: DflyInstanceFactory, df_ ) # Fill instances with some data - seeder = df_seeder_factory.create(keys=2000, port=cluster_nodes[0].port, cluster_mode=True) + seeder = df_seeder_factory.create( + keys=2000, port=cluster_nodes[0].port, cluster_mode=True, mirror_to_fake_redis=True + ) await seeder.run(target_deviation=0.1) fill_task = asyncio.create_task(seeder.run()) @@ -2406,6 +2422,8 @@ async def test_replicate_disconnect_cluster(df_factory: DflyInstanceFactory, df_ await c_replica.execute_command("REPLICAOF NO ONE") capture = await seeder.capture() assert await seeder.compare(capture, replica.port) + fake_capture = await seeder.capture_fake_redis() + assert await seeder.compare(fake_capture, replica.port) await proxy.close(proxy_task) @@ -2824,7 +2842,11 @@ async def test_migration_rebalance_node(df_factory: DflyInstanceFactory, df_seed # Running seeder with pipeline mode when finalizing migrations leads to errors # TODO: I believe that changing the seeder to generate pipeline command only on specific slot will fix the problem seeder = df_seeder_factory.create( - keys=50_000, port=instances[0].port, cluster_mode=True, pipeline=False + keys=50_000, + port=instances[0].port, + cluster_mode=True, + pipeline=False, + mirror_to_fake_redis=True, ) await seeder.run(target_deviation=0.1) seed = asyncio.create_task(seeder.run()) @@ -2867,3 +2889,5 @@ async def test_migration_rebalance_node(df_factory: DflyInstanceFactory, df_seed logging.debug("stop seeding") seeder.stop() await seed + capture = await seeder.capture_fake_redis() + assert await seeder.compare(capture, nodes[1].instance.port)