From 0925829afb7f5f5d9e621f5917dbd12494ea3045 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Sun, 25 Sep 2022 16:18:59 +0300 Subject: [PATCH] feat(server): Introduce transaction clock. Partially implements #6. Before, each shard lazily updated its clock used for the expiry evaluation. Now, the clock value is set during the transaction scheduling phase and is assigned to each transaction. From now on DbSlice methods use this value when checking whether the entry is expired via passed DbContext argument. Also, implemented transactionally consistent TIME command and verify that time is the same during the transaction. See https://ably.com/blog/redis-keys-do-not-expire-atomically for motivation. Still have not implemented any lamport style updates for background processes (not sure if it's the right way to proceed). --- src/server/CMakeLists.txt | 2 +- src/server/bitops_family.cc | 20 +++-- src/server/blocking_controller.cc | 6 +- src/server/common.h | 11 ++- src/server/db_slice.cc | 79 ++++++++++---------- src/server/db_slice.h | 43 +++++------ src/server/debugcmd.cc | 7 +- src/server/dragonfly_test.cc | 8 +- src/server/engine_shard_set.cc | 70 +++++++++--------- src/server/engine_shard_set.h | 11 ++- src/server/generic_family.cc | 94 ++++++++++++++---------- src/server/generic_family.h | 1 + src/server/generic_family_test.cc | 34 +++++++-- src/server/hset_family.cc | 42 +++++------ src/server/json_family.cc | 15 ++-- src/server/list_family.cc | 80 ++++++++++---------- src/server/list_family_test.cc | 3 +- src/server/rdb_load.cc | 7 +- src/server/set_family.cc | 44 +++++------ src/server/stream_family.cc | 28 +++---- src/server/string_family.cc | 117 +++++++++++++++--------------- src/server/string_family.h | 6 -- src/server/string_family_test.cc | 8 +- src/server/test_utils.cc | 11 +-- src/server/test_utils.h | 7 +- src/server/transaction.cc | 15 ++-- src/server/transaction.h | 18 +++-- src/server/zset_family.cc | 48 ++++++------ 28 files changed, 448 insertions(+), 387 deletions(-) diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index a5b5cb8ab..584fed2a0 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -39,4 +39,4 @@ cxx_test(json_family_test dfly_test_lib LABELS DFLY) add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY) add_dependencies(check_dfly dragonfly_test json_family_test list_family_test generic_family_test memcache_parser_test rdb_test - redis_parser_test snapshot_test stream_family_test string_family_test bitops_family_test) + redis_parser_test snapshot_test stream_family_test string_family_test bitops_family_test set_family_test zset_family_test) diff --git a/src/server/bitops_family.cc b/src/server/bitops_family.cc index 29d3086c9..eb3eb1d2b 100644 --- a/src/server/bitops_family.cc +++ b/src/server/bitops_family.cc @@ -184,25 +184,23 @@ bool SetBitValue(uint32_t offset, bool bit_value, std::string* entry) { // Helper functions to access the data or change it class OverrideValue { - EngineShard* shard_ = nullptr; - DbIndex index_ = 0; + const OpArgs& args_; public: - explicit OverrideValue(const OpArgs& args) : shard_{args.shard}, index_{args.db_ind} { + explicit OverrideValue(const OpArgs& args) : args_{args} { } OpResult Set(std::string_view key, uint32_t offset, bool bit_value); }; OpResult OverrideValue::Set(std::string_view key, uint32_t offset, bool bit_value) { - auto& db_slice = shard_->db_slice(); - DbIndex index = index_; + auto& db_slice = args_.shard->db_slice(); - DCHECK(db_slice.IsDbValid(index_)); + DCHECK(db_slice.IsDbValid(args_.db_cntx.db_index)); std::pair add_res; try { - add_res = db_slice.AddOrFind(index_, key); + add_res = db_slice.AddOrFind(args_.db_cntx, key); } catch (const std::bad_alloc&) { return OpStatus::OUT_OF_MEMORY; } @@ -210,9 +208,9 @@ OpResult OverrideValue::Set(std::string_view key, uint32_t offset, bool bi PrimeIterator& it = add_res.first; bool added = add_res.second; auto UpdateBitMapValue = [&](std::string_view value) { - db_slice.PreUpdate(index, it); + db_slice.PreUpdate(args_.db_cntx.db_index, it); it->second.SetString(value); - db_slice.PostUpdate(index, it, key, !added); + db_slice.PostUpdate(args_.db_cntx.db_index, it, key, !added); }; if (added) { // this is a new entry in the "table" @@ -224,7 +222,7 @@ OpResult OverrideValue::Set(std::string_view key, uint32_t offset, bool bi return OpStatus::WRONG_TYPE; } bool reset = false; - std::string existing_entry{GetString(shard_, it->second)}; + std::string existing_entry{GetString(args_.shard, it->second)}; if ((existing_entry.size() * OFFSET_FACTOR) <= offset) { // need to resize first existing_entry.resize(GetByteIndex(offset) + 1, 0); reset = true; @@ -398,7 +396,7 @@ OpResult ReadValueBitsetAt(const OpArgs& op_args, std::string_view key, ui } OpResult ReadValue(const OpArgs& op_args, std::string_view key) { - OpResult it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_STRING); + OpResult it_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_STRING); if (!it_res.ok()) { return it_res.status(); } diff --git a/src/server/blocking_controller.cc b/src/server/blocking_controller.cc index 700d6699c..eb1f234f7 100644 --- a/src/server/blocking_controller.cc +++ b/src/server/blocking_controller.cc @@ -106,18 +106,22 @@ void BlockingController::RunStep(Transaction* completed_t) { } } + DbContext context; + context.time_now_ms = GetCurrentTimeMs(); + for (DbIndex index : awakened_indices_) { auto dbit = watched_dbs_.find(index); if (dbit == watched_dbs_.end()) continue; + context.db_index = index; DbWatchTable& wt = *dbit->second; for (auto key : wt.awakened_keys) { string_view sv_key = static_cast(key); DVLOG(1) << "Processing awakened key " << sv_key; // Double verify we still got the item. - auto [it, exp_it] = owner_->db_slice().FindExt(index, sv_key); + auto [it, exp_it] = owner_->db_slice().FindExt(context, sv_key); if (!IsValid(it) || it->second.ObjType() != OBJ_LIST) // Only LIST is allowed to block. continue; diff --git a/src/server/common.h b/src/server/common.h index 7be5b7eaa..8c8522d6b 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -69,15 +69,20 @@ struct KeyIndex { } }; +struct DbContext { + DbIndex db_index = 0; + uint64_t time_now_ms = 0; +}; + struct OpArgs { EngineShard* shard; TxId txid; - DbIndex db_ind; + DbContext db_cntx; - OpArgs() : shard(nullptr), txid(0), db_ind(0) { + OpArgs() : shard(nullptr), txid(0) { } - OpArgs(EngineShard* s, TxId i, DbIndex d) : shard(s), txid(i), db_ind(d) { + OpArgs(EngineShard* s, TxId i, const DbContext& cntx) : shard(s), txid(i), db_cntx(cntx) { } }; diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index f045df5f8..f73d242b4 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -61,9 +61,9 @@ class PrimeEvictionPolicy { static constexpr bool can_evict = true; // we implement eviction functionality. static constexpr bool can_gc = true; - PrimeEvictionPolicy(DbIndex db_indx, bool can_evict, ssize_t mem_budget, ssize_t soft_limit, - DbSlice* db_slice) - : db_slice_(db_slice), mem_budget_(mem_budget), soft_limit_(soft_limit), db_indx_(db_indx), + PrimeEvictionPolicy(const DbContext& cntx, bool can_evict, ssize_t mem_budget, + ssize_t soft_limit, DbSlice* db_slice) + : db_slice_(db_slice), mem_budget_(mem_budget), soft_limit_(soft_limit), cntx_(cntx), can_evict_(can_evict) { } @@ -94,9 +94,10 @@ class PrimeEvictionPolicy { DbSlice* db_slice_; ssize_t mem_budget_; ssize_t soft_limit_ = 0; + const DbContext cntx_; + unsigned evicted_ = 0; unsigned checked_ = 0; - const DbIndex db_indx_; // unlike static constexpr can_evict, this parameter tells whether we can evict // items in runtime. @@ -138,7 +139,7 @@ unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& e for (; !bucket_it.is_done(); ++bucket_it) { if (bucket_it->second.HasExpire()) { ++checked_; - auto [prime_it, exp_it] = db_slice_->ExpireIfNeeded(db_indx_, bucket_it); + auto [prime_it, exp_it] = db_slice_->ExpireIfNeeded(cntx_, bucket_it); if (prime_it.is_done()) ++res; } @@ -164,7 +165,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT return 0; } - DbTable* table = db_slice_->GetDBTable(db_indx_); + DbTable* table = db_slice_->GetDBTable(cntx_.db_index); EvictItemFun(last_slot_it, table); ++evicted_; } @@ -256,9 +257,9 @@ void DbSlice::Reserve(DbIndex db_ind, size_t key_size) { db->prime.Reserve(key_size); } -auto DbSlice::Find(DbIndex db_index, string_view key, unsigned req_obj_type) const +auto DbSlice::Find(const Context& cntx, string_view key, unsigned req_obj_type) const -> OpResult { - auto it = FindExt(db_index, key).first; + auto it = FindExt(cntx, key).first; if (!IsValid(it)) return OpStatus::KEY_NOTFOUND; @@ -270,13 +271,13 @@ auto DbSlice::Find(DbIndex db_index, string_view key, unsigned req_obj_type) con return it; } -pair DbSlice::FindExt(DbIndex db_ind, string_view key) const { +pair DbSlice::FindExt(const Context& cntx, string_view key) const { pair res; - if (!IsDbValid(db_ind)) + if (!IsDbValid(cntx.db_index)) return res; - auto& db = *db_arr_[db_ind]; + auto& db = *db_arr_[cntx.db_index]; res.first = db.prime.Find(key); if (!IsValid(res.first)) { @@ -284,14 +285,14 @@ pair DbSlice::FindExt(DbIndex db_ind, string_view } if (res.first->second.HasExpire()) { // check expiry state - res = ExpireIfNeeded(db_ind, res.first); + res = ExpireIfNeeded(cntx, res.first); } if (caching_mode_ && IsValid(res.first)) { if (!change_cb_.empty()) { auto bump_cb = [&](PrimeTable::bucket_iterator bit) { for (const auto& ccb : change_cb_) { - ccb.second(db_ind, bit); + ccb.second(cntx.db_index, bit); } }; @@ -306,12 +307,12 @@ pair DbSlice::FindExt(DbIndex db_ind, string_view return res; } -OpResult> DbSlice::FindFirst(DbIndex db_index, ArgSlice args) { +OpResult> DbSlice::FindFirst(const Context& cntx, ArgSlice args) { DCHECK(!args.empty()); for (unsigned i = 0; i < args.size(); ++i) { string_view s = args[i]; - OpResult res = Find(db_index, s, OBJ_LIST); + OpResult res = Find(cntx, s, OBJ_LIST); if (res) return make_pair(res.value(), i); if (res.status() != OpStatus::KEY_NOTFOUND) @@ -322,20 +323,20 @@ OpResult> DbSlice::FindFirst(DbIndex db_index, Arg return OpStatus::KEY_NOTFOUND; } -pair DbSlice::AddOrFind(DbIndex db_index, string_view key) noexcept(false) { - auto res = AddOrFind2(db_index, key); +pair DbSlice::AddOrFind(const Context& cntx, string_view key) noexcept(false) { + auto res = AddOrFind2(cntx, key); return make_pair(get<0>(res), get<2>(res)); } -tuple DbSlice::AddOrFind2(DbIndex db_index, +tuple DbSlice::AddOrFind2(const Context& cntx, string_view key) noexcept(false) { - DCHECK(IsDbValid(db_index)); + DCHECK(IsDbValid(cntx.db_index)); - DbTable& db = *db_arr_[db_index]; + DbTable& db = *db_arr_[cntx.db_index]; // If we have some registered onchange callbacks, we must know in advance whether its Find or Add. if (!change_cb_.empty()) { - auto res = FindExt(db_index, key); + auto res = FindExt(cntx, key); if (IsValid(res.first)) { return tuple_cat(res, make_tuple(true)); @@ -343,11 +344,11 @@ tuple DbSlice::AddOrFind2(DbIndex db_index, // It's a new entry. for (const auto& ccb : change_cb_) { - ccb.second(db_index, key); + ccb.second(cntx.db_index, key); } } - PrimeEvictionPolicy evp{db_index, bool(caching_mode_), int64_t(memory_budget_ - key.size()), + PrimeEvictionPolicy evp{cntx, bool(caching_mode_), int64_t(memory_budget_ - key.size()), ssize_t(soft_budget_limit_), this}; // If we are over limit in non-cache scenario, just be conservative and throw. @@ -405,7 +406,7 @@ tuple DbSlice::AddOrFind2(DbIndex db_index, // TODO: to implement the incremental update of expiry values using multi-generation // expire_base_ update. Right now we use only index 0. - uint64_t delta_ms = now_ms_ - expire_base_[0]; + uint64_t delta_ms = cntx.time_now_ms - expire_base_[0]; if (expire_it->second.duration_ms() <= delta_ms) { db.expire.Erase(expire_it); @@ -535,28 +536,27 @@ uint32_t DbSlice::GetMCFlag(DbIndex db_ind, const PrimeKey& key) const { return it.is_done() ? 0 : it->second; } -PrimeIterator DbSlice::AddNew(DbIndex db_ind, string_view key, PrimeValue obj, +PrimeIterator DbSlice::AddNew(const Context& cntx, string_view key, PrimeValue obj, uint64_t expire_at_ms) noexcept(false) { - auto [it, added] = AddEntry(db_ind, key, std::move(obj), expire_at_ms); + auto [it, added] = AddEntry(cntx, key, std::move(obj), expire_at_ms); CHECK(added); return it; } -pair DbSlice::AddEntry(DbIndex db_ind, string_view key, PrimeValue obj, +pair DbSlice::AddEntry(const Context& cntx, string_view key, PrimeValue obj, uint64_t expire_at_ms) noexcept(false) { - DCHECK_LT(db_ind, db_arr_.size()); DCHECK(!obj.IsRef()); - pair res = AddOrFind(db_ind, key); + pair res = AddOrFind(cntx, key); if (!res.second) // have not inserted. return res; - auto& db = *db_arr_[db_ind]; + auto& db = *db_arr_[cntx.db_index]; auto& it = res.first; it->second = std::move(obj); - PostUpdate(db_ind, it, key, false); + PostUpdate(cntx.db_index, it, key, false); if (expire_at_ms) { it->second.SetExpire(true); @@ -685,10 +685,10 @@ void DbSlice::PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, } } -pair DbSlice::ExpireIfNeeded(DbIndex db_ind, +pair DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterator it) const { DCHECK(it->second.HasExpire()); - auto& db = db_arr_[db_ind]; + auto& db = db_arr_[cntx.db_index]; auto expire_it = db->expire.Find(it->first); @@ -697,7 +697,7 @@ pair DbSlice::ExpireIfNeeded(DbIndex db_ind, // TODO: to employ multi-generation update of expire-base and the underlying values. time_t expire_time = ExpireTime(expire_it); - if (now_ms_ < expire_time) + if (time_t(cntx.time_now_ms) < expire_time) return make_pair(it, expire_it); db->expire.Erase(expire_it); @@ -725,17 +725,17 @@ void DbSlice::UnregisterOnChange(uint64_t id) { LOG(DFATAL) << "Could not find " << id << " to unregister"; } -auto DbSlice::DeleteExpiredStep(DbIndex db_ind, unsigned count) -> DeleteExpiredStats { - auto& db = *db_arr_[db_ind]; +auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteExpiredStats { + auto& db = *db_arr_[cntx.db_index]; DeleteExpiredStats result; auto cb = [&](ExpireIterator it) { result.traversed++; - time_t ttl = ExpireTime(it) - Now(); + time_t ttl = ExpireTime(it) - cntx.time_now_ms; if (ttl <= 0) { auto prime_it = db.prime.Find(it->first); CHECK(!prime_it.is_done()); - ExpireIfNeeded(db_ind, prime_it); + ExpireIfNeeded(cntx, prime_it); ++result.deleted; } else { result.survivor_ttl_sum += ttl; @@ -853,7 +853,8 @@ size_t DbSlice::EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* t return freed_memory_fun(); }; -void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key, ConnectionState::ExecInfo* exec_info) { +void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key, + ConnectionState::ExecInfo* exec_info) { db_arr_[db_indx]->watched_keys[key].push_back(exec_info); } diff --git a/src/server/db_slice.h b/src/server/db_slice.h index e253b6bdb..0efba681d 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -8,8 +8,8 @@ #include "facade/op_status.h" #include "server/common.h" -#include "server/table.h" #include "server/conn_context.h" +#include "server/table.h" namespace util { class ProactorBase; @@ -65,6 +65,8 @@ class DbSlice { size_t small_string_bytes = 0; }; + using Context = DbContext; + // ChangeReq - describes the change to the table. struct ChangeReq { // If iterator is set then it's an update to the existing bucket. @@ -90,12 +92,6 @@ class DbSlice { // Returns statistics for the whole db slice. A bit heavy operation. Stats GetStats() const; - //! UpdateExpireClock updates the expire clock for this db slice. - //! Must be a wall clock so we could replicate it between machines. - void UpdateExpireClock(uint64_t now_ms) { - now_ms_ = now_ms; - } - void UpdateExpireBase(uint64_t now, unsigned generation) { expire_base_[generation & 1] = now; } @@ -124,37 +120,34 @@ class DbSlice { return ExpirePeriod{time_ms - expire_base_[0]}; } - // returns wall clock in millis as it has been set via UpdateExpireClock. - time_t Now() const { - return now_ms_; - } - - OpResult Find(DbIndex db_index, std::string_view key, unsigned req_obj_type) const; + OpResult Find(const Context& cntx, std::string_view key, + unsigned req_obj_type) const; // Returns (value, expire) dict entries if key exists, null if it does not exist or has expired. - std::pair FindExt(DbIndex db_ind, std::string_view key) const; + std::pair FindExt(const Context& cntx, std::string_view key) const; // Returns (iterator, args-index) if found, KEY_NOTFOUND otherwise. // If multiple keys are found, returns the first index in the ArgSlice. - OpResult> FindFirst(DbIndex db_index, ArgSlice args); + OpResult> FindFirst(const Context& cntx, ArgSlice args); // Return .second=true if insertion ocurred, false if we return the existing key. // throws: bad_alloc is insertion could not happen due to out of memory. - std::pair AddOrFind(DbIndex db_index, std::string_view key) noexcept(false); + std::pair AddOrFind(const Context& cntx, + std::string_view key) noexcept(false); - std::tuple AddOrFind2(DbIndex db_index, + std::tuple AddOrFind2(const Context& cntx, std::string_view key) noexcept(false); // Returns second=true if insertion took place, false otherwise. // expire_at_ms equal to 0 - means no expiry. // throws: bad_alloc is insertion could not happen due to out of memory. - std::pair AddEntry(DbIndex db_ind, std::string_view key, PrimeValue obj, + std::pair AddEntry(const Context& cntx, std::string_view key, PrimeValue obj, uint64_t expire_at_ms) noexcept(false); // Adds a new entry. Requires: key does not exist in this slice. // Returns the iterator to the newly added entry. // throws: bad_alloc is insertion could not happen due to out of memory. - PrimeIterator AddNew(DbIndex db_ind, std::string_view key, PrimeValue obj, + PrimeIterator AddNew(const Context& cntx, std::string_view key, PrimeValue obj, uint64_t expire_at_ms) noexcept(false); // Either adds or removes (if at == 0) expiry. Returns true if a change was made. @@ -218,7 +211,8 @@ class DbSlice { // Callback functions called upon writing to the existing key. void PreUpdate(DbIndex db_ind, PrimeIterator it); - void PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, bool existing_entry = true); + void PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, + bool existing_entry = true); DbTableStats* MutableStats(DbIndex db_ind) { return &db_arr_[db_ind]->stats; @@ -226,7 +220,8 @@ class DbSlice { // Check whether 'it' has not expired. Returns it if it's still valid. Otherwise, erases it // from both tables and return PrimeIterator{}. - std::pair ExpireIfNeeded(DbIndex db_ind, PrimeIterator it) const; + std::pair ExpireIfNeeded(const Context& cntx, + PrimeIterator it) const; // Current version of this slice. // We maintain a shared versioning scheme for all databases in the slice. @@ -251,7 +246,7 @@ class DbSlice { }; // Deletes some amount of possible expired items. - DeleteExpiredStats DeleteExpiredStep(DbIndex db_indx, unsigned count); + DeleteExpiredStats DeleteExpiredStep(const Context& cntx, unsigned count); void FreeMemWithEvictionStep(DbIndex db_indx, size_t increase_goal_bytes); const DbTableArray& databases() const { @@ -262,7 +257,8 @@ class DbSlice { caching_mode_ = 1; } - void RegisterWatchedKey(DbIndex db_indx, std::string_view key, ConnectionState::ExecInfo* exec_info); + void RegisterWatchedKey(DbIndex db_indx, std::string_view key, + ConnectionState::ExecInfo* exec_info); // Unregisted all watched key entries for connection. void UnregisterConnectionWatches(ConnectionState::ExecInfo* exec_info); @@ -284,7 +280,6 @@ class DbSlice { EngineShard* owner_; - time_t now_ms_ = 0; // Used for expire logic, represents a real clock. time_t expire_base_[2]; // Used for expire logic, represents a real clock. uint64_t version_ = 1; // Used to version entries in the PrimeTable. diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 41599f74e..24a0be2c7 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -60,7 +60,8 @@ struct ObjInfo { void DoPopulateBatch(std::string_view prefix, size_t val_size, const SetCmd::SetParams& params, const PopulateBatch& batch) { - OpArgs op_args(EngineShard::tlocal(), 0, params.db_index); + DbContext db_cntx{batch.dbid, 0}; + OpArgs op_args(EngineShard::tlocal(), 0, db_cntx); SetCmd sg(op_args); for (unsigned i = 0; i < batch.sz; ++i) { @@ -277,7 +278,7 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view DbIndex db_indx = cntx_->db_index(); EngineShardSet& ess = *shard_set; std::vector ps(ess.size(), PopulateBatch{db_indx}); - SetCmd::SetParams params{db_indx}; + SetCmd::SetParams params; for (uint64_t i = from; i < from + len; ++i) { StrAppend(&key, i); @@ -324,7 +325,7 @@ void DebugCmd::Inspect(string_view key) { CHECK(!exp_it.is_done()); time_t exp_time = db_slice.ExpireTime(exp_it); - oinfo.ttl = exp_time - db_slice.Now(); + oinfo.ttl = exp_time - GetCurrentTimeMs(); oinfo.has_sec_precision = exp_it->second.is_second_precision(); } } diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 8a1ef874c..8fbbade05 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -612,7 +612,9 @@ TEST_F(DflyEngineTest, Watch) { // Check EXEC doesn't miss watched key expiration. Run({"watch", "a"}); Run({"expire", "a", "1"}); - UpdateTime(expire_now_ + 1000); + + AdvanceTime(1000); + Run({"multi"}); ASSERT_THAT(Run({"exec"}), kExecFail); @@ -637,7 +639,9 @@ TEST_F(DflyEngineTest, Watch) { Run({"watch", "a"}); Run({"set", "c", "1"}); Run({"expire", "a", "1"}); // a existed - UpdateTime(expire_now_ + 1000); + + AdvanceTime(1000); + Run({"multi"}); ASSERT_THAT(Run({"exec"}), kExecFail); diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 9227cbb46..2e96b5001 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -22,10 +22,9 @@ using namespace std; ABSL_FLAG(string, backing_prefix, "", ""); -ABSL_FLAG(uint32_t, hz, 1000, - "Base frequency at which the server updates its expiry clock " - "and performs other background tasks. Warning: not advised to decrease in production, " - "because it can affect expiry precision for PSETEX etc."); +ABSL_FLAG(uint32_t, hz, 100, + "Base frequency at which the server performs other background tasks. " + "Warning: not advised to decrease in production."); ABSL_FLAG(bool, cache_mode, false, "If true, the backend behaves like a cache, " @@ -48,6 +47,7 @@ constexpr size_t kQueueLen = 64; thread_local EngineShard* EngineShard::shard_ = nullptr; EngineShardSet* shard_set = nullptr; +uint64_t TEST_current_time_ms = 0; EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) { ooo_runs += o.ooo_runs; @@ -284,43 +284,41 @@ bool EngineShard::HasResultConverged(TxId notifyid) const { #endif void EngineShard::Heartbeat() { - // absl::GetCurrentTimeNanos() returns current time since the Unix Epoch. - db_slice().UpdateExpireClock(absl::GetCurrentTimeNanos() / 1000000); + CacheStats(); + constexpr double kTtlDeleteLimit = 200; + constexpr double kRedLimitFactor = 0.1; - if (task_iters_++ % 8 == 0) { - CacheStats(); - constexpr double kTtlDeleteLimit = 200; - constexpr double kRedLimitFactor = 0.1; + uint32_t traversed = GetMovingSum6(TTL_TRAVERSE); + uint32_t deleted = GetMovingSum6(TTL_DELETE); + unsigned ttl_delete_target = 5; - uint32_t traversed = GetMovingSum6(TTL_TRAVERSE); - uint32_t deleted = GetMovingSum6(TTL_DELETE); - unsigned ttl_delete_target = 5; + if (deleted > 10) { + // deleted should be <= traversed. + // hence we map our delete/traversed ratio into a range [0, kTtlDeleteLimit). + // The higher t + ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10); + } - if (deleted > 10) { - // deleted should be <= traversed. - // hence we map our delete/traversed ratio into a range [0, kTtlDeleteLimit). - // The higher t - ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10); + ssize_t redline = (max_memory_limit * kRedLimitFactor) / shard_set->size(); + DbContext db_cntx; + db_cntx.time_now_ms = GetCurrentTimeMs(); + + for (unsigned i = 0; i < db_slice_.db_array_size(); ++i) { + if (!db_slice_.IsDbValid(i)) + continue; + + db_cntx.db_index = i; + auto [pt, expt] = db_slice_.GetTables(i); + if (expt->size() > pt->size() / 4) { + DbSlice::DeleteExpiredStats stats = db_slice_.DeleteExpiredStep(db_cntx, ttl_delete_target); + + counter_[TTL_TRAVERSE].IncBy(stats.traversed); + counter_[TTL_DELETE].IncBy(stats.deleted); } - ssize_t redline = (max_memory_limit * kRedLimitFactor) / shard_set->size(); - - for (unsigned i = 0; i < db_slice_.db_array_size(); ++i) { - if (!db_slice_.IsDbValid(i)) - continue; - - auto [pt, expt] = db_slice_.GetTables(i); - if (expt->size() > pt->size() / 4) { - DbSlice::DeleteExpiredStats stats = db_slice_.DeleteExpiredStep(i, ttl_delete_target); - - counter_[TTL_TRAVERSE].IncBy(stats.traversed); - counter_[TTL_DELETE].IncBy(stats.deleted); - } - - // if our budget is below the limit - if (db_slice_.memory_budget() < redline) { - db_slice_.FreeMemWithEvictionStep(i, redline - db_slice_.memory_budget()); - } + // if our budget is below the limit + if (db_slice_.memory_budget() < redline) { + db_slice_.FreeMemWithEvictionStep(i, redline - db_slice_.memory_budget()); } } } diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 200b58be2..3587371e1 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -176,7 +176,6 @@ class EngineShard { IntentLock shard_lock_; uint32_t periodic_task_ = 0; - uint64_t task_iters_ = 0; std::unique_ptr tiered_storage_; std::unique_ptr blocking_controller_; @@ -287,6 +286,16 @@ inline ShardId Shard(std::string_view v, ShardId shard_num) { } +// absl::GetCurrentTimeNanos is twice faster than clock_gettime(CLOCK_REALTIME) on my laptop +// and 4 times faster than on a VM. it takes 5-10ns to do a call. + +extern uint64_t TEST_current_time_ms; + +inline uint64_t GetCurrentTimeMs() { + return TEST_current_time_ms ? TEST_current_time_ms : absl::GetCurrentTimeNanos() / 1000000; +} + + extern EngineShardSet* shard_set; } // namespace dfly diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 6343d071f..4a649a755 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -31,7 +31,7 @@ namespace { class Renamer { public: - Renamer(DbIndex dind, ShardId source_id) : db_indx_(dind), src_sid_(source_id) { + Renamer(ShardId source_id) : src_sid_(source_id) { } void Find(Transaction* t); @@ -46,7 +46,6 @@ class Renamer { OpStatus MoveSrc(Transaction* t, EngineShard* es); OpStatus UpdateDest(Transaction* t, EngineShard* es); - DbIndex db_indx_; ShardId src_sid_; struct FindResult { @@ -73,7 +72,7 @@ void Renamer::Find(Transaction* t) { res->key = args.front(); auto& db_slice = EngineShard::tlocal()->db_slice(); - auto [it, exp_it] = db_slice.FindExt(db_indx_, res->key); + auto [it, exp_it] = db_slice.FindExt(t->db_context(), res->key); res->found = IsValid(it); if (IsValid(it)) { @@ -116,7 +115,7 @@ void Renamer::Finalize(Transaction* t, bool skip_exist_dest) { OpStatus Renamer::MoveSrc(Transaction* t, EngineShard* es) { if (es->shard_id() == src_sid_) { // Handle source key. // TODO: to call PreUpdate/PostUpdate. - auto it = es->db_slice().FindExt(db_indx_, src_res_.key).first; + auto it = es->db_slice().FindExt(t->db_context(), src_res_.key).first; CHECK(IsValid(it)); // We distinguish because of the SmallString that is pinned to its thread by design, @@ -129,7 +128,7 @@ OpStatus Renamer::MoveSrc(Transaction* t, EngineShard* es) { pv_ = std::move(it->second); it->second.SetExpire(has_expire); } - CHECK(es->db_slice().Del(db_indx_, it)); // delete the entry with empty value in it. + CHECK(es->db_slice().Del(t->db_index(), it)); // delete the entry with empty value in it. } return OpStatus::OK; @@ -139,7 +138,7 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) { if (es->shard_id() != src_sid_) { auto& db_slice = es->db_slice(); string_view dest_key = dest_res_.key; - PrimeIterator dest_it = db_slice.FindExt(db_indx_, dest_key).first; + PrimeIterator dest_it = db_slice.FindExt(t->db_context(), dest_key).first; bool is_prior_list = false; if (IsValid(dest_it)) { @@ -152,18 +151,18 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) { dest_it->second = std::move(pv_); } dest_it->second.SetExpire(has_expire); // preserve expire flag. - db_slice.UpdateExpire(db_indx_, dest_it, src_res_.expire_ts); + db_slice.UpdateExpire(t->db_index(), dest_it, src_res_.expire_ts); } else { if (src_res_.ref_val.ObjType() == OBJ_STRING) { pv_.SetString(str_val_); } - dest_it = db_slice.AddNew(db_indx_, dest_key, std::move(pv_), src_res_.expire_ts); + dest_it = db_slice.AddNew(t->db_context(), dest_key, std::move(pv_), src_res_.expire_ts); } dest_it->first.SetSticky(src_res_.sticky); if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) { - es->blocking_controller()->AwakeWatched(db_indx_, dest_key); + es->blocking_controller()->AwakeWatched(t->db_index(), dest_key); } } @@ -181,7 +180,7 @@ struct ScanOpts { bool ScanCb(const OpArgs& op_args, PrimeIterator it, const ScanOpts& opts, StringVec* res) { auto& db_slice = op_args.shard->db_slice(); if (it->second.HasExpire()) { - it = db_slice.ExpireIfNeeded(op_args.db_ind, it).first; + it = db_slice.ExpireIfNeeded(op_args.db_cntx, it).first; } if (!IsValid(it)) @@ -211,15 +210,15 @@ bool ScanCb(const OpArgs& op_args, PrimeIterator it, const ScanOpts& opts, Strin void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, StringVec* vec) { auto& db_slice = op_args.shard->db_slice(); - DCHECK(db_slice.IsDbValid(op_args.db_ind)); + DCHECK(db_slice.IsDbValid(op_args.db_cntx.db_index)); unsigned cnt = 0; - VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_ind << " has " - << db_slice.DbSize(op_args.db_ind); + VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_cntx.db_index << " has " + << db_slice.DbSize(op_args.db_cntx.db_index); PrimeTable::Cursor cur = *cursor; - auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_ind); + auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_cntx.db_index); do { cur = prime_table->Traverse( cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, scan_opts, vec); }); @@ -245,10 +244,11 @@ uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys } cursor >>= 10; + DbContext db_cntx{.db_index = cntx->conn_state.db_index, .time_now_ms = GetCurrentTimeMs()}; do { ess->Await(sid, [&] { - OpArgs op_args{EngineShard::tlocal(), 0, cntx->conn_state.db_index}; + OpArgs op_args{EngineShard::tlocal(), 0, db_cntx}; OpScan(op_args, scan_opts, &cursor, keys); }); @@ -543,7 +543,7 @@ OpResult OpFetchSortEntries(const OpArgs& op_args, std::string_vi bool alpha) { using namespace container_utils; - auto [it, _] = op_args.shard->db_slice().FindExt(op_args.db_ind, key); + auto [it, _] = op_args.shard->db_slice().FindExt(op_args.db_cntx, key); if (!IsValid(it) || !IsContainer(it->second)) { return OpStatus::KEY_NOTFOUND; } @@ -731,7 +731,7 @@ void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) { std::string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - auto it = shard->db_slice().FindExt(t->db_index(), key).first; + auto it = shard->db_slice().FindExt(t->db_context(), key).first; if (!it.is_done()) { return it->second.ObjType(); } else { @@ -746,6 +746,19 @@ void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) { } } +void GenericFamily::Time(CmdArgList args, ConnectionContext* cntx) { + uint64_t now_usec; + if (cntx->transaction) { + now_usec = cntx->transaction->db_context().time_now_ms * 1000; + } else { + now_usec = absl::GetCurrentTimeNanos() / 1000; + } + + (*cntx)->StartArray(2); + (*cntx)->SendLong(now_usec / 1000000); + (*cntx)->SendLong(now_usec % 1000000); +} + OpResult GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_dest, ConnectionContext* cntx) { string_view key[2] = {ArgS(args, 1), ArgS(args, 2)}; @@ -763,7 +776,7 @@ OpResult GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des transaction->Schedule(); unsigned shard_count = shard_set->size(); - Renamer renamer{transaction->db_index(), Shard(key[0], shard_count)}; + Renamer renamer{Shard(key[0], shard_count)}; // Phase 1 -> Fetch keys from both shards. // Phase 2 -> If everything is ok, clone the source object, delete the destination object, and @@ -835,23 +848,23 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) { OpStatus GenericFamily::OpExpire(const OpArgs& op_args, string_view key, const ExpireParams& params) { auto& db_slice = op_args.shard->db_slice(); - auto [it, expire_it] = db_slice.FindExt(op_args.db_ind, key); + auto [it, expire_it] = db_slice.FindExt(op_args.db_cntx, key); if (!IsValid(it)) return OpStatus::KEY_NOTFOUND; int64_t msec = (params.unit == TimeUnit::SEC) ? params.ts * 1000 : params.ts; - int64_t now_msec = db_slice.Now(); + int64_t now_msec = op_args.db_cntx.time_now_ms; int64_t rel_msec = params.absolute ? msec - now_msec : msec; if (rel_msec > kMaxExpireDeadlineSec * 1000) { return OpStatus::OUT_OF_RANGE; } if (rel_msec <= 0) { - CHECK(db_slice.Del(op_args.db_ind, it)); + CHECK(db_slice.Del(op_args.db_cntx.db_index, it)); } else if (IsValid(expire_it)) { expire_it->second = db_slice.FromAbsoluteTime(now_msec + rel_msec); } else { - db_slice.UpdateExpire(op_args.db_ind, it, rel_msec + now_msec); + db_slice.UpdateExpire(op_args.db_cntx.db_index, it, rel_msec + now_msec); } return OpStatus::OK; @@ -859,14 +872,14 @@ OpStatus GenericFamily::OpExpire(const OpArgs& op_args, string_view key, OpResult GenericFamily::OpTtl(Transaction* t, EngineShard* shard, string_view key) { auto& db_slice = shard->db_slice(); - auto [it, expire_it] = db_slice.FindExt(t->db_index(), key); + auto [it, expire_it] = db_slice.FindExt(t->db_context(), key); if (!IsValid(it)) return OpStatus::KEY_NOTFOUND; if (!IsValid(expire_it)) return OpStatus::SKIPPED; - int64_t ttl_ms = db_slice.ExpireTime(expire_it) - db_slice.Now(); + int64_t ttl_ms = db_slice.ExpireTime(expire_it) - t->db_context().time_now_ms; DCHECK_GT(ttl_ms, 0); // Otherwise FindExt would return null. return ttl_ms; } @@ -878,10 +891,10 @@ OpResult GenericFamily::OpDel(const OpArgs& op_args, ArgSlice keys) { uint32_t res = 0; for (uint32_t i = 0; i < keys.size(); ++i) { - auto fres = db_slice.FindExt(op_args.db_ind, keys[i]); + auto fres = db_slice.FindExt(op_args.db_cntx, keys[i]); if (!IsValid(fres.first)) continue; - res += int(db_slice.Del(op_args.db_ind, fres.first)); + res += int(db_slice.Del(op_args.db_cntx.db_index, fres.first)); } return res; @@ -893,7 +906,7 @@ OpResult GenericFamily::OpExists(const OpArgs& op_args, ArgSlice keys) uint32_t res = 0; for (uint32_t i = 0; i < keys.size(); ++i) { - auto find_res = db_slice.FindExt(op_args.db_ind, keys[i]); + auto find_res = db_slice.FindExt(op_args.db_cntx, keys[i]); res += IsValid(find_res.first); } return res; @@ -903,12 +916,12 @@ OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, bool skip_exists) { auto* es = op_args.shard; auto& db_slice = es->db_slice(); - auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, from_key); + auto [from_it, from_expire] = db_slice.FindExt(op_args.db_cntx, from_key); if (!IsValid(from_it)) return OpStatus::KEY_NOTFOUND; bool is_prior_list = false; - auto [to_it, to_expire] = db_slice.FindExt(op_args.db_ind, to_key); + auto [to_it, to_expire] = db_slice.FindExt(op_args.db_cntx, to_key); if (IsValid(to_it)) { if (skip_exists) return OpStatus::KEY_EXISTS; @@ -932,20 +945,20 @@ OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, // It is guaranteed that UpdateExpire() call does not erase the element because then // from_it would be invalid. Therefore, UpdateExpire does not invalidate any iterators, // therefore we can delete 'from_it'. - db_slice.UpdateExpire(op_args.db_ind, to_it, exp_ts); - CHECK(db_slice.Del(op_args.db_ind, from_it)); + db_slice.UpdateExpire(op_args.db_cntx.db_index, to_it, exp_ts); + CHECK(db_slice.Del(op_args.db_cntx.db_index, from_it)); } else { // Here we first delete from_it because AddNew below could invalidate from_it. // On the other hand, AddNew does not rely on the iterators - this is why we keep // the value in `from_obj`. - CHECK(db_slice.Del(op_args.db_ind, from_it)); - to_it = db_slice.AddNew(op_args.db_ind, to_key, std::move(from_obj), exp_ts); + CHECK(db_slice.Del(op_args.db_cntx.db_index, from_it)); + to_it = db_slice.AddNew(op_args.db_cntx, to_key, std::move(from_obj), exp_ts); } to_it->first.SetSticky(sticky); if (!is_prior_list && to_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) { - es->blocking_controller()->AwakeWatched(op_args.db_ind, to_key); + es->blocking_controller()->AwakeWatched(op_args.db_cntx.db_index, to_key); } return OpStatus::OK; } @@ -957,7 +970,7 @@ OpResult GenericFamily::OpStick(const OpArgs& op_args, ArgSlice keys) uint32_t res = 0; for (uint32_t i = 0; i < keys.size(); ++i) { - auto [it, _] = db_slice.FindExt(op_args.db_ind, keys[i]); + auto [it, _] = db_slice.FindExt(op_args.db_cntx, keys[i]); if (IsValid(it) && !it->first.IsSticky()) { it->first.SetSticky(true); ++res; @@ -974,12 +987,14 @@ OpStatus GenericFamily::OpMove(const OpArgs& op_args, string_view key, DbIndex t auto& db_slice = op_args.shard->db_slice(); // Fetch value at key in current db. - auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, key); + auto [from_it, from_expire] = db_slice.FindExt(op_args.db_cntx, key); if (!IsValid(from_it)) return OpStatus::KEY_NOTFOUND; // Fetch value at key in target db. - auto [to_it, _] = db_slice.FindExt(target_db, key); + DbContext target_cntx = op_args.db_cntx; + target_cntx.db_index = target_db; + auto [to_it, _] = db_slice.FindExt(target_cntx, key); if (IsValid(to_it)) return OpStatus::KEY_EXISTS; @@ -993,8 +1008,8 @@ OpStatus GenericFamily::OpMove(const OpArgs& op_args, string_view key, DbIndex t // Restore expire flag after std::move. from_it->second.SetExpire(IsValid(from_expire)); - CHECK(db_slice.Del(op_args.db_ind, from_it)); - to_it = db_slice.AddNew(target_db, key, std::move(from_obj), exp_ts); + CHECK(db_slice.Del(op_args.db_cntx.db_index, from_it)); + to_it = db_slice.AddNew(target_cntx, key, std::move(from_obj), exp_ts); to_it->first.SetSticky(sticky); if (to_it->second.ObjType() == OBJ_LIST && op_args.shard->blocking_controller()) { @@ -1029,6 +1044,7 @@ void GenericFamily::Register(CommandRegistry* registry) { << CI{"SCAN", CO::READONLY | CO::FAST | CO::LOADING, -2, 0, 0, 0}.HFUNC(Scan) << CI{"TTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Ttl) << CI{"PTTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Pttl) + << CI{"TIME", CO::LOADING | CO::FAST, 1, 0, 0, 0}.HFUNC(Time) << CI{"TYPE", CO::READONLY | CO::FAST | CO::LOADING, 2, 1, 1, 1}.HFUNC(Type) << CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del) << CI{"STICK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Stick) diff --git a/src/server/generic_family.h b/src/server/generic_family.h index 346e8a8eb..a2c9e1a55 100644 --- a/src/server/generic_family.h +++ b/src/server/generic_family.h @@ -60,6 +60,7 @@ class GenericFamily { static void Echo(CmdArgList args, ConnectionContext* cntx); static void Select(CmdArgList args, ConnectionContext* cntx); static void Scan(CmdArgList args, ConnectionContext* cntx); + static void Time(CmdArgList args, ConnectionContext* cntx); static void Type(CmdArgList args, ConnectionContext* cntx); static OpResult RenameGeneric(CmdArgList args, bool skip_exist_dest, diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index 932be0dae..70211db2c 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -30,23 +30,23 @@ TEST_F(GenericFamilyTest, Expire) { auto resp = Run({"expire", "key", "1"}); EXPECT_THAT(resp, IntArg(1)); - UpdateTime(expire_now_ + 1000); + AdvanceTime(1000); resp = Run({"get", "key"}); EXPECT_THAT(resp, ArgType(RespExpr::NIL)); Run({"set", "key", "val"}); - resp = Run({"pexpireat", "key", absl::StrCat(expire_now_ + 2000)}); + resp = Run({"pexpireat", "key", absl::StrCat(TEST_current_time_ms + 2000)}); EXPECT_THAT(resp, IntArg(1)); // override - resp = Run({"pexpireat", "key", absl::StrCat(expire_now_ + 3000)}); + resp = Run({"pexpireat", "key", absl::StrCat(TEST_current_time_ms + 3000)}); EXPECT_THAT(resp, IntArg(1)); - UpdateTime(expire_now_ + 2999); + AdvanceTime(2999); resp = Run({"get", "key"}); EXPECT_THAT(resp, "val"); - UpdateTime(expire_now_ + 3000); + AdvanceTime(1); resp = Run({"get", "key"}); EXPECT_THAT(resp, ArgType(RespExpr::NIL)); } @@ -331,4 +331,28 @@ TEST_F(GenericFamilyTest, Sort) { ASSERT_THAT(Run({"sort", "list-2"}), ErrArg("One or more scores can't be converted into double")); } +TEST_F(GenericFamilyTest, Time) { + auto resp = Run({"time"}); + EXPECT_THAT(resp, ArrLen(2)); + EXPECT_THAT(resp.GetVec()[0], ArgType(RespExpr::INT64)); + EXPECT_THAT(resp.GetVec()[1], ArgType(RespExpr::INT64)); + + // Check that time is the same inside a transaction. + Run({"multi"}); + Run({"time"}); + usleep(2000); + Run({"time"}); + resp = Run({"exec"}); + EXPECT_THAT(resp, ArrLen(2)); + + ASSERT_THAT(resp.GetVec()[0], ArrLen(2)); + ASSERT_THAT(resp.GetVec()[1], ArrLen(2)); + + for (int i = 0; i < 2; ++i) { + int64_t val0 = get(resp.GetVec()[0].GetVec()[i].u); + int64_t val1 = get(resp.GetVec()[1].GetVec()[i].u); + EXPECT_EQ(val0, val1); + } +} + } // namespace dfly diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 6f4862fd6..3d3586118 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -131,7 +131,7 @@ void HSetFamily::HExists(CmdArgList args, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { auto& db_slice = shard->db_slice(); - auto it_res = db_slice.Find(t->db_index(), key, OBJ_HASH); + auto it_res = db_slice.Find(t->db_context(), key, OBJ_HASH); if (it_res) { robj* hset = (*it_res)->second.AsRObj(); @@ -387,7 +387,7 @@ void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { auto& db_slice = shard->db_slice(); - auto it_res = db_slice.Find(t->db_index(), key, OBJ_HASH); + auto it_res = db_slice.Find(t->db_context(), key, OBJ_HASH); if (!it_res) return it_res.status(); @@ -437,12 +437,12 @@ OpResult HSetFamily::OpSet(const OpArgs& op_args, string_view key, Cmd auto& db_slice = op_args.shard->db_slice(); pair add_res; try { - add_res = db_slice.AddOrFind(op_args.db_ind, key); + add_res = db_slice.AddOrFind(op_args.db_cntx, key); } catch(bad_alloc&) { return OpStatus::OUT_OF_MEMORY; } - DbTableStats* stats = db_slice.MutableStats(op_args.db_ind); + DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index); robj* hset = nullptr; uint8_t* lp = nullptr; @@ -459,7 +459,7 @@ OpResult HSetFamily::OpSet(const OpArgs& op_args, string_view key, Cmd if (it->second.ObjType() != OBJ_HASH) return OpStatus::WRONG_TYPE; - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); } hset = it->second.AsRObj(); @@ -509,7 +509,7 @@ OpResult HSetFamily::OpSet(const OpArgs& op_args, string_view key, Cmd } } it->second.SyncRObj(); - db_slice.PostUpdate(op_args.db_ind, it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); return created; } @@ -518,17 +518,17 @@ OpResult HSetFamily::OpDel(const OpArgs& op_args, string_view key, Cmd DCHECK(!values.empty()); auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH); if (!it_res) return it_res.status(); - db_slice.PreUpdate(op_args.db_ind, *it_res); + db_slice.PreUpdate(op_args.db_cntx.db_index, *it_res); CompactObj& co = (*it_res)->second; robj* hset = co.AsRObj(); unsigned deleted = 0; bool key_remove = false; - DbTableStats* stats = db_slice.MutableStats(op_args.db_ind); + DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index); if (hset->encoding == OBJ_ENCODING_LISTPACK) { stats->listpack_bytes -= lpBytes((uint8_t*)hset->ptr); @@ -548,12 +548,12 @@ OpResult HSetFamily::OpDel(const OpArgs& op_args, string_view key, Cmd co.SyncRObj(); - db_slice.PostUpdate(op_args.db_ind, *it_res, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, *it_res, key); if (key_remove) { if (hset->encoding == OBJ_ENCODING_LISTPACK) { stats->listpack_blob_cnt--; } - db_slice.Del(op_args.db_ind, *it_res); + db_slice.Del(op_args.db_cntx.db_index, *it_res); } else if (hset->encoding == OBJ_ENCODING_LISTPACK) { stats->listpack_bytes += lpBytes((uint8_t*)hset->ptr); } @@ -566,7 +566,7 @@ auto HSetFamily::OpMGet(const OpArgs& op_args, std::string_view key, CmdArgList DCHECK(!fields.empty()); auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH); if (!it_res) return it_res.status(); @@ -631,7 +631,7 @@ auto HSetFamily::OpMGet(const OpArgs& op_args, std::string_view key, CmdArgList OpResult HSetFamily::OpLen(const OpArgs& op_args, string_view key) { auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH); if (it_res) { robj* hset = (*it_res)->second.AsRObj(); @@ -644,7 +644,7 @@ OpResult HSetFamily::OpLen(const OpArgs& op_args, string_view key) { OpResult HSetFamily::OpGet(const OpArgs& op_args, string_view key, string_view field) { auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH); if (!it_res) return it_res.status(); @@ -682,7 +682,7 @@ OpResult HSetFamily::OpGet(const OpArgs& op_args, string_view key, strin OpResult> HSetFamily::OpGetAll(const OpArgs& op_args, string_view key, uint8_t mask) { auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH); if (!it_res) { if (it_res.status() == OpStatus::KEY_NOTFOUND) return vector{}; @@ -729,7 +729,7 @@ OpResult> HSetFamily::OpGetAll(const OpArgs& op_args, string_view OpResult HSetFamily::OpStrLen(const OpArgs& op_args, string_view key, string_view field) { auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_HASH); if (!it_res) { if (it_res.status() == OpStatus::KEY_NOTFOUND) @@ -761,9 +761,9 @@ OpResult HSetFamily::OpStrLen(const OpArgs& op_args, string_view key, st OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_view field, IncrByParam* param) { auto& db_slice = op_args.shard->db_slice(); - const auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); + const auto [it, inserted] = db_slice.AddOrFind(op_args.db_cntx, key); - DbTableStats* stats = db_slice.MutableStats(op_args.db_ind); + DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index); robj* hset = nullptr; size_t lpb = 0; @@ -777,7 +777,7 @@ OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_vie if (it->second.ObjType() != OBJ_HASH) return OpStatus::WRONG_TYPE; - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); hset = it->second.AsRObj(); if (hset->encoding == OBJ_ENCODING_LISTPACK) { @@ -874,14 +874,14 @@ OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_vie } it->second.SyncRObj(); - db_slice.PostUpdate(op_args.db_ind, it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); return OpStatus::OK; } OpResult HSetFamily::OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor) { - OpResult find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_HASH); + OpResult find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_HASH); if (!find_res) return find_res.status(); diff --git a/src/server/json_family.cc b/src/server/json_family.cc index 5b5994250..973ad3160 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -53,17 +53,18 @@ string GetString(EngineShard* shard, const PrimeValue& pv) { inline void RecordJournal(const OpArgs& op_args, string_view key, const PrimeKey& pvalue) { if (op_args.shard->journal()) { - journal::Entry entry{op_args.db_ind, op_args.txid, key, pvalue}; + journal::Entry entry{op_args.db_cntx.db_index, op_args.txid, key, pvalue}; op_args.shard->journal()->RecordEntry(entry); } } void SetString(const OpArgs& op_args, string_view key, const string& value) { auto& db_slice = op_args.shard->db_slice(); - auto [it_output, added] = db_slice.AddOrFind(op_args.db_ind, key); - db_slice.PreUpdate(op_args.db_ind, it_output); + DbIndex db_index = op_args.db_cntx.db_index; + auto [it_output, added] = db_slice.AddOrFind(op_args.db_cntx, key); + db_slice.PreUpdate(db_index, it_output); it_output->second.SetString(value); - db_slice.PostUpdate(op_args.db_ind, it_output, key); + db_slice.PostUpdate(db_index, it_output, key); RecordJournal(op_args, key, it_output->second); } @@ -140,7 +141,7 @@ bool JsonErrorHandler(json_errc ec, const ser_context&) { } OpResult GetJson(const OpArgs& op_args, string_view key) { - OpResult it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_STRING); + OpResult it_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_STRING); if (!it_res.ok()) return it_res.status(); @@ -447,8 +448,8 @@ OpResult OpDel(const OpArgs& op_args, string_view key, string_view path) { long total_deletions = 0; if (path.empty()) { auto& db_slice = op_args.shard->db_slice(); - auto [it, _] = db_slice.FindExt(op_args.db_ind, key); - total_deletions += long(db_slice.Del(op_args.db_ind, it)); + auto [it, _] = db_slice.FindExt(op_args.db_cntx, key); + total_deletions += long(db_slice.Del(op_args.db_cntx.db_index, it)); return total_deletions; } diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 33d8c5aba..163a2f254 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -126,7 +126,7 @@ OpResult FindFirst(Transaction* trans) { auto args = t->ShardArgsInShard(shard->shard_id()); OpResult> ff_res = - shard->db_slice().FindFirst(t->db_index(), args); + shard->db_slice().FindFirst(t->db_context(), args); if (ff_res) { FFResult ff_result(ff_res->first->first.AsRef(), ff_res->second); @@ -260,7 +260,7 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) { if (shard->shard_id() == ff_result_.sid) { ff_result_.key.GetString(&key_); auto& db_slice = shard->db_slice(); - auto it_res = db_slice.Find(t->db_index(), key_, OBJ_LIST); + auto it_res = db_slice.Find(t->db_context(), key_, OBJ_LIST); CHECK(it_res); // must exist and must be ok. PrimeIterator it = *it_res; quicklist* ql = GetQL(it->second); @@ -278,7 +278,7 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) { OpResult OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, string_view dest) { auto& db_slice = op_args.shard->db_slice(); - auto src_res = db_slice.Find(op_args.db_ind, src, OBJ_LIST); + auto src_res = db_slice.Find(op_args.db_cntx, src, OBJ_LIST); if (!src_res) return src_res.status(); @@ -286,11 +286,11 @@ OpResult OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, quicklist* src_ql = GetQL(src_it->second); if (src == dest) { // simple case. - db_slice.PreUpdate(op_args.db_ind, src_it); + db_slice.PreUpdate(op_args.db_cntx.db_index, src_it); string val = ListPop(ListDir::RIGHT, src_ql); quicklistPushHead(src_ql, val.data(), val.size()); - db_slice.PostUpdate(op_args.db_ind, src_it, src); + db_slice.PostUpdate(op_args.db_cntx.db_index, src_it, src); return val; } @@ -299,7 +299,7 @@ OpResult OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, PrimeIterator dest_it; bool new_key = false; try { - tie(dest_it, new_key) = db_slice.AddOrFind(op_args.db_ind, dest); + tie(dest_it, new_key) = db_slice.AddOrFind(op_args.db_cntx, dest); } catch (bad_alloc&) { return OpStatus::OUT_OF_MEMORY; } @@ -312,25 +312,25 @@ OpResult OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, dest_it->second.ImportRObj(obj); // Insertion of dest could invalidate src_it. Find it again. - src_it = db_slice.GetTables(op_args.db_ind).first->Find(src); + src_it = db_slice.GetTables(op_args.db_cntx.db_index).first->Find(src); } else { if (dest_it->second.ObjType() != OBJ_LIST) return OpStatus::WRONG_TYPE; dest_ql = GetQL(dest_it->second); - db_slice.PreUpdate(op_args.db_ind, dest_it); + db_slice.PreUpdate(op_args.db_cntx.db_index, dest_it); } - db_slice.PreUpdate(op_args.db_ind, src_it); + db_slice.PreUpdate(op_args.db_cntx.db_index, src_it); string val = ListPop(ListDir::RIGHT, src_ql); quicklistPushHead(dest_ql, val.data(), val.size()); - db_slice.PostUpdate(op_args.db_ind, src_it, src); - db_slice.PostUpdate(op_args.db_ind, dest_it, dest, !new_key); + db_slice.PostUpdate(op_args.db_cntx.db_index, src_it, src); + db_slice.PostUpdate(op_args.db_cntx.db_index, dest_it, dest, !new_key); if (quicklistCount(src_ql) == 0) { - CHECK(db_slice.Del(op_args.db_ind, src_it)); + CHECK(db_slice.Del(op_args.db_cntx.db_index, src_it)); } return val; @@ -339,7 +339,7 @@ OpResult OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, // Read-only peek operation that determines wether the list exists and optionally // returns the first from right value without popping it from the list. OpResult RPeek(const OpArgs& op_args, string_view key, bool fetch) { - auto it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST); + auto it_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST); if (!it_res) { return it_res.status(); } @@ -366,13 +366,13 @@ OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir d bool new_key = false; if (skip_notexist) { - auto it_res = es->db_slice().Find(op_args.db_ind, key, OBJ_LIST); + auto it_res = es->db_slice().Find(op_args.db_cntx, key, OBJ_LIST); if (!it_res) return it_res.status(); it = *it_res; } else { try { - tie(it, new_key) = es->db_slice().AddOrFind(op_args.db_ind, key); + tie(it, new_key) = es->db_slice().AddOrFind(op_args.db_cntx, key); } catch (bad_alloc&) { return OpStatus::OUT_OF_MEMORY; } @@ -389,7 +389,7 @@ OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir d } else { if (it->second.ObjType() != OBJ_LIST) return OpStatus::WRONG_TYPE; - es->db_slice().PreUpdate(op_args.db_ind, it); + es->db_slice().PreUpdate(op_args.db_cntx.db_index, it); ql = GetQL(it->second); } @@ -405,10 +405,10 @@ OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir d if (es->blocking_controller()) { string tmp; string_view key = it->first.GetSlice(&tmp); - es->blocking_controller()->AwakeWatched(op_args.db_ind, key); + es->blocking_controller()->AwakeWatched(op_args.db_cntx.db_index, key); } } else { - es->db_slice().PostUpdate(op_args.db_ind, it, key, true); + es->db_slice().PostUpdate(op_args.db_cntx.db_index, it, key, true); } return quicklistCount(ql); @@ -417,13 +417,13 @@ OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir d OpResult OpPop(const OpArgs& op_args, string_view key, ListDir dir, uint32_t count, bool return_results) { auto& db_slice = op_args.shard->db_slice(); - OpResult it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); + OpResult it_res = db_slice.Find(op_args.db_cntx, key, OBJ_LIST); if (!it_res) return it_res.status(); PrimeIterator it = *it_res; quicklist* ql = GetQL(it->second); - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); StringVec res; if (quicklistCount(ql) < count) { @@ -441,10 +441,10 @@ OpResult OpPop(const OpArgs& op_args, string_view key, ListDir dir, u } } - db_slice.PostUpdate(op_args.db_ind, it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); if (quicklistCount(ql) == 0) { - CHECK(db_slice.Del(op_args.db_ind, it)); + CHECK(db_slice.Del(op_args.db_cntx.db_index, it)); } return res; @@ -821,7 +821,7 @@ void ListFamily::PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cnt } OpResult ListFamily::OpLen(const OpArgs& op_args, std::string_view key) { - auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST); + auto res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST); if (!res) return res.status(); @@ -831,7 +831,7 @@ OpResult ListFamily::OpLen(const OpArgs& op_args, std::string_view key } OpResult ListFamily::OpIndex(const OpArgs& op_args, std::string_view key, long index) { - auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST); + auto res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST); if (!res) return res.status(); quicklist* ql = GetQL(res.value()->second); @@ -856,7 +856,7 @@ OpResult ListFamily::OpIndex(const OpArgs& op_args, std::string_view key OpResult ListFamily::OpInsert(const OpArgs& op_args, string_view key, string_view pivot, string_view elem, int insert_param) { auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_LIST); if (!it_res) return it_res.status(); @@ -874,14 +874,14 @@ OpResult ListFamily::OpInsert(const OpArgs& op_args, string_view key, strin int res = -1; if (found) { - db_slice.PreUpdate(op_args.db_ind, *it_res); + db_slice.PreUpdate(op_args.db_cntx.db_index, *it_res); if (insert_param == LIST_TAIL) { quicklistInsertAfter(qiter, &entry, elem.data(), elem.size()); } else { DCHECK_EQ(LIST_HEAD, insert_param); quicklistInsertBefore(qiter, &entry, elem.data(), elem.size()); } - db_slice.PostUpdate(op_args.db_ind, *it_res, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, *it_res, key); res = quicklistCount(ql); } quicklistReleaseIterator(qiter); @@ -892,7 +892,7 @@ OpResult ListFamily::OpRem(const OpArgs& op_args, string_view key, str long count) { DCHECK(!elem.empty()); auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_LIST); if (!it_res) return it_res.status(); @@ -912,7 +912,7 @@ OpResult ListFamily::OpRem(const OpArgs& op_args, string_view key, str unsigned removed = 0; const uint8_t* elem_ptr = reinterpret_cast(elem.data()); - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); while (quicklistNext(qiter, &entry)) { if (quicklistCompare(&entry, elem_ptr, elem.size())) { quicklistDelEntry(qiter, &entry); @@ -921,12 +921,12 @@ OpResult ListFamily::OpRem(const OpArgs& op_args, string_view key, str break; } } - db_slice.PostUpdate(op_args.db_ind, it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); quicklistReleaseIterator(qiter); if (quicklistCount(ql) == 0) { - CHECK(db_slice.Del(op_args.db_ind, it)); + CHECK(db_slice.Del(op_args.db_cntx.db_index, it)); } return removed; @@ -935,16 +935,16 @@ OpResult ListFamily::OpRem(const OpArgs& op_args, string_view key, str OpStatus ListFamily::OpSet(const OpArgs& op_args, string_view key, string_view elem, long index) { DCHECK(!elem.empty()); auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_LIST); if (!it_res) return it_res.status(); PrimeIterator it = *it_res; quicklist* ql = GetQL(it->second); - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); int replaced = quicklistReplaceAtIndex(ql, index, elem.data(), elem.size()); - db_slice.PostUpdate(op_args.db_ind, it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); if (!replaced) { return OpStatus::OUT_OF_RANGE; @@ -954,7 +954,7 @@ OpStatus ListFamily::OpSet(const OpArgs& op_args, string_view key, string_view e OpStatus ListFamily::OpTrim(const OpArgs& op_args, string_view key, long start, long end) { auto& db_slice = op_args.shard->db_slice(); - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_LIST); if (!it_res) return it_res.status(); @@ -985,20 +985,20 @@ OpStatus ListFamily::OpTrim(const OpArgs& op_args, string_view key, long start, rtrim = llen - end - 1; } - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); quicklistDelRange(ql, 0, ltrim); quicklistDelRange(ql, -rtrim, rtrim); - db_slice.PostUpdate(op_args.db_ind, it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); if (quicklistCount(ql) == 0) { - CHECK(db_slice.Del(op_args.db_ind, it)); + CHECK(db_slice.Del(op_args.db_cntx.db_index, it)); } return OpStatus::OK; } OpResult ListFamily::OpRange(const OpArgs& op_args, std::string_view key, long start, long end) { - auto res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_LIST); + auto res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST); if (!res) return res.status(); @@ -1025,7 +1025,7 @@ OpResult ListFamily::OpRange(const OpArgs& op_args, std::string_view str_vec.emplace_back(ce.ToString()); return true; }, start, end); - + return str_vec; } diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index 4b66105aa..5f5f522b1 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -52,7 +52,8 @@ TEST_F(ListFamilyTest, Expire) { resp = Run({"expire", kKey1, "1"}); EXPECT_THAT(resp, IntArg(1)); - UpdateTime(expire_now_ + 1000); + AdvanceTime(1000); + resp = Run({"lpush", kKey1, "1"}); EXPECT_THAT(resp, IntArg(1)); } diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index f3bda0f9d..f73bfa2f8 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1232,6 +1232,8 @@ void RdbLoader::FlushShardAsync(ShardId sid) { void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) { DbSlice& db_slice = EngineShard::tlocal()->db_slice(); + DbContext db_cntx{.db_index = db_ind, .time_now_ms = GetCurrentTimeMs()}; + for (const auto& item : ib) { std::string_view key{item.key}; PrimeValue pv; @@ -1245,7 +1247,10 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) { break; } - auto [it, added] = db_slice.AddEntry(db_ind, key, std::move(pv), item.expire_ms); + if (item.expire_ms > 0 && db_cntx.time_now_ms >= item.expire_ms) + continue; + + auto [it, added] = db_slice.AddEntry(db_cntx, key, std::move(pv), item.expire_ms); if (!added) { LOG(WARNING) << "RDB has duplicated key '" << key << "' in DB " << db_ind; diff --git a/src/server/set_family.cc b/src/server/set_family.cc index 5411fefe4..889553760 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -484,8 +484,8 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v // to overwrite the key. However, if the set is empty it means we should delete the // key if it exists. if (overwrite && vals.empty()) { - auto it = db_slice.FindExt(op_args.db_ind, key).first; - db_slice.Del(op_args.db_ind, it); + auto it = db_slice.FindExt(op_args.db_cntx, key).first; + db_slice.Del(op_args.db_cntx.db_index, it); return 0; } @@ -494,7 +494,7 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v bool new_key = false; try { - tie(it, new_key) = db_slice.AddOrFind(op_args.db_ind, key); + tie(it, new_key) = db_slice.AddOrFind(op_args.db_cntx, key); } catch (bad_alloc& e) { return OpStatus::OUT_OF_MEMORY; } @@ -507,7 +507,7 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v return OpStatus::WRONG_TYPE; // Update stats and trigger any handle the old value if needed. - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); } if (new_key || overwrite) { @@ -555,7 +555,7 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v res = AddStrSet(std::move(vals), &co); } - db_slice.PostUpdate(op_args.db_ind, it, key, !new_key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !new_key); return res; } @@ -563,20 +563,20 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v OpResult OpRem(const OpArgs& op_args, std::string_view key, const ArgSlice& vals) { auto* es = op_args.shard; auto& db_slice = es->db_slice(); - OpResult find_res = db_slice.Find(op_args.db_ind, key, OBJ_SET); + OpResult find_res = db_slice.Find(op_args.db_cntx, key, OBJ_SET); if (!find_res) { return find_res.status(); } - db_slice.PreUpdate(op_args.db_ind, *find_res); + db_slice.PreUpdate(op_args.db_cntx.db_index, *find_res); CompactObj& co = find_res.value()->second; auto [removed, isempty] = RemoveSet(vals, &co); - db_slice.PostUpdate(op_args.db_ind, *find_res, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, *find_res, key); if (isempty) { - CHECK(db_slice.Del(op_args.db_ind, find_res.value())); + CHECK(db_slice.Del(op_args.db_cntx.db_index, find_res.value())); } return removed; @@ -610,7 +610,7 @@ OpStatus Mover::OpFind(Transaction* t, EngineShard* es) { for (auto k : largs) { unsigned index = (k == src_) ? 0 : 1; - OpResult res = es->db_slice().Find(t->db_index(), k, OBJ_SET); + OpResult res = es->db_slice().Find(t->db_context(), k, OBJ_SET); if (res && index == 0) { // successful src find. DCHECK(!res->is_done()); const CompactObj& val = res.value()->second; @@ -676,7 +676,7 @@ OpResult OpUnion(const OpArgs& op_args, ArgSlice keys) { absl::flat_hash_set uniques; for (std::string_view key : keys) { - OpResult find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_SET); + OpResult find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_SET); if (find_res) { container_utils::IterateSet(find_res.value()->second, [&uniques](container_utils::ContainerEntry ce){ uniques.emplace(ce.ToString()); @@ -698,7 +698,7 @@ OpResult OpDiff(const OpArgs& op_args, ArgSlice keys) { DCHECK(!keys.empty()); DVLOG(1) << "OpDiff from " << keys.front(); EngineShard* es = op_args.shard; - OpResult find_res = es->db_slice().Find(op_args.db_ind, keys.front(), OBJ_SET); + OpResult find_res = es->db_slice().Find(op_args.db_cntx, keys.front(), OBJ_SET); if (!find_res) { return find_res.status(); @@ -713,7 +713,7 @@ OpResult OpDiff(const OpArgs& op_args, ArgSlice keys) { DCHECK(!uniques.empty()); // otherwise the key would not exist. for (size_t i = 1; i < keys.size(); ++i) { - OpResult diff_res = es->db_slice().Find(op_args.db_ind, keys[i], OBJ_SET); + OpResult diff_res = es->db_slice().Find(op_args.db_cntx, keys[i], OBJ_SET); if (!diff_res) { if (diff_res.status() == OpStatus::WRONG_TYPE) { return OpStatus::WRONG_TYPE; @@ -750,7 +750,7 @@ OpResult OpInter(const Transaction* t, EngineShard* es, bool remove_f StringVec result; if (keys.size() == 1) { - OpResult find_res = es->db_slice().Find(t->db_index(), keys.front(), OBJ_SET); + OpResult find_res = es->db_slice().Find(t->db_context(), keys.front(), OBJ_SET); if (!find_res) return find_res.status(); @@ -767,7 +767,7 @@ OpResult OpInter(const Transaction* t, EngineShard* es, bool remove_f OpStatus status = OpStatus::OK; for (size_t i = 0; i < keys.size(); ++i) { - OpResult find_res = es->db_slice().Find(t->db_index(), keys[i], OBJ_SET); + OpResult find_res = es->db_slice().Find(t->db_context(), keys[i], OBJ_SET); if (!find_res) { if (status == OpStatus::OK || status == OpStatus::KEY_NOTFOUND || find_res.status() != OpStatus::KEY_NOTFOUND) { @@ -841,7 +841,7 @@ void SetFamily::SIsMember(CmdArgList args, ConnectionContext* cntx) { std::string_view val = ArgS(args, 2); auto cb = [&](Transaction* t, EngineShard* shard) { - OpResult find_res = shard->db_slice().Find(t->db_index(), key, OBJ_SET); + OpResult find_res = shard->db_slice().Find(t->db_context(), key, OBJ_SET); if (find_res) { SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()}; @@ -906,7 +906,7 @@ void SetFamily::SCard(CmdArgList args, ConnectionContext* cntx) { std::string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - OpResult find_res = shard->db_slice().Find(t->db_index(), key, OBJ_SET); + OpResult find_res = shard->db_slice().Find(t->db_context(), key, OBJ_SET); if (!find_res) { return find_res.status(); } @@ -1220,7 +1220,7 @@ void SetFamily::SScan(CmdArgList args, ConnectionContext* cntx) { OpResult SetFamily::OpPop(const OpArgs& op_args, std::string_view key, unsigned count) { auto& db_slice = op_args.shard->db_slice(); - OpResult find_res = db_slice.Find(op_args.db_ind, key, OBJ_SET); + OpResult find_res = db_slice.Find(op_args.db_cntx, key, OBJ_SET); if (!find_res) return find_res.status(); @@ -1241,10 +1241,10 @@ OpResult SetFamily::OpPop(const OpArgs& op_args, std::string_view key }); /* Delete the set as it is now empty */ - CHECK(db_slice.Del(op_args.db_ind, it)); + CHECK(db_slice.Del(op_args.db_cntx.db_index, it)); } else { SetType st{it->second.RObjPtr(), it->second.Encoding()}; - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); if (st.second == kEncodingIntSet) { intset* is = (intset*)st.first; int64_t val = 0; @@ -1260,14 +1260,14 @@ OpResult SetFamily::OpPop(const OpArgs& op_args, std::string_view key } else { result = PopStrSet(count, st); } - db_slice.PostUpdate(op_args.db_ind, it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); } return result; } OpResult SetFamily::OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor) { - OpResult find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_SET); + OpResult find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_SET); if (!find_res) return find_res.status(); diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 49c65ea6d..4202e78c0 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -145,7 +145,7 @@ OpResult OpAdd(const OpArgs& op_args, string_view key, const AddOpts& pair add_res; try { - add_res = db_slice.AddOrFind(op_args.db_ind, key); + add_res = db_slice.AddOrFind(op_args.db_cntx, key); } catch (bad_alloc&) { return OpStatus::OUT_OF_MEMORY; } @@ -161,7 +161,7 @@ OpResult OpAdd(const OpArgs& op_args, string_view key, const AddOpts& if (it->second.ObjType() != OBJ_STREAM) return OpStatus::WRONG_TYPE; - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); } stream* stream_inst = (stream*)it->second.RObjPtr(); @@ -201,7 +201,7 @@ OpResult OpAdd(const OpArgs& op_args, string_view key, const AddOpts& OpResult OpRange(const OpArgs& op_args, string_view key, const RangeOpts& opts) { auto& db_slice = op_args.shard->db_slice(); - OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); + OpResult res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -247,7 +247,7 @@ OpResult OpRange(const OpArgs& op_args, string_view key, const RangeO OpResult OpLen(const OpArgs& op_args, string_view key) { auto& db_slice = op_args.shard->db_slice(); - OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); + OpResult res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); CompactObj& cobj = (*res_it)->second; @@ -255,9 +255,10 @@ OpResult OpLen(const OpArgs& op_args, string_view key) { return s->length; } -OpResult> OpListGroups(DbIndex db_index, string_view key, EngineShard* shard) { +OpResult> OpListGroups(const DbContext& db_cntx, string_view key, + EngineShard* shard) { auto& db_slice = shard->db_slice(); - OpResult res_it = db_slice.Find(db_index, key, OBJ_STREAM); + OpResult res_it = db_slice.Find(db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -294,7 +295,7 @@ struct CreateOpts { OpStatus OpCreate(const OpArgs& op_args, string_view key, const CreateOpts& opts) { auto* shard = op_args.shard; auto& db_slice = shard->db_slice(); - OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); + OpResult res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -323,7 +324,7 @@ OpResult> FindGroup(const OpArgs& op_args, string_view string_view gname) { auto* shard = op_args.shard; auto& db_slice = shard->db_slice(); - OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); + OpResult res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -406,7 +407,7 @@ OpStatus OpSetId(const OpArgs& op_args, string_view key, string_view gname, stri OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) { auto* shard = op_args.shard; auto& db_slice = shard->db_slice(); - OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); + OpResult res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -445,7 +446,7 @@ OpStatus OpSetId2(const OpArgs& op_args, string_view key, const streamID& sid) { OpResult OpDel(const OpArgs& op_args, string_view key, absl::Span ids) { auto* shard = op_args.shard; auto& db_slice = shard->db_slice(); - OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_STREAM); + OpResult res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); @@ -719,7 +720,8 @@ void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) { // We do not use transactional xemantics for xinfo since it's informational command. auto cb = [&]() { EngineShard* shard = EngineShard::tlocal(); - return OpListGroups(cntx->db_index(), key, shard); + DbContext db_context{.db_index = cntx->db_index(), .time_now_ms = GetCurrentTimeMs()}; + return OpListGroups(db_context, key, shard); }; OpResult> result = shard_set->Await(sid, std::move(cb)); @@ -744,9 +746,7 @@ void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) { void StreamFamily::XLen(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); - auto cb = [&](Transaction* t, EngineShard* shard) { - return OpLen(t->GetOpArgs(shard), key); - }; + auto cb = [&](Transaction* t, EngineShard* shard) { return OpLen(t->GetOpArgs(shard), key); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); if (result || result.status() == OpStatus::KEY_NOTFOUND) { diff --git a/src/server/string_family.cc b/src/server/string_family.cc index ca590d529..d322a645a 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -66,7 +66,7 @@ string_view GetSlice(EngineShard* shard, const PrimeValue& pv, string* tmp) { inline void RecordJournal(const OpArgs& op_args, string_view key, const PrimeKey& pvalue) { if (op_args.shard->journal()) { - journal::Entry entry{op_args.db_ind, op_args.txid, key, pvalue}; + journal::Entry entry{op_args.db_cntx.db_index, op_args.txid, key, pvalue}; op_args.shard->journal()->RecordEntry(entry); } } @@ -77,7 +77,7 @@ OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t sta size_t range_len = start + value.size(); if (range_len == 0) { - auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING); + auto it_res = db_slice.Find(op_args.db_cntx, key, OBJ_STRING); if (it_res) { return it_res.value()->second.Size(); } else { @@ -85,7 +85,7 @@ OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t sta } } - auto [it, added] = db_slice.AddOrFind(op_args.db_ind, key); + auto [it, added] = db_slice.AddOrFind(op_args.db_cntx, key); string s; @@ -99,12 +99,12 @@ OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t sta if (s.size() < range_len) s.resize(range_len); - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); } memcpy(s.data() + start, value.data(), value.size()); it->second.SetString(s); - db_slice.PostUpdate(op_args.db_ind, it, key, !added); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !added); RecordJournal(op_args, key, it->second); return it->second.Size(); @@ -112,7 +112,7 @@ OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t sta OpResult OpGetRange(const OpArgs& op_args, string_view key, int32_t start, int32_t end) { auto& db_slice = op_args.shard->db_slice(); - OpResult it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING); + OpResult it_res = db_slice.Find(op_args.db_cntx, key, OBJ_STRING); if (!it_res.ok()) return it_res.status(); @@ -142,8 +142,8 @@ OpResult OpGetRange(const OpArgs& op_args, string_view key, int32_t star return string(slice.substr(start, end - start + 1)); }; -size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key, - string_view val, bool prepend) { +size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key, string_view val, + bool prepend) { string tmp, new_val; auto* shard = op_args.shard; string_view slice = GetSlice(shard, it->second, &tmp); @@ -153,9 +153,9 @@ size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key, new_val = absl::StrCat(slice, val); auto& db_slice = shard->db_slice(); - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); it->second.SetString(new_val); - db_slice.PostUpdate(op_args.db_ind, it, key, true); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, true); RecordJournal(op_args, key, it->second); return new_val.size(); @@ -166,10 +166,10 @@ OpResult ExtendOrSet(const OpArgs& op_args, string_view key, string_vi bool prepend) { auto* shard = op_args.shard; auto& db_slice = shard->db_slice(); - auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); + auto [it, inserted] = db_slice.AddOrFind(op_args.db_cntx, key); if (inserted) { it->second.SetString(val); - db_slice.PostUpdate(op_args.db_ind, it, key, false); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, false); RecordJournal(op_args, key, it->second); return val.size(); @@ -181,10 +181,9 @@ OpResult ExtendOrSet(const OpArgs& op_args, string_view key, string_vi return ExtendExisting(op_args, it, key, val, prepend); } -OpResult ExtendOrSkip(const OpArgs& op_args, string_view key, string_view val, - bool prepend) { +OpResult ExtendOrSkip(const OpArgs& op_args, string_view key, string_view val, bool prepend) { auto& db_slice = op_args.shard->db_slice(); - OpResult it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING); + OpResult it_res = db_slice.Find(op_args.db_cntx, key, OBJ_STRING); if (!it_res) { return false; } @@ -193,7 +192,7 @@ OpResult ExtendOrSkip(const OpArgs& op_args, string_view key, string_view } OpResult OpGet(const OpArgs& op_args, string_view key) { - OpResult it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_STRING); + OpResult it_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_STRING); if (!it_res.ok()) return it_res.status(); @@ -204,14 +203,14 @@ OpResult OpGet(const OpArgs& op_args, string_view key) { OpResult OpIncrFloat(const OpArgs& op_args, string_view key, double val) { auto& db_slice = op_args.shard->db_slice(); - auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); + auto [it, inserted] = db_slice.AddOrFind(op_args.db_cntx, key); char buf[128]; if (inserted) { char* str = RedisReplyBuilder::FormatDouble(val, buf, sizeof(buf)); it->second.SetString(str); - db_slice.PostUpdate(op_args.db_ind, it, key, false); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, false); RecordJournal(op_args, key, it->second); return val; @@ -241,9 +240,9 @@ OpResult OpIncrFloat(const OpArgs& op_args, string_view key, double val) char* str = RedisReplyBuilder::FormatDouble(base, buf, sizeof(buf)); - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); it->second.SetString(str); - db_slice.PostUpdate(op_args.db_ind, it, key, true); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, true); RecordJournal(op_args, key, it->second); return base; @@ -255,7 +254,7 @@ OpResult OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr, auto& db_slice = op_args.shard->db_slice(); // we avoid using AddOrFind because of skip_on_missing option for memcache. - auto [it, expire_it] = db_slice.FindExt(op_args.db_ind, key); + auto [it, expire_it] = db_slice.FindExt(op_args.db_cntx, key); if (!IsValid(it)) { if (skip_on_missing) @@ -266,7 +265,7 @@ OpResult OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr, // AddNew calls PostUpdate inside. try { - it = db_slice.AddNew(op_args.db_ind, key, std::move(cobj), 0); + it = db_slice.AddNew(op_args.db_cntx, key, std::move(cobj), 0); } catch (bad_alloc&) { return OpStatus::OUT_OF_MEMORY; } @@ -293,9 +292,9 @@ OpResult OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr, int64_t new_val = prev + incr; DCHECK(!it->second.IsExternal()); - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); it->second.SetInt(new_val); - db_slice.PostUpdate(op_args.db_ind, it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); RecordJournal(op_args, key, it->second); return new_val; @@ -318,7 +317,7 @@ int64_t CalculateAbsTime(int64_t unix_time, bool as_milli) { OpStatus OpMSet(const OpArgs& op_args, ArgSlice args) { DCHECK(!args.empty() && args.size() % 2 == 0); - SetCmd::SetParams params{op_args.db_ind}; + SetCmd::SetParams params; SetCmd sg(op_args); for (size_t i = 0; i < args.size(); i += 2) { @@ -332,19 +331,29 @@ OpStatus OpMSet(const OpArgs& op_args, ArgSlice args) { return OpStatus::OK; } +OpResult SetGeneric(ConnectionContext* cntx, const SetCmd::SetParams& sparams, + string_view key, string_view value) { + DCHECK(cntx->transaction); + + auto cb = [&](Transaction* t, EngineShard* shard) { + SetCmd sg(t->GetOpArgs(shard)); + return sg.Set(sparams, key, value); + }; + return cntx->transaction->ScheduleSingleHop(std::move(cb)); +} + } // namespace OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value) { EngineShard* shard = op_args_.shard; auto& db_slice = shard->db_slice(); - DCHECK_LT(params.db_index, db_slice.db_array_size()); - DCHECK(db_slice.IsDbValid(params.db_index)); + DCHECK(db_slice.IsDbValid(op_args_.db_cntx.db_index)); VLOG(2) << "Set " << key << "(" << db_slice.shard_id() << ") "; if (params.IsConditionalSet()) { - const auto [it, expire_it] = db_slice.FindExt(params.db_index, key); + const auto [it, expire_it] = db_slice.FindExt(op_args_.db_cntx, key); // Make sure that we have this key, and only add it if it does exists if (params.how == SET_IF_EXISTS) { if (IsValid(it)) { @@ -363,7 +372,7 @@ OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value // Trying to add a new entry. tuple add_res; try { - add_res = db_slice.AddOrFind2(params.db_index, key); + add_res = db_slice.AddOrFind2(op_args_.db_cntx, key); } catch (bad_alloc& e) { return OpStatus::OUT_OF_MEMORY; } @@ -377,20 +386,21 @@ OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value PrimeValue tvalue{value}; tvalue.SetFlag(params.memcache_flags != 0); it->second = std::move(tvalue); - db_slice.PostUpdate(params.db_index, it, key, false); + db_slice.PostUpdate(op_args_.db_cntx.db_index, it, key, false); if (params.expire_after_ms) { - db_slice.UpdateExpire(params.db_index, it, params.expire_after_ms + db_slice.Now()); + db_slice.UpdateExpire(op_args_.db_cntx.db_index, it, + params.expire_after_ms + op_args_.db_cntx.time_now_ms); } if (params.memcache_flags) - db_slice.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags); + db_slice.SetMCFlag(op_args_.db_cntx.db_index, it->first.AsRef(), params.memcache_flags); if (shard->tiered_storage()) { // external storage enabled. // TODO: we may have a bug if we block the fiber inside UnloadItem - "it" may be invalid // afterwards. if (value.size() >= kMinTieredLen) { - shard->tiered_storage()->UnloadItem(params.db_index, it); + shard->tiered_storage()->UnloadItem(op_args_.db_cntx.db_index, it); } } @@ -415,23 +425,24 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt } DbSlice& db_slice = shard->db_slice(); - uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice.Now() : 0; + uint64_t at_ms = + params.expire_after_ms ? params.expire_after_ms + op_args_.db_cntx.time_now_ms : 0; if (IsValid(e_it) && at_ms) { e_it->second = db_slice.FromAbsoluteTime(at_ms); } else { // We need to update expiry, or maybe erase the object if it was expired. - bool changed = db_slice.UpdateExpire(params.db_index, it, at_ms); + bool changed = db_slice.UpdateExpire(op_args_.db_cntx.db_index, it, at_ms); if (changed && at_ms == 0) // erased. return OpStatus::OK; // TODO: to update journal with deletion. } - db_slice.PreUpdate(params.db_index, it); + db_slice.PreUpdate(op_args_.db_cntx.db_index, it); // Check whether we need to update flags table. bool req_flag_update = (params.memcache_flags != 0) != prime_value.HasFlag(); if (req_flag_update) { prime_value.SetFlag(params.memcache_flags != 0); - db_slice.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags); + db_slice.SetMCFlag(op_args_.db_cntx.db_index, it->first.AsRef(), params.memcache_flags); } // overwrite existing entry. @@ -443,11 +454,11 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt // can be invalid after the function returns and the functions that follow may access invalid // entry. if (shard->tiered_storage()) { - shard->tiered_storage()->UnloadItem(params.db_index, it); + shard->tiered_storage()->UnloadItem(op_args_.db_cntx.db_index, it); } } - db_slice.PostUpdate(params.db_index, it, key); + db_slice.PostUpdate(op_args_.db_cntx.db_index, it, key); RecordJournal(op_args_, key, it->second); return OpStatus::OK; @@ -459,7 +470,7 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); string_view value = ArgS(args, 2); - SetCmd::SetParams sparams{cntx->db_index()}; + SetCmd::SetParams sparams; sparams.memcache_flags = cntx->conn_state.memcache_flag; int64_t int_arg; @@ -518,7 +529,8 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) { } } - const auto result{SetGeneric(cntx, std::move(sparams), key, value)}; + const auto result{SetGeneric(cntx, sparams, key, value)}; + if (result == OpStatus::OK) { return builder->SendStored(); } @@ -532,17 +544,6 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) { return builder->SendSetSkipped(); } -OpResult StringFamily::SetGeneric(ConnectionContext* cntx, SetCmd::SetParams sparams, - std::string_view key, std::string_view value) { - DCHECK(cntx->transaction); - - auto cb = [&](Transaction* t, EngineShard* shard) { - SetCmd sg(t->GetOpArgs(shard)); - return sg.Set(sparams, key, value); - }; - return cntx->transaction->ScheduleSingleHop(std::move(cb)); -} - void StringFamily::SetEx(CmdArgList args, ConnectionContext* cntx) { SetExGeneric(true, std::move(args), cntx); } @@ -555,7 +556,7 @@ void StringFamily::SetNx(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); string_view value = ArgS(args, 2); - SetCmd::SetParams sparams{cntx->db_index()}; + SetCmd::SetParams sparams; sparams.how = SetCmd::SET_IF_NOTEXIST; sparams.memcache_flags = cntx->conn_state.memcache_flag; const auto results{SetGeneric(cntx, std::move(sparams), key, value)}; @@ -601,7 +602,7 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) { string_view value = ArgS(args, 2); std::optional prev_val; - SetCmd::SetParams sparams{cntx->db_index()}; + SetCmd::SetParams sparams; sparams.prev_val = &prev_val; auto cb = [&](Transaction* t, EngineShard* shard) { @@ -771,7 +772,7 @@ void StringFamily::SetExGeneric(bool seconds, CmdArgList args, ConnectionContext return (*cntx)->SendError(InvalidExpireTime(ArgS(args, 0))); } - SetCmd::SetParams sparams{cntx->db_index()}; + SetCmd::SetParams sparams; if (seconds) sparams.expire_after_ms = uint64_t(unit_vals) * 1000; else @@ -876,7 +877,7 @@ void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* es) { auto args = t->ShardArgsInShard(es->shard_id()); for (size_t i = 0; i < args.size(); i += 2) { - auto it = es->db_slice().FindExt(t->db_index(), args[i]).first; + auto it = es->db_slice().FindExt(t->db_context(), args[i]).first; if (IsValid(it)) { exists.store(true, memory_order_relaxed); break; @@ -906,7 +907,7 @@ void StringFamily::StrLen(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - OpResult it_res = shard->db_slice().Find(t->db_index(), key, OBJ_STRING); + OpResult it_res = shard->db_slice().Find(t->db_context(), key, OBJ_STRING); if (!it_res.ok()) return it_res.status(); @@ -993,7 +994,7 @@ auto StringFamily::OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction auto& db_slice = shard->db_slice(); for (size_t i = 0; i < args.size(); ++i) { - OpResult it_res = db_slice.Find(t->db_index(), args[i], OBJ_STRING); + OpResult it_res = db_slice.Find(t->db_context(), args[i], OBJ_STRING); if (!it_res) continue; diff --git a/src/server/string_family.h b/src/server/string_family.h index bc0d80f19..a2031e3c6 100644 --- a/src/server/string_family.h +++ b/src/server/string_family.h @@ -26,7 +26,6 @@ class SetCmd { struct SetParams { SetHow how = SET_ALWAYS; - DbIndex db_index = 0; uint32_t memcache_flags = 0; // Relative value based on now. 0 means no expiration. @@ -34,9 +33,6 @@ class SetCmd { mutable std::optional* prev_val = nullptr; // GETSET option bool keep_expire = false; // KEEPTTL - TODO: to implement it. - explicit SetParams(DbIndex dib) : db_index(dib) { - } - constexpr bool IsConditionalSet() const { return how == SET_IF_NOTEXIST || how == SET_IF_EXISTS; } @@ -82,8 +78,6 @@ class StringFamily { static void IncrByGeneric(std::string_view key, int64_t val, ConnectionContext* cntx); static void ExtendGeneric(CmdArgList args, bool prepend, ConnectionContext* cntx); static void SetExGeneric(bool seconds, CmdArgList args, ConnectionContext* cntx); - static OpResult SetGeneric(ConnectionContext* cntx, SetCmd::SetParams sparams, - std::string_view key, std::string_view value); struct GetResp { std::string value; diff --git a/src/server/string_family_test.cc b/src/server/string_family_test.cc index c9c78dcb6..142552892 100644 --- a/src/server/string_family_test.cc +++ b/src/server/string_family_test.cc @@ -70,6 +70,8 @@ TEST_F(StringFamilyTest, Incr) { TEST_F(StringFamilyTest, Append) { Run({"setex", "key", "100", "val"}); + EXPECT_THAT(Run({"ttl", "key"}), IntArg(100)); + EXPECT_THAT(Run({"append", "key", "bar"}), IntArg(6)); EXPECT_THAT(Run({"ttl", "key"}), IntArg(100)); } @@ -77,17 +79,17 @@ TEST_F(StringFamilyTest, Append) { TEST_F(StringFamilyTest, Expire) { ASSERT_EQ(Run({"set", "key", "val", "PX", "20"}), "OK"); - UpdateTime(expire_now_ + 10); + AdvanceTime(10); EXPECT_EQ(Run({"get", "key"}), "val"); - UpdateTime(expire_now_ + 20); + AdvanceTime(10); EXPECT_THAT(Run({"get", "key"}), ArgType(RespExpr::NIL)); ASSERT_THAT(Run({"set", "i", "1", "PX", "10"}), "OK"); ASSERT_THAT(Run({"incr", "i"}), IntArg(2)); - UpdateTime(expire_now_ + 30); + AdvanceTime(10); ASSERT_THAT(Run({"incr", "i"}), IntArg(1)); } diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index ce9acc19f..a79a22d62 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -131,10 +131,9 @@ void BaseFamilyTest::SetUp() { opts.disable_time_update = true; service_->Init(nullptr, nullptr, opts); - expire_now_ = absl::GetCurrentTimeNanos() / 1000000; + TEST_current_time_ms = absl::GetCurrentTimeNanos() / 1000000; auto cb = [&](EngineShard* s) { - s->db_slice().UpdateExpireBase(expire_now_ - 1000, 0); - s->db_slice().UpdateExpireClock(expire_now_); + s->db_slice().UpdateExpireBase(TEST_current_time_ms - 1000, 0); }; shard_set->RunBriefInParallel(cb); @@ -151,12 +150,6 @@ void BaseFamilyTest::TearDown() { LOG(INFO) << "Finishing " << test_info->name(); } -// ts is ms -void BaseFamilyTest::UpdateTime(uint64_t ms) { - auto cb = [ms](EngineShard* s) { s->db_slice().UpdateExpireClock(ms); }; - shard_set->RunBriefInParallel(cb); -} - void BaseFamilyTest::WaitUntilLocked(DbIndex db_index, string_view key, double timeout) { auto step = 50us; auto timeout_micro = chrono::duration_cast (1000ms * timeout); diff --git a/src/server/test_utils.h b/src/server/test_utils.h index 7ac06084f..c8b93791e 100644 --- a/src/server/test_utils.h +++ b/src/server/test_utils.h @@ -68,8 +68,9 @@ class BaseFamilyTest : public ::testing::Test { TestConnWrapper* AddFindConn(Protocol proto, std::string_view id); static std::vector StrArray(const RespExpr& expr); - // ts is ms - void UpdateTime(uint64_t ms); + void AdvanceTime(int64_t ms) { + TEST_current_time_ms += ms; + } // Wait for a locked key to unlock. Aborts after timeout seconds passed. void WaitUntilLocked(DbIndex db_index, std::string_view key, double timeout = 3); @@ -88,7 +89,7 @@ class BaseFamilyTest : public ::testing::Test { absl::flat_hash_map> connections_; ::boost::fibers::mutex mu_; ConnectionContext::DebugInfo last_cmd_dbg_info_; - uint64_t expire_now_; + std::vector resp_vec_; bool single_response_ = true; }; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 1d81af00b..f0219be8f 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -24,7 +24,7 @@ thread_local Transaction::TLTmpSpace Transaction::tmp_space; namespace { -std::atomic_uint64_t op_seq{1}; +atomic_uint64_t op_seq{1}; [[maybe_unused]] constexpr size_t kTransSize = sizeof(Transaction); @@ -449,10 +449,12 @@ void Transaction::ScheduleInternal() { } while (true) { - txid_ = op_seq.fetch_add(1, std::memory_order_relaxed); + txid_ = op_seq.fetch_add(1, memory_order_relaxed); - std::atomic_uint32_t lock_granted_cnt{0}; - std::atomic_uint32_t success{0}; + atomic_uint32_t lock_granted_cnt{0}; + atomic_uint32_t success{0}; + + time_now_ms_ = GetCurrentTimeMs(); auto cb = [&](EngineShard* shard) { pair res = ScheduleInShard(shard); @@ -551,6 +553,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { // above. // IsArmedInShard() first checks run_count_ before accessing shard_data. run_count_.fetch_add(1, memory_order_release); + time_now_ms_ = GetCurrentTimeMs(); // Please note that schedule_cb can not update any data on ScheduleSingleHop stack // since the latter can exit before ScheduleUniqueShard returns. @@ -805,7 +808,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { } // we can do it because only a single thread writes into txid_ and sd. - txid_ = op_seq.fetch_add(1, std::memory_order_relaxed); + txid_ = op_seq.fetch_add(1, memory_order_relaxed); sd.pq_pos = shard->txq()->Insert(this); DCHECK_EQ(0, sd.local_mask & KEYLOCK_ACQUIRED); @@ -1111,7 +1114,7 @@ inline uint32_t Transaction::DecreaseRunCnt() { ::boost::intrusive_ptr guard(this); // We use release so that no stores will be reordered after. - uint32_t res = run_count_.fetch_sub(1, std::memory_order_release); + uint32_t res = run_count_.fetch_sub(1, memory_order_release); if (res == 1) { run_ec_.notify(); } diff --git a/src/server/transaction.h b/src/server/transaction.h index d222d874f..dbca99c1f 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -135,10 +135,6 @@ class Transaction { const char* Name() const; - DbIndex db_index() const { - return db_index_; // TODO: support multiple db indexes. - } - uint32_t unique_shard_cnt() const { return unique_shard_cnt_; } @@ -180,7 +176,15 @@ class Transaction { KeyLockArgs GetLockArgs(ShardId sid) const; OpArgs GetOpArgs(EngineShard* shard) const { - return OpArgs{shard, txid_, db_index_}; + return OpArgs{shard, txid_, db_context()}; + } + + DbContext db_context() const { + return DbContext{.db_index = db_index_, .time_now_ms = time_now_ms_}; + } + + DbIndex db_index() const { + return db_index_; } private: @@ -287,15 +291,15 @@ class Transaction { const CommandId* cid_; TxId txid_{0}; + uint64_t time_now_ms_{0}; std::atomic notify_txid_{kuint64max}; - std::atomic_uint32_t use_count_{0}, run_count_{0}, seqlock_{0}; // unique_shard_cnt_ and unique_shard_id_ is accessed only by coordinator thread. uint32_t unique_shard_cnt_{0}; // number of unique shards span by args_ ShardId unique_shard_id_{kInvalidSid}; - DbIndex db_index_ = 0; + DbIndex db_index_; // Used for single-hop transactions with unique_shards_ == 1, hence no data-race. OpStatus local_result_ = OpStatus::OK; diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 4a4c1235f..dd5e02182 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -87,13 +87,13 @@ OpResult FindZEntry(const ZParams& zparams, const OpArgs& op_args size_t member_len) { auto& db_slice = op_args.shard->db_slice(); if (zparams.flags & ZADD_IN_XX) { - return db_slice.Find(op_args.db_ind, key, OBJ_ZSET); + return db_slice.Find(op_args.db_cntx, key, OBJ_ZSET); } pair add_res; try { - add_res = db_slice.AddOrFind(op_args.db_ind, key); + add_res = db_slice.AddOrFind(op_args.db_cntx, key); } catch (bad_alloc&) { return OpStatus::OUT_OF_MEMORY; } @@ -110,13 +110,13 @@ OpResult FindZEntry(const ZParams& zparams, const OpArgs& op_args DVLOG(2) << "Created zset " << zobj->ptr; if (!add_res.second) { - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); } it->second.ImportRObj(zobj); } else { if (it->second.ObjType() != OBJ_ZSET) return OpStatus::WRONG_TYPE; - db_slice.PreUpdate(op_args.db_ind, it); + db_slice.PreUpdate(op_args.db_cntx.db_index, it); } return it; @@ -615,7 +615,7 @@ OpResult OpUnion(EngineShard* shard, Transaction* t, string_view dest return OpStatus::OK; // return empty map for (unsigned j = start; j < keys.size(); ++j) { - auto it_res = db_slice.Find(t->db_index(), keys[j], OBJ_ZSET); + auto it_res = db_slice.Find(t->db_context(), keys[j], OBJ_ZSET); if (it_res == OpStatus::WRONG_TYPE) // TODO: support sets with default score 1. return it_res.status(); if (!it_res) @@ -661,7 +661,7 @@ OpResult OpInter(EngineShard* shard, Transaction* t, string_view dest return OpStatus::SKIPPED; // return noop for (unsigned j = start; j < keys.size(); ++j) { - auto it_res = db_slice.Find(t->db_index(), keys[j], OBJ_ZSET); + auto it_res = db_slice.Find(t->db_context(), keys[j], OBJ_ZSET); if (it_res == OpStatus::WRONG_TYPE) // TODO: support sets with default score 1. return it_res.status(); @@ -710,8 +710,8 @@ OpResult OpAdd(const OpArgs& op_args, const ZParams& zparams, string_ auto& db_slice = op_args.shard->db_slice(); if (zparams.override && members.empty()) { - auto it = db_slice.FindExt(op_args.db_ind, key).first; - db_slice.Del(op_args.db_ind, it); + auto it = db_slice.FindExt(op_args.db_cntx, key).first; + db_slice.Del(op_args.db_cntx.db_index, it); return OpStatus::OK; } @@ -763,7 +763,7 @@ OpResult OpAdd(const OpArgs& op_args, const ZParams& zparams, string_ DVLOG(2) << "ZAdd " << zobj->ptr; res_it.value()->second.SyncRObj(); - op_args.shard->db_slice().PostUpdate(op_args.db_ind, *res_it, key); + op_args.shard->db_slice().PostUpdate(op_args.db_cntx.db_index, *res_it, key); if (zparams.flags & ZADD_IN_INCR) { aresult.new_score = new_score; @@ -934,7 +934,7 @@ void ZSetFamily::ZCard(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - OpResult find_res = shard->db_slice().Find(t->db_index(), key, OBJ_ZSET); + OpResult find_res = shard->db_slice().Find(t->db_context(), key, OBJ_ZSET); if (!find_res) { return find_res.status(); } @@ -1505,7 +1505,7 @@ bool ZSetFamily::ParseRangeByScoreParams(CmdArgList args, RangeParams* params) { OpResult ZSetFamily::OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor) { - OpResult find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET); + OpResult find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET); if (!find_res) return find_res.status(); @@ -1564,11 +1564,11 @@ OpResult ZSetFamily::OpScan(const OpArgs& op_args, std::string_view k OpResult ZSetFamily::OpRem(const OpArgs& op_args, string_view key, ArgSlice members) { auto& db_slice = op_args.shard->db_slice(); - OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_ZSET); + OpResult res_it = db_slice.Find(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); - db_slice.PreUpdate(op_args.db_ind, *res_it); + db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it); robj* zobj = res_it.value()->second.AsRObj(); sds& tmp_str = op_args.shard->tmp_str1; unsigned deleted = 0; @@ -1578,17 +1578,17 @@ OpResult ZSetFamily::OpRem(const OpArgs& op_args, string_view key, Arg } auto zlen = zsetLength(zobj); res_it.value()->second.SyncRObj(); - db_slice.PostUpdate(op_args.db_ind, *res_it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key); if (zlen == 0) { - CHECK(op_args.shard->db_slice().Del(op_args.db_ind, res_it.value())); + CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it.value())); } return deleted; } OpResult ZSetFamily::OpScore(const OpArgs& op_args, string_view key, string_view member) { - OpResult res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET); + OpResult res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1605,7 +1605,7 @@ OpResult ZSetFamily::OpScore(const OpArgs& op_args, string_view key, str auto ZSetFamily::OpRange(const ZRangeSpec& range_spec, const OpArgs& op_args, string_view key) -> OpResult { - OpResult res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET); + OpResult res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1620,11 +1620,11 @@ auto ZSetFamily::OpRange(const ZRangeSpec& range_spec, const OpArgs& op_args, st OpResult ZSetFamily::OpRemRange(const OpArgs& op_args, string_view key, const ZRangeSpec& range_spec) { auto& db_slice = op_args.shard->db_slice(); - OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_ZSET); + OpResult res_it = db_slice.Find(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); - db_slice.PreUpdate(op_args.db_ind, *res_it); + db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it); robj* zobj = res_it.value()->second.AsRObj(); @@ -1632,11 +1632,11 @@ OpResult ZSetFamily::OpRemRange(const OpArgs& op_args, string_view key std::visit(iv, range_spec.interval); res_it.value()->second.SyncRObj(); - db_slice.PostUpdate(op_args.db_ind, *res_it, key); + db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key); auto zlen = zsetLength(zobj); if (zlen == 0) { - CHECK(op_args.shard->db_slice().Del(op_args.db_ind, res_it.value())); + CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it.value())); } return iv.removed(); @@ -1644,7 +1644,7 @@ OpResult ZSetFamily::OpRemRange(const OpArgs& op_args, string_view key OpResult ZSetFamily::OpRank(const OpArgs& op_args, string_view key, string_view member, bool reverse) { - OpResult res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET); + OpResult res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1659,7 +1659,7 @@ OpResult ZSetFamily::OpRank(const OpArgs& op_args, string_view key, st OpResult ZSetFamily::OpCount(const OpArgs& op_args, std::string_view key, const ScoreInterval& interval) { - OpResult res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET); + OpResult res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status(); @@ -1730,7 +1730,7 @@ OpResult ZSetFamily::OpCount(const OpArgs& op_args, std::string_view k OpResult ZSetFamily::OpLexCount(const OpArgs& op_args, string_view key, const ZSetFamily::LexInterval& interval) { - OpResult res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET); + OpResult res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET); if (!res_it) return res_it.status();