From 4b29dece0ceb37d216ceac208a87c594f0785640 Mon Sep 17 00:00:00 2001 From: adiholden Date: Thu, 2 Feb 2023 15:08:44 +0200 Subject: [PATCH] test(replication test): check data only after replica finished execution (#746) Signed-off-by: adi_holden --- src/server/replica.cc | 2 ++ src/server/transaction.cc | 2 +- tests/dragonfly/replication_test.py | 29 ++++++++++++++++++----------- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/src/server/replica.cc b/src/server/replica.cc index 1cbfa4c55..3ca6383b2 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -461,6 +461,7 @@ error_code Replica::InitiateDflySync() { // We do the following operations regardless of outcome. JoinAllFlows(); service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE); + state_mask_ &= ~R_SYNCING; }; // Initialize MultiShardExecution. @@ -497,6 +498,7 @@ error_code Replica::InitiateDflySync() { JournalExecutor{&service_}.FlushAll(); // Start full sync flows. + state_mask_ |= R_SYNCING; { auto partition = Partition(num_df_flows_); auto shard_cb = [&](unsigned index, auto*) { diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 88fe86f6b..040a01ceb 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -373,7 +373,7 @@ bool Transaction::RunInShard(EngineShard* shard) { /*************************************************************************/ - if (!was_suspended && is_concluding) // Check last hop & non suspended. + if (is_concluding) // Check last hop LogAutoJournalOnShard(shard); // at least the coordinator thread owns the reference. diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 1a742faf1..737d441d0 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -70,26 +70,33 @@ async def test_replication_all(df_local_factory, df_seeder_factory, t_master, t_ ), "Weak testcase. 
Increase number of streamed iterations to surpass full sync" await stream_task + async def check_replica_finished_exec(c_replica): + info_stats = await c_replica.execute_command("INFO") + tc1 = info_stats['total_commands_processed'] + await asyncio.sleep(0.1) + info_stats = await c_replica.execute_command("INFO") + tc2 = info_stats['total_commands_processed'] + return tc1+1 == tc2 # The replica processed only the INFO command itself during the sleep above. + + async def check_all_replicas_finished(): + while True: + await asyncio.sleep(1.0) + is_finished_arr = await asyncio.gather(*(asyncio.create_task(check_replica_finished_exec(c)) + for c in c_replicas)) + if all(is_finished_arr): + break + # Check data after full sync - await asyncio.sleep(4.0) + await check_all_replicas_finished() await check_data(seeder, replicas, c_replicas) # Stream more data in stable state await seeder.run(target_ops=2000) # Check data after stable state stream - await asyncio.sleep(3.0) + await check_all_replicas_finished() await check_data(seeder, replicas, c_replicas) - # Issue lots of deletes - # TODO: Enable after stable state is faster - # seeder.target(100) - # await seeder.run(target_deviation=0.1) - - # Check data after deletes - # await asyncio.sleep(2.0) - # await check_data(seeder, replicas, c_replicas) - async def check_data(seeder, replicas, c_replicas): capture = await seeder.capture()