mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 02:15:45 +02:00
chore: test snapshot in replica while seeding master (#4867)
Signed-off-by: kostas <kostas@dragonflydb.io>
This commit is contained in:
parent
2729332a0f
commit
b9ff1be7d8
4 changed files with 57 additions and 8 deletions
|
@ -201,8 +201,8 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
|
|||
}
|
||||
|
||||
// serialized + side_saved must be equal to the total saved.
|
||||
VLOG(1) << "Exit SnapshotSerializer (loop_serialized/side_saved/cbcalls): "
|
||||
<< stats_.loop_serialized << "/" << stats_.side_saved << "/" << stats_.savecb_calls;
|
||||
VLOG(1) << "Exit SnapshotSerializer loop_serialized: " << stats_.loop_serialized
|
||||
<< ", side_saved " << stats_.side_saved << ", cbcalls " << stats_.savecb_calls;
|
||||
}
|
||||
|
||||
void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
|
||||
|
|
|
@ -227,12 +227,6 @@ def stop_and_get_restore_log(instance):
|
|||
return line
|
||||
|
||||
|
||||
def extract_int_after_prefix(prefix, line):
|
||||
match = re.search(prefix + "(\\d+)", line)
|
||||
assert match
|
||||
return int(match.group(1))
|
||||
|
||||
|
||||
@dfly_args({})
|
||||
class TestNotEmulated:
|
||||
async def test_cluster_commands_fails_when_not_emulate(self, async_client: aioredis.Redis):
|
||||
|
|
|
@ -2987,3 +2987,51 @@ async def test_bug_in_json_memory_tracking(df_factory: DflyInstanceFactory):
|
|||
await wait_for_replicas_state(*c_replicas)
|
||||
|
||||
await fill_task
|
||||
|
||||
|
||||
async def test_replica_snapshot_with_big_values_while_seeding(df_factory: DflyInstanceFactory):
|
||||
proactors = 4
|
||||
master = df_factory.create(proactor_threads=proactors, dbfilename="")
|
||||
replica = df_factory.create(proactor_threads=proactors, dbfilename="")
|
||||
df_factory.start_all([master, replica])
|
||||
c_master = master.client()
|
||||
c_replica = replica.client()
|
||||
|
||||
# 50% big values
|
||||
seeder_config = dict(key_target=20_000, huge_value_target=10000)
|
||||
# Fill instance with test data
|
||||
seeder = SeederV2(**seeder_config)
|
||||
await seeder.run(c_master, target_deviation=0.01)
|
||||
|
||||
assert await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
|
||||
async with async_timeout.timeout(60):
|
||||
await wait_for_replicas_state(c_replica)
|
||||
|
||||
# Start data stream
|
||||
stream_task = asyncio.create_task(seeder.run(c_master))
|
||||
await asyncio.sleep(1)
|
||||
|
||||
file_name = tmp_file_name()
|
||||
assert await c_replica.execute_command(f"SAVE DF {file_name}") == "OK"
|
||||
await seeder.stop(c_master)
|
||||
await stream_task
|
||||
|
||||
# Check that everything is in sync
|
||||
hashes = await asyncio.gather(*(SeederV2.capture(c) for c in [c_master, c_replica]))
|
||||
assert len(set(hashes)) == 1
|
||||
|
||||
replica.stop()
|
||||
lines = replica.find_in_logs("Exit SnapshotSerializer")
|
||||
assert len(lines) == (proactors - 1)
|
||||
for line in lines:
|
||||
# We test the serializtion path of command execution
|
||||
side_saved = extract_int_after_prefix("side_saved ", line)
|
||||
assert side_saved > 0
|
||||
|
||||
# Check that the produced rdb is loaded correctly
|
||||
node = df_factory.create(dbfilename=file_name)
|
||||
node.start()
|
||||
c_node = node.client()
|
||||
await wait_available_async(c_node)
|
||||
assert await c_node.execute_command("dbsize") > 0
|
||||
await c_node.execute_command("FLUSHALL")
|
||||
|
|
|
@ -17,6 +17,7 @@ import os
|
|||
import fakeredis
|
||||
from typing import Iterable, Union
|
||||
from enum import Enum
|
||||
import re
|
||||
|
||||
|
||||
def tmp_file_name():
|
||||
|
@ -781,3 +782,9 @@ class ExpirySeeder:
|
|||
|
||||
def stop(self):
|
||||
self.stop_flag = True
|
||||
|
||||
|
||||
def extract_int_after_prefix(prefix, line):
|
||||
match = re.search(prefix + "(\\d+)", line)
|
||||
assert match
|
||||
return int(match.group(1))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue