mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
bug(server): do not write lsn opcode to journal (#2814)
Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
a93ad4e86f
commit
bb242a7894
10 changed files with 26 additions and 18 deletions
|
@ -86,11 +86,6 @@ LSN Journal::GetLsn() const {
|
|||
void Journal::RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt,
|
||||
std::optional<SlotId> slot, Entry::Payload payload, bool await) {
|
||||
journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, slot, std::move(payload)}, await);
|
||||
time_t now = time(nullptr);
|
||||
if (now - last_lsn_joural_time_ > 2) {
|
||||
journal_slice.AddLogRecord(Entry{txid, journal::Op::LSN, 0, 0, nullopt, {}}, await);
|
||||
last_lsn_joural_time_ = now;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace journal
|
||||
|
|
|
@ -142,7 +142,7 @@ std::string_view JournalSlice::GetEntry(LSN lsn) const {
|
|||
return (*ring_buffer_)[lsn - start].data;
|
||||
}
|
||||
|
||||
void JournalSlice::AddLogRecord(Entry&& entry, bool await) {
|
||||
void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
|
||||
optional<FiberAtomicGuard> guard;
|
||||
if (!await) {
|
||||
guard.emplace(); // Guard is non-movable/copyable, so we must use emplace()
|
||||
|
@ -166,7 +166,6 @@ void JournalSlice::AddLogRecord(Entry&& entry, bool await) {
|
|||
item->opcode = entry.opcode;
|
||||
item->lsn = lsn_++;
|
||||
item->slot = entry.slot;
|
||||
entry.lsn = lsn_;
|
||||
|
||||
io::BufSink buf_sink{&ring_serialize_buf_};
|
||||
JournalWriter writer{&buf_sink};
|
||||
|
|
|
@ -37,7 +37,7 @@ class JournalSlice {
|
|||
return slice_index_ != UINT32_MAX;
|
||||
}
|
||||
|
||||
void AddLogRecord(Entry&& entry, bool await);
|
||||
void AddLogRecord(const Entry& entry, bool await);
|
||||
|
||||
// Register a callback that will be called every time a new entry is
|
||||
// added to the journal.
|
||||
|
|
|
@ -63,7 +63,8 @@ void JournalWriter::Write(std::monostate) {
|
|||
|
||||
void JournalWriter::Write(const journal::Entry& entry) {
|
||||
// Check if entry has a new db index and we need to emit a SELECT entry.
|
||||
if (entry.opcode != journal::Op::SELECT && (!cur_dbid_ || entry.dbid != *cur_dbid_)) {
|
||||
if (entry.opcode != journal::Op::SELECT && entry.opcode != journal::Op::LSN &&
|
||||
entry.opcode != journal::Op::PING && (!cur_dbid_ || entry.dbid != *cur_dbid_)) {
|
||||
Write(journal::Entry{journal::Op::SELECT, entry.dbid, entry.slot});
|
||||
cur_dbid_ = entry.dbid;
|
||||
}
|
||||
|
|
|
@ -19,9 +19,6 @@ void JournalStreamer::Start(io::Sink* dest, bool send_lsn) {
|
|||
if (!ShouldWrite(item)) {
|
||||
return;
|
||||
}
|
||||
if (item.opcode == Op::LSN && !send_lsn) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (item.opcode == Op::NOOP) {
|
||||
// No record to write, just await if data was written so consumer will read the data.
|
||||
|
@ -29,6 +26,15 @@ void JournalStreamer::Start(io::Sink* dest, bool send_lsn) {
|
|||
}
|
||||
|
||||
Write(io::Buffer(item.data));
|
||||
time_t now = time(nullptr);
|
||||
if (send_lsn && now - last_lsn_time_ > 3) {
|
||||
last_lsn_time_ = now;
|
||||
base::IoBuf tmp;
|
||||
io::BufSink sink(&tmp);
|
||||
JournalWriter writer(&sink);
|
||||
writer.Write(Entry{journal::Op::LSN, item.lsn});
|
||||
Write(io::Buffer(io::View(tmp.InputBuffer())));
|
||||
}
|
||||
NotifyWritten(allow_await);
|
||||
});
|
||||
}
|
||||
|
|
|
@ -44,6 +44,7 @@ class JournalStreamer : protected BufferedStreamerBase {
|
|||
|
||||
uint32_t journal_cb_id_{0};
|
||||
journal::Journal* journal_;
|
||||
time_t last_lsn_time_ = 0;
|
||||
|
||||
util::fb2::Fiber write_fb_{};
|
||||
};
|
||||
|
|
|
@ -109,7 +109,9 @@ std::optional<TransactionData> TransactionReader::NextTxData(JournalReader* read
|
|||
cntx->ReportError(res.error());
|
||||
return std::nullopt;
|
||||
}
|
||||
if (lsn_.has_value()) {
|
||||
|
||||
// When LSN opcode is sent master does not increase journal lsn.
|
||||
if (lsn_.has_value() && res->opcode != journal::Op::LSN) {
|
||||
++*lsn_;
|
||||
}
|
||||
|
||||
|
@ -122,7 +124,7 @@ std::optional<TransactionData> TransactionReader::NextTxData(JournalReader* read
|
|||
TransactionData tx_data = TransactionData::FromSingle(std::move(res.value()));
|
||||
if (lsn_.has_value() && tx_data.opcode == journal::Op::LSN) {
|
||||
DCHECK_NE(tx_data.lsn, 0u);
|
||||
LOG_IF_EVERY_N(WARNING, tx_data.lsn != *lsn_, 1000)
|
||||
LOG_IF_EVERY_N(WARNING, tx_data.lsn != *lsn_, 10000)
|
||||
<< "master lsn:" << tx_data.lsn << " replica lsn" << *lsn_;
|
||||
DCHECK_EQ(tx_data.lsn, *lsn_);
|
||||
}
|
||||
|
|
|
@ -54,6 +54,9 @@ struct Entry : public EntryBase {
|
|||
: EntryBase{0, opcode, dbid, 0, slot_id, 0} {
|
||||
}
|
||||
|
||||
Entry(journal::Op opcode, LSN lsn) : EntryBase{0, opcode, 0, 0, std::nullopt, lsn} {
|
||||
}
|
||||
|
||||
Entry(TxId txid, journal::Op opcode, DbIndex dbid, uint32_t shard_cnt,
|
||||
std::optional<SlotId> slot_id)
|
||||
: EntryBase{txid, opcode, dbid, shard_cnt, slot_id, 0} {
|
||||
|
|
|
@ -812,7 +812,9 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) {
|
|||
io::PrefixSource ps{prefix, Sock()};
|
||||
|
||||
JournalReader reader{&ps, 0};
|
||||
TransactionReader tx_reader{use_multi_shard_exe_sync_, journal_rec_executed_};
|
||||
DCHECK_GE(journal_rec_executed_, 1u);
|
||||
TransactionReader tx_reader{use_multi_shard_exe_sync_,
|
||||
journal_rec_executed_.load(std::memory_order_relaxed) - 1};
|
||||
|
||||
if (master_context_.version > DflyVersion::VER0) {
|
||||
acks_fb_ = fb2::Fiber("shard_acks", &DflyShardReplica::StableSyncDflyAcksFb, this, cntx);
|
||||
|
@ -831,7 +833,7 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) {
|
|||
|
||||
last_io_time_ = Proactor()->GetMonotonicTimeNs();
|
||||
if (tx_data->opcode == journal::Op::LSN) {
|
||||
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
|
||||
// Do nothing
|
||||
} else if (tx_data->opcode == journal::Op::PING) {
|
||||
force_ping_ = true;
|
||||
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
|
||||
|
|
|
@ -331,8 +331,7 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
|
|||
void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) {
|
||||
// We ignore EXEC and NOOP entries because we they have no meaning during
|
||||
// the LOAD phase on replica.
|
||||
if (item.opcode == journal::Op::NOOP || item.opcode == journal::Op::EXEC ||
|
||||
item.opcode == journal::Op::LSN)
|
||||
if (item.opcode == journal::Op::NOOP || item.opcode == journal::Op::EXEC)
|
||||
return;
|
||||
|
||||
serializer_->WriteJournalEntry(item.data);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue