From 50f50c8380862dd4a9c7139bcadadbf5c0e31646 Mon Sep 17 00:00:00 2001 From: adiholden Date: Wed, 15 Feb 2023 09:34:24 +0200 Subject: [PATCH] =?UTF-8?q?feat(server):=20write=20journal=20record=20with?= =?UTF-8?q?=20optional=20await=20based=20on=20flag=E2=80=A6=20(#791)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(server): write journal recorod with optional await based on flag issue #788 Signed-off-by: adi_holden --- src/server/common.cc | 11 +++++++++-- src/server/common.h | 4 ++++ src/server/engine_shard_set.cc | 5 +++++ src/server/io_utils.cc | 14 ++++++++++++-- src/server/io_utils.h | 7 +++++-- src/server/journal/journal.cc | 4 ++-- src/server/journal/journal.h | 3 ++- src/server/journal/journal_slice.cc | 4 ++-- src/server/journal/journal_slice.h | 2 +- src/server/journal/streamer.cc | 15 ++++++++++----- src/server/journal/types.h | 2 +- src/server/snapshot.cc | 4 +++- src/server/snapshot.h | 2 +- src/server/transaction.cc | 11 ++++++----- src/server/transaction.h | 6 +----- tests/dragonfly/replication_test.py | 2 +- 16 files changed, 65 insertions(+), 31 deletions(-) diff --git a/src/server/common.cc b/src/server/common.cc index ded7c2552..b98b31e67 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -164,7 +164,8 @@ bool ParseDouble(string_view src, double* value) { void RecordJournal(const OpArgs& op_args, string_view cmd, ArgSlice args, uint32_t shard_cnt, bool multi_commands) { - op_args.tx->LogJournalOnShard(op_args.shard, make_pair(cmd, args), shard_cnt, multi_commands); + op_args.tx->LogJournalOnShard(op_args.shard, make_pair(cmd, args), shard_cnt, multi_commands, + false); } void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt) { @@ -174,7 +175,13 @@ void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt) { void RecordExpiry(DbIndex dbid, string_view key) { auto journal = EngineShard::tlocal()->journal(); CHECK(journal); - journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, make_pair("DEL", ArgSlice{key})); + journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, make_pair("DEL", ArgSlice{key}), false); +} + +void TriggerJournalWriteToSink() { + auto journal = EngineShard::tlocal()->journal(); + CHECK(journal); + journal->RecordEntry(0, journal::Op::NOOP, 0, 0, {}, true); } #define ADD(x) (x) += o.x diff --git a/src/server/common.h b/src/server/common.h index 2b2d5e09b..90d4072d3 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -103,6 +103,10 @@ void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt); // key. void RecordExpiry(DbIndex dbid, std::string_view key); +// Trigger journal write to sink, no journal record will be added to journal. +// Must be called from shard thread of journal to sink. +void TriggerJournalWriteToSink(); + struct TieredStats { size_t tiered_reads = 0; size_t tiered_writes = 0; diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 9677f3f2a..763c17b2c 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -482,6 +482,11 @@ void EngineShard::Heartbeat() { db_slice_.FreeMemWithEvictionStep(i, redline - db_slice_.memory_budget()); } } + // Journal entries for expired entries are not writen to socket in the loop above. + // Trigger write to socket when loop finishes. + if (auto journal = EngineShard::tlocal()->journal(); journal) { + TriggerJournalWriteToSink(); + } } void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms) { diff --git a/src/server/io_utils.cc b/src/server/io_utils.cc index a44583da4..0e652e463 100644 --- a/src/server/io_utils.cc +++ b/src/server/io_utils.cc @@ -16,14 +16,24 @@ io::Result BufferedStreamerBase::WriteSome(const iovec* vec, uint32_t le return io::BufSink{&producer_buf_}.WriteSome(vec, len); } -void BufferedStreamerBase::NotifyWritten() { +void BufferedStreamerBase::NotifyWritten(bool allow_await) { if (IsStopped()) return; buffered_++; // Wake up the consumer. waker_.notify(); // Block if we're stalled because the consumer is not keeping up. - waker_.await([this]() { return !IsStalled() || IsStopped(); }); + if (allow_await) { + waker_.await([this]() { return !IsStalled() || IsStopped(); }); + } +} + +void BufferedStreamerBase::AwaitIfWritten() { + if (IsStopped()) + return; + if (buffered_) { + waker_.await([this]() { return !IsStalled() || IsStopped(); }); + } } error_code BufferedStreamerBase::ConsumeIntoSink(io::Sink* dest) { diff --git a/src/server/io_utils.h b/src/server/io_utils.h index 459f83b77..807922845 100644 --- a/src/server/io_utils.h +++ b/src/server/io_utils.h @@ -41,8 +41,11 @@ class BufferedStreamerBase : public io::Sink { io::Result WriteSome(const iovec* vec, uint32_t len) override; // Report that a batch of data has been written and the consumer can be woken up. - // Blocks if the consumer if not keeping up. - void NotifyWritten(); + // Blocks if the consumer if not keeping up, if allow_await is set to true. + void NotifyWritten(bool allow_await); + + // Blocks the if the consumer if not keeping up. + void AwaitIfWritten(); // Report producer finished. void Finalize(); diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 78e868b18..37f12961e 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -102,8 +102,8 @@ bool Journal::EnterLameDuck() { } void Journal::RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, - Entry::Payload payload) { - journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, std::move(payload)}); + Entry::Payload payload, bool await) { + journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, std::move(payload)}, await); } /* diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index 1002cdf7b..cdcfe795e 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -49,7 +49,8 @@ class Journal { */ LSN GetLsn() const; - void RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, Entry::Payload payload); + void RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, Entry::Payload payload, + bool await); private: mutable boost::fibers::mutex state_mu_; diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index f8e42aa20..88aa3d190 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -115,11 +115,11 @@ error_code JournalSlice::Close() { return ec; } -void JournalSlice::AddLogRecord(const Entry& entry) { +void JournalSlice::AddLogRecord(const Entry& entry, bool await) { DCHECK(ring_buffer_); iterating_cb_arr_ = true; for (const auto& k_v : change_cb_arr_) { - k_v.second(entry); + k_v.second(entry, await); } iterating_cb_arr_ = false; diff --git a/src/server/journal/journal_slice.h b/src/server/journal/journal_slice.h index ef63e59de..be083dcb3 100644 --- a/src/server/journal/journal_slice.h +++ b/src/server/journal/journal_slice.h @@ -42,7 +42,7 @@ class JournalSlice { return bool(shard_file_); } - void AddLogRecord(const Entry& entry); + void AddLogRecord(const Entry& entry, bool await); uint32_t RegisterOnChange(ChangeCallback cb); void UnregisterOnChange(uint32_t); diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 0b6cc11b7..5b5ca5507 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -8,11 +8,16 @@ namespace dfly { void JournalStreamer::Start(io::Sink* dest) { write_fb_ = util::fibers_ext::Fiber(&JournalStreamer::WriterFb, this, dest); - journal_cb_id_ = journal_->RegisterOnChange([this](const journal::Entry& entry) { - writer_.Write(entry); - record_cnt_.fetch_add(1, std::memory_order_relaxed); - NotifyWritten(); - }); + journal_cb_id_ = + journal_->RegisterOnChange([this](const journal::Entry& entry, bool allow_await) { + if (entry.opcode == journal::Op::NOOP) { + // No recode to write, just await if data was written so consumer will read the data. + return AwaitIfWritten(); + } + writer_.Write(entry); + record_cnt_.fetch_add(1, std::memory_order_relaxed); + NotifyWritten(allow_await); + }); } uint64_t JournalStreamer::GetRecordCount() const { diff --git a/src/server/journal/types.h b/src/server/journal/types.h index 4b3a14eda..6e5f674f8 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -64,7 +64,7 @@ struct ParsedEntry : public EntryBase { CmdData cmd; }; -using ChangeCallback = std::function; +using ChangeCallback = std::function; } // namespace journal } // namespace dfly diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index fbea40440..d3c929bb2 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -278,7 +278,9 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) // value. This is guaranteed by the fact that OnJournalEntry runs always after OnDbChange, and // no database switch can be performed between those two calls, because they are part of one // transaction. -void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) { +// OnJournalEntry registers for changes in journal, the journal change function signature is +// (const journal::Entry& entry, bool await) In snapshot flow we dont use the await argument. +void SliceSnapshot::OnJournalEntry(const journal::Entry& entry, bool unused_await_arg) { // We ignore non payload entries like EXEC because we have no transactional ordering during // LOAD phase on replica. if (!entry.HasPayload()) { diff --git a/src/server/snapshot.h b/src/server/snapshot.h index 2d81b6f36..4845264dd 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -102,7 +102,7 @@ class SliceSnapshot { void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req); // Journal listener - void OnJournalEntry(const journal::Entry& entry); + void OnJournalEntry(const journal::Entry& entry, bool unused_await_arg); // Close dest channel if not closed yet. void CloseRecordChannel(); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index f982fe5fa..dc4ee0f3d 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1063,7 +1063,7 @@ void Transaction::UnlockMultiShardCb(const std::vector& sharded_keys, E auto journal = shard->journal(); if (journal != nullptr && multi_->shard_journal_write[shard->shard_id()] == true) { - journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_journals_cnt, {}); + journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_journals_cnt, {}, true); } if (multi_->multi_opts & CO::GLOBAL_TRANS) { @@ -1209,18 +1209,19 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) { auto cmd = facade::ToSV(cmd_with_full_args_.front()); entry_payload = make_pair(cmd, GetShardArgs(shard->shard_id())); } - LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false); + LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false, true); } void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, - uint32_t shard_cnt, bool multi_commands) const { + uint32_t shard_cnt, bool multi_commands, + bool allow_await) const { auto journal = shard->journal(); CHECK(journal); if (multi_) { multi_->shard_journal_write[shard->shard_id()] = true; } auto opcode = (multi_ || multi_commands) ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND; - journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, std::move(payload)); + journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, std::move(payload), allow_await); } void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const { @@ -1229,7 +1230,7 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt } auto journal = shard->journal(); CHECK(journal); - journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt, {}); + journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt, {}, false); } void Transaction::BreakOnShutdown() { diff --git a/src/server/transaction.h b/src/server/transaction.h index cb29af117..515c3f953 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -120,10 +120,6 @@ class Transaction { // Cancel all blocking watches on shutdown. Set COORD_CANCELLED. void BreakOnShutdown(); - // Log a journal entry on shard with payload and shard count. - void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, - uint32_t shard_cnt) const; - // In some cases for non auto-journaling commands we want to enable the auto journal flow. void RenableAutoJournal() { renabled_auto_journal_.store(true, std::memory_order_relaxed); @@ -209,7 +205,7 @@ class Transaction { // multi_commands to true and call the FinishLogJournalOnShard function after logging the final // entry. void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, uint32_t shard_cnt, - bool multi_commands) const; + bool multi_commands, bool allow_await) const; void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const; private: diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 0b2843c4c..c9ccb387a 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -590,7 +590,7 @@ async def test_expiry(df_local_factory, n_keys=1000): # Set key differnt expries times in ms pipe = c_master.pipeline(transaction=True) for k, _ in gen_test_data(n_keys): - ms = random.randint(100, 500) + ms = random.randint(20, 500) pipe.pexpire(k, ms) await pipe.execute()