fix: preemption in atomic section of heartbeat (#4720)

The bug is that expiring keys during heartbeat must not preempt while writing to the journal, and we assert this with a FiberAtomicGuard. However, this atomicity guarantee is violated because the journal callback acquires a lock on a mutex that is already locked by OnJournalEntry(). The fix is to release the lock when OnJournalEntry() preempts.

Signed-off-by: kostas <kostas@dragonflydb.io>
This commit is contained in:
Kostas Kyrimis 2025-03-06 18:01:18 +02:00 committed by GitHub
parent 5f2dbb71a2
commit 0a6c28d904
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 35 additions and 3 deletions

View file

@ -424,9 +424,12 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await)
// To enable journal flushing to sync after non auto journal command is executed we call
// TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no
// additional journal change to serialize, it simply invokes PushSerialized.
std::lock_guard guard(big_value_mu_);
if (item.opcode != journal::Op::NOOP) {
serializer_->WriteJournalEntry(item.data);
{
// We should release the lock after we preempt
std::lock_guard guard(big_value_mu_);
if (item.opcode != journal::Op::NOOP) {
serializer_->WriteJournalEntry(item.data);
}
}
if (await) {

View file

@ -2903,3 +2903,32 @@ async def test_replicaof_inside_multi(df_factory):
logging.info(f"succeses: {num_successes}")
assert MULTI_COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled"
assert num_successes > 0, "At least one REPLICAOF must success"
async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFactory):
    """Attach replicas to a master whose keys are expiring while it is being seeded.

    The master is populated with set-typed keys, half of which are given a short
    random TTL. A seeder then writes to the master in the background while two
    replicas issue REPLICAOF against it. The test passes when every replica
    reaches the awaited replication state within the timeout and the seeder
    finishes.
    """
    # NOTE(review): the very large serialization_max_chunk_size presumably
    # disables chunked serialization of big values -- confirm against the flag docs.
    master = df_factory.create(proactor_threads=1, serialization_max_chunk_size=100000000000)
    replicas = [df_factory.create(proactor_threads=1) for _ in range(2)]

    # Bring up every instance, then open one client per instance.
    df_factory.start_all([master, *replicas])
    c_master = master.client()
    c_replicas = [instance.client() for instance in replicas]

    # Fill the master with set-typed keys.
    total = 100000
    await c_master.execute_command(f"DEBUG POPULATE {total} tmp 100 TYPE SET ELEMENTS 100")

    # Give the first half of the key indices a TTL of 1-10 seconds.
    # NX applies the TTL only when the key has none yet.
    expiring_keys = 50000
    for i in range(expiring_keys):
        ttl = random.randint(1, 10)
        await c_master.execute_command(f"EXPIRE tmp:{i} {ttl} NX")

    # Keep writing to the master in the background while replication starts.
    seeder = StaticSeeder(key_target=10000)
    fill_task = asyncio.create_task(seeder.run(master.client()))

    # Attach the replicas one after another.
    for c_replica in c_replicas:
        await c_replica.execute_command(f"REPLICAOF LOCALHOST {master.port}")

    # Fail the test if the replicas do not reach the awaited state in time.
    async with async_timeout.timeout(240):
        await wait_for_replicas_state(*c_replicas)

    await fill_task