feat(server): check master journal lsn in replica (#2778)

Send the master's journal LSN to the replica and compare it against the number of journal records executed on the replica side.

Signed-off-by: kostas <kostas@dragonflydb.io>
Co-authored-by: adi_holden <adi@dragonflydb.io>
Kostas Kyrimis 2024-04-01 17:51:31 +03:00 committed by GitHub
parent 3ec43afd30
commit b2e2ad6e04
17 changed files with 77 additions and 37 deletions
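In short: roughly every two seconds the master appends a dedicated LSN opcode to the journal carrying its current sequence number, streams it to replicas running DflyVersion::VER4 or newer, and each replica compares that value against the count of journal records it has executed, logging a warning on mismatch. The following is a minimal, self-contained sketch of that check, not code from this diff; the MasterLsnEmitter and ReplicaLsnCheck names, members, and exact counting are illustrative only.

#include <cstdint>
#include <ctime>
#include <iostream>
#include <optional>

// Sketch of the master side: throttle emission of LSN records so that at most
// one is appended every interval_sec seconds, mirroring the 2s check added to
// Journal::RecordEntry below.
class MasterLsnEmitter {
 public:
  std::optional<uint64_t> MaybeEmit(uint64_t current_lsn, std::time_t now,
                                    int interval_sec = 2) {
    if (now - last_emit_time_ <= interval_sec)
      return std::nullopt;  // too soon, skip this round
    last_emit_time_ = now;
    return current_lsn;  // value to embed in a journal::Op::LSN record
  }

 private:
  std::time_t last_emit_time_ = 0;
};

// Sketch of the replica side: count executed journal records and cross-check
// the count whenever an LSN record arrives from the master.
class ReplicaLsnCheck {
 public:
  void OnRecordExecuted() {
    ++executed_;
  }

  void OnLsnRecord(uint64_t master_lsn) {
    ++executed_;  // the LSN record itself also counts as a journal record
    if (master_lsn != executed_) {
      std::cerr << "journal mismatch: master lsn " << master_lsn
                << ", replica executed " << executed_ << " records\n";
    }
  }

 private:
  uint64_t executed_ = 0;
};

In the diff, the master half lives in Journal::RecordEntry and JournalStreamer::Start, and the replica half in TransactionReader::NextTxData and DflyShardReplica::StableSyncDflyReadFb.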

@ -503,7 +503,8 @@ OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineS
if (shard != nullptr) {
flow->streamer.reset(new JournalStreamer(sf_->journal(), cntx));
flow->streamer->Start(flow->conn->socket());
bool send_lsn = flow->version >= DflyVersion::VER4;
flow->streamer->Start(flow->conn->socket(), send_lsn);
}
// Register cleanup.

@ -86,6 +86,11 @@ LSN Journal::GetLsn() const {
void Journal::RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt,
std::optional<SlotId> slot, Entry::Payload payload, bool await) {
journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, slot, std::move(payload)}, await);
time_t now = time(nullptr);
if (now - last_lsn_joural_time_ > 2) {
journal_slice.AddLogRecord(Entry{txid, journal::Op::LSN, 0, 0, nullopt, {}}, await);
last_lsn_joural_time_ = now;
}
}
} // namespace journal

@ -39,6 +39,7 @@ class Journal {
private:
mutable util::fb2::Mutex state_mu_;
time_t last_lsn_joural_time_ = 0;
};
} // namespace journal

@ -142,7 +142,7 @@ std::string_view JournalSlice::GetEntry(LSN lsn) const {
return (*ring_buffer_)[lsn - start].data;
}
void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
void JournalSlice::AddLogRecord(Entry&& entry, bool await) {
optional<FiberAtomicGuard> guard;
if (!await) {
guard.emplace(); // Guard is non-movable/copyable, so we must use emplace()
@ -166,6 +166,7 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
item->opcode = entry.opcode;
item->lsn = lsn_++;
item->slot = entry.slot;
entry.lsn = lsn_;
io::BufSink buf_sink{&ring_serialize_buf_};
JournalWriter writer{&buf_sink};

@ -37,7 +37,7 @@ class JournalSlice {
return slice_index_ != UINT32_MAX;
}
void AddLogRecord(const Entry& entry, bool await);
void AddLogRecord(Entry&& entry, bool await);
// Register a callback that will be called every time a new entry is
// added to the journal.

@ -75,6 +75,8 @@ void JournalWriter::Write(const journal::Entry& entry) {
switch (entry.opcode) {
case journal::Op::SELECT:
return Write(entry.dbid);
case journal::Op::LSN:
return Write(entry.lsn);
case journal::Op::PING:
return;
case journal::Op::COMMAND:
@ -199,6 +201,11 @@ io::Result<journal::ParsedEntry> JournalReader::ReadEntry() {
return entry;
}
if (opcode == journal::Op::LSN) {
SET_OR_UNEXPECT(ReadUInt<uint64_t>(), entry.lsn);
return entry;
}
SET_OR_UNEXPECT(ReadUInt<uint64_t>(), entry.txid);
SET_OR_UNEXPECT(ReadUInt<uint32_t>(), entry.shard_cnt);

@ -22,9 +22,9 @@ class JournalWriter {
// Write single entry to sink.
void Write(const journal::Entry& entry);
void Write(uint64_t v); // Write packed unsigned integer.
private:
void Write(uint64_t v); // Write packed unsigned integer.
void Write(std::string_view sv); // Write string.
template <typename C> // CmdArgList or ArgSlice.

@ -11,22 +11,26 @@
namespace dfly {
using namespace util;
void JournalStreamer::Start(io::Sink* dest) {
void JournalStreamer::Start(io::Sink* dest, bool send_lsn) {
using namespace journal;
write_fb_ = fb2::Fiber("journal_stream", &JournalStreamer::WriterFb, this, dest);
journal_cb_id_ = journal_->RegisterOnChange([this](const JournalItem& item, bool allow_await) {
if (!ShouldWrite(item)) {
return;
}
journal_cb_id_ =
journal_->RegisterOnChange([this, send_lsn](const JournalItem& item, bool allow_await) {
if (!ShouldWrite(item)) {
return;
}
if (item.opcode == Op::LSN && !send_lsn) {
return;
}
if (item.opcode == Op::NOOP) {
// No record to write, just await if data was written so consumer will read the data.
return AwaitIfWritten();
}
if (item.opcode == Op::NOOP) {
// No record to write, just await if data was written so consumer will read the data.
return AwaitIfWritten();
}
Write(io::Buffer(item.data));
NotifyWritten(allow_await);
});
Write(io::Buffer(item.data));
NotifyWritten(allow_await);
});
}
void JournalStreamer::Cancel() {
@ -55,12 +59,12 @@ RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, journal::Journal
DCHECK(slice != nullptr);
}
void RestoreStreamer::Start(io::Sink* dest) {
void RestoreStreamer::Start(io::Sink* dest, bool send_lsn) {
VLOG(2) << "RestoreStreamer start";
auto db_cb = absl::bind_front(&RestoreStreamer::OnDbChange, this);
snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb));
JournalStreamer::Start(dest);
JournalStreamer::Start(dest, send_lsn);
PrimeTable::Cursor cursor;
uint64_t last_yield = 0;

@ -25,7 +25,7 @@ class JournalStreamer : protected BufferedStreamerBase {
JournalStreamer(JournalStreamer&& other) = delete;
// Register journal listener and start writer in fiber.
virtual void Start(io::Sink* dest);
virtual void Start(io::Sink* dest, bool send_lsn);
// Must be called on context cancellation for unblocking
// and manual cleanup.
@ -40,7 +40,6 @@ class JournalStreamer : protected BufferedStreamerBase {
return true;
}
private:
Context* cntx_;
uint32_t journal_cb_id_{0};
@ -56,7 +55,7 @@ class RestoreStreamer : public JournalStreamer {
RestoreStreamer(DbSlice* slice, SlotSet slots, journal::Journal* journal, Context* cntx);
~RestoreStreamer() override;
void Start(io::Sink* dest) override;
void Start(io::Sink* dest, bool send_lsn = false) override;
// Cancel() must be called if Start() is called
void Cancel() override;

@ -54,8 +54,10 @@ void TransactionData::AddEntry(journal::ParsedEntry&& entry) {
opcode = entry.opcode;
switch (entry.opcode) {
case journal::Op::PING:
case journal::Op::LSN:
lsn = entry.lsn;
return;
case journal::Op::PING:
case journal::Op::FIN:
return;
case journal::Op::EXPIRED:
@ -107,13 +109,25 @@ std::optional<TransactionData> TransactionReader::NextTxData(JournalReader* read
cntx->ReportError(res.error());
return std::nullopt;
}
if (lsn_.has_value()) {
++*lsn_;
}
// Check if journal command can be executed right away.
// Expiration checks lock on master, so it never conflicts with running multi transactions.
if (res->opcode == journal::Op::EXPIRED || res->opcode == journal::Op::COMMAND ||
res->opcode == journal::Op::PING || res->opcode == journal::Op::FIN ||
(res->opcode == journal::Op::MULTI_COMMAND && !accumulate_multi_))
return TransactionData::FromSingle(std::move(res.value()));
res->opcode == journal::Op::LSN ||
(res->opcode == journal::Op::MULTI_COMMAND && !accumulate_multi_)) {
TransactionData tx_data = TransactionData::FromSingle(std::move(res.value()));
if (lsn_.has_value() && tx_data.opcode == journal::Op::LSN) {
DCHECK_NE(tx_data.lsn, 0u);
LOG_IF_EVERY_N(WARNING, tx_data.lsn != *lsn_, 1000)
<< "master lsn:" << tx_data.lsn << " replica lsn" << *lsn_;
DCHECK_EQ(tx_data.lsn, *lsn_);
}
return tx_data;
}
// Otherwise, continue building multi command.
DCHECK(res->opcode == journal::Op::MULTI_COMMAND || res->opcode == journal::Op::EXEC);

@ -51,13 +51,15 @@ struct TransactionData {
absl::InlinedVector<journal::ParsedEntry::CmdData, 1> commands{0};
uint32_t journal_rec_count{0}; // Count number of source entries to check offset.
journal::Op opcode = journal::Op::NOOP;
uint64_t lsn = 0;
};
// Utility for reading TransactionData from a journal reader.
// The journal stream can contain interleaved data for multiple multi transactions,
// expiries and out of order executed transactions that need to be grouped on the replica side.
struct TransactionReader {
TransactionReader(bool accumulate_multi) : accumulate_multi_(accumulate_multi) {
TransactionReader(bool accumulate_multi, std::optional<uint64_t> lsn = std::nullopt)
: accumulate_multi_(accumulate_multi), lsn_(lsn) {
}
std::optional<TransactionData> NextTxData(JournalReader* reader, Context* cntx);
@ -65,6 +67,7 @@ struct TransactionReader {
// Stores ongoing multi transaction data.
absl::flat_hash_map<TxId, TransactionData> current_;
bool accumulate_multi_ = false;
std::optional<uint64_t> lsn_ = 0;
};
} // namespace dfly

@ -22,7 +22,8 @@ enum class Op : uint8_t {
MULTI_COMMAND = 11,
EXEC = 12,
PING = 13,
FIN = 14
FIN = 14,
LSN = 15
};
struct EntryBase {
@ -31,6 +32,7 @@ struct EntryBase {
DbIndex dbid;
uint32_t shard_cnt;
std::optional<SlotId> slot;
LSN lsn{0};
};
// This struct represents a single journal entry.
@ -49,12 +51,12 @@ struct Entry : public EntryBase {
}
Entry(journal::Op opcode, DbIndex dbid, std::optional<SlotId> slot_id)
: EntryBase{0, opcode, dbid, 0, slot_id}, payload{} {
: EntryBase{0, opcode, dbid, 0, slot_id, 0} {
}
Entry(TxId txid, journal::Op opcode, DbIndex dbid, uint32_t shard_cnt,
std::optional<SlotId> slot_id)
: EntryBase{txid, opcode, dbid, shard_cnt, slot_id}, payload{} {
: EntryBase{txid, opcode, dbid, shard_cnt, slot_id, 0} {
}
bool HasPayload() const {

@ -812,7 +812,7 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) {
io::PrefixSource ps{prefix, Sock()};
JournalReader reader{&ps, 0};
TransactionReader tx_reader{use_multi_shard_exe_sync_};
TransactionReader tx_reader{use_multi_shard_exe_sync_, journal_rec_executed_};
if (master_context_.version > DflyVersion::VER0) {
acks_fb_ = fb2::Fiber("shard_acks", &DflyShardReplica::StableSyncDflyAcksFb, this, cntx);
@ -830,8 +830,9 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) {
break;
last_io_time_ = Proactor()->GetMonotonicTimeNs();
if (tx_data->opcode == journal::Op::PING) {
if (tx_data->opcode == journal::Op::LSN) {
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
} else if (tx_data->opcode == journal::Op::PING) {
force_ping_ = true;
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
} else if (tx_data->opcode == journal::Op::EXEC) {

@ -331,7 +331,8 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) {
// We ignore EXEC and NOOP entries because they have no meaning during
// the LOAD phase on replica.
if (item.opcode == journal::Op::NOOP || item.opcode == journal::Op::EXEC)
if (item.opcode == journal::Op::NOOP || item.opcode == journal::Op::EXEC ||
item.opcode == journal::Op::LSN)
return;
serializer_->WriteJournalEntry(item.data);

@ -696,7 +696,7 @@ void Transaction::RunCallback(EngineShard* shard) {
coordinator_state_ &= ~COORD_CONCLUDING; // safe because single shard
}
// Log to jounrnal only once the command finished running
// Log to journal only once the command finished running
if ((coordinator_state_ & COORD_CONCLUDING) || (multi_ && multi_->concluding))
LogAutoJournalOnShard(shard, result);
}

@ -33,8 +33,11 @@ enum class DflyVersion {
// ACL with user replication
VER3,
// - Periodic lag checks from master to replica
VER4,
// Always points to the latest version
CURRENT_VER = VER3,
CURRENT_VER = VER4,
};
} // namespace dfly

@ -136,7 +136,6 @@ async def check_all_replicas_finished(c_replicas, c_master, timeout=20):
if not waiting_for:
return
await asyncio.sleep(0.2)
m_offset = await c_master.execute_command("DFLY REPLICAOFFSET")
finished_list = await asyncio.gather(
*(check_replica_finished_exec(c, m_offset) for c in waiting_for)
@ -1655,7 +1654,6 @@ async def test_network_disconnect(df_local_factory, df_seeder_factory):
master.stop()
replica.stop()
assert replica.is_in_logs("partial sync finished in")
async def test_network_disconnect_active_stream(df_local_factory, df_seeder_factory):
@ -1698,7 +1696,6 @@ async def test_network_disconnect_active_stream(df_local_factory, df_seeder_fact
master.stop()
replica.stop()
assert replica.is_in_logs("partial sync finished in")
async def test_network_disconnect_small_buffer(df_local_factory, df_seeder_factory):
@ -2147,6 +2144,7 @@ async def test_replica_reconnect(df_local_factory, break_conn):
assert await c_master.execute_command("get k") == None
assert await c_replica.execute_command("get k") == None
assert await c_master.execute_command("set k 6789")
await check_all_replicas_finished([c_replica], c_master)
assert await c_replica.execute_command("get k") == "6789"
assert not await is_replicaiton_conn_down(c_replica)