diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index a5b5cb8ab..584fed2a0 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -39,4 +39,4 @@ cxx_test(json_family_test dfly_test_lib LABELS DFLY) add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY) add_dependencies(check_dfly dragonfly_test json_family_test list_family_test generic_family_test memcache_parser_test rdb_test - redis_parser_test snapshot_test stream_family_test string_family_test bitops_family_test) + redis_parser_test snapshot_test stream_family_test string_family_test bitops_family_test set_family_test zset_family_test) diff --git a/src/server/bitops_family.cc b/src/server/bitops_family.cc index 29d3086c9..eb3eb1d2b 100644 --- a/src/server/bitops_family.cc +++ b/src/server/bitops_family.cc @@ -184,25 +184,23 @@ bool SetBitValue(uint32_t offset, bool bit_value, std::string* entry) { // Helper functions to access the data or change it class OverrideValue { - EngineShard* shard_ = nullptr; - DbIndex index_ = 0; + const OpArgs& args_; public: - explicit OverrideValue(const OpArgs& args) : shard_{args.shard}, index_{args.db_ind} { + explicit OverrideValue(const OpArgs& args) : args_{args} { } OpResult Set(std::string_view key, uint32_t offset, bool bit_value); }; OpResult OverrideValue::Set(std::string_view key, uint32_t offset, bool bit_value) { - auto& db_slice = shard_->db_slice(); - DbIndex index = index_; + auto& db_slice = args_.shard->db_slice(); - DCHECK(db_slice.IsDbValid(index_)); + DCHECK(db_slice.IsDbValid(args_.db_cntx.db_index)); std::pair add_res; try { - add_res = db_slice.AddOrFind(index_, key); + add_res = db_slice.AddOrFind(args_.db_cntx, key); } catch (const std::bad_alloc&) { return OpStatus::OUT_OF_MEMORY; } @@ -210,9 +208,9 @@ OpResult OverrideValue::Set(std::string_view key, uint32_t offset, bool bi PrimeIterator& it = add_res.first; bool added = add_res.second; auto UpdateBitMapValue = [&](std::string_view value) { - db_slice.PreUpdate(index, it); + db_slice.PreUpdate(args_.db_cntx.db_index, it); it->second.SetString(value); - db_slice.PostUpdate(index, it, key, !added); + db_slice.PostUpdate(args_.db_cntx.db_index, it, key, !added); }; if (added) { // this is a new entry in the "table" @@ -224,7 +222,7 @@ OpResult OverrideValue::Set(std::string_view key, uint32_t offset, bool bi return OpStatus::WRONG_TYPE; } bool reset = false; - std::string existing_entry{GetString(shard_, it->second)}; + std::string existing_entry{GetString(args_.shard, it->second)}; if ((existing_entry.size() * OFFSET_FACTOR) <= offset) { // need to resize first existing_entry.resize(GetByteIndex(offset) + 1, 0); reset = true; @@ -398,7 +396,7 @@ OpResult ReadValueBitsetAt(const OpArgs& op_args, std::string_view key, ui } OpResult ReadValue(const OpArgs& op_args, std::string_view key) { - OpResult it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_STRING); + OpResult it_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_STRING); if (!it_res.ok()) { return it_res.status(); } diff --git a/src/server/blocking_controller.cc b/src/server/blocking_controller.cc index 700d6699c..eb1f234f7 100644 --- a/src/server/blocking_controller.cc +++ b/src/server/blocking_controller.cc @@ -106,18 +106,22 @@ void BlockingController::RunStep(Transaction* completed_t) { } } + DbContext context; + context.time_now_ms = GetCurrentTimeMs(); + for (DbIndex index : awakened_indices_) { auto dbit = watched_dbs_.find(index); if (dbit == watched_dbs_.end()) continue; + context.db_index = index; DbWatchTable& wt = *dbit->second; for (auto key : wt.awakened_keys) { string_view sv_key = static_cast(key); DVLOG(1) << "Processing awakened key " << sv_key; // Double verify we still got the item. - auto [it, exp_it] = owner_->db_slice().FindExt(index, sv_key); + auto [it, exp_it] = owner_->db_slice().FindExt(context, sv_key); if (!IsValid(it) || it->second.ObjType() != OBJ_LIST) // Only LIST is allowed to block. continue; diff --git a/src/server/common.h b/src/server/common.h index 7be5b7eaa..8c8522d6b 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -69,15 +69,20 @@ struct KeyIndex { } }; +struct DbContext { + DbIndex db_index = 0; + uint64_t time_now_ms = 0; +}; + struct OpArgs { EngineShard* shard; TxId txid; - DbIndex db_ind; + DbContext db_cntx; - OpArgs() : shard(nullptr), txid(0), db_ind(0) { + OpArgs() : shard(nullptr), txid(0) { } - OpArgs(EngineShard* s, TxId i, DbIndex d) : shard(s), txid(i), db_ind(d) { + OpArgs(EngineShard* s, TxId i, const DbContext& cntx) : shard(s), txid(i), db_cntx(cntx) { } }; diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index f045df5f8..f73d242b4 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -61,9 +61,9 @@ class PrimeEvictionPolicy { static constexpr bool can_evict = true; // we implement eviction functionality. static constexpr bool can_gc = true; - PrimeEvictionPolicy(DbIndex db_indx, bool can_evict, ssize_t mem_budget, ssize_t soft_limit, - DbSlice* db_slice) - : db_slice_(db_slice), mem_budget_(mem_budget), soft_limit_(soft_limit), db_indx_(db_indx), + PrimeEvictionPolicy(const DbContext& cntx, bool can_evict, ssize_t mem_budget, + ssize_t soft_limit, DbSlice* db_slice) + : db_slice_(db_slice), mem_budget_(mem_budget), soft_limit_(soft_limit), cntx_(cntx), can_evict_(can_evict) { } @@ -94,9 +94,10 @@ class PrimeEvictionPolicy { DbSlice* db_slice_; ssize_t mem_budget_; ssize_t soft_limit_ = 0; + const DbContext cntx_; + unsigned evicted_ = 0; unsigned checked_ = 0; - const DbIndex db_indx_; // unlike static constexpr can_evict, this parameter tells whether we can evict // items in runtime. @@ -138,7 +139,7 @@ unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& e for (; !bucket_it.is_done(); ++bucket_it) { if (bucket_it->second.HasExpire()) { ++checked_; - auto [prime_it, exp_it] = db_slice_->ExpireIfNeeded(db_indx_, bucket_it); + auto [prime_it, exp_it] = db_slice_->ExpireIfNeeded(cntx_, bucket_it); if (prime_it.is_done()) ++res; } @@ -164,7 +165,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT return 0; } - DbTable* table = db_slice_->GetDBTable(db_indx_); + DbTable* table = db_slice_->GetDBTable(cntx_.db_index); EvictItemFun(last_slot_it, table); ++evicted_; } @@ -256,9 +257,9 @@ void DbSlice::Reserve(DbIndex db_ind, size_t key_size) { db->prime.Reserve(key_size); } -auto DbSlice::Find(DbIndex db_index, string_view key, unsigned req_obj_type) const +auto DbSlice::Find(const Context& cntx, string_view key, unsigned req_obj_type) const -> OpResult { - auto it = FindExt(db_index, key).first; + auto it = FindExt(cntx, key).first; if (!IsValid(it)) return OpStatus::KEY_NOTFOUND; @@ -270,13 +271,13 @@ auto DbSlice::Find(DbIndex db_index, string_view key, unsigned req_obj_type) con return it; } -pair DbSlice::FindExt(DbIndex db_ind, string_view key) const { +pair DbSlice::FindExt(const Context& cntx, string_view key) const { pair res; - if (!IsDbValid(db_ind)) + if (!IsDbValid(cntx.db_index)) return res; - auto& db = *db_arr_[db_ind]; + auto& db = *db_arr_[cntx.db_index]; res.first = db.prime.Find(key); if (!IsValid(res.first)) { @@ -284,14 +285,14 @@ pair DbSlice::FindExt(DbIndex db_ind, string_view } if (res.first->second.HasExpire()) { // check expiry state - res = ExpireIfNeeded(db_ind, res.first); + res = ExpireIfNeeded(cntx, res.first); } if (caching_mode_ && IsValid(res.first)) { if (!change_cb_.empty()) { auto bump_cb = [&](PrimeTable::bucket_iterator bit) { for (const auto& ccb : change_cb_) { - ccb.second(db_ind, bit); + ccb.second(cntx.db_index, bit); } }; @@ -306,12 +307,12 @@ pair DbSlice::FindExt(DbIndex db_ind, string_view return res; } -OpResult> DbSlice::FindFirst(DbIndex db_index, ArgSlice args) { +OpResult> DbSlice::FindFirst(const Context& cntx, ArgSlice args) { DCHECK(!args.empty()); for (unsigned i = 0; i < args.size(); ++i) { string_view s = args[i]; - OpResult res = Find(db_index, s, OBJ_LIST); + OpResult res = Find(cntx, s, OBJ_LIST); if (res) return make_pair(res.value(), i); if (res.status() != OpStatus::KEY_NOTFOUND) @@ -322,20 +323,20 @@ OpResult> DbSlice::FindFirst(DbIndex db_index, Arg return OpStatus::KEY_NOTFOUND; } -pair DbSlice::AddOrFind(DbIndex db_index, string_view key) noexcept(false) { - auto res = AddOrFind2(db_index, key); +pair DbSlice::AddOrFind(const Context& cntx, string_view key) noexcept(false) { + auto res = AddOrFind2(cntx, key); return make_pair(get<0>(res), get<2>(res)); } -tuple DbSlice::AddOrFind2(DbIndex db_index, +tuple DbSlice::AddOrFind2(const Context& cntx, string_view key) noexcept(false) { - DCHECK(IsDbValid(db_index)); + DCHECK(IsDbValid(cntx.db_index)); - DbTable& db = *db_arr_[db_index]; + DbTable& db = *db_arr_[cntx.db_index]; // If we have some registered onchange callbacks, we must know in advance whether its Find or Add. if (!change_cb_.empty()) { - auto res = FindExt(db_index, key); + auto res = FindExt(cntx, key); if (IsValid(res.first)) { return tuple_cat(res, make_tuple(true)); @@ -343,11 +344,11 @@ tuple DbSlice::AddOrFind2(DbIndex db_index, // It's a new entry. for (const auto& ccb : change_cb_) { - ccb.second(db_index, key); + ccb.second(cntx.db_index, key); } } - PrimeEvictionPolicy evp{db_index, bool(caching_mode_), int64_t(memory_budget_ - key.size()), + PrimeEvictionPolicy evp{cntx, bool(caching_mode_), int64_t(memory_budget_ - key.size()), ssize_t(soft_budget_limit_), this}; // If we are over limit in non-cache scenario, just be conservative and throw. @@ -405,7 +406,7 @@ tuple DbSlice::AddOrFind2(DbIndex db_index, // TODO: to implement the incremental update of expiry values using multi-generation // expire_base_ update. Right now we use only index 0. - uint64_t delta_ms = now_ms_ - expire_base_[0]; + uint64_t delta_ms = cntx.time_now_ms - expire_base_[0]; if (expire_it->second.duration_ms() <= delta_ms) { db.expire.Erase(expire_it); @@ -535,28 +536,27 @@ uint32_t DbSlice::GetMCFlag(DbIndex db_ind, const PrimeKey& key) const { return it.is_done() ? 0 : it->second; } -PrimeIterator DbSlice::AddNew(DbIndex db_ind, string_view key, PrimeValue obj, +PrimeIterator DbSlice::AddNew(const Context& cntx, string_view key, PrimeValue obj, uint64_t expire_at_ms) noexcept(false) { - auto [it, added] = AddEntry(db_ind, key, std::move(obj), expire_at_ms); + auto [it, added] = AddEntry(cntx, key, std::move(obj), expire_at_ms); CHECK(added); return it; } -pair DbSlice::AddEntry(DbIndex db_ind, string_view key, PrimeValue obj, +pair DbSlice::AddEntry(const Context& cntx, string_view key, PrimeValue obj, uint64_t expire_at_ms) noexcept(false) { - DCHECK_LT(db_ind, db_arr_.size()); DCHECK(!obj.IsRef()); - pair res = AddOrFind(db_ind, key); + pair res = AddOrFind(cntx, key); if (!res.second) // have not inserted. return res; - auto& db = *db_arr_[db_ind]; + auto& db = *db_arr_[cntx.db_index]; auto& it = res.first; it->second = std::move(obj); - PostUpdate(db_ind, it, key, false); + PostUpdate(cntx.db_index, it, key, false); if (expire_at_ms) { it->second.SetExpire(true); @@ -685,10 +685,10 @@ void DbSlice::PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, } } -pair DbSlice::ExpireIfNeeded(DbIndex db_ind, +pair DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it) const { DCHECK(it->second.HasExpire()); - auto& db = db_arr_[db_ind]; + auto& db = db_arr_[cntx.db_index]; auto expire_it = db->expire.Find(it->first); @@ -697,7 +697,7 @@ pair DbSlice::ExpireIfNeeded(DbIndex db_ind, // TODO: to employ multi-generation update of expire-base and the underlying values. time_t expire_time = ExpireTime(expire_it); - if (now_ms_ < expire_time) + if (time_t(cntx.time_now_ms) < expire_time) return make_pair(it, expire_it); db->expire.Erase(expire_it); @@ -725,17 +725,17 @@ void DbSlice::UnregisterOnChange(uint64_t id) { LOG(DFATAL) << "Could not find " << id << " to unregister"; } -auto DbSlice::DeleteExpiredStep(DbIndex db_ind, unsigned count) -> DeleteExpiredStats { - auto& db = *db_arr_[db_ind]; +auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteExpiredStats { + auto& db = *db_arr_[cntx.db_index]; DeleteExpiredStats result; auto cb = [&](ExpireIterator it) { result.traversed++; - time_t ttl = ExpireTime(it) - Now(); + time_t ttl = ExpireTime(it) - cntx.time_now_ms; if (ttl <= 0) { auto prime_it = db.prime.Find(it->first); CHECK(!prime_it.is_done()); - ExpireIfNeeded(db_ind, prime_it); + ExpireIfNeeded(cntx, prime_it); ++result.deleted; } else { result.survivor_ttl_sum += ttl; @@ -853,7 +853,8 @@ size_t DbSlice::EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* t return freed_memory_fun(); }; -void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key, ConnectionState::ExecInfo* exec_info) { +void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key, + ConnectionState::ExecInfo* exec_info) { db_arr_[db_indx]->watched_keys[key].push_back(exec_info); } diff --git a/src/server/db_slice.h b/src/server/db_slice.h index e253b6bdb..0efba681d 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -8,8 +8,8 @@ #include "facade/op_status.h" #include "server/common.h" -#include "server/table.h" #include "server/conn_context.h" +#include "server/table.h" namespace util { class ProactorBase; @@ -65,6 +65,8 @@ class DbSlice { size_t small_string_bytes = 0; }; + using Context = DbContext; + // ChangeReq - describes the change to the table. struct ChangeReq { // If iterator is set then it's an update to the existing bucket. @@ -90,12 +92,6 @@ class DbSlice { // Returns statistics for the whole db slice. A bit heavy operation. Stats GetStats() const; - //! UpdateExpireClock updates the expire clock for this db slice. - //! Must be a wall clock so we could replicate it between machines. - void UpdateExpireClock(uint64_t now_ms) { - now_ms_ = now_ms; - } - void UpdateExpireBase(uint64_t now, unsigned generation) { expire_base_[generation & 1] = now; } @@ -124,37 +120,34 @@ class DbSlice { return ExpirePeriod{time_ms - expire_base_[0]}; } - // returns wall clock in millis as it has been set via UpdateExpireClock. - time_t Now() const { - return now_ms_; - } - - OpResult Find(DbIndex db_index, std::string_view key, unsigned req_obj_type) const; + OpResult Find(const Context& cntx, std::string_view key, + unsigned req_obj_type) const; // Returns (value, expire) dict entries if key exists, null if it does not exist or has expired. - std::pair FindExt(DbIndex db_ind, std::string_view key) const; + std::pair FindExt(const Context& cntx, std::string_view key) const; // Returns (iterator, args-index) if found, KEY_NOTFOUND otherwise. // If multiple keys are found, returns the first index in the ArgSlice. - OpResult> FindFirst(DbIndex db_index, ArgSlice args); + OpResult> FindFirst(const Context& cntx, ArgSlice args); // Return .second=true if insertion ocurred, false if we return the existing key. // throws: bad_alloc is insertion could not happen due to out of memory. - std::pair AddOrFind(DbIndex db_index, std::string_view key) noexcept(false); + std::pair AddOrFind(const Context& cntx, + std::string_view key) noexcept(false); - std::tuple AddOrFind2(DbIndex db_index, + std::tuple AddOrFind2(const Context& cntx, std::string_view key) noexcept(false); // Returns second=true if insertion took place, false otherwise. // expire_at_ms equal to 0 - means no expiry. // throws: bad_alloc is insertion could not happen due to out of memory. - std::pair AddEntry(DbIndex db_ind, std::string_view key, PrimeValue obj, + std::pair AddEntry(const Context& cntx, std::string_view key, PrimeValue obj, uint64_t expire_at_ms) noexcept(false); // Adds a new entry. Requires: key does not exist in this slice. // Returns the iterator to the newly added entry. // throws: bad_alloc is insertion could not happen due to out of memory. - PrimeIterator AddNew(DbIndex db_ind, std::string_view key, PrimeValue obj, + PrimeIterator AddNew(const Context& cntx, std::string_view key, PrimeValue obj, uint64_t expire_at_ms) noexcept(false); // Either adds or removes (if at == 0) expiry. Returns true if a change was made. @@ -218,7 +211,8 @@ class DbSlice { // Callback functions called upon writing to the existing key. void PreUpdate(DbIndex db_ind, PrimeIterator it); - void PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, bool existing_entry = true); + void PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, + bool existing_entry = true); DbTableStats* MutableStats(DbIndex db_ind) { return &db_arr_[db_ind]->stats; @@ -226,7 +220,8 @@ class DbSlice { // Check whether 'it' has not expired. Returns it if it's still valid. Otherwise, erases it // from both tables and return PrimeIterator{}. - std::pair ExpireIfNeeded(DbIndex db_ind, PrimeIterator it) const; + std::pair ExpireIfNeeded(const Context& cntx, + PrimeIterator it) const; // Current version of this slice. // We maintain a shared versioning scheme for all databases in the slice. @@ -251,7 +246,7 @@ class DbSlice { }; // Deletes some amount of possible expired items. - DeleteExpiredStats DeleteExpiredStep(DbIndex db_indx, unsigned count); + DeleteExpiredStats DeleteExpiredStep(const Context& cntx, unsigned count); void FreeMemWithEvictionStep(DbIndex db_indx, size_t increase_goal_bytes); const DbTableArray& databases() const { @@ -262,7 +257,8 @@ class DbSlice { caching_mode_ = 1; } - void RegisterWatchedKey(DbIndex db_indx, std::string_view key, ConnectionState::ExecInfo* exec_info); + void RegisterWatchedKey(DbIndex db_indx, std::string_view key, + ConnectionState::ExecInfo* exec_info); // Unregisted all watched key entries for connection. void UnregisterConnectionWatches(ConnectionState::ExecInfo* exec_info); @@ -284,7 +280,6 @@ class DbSlice { EngineShard* owner_; - time_t now_ms_ = 0; // Used for expire logic, represents a real clock. time_t expire_base_[2]; // Used for expire logic, represents a real clock. uint64_t version_ = 1; // Used to version entries in the PrimeTable. diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 41599f74e..24a0be2c7 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -60,7 +60,8 @@ struct ObjInfo { void DoPopulateBatch(std::string_view prefix, size_t val_size, const SetCmd::SetParams& params, const PopulateBatch& batch) { - OpArgs op_args(EngineShard::tlocal(), 0, params.db_index); + DbContext db_cntx{batch.dbid, 0}; + OpArgs op_args(EngineShard::tlocal(), 0, db_cntx); SetCmd sg(op_args); for (unsigned i = 0; i < batch.sz; ++i) { @@ -277,7 +278,7 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view DbIndex db_indx = cntx_->db_index(); EngineShardSet& ess = *shard_set; std::vector ps(ess.size(), PopulateBatch{db_indx}); - SetCmd::SetParams params{db_indx}; + SetCmd::SetParams params; for (uint64_t i = from; i < from + len; ++i) { StrAppend(&key, i); @@ -324,7 +325,7 @@ void DebugCmd::Inspect(string_view key) { CHECK(!exp_it.is_done()); time_t exp_time = db_slice.ExpireTime(exp_it); - oinfo.ttl = exp_time - db_slice.Now(); + oinfo.ttl = exp_time - GetCurrentTimeMs(); oinfo.has_sec_precision = exp_it->second.is_second_precision(); } } diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 8a1ef874c..8fbbade05 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -612,7 +612,9 @@ TEST_F(DflyEngineTest, Watch) { // Check EXEC doesn't miss watched key expiration. Run({"watch", "a"}); Run({"expire", "a", "1"}); - UpdateTime(expire_now_ + 1000); + + AdvanceTime(1000); + Run({"multi"}); ASSERT_THAT(Run({"exec"}), kExecFail); @@ -637,7 +639,9 @@ TEST_F(DflyEngineTest, Watch) { Run({"watch", "a"}); Run({"set", "c", "1"}); Run({"expire", "a", "1"}); // a existed - UpdateTime(expire_now_ + 1000); + + AdvanceTime(1000); + Run({"multi"}); ASSERT_THAT(Run({"exec"}), kExecFail); diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 9227cbb46..2e96b5001 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -22,10 +22,9 @@ using namespace std; ABSL_FLAG(string, backing_prefix, "", ""); -ABSL_FLAG(uint32_t, hz, 1000, - "Base frequency at which the server updates its expiry clock " - "and performs other background tasks. Warning: not advised to decrease in production, " - "because it can affect expiry precision for PSETEX etc."); +ABSL_FLAG(uint32_t, hz, 100, + "Base frequency at which the server performs other background tasks. " + "Warning: not advised to decrease in production."); ABSL_FLAG(bool, cache_mode, false, "If true, the backend behaves like a cache, " @@ -48,6 +47,7 @@ constexpr size_t kQueueLen = 64; thread_local EngineShard* EngineShard::shard_ = nullptr; EngineShardSet* shard_set = nullptr; +uint64_t TEST_current_time_ms = 0; EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) { ooo_runs += o.ooo_runs; @@ -284,43 +284,41 @@ bool EngineShard::HasResultConverged(TxId notifyid) const { #endif void EngineShard::Heartbeat() { - // absl::GetCurrentTimeNanos() returns current time since the Unix Epoch. - db_slice().UpdateExpireClock(absl::GetCurrentTimeNanos() / 1000000); + CacheStats(); + constexpr double kTtlDeleteLimit = 200; + constexpr double kRedLimitFactor = 0.1; - if (task_iters_++ % 8 == 0) { - CacheStats(); - constexpr double kTtlDeleteLimit = 200; - constexpr double kRedLimitFactor = 0.1; + uint32_t traversed = GetMovingSum6(TTL_TRAVERSE); + uint32_t deleted = GetMovingSum6(TTL_DELETE); + unsigned ttl_delete_target = 5; - uint32_t traversed = GetMovingSum6(TTL_TRAVERSE); - uint32_t deleted = GetMovingSum6(TTL_DELETE); - unsigned ttl_delete_target = 5; + if (deleted > 10) { + // deleted should be <= traversed. + // hence we map our delete/traversed ratio into a range [0, kTtlDeleteLimit). + // The higher t + ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10); + } - if (deleted > 10) { - // deleted should be <= traversed. - // hence we map our delete/traversed ratio into a range [0, kTtlDeleteLimit). - // The higher t - ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10); + ssize_t redline = (max_memory_limit * kRedLimitFactor) / shard_set->size(); + DbContext db_cntx; + db_cntx.time_now_ms = GetCurrentTimeMs(); + + for (unsigned i = 0; i < db_slice_.db_array_size(); ++i) { + if (!db_slice_.IsDbValid(i)) + continue; + + db_cntx.db_index = i; + auto [pt, expt] = db_slice_.GetTables(i); + if (expt->size() > pt->size() / 4) { + DbSlice::DeleteExpiredStats stats = db_slice_.DeleteExpiredStep(db_cntx, ttl_delete_target); + + counter_[TTL_TRAVERSE].IncBy(stats.traversed); + counter_[TTL_DELETE].IncBy(stats.deleted); } - ssize_t redline = (max_memory_limit * kRedLimitFactor) / shard_set->size(); - - for (unsigned i = 0; i < db_slice_.db_array_size(); ++i) { - if (!db_slice_.IsDbValid(i)) - continue; - - auto [pt, expt] = db_slice_.GetTables(i); - if (expt->size() > pt->size() / 4) { - DbSlice::DeleteExpiredStats stats = db_slice_.DeleteExpiredStep(i, ttl_delete_target); - - counter_[TTL_TRAVERSE].IncBy(stats.traversed); - counter_[TTL_DELETE].IncBy(stats.deleted); - } - - // if our budget is below the limit - if (db_slice_.memory_budget() < redline) { - db_slice_.FreeMemWithEvictionStep(i, redline - db_slice_.memory_budget()); - } + // if our budget is below the limit + if (db_slice_.memory_budget() < redline) { + db_slice_.FreeMemWithEvictionStep(i, redline - db_slice_.memory_budget()); } } } diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 200b58be2..3587371e1 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -176,7 +176,6 @@ class EngineShard { IntentLock shard_lock_; uint32_t periodic_task_ = 0; - uint64_t task_iters_ = 0; std::unique_ptr tiered_storage_; std::unique_ptr blocking_controller_; @@ -287,6 +286,16 @@ inline ShardId Shard(std::string_view v, ShardId shard_num) { } +// absl::GetCurrentTimeNanos is twice faster than clock_gettime(CLOCK_REALTIME) on my laptop +// and 4 times faster than on a VM. it takes 5-10ns to do a call. + +extern uint64_t TEST_current_time_ms; + +inline uint64_t GetCurrentTimeMs() { + return TEST_current_time_ms ? TEST_current_time_ms : absl::GetCurrentTimeNanos() / 1000000; +} + + extern EngineShardSet* shard_set; } // namespace dfly diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 6343d071f..4a649a755 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -31,7 +31,7 @@ namespace { class Renamer { public: - Renamer(DbIndex dind, ShardId source_id) : db_indx_(dind), src_sid_(source_id) { + Renamer(ShardId source_id) : src_sid_(source_id) { } void Find(Transaction* t); @@ -46,7 +46,6 @@ class Renamer { OpStatus MoveSrc(Transaction* t, EngineShard* es); OpStatus UpdateDest(Transaction* t, EngineShard* es); - DbIndex db_indx_; ShardId src_sid_; struct FindResult { @@ -73,7 +72,7 @@ void Renamer::Find(Transaction* t) { res->key = args.front(); auto& db_slice = EngineShard::tlocal()->db_slice(); - auto [it, exp_it] = db_slice.FindExt(db_indx_, res->key); + auto [it, exp_it] = db_slice.FindExt(t->db_context(), res->key); res->found = IsValid(it); if (IsValid(it)) { @@ -116,7 +115,7 @@ void Renamer::Finalize(Transaction* t, bool skip_exist_dest) { OpStatus Renamer::MoveSrc(Transaction* t, EngineShard* es) { if (es->shard_id() == src_sid_) { // Handle source key. // TODO: to call PreUpdate/PostUpdate. - auto it = es->db_slice().FindExt(db_indx_, src_res_.key).first; + auto it = es->db_slice().FindExt(t->db_context(), src_res_.key).first; CHECK(IsValid(it)); // We distinguish because of the SmallString that is pinned to its thread by design, @@ -129,7 +128,7 @@ OpStatus Renamer::MoveSrc(Transaction* t, EngineShard* es) { pv_ = std::move(it->second); it->second.SetExpire(has_expire); } - CHECK(es->db_slice().Del(db_indx_, it)); // delete the entry with empty value in it. + CHECK(es->db_slice().Del(t->db_index(), it)); // delete the entry with empty value in it. } return OpStatus::OK; @@ -139,7 +138,7 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) { if (es->shard_id() != src_sid_) { auto& db_slice = es->db_slice(); string_view dest_key = dest_res_.key; - PrimeIterator dest_it = db_slice.FindExt(db_indx_, dest_key).first; + PrimeIterator dest_it = db_slice.FindExt(t->db_context(), dest_key).first; bool is_prior_list = false; if (IsValid(dest_it)) { @@ -152,18 +151,18 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) { dest_it->second = std::move(pv_); } dest_it->second.SetExpire(has_expire); // preserve expire flag. - db_slice.UpdateExpire(db_indx_, dest_it, src_res_.expire_ts); + db_slice.UpdateExpire(t->db_index(), dest_it, src_res_.expire_ts); } else { if (src_res_.ref_val.ObjType() == OBJ_STRING) { pv_.SetString(str_val_); } - dest_it = db_slice.AddNew(db_indx_, dest_key, std::move(pv_), src_res_.expire_ts); + dest_it = db_slice.AddNew(t->db_context(), dest_key, std::move(pv_), src_res_.expire_ts); } dest_it->first.SetSticky(src_res_.sticky); if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) { - es->blocking_controller()->AwakeWatched(db_indx_, dest_key); + es->blocking_controller()->AwakeWatched(t->db_index(), dest_key); } } @@ -181,7 +180,7 @@ struct ScanOpts { bool ScanCb(const OpArgs& op_args, PrimeIterator it, const ScanOpts& opts, StringVec* res) { auto& db_slice = op_args.shard->db_slice(); if (it->second.HasExpire()) { - it = db_slice.ExpireIfNeeded(op_args.db_ind, it).first; + it = db_slice.ExpireIfNeeded(op_args.db_cntx, it).first; } if (!IsValid(it)) @@ -211,15 +210,15 @@ bool ScanCb(const OpArgs& op_args, PrimeIterator it, const ScanOpts& opts, Strin void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, StringVec* vec) { auto& db_slice = op_args.shard->db_slice(); - DCHECK(db_slice.IsDbValid(op_args.db_ind)); + DCHECK(db_slice.IsDbValid(op_args.db_cntx.db_index)); unsigned cnt = 0; - VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_ind << " has " - << db_slice.DbSize(op_args.db_ind); + VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_cntx.db_index << " has " + << db_slice.DbSize(op_args.db_cntx.db_index); PrimeTable::Cursor cur = *cursor; - auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_ind); + auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_cntx.db_index); do { cur = prime_table->Traverse( cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, scan_opts, vec); }); @@ -245,10 +244,11 @@ uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys } cursor >>= 10; + DbContext db_cntx{.db_index = cntx->conn_state.db_index, .time_now_ms = GetCurrentTimeMs()}; do { ess->Await(sid, [&] { - OpArgs op_args{EngineShard::tlocal(), 0, cntx->conn_state.db_index}; + OpArgs op_args{EngineShard::tlocal(), 0, db_cntx}; OpScan(op_args, scan_opts, &cursor, keys); }); @@ -543,7 +543,7 @@ OpResult OpFetchSortEntries(const OpArgs& op_args, std::string_vi bool alpha) { using namespace container_utils; - auto [it, _] = op_args.shard->db_slice().FindExt(op_args.db_ind, key); + auto [it, _] = op_args.shard->db_slice().FindExt(op_args.db_cntx, key); if (!IsValid(it) || !IsContainer(it->second)) { return OpStatus::KEY_NOTFOUND; } @@ -731,7 +731,7 @@ void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) { std::string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - auto it = shard->db_slice().FindExt(t->db_index(), key).first; + auto it = shard->db_slice().FindExt(t->db_context(), key).first; if (!it.is_done()) { return it->second.ObjType(); } else { @@ -746,6 +746,19 @@ void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) { } } +void GenericFamily::Time(CmdArgList args, ConnectionContext* cntx) { + uint64_t now_usec; + if (cntx->transaction) { + now_usec = cntx->transaction->db_context().time_now_ms * 1000; + } else { + now_usec = absl::GetCurrentTimeNanos() / 1000; + } + + (*cntx)->StartArray(2); + (*cntx)->SendLong(now_usec / 1000000); + (*cntx)->SendLong(now_usec % 1000000); +} + OpResult GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_dest, ConnectionContext* cntx) { string_view key[2] = {ArgS(args, 1), ArgS(args, 2)}; @@ -763,7 +776,7 @@ OpResult GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des transaction->Schedule(); unsigned shard_count = shard_set->size(); - Renamer renamer{transaction->db_index(), Shard(key[0], shard_count)}; + Renamer renamer{Shard(key[0], shard_count)}; // Phase 1 -> Fetch keys from both shards. // Phase 2 -> If everything is ok, clone the source object, delete the destination object, and @@ -835,23 +848,23 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) { OpStatus GenericFamily::OpExpire(const OpArgs& op_args, string_view key, const ExpireParams& params) { auto& db_slice = op_args.shard->db_slice(); - auto [it, expire_it] = db_slice.FindExt(op_args.db_ind, key); + auto [it, expire_it] = db_slice.FindExt(op_args.db_cntx, key); if (!IsValid(it)) return OpStatus::KEY_NOTFOUND; int64_t msec = (params.unit == TimeUnit::SEC) ? params.ts * 1000 : params.ts; - int64_t now_msec = db_slice.Now(); + int64_t now_msec = op_args.db_cntx.time_now_ms; int64_t rel_msec = params.absolute ? msec - now_msec : msec; if (rel_msec > kMaxExpireDeadlineSec * 1000) { return OpStatus::OUT_OF_RANGE; } if (rel_msec <= 0) { - CHECK(db_slice.Del(op_args.db_ind, it)); + CHECK(db_slice.Del(op_args.db_cntx.db_index, it)); } else if (IsValid(expire_it)) { expire_it->second = db_slice.FromAbsoluteTime(now_msec + rel_msec); } else { - db_slice.UpdateExpire(op_args.db_ind, it, rel_msec + now_msec); + db_slice.UpdateExpire(op_args.db_cntx.db_index, it, rel_msec + now_msec); } return OpStatus::OK; @@ -859,14 +872,14 @@ OpStatus GenericFamily::OpExpire(const OpArgs& op_args, string_view key, OpResult GenericFamily::OpTtl(Transaction* t, EngineShard* shard, string_view key) { auto& db_slice = shard->db_slice(); - auto [it, expire_it] = db_slice.FindExt(t->db_index(), key); + auto [it, expire_it] = db_slice.FindExt(t->db_context(), key); if (!IsValid(it)) return OpStatus::KEY_NOTFOUND; if (!IsValid(expire_it)) return OpStatus::SKIPPED; - int64_t ttl_ms = db_slice.ExpireTime(expire_it) - db_slice.Now(); + int64_t ttl_ms = db_slice.ExpireTime(expire_it) - t->db_context().time_now_ms; DCHECK_GT(ttl_ms, 0); // Otherwise FindExt would return null. return ttl_ms; } @@ -878,10 +891,10 @@ OpResult GenericFamily::OpDel(const OpArgs& op_args, ArgSlice keys) { uint32_t res = 0; for (uint32_t i = 0; i < keys.size(); ++i) { - auto fres = db_slice.FindExt(op_args.db_ind, keys[i]); + auto fres = db_slice.FindExt(op_args.db_cntx, keys[i]); if (!IsValid(fres.first)) continue; - res += int(db_slice.Del(op_args.db_ind, fres.first)); + res += int(db_slice.Del(op_args.db_cntx.db_index, fres.first)); } return res; @@ -893,7 +906,7 @@ OpResult GenericFamily::OpExists(const OpArgs& op_args, ArgSlice keys) uint32_t res = 0; for (uint32_t i = 0; i < keys.size(); ++i) { - auto find_res = db_slice.FindExt(op_args.db_ind, keys[i]); + auto find_res = db_slice.FindExt(op_args.db_cntx, keys[i]); res += IsValid(find_res.first); } return res; @@ -903,12 +916,12 @@ OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, bool skip_exists) { auto* es = op_args.shard; auto& db_slice = es->db_slice(); - auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, from_key); + auto [from_it, from_expire] = db_slice.FindExt(op_args.db_cntx, from_key); if (!IsValid(from_it)) return OpStatus::KEY_NOTFOUND; bool is_prior_list = false; - auto [to_it, to_expire] = db_slice.FindExt(op_args.db_ind, to_key); + auto [to_it, to_expire] = db_slice.FindExt(op_args.db_cntx, to_key); if (IsValid(to_it)) { if (skip_exists) return OpStatus::KEY_EXISTS; @@ -932,20 +945,20 @@ OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, // It is guaranteed that UpdateExpire() call does not erase the element because then // from_it would be invalid. Therefore, UpdateExpire does not invalidate any iterators, // therefore we can delete 'from_it'. - db_slice.UpdateExpire(op_args.db_ind, to_it, exp_ts); - CHECK(db_slice.Del(op_args.db_ind, from_it)); + db_slice.UpdateExpire(op_args.db_cntx.db_index, to_it, exp_ts); + CHECK(db_slice.Del(op_args.db_cntx.db_index, from_it)); } else { // Here we first delete from_it because AddNew below could invalidate from_it. // On the other hand, AddNew does not rely on the iterators - this is why we keep // the value in `from_obj`. - CHECK(db_slice.Del(op_args.db_ind, from_it)); - to_it = db_slice.AddNew(op_args.db_ind, to_key, std::move(from_obj), exp_ts); + CHECK(db_slice.Del(op_args.db_cntx.db_index, from_it)); + to_it = db_slice.AddNew(op_args.db_cntx, to_key, std::move(from_obj), exp_ts); } to_it->first.SetSticky(sticky); if (!is_prior_list && to_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) { - es->blocking_controller()->AwakeWatched(op_args.db_ind, to_key); + es->blocking_controller()->AwakeWatched(op_args.db_cntx.db_index, to_key); } return OpStatus::OK; } @@ -957,7 +970,7 @@ OpResult GenericFamily::OpStick(const OpArgs& op_args, ArgSlice keys) uint32_t res = 0; for (uint32_t i = 0; i < keys.size(); ++i) { - auto [it, _] = db_slice.FindExt(op_args.db_ind, keys[i]); + auto [it, _] = db_slice.FindExt(op_args.db_cntx, keys[i]); if (IsValid(it) && !it->first.IsSticky()) { it->first.SetSticky(true); ++res; @@ -974,12 +987,14 @@ OpStatus GenericFamily::OpMove(const OpArgs& op_args, string_view key, DbIndex t auto& db_slice = op_args.shard->db_slice(); // Fetch value at key in current db. - auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, key); + auto [from_it, from_expire] = db_slice.FindExt(op_args.db_cntx, key); if (!IsValid(from_it)) return OpStatus::KEY_NOTFOUND; // Fetch value at key in target db. - auto [to_it, _] = db_slice.FindExt(target_db, key); + DbContext target_cntx = op_args.db_cntx; + target_cntx.db_index = target_db; + auto [to_it, _] = db_slice.FindExt(target_cntx, key); if (IsValid(to_it)) return OpStatus::KEY_EXISTS; @@ -993,8 +1008,8 @@ OpStatus GenericFamily::OpMove(const OpArgs& op_args, string_view key, DbIndex t // Restore expire flag after std::move. from_it->second.SetExpire(IsValid(from_expire)); - CHECK(db_slice.Del(op_args.db_ind, from_it)); - to_it = db_slice.AddNew(target_db, key, std::move(from_obj), exp_ts); + CHECK(db_slice.Del(op_args.db_cntx.db_index, from_it)); + to_it = db_slice.AddNew(target_cntx, key, std::move(from_obj), exp_ts); to_it->first.SetSticky(sticky); if (to_it->second.ObjType() == OBJ_LIST && op_args.shard->blocking_controller()) { @@ -1029,6 +1044,7 @@ void GenericFamily::Register(CommandRegistry* registry) { << CI{"SCAN", CO::READONLY | CO::FAST | CO::LOADING, -2, 0, 0, 0}.HFUNC(Scan) << CI{"TTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Ttl) << CI{"PTTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Pttl) + << CI{"TIME", CO::LOADING | CO::FAST, 1, 0, 0, 0}.HFUNC(Time) << CI{"TYPE", CO::READONLY | CO::FAST | CO::LOADING, 2, 1, 1, 1}.HFUNC(Type) << CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del) << CI{"STICK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Stick) diff --git a/src/server/generic_family.h b/src/server/generic_family.h index 346e8a8eb..a2c9e1a55 100644 --- a/src/server/generic_family.h +++ b/src/server/generic_family.h @@ -60,6 +60,7 @@ class GenericFamily { static void Echo(CmdArgList args, ConnectionContext* cntx); static void Select(CmdArgList args, ConnectionContext* cntx); static void Scan(CmdArgList args, ConnectionContext* cntx); + static void Time(CmdArgList args, ConnectionContext* cntx); static void Type(CmdArgList args, ConnectionContext* cntx); static OpResult RenameGeneric(CmdArgList args, bool skip_exist_dest, diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index 932be0dae..70211db2c 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -30,23 +30,23 @@ TEST_F(GenericFamilyTest, Expire) { auto resp = Run({"expire", "key", "1"}); EXPECT_THAT(resp, IntArg(1)); - UpdateTime(expire_now_ + 1000); + AdvanceTime(1000); resp = Run({"get", "key"}); EXPECT_THAT(resp, ArgType(RespExpr::NIL)); Run({"set", "key", "val"}); - resp = Run({"pexpireat", "key", absl::StrCat(expire_now_ + 2000)}); + resp = Run({"pexpireat", "key", absl::StrCat(TEST_current_time_ms + 2000)}); EXPECT_THAT(resp, IntArg(1)); // override - resp = Run({"pexpireat", "key", absl::StrCat(expire_now_ + 3000)}); + resp = Run({"pexpireat", "key", absl::StrCat(TEST_current_time_ms + 3000)}); EXPECT_THAT(resp, IntArg(1)); - UpdateTime(expire_now_ + 2999); + AdvanceTime(2999); resp = Run({"get", "key"}); EXPECT_THAT(resp, "val"); - UpdateTime(expire_now_ + 3000); + AdvanceTime(1); resp = Run({"get", "key"}); EXPECT_THAT(resp, ArgType(RespExpr::NIL)); } @@ -331,4 +331,28 @@ TEST_F(GenericFamilyTest, Sort) { ASSERT_THAT(Run({"sort", "list-2"}), ErrArg("One or more scores can't be converted into double")); } +TEST_F(GenericFamilyTest, Time) { + auto resp = Run({"time"}); + EXPECT_THAT(resp, ArrLen(2)); + EXPECT_THAT(resp.GetVec()[0], ArgType(RespExpr::INT64)); + EXPECT_THAT(resp.GetVec()[1], ArgType(RespExpr::INT64)); + + // Check that time is the same inside a transaction. + Run({"multi"}); + Run({"time"}); + usleep(2000); + Run({"time"}); + resp = Run({"exec"}); + EXPECT_THAT(resp, ArrLen(2)); + + ASSERT_THAT(resp.GetVec()[0], ArrLen(2)); + ASSERT_THAT(resp.GetVec()[1], ArrLen(2)); + + for (int i = 0; i < 2; ++i) { + int64_t val0 = get(resp.GetVec()[0].GetVec()[i].u); + int64_t val1 = get(resp.GetVec()[1].GetVec()[i].u); + EXPECT_EQ(val0, val1); + } +} + } // namespace dfly diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 6f4862fd6..3d3586118 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -131,7 +131,7 @@ void HSetFamily::HExists(CmdArgList args, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { auto& db_slice = shard->db_slice(); - auto it_res = db_slice.Find(t->db_index(), key, OBJ_HASH); + auto it_res = db_slice.Find(t->db_context(), key, OBJ_HASH); if (it_res) { robj* hset = (*it_res)->second.AsRObj(); @@ -387,7 +387,7 @@ void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { auto& db_slice = shard->db_slice(); - auto it_res = db_slice.Find(t->db_index(), key, OBJ_HASH); + auto it_res = db_slice.Find(t->db_context(), key, OBJ_HASH); if (!it_res) return it_res.status(); @@ -437,12 +437,12 @@ OpResult HSetFamily::OpSet(const OpArgs& op_args, string_view key, Cmd auto& db_slice = op_args.shard->db_slice(); pair add_res; try { - add_res = db_slice.AddOrFind(op_args.db_ind, key); + add_res = db_slice.AddOrFind(op_args.db_cntx, key); } catch(bad_alloc&) { return OpStatus::OUT_OF_MEMORY; } - DbTableStats* stats = db_slice.MutableStats(op_args.db_ind); + DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index); robj* hset = nullptr; uint8_t* lp = nullptr; @@ -459,7 +459,7 @@ OpResult HSetFamily::OpSet(const OpArgs& op_args, string_view key, Cmd if (it->second.ObjType() != OBJ_HASH) return OpStatus::WRONG_TYPE; - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); } hset = it->second.AsRObj(); @@ -509,7 +509,7 @@ OpResult HSetFamily::OpSet(const OpArgs& op_args, string_view key, Cmd } } it->second.SyncRObj(); - db_slice.PostUpdate(op_args.db_ind, it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); return created; } @@ -518,17 +518,17 @@ OpResult HSetFamily::OpDel(const OpArgs& op_args, string_view key, Cmd DCHECK(!values.empty()); auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH); if (!it_res) return it_res.status(); - db_slice.PreUpdate(op_args.db_ind, *it_res); + db_slice.PreUpdate(op_args.db_cntx.db_index, *it_res); CompactObj& co = (*it_res)->second; robj* hset = co.AsRObj(); unsigned deleted = 0; bool key_remove = false; - DbTableStats* stats = db_slice.MutableStats(op_args.db_ind); + DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index); if (hset->encoding == OBJ_ENCODING_LISTPACK) { stats->listpack_bytes -= lpBytes((uint8_t*)hset->ptr); @@ -548,12 +548,12 @@ OpResult HSetFamily::OpDel(const OpArgs& op_args, string_view key, Cmd co.SyncRObj(); - db_slice.PostUpdate(op_args.db_ind, *it_res, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, *it_res, key); if (key_remove) { if (hset->encoding == OBJ_ENCODING_LISTPACK) { stats->listpack_blob_cnt--; } - db_slice.Del(op_args.db_ind, *it_res); + db_slice.Del(op_args.db_cntx.db_index, *it_res); } else if (hset->encoding == OBJ_ENCODING_LISTPACK) { stats->listpack_bytes += lpBytes((uint8_t*)hset->ptr); } @@ -566,7 +566,7 @@ auto HSetFamily::OpMGet(const OpArgs& op_args, std::string_view key, CmdArgList DCHECK(!fields.empty()); auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH); if (!it_res) return it_res.status(); @@ -631,7 +631,7 @@ auto HSetFamily::OpMGet(const OpArgs& op_args, std::string_view key, CmdArgList OpResult HSetFamily::OpLen(const OpArgs& op_args, string_view key) { auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH); if (it_res) { robj* hset = (*it_res)->second.AsRObj(); @@ -644,7 +644,7 @@ OpResult HSetFamily::OpLen(const OpArgs& op_args, string_view key) { OpResult HSetFamily::OpGet(const OpArgs& op_args, string_view key, string_view field) { auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH); if (!it_res) return it_res.status(); @@ -682,7 +682,7 @@ OpResult HSetFamily::OpGet(const OpArgs& op_args, string_view key, strin OpResult> HSetFamily::OpGetAll(const OpArgs& op_args, string_view key, uint8_t mask) { auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH); if (!it_res) { if (it_res.status() == OpStatus::KEY_NOTFOUND) return vector{}; @@ -729,7 +729,7 @@ OpResult> HSetFamily::OpGetAll(const OpArgs& op_args, string_view OpResult HSetFamily::OpStrLen(const OpArgs& op_args, string_view key, string_view field) { auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH); if (!it_res) { if (it_res.status() == OpStatus::KEY_NOTFOUND) @@ -761,9 +761,9 @@ OpResult HSetFamily::OpStrLen(const OpArgs& op_args, string_view key, st OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_view field, IncrByParam* param) { auto& db_slice = op_args.shard->db_slice(); - const auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); + const auto [it, inserted] = db_slice.AddOrFind(op_args.db_cntx, key); - DbTableStats* stats = db_slice.MutableStats(op_args.db_ind); + DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index); robj* hset = nullptr; size_t lpb = 0; @@ -777,7 +777,7 @@ OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_vie if (it->second.ObjType() != OBJ_HASH) return OpStatus::WRONG_TYPE; - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); hset = it->second.AsRObj(); if (hset->encoding == OBJ_ENCODING_LISTPACK) { @@ -874,14 +874,14 @@ OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_vie } it->second.SyncRObj(); - db_slice.PostUpdate(op_args.db_ind, it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); return OpStatus::OK; } OpResult HSetFamily::OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor) { - OpResult find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_HASH); + OpResult find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_HASH); if (!find_res) return find_res.status(); diff --git a/src/server/json_family.cc b/src/server/json_family.cc index 5b5994250..973ad3160 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -53,17 +53,18 @@ string GetString(EngineShard* shard, const PrimeValue& pv) { inline void RecordJournal(const OpArgs& op_args, string_view key, const PrimeKey& pvalue) { if (op_args.shard->journal()) { - journal::Entry entry{op_args.db_ind, op_args.txid, key, pvalue}; + journal::Entry entry{op_args.db_cntx.db_index, op_args.txid, key, pvalue}; op_args.shard->journal()->RecordEntry(entry); } } void SetString(const OpArgs& op_args, string_view key, const string& value) { auto& db_slice = op_args.shard->db_slice(); - auto [it_output, added] = db_slice.AddOrFind(op_args.db_ind, key); - db_slice.PreUpdate(op_args.db_ind, it_output); + DbIndex db_index = op_args.db_cntx.db_index; + auto [it_output, added] = db_slice.AddOrFind(op_args.db_cntx, key); + db_slice.PreUpdate(db_index, it_output); it_output->second.SetString(value); - db_slice.PostUpdate(op_args.db_ind, it_output, key); + db_slice.PostUpdate(db_index, it_output, key); RecordJournal(op_args, key, it_output->second); } @@ -140,7 +141,7 @@ bool JsonErrorHandler(json_errc ec, const ser_context&) { } OpResult GetJson(const OpArgs& op_args, string_view key) { - OpResult it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_STRING); + OpResult it_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_STRING); if (!it_res.ok()) return it_res.status(); @@ -447,8 +448,8 @@ OpResult OpDel(const OpArgs& op_args, string_view key, string_view path) { long total_deletions = 0; if (path.empty()) { auto& db_slice = op_args.shard->db_slice(); - auto [it, _] = db_slice.FindExt(op_args.db_ind, key); - total_deletions += long(db_slice.Del(op_args.db_ind, it)); + auto [it, _] = db_slice.FindExt(op_args.db_cntx, key); + total_deletions += long(db_slice.Del(op_args.db_cntx.db_index, it)); return total_deletions; } diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 33d8c5aba..163a2f254 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -126,7 +126,7 @@ OpResult FindFirst(Transaction* trans) { auto args = t->ShardArgsInShard(shard->shard_id()); OpResult> ff_res = - shard->db_slice().FindFirst(t->db_index(), args); + shard->db_slice().FindFirst(t->db_context(), args); if (ff_res) { FFResult ff_result(ff_res->first->first.AsRef(), ff_res->second); @@ -260,7 +260,7 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) { if (shard->shard_id() == ff_result_.sid) { ff_result_.key.GetString(&key_); auto& db_slice = shard->db_slice(); - auto it_res = db_slice.Find(t->db_index(), key_, OBJ_LIST); + auto it_res = db_slice.Find(t->db_context(), key_, OBJ_LIST); CHECK(it_res); // must exist and must be ok. PrimeIterator it = *it_res; quicklist* ql = GetQL(it->second); @@ -278,7 +278,7 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) { OpResult OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, string_view dest) { auto& db_slice = op_args.shard->db_slice(); - auto src_res = db_slice.Find(op_args.db_ind, src, OBJ_LIST); + auto src_res = db_slice.Find(op_args.db_cntx, src, OBJ_LIST); if (!src_res) return src_res.status(); @@ -286,11 +286,11 @@ OpResult OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, quicklist* src_ql = GetQL(src_it->second); if (src == dest) { // simple case. - db_slice.PreUpdate(op_args.db_ind, src_it); + db_slice.PreUpdate(op_args.db_cntx.db_index, src_it); string val = ListPop(ListDir::RIGHT, src_ql); quicklistPushHead(src_ql, val.data(), val.size()); - db_slice.PostUpdate(op_args.db_ind, src_it, src); + db_slice.PostUpdate(op_args.db_cntx.db_index, src_it, src); return val; } @@ -299,7 +299,7 @@ OpResult OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, PrimeIterator dest_it; bool new_key = false; try { - tie(dest_it, new_key) = db_slice.AddOrFind(op_args.db_ind, dest); + tie(dest_it, new_key) = db_slice.AddOrFind(op_args.db_cntx, dest); } catch (bad_alloc&) { return OpStatus::OUT_OF_MEMORY; } @@ -312,25 +312,25 @@ OpResult OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, dest_it->second.ImportRObj(obj); // Insertion of dest could invalidate src_it. Find it again. - src_it = db_slice.GetTables(op_args.db_ind).first->Find(src); + src_it = db_slice.GetTables(op_args.db_cntx.db_index).first->Find(src); } else { if (dest_it->second.ObjType() != OBJ_LIST) return OpStatus::WRONG_TYPE; dest_ql = GetQL(dest_it->second); - db_slice.PreUpdate(op_args.db_ind, dest_it); + db_slice.PreUpdate(op_args.db_cntx.db_index, dest_it); } - db_slice.PreUpdate(op_args.db_ind, src_it); + db_slice.PreUpdate(op_args.db_cntx.db_index, src_it); string val = ListPop(ListDir::RIGHT, src_ql); quicklistPushHead(dest_ql, val.data(), val.size()); - db_slice.PostUpdate(op_args.db_ind, src_it, src); - db_slice.PostUpdate(op_args.db_ind, dest_it, dest, !new_key); + db_slice.PostUpdate(op_args.db_cntx.db_index, src_it, src); + db_slice.PostUpdate(op_args.db_cntx.db_index, dest_it, dest, !new_key); if (quicklistCount(src_ql) == 0) { - CHECK(db_slice.Del(op_args.db_ind, src_it)); + CHECK(db_slice.Del(op_args.db_cntx.db_index, src_it)); } return val; @@ -339,7 +339,7 @@ OpResult OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, // Read-only peek operation that determines wether the list exists and optionally // returns the first from right value without popping it from the list. OpResult RPeek(const OpArgs& op_args, string_view key, bool fetch) { - auto it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST); + auto it_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST); if (!it_res) { return it_res.status(); } @@ -366,13 +366,13 @@ OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir d bool new_key = false; if (skip_notexist) { - auto it_res = es->db_slice().Find(op_args.db_ind, key, OBJ_LIST); + auto it_res = es->db_slice().Find(op_args.db_cntx, key, OBJ_LIST); if (!it_res) return it_res.status(); it = *it_res; } else { try { - tie(it, new_key) = es->db_slice().AddOrFind(op_args.db_ind, key); + tie(it, new_key) = es->db_slice().AddOrFind(op_args.db_cntx, key); } catch (bad_alloc&) { return OpStatus::OUT_OF_MEMORY; } @@ -389,7 +389,7 @@ OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir d } else { if (it->second.ObjType() != OBJ_LIST) return OpStatus::WRONG_TYPE; - es->db_slice().PreUpdate(op_args.db_ind, it); + es->db_slice().PreUpdate(op_args.db_cntx.db_index, it); ql = GetQL(it->second); } @@ -405,10 +405,10 @@ OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir d if (es->blocking_controller()) { string tmp; string_view key = it->first.GetSlice(&tmp); - es->blocking_controller()->AwakeWatched(op_args.db_ind, key); + es->blocking_controller()->AwakeWatched(op_args.db_cntx.db_index, key); } } else { - es->db_slice().PostUpdate(op_args.db_ind, it, key, true); + es->db_slice().PostUpdate(op_args.db_cntx.db_index, it, key, true); } return quicklistCount(ql); @@ -417,13 +417,13 @@ OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir d OpResult OpPop(const OpArgs& op_args, string_view key, ListDir dir, uint32_t count, bool return_results) { auto& db_slice = op_args.shard->db_slice(); - OpResult it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); + OpResult it_res = db_slice.Find(op_args.db_cntx, key, OBJ_LIST); if (!it_res) return it_res.status(); PrimeIterator it = *it_res; quicklist* ql = GetQL(it->second); - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); StringVec res; if (quicklistCount(ql) < count) { @@ -441,10 +441,10 @@ OpResult OpPop(const OpArgs& op_args, string_view key, ListDir dir, u } } - db_slice.PostUpdate(op_args.db_ind, it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); if (quicklistCount(ql) == 0) { - CHECK(db_slice.Del(op_args.db_ind, it)); + CHECK(db_slice.Del(op_args.db_cntx.db_index, it)); } return res; @@ -821,7 +821,7 @@ void ListFamily::PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cnt } OpResult ListFamily::OpLen(const OpArgs& op_args, std::string_view key) { - auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST); + auto res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST); if (!res) return res.status(); @@ -831,7 +831,7 @@ OpResult ListFamily::OpLen(const OpArgs& op_args, std::string_view key } OpResult ListFamily::OpIndex(const OpArgs& op_args, std::string_view key, long index) { - auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST); + auto res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST); if (!res) return res.status(); quicklist* ql = GetQL(res.value()->second); @@ -856,7 +856,7 @@ OpResult ListFamily::OpIndex(const OpArgs& op_args, std::string_view key OpResult ListFamily::OpInsert(const OpArgs& op_args, string_view key, string_view pivot, string_view elem, int insert_param) { auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_LIST); if (!it_res) return it_res.status(); @@ -874,14 +874,14 @@ OpResult ListFamily::OpInsert(const OpArgs& op_args, string_view key, strin int res = -1; if (found) { - db_slice.PreUpdate(op_args.db_ind, *it_res); + db_slice.PreUpdate(op_args.db_cntx.db_index, *it_res); if (insert_param == LIST_TAIL) { quicklistInsertAfter(qiter, &entry, elem.data(), elem.size()); } else { DCHECK_EQ(LIST_HEAD, insert_param); quicklistInsertBefore(qiter, &entry, elem.data(), elem.size()); } - db_slice.PostUpdate(op_args.db_ind, *it_res, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, *it_res, key); res = quicklistCount(ql); } quicklistReleaseIterator(qiter); @@ -892,7 +892,7 @@ OpResult ListFamily::OpRem(const OpArgs& op_args, string_view key, str long count) { DCHECK(!elem.empty()); auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_LIST); if (!it_res) return it_res.status(); @@ -912,7 +912,7 @@ OpResult ListFamily::OpRem(const OpArgs& op_args, string_view key, str unsigned removed = 0; const uint8_t* elem_ptr = reinterpret_cast(elem.data()); - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); while (quicklistNext(qiter, &entry)) { if (quicklistCompare(&entry, elem_ptr, elem.size())) { quicklistDelEntry(qiter, &entry); @@ -921,12 +921,12 @@ OpResult ListFamily::OpRem(const OpArgs& op_args, string_view key, str break; } } - db_slice.PostUpdate(op_args.db_ind, it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); quicklistReleaseIterator(qiter); if (quicklistCount(ql) == 0) { - CHECK(db_slice.Del(op_args.db_ind, it)); + CHECK(db_slice.Del(op_args.db_cntx.db_index, it)); } return removed; @@ -935,16 +935,16 @@ OpResult ListFamily::OpRem(const OpArgs& op_args, string_view key, str OpStatus ListFamily::OpSet(const OpArgs& op_args, string_view key, string_view elem, long index) { DCHECK(!elem.empty()); auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_LIST); if (!it_res) return it_res.status(); PrimeIterator it = *it_res; quicklist* ql = GetQL(it->second); - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); int replaced = quicklistReplaceAtIndex(ql, index, elem.data(), elem.size()); - db_slice.PostUpdate(op_args.db_ind, it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); if (!replaced) { return OpStatus::OUT_OF_RANGE; @@ -954,7 +954,7 @@ OpStatus ListFamily::OpSet(const OpArgs& op_args, string_view key, string_view e OpStatus ListFamily::OpTrim(const OpArgs& op_args, string_view key, long start, long end) { auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_LIST); if (!it_res) return it_res.status(); @@ -985,20 +985,20 @@ OpStatus ListFamily::OpTrim(const OpArgs& op_args, string_view key, long start, rtrim = llen - end - 1; } - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); quicklistDelRange(ql, 0, ltrim); quicklistDelRange(ql, -rtrim, rtrim); - db_slice.PostUpdate(op_args.db_ind, it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); if (quicklistCount(ql) == 0) { - CHECK(db_slice.Del(op_args.db_ind, it)); + CHECK(db_slice.Del(op_args.db_cntx.db_index, it)); } return OpStatus::OK; } OpResult ListFamily::OpRange(const OpArgs& op_args, std::string_view key, long start, long end) { - auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST); + auto res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST); if (!res) return res.status(); @@ -1025,7 +1025,7 @@ OpResult ListFamily::OpRange(const OpArgs& op_args, std::string_view str_vec.emplace_back(ce.ToString()); return true; }, start, end); - + return str_vec; } diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index 4b66105aa..5f5f522b1 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -52,7 +52,8 @@ TEST_F(ListFamilyTest, Expire) { resp = Run({"expire", kKey1, "1"}); EXPECT_THAT(resp, IntArg(1)); - UpdateTime(expire_now_ + 1000); + AdvanceTime(1000); + resp = Run({"lpush", kKey1, "1"}); EXPECT_THAT(resp, IntArg(1)); } diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index f3bda0f9d..f73bfa2f8 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1232,6 +1232,8 @@ void RdbLoader::FlushShardAsync(ShardId sid) { void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) { DbSlice& db_slice = EngineShard::tlocal()->db_slice(); + DbContext db_cntx{.db_index = db_ind, .time_now_ms = GetCurrentTimeMs()}; + for (const auto& item : ib) { std::string_view key{item.key}; PrimeValue pv; @@ -1245,7 +1247,10 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) { break; } - auto [it, added] = db_slice.AddEntry(db_ind, key, std::move(pv), item.expire_ms); + if (item.expire_ms > 0 && db_cntx.time_now_ms >= item.expire_ms) + continue; + + auto [it, added] = db_slice.AddEntry(db_cntx, key, std::move(pv), item.expire_ms); if (!added) { LOG(WARNING) << "RDB has duplicated key '" << key << "' in DB " << db_ind; diff --git a/src/server/set_family.cc b/src/server/set_family.cc index 5411fefe4..889553760 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -484,8 +484,8 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v // to overwrite the key. However, if the set is empty it means we should delete the // key if it exists. if (overwrite && vals.empty()) { - auto it = db_slice.FindExt(op_args.db_ind, key).first; - db_slice.Del(op_args.db_ind, it); + auto it = db_slice.FindExt(op_args.db_cntx, key).first; + db_slice.Del(op_args.db_cntx.db_index, it); return 0; } @@ -494,7 +494,7 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v bool new_key = false; try { - tie(it, new_key) = db_slice.AddOrFind(op_args.db_ind, key); + tie(it, new_key) = db_slice.AddOrFind(op_args.db_cntx, key); } catch (bad_alloc& e) { return OpStatus::OUT_OF_MEMORY; } @@ -507,7 +507,7 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v return OpStatus::WRONG_TYPE; // Update stats and trigger any handle the old value if needed. - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); } if (new_key || overwrite) { @@ -555,7 +555,7 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v res = AddStrSet(std::move(vals), &co); } - db_slice.PostUpdate(op_args.db_ind, it, key, !new_key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !new_key); return res; } @@ -563,20 +563,20 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v OpResult OpRem(const OpArgs& op_args, std::string_view key, const ArgSlice& vals) { auto* es = op_args.shard; auto& db_slice = es->db_slice(); - OpResult find_res = db_slice.Find(op_args.db_ind, key, OBJ_SET); + OpResult find_res = db_slice.Find(op_args.db_cntx, key, OBJ_SET); if (!find_res) { return find_res.status(); } - db_slice.PreUpdate(op_args.db_ind, *find_res); + db_slice.PreUpdate(op_args.db_cntx.db_index, *find_res); CompactObj& co = find_res.value()->second; auto [removed, isempty] = RemoveSet(vals, &co); - db_slice.PostUpdate(op_args.db_ind, *find_res, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, *find_res, key); if (isempty) { - CHECK(db_slice.Del(op_args.db_ind, find_res.value())); + CHECK(db_slice.Del(op_args.db_cntx.db_index, find_res.value())); } return removed; @@ -610,7 +610,7 @@ OpStatus Mover::OpFind(Transaction* t, EngineShard* es) { for (auto k : largs) { unsigned index = (k == src_) ? 0 : 1; - OpResult res = es->db_slice().Find(t->db_index(), k, OBJ_SET); + OpResult res = es->db_slice().Find(t->db_context(), k, OBJ_SET); if (res && index == 0) { // successful src find. DCHECK(!res->is_done()); const CompactObj& val = res.value()->second; @@ -676,7 +676,7 @@ OpResult OpUnion(const OpArgs& op_args, ArgSlice keys) { absl::flat_hash_set uniques; for (std::string_view key : keys) { - OpResult find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_SET); + OpResult find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_SET); if (find_res) { container_utils::IterateSet(find_res.value()->second, [&uniques](container_utils::ContainerEntry ce){ uniques.emplace(ce.ToString()); @@ -698,7 +698,7 @@ OpResult OpDiff(const OpArgs& op_args, ArgSlice keys) { DCHECK(!keys.empty()); DVLOG(1) << "OpDiff from " << keys.front(); EngineShard* es = op_args.shard; - OpResult find_res = es->db_slice().Find(op_args.db_ind, keys.front(), OBJ_SET); + OpResult find_res = es->db_slice().Find(op_args.db_cntx, keys.front(), OBJ_SET); if (!find_res) { return find_res.status(); @@ -713,7 +713,7 @@ OpResult OpDiff(const OpArgs& op_args, ArgSlice keys) { DCHECK(!uniques.empty()); // otherwise the key would not exist. for (size_t i = 1; i < keys.size(); ++i) { - OpResult diff_res = es->db_slice().Find(op_args.db_ind, keys[i], OBJ_SET); + OpResult diff_res = es->db_slice().Find(op_args.db_cntx, keys[i], OBJ_SET); if (!diff_res) { if (diff_res.status() == OpStatus::WRONG_TYPE) { return OpStatus::WRONG_TYPE; @@ -750,7 +750,7 @@ OpResult OpInter(const Transaction* t, EngineShard* es, bool remove_f StringVec result; if (keys.size() == 1) { - OpResult find_res = es->db_slice().Find(t->db_index(), keys.front(), OBJ_SET); + OpResult find_res = es->db_slice().Find(t->db_context(), keys.front(), OBJ_SET); if (!find_res) return find_res.status(); @@ -767,7 +767,7 @@ OpResult OpInter(const Transaction* t, EngineShard* es, bool remove_f OpStatus status = OpStatus::OK; for (size_t i = 0; i < keys.size(); ++i) { - OpResult find_res = es->db_slice().Find(t->db_index(), keys[i], OBJ_SET); + OpResult find_res = es->db_slice().Find(t->db_context(), keys[i], OBJ_SET); if (!find_res) { if (status == OpStatus::OK || status == OpStatus::KEY_NOTFOUND || find_res.status() != OpStatus::KEY_NOTFOUND) { @@ -841,7 +841,7 @@ void SetFamily::SIsMember(CmdArgList args, ConnectionContext* cntx) { std::string_view val = ArgS(args, 2); auto cb = [&](Transaction* t, EngineShard* shard) { - OpResult find_res = shard->db_slice().Find(t->db_index(), key, OBJ_SET); + OpResult find_res = shard->db_slice().Find(t->db_context(), key, OBJ_SET); if (find_res) { SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()}; @@ -906,7 +906,7 @@ void SetFamily::SCard(CmdArgList args, ConnectionContext* cntx) { std::string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - OpResult find_res = shard->db_slice().Find(t->db_index(), key, OBJ_SET); + OpResult find_res = shard->db_slice().Find(t->db_context(), key, OBJ_SET); if (!find_res) { return find_res.status(); } @@ -1220,7 +1220,7 @@ void SetFamily::SScan(CmdArgList args, ConnectionContext* cntx) { OpResult SetFamily::OpPop(const OpArgs& op_args, std::string_view key, unsigned count) { auto& db_slice = op_args.shard->db_slice(); - OpResult find_res = db_slice.Find(op_args.db_ind, key, OBJ_SET); + OpResult find_res = db_slice.Find(op_args.db_cntx, key, OBJ_SET); if (!find_res) return find_res.status(); @@ -1241,10 +1241,10 @@ OpResult SetFamily::OpPop(const OpArgs& op_args, std::string_view key }); /* Delete the set as it is now empty */ - CHECK(db_slice.Del(op_args.db_ind, it)); + CHECK(db_slice.Del(op_args.db_cntx.db_index, it)); } else { SetType st{it->second.RObjPtr(), it->second.Encoding()}; - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); if (st.second == kEncodingIntSet) { intset* is = (intset*)st.first; int64_t val = 0; @@ -1260,14 +1260,14 @@ OpResult SetFamily::OpPop(const OpArgs& op_args, std::string_view key } else { result = PopStrSet(count, st); } - db_slice.PostUpdate(op_args.db_ind, it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); } return result; } OpResult SetFamily::OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor) { - OpResult find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_SET); + OpResult find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_SET); if (!find_res) return find_res.status(); diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 49c65ea6d..4202e78c0 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -145,7 +145,7 @@ OpResult OpAdd(const OpArgs& op_args, string_view key, const AddOpts& pair add_res; try { - add_res = db_slice.AddOrFind(op_args.db_ind, key); + add_res = db_slice.AddOrFind(op_args.db_cntx, key); } catch (bad_alloc&) { return OpStatus::OUT_OF_MEMORY; } @@ -161,7 +161,7 @@ OpResult OpAdd(const OpArgs& op_args, string_view key, const AddOpts& if (it->second.ObjType() != OBJ_STREAM) return OpStatus::WRONG_TYPE; - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); } stream* stream_inst = (stream*)it->second.RObjPtr(); @@ -201,7 +201,7 @@ OpResult OpAdd(const OpArgs& op_args, string_view key, const AddOpts& OpResult OpRange(const OpArgs& op_args, string_view key, const RangeOpts& opts) { auto& db_slice = op_args.shard->db_slice(); - OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); + OpResult res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -247,7 +247,7 @@ OpResult OpRange(const OpArgs& op_args, string_view key, const RangeO OpResult OpLen(const OpArgs& op_args, string_view key) { auto& db_slice = op_args.shard->db_slice(); - OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); + OpResult res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); CompactObj& cobj = (*res_it)->second; @@ -255,9 +255,10 @@ OpResult OpLen(const OpArgs& op_args, string_view key) { return s->length; } -OpResult> OpListGroups(DbIndex db_index, string_view key, EngineShard* shard) { +OpResult> OpListGroups(const DbContext& db_cntx, string_view key, + EngineShard* shard) { auto& db_slice = shard->db_slice(); - OpResult res_it = db_slice.Find(db_index, key, OBJ_STREAM); + OpResult res_it = db_slice.Find(db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -294,7 +295,7 @@ struct CreateOpts { OpStatus OpCreate(const OpArgs& op_args, string_view key, const CreateOpts& opts) { auto* shard = op_args.shard; auto& db_slice = shard->db_slice(); - OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); + OpResult res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -323,7 +324,7 @@ OpResult> FindGroup(const OpArgs& op_args, string_view string_view gname) { auto* shard = op_args.shard; auto& db_slice = shard->db_slice(); - OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); + OpResult res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -406,7 +407,7 @@ OpStatus OpSetId(const OpArgs& op_args, string_view key, string_view gname, stri OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) { auto* shard = op_args.shard; auto& db_slice = shard->db_slice(); - OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); + OpResult res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -445,7 +446,7 @@ OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) { OpResult OpDel(const OpArgs& op_args, string_view key, absl::Span ids) { auto* shard = op_args.shard; auto& db_slice = shard->db_slice(); - OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); + OpResult res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -719,7 +720,8 @@ void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) { // We do not use transactional xemantics for xinfo since it's informational command. auto cb = [&]() { EngineShard* shard = EngineShard::tlocal(); - return OpListGroups(cntx->db_index(), key, shard); + DbContext db_context{.db_index = cntx->db_index(), .time_now_ms = GetCurrentTimeMs()}; + return OpListGroups(db_context, key, shard); }; OpResult> result = shard_set->Await(sid, std::move(cb)); @@ -744,9 +746,7 @@ void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) { void StreamFamily::XLen(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); - auto cb = [&](Transaction* t, EngineShard* shard) { - return OpLen(t->GetOpArgs(shard), key); - }; + auto cb = [&](Transaction* t, EngineShard* shard) { return OpLen(t->GetOpArgs(shard), key); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); if (result || result.status() == OpStatus::KEY_NOTFOUND) { diff --git a/src/server/string_family.cc b/src/server/string_family.cc index ca590d529..d322a645a 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -66,7 +66,7 @@ string_view GetSlice(EngineShard* shard, const PrimeValue& pv, string* tmp) { inline void RecordJournal(const OpArgs& op_args, string_view key, const PrimeKey& pvalue) { if (op_args.shard->journal()) { - journal::Entry entry{op_args.db_ind, op_args.txid, key, pvalue}; + journal::Entry entry{op_args.db_cntx.db_index, op_args.txid, key, pvalue}; op_args.shard->journal()->RecordEntry(entry); } } @@ -77,7 +77,7 @@ OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t sta size_t range_len = start + value.size(); if (range_len == 0) { - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_STRING); if (it_res) { return it_res.value()->second.Size(); } else { @@ -85,7 +85,7 @@ OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t sta } } - auto [it, added] = db_slice.AddOrFind(op_args.db_ind, key); + auto [it, added] = db_slice.AddOrFind(op_args.db_cntx, key); string s; @@ -99,12 +99,12 @@ OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t sta if (s.size() < range_len) s.resize(range_len); - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); } memcpy(s.data() + start, value.data(), value.size()); it->second.SetString(s); - db_slice.PostUpdate(op_args.db_ind, it, key, !added); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !added); RecordJournal(op_args, key, it->second); return it->second.Size(); @@ -112,7 +112,7 @@ OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t sta OpResult OpGetRange(const OpArgs& op_args, string_view key, int32_t start, int32_t end) { auto& db_slice = op_args.shard->db_slice(); - OpResult it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING); + OpResult it_res = db_slice.Find(op_args.db_cntx, key, OBJ_STRING); if (!it_res.ok()) return it_res.status(); @@ -142,8 +142,8 @@ OpResult OpGetRange(const OpArgs& op_args, string_view key, int32_t star return string(slice.substr(start, end - start + 1)); }; -size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key, - string_view val, bool prepend) { +size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key, string_view val, + bool prepend) { string tmp, new_val; auto* shard = op_args.shard; string_view slice = GetSlice(shard, it->second, &tmp); @@ -153,9 +153,9 @@ size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key, new_val = absl::StrCat(slice, val); auto& db_slice = shard->db_slice(); - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); it->second.SetString(new_val); - db_slice.PostUpdate(op_args.db_ind, it, key, true); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, true); RecordJournal(op_args, key, it->second); return new_val.size(); @@ -166,10 +166,10 @@ OpResult ExtendOrSet(const OpArgs& op_args, string_view key, string_vi bool prepend) { auto* shard = op_args.shard; auto& db_slice = shard->db_slice(); - auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); + auto [it, inserted] = db_slice.AddOrFind(op_args.db_cntx, key); if (inserted) { it->second.SetString(val); - db_slice.PostUpdate(op_args.db_ind, it, key, false); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, false); RecordJournal(op_args, key, it->second); return val.size(); @@ -181,10 +181,9 @@ OpResult ExtendOrSet(const OpArgs& op_args, string_view key, string_vi return ExtendExisting(op_args, it, key, val, prepend); } -OpResult ExtendOrSkip(const OpArgs& op_args, string_view key, string_view val, - bool prepend) { +OpResult ExtendOrSkip(const OpArgs& op_args, string_view key, string_view val, bool prepend) { auto& db_slice = op_args.shard->db_slice(); - OpResult it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING); + OpResult it_res = db_slice.Find(op_args.db_cntx, key, OBJ_STRING); if (!it_res) { return false; } @@ -193,7 +192,7 @@ OpResult ExtendOrSkip(const OpArgs& op_args, string_view key, string_view } OpResult OpGet(const OpArgs& op_args, string_view key) { - OpResult it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_STRING); + OpResult it_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_STRING); if (!it_res.ok()) return it_res.status(); @@ -204,14 +203,14 @@ OpResult OpGet(const OpArgs& op_args, string_view key) { OpResult OpIncrFloat(const OpArgs& op_args, string_view key, double val) { auto& db_slice = op_args.shard->db_slice(); - auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); + auto [it, inserted] = db_slice.AddOrFind(op_args.db_cntx, key); char buf[128]; if (inserted) { char* str = RedisReplyBuilder::FormatDouble(val, buf, sizeof(buf)); it->second.SetString(str); - db_slice.PostUpdate(op_args.db_ind, it, key, false); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, false); RecordJournal(op_args, key, it->second); return val; @@ -241,9 +240,9 @@ OpResult OpIncrFloat(const OpArgs& op_args, string_view key, double val) char* str = RedisReplyBuilder::FormatDouble(base, buf, sizeof(buf)); - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); it->second.SetString(str); - db_slice.PostUpdate(op_args.db_ind, it, key, true); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, true); RecordJournal(op_args, key, it->second); return base; @@ -255,7 +254,7 @@ OpResult OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr, auto& db_slice = op_args.shard->db_slice(); // we avoid using AddOrFind because of skip_on_missing option for memcache. - auto [it, expire_it] = db_slice.FindExt(op_args.db_ind, key); + auto [it, expire_it] = db_slice.FindExt(op_args.db_cntx, key); if (!IsValid(it)) { if (skip_on_missing) @@ -266,7 +265,7 @@ OpResult OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr, // AddNew calls PostUpdate inside. try { - it = db_slice.AddNew(op_args.db_ind, key, std::move(cobj), 0); + it = db_slice.AddNew(op_args.db_cntx, key, std::move(cobj), 0); } catch (bad_alloc&) { return OpStatus::OUT_OF_MEMORY; } @@ -293,9 +292,9 @@ OpResult OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr, int64_t new_val = prev + incr; DCHECK(!it->second.IsExternal()); - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); it->second.SetInt(new_val); - db_slice.PostUpdate(op_args.db_ind, it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); RecordJournal(op_args, key, it->second); return new_val; @@ -318,7 +317,7 @@ int64_t CalculateAbsTime(int64_t unix_time, bool as_milli) { OpStatus OpMSet(const OpArgs& op_args, ArgSlice args) { DCHECK(!args.empty() && args.size() % 2 == 0); - SetCmd::SetParams params{op_args.db_ind}; + SetCmd::SetParams params; SetCmd sg(op_args); for (size_t i = 0; i < args.size(); i += 2) { @@ -332,19 +331,29 @@ OpStatus OpMSet(const OpArgs& op_args, ArgSlice args) { return OpStatus::OK; } +OpResult SetGeneric(ConnectionContext* cntx, const SetCmd::SetParams& sparams, + string_view key, string_view value) { + DCHECK(cntx->transaction); + + auto cb = [&](Transaction* t, EngineShard* shard) { + SetCmd sg(t->GetOpArgs(shard)); + return sg.Set(sparams, key, value); + }; + return cntx->transaction->ScheduleSingleHop(std::move(cb)); +} + } // namespace OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value) { EngineShard* shard = op_args_.shard; auto& db_slice = shard->db_slice(); - DCHECK_LT(params.db_index, db_slice.db_array_size()); - DCHECK(db_slice.IsDbValid(params.db_index)); + DCHECK(db_slice.IsDbValid(op_args_.db_cntx.db_index)); VLOG(2) << "Set " << key << "(" << db_slice.shard_id() << ") "; if (params.IsConditionalSet()) { - const auto [it, expire_it] = db_slice.FindExt(params.db_index, key); + const auto [it, expire_it] = db_slice.FindExt(op_args_.db_cntx, key); // Make sure that we have this key, and only add it if it does exists if (params.how == SET_IF_EXISTS) { if (IsValid(it)) { @@ -363,7 +372,7 @@ OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value // Trying to add a new entry. tuple add_res; try { - add_res = db_slice.AddOrFind2(params.db_index, key); + add_res = db_slice.AddOrFind2(op_args_.db_cntx, key); } catch (bad_alloc& e) { return OpStatus::OUT_OF_MEMORY; } @@ -377,20 +386,21 @@ OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value PrimeValue tvalue{value}; tvalue.SetFlag(params.memcache_flags != 0); it->second = std::move(tvalue); - db_slice.PostUpdate(params.db_index, it, key, false); + db_slice.PostUpdate(op_args_.db_cntx.db_index, it, key, false); if (params.expire_after_ms) { - db_slice.UpdateExpire(params.db_index, it, params.expire_after_ms + db_slice.Now()); + db_slice.UpdateExpire(op_args_.db_cntx.db_index, it, + params.expire_after_ms + op_args_.db_cntx.time_now_ms); } if (params.memcache_flags) - db_slice.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags); + db_slice.SetMCFlag(op_args_.db_cntx.db_index, it->first.AsRef(), params.memcache_flags); if (shard->tiered_storage()) { // external storage enabled. // TODO: we may have a bug if we block the fiber inside UnloadItem - "it" may be invalid // afterwards. if (value.size() >= kMinTieredLen) { - shard->tiered_storage()->UnloadItem(params.db_index, it); + shard->tiered_storage()->UnloadItem(op_args_.db_cntx.db_index, it); } } @@ -415,23 +425,24 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt } DbSlice& db_slice = shard->db_slice(); - uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice.Now() : 0; + uint64_t at_ms = + params.expire_after_ms ? params.expire_after_ms + op_args_.db_cntx.time_now_ms : 0; if (IsValid(e_it) && at_ms) { e_it->second = db_slice.FromAbsoluteTime(at_ms); } else { // We need to update expiry, or maybe erase the object if it was expired. - bool changed = db_slice.UpdateExpire(params.db_index, it, at_ms); + bool changed = db_slice.UpdateExpire(op_args_.db_cntx.db_index, it, at_ms); if (changed && at_ms == 0) // erased. return OpStatus::OK; // TODO: to update journal with deletion. } - db_slice.PreUpdate(params.db_index, it); + db_slice.PreUpdate(op_args_.db_cntx.db_index, it); // Check whether we need to update flags table. bool req_flag_update = (params.memcache_flags != 0) != prime_value.HasFlag(); if (req_flag_update) { prime_value.SetFlag(params.memcache_flags != 0); - db_slice.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags); + db_slice.SetMCFlag(op_args_.db_cntx.db_index, it->first.AsRef(), params.memcache_flags); } // overwrite existing entry. @@ -443,11 +454,11 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt // can be invalid after the function returns and the functions that follow may access invalid // entry. if (shard->tiered_storage()) { - shard->tiered_storage()->UnloadItem(params.db_index, it); + shard->tiered_storage()->UnloadItem(op_args_.db_cntx.db_index, it); } } - db_slice.PostUpdate(params.db_index, it, key); + db_slice.PostUpdate(op_args_.db_cntx.db_index, it, key); RecordJournal(op_args_, key, it->second); return OpStatus::OK; @@ -459,7 +470,7 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); string_view value = ArgS(args, 2); - SetCmd::SetParams sparams{cntx->db_index()}; + SetCmd::SetParams sparams; sparams.memcache_flags = cntx->conn_state.memcache_flag; int64_t int_arg; @@ -518,7 +529,8 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) { } } - const auto result{SetGeneric(cntx, std::move(sparams), key, value)}; + const auto result{SetGeneric(cntx, sparams, key, value)}; + if (result == OpStatus::OK) { return builder->SendStored(); } @@ -532,17 +544,6 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) { return builder->SendSetSkipped(); } -OpResult StringFamily::SetGeneric(ConnectionContext* cntx, SetCmd::SetParams sparams, - std::string_view key, std::string_view value) { - DCHECK(cntx->transaction); - - auto cb = [&](Transaction* t, EngineShard* shard) { - SetCmd sg(t->GetOpArgs(shard)); - return sg.Set(sparams, key, value); - }; - return cntx->transaction->ScheduleSingleHop(std::move(cb)); -} - void StringFamily::SetEx(CmdArgList args, ConnectionContext* cntx) { SetExGeneric(true, std::move(args), cntx); } @@ -555,7 +556,7 @@ void StringFamily::SetNx(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); string_view value = ArgS(args, 2); - SetCmd::SetParams sparams{cntx->db_index()}; + SetCmd::SetParams sparams; sparams.how = SetCmd::SET_IF_NOTEXIST; sparams.memcache_flags = cntx->conn_state.memcache_flag; const auto results{SetGeneric(cntx, std::move(sparams), key, value)}; @@ -601,7 +602,7 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) { string_view value = ArgS(args, 2); std::optional prev_val; - SetCmd::SetParams sparams{cntx->db_index()}; + SetCmd::SetParams sparams; sparams.prev_val = &prev_val; auto cb = [&](Transaction* t, EngineShard* shard) { @@ -771,7 +772,7 @@ void StringFamily::SetExGeneric(bool seconds, CmdArgList args, ConnectionContext return (*cntx)->SendError(InvalidExpireTime(ArgS(args, 0))); } - SetCmd::SetParams sparams{cntx->db_index()}; + SetCmd::SetParams sparams; if (seconds) sparams.expire_after_ms = uint64_t(unit_vals) * 1000; else @@ -876,7 +877,7 @@ void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* es) { auto args = t->ShardArgsInShard(es->shard_id()); for (size_t i = 0; i < args.size(); i += 2) { - auto it = es->db_slice().FindExt(t->db_index(), args[i]).first; + auto it = es->db_slice().FindExt(t->db_context(), args[i]).first; if (IsValid(it)) { exists.store(true, memory_order_relaxed); break; @@ -906,7 +907,7 @@ void StringFamily::StrLen(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - OpResult it_res = shard->db_slice().Find(t->db_index(), key, OBJ_STRING); + OpResult it_res = shard->db_slice().Find(t->db_context(), key, OBJ_STRING); if (!it_res.ok()) return it_res.status(); @@ -993,7 +994,7 @@ auto StringFamily::OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction auto& db_slice = shard->db_slice(); for (size_t i = 0; i < args.size(); ++i) { - OpResult it_res = db_slice.Find(t->db_index(), args[i], OBJ_STRING); + OpResult it_res = db_slice.Find(t->db_context(), args[i], OBJ_STRING); if (!it_res) continue; diff --git a/src/server/string_family.h b/src/server/string_family.h index bc0d80f19..a2031e3c6 100644 --- a/src/server/string_family.h +++ b/src/server/string_family.h @@ -26,7 +26,6 @@ class SetCmd { struct SetParams { SetHow how = SET_ALWAYS; - DbIndex db_index = 0; uint32_t memcache_flags = 0; // Relative value based on now. 0 means no expiration. @@ -34,9 +33,6 @@ class SetCmd { mutable std::optional* prev_val = nullptr; // GETSET option bool keep_expire = false; // KEEPTTL - TODO: to implement it. - explicit SetParams(DbIndex dib) : db_index(dib) { - } - constexpr bool IsConditionalSet() const { return how == SET_IF_NOTEXIST || how == SET_IF_EXISTS; } @@ -82,8 +78,6 @@ class StringFamily { static void IncrByGeneric(std::string_view key, int64_t val, ConnectionContext* cntx); static void ExtendGeneric(CmdArgList args, bool prepend, ConnectionContext* cntx); static void SetExGeneric(bool seconds, CmdArgList args, ConnectionContext* cntx); - static OpResult SetGeneric(ConnectionContext* cntx, SetCmd::SetParams sparams, - std::string_view key, std::string_view value); struct GetResp { std::string value; diff --git a/src/server/string_family_test.cc b/src/server/string_family_test.cc index c9c78dcb6..142552892 100644 --- a/src/server/string_family_test.cc +++ b/src/server/string_family_test.cc @@ -70,6 +70,8 @@ TEST_F(StringFamilyTest, Incr) { TEST_F(StringFamilyTest, Append) { Run({"setex", "key", "100", "val"}); + EXPECT_THAT(Run({"ttl", "key"}), IntArg(100)); + EXPECT_THAT(Run({"append", "key", "bar"}), IntArg(6)); EXPECT_THAT(Run({"ttl", "key"}), IntArg(100)); } @@ -77,17 +79,17 @@ TEST_F(StringFamilyTest, Append) { TEST_F(StringFamilyTest, Expire) { ASSERT_EQ(Run({"set", "key", "val", "PX", "20"}), "OK"); - UpdateTime(expire_now_ + 10); + AdvanceTime(10); EXPECT_EQ(Run({"get", "key"}), "val"); - UpdateTime(expire_now_ + 20); + AdvanceTime(10); EXPECT_THAT(Run({"get", "key"}), ArgType(RespExpr::NIL)); ASSERT_THAT(Run({"set", "i", "1", "PX", "10"}), "OK"); ASSERT_THAT(Run({"incr", "i"}), IntArg(2)); - UpdateTime(expire_now_ + 30); + AdvanceTime(10); ASSERT_THAT(Run({"incr", "i"}), IntArg(1)); } diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index ce9acc19f..a79a22d62 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -131,10 +131,9 @@ void BaseFamilyTest::SetUp() { opts.disable_time_update = true; service_->Init(nullptr, nullptr, opts); - expire_now_ = absl::GetCurrentTimeNanos() / 1000000; + TEST_current_time_ms = absl::GetCurrentTimeNanos() / 1000000; auto cb = [&](EngineShard* s) { - s->db_slice().UpdateExpireBase(expire_now_ - 1000, 0); - s->db_slice().UpdateExpireClock(expire_now_); + s->db_slice().UpdateExpireBase(TEST_current_time_ms - 1000, 0); }; shard_set->RunBriefInParallel(cb); @@ -151,12 +150,6 @@ void BaseFamilyTest::TearDown() { LOG(INFO) << "Finishing " << test_info->name(); } -// ts is ms -void BaseFamilyTest::UpdateTime(uint64_t ms) { - auto cb = [ms](EngineShard* s) { s->db_slice().UpdateExpireClock(ms); }; - shard_set->RunBriefInParallel(cb); -} - void BaseFamilyTest::WaitUntilLocked(DbIndex db_index, string_view key, double timeout) { auto step = 50us; auto timeout_micro = chrono::duration_cast (1000ms * timeout); diff --git a/src/server/test_utils.h b/src/server/test_utils.h index 7ac06084f..c8b93791e 100644 --- a/src/server/test_utils.h +++ b/src/server/test_utils.h @@ -68,8 +68,9 @@ class BaseFamilyTest : public ::testing::Test { TestConnWrapper* AddFindConn(Protocol proto, std::string_view id); static std::vector StrArray(const RespExpr& expr); - // ts is ms - void UpdateTime(uint64_t ms); + void AdvanceTime(int64_t ms) { + TEST_current_time_ms += ms; + } // Wait for a locked key to unlock. Aborts after timeout seconds passed. void WaitUntilLocked(DbIndex db_index, std::string_view key, double timeout = 3); @@ -88,7 +89,7 @@ class BaseFamilyTest : public ::testing::Test { absl::flat_hash_map> connections_; ::boost::fibers::mutex mu_; ConnectionContext::DebugInfo last_cmd_dbg_info_; - uint64_t expire_now_; + std::vector resp_vec_; bool single_response_ = true; }; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 1d81af00b..f0219be8f 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -24,7 +24,7 @@ thread_local Transaction::TLTmpSpace Transaction::tmp_space; namespace { -std::atomic_uint64_t op_seq{1}; +atomic_uint64_t op_seq{1}; [[maybe_unused]] constexpr size_t kTransSize = sizeof(Transaction); @@ -449,10 +449,12 @@ void Transaction::ScheduleInternal() { } while (true) { - txid_ = op_seq.fetch_add(1, std::memory_order_relaxed); + txid_ = op_seq.fetch_add(1, memory_order_relaxed); - std::atomic_uint32_t lock_granted_cnt{0}; - std::atomic_uint32_t success{0}; + atomic_uint32_t lock_granted_cnt{0}; + atomic_uint32_t success{0}; + + time_now_ms_ = GetCurrentTimeMs(); auto cb = [&](EngineShard* shard) { pair res = ScheduleInShard(shard); @@ -551,6 +553,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { // above. // IsArmedInShard() first checks run_count_ before accessing shard_data. run_count_.fetch_add(1, memory_order_release); + time_now_ms_ = GetCurrentTimeMs(); // Please note that schedule_cb can not update any data on ScheduleSingleHop stack // since the latter can exit before ScheduleUniqueShard returns. @@ -805,7 +808,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { } // we can do it because only a single thread writes into txid_ and sd. - txid_ = op_seq.fetch_add(1, std::memory_order_relaxed); + txid_ = op_seq.fetch_add(1, memory_order_relaxed); sd.pq_pos = shard->txq()->Insert(this); DCHECK_EQ(0, sd.local_mask & KEYLOCK_ACQUIRED); @@ -1111,7 +1114,7 @@ inline uint32_t Transaction::DecreaseRunCnt() { ::boost::intrusive_ptr guard(this); // We use release so that no stores will be reordered after. - uint32_t res = run_count_.fetch_sub(1, std::memory_order_release); + uint32_t res = run_count_.fetch_sub(1, memory_order_release); if (res == 1) { run_ec_.notify(); } diff --git a/src/server/transaction.h b/src/server/transaction.h index d222d874f..dbca99c1f 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -135,10 +135,6 @@ class Transaction { const char* Name() const; - DbIndex db_index() const { - return db_index_; // TODO: support multiple db indexes. - } - uint32_t unique_shard_cnt() const { return unique_shard_cnt_; } @@ -180,7 +176,15 @@ class Transaction { KeyLockArgs GetLockArgs(ShardId sid) const; OpArgs GetOpArgs(EngineShard* shard) const { - return OpArgs{shard, txid_, db_index_}; + return OpArgs{shard, txid_, db_context()}; + } + + DbContext db_context() const { + return DbContext{.db_index = db_index_, .time_now_ms = time_now_ms_}; + } + + DbIndex db_index() const { + return db_index_; } private: @@ -287,15 +291,15 @@ class Transaction { const CommandId* cid_; TxId txid_{0}; + uint64_t time_now_ms_{0}; std::atomic notify_txid_{kuint64max}; - std::atomic_uint32_t use_count_{0}, run_count_{0}, seqlock_{0}; // unique_shard_cnt_ and unique_shard_id_ is accessed only by coordinator thread. uint32_t unique_shard_cnt_{0}; // number of unique shards span by args_ ShardId unique_shard_id_{kInvalidSid}; - DbIndex db_index_ = 0; + DbIndex db_index_; // Used for single-hop transactions with unique_shards_ == 1, hence no data-race. OpStatus local_result_ = OpStatus::OK; diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 4a4c1235f..dd5e02182 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -87,13 +87,13 @@ OpResult FindZEntry(const ZParams& zparams, const OpArgs& op_args size_t member_len) { auto& db_slice = op_args.shard->db_slice(); if (zparams.flags & ZADD_IN_XX) { - return db_slice.Find(op_args.db_ind, key, OBJ_ZSET); + return db_slice.Find(op_args.db_cntx, key, OBJ_ZSET); } pair add_res; try { - add_res = db_slice.AddOrFind(op_args.db_ind, key); + add_res = db_slice.AddOrFind(op_args.db_cntx, key); } catch (bad_alloc&) { return OpStatus::OUT_OF_MEMORY; } @@ -110,13 +110,13 @@ OpResult FindZEntry(const ZParams& zparams, const OpArgs& op_args DVLOG(2) << "Created zset " << zobj->ptr; if (!add_res.second) { - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); } it->second.ImportRObj(zobj); } else { if (it->second.ObjType() != OBJ_ZSET) return OpStatus::WRONG_TYPE; - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); } return it; @@ -615,7 +615,7 @@ OpResult OpUnion(EngineShard* shard, Transaction* t, string_view dest return OpStatus::OK; // return empty map for (unsigned j = start; j < keys.size(); ++j) { - auto it_res = db_slice.Find(t->db_index(), keys[j], OBJ_ZSET); + auto it_res = db_slice.Find(t->db_context(), keys[j], OBJ_ZSET); if (it_res == OpStatus::WRONG_TYPE) // TODO: support sets with default score 1. return it_res.status(); if (!it_res) @@ -661,7 +661,7 @@ OpResult OpInter(EngineShard* shard, Transaction* t, string_view dest return OpStatus::SKIPPED; // return noop for (unsigned j = start; j < keys.size(); ++j) { - auto it_res = db_slice.Find(t->db_index(), keys[j], OBJ_ZSET); + auto it_res = db_slice.Find(t->db_context(), keys[j], OBJ_ZSET); if (it_res == OpStatus::WRONG_TYPE) // TODO: support sets with default score 1. return it_res.status(); @@ -710,8 +710,8 @@ OpResult OpAdd(const OpArgs& op_args, const ZParams& zparams, string_ auto& db_slice = op_args.shard->db_slice(); if (zparams.override && members.empty()) { - auto it = db_slice.FindExt(op_args.db_ind, key).first; - db_slice.Del(op_args.db_ind, it); + auto it = db_slice.FindExt(op_args.db_cntx, key).first; + db_slice.Del(op_args.db_cntx.db_index, it); return OpStatus::OK; } @@ -763,7 +763,7 @@ OpResult OpAdd(const OpArgs& op_args, const ZParams& zparams, string_ DVLOG(2) << "ZAdd " << zobj->ptr; res_it.value()->second.SyncRObj(); - op_args.shard->db_slice().PostUpdate(op_args.db_ind, *res_it, key); + op_args.shard->db_slice().PostUpdate(op_args.db_cntx.db_index, *res_it, key); if (zparams.flags & ZADD_IN_INCR) { aresult.new_score = new_score; @@ -934,7 +934,7 @@ void ZSetFamily::ZCard(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - OpResult find_res = shard->db_slice().Find(t->db_index(), key, OBJ_ZSET); + OpResult find_res = shard->db_slice().Find(t->db_context(), key, OBJ_ZSET); if (!find_res) { return find_res.status(); } @@ -1505,7 +1505,7 @@ bool ZSetFamily::ParseRangeByScoreParams(CmdArgList args, RangeParams* params) { OpResult ZSetFamily::OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor) { - OpResult find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET); + OpResult find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET); if (!find_res) return find_res.status(); @@ -1564,11 +1564,11 @@ OpResult ZSetFamily::OpScan(const OpArgs& op_args, std::string_view k OpResult ZSetFamily::OpRem(const OpArgs& op_args, string_view key, ArgSlice members) { auto& db_slice = op_args.shard->db_slice(); - OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_ZSET); + OpResult res_it = db_slice.Find(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); - db_slice.PreUpdate(op_args.db_ind, *res_it); + db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it); robj* zobj = res_it.value()->second.AsRObj(); sds& tmp_str = op_args.shard->tmp_str1; unsigned deleted = 0; @@ -1578,17 +1578,17 @@ OpResult ZSetFamily::OpRem(const OpArgs& op_args, string_view key, Arg } auto zlen = zsetLength(zobj); res_it.value()->second.SyncRObj(); - db_slice.PostUpdate(op_args.db_ind, *res_it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key); if (zlen == 0) { - CHECK(op_args.shard->db_slice().Del(op_args.db_ind, res_it.value())); + CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it.value())); } return deleted; } OpResult ZSetFamily::OpScore(const OpArgs& op_args, string_view key, string_view member) { - OpResult res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET); + OpResult res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1605,7 +1605,7 @@ OpResult ZSetFamily::OpScore(const OpArgs& op_args, string_view key, str auto ZSetFamily::OpRange(const ZRangeSpec& range_spec, const OpArgs& op_args, string_view key) -> OpResult { - OpResult res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET); + OpResult res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1620,11 +1620,11 @@ auto ZSetFamily::OpRange(const ZRangeSpec& range_spec, const OpArgs& op_args, st OpResult ZSetFamily::OpRemRange(const OpArgs& op_args, string_view key, const ZRangeSpec& range_spec) { auto& db_slice = op_args.shard->db_slice(); - OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_ZSET); + OpResult res_it = db_slice.Find(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); - db_slice.PreUpdate(op_args.db_ind, *res_it); + db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it); robj* zobj = res_it.value()->second.AsRObj(); @@ -1632,11 +1632,11 @@ OpResult ZSetFamily::OpRemRange(const OpArgs& op_args, string_view key std::visit(iv, range_spec.interval); res_it.value()->second.SyncRObj(); - db_slice.PostUpdate(op_args.db_ind, *res_it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key); auto zlen = zsetLength(zobj); if (zlen == 0) { - CHECK(op_args.shard->db_slice().Del(op_args.db_ind, res_it.value())); + CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it.value())); } return iv.removed(); @@ -1644,7 +1644,7 @@ OpResult ZSetFamily::OpRemRange(const OpArgs& op_args, string_view key OpResult ZSetFamily::OpRank(const OpArgs& op_args, string_view key, string_view member, bool reverse) { - OpResult res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET); + OpResult res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1659,7 +1659,7 @@ OpResult ZSetFamily::OpRank(const OpArgs& op_args, string_view key, st OpResult ZSetFamily::OpCount(const OpArgs& op_args, std::string_view key, const ScoreInterval& interval) { - OpResult res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET); + OpResult res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1730,7 +1730,7 @@ OpResult ZSetFamily::OpCount(const OpArgs& op_args, std::string_view k OpResult ZSetFamily::OpLexCount(const OpArgs& op_args, string_view key, const ZSetFamily::LexInterval& interval) { - OpResult res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET); + OpResult res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status();