diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 0f011e555..9034cb382 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -330,12 +330,17 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) // no database switch can be performed between those two calls, because they are part of one // transaction. void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) { - // We ignore EXEC and NOOP entries because we they have no meaning during + // We ignore EXEC entries because we they have no meaning during // the LOAD phase on replica. - if (item.opcode == journal::Op::NOOP || item.opcode == journal::Op::EXEC) + if (item.opcode == journal::Op::EXEC) return; - serializer_->WriteJournalEntry(item.data); + // To enable journal flushing to sync after non auto journal command is executed we call + // TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no + // additional journal change to serialize, it simply invokes PushSerializedToChannel. + if (item.opcode != journal::Op::NOOP) { + serializer_->WriteJournalEntry(item.data); + } if (await) { // This is the only place that flushes in streaming mode diff --git a/src/server/transaction.cc b/src/server/transaction.cc index f591eb61f..343b0e04a 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1413,21 +1413,24 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul return; } - // If autojournaling was disabled and not re-enabled, skip it + // If autojournaling was disabled and not re-enabled the callback is writing to journal. + // We do not allow preemption in callbacks and therefor the call to RecordJournal from + // from callbacks does not allow await. + // To make sure we flush the changes to sync we call TriggerJournalWriteToSink here. if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !re_enabled_auto_journal_) { TriggerJournalWriteToSink(); return; } - // TODO: Handle complex commands like LMPOP correctly once they are implemented. journal::Entry::Payload entry_payload; - string_view cmd{cid_->name()}; if (unique_shard_cnt_ == 1 || kv_args_.empty()) { entry_payload = journal::Entry::Payload(cmd, full_args_); } else { entry_payload = journal::Entry::Payload(cmd, GetShardArgs(shard->shard_id()).AsSlice()); } + // Record to journal autojournal commands, here we allow await which anables writing to sync + // the journal change. LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false, true); }