From b2e2ad6e0466938c74ad7cacf91c859383d1ef04 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Mon, 1 Apr 2024 17:51:31 +0300 Subject: [PATCH] feat(server): check master journal lsn in replica (#2778) Send journal lsn to replica and compare the lsn value against number of records received in replica side Signed-off-by: kostas Co-authored-by: adi_holden --- src/server/dflycmd.cc | 3 ++- src/server/journal/journal.cc | 5 +++++ src/server/journal/journal.h | 1 + src/server/journal/journal_slice.cc | 3 ++- src/server/journal/journal_slice.h | 2 +- src/server/journal/serializer.cc | 7 +++++++ src/server/journal/serializer.h | 2 +- src/server/journal/streamer.cc | 32 ++++++++++++++++------------- src/server/journal/streamer.h | 5 ++--- src/server/journal/tx_executor.cc | 20 +++++++++++++++--- src/server/journal/tx_executor.h | 5 ++++- src/server/journal/types.h | 8 +++++--- src/server/replica.cc | 7 ++++--- src/server/snapshot.cc | 3 ++- src/server/transaction.cc | 2 +- src/server/version.h | 5 ++++- tests/dragonfly/replication_test.py | 4 +--- 17 files changed, 77 insertions(+), 37 deletions(-) diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index efefbff70..9fa7f26f2 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -503,7 +503,8 @@ OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineS if (shard != nullptr) { flow->streamer.reset(new JournalStreamer(sf_->journal(), cntx)); - flow->streamer->Start(flow->conn->socket()); + bool send_lsn = flow->version >= DflyVersion::VER4; + flow->streamer->Start(flow->conn->socket(), send_lsn); } // Register cleanup. diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 09cd66839..91425ebdc 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -86,6 +86,11 @@ LSN Journal::GetLsn() const { void Journal::RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, std::optional slot, Entry::Payload payload, bool await) { journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, slot, std::move(payload)}, await); + time_t now = time(nullptr); + if (now - last_lsn_joural_time_ > 2) { + journal_slice.AddLogRecord(Entry{txid, journal::Op::LSN, 0, 0, nullopt, {}}, await); + last_lsn_joural_time_ = now; + } } } // namespace journal diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index 43edb1992..0ffec04e3 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -39,6 +39,7 @@ class Journal { private: mutable util::fb2::Mutex state_mu_; + time_t last_lsn_joural_time_ = 0; }; } // namespace journal diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index f7c068f46..6c1e6bd2d 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -142,7 +142,7 @@ std::string_view JournalSlice::GetEntry(LSN lsn) const { return (*ring_buffer_)[lsn - start].data; } -void JournalSlice::AddLogRecord(const Entry& entry, bool await) { +void JournalSlice::AddLogRecord(Entry&& entry, bool await) { optional guard; if (!await) { guard.emplace(); // Guard is non-movable/copyable, so we must use emplace() @@ -166,6 +166,7 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) { item->opcode = entry.opcode; item->lsn = lsn_++; item->slot = entry.slot; + entry.lsn = lsn_; io::BufSink buf_sink{&ring_serialize_buf_}; JournalWriter writer{&buf_sink}; diff --git a/src/server/journal/journal_slice.h b/src/server/journal/journal_slice.h index 2752eb463..e5414bc6e 100644 --- a/src/server/journal/journal_slice.h +++ b/src/server/journal/journal_slice.h @@ -37,7 +37,7 @@ class JournalSlice { return slice_index_ != UINT32_MAX; } - void AddLogRecord(const Entry& entry, bool await); + void AddLogRecord(Entry&& entry, bool await); // Register a callback that will be called every time a new entry is // added to the journal. diff --git a/src/server/journal/serializer.cc b/src/server/journal/serializer.cc index 9130de7ba..e21162a31 100644 --- a/src/server/journal/serializer.cc +++ b/src/server/journal/serializer.cc @@ -75,6 +75,8 @@ void JournalWriter::Write(const journal::Entry& entry) { switch (entry.opcode) { case journal::Op::SELECT: return Write(entry.dbid); + case journal::Op::LSN: + return Write(entry.lsn); case journal::Op::PING: return; case journal::Op::COMMAND: @@ -199,6 +201,11 @@ io::Result JournalReader::ReadEntry() { return entry; } + if (opcode == journal::Op::LSN) { + SET_OR_UNEXPECT(ReadUInt(), entry.lsn); + return entry; + } + SET_OR_UNEXPECT(ReadUInt(), entry.txid); SET_OR_UNEXPECT(ReadUInt(), entry.shard_cnt); diff --git a/src/server/journal/serializer.h b/src/server/journal/serializer.h index 09ce227c0..4d832a234 100644 --- a/src/server/journal/serializer.h +++ b/src/server/journal/serializer.h @@ -22,9 +22,9 @@ class JournalWriter { // Write single entry to sink. void Write(const journal::Entry& entry); + void Write(uint64_t v); // Write packed unsigned integer. private: - void Write(uint64_t v); // Write packed unsigned integer. void Write(std::string_view sv); // Write string. template // CmdArgList or ArgSlice. diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index d35706246..8cce4a1e7 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -11,22 +11,26 @@ namespace dfly { using namespace util; -void JournalStreamer::Start(io::Sink* dest) { +void JournalStreamer::Start(io::Sink* dest, bool send_lsn) { using namespace journal; write_fb_ = fb2::Fiber("journal_stream", &JournalStreamer::WriterFb, this, dest); - journal_cb_id_ = journal_->RegisterOnChange([this](const JournalItem& item, bool allow_await) { - if (!ShouldWrite(item)) { - return; - } + journal_cb_id_ = + journal_->RegisterOnChange([this, send_lsn](const JournalItem& item, bool allow_await) { + if (!ShouldWrite(item)) { + return; + } + if (item.opcode == Op::LSN && !send_lsn) { + return; + } - if (item.opcode == Op::NOOP) { - // No record to write, just await if data was written so consumer will read the data. - return AwaitIfWritten(); - } + if (item.opcode == Op::NOOP) { + // No record to write, just await if data was written so consumer will read the data. + return AwaitIfWritten(); + } - Write(io::Buffer(item.data)); - NotifyWritten(allow_await); - }); + Write(io::Buffer(item.data)); + NotifyWritten(allow_await); + }); } void JournalStreamer::Cancel() { @@ -55,12 +59,12 @@ RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, journal::Journal DCHECK(slice != nullptr); } -void RestoreStreamer::Start(io::Sink* dest) { +void RestoreStreamer::Start(io::Sink* dest, bool send_lsn) { VLOG(2) << "RestoreStreamer start"; auto db_cb = absl::bind_front(&RestoreStreamer::OnDbChange, this); snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb)); - JournalStreamer::Start(dest); + JournalStreamer::Start(dest, send_lsn); PrimeTable::Cursor cursor; uint64_t last_yield = 0; diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index 611f62501..9a4c29f6d 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -25,7 +25,7 @@ class JournalStreamer : protected BufferedStreamerBase { JournalStreamer(JournalStreamer&& other) = delete; // Register journal listener and start writer in fiber. - virtual void Start(io::Sink* dest); + virtual void Start(io::Sink* dest, bool send_lsn); // Must be called on context cancellation for unblocking // and manual cleanup. @@ -40,7 +40,6 @@ class JournalStreamer : protected BufferedStreamerBase { return true; } - private: Context* cntx_; uint32_t journal_cb_id_{0}; @@ -56,7 +55,7 @@ class RestoreStreamer : public JournalStreamer { RestoreStreamer(DbSlice* slice, SlotSet slots, journal::Journal* journal, Context* cntx); ~RestoreStreamer() override; - void Start(io::Sink* dest) override; + void Start(io::Sink* dest, bool send_lsn = false) override; // Cancel() must be called if Start() is called void Cancel() override; diff --git a/src/server/journal/tx_executor.cc b/src/server/journal/tx_executor.cc index fd0a63903..24ff4d27e 100644 --- a/src/server/journal/tx_executor.cc +++ b/src/server/journal/tx_executor.cc @@ -54,8 +54,10 @@ void TransactionData::AddEntry(journal::ParsedEntry&& entry) { opcode = entry.opcode; switch (entry.opcode) { - case journal::Op::PING: + case journal::Op::LSN: + lsn = entry.lsn; return; + case journal::Op::PING: case journal::Op::FIN: return; case journal::Op::EXPIRED: @@ -107,13 +109,25 @@ std::optional TransactionReader::NextTxData(JournalReader* read cntx->ReportError(res.error()); return std::nullopt; } + if (lsn_.has_value()) { + ++*lsn_; + } // Check if journal command can be executed right away. // Expiration checks lock on master, so it never conflicts with running multi transactions. if (res->opcode == journal::Op::EXPIRED || res->opcode == journal::Op::COMMAND || res->opcode == journal::Op::PING || res->opcode == journal::Op::FIN || - (res->opcode == journal::Op::MULTI_COMMAND && !accumulate_multi_)) - return TransactionData::FromSingle(std::move(res.value())); + res->opcode == journal::Op::LSN || + (res->opcode == journal::Op::MULTI_COMMAND && !accumulate_multi_)) { + TransactionData tx_data = TransactionData::FromSingle(std::move(res.value())); + if (lsn_.has_value() && tx_data.opcode == journal::Op::LSN) { + DCHECK_NE(tx_data.lsn, 0u); + LOG_IF_EVERY_N(WARNING, tx_data.lsn != *lsn_, 1000) + << "master lsn:" << tx_data.lsn << " replica lsn" << *lsn_; + DCHECK_EQ(tx_data.lsn, *lsn_); + } + return tx_data; + } // Otherwise, continue building multi command. DCHECK(res->opcode == journal::Op::MULTI_COMMAND || res->opcode == journal::Op::EXEC); diff --git a/src/server/journal/tx_executor.h b/src/server/journal/tx_executor.h index 8599f552a..4c4d99985 100644 --- a/src/server/journal/tx_executor.h +++ b/src/server/journal/tx_executor.h @@ -51,13 +51,15 @@ struct TransactionData { absl::InlinedVector commands{0}; uint32_t journal_rec_count{0}; // Count number of source entries to check offset. journal::Op opcode = journal::Op::NOOP; + uint64_t lsn = 0; }; // Utility for reading TransactionData from a journal reader. // The journal stream can contain interleaved data for multiple multi transactions, // expiries and out of order executed transactions that need to be grouped on the replica side. struct TransactionReader { - TransactionReader(bool accumulate_multi) : accumulate_multi_(accumulate_multi) { + TransactionReader(bool accumulate_multi, std::optional lsn = std::nullopt) + : accumulate_multi_(accumulate_multi), lsn_(lsn) { } std::optional NextTxData(JournalReader* reader, Context* cntx); @@ -65,6 +67,7 @@ struct TransactionReader { // Stores ongoing multi transaction data. absl::flat_hash_map current_; bool accumulate_multi_ = false; + std::optional lsn_ = 0; }; } // namespace dfly diff --git a/src/server/journal/types.h b/src/server/journal/types.h index 82677a62c..ddd301f4f 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -22,7 +22,8 @@ enum class Op : uint8_t { MULTI_COMMAND = 11, EXEC = 12, PING = 13, - FIN = 14 + FIN = 14, + LSN = 15 }; struct EntryBase { @@ -31,6 +32,7 @@ struct EntryBase { DbIndex dbid; uint32_t shard_cnt; std::optional slot; + LSN lsn{0}; }; // This struct represents a single journal entry. @@ -49,12 +51,12 @@ struct Entry : public EntryBase { } Entry(journal::Op opcode, DbIndex dbid, std::optional slot_id) - : EntryBase{0, opcode, dbid, 0, slot_id}, payload{} { + : EntryBase{0, opcode, dbid, 0, slot_id, 0} { } Entry(TxId txid, journal::Op opcode, DbIndex dbid, uint32_t shard_cnt, std::optional slot_id) - : EntryBase{txid, opcode, dbid, shard_cnt, slot_id}, payload{} { + : EntryBase{txid, opcode, dbid, shard_cnt, slot_id, 0} { } bool HasPayload() const { diff --git a/src/server/replica.cc b/src/server/replica.cc index 969ab46b1..efa22b16c 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -812,7 +812,7 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) { io::PrefixSource ps{prefix, Sock()}; JournalReader reader{&ps, 0}; - TransactionReader tx_reader{use_multi_shard_exe_sync_}; + TransactionReader tx_reader{use_multi_shard_exe_sync_, journal_rec_executed_}; if (master_context_.version > DflyVersion::VER0) { acks_fb_ = fb2::Fiber("shard_acks", &DflyShardReplica::StableSyncDflyAcksFb, this, cntx); @@ -830,8 +830,9 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) { break; last_io_time_ = Proactor()->GetMonotonicTimeNs(); - - if (tx_data->opcode == journal::Op::PING) { + if (tx_data->opcode == journal::Op::LSN) { + journal_rec_executed_.fetch_add(1, std::memory_order_relaxed); + } else if (tx_data->opcode == journal::Op::PING) { force_ping_ = true; journal_rec_executed_.fetch_add(1, std::memory_order_relaxed); } else if (tx_data->opcode == journal::Op::EXEC) { diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 81460971f..b120e5516 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -331,7 +331,8 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) { // We ignore EXEC and NOOP entries because we they have no meaning during // the LOAD phase on replica. - if (item.opcode == journal::Op::NOOP || item.opcode == journal::Op::EXEC) + if (item.opcode == journal::Op::NOOP || item.opcode == journal::Op::EXEC || + item.opcode == journal::Op::LSN) return; serializer_->WriteJournalEntry(item.data); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 5458974a3..36578ff7d 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -696,7 +696,7 @@ void Transaction::RunCallback(EngineShard* shard) { coordinator_state_ &= ~COORD_CONCLUDING; // safe because single shard } - // Log to jounrnal only once the command finished running + // Log to journal only once the command finished running if ((coordinator_state_ & COORD_CONCLUDING) || (multi_ && multi_->concluding)) LogAutoJournalOnShard(shard, result); } diff --git a/src/server/version.h b/src/server/version.h index fbf4e9af5..f5f8ea6ff 100644 --- a/src/server/version.h +++ b/src/server/version.h @@ -33,8 +33,11 @@ enum class DflyVersion { // ACL with user replication VER3, + // - Periodic lag checks from master to replica + VER4, + // Always points to the latest version - CURRENT_VER = VER3, + CURRENT_VER = VER4, }; } // namespace dfly diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 7b5f8e91a..4f8829272 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -136,7 +136,6 @@ async def check_all_replicas_finished(c_replicas, c_master, timeout=20): if not waiting_for: return await asyncio.sleep(0.2) - m_offset = await c_master.execute_command("DFLY REPLICAOFFSET") finished_list = await asyncio.gather( *(check_replica_finished_exec(c, m_offset) for c in waiting_for) @@ -1655,7 +1654,6 @@ async def test_network_disconnect(df_local_factory, df_seeder_factory): master.stop() replica.stop() - assert replica.is_in_logs("partial sync finished in") async def test_network_disconnect_active_stream(df_local_factory, df_seeder_factory): @@ -1698,7 +1696,6 @@ async def test_network_disconnect_active_stream(df_local_factory, df_seeder_fact master.stop() replica.stop() - assert replica.is_in_logs("partial sync finished in") async def test_network_disconnect_small_buffer(df_local_factory, df_seeder_factory): @@ -2147,6 +2144,7 @@ async def test_replica_reconnect(df_local_factory, break_conn): assert await c_master.execute_command("get k") == None assert await c_replica.execute_command("get k") == None assert await c_master.execute_command("set k 6789") + await check_all_replicas_finished([c_replica], c_master) assert await c_replica.execute_command("get k") == "6789" assert not await is_replicaiton_conn_down(c_replica)