mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
test(replication test): check data only after replica finished execution (#746)
Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
69519b2c5b
commit
4b29dece0c
3 changed files with 21 additions and 12 deletions
|
@ -461,6 +461,7 @@ error_code Replica::InitiateDflySync() {
|
||||||
// We do the following operations regardless of outcome.
|
// We do the following operations regardless of outcome.
|
||||||
JoinAllFlows();
|
JoinAllFlows();
|
||||||
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
|
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
|
||||||
|
state_mask_ &= ~R_SYNCING;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Initialize MultiShardExecution.
|
// Initialize MultiShardExecution.
|
||||||
|
@ -497,6 +498,7 @@ error_code Replica::InitiateDflySync() {
|
||||||
JournalExecutor{&service_}.FlushAll();
|
JournalExecutor{&service_}.FlushAll();
|
||||||
|
|
||||||
// Start full sync flows.
|
// Start full sync flows.
|
||||||
|
state_mask_ |= R_SYNCING;
|
||||||
{
|
{
|
||||||
auto partition = Partition(num_df_flows_);
|
auto partition = Partition(num_df_flows_);
|
||||||
auto shard_cb = [&](unsigned index, auto*) {
|
auto shard_cb = [&](unsigned index, auto*) {
|
||||||
|
|
|
@ -373,7 +373,7 @@ bool Transaction::RunInShard(EngineShard* shard) {
|
||||||
|
|
||||||
/*************************************************************************/
|
/*************************************************************************/
|
||||||
|
|
||||||
if (!was_suspended && is_concluding) // Check last hop & non suspended.
|
if (is_concluding) // Check last hop
|
||||||
LogAutoJournalOnShard(shard);
|
LogAutoJournalOnShard(shard);
|
||||||
|
|
||||||
// at least the coordinator thread owns the reference.
|
// at least the coordinator thread owns the reference.
|
||||||
|
|
|
@ -70,26 +70,33 @@ async def test_replication_all(df_local_factory, df_seeder_factory, t_master, t_
|
||||||
), "Weak testcase. Increase number of streamed iterations to surpass full sync"
|
), "Weak testcase. Increase number of streamed iterations to surpass full sync"
|
||||||
await stream_task
|
await stream_task
|
||||||
|
|
||||||
|
async def check_replica_finished_exec(c_replica):
|
||||||
|
info_stats = await c_replica.execute_command("INFO")
|
||||||
|
tc1 = info_stats['total_commands_processed']
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
info_stats = await c_replica.execute_command("INFO")
|
||||||
|
tc2 = info_stats['total_commands_processed']
|
||||||
|
return tc1+1 == tc2 # Replica processed only the info command on above sleep.
|
||||||
|
|
||||||
|
async def check_all_replicas_finished():
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(1.0)
|
||||||
|
is_finished_arr = await asyncio.gather(*(asyncio.create_task(check_replica_finished_exec(c))
|
||||||
|
for c in c_replicas))
|
||||||
|
if all(is_finished_arr):
|
||||||
|
break
|
||||||
|
|
||||||
# Check data after full sync
|
# Check data after full sync
|
||||||
await asyncio.sleep(4.0)
|
await check_all_replicas_finished()
|
||||||
await check_data(seeder, replicas, c_replicas)
|
await check_data(seeder, replicas, c_replicas)
|
||||||
|
|
||||||
# Stream more data in stable state
|
# Stream more data in stable state
|
||||||
await seeder.run(target_ops=2000)
|
await seeder.run(target_ops=2000)
|
||||||
|
|
||||||
# Check data after stable state stream
|
# Check data after stable state stream
|
||||||
await asyncio.sleep(3.0)
|
await check_all_replicas_finished()
|
||||||
await check_data(seeder, replicas, c_replicas)
|
await check_data(seeder, replicas, c_replicas)
|
||||||
|
|
||||||
# Issue lots of deletes
|
|
||||||
# TODO: Enable after stable state is faster
|
|
||||||
# seeder.target(100)
|
|
||||||
# await seeder.run(target_deviation=0.1)
|
|
||||||
|
|
||||||
# Check data after deletes
|
|
||||||
# await asyncio.sleep(2.0)
|
|
||||||
# await check_data(seeder, replicas, c_replicas)
|
|
||||||
|
|
||||||
|
|
||||||
async def check_data(seeder, replicas, c_replicas):
|
async def check_data(seeder, replicas, c_replicas):
|
||||||
capture = await seeder.capture()
|
capture = await seeder.capture()
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue