From bb242a78941df8a1b31fd55ae0a875e8c662618b Mon Sep 17 00:00:00 2001 From: adiholden Date: Tue, 2 Apr 2024 09:51:42 +0300 Subject: [PATCH] bug(server): do not write lsn opcode to journal (#2814) Signed-off-by: adi_holden --- src/server/journal/journal.cc | 5 ----- src/server/journal/journal_slice.cc | 3 +-- src/server/journal/journal_slice.h | 2 +- src/server/journal/serializer.cc | 3 ++- src/server/journal/streamer.cc | 12 +++++++++--- src/server/journal/streamer.h | 1 + src/server/journal/tx_executor.cc | 6 ++++-- src/server/journal/types.h | 3 +++ src/server/replica.cc | 6 ++++-- src/server/snapshot.cc | 3 +-- 10 files changed, 26 insertions(+), 18 deletions(-) diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 91425ebdc..09cd66839 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -86,11 +86,6 @@ LSN Journal::GetLsn() const { void Journal::RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, std::optional slot, Entry::Payload payload, bool await) { journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, slot, std::move(payload)}, await); - time_t now = time(nullptr); - if (now - last_lsn_joural_time_ > 2) { - journal_slice.AddLogRecord(Entry{txid, journal::Op::LSN, 0, 0, nullopt, {}}, await); - last_lsn_joural_time_ = now; - } } } // namespace journal diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index 6c1e6bd2d..f7c068f46 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -142,7 +142,7 @@ std::string_view JournalSlice::GetEntry(LSN lsn) const { return (*ring_buffer_)[lsn - start].data; } -void JournalSlice::AddLogRecord(Entry&& entry, bool await) { +void JournalSlice::AddLogRecord(const Entry& entry, bool await) { optional guard; if (!await) { guard.emplace(); // Guard is non-movable/copyable, so we must use emplace() @@ -166,7 +166,6 @@ void JournalSlice::AddLogRecord(Entry&& entry, bool 
await) { item->opcode = entry.opcode; item->lsn = lsn_++; item->slot = entry.slot; - entry.lsn = lsn_; io::BufSink buf_sink{&ring_serialize_buf_}; JournalWriter writer{&buf_sink}; diff --git a/src/server/journal/journal_slice.h b/src/server/journal/journal_slice.h index e5414bc6e..2752eb463 100644 --- a/src/server/journal/journal_slice.h +++ b/src/server/journal/journal_slice.h @@ -37,7 +37,7 @@ class JournalSlice { return slice_index_ != UINT32_MAX; } - void AddLogRecord(Entry&& entry, bool await); + void AddLogRecord(const Entry& entry, bool await); // Register a callback that will be called every time a new entry is // added to the journal. diff --git a/src/server/journal/serializer.cc b/src/server/journal/serializer.cc index e21162a31..3bdfa08bd 100644 --- a/src/server/journal/serializer.cc +++ b/src/server/journal/serializer.cc @@ -63,7 +63,8 @@ void JournalWriter::Write(std::monostate) { void JournalWriter::Write(const journal::Entry& entry) { // Check if entry has a new db index and we need to emit a SELECT entry. - if (entry.opcode != journal::Op::SELECT && (!cur_dbid_ || entry.dbid != *cur_dbid_)) { + if (entry.opcode != journal::Op::SELECT && entry.opcode != journal::Op::LSN && + entry.opcode != journal::Op::PING && (!cur_dbid_ || entry.dbid != *cur_dbid_)) { Write(journal::Entry{journal::Op::SELECT, entry.dbid, entry.slot}); cur_dbid_ = entry.dbid; } diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 8cce4a1e7..e5708d45a 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -19,9 +19,6 @@ void JournalStreamer::Start(io::Sink* dest, bool send_lsn) { if (!ShouldWrite(item)) { return; } - if (item.opcode == Op::LSN && !send_lsn) { - return; - } if (item.opcode == Op::NOOP) { // No record to write, just await if data was written so consumer will read the data. 
@@ -29,6 +26,15 @@ void JournalStreamer::Start(io::Sink* dest, bool send_lsn) { } Write(io::Buffer(item.data)); + time_t now = time(nullptr); + if (send_lsn && now - last_lsn_time_ > 3) { + last_lsn_time_ = now; + base::IoBuf tmp; + io::BufSink sink(&tmp); + JournalWriter writer(&sink); + writer.Write(Entry{journal::Op::LSN, item.lsn}); + Write(io::Buffer(io::View(tmp.InputBuffer()))); + } NotifyWritten(allow_await); }); } diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index 9a4c29f6d..1d7a1c76c 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -44,6 +44,7 @@ class JournalStreamer : protected BufferedStreamerBase { uint32_t journal_cb_id_{0}; journal::Journal* journal_; + time_t last_lsn_time_ = 0; util::fb2::Fiber write_fb_{}; }; diff --git a/src/server/journal/tx_executor.cc b/src/server/journal/tx_executor.cc index 24ff4d27e..a5fa9474a 100644 --- a/src/server/journal/tx_executor.cc +++ b/src/server/journal/tx_executor.cc @@ -109,7 +109,9 @@ std::optional TransactionReader::NextTxData(JournalReader* read cntx->ReportError(res.error()); return std::nullopt; } - if (lsn_.has_value()) { + + // The master does not advance its journal LSN when it sends an LSN opcode.
+ if (lsn_.has_value() && res->opcode != journal::Op::LSN) { ++*lsn_; } @@ -122,7 +124,7 @@ std::optional TransactionReader::NextTxData(JournalReader* read TransactionData tx_data = TransactionData::FromSingle(std::move(res.value())); if (lsn_.has_value() && tx_data.opcode == journal::Op::LSN) { DCHECK_NE(tx_data.lsn, 0u); - LOG_IF_EVERY_N(WARNING, tx_data.lsn != *lsn_, 1000) + LOG_IF_EVERY_N(WARNING, tx_data.lsn != *lsn_, 10000) << "master lsn:" << tx_data.lsn << " replica lsn" << *lsn_; DCHECK_EQ(tx_data.lsn, *lsn_); } diff --git a/src/server/journal/types.h b/src/server/journal/types.h index ddd301f4f..dfe74b614 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -54,6 +54,9 @@ struct Entry : public EntryBase { : EntryBase{0, opcode, dbid, 0, slot_id, 0} { } + Entry(journal::Op opcode, LSN lsn) : EntryBase{0, opcode, 0, 0, std::nullopt, lsn} { + } + Entry(TxId txid, journal::Op opcode, DbIndex dbid, uint32_t shard_cnt, std::optional slot_id) : EntryBase{txid, opcode, dbid, shard_cnt, slot_id, 0} { diff --git a/src/server/replica.cc b/src/server/replica.cc index efa22b16c..89608f7f8 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -812,7 +812,9 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) { io::PrefixSource ps{prefix, Sock()}; JournalReader reader{&ps, 0}; - TransactionReader tx_reader{use_multi_shard_exe_sync_, journal_rec_executed_}; + DCHECK_GE(journal_rec_executed_, 1u); + TransactionReader tx_reader{use_multi_shard_exe_sync_, + journal_rec_executed_.load(std::memory_order_relaxed) - 1}; if (master_context_.version > DflyVersion::VER0) { acks_fb_ = fb2::Fiber("shard_acks", &DflyShardReplica::StableSyncDflyAcksFb, this, cntx); @@ -831,7 +833,7 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) { last_io_time_ = Proactor()->GetMonotonicTimeNs(); if (tx_data->opcode == journal::Op::LSN) { - journal_rec_executed_.fetch_add(1, std::memory_order_relaxed); + // Do nothing } else if (tx_data->opcode 
== journal::Op::PING) { force_ping_ = true; journal_rec_executed_.fetch_add(1, std::memory_order_relaxed); diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index b120e5516..81460971f 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -331,8 +331,7 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) { // We ignore EXEC and NOOP entries because we they have no meaning during // the LOAD phase on replica. - if (item.opcode == journal::Op::NOOP || item.opcode == journal::Op::EXEC || - item.opcode == journal::Op::LSN) + if (item.opcode == journal::Op::NOOP || item.opcode == journal::Op::EXEC) return; serializer_->WriteJournalEntry(item.data);