diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc
index 463403f22..f7c068f46 100644
--- a/src/server/journal/journal_slice.cc
+++ b/src/server/journal/journal_slice.cc
@@ -143,6 +143,11 @@ std::string_view JournalSlice::GetEntry(LSN lsn) const {
 }
 
 void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
+  optional<FiberAtomicGuard> guard;
+  if (!await) {
+    guard.emplace();  // Guard is non-movable/copyable, so we must use emplace()
+  }
+
   DCHECK(ring_buffer_);
 
   JournalItem dummy;
diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index d64bb2250..c39a0e8d2 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -324,9 +324,7 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
 // value. This is guaranteed by the fact that OnJournalEntry always runs after OnDbChange, and
 // no database switch can be performed between those two calls, because they are part of one
 // transaction.
-// OnJournalEntry registers for changes in journal, the journal change function signature is
-// (const journal::Entry& entry, bool await) In snapshot flow we dont use the await argument.
-void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool unused_await_arg) {
+void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) {
   // We ignore EXEC and NOOP entries because they have no meaning during
   // the LOAD phase on replica.
   if (item.opcode == journal::Op::NOOP || item.opcode == journal::Op::EXEC)
@@ -334,9 +332,11 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool unused
 
   serializer_->WriteJournalEntry(item.data);
 
-  // This is the only place that flushes in streaming mode
-  // once the iterate buckets fiber finished.
-  PushSerializedToChannel(false);
+  if (await) {
+    // This is the only place that flushes in streaming mode
+    // once the iterate buckets fiber finished.
+    PushSerializedToChannel(false);
+  }
 }
 
 void SliceSnapshot::CloseRecordChannel() {
diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py
index c97168d5b..863ada5c7 100644
--- a/tests/dragonfly/replication_test.py
+++ b/tests/dragonfly/replication_test.py
@@ -1944,3 +1944,59 @@ async def test_policy_based_eviction_propagation(df_local_factory, df_seeder_fac
     assert set(keys_master) == set(keys_replica)
 
     await disconnect_clients(c_master, *[c_replica])
+
+
+@pytest.mark.asyncio
+async def test_journal_doesnt_yield_issue_2500(df_local_factory, df_seeder_factory):
+    """
+    Issues many SETEX commands through a Lua script so that no yields happen between them.
+    In parallel, connects a replica, so that these SETEX commands write their custom journal
+    entries. This makes sure that no fiber context switch happens while inside a shard callback.
+    """
+    master = df_local_factory.create()
+    replica = df_local_factory.create()
+    df_local_factory.start_all([master, replica])
+
+    c_master = master.client()
+    c_replica = replica.client()
+
+    async def send_setex():
+        script = """
+            local charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
+
+            local random_string = function(length)
+                local str = ''
+                for i=1,length do
+                    local r = math.random(1, #charset)
+                    str = str .. charset:sub(r, r)
+                end
+                return str
+            end
+
+            for i = 1, 200 do
+                -- 200 iterations to make sure SliceSnapshot dest queue is full
+                -- 100 bytes string to make sure serializer is big enough
+                redis.call('SETEX', KEYS[1], 1000, random_string(100))
+            end
+        """
+
+        for i in range(10):
+            await asyncio.gather(
+                *[c_master.eval(script, 1, random.randint(0, 1_000)) for j in range(3)]
+            )
+
+    stream_task = asyncio.create_task(send_setex())
+    await asyncio.sleep(0.1)
+
+    await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
+    assert not stream_task.done(), "Weak test case: finished sending commands before replication."
+
+    await wait_available_async(c_replica)
+    await stream_task
+
+    await check_all_replicas_finished([c_replica], c_master)
+    keys_master = await c_master.execute_command("keys *")
+    keys_replica = await c_replica.execute_command("keys *")
+    assert set(keys_master) == set(keys_replica)
+
+    await disconnect_clients(c_master, *[c_replica])
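
Context for the C++ changes above: AddLogRecord() is invoked from inside a shard callback, and
when await == false it must not preempt the calling fiber, otherwise other journal writes can
interleave mid-record (issue 2500). The guard emplaced at the top of the function marks the scope
as atomic so that any accidental preemption is caught. For the same reason, OnJournalEntry() now
calls PushSerializedToChannel() only when await is true, because pushing to a full channel may
block the fiber.

Below is a rough sketch of what such a guard can look like. This is a simplified illustration
under assumed semantics, not helio's actual FiberAtomicGuard; the class and member names here
(AtomicScopeGuard, depth_, InAtomicSection) are hypothetical.

    // Hypothetical RAII guard marking a "no preemption allowed" scope for the
    // current thread. Non-copyable and non-movable, which is why the diff keeps
    // it in an optional and constructs it with emplace().
    class AtomicScopeGuard {
     public:
      AtomicScopeGuard() { ++depth_; }   // enter the no-preemption scope
      ~AtomicScopeGuard() { --depth_; }  // leave it

      AtomicScopeGuard(const AtomicScopeGuard&) = delete;
      AtomicScopeGuard& operator=(const AtomicScopeGuard&) = delete;

      // A fiber scheduler would assert this returns false before suspending.
      static bool InAtomicSection() { return depth_ > 0; }

     private:
      static thread_local unsigned depth_;
    };

    thread_local unsigned AtomicScopeGuard::depth_ = 0;

With a scheduler-side assertion on InAtomicSection(), any code path that tries to suspend the
fiber while AddLogRecord() holds the guard fails fast in debug builds instead of silently
corrupting the journal stream.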