mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
fix(server): Do not yield in journal if not allowed (#2540)
* fix(server): Do not yield in journal if not allowed * Add pytest * Compare keys * check_all_replicas_finished
This commit is contained in:
parent
336d6ff181
commit
bc9b214ae4
3 changed files with 66 additions and 6 deletions
|
@ -143,6 +143,11 @@ std::string_view JournalSlice::GetEntry(LSN lsn) const {
|
||||||
}
|
}
|
||||||
|
|
||||||
void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
|
void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
|
||||||
|
optional<FiberAtomicGuard> guard;
|
||||||
|
if (!await) {
|
||||||
|
guard.emplace(); // Guard is non-movable/copyable, so we must use emplace()
|
||||||
|
}
|
||||||
|
|
||||||
DCHECK(ring_buffer_);
|
DCHECK(ring_buffer_);
|
||||||
|
|
||||||
JournalItem dummy;
|
JournalItem dummy;
|
||||||
|
|
|
@ -324,9 +324,7 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
|
||||||
// value. This is guaranteed by the fact that OnJournalEntry runs always after OnDbChange, and
|
// value. This is guaranteed by the fact that OnJournalEntry runs always after OnDbChange, and
|
||||||
// no database switch can be performed between those two calls, because they are part of one
|
// no database switch can be performed between those two calls, because they are part of one
|
||||||
// transaction.
|
// transaction.
|
||||||
// OnJournalEntry registers for changes in journal, the journal change function signature is
|
void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) {
|
||||||
// (const journal::Entry& entry, bool await) In snapshot flow we dont use the await argument.
|
|
||||||
void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool unused_await_arg) {
|
|
||||||
// We ignore EXEC and NOOP entries because we they have no meaning during
|
// We ignore EXEC and NOOP entries because we they have no meaning during
|
||||||
// the LOAD phase on replica.
|
// the LOAD phase on replica.
|
||||||
if (item.opcode == journal::Op::NOOP || item.opcode == journal::Op::EXEC)
|
if (item.opcode == journal::Op::NOOP || item.opcode == journal::Op::EXEC)
|
||||||
|
@ -334,9 +332,11 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool unused
|
||||||
|
|
||||||
serializer_->WriteJournalEntry(item.data);
|
serializer_->WriteJournalEntry(item.data);
|
||||||
|
|
||||||
// This is the only place that flushes in streaming mode
|
if (await) {
|
||||||
// once the iterate buckets fiber finished.
|
// This is the only place that flushes in streaming mode
|
||||||
PushSerializedToChannel(false);
|
// once the iterate buckets fiber finished.
|
||||||
|
PushSerializedToChannel(false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void SliceSnapshot::CloseRecordChannel() {
|
void SliceSnapshot::CloseRecordChannel() {
|
||||||
|
|
|
@ -1944,3 +1944,58 @@ async def test_policy_based_eviction_propagation(df_local_factory, df_seeder_fac
|
||||||
assert set(keys_master) == set(keys_replica)
|
assert set(keys_master) == set(keys_replica)
|
||||||
|
|
||||||
await disconnect_clients(c_master, *[c_replica])
|
await disconnect_clients(c_master, *[c_replica])
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_journal_doesnt_yield_issue_2500(df_local_factory, df_seeder_factory):
|
||||||
|
"""
|
||||||
|
Issues many SETEX commands through a Lua script so that no yields are done between them.
|
||||||
|
In parallel, connect a replica, so that these SETEX commands write their custom journal log.
|
||||||
|
This makes sure that no Fiber context switch while inside a shard callback.
|
||||||
|
"""
|
||||||
|
master = df_local_factory.create()
|
||||||
|
replica = df_local_factory.create()
|
||||||
|
df_local_factory.start_all([master, replica])
|
||||||
|
|
||||||
|
c_master = master.client()
|
||||||
|
c_replica = replica.client()
|
||||||
|
|
||||||
|
async def send_setex():
|
||||||
|
script = """
|
||||||
|
local charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
|
||||||
|
|
||||||
|
local random_string = function(length)
|
||||||
|
local str = ''
|
||||||
|
for i=1,length do
|
||||||
|
str = str .. charset:sub(math.random(1, #charset))
|
||||||
|
end
|
||||||
|
return str
|
||||||
|
end
|
||||||
|
|
||||||
|
for i = 1, 200 do
|
||||||
|
-- 200 iterations to make sure SliceSnapshot dest queue is full
|
||||||
|
-- 100 bytes string to make sure serializer is big enough
|
||||||
|
redis.call('SETEX', KEYS[1], 1000, random_string(100))
|
||||||
|
end
|
||||||
|
"""
|
||||||
|
|
||||||
|
for i in range(10):
|
||||||
|
await asyncio.gather(
|
||||||
|
*[c_master.eval(script, 1, random.randint(0, 1_000)) for j in range(3)]
|
||||||
|
)
|
||||||
|
|
||||||
|
stream_task = asyncio.create_task(send_setex())
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
|
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
|
||||||
|
assert not stream_task.done(), "Weak testcase. finished sending commands before replication."
|
||||||
|
|
||||||
|
await wait_available_async(c_replica)
|
||||||
|
await stream_task
|
||||||
|
|
||||||
|
await check_all_replicas_finished([c_replica], c_master)
|
||||||
|
keys_master = await c_master.execute_command("keys *")
|
||||||
|
keys_replica = await c_replica.execute_command("keys *")
|
||||||
|
assert set(keys_master) == set(keys_replica)
|
||||||
|
|
||||||
|
await disconnect_clients(c_master, *[c_replica])
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue