Mirror of https://github.com/dragonflydb/dragonfly.git, synced 2025-05-10 18:05:44 +02:00
feat(server): write journal record with optional await based on flag… (#791)
* feat(server): write journal record with optional await based on flag (issue #788)

Signed-off-by: adi_holden <adi@dragonflydb.io>
parent 8068e1a2ae
commit 50f50c8380

16 changed files with 65 additions and 31 deletions
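The change threads a new boolean through the journal write path: most records are now appended without awaiting the journal consumer, and a NOOP record written with await=true is used to flush and, when the consumer has fallen behind, apply back-pressure at a safe point. Below is a minimal sketch of that flow. RecordExpiry, TriggerJournalWriteToSink and DbIndex are taken from the diff that follows; FlushExpiredKeysExample, its expired_keys argument, and the assumption that it sits in the dfly namespace next to those helpers are hypothetical, for illustration only.

// Sketch only: intended use of the new await flag, not code from this commit.
// Assumes the relevant dragonfly headers and the dfly namespace.
#include <string_view>
#include <vector>

void FlushExpiredKeysExample(DbIndex dbid, const std::vector<std::string_view>& expired_keys) {
  for (std::string_view key : expired_keys) {
    // Each expiry is journaled with await=false, so the shard fiber does not
    // block on a slow journal consumer while it is still expiring keys.
    RecordExpiry(dbid, key);
  }
  // A single NOOP entry with await=true wakes the consumer and, if it has
  // fallen behind, waits for it to catch up, outside of the expiration loop.
  TriggerJournalWriteToSink();
}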
@@ -164,7 +164,8 @@ bool ParseDouble(string_view src, double* value) {
 void RecordJournal(const OpArgs& op_args, string_view cmd, ArgSlice args, uint32_t shard_cnt,
                    bool multi_commands) {
-  op_args.tx->LogJournalOnShard(op_args.shard, make_pair(cmd, args), shard_cnt, multi_commands);
+  op_args.tx->LogJournalOnShard(op_args.shard, make_pair(cmd, args), shard_cnt, multi_commands,
+                                false);
 }
 
 void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt) {

@@ -174,7 +175,13 @@ void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt) {
 void RecordExpiry(DbIndex dbid, string_view key) {
   auto journal = EngineShard::tlocal()->journal();
   CHECK(journal);
-  journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, make_pair("DEL", ArgSlice{key}));
+  journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, make_pair("DEL", ArgSlice{key}), false);
 }
 
+void TriggerJournalWriteToSink() {
+  auto journal = EngineShard::tlocal()->journal();
+  CHECK(journal);
+  journal->RecordEntry(0, journal::Op::NOOP, 0, 0, {}, true);
+}
+
 #define ADD(x) (x) += o.x
@@ -103,6 +103,10 @@ void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt);
 // key.
 void RecordExpiry(DbIndex dbid, std::string_view key);
 
+// Trigger a journal write to the sink; no journal record is added to the journal.
+// Must be called from the shard thread of the journal that writes to the sink.
+void TriggerJournalWriteToSink();
+
 struct TieredStats {
   size_t tiered_reads = 0;
   size_t tiered_writes = 0;
@@ -482,6 +482,11 @@ void EngineShard::Heartbeat() {
       db_slice_.FreeMemWithEvictionStep(i, redline - db_slice_.memory_budget());
     }
   }
+  // Journal entries for expired keys are not written to the socket in the loop above.
+  // Trigger the write to the socket when the loop finishes.
+  if (auto journal = EngineShard::tlocal()->journal(); journal) {
+    TriggerJournalWriteToSink();
+  }
 }
 
 void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms) {
@@ -16,14 +16,24 @@ io::Result<size_t> BufferedStreamerBase::WriteSome(const iovec* vec, uint32_t le
   return io::BufSink{&producer_buf_}.WriteSome(vec, len);
 }
 
-void BufferedStreamerBase::NotifyWritten() {
+void BufferedStreamerBase::NotifyWritten(bool allow_await) {
   if (IsStopped())
     return;
   buffered_++;
   // Wake up the consumer.
   waker_.notify();
   // Block if we're stalled because the consumer is not keeping up.
-  waker_.await([this]() { return !IsStalled() || IsStopped(); });
+  if (allow_await) {
+    waker_.await([this]() { return !IsStalled() || IsStopped(); });
+  }
 }
 
+void BufferedStreamerBase::AwaitIfWritten() {
+  if (IsStopped())
+    return;
+  if (buffered_) {
+    waker_.await([this]() { return !IsStalled() || IsStopped(); });
+  }
+}
+
 error_code BufferedStreamerBase::ConsumeIntoSink(io::Sink* dest) {
@@ -41,8 +41,11 @@ class BufferedStreamerBase : public io::Sink {
   io::Result<size_t> WriteSome(const iovec* vec, uint32_t len) override;
 
   // Report that a batch of data has been written and the consumer can be woken up.
-  // Blocks if the consumer is not keeping up.
-  void NotifyWritten();
+  // Blocks if the consumer is not keeping up, if allow_await is set to true.
+  void NotifyWritten(bool allow_await);
+
+  // Blocks if the consumer is not keeping up.
+  void AwaitIfWritten();
 
   // Report producer finished.
   void Finalize();
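To show how the two primitives above are meant to be paired on the producer side, here is a hedged sketch; it mirrors the JournalStreamer callback further down in this diff, but the MyStreamer subclass and its OnRecord helper are hypothetical names, not part of the commit.

// Sketch only: pairing NotifyWritten(allow_await) with AwaitIfWritten().
class MyStreamer : public BufferedStreamerBase {
 public:
  void OnRecord(bool is_flush_point, bool allow_await) {
    if (is_flush_point) {
      // Nothing new to write; block only if earlier records are still buffered
      // and the consumer has fallen behind.
      AwaitIfWritten();
      return;
    }
    // ... serialize the record into this sink via WriteSome() ...
    // Wake the consumer; block for back-pressure only when allow_await is true.
    NotifyWritten(allow_await);
  }
};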
@@ -102,8 +102,8 @@ bool Journal::EnterLameDuck() {
 }
 
 void Journal::RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt,
-                          Entry::Payload payload) {
-  journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, std::move(payload)});
+                          Entry::Payload payload, bool await) {
+  journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, std::move(payload)}, await);
 }
 
 /*
@@ -49,7 +49,8 @@ class Journal {
   */
   LSN GetLsn() const;
 
-  void RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, Entry::Payload payload);
+  void RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, Entry::Payload payload,
+                   bool await);
 
 private:
  mutable boost::fibers::mutex state_mu_;
@@ -115,11 +115,11 @@ error_code JournalSlice::Close() {
   return ec;
 }
 
-void JournalSlice::AddLogRecord(const Entry& entry) {
+void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
   DCHECK(ring_buffer_);
   iterating_cb_arr_ = true;
   for (const auto& k_v : change_cb_arr_) {
-    k_v.second(entry);
+    k_v.second(entry, await);
   }
   iterating_cb_arr_ = false;
 
@@ -42,7 +42,7 @@ class JournalSlice {
     return bool(shard_file_);
   }
 
-  void AddLogRecord(const Entry& entry);
+  void AddLogRecord(const Entry& entry, bool await);
 
   uint32_t RegisterOnChange(ChangeCallback cb);
   void UnregisterOnChange(uint32_t);
@@ -8,11 +8,16 @@ namespace dfly {
 
 void JournalStreamer::Start(io::Sink* dest) {
   write_fb_ = util::fibers_ext::Fiber(&JournalStreamer::WriterFb, this, dest);
-  journal_cb_id_ = journal_->RegisterOnChange([this](const journal::Entry& entry) {
-    writer_.Write(entry);
-    record_cnt_.fetch_add(1, std::memory_order_relaxed);
-    NotifyWritten();
-  });
+  journal_cb_id_ =
+      journal_->RegisterOnChange([this](const journal::Entry& entry, bool allow_await) {
+        if (entry.opcode == journal::Op::NOOP) {
+          // No record to write; just await if data was written so the consumer will read it.
+          return AwaitIfWritten();
+        }
+        writer_.Write(entry);
+        record_cnt_.fetch_add(1, std::memory_order_relaxed);
+        NotifyWritten(allow_await);
+      });
 }
 
 uint64_t JournalStreamer::GetRecordCount() const {
@@ -64,7 +64,7 @@ struct ParsedEntry : public EntryBase {
   CmdData cmd;
 };
 
-using ChangeCallback = std::function<void(const Entry&)>;
+using ChangeCallback = std::function<void(const Entry&, bool await)>;
 
 }  // namespace journal
 }  // namespace dfly
@@ -278,7 +278,9 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
 // value. This is guaranteed by the fact that OnJournalEntry runs always after OnDbChange, and
 // no database switch can be performed between those two calls, because they are part of one
 // transaction.
-void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) {
+// OnJournalEntry registers for journal changes; the journal change callback signature is
+// (const journal::Entry& entry, bool await). In the snapshot flow we don't use the await argument.
+void SliceSnapshot::OnJournalEntry(const journal::Entry& entry, bool unused_await_arg) {
   // We ignore non payload entries like EXEC because we have no transactional ordering during
   // LOAD phase on replica.
   if (!entry.HasPayload()) {
@@ -102,7 +102,7 @@ class SliceSnapshot {
   void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
 
   // Journal listener
-  void OnJournalEntry(const journal::Entry& entry);
+  void OnJournalEntry(const journal::Entry& entry, bool unused_await_arg);
 
   // Close dest channel if not closed yet.
   void CloseRecordChannel();
@@ -1063,7 +1063,7 @@ void Transaction::UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, E
   auto journal = shard->journal();
 
   if (journal != nullptr && multi_->shard_journal_write[shard->shard_id()] == true) {
-    journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_journals_cnt, {});
+    journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_journals_cnt, {}, true);
   }
 
   if (multi_->multi_opts & CO::GLOBAL_TRANS) {
@@ -1209,18 +1209,19 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) {
     auto cmd = facade::ToSV(cmd_with_full_args_.front());
     entry_payload = make_pair(cmd, GetShardArgs(shard->shard_id()));
   }
-  LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false);
+  LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false, true);
 }
 
 void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload,
-                                    uint32_t shard_cnt, bool multi_commands) const {
+                                    uint32_t shard_cnt, bool multi_commands,
+                                    bool allow_await) const {
   auto journal = shard->journal();
   CHECK(journal);
   if (multi_) {
     multi_->shard_journal_write[shard->shard_id()] = true;
   }
   auto opcode = (multi_ || multi_commands) ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND;
-  journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, std::move(payload));
+  journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, std::move(payload), allow_await);
 }
 
 void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const {
@@ -1229,7 +1230,7 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt
   }
   auto journal = shard->journal();
   CHECK(journal);
-  journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt, {});
+  journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt, {}, false);
 }
 
 void Transaction::BreakOnShutdown() {
@@ -120,10 +120,6 @@ class Transaction {
   // Cancel all blocking watches on shutdown. Set COORD_CANCELLED.
   void BreakOnShutdown();
 
-  // Log a journal entry on shard with payload and shard count.
-  void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload,
-                         uint32_t shard_cnt) const;
-
   // In some cases for non auto-journaling commands we want to enable the auto journal flow.
   void RenableAutoJournal() {
     renabled_auto_journal_.store(true, std::memory_order_relaxed);
@@ -209,7 +205,7 @@ class Transaction {
   // multi_commands to true and call the FinishLogJournalOnShard function after logging the final
   // entry.
   void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, uint32_t shard_cnt,
-                         bool multi_commands) const;
+                         bool multi_commands, bool allow_await) const;
   void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const;
 
 private:
@@ -590,7 +590,7 @@ async def test_expiry(df_local_factory, n_keys=1000):
     # Set different expiry times for the keys, in ms
     pipe = c_master.pipeline(transaction=True)
     for k, _ in gen_test_data(n_keys):
-        ms = random.randint(100, 500)
+        ms = random.randint(20, 500)
         pipe.pexpire(k, ms)
     await pipe.execute()
 