chore: Add FakeRedis capture/compare to more cluster tests and seed during migration (#4542)

* chore: Add FakeRedis capture/compare to more cluster tests

Note that we can't add such a test to `test_cluster_native_client`
because we don't do a migration there, just re-assign slots, meaning
that data is lost (by design?)

Fixes #4429

* Seed during migrations

* Add TODO
This commit is contained in:
Shahar Mike 2025-02-05 14:34:42 +02:00 committed by GitHub
parent 6d1c22b64c
commit f8667235db
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1045,7 +1045,6 @@ async def test_cluster_native_client(
for i in range(3) for i in range(3)
] ]
df_factory.start_all(masters) df_factory.start_all(masters)
c_masters = [master.client() for master in masters]
c_masters_admin = [master.admin_client() for master in masters] c_masters_admin = [master.admin_client() for master in masters]
master_ids = await asyncio.gather(*(get_node_id(c) for c in c_masters_admin)) master_ids = await asyncio.gather(*(get_node_id(c) for c in c_masters_admin))
@ -1566,8 +1565,10 @@ async def test_cluster_fuzzymigration(
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
# Fill instances with some data # Fill instances with some data
seeder = df_seeder_factory.create(keys=keys, port=nodes[0].instance.port, cluster_mode=True) seeder = df_seeder_factory.create(
await seeder.run(target_deviation=0.1) keys=keys, port=nodes[0].instance.port, cluster_mode=True, mirror_to_fake_redis=True
)
seed_task = asyncio.create_task(seeder.run())
# Counter that pushes values to a list # Counter that pushes values to a list
async def list_counter(key, client: aioredis.RedisCluster): async def list_counter(key, client: aioredis.RedisCluster):
@ -1585,9 +1586,6 @@ async def test_cluster_fuzzymigration(
for key, conn in zip(counter_keys, counter_connections) for key, conn in zip(counter_keys, counter_connections)
] ]
# Generate capture, capture ignores counter keys
capture = await seeder.capture()
# Generate migration plan # Generate migration plan
for node_idx, node in enumerate(nodes): for node_idx, node in enumerate(nodes):
random.shuffle(node.slots) random.shuffle(node.slots)
@ -1671,8 +1669,12 @@ async def test_cluster_fuzzymigration(
for i, j in zip(counter_list, counter_list[1:]): for i, j in zip(counter_list, counter_list[1:]):
assert int(i) == int(j) + 1, f"Found inconsistent list in {key}: {counter_list}" assert int(i) == int(j) + 1, f"Found inconsistent list in {key}: {counter_list}"
# Compare capture # Compare to fake redis, capture ignores counter keys
assert await seeder.compare(capture, nodes[0].instance.port) seeder.stop()
await seed_task
fake_capture = await seeder.capture_fake_redis()
assert await seeder.compare(fake_capture, nodes[0].instance.port)
await asyncio.gather(*[c.aclose() for c in counter_connections]) await asyncio.gather(*[c.aclose() for c in counter_connections])
@ -1759,8 +1761,10 @@ async def test_cluster_replication_migration(
) )
logging.debug("create data") logging.debug("create data")
seeder = df_seeder_factory.create(keys=2000, port=m1_node.instance.port, cluster_mode=True) seeder = df_seeder_factory.create(
await seeder.run(target_deviation=0.1) keys=2000, port=m1_node.instance.port, cluster_mode=True, mirror_to_fake_redis=True
)
seed = asyncio.create_task(seeder.run())
logging.debug("start replication") logging.debug("start replication")
await r1_node.admin_client.execute_command(f"replicaof localhost {m1_node.instance.port}") await r1_node.admin_client.execute_command(f"replicaof localhost {m1_node.instance.port}")
@ -1769,9 +1773,6 @@ async def test_cluster_replication_migration(
await wait_available_async(r1_node.admin_client) await wait_available_async(r1_node.admin_client)
await wait_available_async(r2_node.admin_client) await wait_available_async(r2_node.admin_client)
r1_capture = await seeder.capture(r1_node.instance.port)
r2_capture = await seeder.capture(r2_node.instance.port)
logging.debug("start migration") logging.debug("start migration")
m1_node.migrations = [ m1_node.migrations = [
MigrationInfo("127.0.0.1", m2_node.instance.admin_port, [(0, 8000)], m2_node.id) MigrationInfo("127.0.0.1", m2_node.instance.admin_port, [(0, 8000)], m2_node.id)
@ -1800,8 +1801,10 @@ async def test_cluster_replication_migration(
await asyncio.sleep(2) await asyncio.sleep(2)
# ensure captures got exchanged # ensure captures got exchanged
assert await seeder.compare(r2_capture, r1_node.instance.port) seeder.stop()
assert await seeder.compare(r1_capture, r2_node.instance.port) await seed
fake_capture = await seeder.capture_fake_redis()
assert await seeder.compare(fake_capture, r1_node.instance.port)
@pytest.mark.skip("Flaky test") @pytest.mark.skip("Flaky test")
@ -1835,8 +1838,10 @@ async def test_start_replication_during_migration(
) )
logging.debug("create data") logging.debug("create data")
seeder = df_seeder_factory.create(keys=10000, port=nodes[0].instance.port, cluster_mode=True) seeder = df_seeder_factory.create(
await seeder.run(target_deviation=0.1) keys=10000, port=nodes[0].instance.port, cluster_mode=True, mirror_to_fake_redis=True
)
seed = asyncio.create_task(seeder.run())
logging.debug("start migration") logging.debug("start migration")
m1_node.migrations = [ m1_node.migrations = [
@ -1865,9 +1870,10 @@ async def test_start_replication_during_migration(
await check_all_replicas_finished([r1_node.client], m1_node.client) await check_all_replicas_finished([r1_node.client], m1_node.client)
m1_capture = await seeder.capture(m1_node.instance.port) seeder.stop()
await seed
assert await seeder.compare(m1_capture, r1_node.instance.port) fake_capture = await seeder.capture_fake_redis()
assert await seeder.compare(fake_capture, r1_node.instance.port)
@pytest.mark.parametrize("migration_first", [False, True]) @pytest.mark.parametrize("migration_first", [False, True])
@ -1903,10 +1909,10 @@ async def test_snapshoting_during_migration(
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
logging.debug("create data") logging.debug("create data")
seeder = df_seeder_factory.create(keys=10000, port=nodes[0].instance.port, cluster_mode=True) seeder = df_seeder_factory.create(
await seeder.run(target_deviation=0.1) keys=10000, port=nodes[0].instance.port, cluster_mode=True, mirror_to_fake_redis=True
)
capture_before_migration = await seeder.capture(nodes[0].instance.port) seed = asyncio.create_task(seeder.run())
nodes[0].migrations = [ nodes[0].migrations = [
MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16383)], nodes[1].id) MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16383)], nodes[1].id)
@ -1939,12 +1945,15 @@ async def test_snapshoting_during_migration(
logging.debug("finish migration") logging.debug("finish migration")
nodes[0].migrations = [] nodes[0].migrations = []
nodes[0].slots = [] nodes[0].slots = []
nodes[0].migrations = [] nodes[1].migrations = []
nodes[0].slots = [(0, 16383)] nodes[1].slots = [(0, 16383)]
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
assert await seeder.compare(capture_before_migration, nodes[1].instance.port) seeder.stop()
await seed
fake_capture = await seeder.capture_fake_redis()
assert await seeder.compare(fake_capture, nodes[1].instance.port)
await nodes[1].client.execute_command( await nodes[1].client.execute_command(
"DFLY", "DFLY",
@ -1952,7 +1961,8 @@ async def test_snapshoting_during_migration(
f"{dbfilename}-summary.dfs", f"{dbfilename}-summary.dfs",
) )
assert await seeder.compare(capture_before_migration, nodes[1].instance.port) # TODO: We can't compare the post-loaded data as is, because it might have changed by now.
# We can try to use FakeRedis with the StaticSeeder comparison here.
@pytest.mark.exclude_epoll @pytest.mark.exclude_epoll
@ -2269,7 +2279,9 @@ async def test_replicate_cluster(df_factory: DflyInstanceFactory, df_seeder_fact
) )
# Fill instances with some data # Fill instances with some data
seeder = df_seeder_factory.create(keys=2000, port=cluster_nodes[0].port, cluster_mode=True) seeder = df_seeder_factory.create(
keys=2000, port=cluster_nodes[0].port, cluster_mode=True, mirror_to_fake_redis=True
)
await seeder.run(target_deviation=0.1) await seeder.run(target_deviation=0.1)
fill_task = asyncio.create_task(seeder.run()) fill_task = asyncio.create_task(seeder.run())
@ -2293,6 +2305,8 @@ async def test_replicate_cluster(df_factory: DflyInstanceFactory, df_seeder_fact
await c_replica.execute_command("REPLICAOF NO ONE") await c_replica.execute_command("REPLICAOF NO ONE")
capture = await seeder.capture() capture = await seeder.capture()
assert await seeder.compare(capture, replica.port) assert await seeder.compare(capture, replica.port)
fake_capture = await seeder.capture_fake_redis()
assert await seeder.compare(fake_capture, replica.port)
async def await_stable_sync(m_client: aioredis.Redis, replica_port, timeout=10): async def await_stable_sync(m_client: aioredis.Redis, replica_port, timeout=10):
@ -2356,7 +2370,9 @@ async def test_replicate_disconnect_cluster(df_factory: DflyInstanceFactory, df_
) )
# Fill instances with some data # Fill instances with some data
seeder = df_seeder_factory.create(keys=2000, port=cluster_nodes[0].port, cluster_mode=True) seeder = df_seeder_factory.create(
keys=2000, port=cluster_nodes[0].port, cluster_mode=True, mirror_to_fake_redis=True
)
await seeder.run(target_deviation=0.1) await seeder.run(target_deviation=0.1)
fill_task = asyncio.create_task(seeder.run()) fill_task = asyncio.create_task(seeder.run())
@ -2406,6 +2422,8 @@ async def test_replicate_disconnect_cluster(df_factory: DflyInstanceFactory, df_
await c_replica.execute_command("REPLICAOF NO ONE") await c_replica.execute_command("REPLICAOF NO ONE")
capture = await seeder.capture() capture = await seeder.capture()
assert await seeder.compare(capture, replica.port) assert await seeder.compare(capture, replica.port)
fake_capture = await seeder.capture_fake_redis()
assert await seeder.compare(fake_capture, replica.port)
await proxy.close(proxy_task) await proxy.close(proxy_task)
@ -2824,7 +2842,11 @@ async def test_migration_rebalance_node(df_factory: DflyInstanceFactory, df_seed
# Running seeder with pipeline mode when finalizing migrations leads to errors # Running seeder with pipeline mode when finalizing migrations leads to errors
# TODO: I believe that changing the seeder to generate pipeline command only on specific slot will fix the problem # TODO: I believe that changing the seeder to generate pipeline command only on specific slot will fix the problem
seeder = df_seeder_factory.create( seeder = df_seeder_factory.create(
keys=50_000, port=instances[0].port, cluster_mode=True, pipeline=False keys=50_000,
port=instances[0].port,
cluster_mode=True,
pipeline=False,
mirror_to_fake_redis=True,
) )
await seeder.run(target_deviation=0.1) await seeder.run(target_deviation=0.1)
seed = asyncio.create_task(seeder.run()) seed = asyncio.create_task(seeder.run())
@ -2867,3 +2889,5 @@ async def test_migration_rebalance_node(df_factory: DflyInstanceFactory, df_seed
logging.debug("stop seeding") logging.debug("stop seeding")
seeder.stop() seeder.stop()
await seed await seed
capture = await seeder.capture_fake_redis()
assert await seeder.compare(capture, nodes[1].instance.port)