diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 28dbedd96..54a964c3c 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -201,8 +201,8 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) { } // serialized + side_saved must be equal to the total saved. - VLOG(1) << "Exit SnapshotSerializer (loop_serialized/side_saved/cbcalls): " - << stats_.loop_serialized << "/" << stats_.side_saved << "/" << stats_.savecb_calls; + VLOG(1) << "Exit SnapshotSerializer loop_serialized: " << stats_.loop_serialized + << ", side_saved " << stats_.side_saved << ", cbcalls " << stats_.savecb_calls; } void SliceSnapshot::SwitchIncrementalFb(LSN lsn) { diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 9a8c7d215..af05d4ec9 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -227,12 +227,6 @@ def stop_and_get_restore_log(instance): return line -def extract_int_after_prefix(prefix, line): - match = re.search(prefix + "(\\d+)", line) - assert match - return int(match.group(1)) - - @dfly_args({}) class TestNotEmulated: async def test_cluster_commands_fails_when_not_emulate(self, async_client: aioredis.Redis): diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index a1692af3a..42af84809 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2987,3 +2987,51 @@ async def test_bug_in_json_memory_tracking(df_factory: DflyInstanceFactory): await wait_for_replicas_state(*c_replicas) await fill_task + + +async def test_replica_snapshot_with_big_values_while_seeding(df_factory: DflyInstanceFactory): + proactors = 4 + master = df_factory.create(proactor_threads=proactors, dbfilename="") + replica = df_factory.create(proactor_threads=proactors, dbfilename="") + df_factory.start_all([master, replica]) + c_master = master.client() + c_replica = replica.client() + + # 50% big values + seeder_config = dict(key_target=20_000, huge_value_target=10000) + # Fill instance with test data + seeder = SeederV2(**seeder_config) + await seeder.run(c_master, target_deviation=0.01) + + assert await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + async with async_timeout.timeout(60): + await wait_for_replicas_state(c_replica) + + # Start data stream + stream_task = asyncio.create_task(seeder.run(c_master)) + await asyncio.sleep(1) + + file_name = tmp_file_name() + assert await c_replica.execute_command(f"SAVE DF {file_name}") == "OK" + await seeder.stop(c_master) + await stream_task + + # Check that everything is in sync + hashes = await asyncio.gather(*(SeederV2.capture(c) for c in [c_master, c_replica])) + assert len(set(hashes)) == 1 + + replica.stop() + lines = replica.find_in_logs("Exit SnapshotSerializer") + assert len(lines) == (proactors - 1) + for line in lines: + # We test the serializtion path of command execution + side_saved = extract_int_after_prefix("side_saved ", line) + assert side_saved > 0 + + # Check that the produced rdb is loaded correctly + node = df_factory.create(dbfilename=file_name) + node.start() + c_node = node.client() + await wait_available_async(c_node) + assert await c_node.execute_command("dbsize") > 0 + await c_node.execute_command("FLUSHALL") diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py index 5b5195e74..bfaec2419 100644 --- a/tests/dragonfly/utility.py +++ b/tests/dragonfly/utility.py @@ -17,6 +17,7 @@ import os import fakeredis from typing import Iterable, Union from enum import Enum +import re def tmp_file_name(): @@ -781,3 +782,9 @@ class ExpirySeeder: def stop(self): self.stop_flag = True + + +def extract_int_after_prefix(prefix, line): + match = re.search(prefix + "(\\d+)", line) + assert match + return int(match.group(1))