diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index b76cb5ab4..3d71befbc 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -543,7 +543,7 @@ void WriteFlushSlotsToJournal(const SlotRanges& slot_ranges) { // TODO: Break slot migration upon FLUSHSLOTS journal->RecordEntry(/* txid= */ 0, journal::Op::COMMAND, /* dbid= */ 0, /* shard_cnt= */ shard_set->size(), nullopt, - Payload("DFLYCLUSTER", args_view), false); + Payload("DFLYCLUSTER", args_view)); }; shard_set->pool()->AwaitFiberOnAll(std::move(cb)); } diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 90c10a54f..5b6661616 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -144,6 +144,8 @@ unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& e if (db_slice_->WillBlockOnJournalWrite()) { return res; } + // Disable flush journal changes to prevent preemtion in GarbageCollect. + journal::JournalFlushGuard journal_flush_guard(db_slice_->shard_owner()->journal()); // bool should_print = (eb.key_hash % 128) == 0; @@ -172,6 +174,8 @@ unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& e unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeTable* me) { if (!can_evict_ || db_slice_->WillBlockOnJournalWrite()) return 0; + // Disable flush journal changes to prevent preemtion in evict. + journal::JournalFlushGuard journal_flush_guard(db_slice_->shard_owner()->journal()); constexpr size_t kNumStashBuckets = ABSL_ARRAYSIZE(eb.probes.by_type.stash_buckets); @@ -195,7 +199,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT // log the evicted keys to journal. if (auto journal = db_slice_->shard_owner()->journal(); journal) { - RecordExpiry(cntx_.db_index, key, false); + RecordExpiry(cntx_.db_index, key); } db_slice_->PerformDeletion(DbSlice::Iterator(last_slot_it, StringOrView::FromView(key)), table); @@ -453,7 +457,7 @@ OpResult DbSlice::FindInternal(const Context& cntx, std: } if (res.it->second.HasExpire()) { // check expiry state - res = ExpireIfNeeded(cntx, res.it, true); + res = ExpireIfNeeded(cntx, res.it); if (!IsValid(res.it)) { return OpStatus::KEY_NOTFOUND; } @@ -1077,12 +1081,11 @@ void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size } DbSlice::ItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, Iterator it) const { - auto res = ExpireIfNeeded(cntx, it.GetInnerIt(), false); + auto res = ExpireIfNeeded(cntx, it.GetInnerIt()); return {.it = Iterator::FromPrime(res.it), .exp_it = ExpIterator::FromPrime(res.exp_it)}; } -DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it, - bool preempts) const { +DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it) const { if (!it->second.HasExpire()) { LOG(ERROR) << "Invalid call to ExpireIfNeeded"; return {it, ExpireIterator{}}; @@ -1112,7 +1115,7 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato // Replicate expiry if (auto journal = owner_->journal(); journal) { - RecordExpiry(cntx.db_index, key, preempts); + RecordExpiry(cntx.db_index, key); } if (expired_keys_events_recording_) @@ -1136,6 +1139,8 @@ void DbSlice::ExpireAllIfNeeded() { // We hold no locks to any of the keys so we should Wait() here such that // we don't preempt in ExpireIfNeeded block_counter_.Wait(); + // Disable flush journal changes to prevent preemtion in traverse. + journal::JournalFlushGuard journal_flush_guard(owner_->journal()); for (DbIndex db_index = 0; db_index < db_arr_.size(); db_index++) { if (!db_arr_[db_index]) @@ -1148,7 +1153,7 @@ void DbSlice::ExpireAllIfNeeded() { LOG(ERROR) << "Expire entry " << exp_it->first.ToString() << " not found in prime table"; return; } - ExpireIfNeeded(Context{nullptr, db_index, GetCurrentTimeMs()}, prime_it, false); + ExpireIfNeeded(Context{nullptr, db_index, GetCurrentTimeMs()}, prime_it); }; ExpireTable::Cursor cursor; @@ -1212,7 +1217,7 @@ auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteEx auto prime_it = db.prime.Find(it->first); CHECK(!prime_it.is_done()); result.deleted_bytes += prime_it->first.MallocUsed() + prime_it->second.MallocUsed(); - ExpireIfNeeded(cntx, prime_it, false); + ExpireIfNeeded(cntx, prime_it); ++result.deleted; } else { result.survivor_ttl_sum += ttl; @@ -1280,7 +1285,7 @@ pair DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t s // fiber preemption could happen in this phase. for (string_view key : keys_to_journal) { if (auto journal = owner_->journal(); journal) - RecordExpiry(db_ind, key, false); + RecordExpiry(db_ind, key); if (expired_keys_events_recording_) db_table->expired_keys_events_.emplace_back(key); diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 263fa26b3..4f442910e 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -553,7 +553,7 @@ class DbSlice { ExpireIterator exp_it; }; - PrimeItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it, bool preempts = false) const; + PrimeItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it) const; OpResult AddOrFindInternal(const Context& cntx, std::string_view key); diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 5e42e2cdc..547fefb3e 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -78,7 +78,7 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r // We don't want any writes to the journal after we send the `PING`, // and expirations could ruin that. namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(false); - shard->journal()->RecordEntry(0, journal::Op::PING, 0, 0, nullopt, {}, true); + shard->journal()->RecordEntry(0, journal::Op::PING, 0, 0, nullopt, {}); const FlowInfo* flow = &replica->flows[shard->shard_id()]; while (flow->last_acked_lsn < shard->journal()->GetLsn()) { diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index f637f1a11..ddb674760 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -12,8 +12,8 @@ extern "C" { #include "redis/zmalloc.h" } - #include "server/engine_shard_set.h" +#include "server/journal/journal.h" #include "server/namespaces.h" #include "server/search/doc_index.h" #include "server/server_state.h" @@ -756,62 +756,56 @@ void EngineShard::Heartbeat() { } void EngineShard::RetireExpiredAndEvict() { - { - FiberAtomicGuard guard; - // TODO: iterate over all namespaces - DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); - constexpr double kTtlDeleteLimit = 200; + // Disable flush journal changes to prevent preemtion + journal::JournalFlushGuard journal_flush_guard(journal_); - uint32_t traversed = GetMovingSum6(TTL_TRAVERSE); - uint32_t deleted = GetMovingSum6(TTL_DELETE); - unsigned ttl_delete_target = 5; + // TODO: iterate over all namespaces + DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); + constexpr double kTtlDeleteLimit = 200; - if (deleted > 10) { - // deleted should be <= traversed. - // hence we map our delete/traversed ratio into a range [0, kTtlDeleteLimit). - // The higher ttl_delete_target the more likely we have lots of expired items that need - // to be deleted. - ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10); - } + uint32_t traversed = GetMovingSum6(TTL_TRAVERSE); + uint32_t deleted = GetMovingSum6(TTL_DELETE); + unsigned ttl_delete_target = 5; - DbContext db_cntx; - db_cntx.time_now_ms = GetCurrentTimeMs(); - - size_t eviction_goal = GetFlag(FLAGS_enable_heartbeat_eviction) ? CalculateEvictionBytes() : 0; - - for (unsigned i = 0; i < db_slice.db_array_size(); ++i) { - if (!db_slice.IsDbValid(i)) - continue; - - db_cntx.db_index = i; - auto [pt, expt] = db_slice.GetTables(i); - if (expt->size() > pt->size() / 4) { - DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target); - - eviction_goal -= std::min(eviction_goal, size_t(stats.deleted_bytes)); - counter_[TTL_TRAVERSE].IncBy(stats.traversed); - counter_[TTL_DELETE].IncBy(stats.deleted); - } - - if (eviction_goal) { - uint32_t starting_segment_id = rand() % pt->GetSegmentCount(); - auto [evicted_items, evicted_bytes] = - db_slice.FreeMemWithEvictionStep(i, starting_segment_id, eviction_goal); - - DVLOG(2) << "Heartbeat eviction: Expected to evict " << eviction_goal - << " bytes. Actually evicted " << evicted_items << " items, " << evicted_bytes - << " bytes. Max eviction per heartbeat: " - << GetFlag(FLAGS_max_eviction_per_heartbeat); - - eviction_goal -= std::min(eviction_goal, evicted_bytes); - } - } + if (deleted > 10) { + // deleted should be <= traversed. + // hence we map our delete/traversed ratio into a range [0, kTtlDeleteLimit). + // The higher ttl_delete_target the more likely we have lots of expired items that need + // to be deleted. + ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10); } - // Journal entries for expired entries are not writen to socket in the loop above. - // Trigger write to socket when loop finishes. - if (auto journal = EngineShard::tlocal()->journal(); journal) { - TriggerJournalWriteToSink(); + DbContext db_cntx; + db_cntx.time_now_ms = GetCurrentTimeMs(); + + size_t eviction_goal = GetFlag(FLAGS_enable_heartbeat_eviction) ? CalculateEvictionBytes() : 0; + + for (unsigned i = 0; i < db_slice.db_array_size(); ++i) { + if (!db_slice.IsDbValid(i)) + continue; + + db_cntx.db_index = i; + auto [pt, expt] = db_slice.GetTables(i); + if (expt->size() > pt->size() / 4) { + DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target); + + eviction_goal -= std::min(eviction_goal, size_t(stats.deleted_bytes)); + counter_[TTL_TRAVERSE].IncBy(stats.traversed); + counter_[TTL_DELETE].IncBy(stats.deleted); + } + + if (eviction_goal) { + uint32_t starting_segment_id = rand() % pt->GetSegmentCount(); + auto [evicted_items, evicted_bytes] = + db_slice.FreeMemWithEvictionStep(i, starting_segment_id, eviction_goal); + + DVLOG(2) << "Heartbeat eviction: Expected to evict " << eviction_goal + << " bytes. Actually evicted " << evicted_items << " items, " << evicted_bytes + << " bytes. Max eviction per heartbeat: " + << GetFlag(FLAGS_max_eviction_per_heartbeat); + + eviction_goal -= std::min(eviction_goal, evicted_bytes); + } } } diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 1e0cb7508..17725d1c7 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -616,7 +616,8 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, // we enter the callback in a timing when journaling will not cause preemptions. Otherwise, // the bucket might change as we Traverse and yield. db_slice.BlockingCounter()->Wait(); - + // Disable flush journal changes to prevent preemtion in traverse. + journal::JournalFlushGuard journal_flush_guard(op_args.shard->journal()); util::FiberAtomicGuard guard; unsigned cnt = 0; @@ -630,6 +631,7 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, cur = prime_table->Traverse( cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, scan_opts, &scratch, vec); }); } while (cur && cnt < scan_opts.limit); + VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur.value(); *cursor = cur.value(); } diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index e1e2f994e..3e4212eb7 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -84,8 +84,12 @@ LSN Journal::GetLsn() const { } void Journal::RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, - std::optional slot, Entry::Payload payload, bool await) { - journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, slot, std::move(payload)}, await); + std::optional slot, Entry::Payload payload) { + journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, slot, std::move(payload)}); +} + +void Journal::SetFlushMode(bool allow_flush) { + journal_slice.SetFlushMode(allow_flush); } } // namespace journal diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index 2c2be714a..ae275471a 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -3,8 +3,8 @@ // #pragma once - #include "server/journal/types.h" +#include "util/fibers/detail/fiber_interface.h" #include "util/proactor_pool.h" namespace dfly { @@ -35,11 +35,36 @@ class Journal { LSN GetLsn() const; void RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, - std::optional slot, Entry::Payload payload, bool await); + std::optional slot, Entry::Payload payload); + + void SetFlushMode(bool allow_flush); private: mutable util::fb2::Mutex state_mu_; }; +class JournalFlushGuard { + public: + explicit JournalFlushGuard(Journal* journal) : journal_(journal) { + if (journal_) { + journal_->SetFlushMode(false); + } + util::fb2::detail::EnterFiberAtomicSection(); + } + + ~JournalFlushGuard() { + util::fb2::detail::LeaveFiberAtomicSection(); + if (journal_) { + journal_->SetFlushMode(true); // Restore the state on destruction + } + } + + JournalFlushGuard(const JournalFlushGuard&) = delete; + JournalFlushGuard& operator=(const JournalFlushGuard&) = delete; + + private: + Journal* journal_; +}; + } // namespace journal } // namespace dfly diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index c3e9c9680..48bfe722e 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -142,34 +142,37 @@ std::string_view JournalSlice::GetEntry(LSN lsn) const { return (*ring_buffer_)[lsn - start].data; } -void JournalSlice::AddLogRecord(const Entry& entry, bool await) { +void JournalSlice::SetFlushMode(bool allow_flush) { + DCHECK(allow_flush != enable_journal_flush_); + enable_journal_flush_ = allow_flush; + if (allow_flush) { + JournalItem item; + item.lsn = -1; + item.opcode = Op::NOOP; + item.data = ""; + item.slot = {}; + CallOnChange(item); + } +} + +void JournalSlice::AddLogRecord(const Entry& entry) { DCHECK(ring_buffer_); - JournalItem dummy; - JournalItem* item; - if (entry.opcode == Op::NOOP) { - item = &dummy; - item->lsn = -1; - item->opcode = entry.opcode; - item->data = ""; - item->slot = entry.slot; - } else { + JournalItem item; + { FiberAtomicGuard fg; - // GetTail gives a pointer to a new tail entry in the buffer, possibly overriding the last entry - // if the buffer is full. - item = &dummy; - item->opcode = entry.opcode; - item->lsn = lsn_++; - item->cmd = entry.payload.cmd; - item->slot = entry.slot; + item.opcode = entry.opcode; + item.lsn = lsn_++; + item.cmd = entry.payload.cmd; + item.slot = entry.slot; io::BufSink buf_sink{&ring_serialize_buf_}; JournalWriter writer{&buf_sink}; writer.Write(entry); - item->data = io::View(ring_serialize_buf_.InputBuffer()); + item.data = io::View(ring_serialize_buf_.InputBuffer()); ring_serialize_buf_.Clear(); - VLOG(2) << "Writing item [" << item->lsn << "]: " << entry.ToString(); + VLOG(2) << "Writing item [" << item.lsn << "]: " << entry.ToString(); } #if 0 @@ -180,19 +183,17 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) { file_offset_ += line.size(); } #endif + CallOnChange(item); +} - // TODO: Remove the callbacks, replace with notifiers - { - std::shared_lock lk(cb_mu_); - DVLOG(2) << "AddLogRecord: run callbacks for " << entry.ToString() - << " num callbacks: " << change_cb_arr_.size(); +void JournalSlice::CallOnChange(const JournalItem& item) { + std::shared_lock lk(cb_mu_); - const size_t size = change_cb_arr_.size(); - auto k_v = change_cb_arr_.begin(); - for (size_t i = 0; i < size; ++i) { - k_v->second(*item, await); - ++k_v; - } + const size_t size = change_cb_arr_.size(); + auto k_v = change_cb_arr_.begin(); + for (size_t i = 0; i < size; ++i) { + k_v->second(item, enable_journal_flush_); + ++k_v; } } diff --git a/src/server/journal/journal_slice.h b/src/server/journal/journal_slice.h index 8534d78f7..da0b18ea7 100644 --- a/src/server/journal/journal_slice.h +++ b/src/server/journal/journal_slice.h @@ -37,7 +37,7 @@ class JournalSlice { return slice_index_ != UINT32_MAX; } - void AddLogRecord(const Entry& entry, bool await); + void AddLogRecord(const Entry& entry); // Register a callback that will be called every time a new entry is // added to the journal. @@ -54,8 +54,16 @@ class JournalSlice { /// from the buffer. bool IsLSNInBuffer(LSN lsn) const; std::string_view GetEntry(LSN lsn) const; + // SetFlushMode with allow_flush=false is used to disable preemptions during + // subsequent calls to AddLogRecord. + // SetFlushMode with allow_flush=true flushes all log records aggregated + // since the last call with allow_flush=false. This call may preempt. + // The caller must ensure that no preemptions occur between the initial call + // with allow_flush=false and the subsequent call with allow_flush=true. + void SetFlushMode(bool allow_flush); private: + void CallOnChange(const JournalItem& item); // std::string shard_path_; // std::unique_ptr shard_file_; std::optional> ring_buffer_; @@ -69,6 +77,7 @@ class JournalSlice { uint32_t slice_index_ = UINT32_MAX; uint32_t next_cb_id_ = 1; std::error_code status_ec_; + bool enable_journal_flush_ = true; }; } // namespace journal diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 90d94ca6e..91480a181 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -56,6 +56,7 @@ void JournalStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) { if (allow_await) { ThrottleIfNeeded(); // No record to write, just await if data was written so consumer will read the data. + // TODO: shouldnt we trigger async write in noop?? if (item.opcode == Op::NOOP) return; } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 36372a2a4..ddfc54021 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1474,15 +1474,15 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul } // Record to journal autojournal commands, here we allow await which anables writing to sync // the journal change. - LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, true); + LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_); } void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, - uint32_t shard_cnt, bool allow_await) const { + uint32_t shard_cnt) const { auto journal = shard->journal(); CHECK(journal); journal->RecordEntry(txid_, journal::Op::COMMAND, db_index_, shard_cnt, - unique_slot_checker_.GetUniqueSlotId(), std::move(payload), allow_await); + unique_slot_checker_.GetUniqueSlotId(), std::move(payload)); } void Transaction::ReviveAutoJournal() { diff --git a/src/server/transaction.h b/src/server/transaction.h index 5e209eb96..22fa79374 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -341,8 +341,8 @@ class Transaction { void PrepareMultiForScheduleSingleHop(Namespace* ns, ShardId sid, DbIndex db, CmdArgList args); // Write a journal entry to a shard journal with the given payload. - void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, uint32_t shard_cnt, - bool allow_await) const; + void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, + uint32_t shard_cnt) const; // Re-enable auto journal for commands marked as NO_AUTOJOURNAL. Call during setup. void ReviveAutoJournal(); diff --git a/src/server/tx_base.cc b/src/server/tx_base.cc index 70436f8c2..83bfefa93 100644 --- a/src/server/tx_base.cc +++ b/src/server/tx_base.cc @@ -53,32 +53,21 @@ size_t ShardArgs::Size() const { void RecordJournal(const OpArgs& op_args, string_view cmd, const ShardArgs& args, uint32_t shard_cnt) { VLOG(2) << "Logging command " << cmd << " from txn " << op_args.tx->txid(); - op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, true); + op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt); } void RecordJournal(const OpArgs& op_args, std::string_view cmd, facade::ArgSlice args, uint32_t shard_cnt) { VLOG(2) << "Logging command " << cmd << " from txn " << op_args.tx->txid(); - op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, true); + op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt); } -void RecordExpiry(DbIndex dbid, string_view key, bool preempts) { +void RecordExpiry(DbIndex dbid, string_view key) { auto journal = EngineShard::tlocal()->journal(); CHECK(journal); - if (!preempts) { - util::FiberAtomicGuard guard; - journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, cluster::KeySlot(key), - Payload("DEL", ArgSlice{key}), preempts); - return; - } + journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, cluster::KeySlot(key), - Payload("DEL", ArgSlice{key}), preempts); -} - -void TriggerJournalWriteToSink() { - auto journal = EngineShard::tlocal()->journal(); - CHECK(journal); - journal->RecordEntry(0, journal::Op::NOOP, 0, 0, nullopt, {}, true); + Payload("DEL", ArgSlice{key})); } LockTag::LockTag(std::string_view key) { diff --git a/src/server/tx_base.h b/src/server/tx_base.h index 125378886..af7091a62 100644 --- a/src/server/tx_base.h +++ b/src/server/tx_base.h @@ -224,7 +224,7 @@ void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args, // Record expiry in journal with independent transaction. Must be called from shard thread holding // key. -void RecordExpiry(DbIndex dbid, std::string_view key, bool preempts = false); +void RecordExpiry(DbIndex dbid, std::string_view key); // Trigger journal write to sink, no journal record will be added to journal. // Must be called from shard thread of journal to sink.