From 9e52438862b2bc00a0087e1eecd11c367e1d750e Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Thu, 6 Mar 2025 18:01:18 +0200 Subject: [PATCH] fix: preemption in atomic section of heartbeat (#4720) The bug is that expiring keys during heartbeat should not preempt while writing to the journal and we assert this with a FiberAtomicGuard. However, this atomicity guarantee is violated because the journal callback acquires a lock on a mutex that is already locked by on OnJournalEntry(). The fix is to release the lock when OnJournalEntry() preempts. Signed-off-by: kostas --- src/server/snapshot.cc | 9 ++++++--- tests/dragonfly/replication_test.py | 29 +++++++++++++++++++++++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 289870105..22eb573cc 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -410,9 +410,12 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) // To enable journal flushing to sync after non auto journal command is executed we call // TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no // additional journal change to serialize, it simply invokes PushSerialized. - std::lock_guard guard(big_value_mu_); - if (item.opcode != journal::Op::NOOP) { - serializer_->WriteJournalEntry(item.data); + { + // We should release the lock after we preempt + std::lock_guard guard(big_value_mu_); + if (item.opcode != journal::Op::NOOP) { + serializer_->WriteJournalEntry(item.data); + } } if (await) { diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index add96839c..5d6d9a76e 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2770,3 +2770,32 @@ async def test_stream_approximate_trimming(df_factory): master_data = await StaticSeeder.capture(c_master) replica_data = await StaticSeeder.capture(c_replica) assert master_data == replica_data + + +async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFactory): + master = df_factory.create(proactor_threads=1, serialization_max_chunk_size=100000000000) + replicas = [df_factory.create(proactor_threads=1) for i in range(2)] + + # Start instances and connect clients + df_factory.start_all([master] + replicas) + c_master = master.client() + c_replicas = [replica.client() for replica in replicas] + + total = 100000 + await c_master.execute_command(f"DEBUG POPULATE {total} tmp 100 TYPE SET ELEMENTS 100") + + thresehold = 50000 + for i in range(thresehold): + rand = random.randint(1, 10) + await c_master.execute_command(f"EXPIRE tmp:{i} {rand} NX") + + seeder = StaticSeeder(key_target=10000) + fill_task = asyncio.create_task(seeder.run(master.client())) + + for replica in c_replicas: + await replica.execute_command(f"REPLICAOF LOCALHOST {master.port}") + + async with async_timeout.timeout(240): + await wait_for_replicas_state(*c_replicas) + + await fill_task