From d8fda40d4dc223fb3e06952faddee264bd6dc704 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Wed, 4 Dec 2024 14:46:00 +0100 Subject: [PATCH] chore: split RecordExpiry preemptive and non-preemptive flows (#4252) * add FiberAtomicGuard to RecordExpiry for non-preemptive flows --------- Signed-off-by: kostas --- src/server/db_slice.cc | 17 +++++++++-------- src/server/db_slice.h | 2 +- src/server/generic_family.cc | 1 + src/server/tx_base.cc | 10 ++++++++-- src/server/tx_base.h | 2 +- 5 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index ba1e477d6..21a85f48d 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -190,7 +190,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT // log the evicted keys to journal. if (auto journal = db_slice_->shard_owner()->journal(); journal) { - RecordExpiry(cntx_.db_index, key); + RecordExpiry(cntx_.db_index, key, false); } // Safe we already acquired util::fb2::LockGuard lk(db_slice_->GetSerializationMutex()); // on the flows that call this function @@ -450,7 +450,7 @@ OpResult DbSlice::FindInternal(const Context& cntx, std: } if (res.it->second.HasExpire()) { // check expiry state - res = ExpireIfNeeded(cntx, res.it); + res = ExpireIfNeeded(cntx, res.it, true); if (!IsValid(res.it)) { return OpStatus::KEY_NOTFOUND; } @@ -1088,11 +1088,12 @@ void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size } DbSlice::ItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, Iterator it) const { - auto res = ExpireIfNeeded(cntx, it.GetInnerIt()); + auto res = ExpireIfNeeded(cntx, it.GetInnerIt(), false); return {.it = Iterator::FromPrime(res.it), .exp_it = ExpIterator::FromPrime(res.exp_it)}; } -DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it) const { +DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it, + bool preempts) const { if 
(!it->second.HasExpire()) { LOG(ERROR) << "Invalid call to ExpireIfNeeded"; return {it, ExpireIterator{}}; @@ -1122,7 +1123,7 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato // Replicate expiry if (auto journal = owner_->journal(); journal) { - RecordExpiry(cntx.db_index, key); + RecordExpiry(cntx.db_index, key, preempts); } if (expired_keys_events_recording_) @@ -1153,7 +1154,7 @@ void DbSlice::ExpireAllIfNeeded() { LOG(ERROR) << "Expire entry " << exp_it->first.ToString() << " not found in prime table"; return; } - ExpireIfNeeded(Context{nullptr, db_index, GetCurrentTimeMs()}, prime_it); + ExpireIfNeeded(Context{nullptr, db_index, GetCurrentTimeMs()}, prime_it, false); }; ExpireTable::Cursor cursor; @@ -1215,7 +1216,7 @@ auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteEx if (ttl <= 0) { auto prime_it = db.prime.Find(it->first); CHECK(!prime_it.is_done()); - ExpireIfNeeded(cntx, prime_it); + ExpireIfNeeded(cntx, prime_it, false); ++result.deleted; } else { result.survivor_ttl_sum += ttl; @@ -1283,7 +1284,7 @@ pair DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t s // fiber preemption could happen in this phase. 
for (string_view key : keys_to_journal) { if (auto journal = owner_->journal(); journal) - RecordExpiry(db_ind, key); + RecordExpiry(db_ind, key, false); if (expired_keys_events_recording_) db_table->expired_keys_events_.emplace_back(key); diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 3321aff99..a9c72823d 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -555,7 +555,7 @@ class DbSlice { ExpireIterator exp_it; }; - PrimeItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it) const; + PrimeItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it, bool preempts = false) const; OpResult AddOrFindInternal(const Context& cntx, std::string_view key); diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 0047b978d..fd1f2743d 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -597,6 +597,7 @@ void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, auto& db_slice = op_args.GetDbSlice(); DCHECK(db_slice.IsDbValid(op_args.db_cntx.db_index)); + util::FiberAtomicGuard guard; unsigned cnt = 0; VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_cntx.db_index << " has " diff --git a/src/server/tx_base.cc b/src/server/tx_base.cc index 9eb6ba09b..70436f8c2 100644 --- a/src/server/tx_base.cc +++ b/src/server/tx_base.cc @@ -62,11 +62,17 @@ void RecordJournal(const OpArgs& op_args, std::string_view cmd, facade::ArgSlice op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, true); } -void RecordExpiry(DbIndex dbid, string_view key) { +void RecordExpiry(DbIndex dbid, string_view key, bool preempts) { auto journal = EngineShard::tlocal()->journal(); CHECK(journal); + if (!preempts) { + util::FiberAtomicGuard guard; + journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, cluster::KeySlot(key), + Payload("DEL", ArgSlice{key}), preempts); + return; + } journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, cluster::KeySlot(key), 
- Payload("DEL", ArgSlice{key}), false); + Payload("DEL", ArgSlice{key}), preempts); } void TriggerJournalWriteToSink() { diff --git a/src/server/tx_base.h b/src/server/tx_base.h index af7091a62..125378886 100644 --- a/src/server/tx_base.h +++ b/src/server/tx_base.h @@ -224,7 +224,7 @@ void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args, // Record expiry in journal with independent transaction. Must be called from shard thread holding // key. -void RecordExpiry(DbIndex dbid, std::string_view key); +void RecordExpiry(DbIndex dbid, std::string_view key, bool preempts = false); // Trigger journal write to sink, no journal record will be added to journal. // Must be called from shard thread of journal to sink.