From 2ff7ff98415d82c0df30c7947e8fc24274274e1f Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Sun, 21 Apr 2024 11:34:42 +0300 Subject: [PATCH] chore: get rid of lock keys (#2894) * chore: get rid of lock keys 1. Introduce LockTag a type representing the part of the key that is used for locking. 2. Hash keys once in each transaction. 3. Expose swap_memory_bytes metric. --------- Signed-off-by: Roman Gershman --- src/server/common.h | 5 +- src/server/db_slice.cc | 58 +++++++------------ src/server/db_slice.h | 15 +++-- src/server/engine_shard_set.cc | 12 ++-- src/server/main_service.cc | 9 +-- src/server/server_family.cc | 2 + src/server/table.cc | 13 ++--- src/server/table.h | 7 ++- src/server/test_utils.cc | 18 +++--- src/server/test_utils.h | 2 +- src/server/transaction.cc | 102 +++++++++++++++++---------------- src/server/transaction.h | 26 +++++---- 12 files changed, 136 insertions(+), 133 deletions(-) diff --git a/src/server/common.h b/src/server/common.h index 7e344db6f..0e7ef4c65 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -69,8 +69,7 @@ struct LockTagOptions { struct KeyLockArgs { DbIndex db_index = 0; - ArgSlice args; - unsigned key_step = 1; + absl::Span fps; }; // Describes key indices. @@ -118,7 +117,7 @@ struct OpArgs { } }; -// A strong type for a lock tag. Helps to disambiguide between keys and the parts of the +// A strong type for a lock tag. Helps to disambiguate between keys and the parts of the // keys that are used for locking. class LockTag { std::string_view str_; diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 3614179de..91a38a64d 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -989,70 +989,56 @@ size_t DbSlice::DbSize(DbIndex db_ind) const { } bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) { - if (lock_args.args.empty()) { // Can be empty for NO_KEY_TRANSACTIONAL commands. + if (lock_args.fps.empty()) { // Can be empty for NO_KEY_TRANSACTIONAL commands. return true; } - DCHECK_GT(lock_args.key_step, 0u); auto& lt = db_arr_[lock_args.db_index]->trans_locks; bool lock_acquired = true; - if (lock_args.args.size() == 1) { - LockTag tag(lock_args.args.front()); - lock_acquired = lt.Acquire(tag, mode); - uniq_keys_ = {string_view(tag)}; // needed only for tests. + if (lock_args.fps.size() == 1) { + lock_acquired = lt.Acquire(lock_args.fps.front(), mode); + uniq_fps_ = {lock_args.fps.front()}; // needed only for tests. } else { - uniq_keys_.clear(); + uniq_fps_.clear(); - for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { - LockTag tag(lock_args.args[i]); - if (uniq_keys_.insert(string_view(tag)).second) { - lock_acquired &= lt.Acquire(tag, mode); + for (LockFp fp : lock_args.fps) { + if (uniq_fps_.insert(fp).second) { + lock_acquired &= lt.Acquire(fp, mode); } } } - DVLOG(2) << "Acquire " << IntentLock::ModeName(mode) << " for " << lock_args.args[0] + DVLOG(2) << "Acquire " << IntentLock::ModeName(mode) << " for " << lock_args.fps[0] << " has_acquired: " << lock_acquired; return lock_acquired; } -void DbSlice::ReleaseNormalized(IntentLock::Mode mode, DbIndex db_index, LockTag tag) { - DVLOG(2) << "Release " << IntentLock::ModeName(mode) << " " - << " for " << string_view(tag); - - auto& lt = db_arr_[db_index]->trans_locks; - lt.Release(tag, mode); -} - void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) { - if (lock_args.args.empty()) { // Can be empty for NO_KEY_TRANSACTIONAL commands. + if (lock_args.fps.empty()) { // Can be empty for NO_KEY_TRANSACTIONAL commands. return; } - DVLOG(2) << "Release " << IntentLock::ModeName(mode) << " for " << lock_args.args[0]; - if (lock_args.args.size() == 1) { - string_view key = lock_args.args.front(); - ReleaseNormalized(mode, lock_args.db_index, LockTag{key}); + DVLOG(2) << "Release " << IntentLock::ModeName(mode) << " for " << lock_args.fps[0]; + auto& lt = db_arr_[lock_args.db_index]->trans_locks; + if (lock_args.fps.size() == 1) { + uint64_t fp = lock_args.fps.front(); + lt.Release(fp, mode); } else { - auto& lt = db_arr_[lock_args.db_index]->trans_locks; - uniq_keys_.clear(); - for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { - LockTag tag(lock_args.args[i]); - if (uniq_keys_.insert(string_view(tag)).second) { - lt.Release(tag, mode); + uniq_fps_.clear(); + for (LockFp fp : lock_args.fps) { + if (uniq_fps_.insert(fp).second) { + lt.Release(fp, mode); } } } - uniq_keys_.clear(); + uniq_fps_.clear(); } -bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, string_view key) const { +bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, uint64_t fp) const { const auto& lt = db_arr_[dbid]->trans_locks; - LockTag tag(key); - - auto lock = lt.Find(tag); + auto lock = lt.Find(fp); if (lock) { return lock->Check(mode); } diff --git a/src/server/db_slice.h b/src/server/db_slice.h index ccbedd258..780a199b7 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -362,11 +362,13 @@ class DbSlice { void OnCbFinish(); bool Acquire(IntentLock::Mode m, const KeyLockArgs& lock_args); - void Release(IntentLock::Mode m, const KeyLockArgs& lock_args); // Returns true if the key can be locked under m. Does not lock. - bool CheckLock(IntentLock::Mode m, DbIndex dbid, std::string_view key) const; + bool CheckLock(IntentLock::Mode mode, DbIndex dbid, uint64_t fp) const; + bool CheckLock(IntentLock::Mode mode, DbIndex dbid, std::string_view key) const { + return CheckLock(mode, dbid, LockTag(key).Fingerprint()); + } size_t db_array_size() const { return db_arr_.size(); @@ -448,8 +450,8 @@ class DbSlice { } // Test hook to inspect last locked keys. - absl::flat_hash_set TEST_GetLastLockedKeys() const { - return uniq_keys_; + const auto& TEST_GetLastLockedFps() const { + return uniq_fps_; } void RegisterWatchedKey(DbIndex db_indx, std::string_view key, @@ -477,9 +479,6 @@ class DbSlice { void PerformDeletion(Iterator del_it, DbTable* table); void PerformDeletion(PrimeIterator del_it, DbTable* table); - // Releases a single tag. - void ReleaseNormalized(IntentLock::Mode m, DbIndex db_index, LockTag tag); - private: void PreUpdate(DbIndex db_ind, Iterator it); void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size); @@ -552,7 +551,7 @@ class DbSlice { DbTableArray db_arr_; // Used in temporary computations in Acquire/Release. - mutable absl::flat_hash_set uniq_keys_; + mutable absl::flat_hash_set uniq_fps_; // ordered from the smallest to largest version. std::vector> change_cb_; diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 07a6d208c..6d28d3b42 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -174,18 +174,18 @@ class RoundRobinSharder { }; bool HasContendedLocks(ShardId shard_id, Transaction* trx, const DbTable* table) { - auto is_contended = [table](LockTag tag) { return table->trans_locks.Find(tag)->IsContended(); }; + auto is_contended = [table](LockFp fp) { return table->trans_locks.Find(fp)->IsContended(); }; if (trx->IsMulti()) { - auto keys = trx->GetMultiKeys(); - for (string_view key : keys) { - if (Shard(key, shard_set->size()) == shard_id && is_contended(LockTag{key})) + auto fps = trx->GetMultiFps(); + for (const auto& [sid, fp] : fps) { + if (sid == shard_id && is_contended(fp)) return true; } } else { KeyLockArgs lock_args = trx->GetLockArgs(shard_id); - for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { - if (is_contended(LockTag{lock_args.args[i]})) + for (size_t i = 0; i < lock_args.fps.size(); ++i) { + if (is_contended(lock_args.fps[i])) return true; } } diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 0c0423f08..b35783800 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1810,7 +1810,7 @@ optional StartMultiEval(DbIndex dbid, CmdArgList keys, ScriptMgr::ScriptPa trans->StartMultiGlobal(dbid); return true; case Transaction::LOCK_AHEAD: - trans->StartMultiLockedAhead(dbid, CmdArgVec{keys.begin(), keys.end()}); + trans->StartMultiLockedAhead(dbid, keys); return true; case Transaction::NON_ATOMIC: trans->StartMultiNonAtomic(); @@ -2087,9 +2087,10 @@ void StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo* case Transaction::GLOBAL: trans->StartMultiGlobal(dbid); break; - case Transaction::LOCK_AHEAD: - trans->StartMultiLockedAhead(dbid, CollectAllKeys(exec_info)); - break; + case Transaction::LOCK_AHEAD: { + auto vec = CollectAllKeys(exec_info); + trans->StartMultiLockedAhead(dbid, absl::MakeSpan(vec)); + } break; case Transaction::NON_ATOMIC: trans->StartMultiNonAtomic(); break; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index f9b5ff118..a94277c69 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1088,6 +1088,8 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) { if (sdata_res.has_value()) { size_t rss = sdata_res->vm_rss + sdata_res->hugetlb_pages; AppendMetricWithoutLabels("used_memory_rss_bytes", "", rss, MetricType::GAUGE, &resp->body()); + AppendMetricWithoutLabels("swap_memory_bytes", "", sdata_res->vm_swap, MetricType::GAUGE, + &resp->body()); } else { LOG_FIRST_N(ERROR, 10) << "Error fetching /proc/self/status stats. error " << sdata_res.error().message(); diff --git a/src/server/table.cc b/src/server/table.cc index 9b06b66c9..f0bd09476 100644 --- a/src/server/table.cc +++ b/src/server/table.cc @@ -65,16 +65,15 @@ std::optional LockTable::Find(LockTag tag) const { return std::nullopt; } -bool LockTable::Acquire(LockTag tag, IntentLock::Mode mode) { - LockFp fp = tag.Fingerprint(); - auto [it, inserted] = locks_.try_emplace(fp); - return it->second.Acquire(mode); +std::optional LockTable::Find(uint64_t fp) const { + if (auto it = locks_.find(fp); it != locks_.end()) + return it->second; + return std::nullopt; } -void LockTable::Release(LockTag tag, IntentLock::Mode mode) { - LockFp fp = tag.Fingerprint(); +void LockTable::Release(uint64_t fp, IntentLock::Mode mode) { auto it = locks_.find(fp); - DCHECK(it != locks_.end()) << string_view(tag); + DCHECK(it != locks_.end()) << fp; it->second.Release(mode); if (it->second.IsFree()) diff --git a/src/server/table.h b/src/server/table.h index 896dc1e14..ad9494e81 100644 --- a/src/server/table.h +++ b/src/server/table.h @@ -87,8 +87,11 @@ class LockTable { std::optional Find(LockTag tag) const; std::optional Find(LockFp fp) const; - bool Acquire(LockTag tag, IntentLock::Mode mode); - void Release(LockTag tag, IntentLock::Mode mode); + bool Acquire(LockFp fp, IntentLock::Mode mode) { + return locks_[fp].Acquire(mode); + } + + void Release(LockFp fp, IntentLock::Mode mode); auto begin() const { return locks_.cbegin(); diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index 062c691d6..3b6ad3b3d 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -628,9 +628,9 @@ vector BaseFamilyTest::StrArray(const RespExpr& expr) { return res; } -absl::flat_hash_set BaseFamilyTest::GetLastUsedKeys() { +vector BaseFamilyTest::GetLastFps() { fb2::Mutex mu; - absl::flat_hash_set result; + vector result; auto add_keys = [&](ProactorBase* proactor) { EngineShard* shard = EngineShard::tlocal(); @@ -639,8 +639,8 @@ absl::flat_hash_set BaseFamilyTest::GetLastUsedKeys() { } lock_guard lk(mu); - for (string_view key : shard->db_slice().TEST_GetLastLockedKeys()) { - result.insert(string(key)); + for (auto fp : shard->db_slice().TEST_GetLastLockedFps()) { + result.push_back(fp); } }; shard_set->pool()->AwaitFiberOnAll(add_keys); @@ -677,13 +677,15 @@ fb2::Fiber BaseFamilyTest::ExpectConditionWithSuspension(const std::function& keys) { - absl::flat_hash_set own_keys; + vector key_fps; for (const auto& k : keys) { - own_keys.insert(string(k)); + key_fps.push_back(LockTag(k).Fingerprint()); } + sort(key_fps.begin(), key_fps.end()); auto cb = [=] { - auto last_keys = GetLastUsedKeys(); - return last_keys == own_keys; + auto last_fps = GetLastFps(); + sort(last_fps.begin(), last_fps.end()); + return last_fps == key_fps; }; return ExpectConditionWithSuspension(std::move(cb)); diff --git a/src/server/test_utils.h b/src/server/test_utils.h index 16c031c52..961d15e3d 100644 --- a/src/server/test_utils.h +++ b/src/server/test_utils.h @@ -144,7 +144,7 @@ class BaseFamilyTest : public ::testing::Test { const facade::Connection::InvalidationMessage& GetInvalidationMessage(std::string_view conn_id, size_t index) const; - static absl::flat_hash_set GetLastUsedKeys(); + static std::vector GetLastFps(); static void ExpectConditionWithinTimeout(const std::function& condition, absl::Duration timeout = absl::Seconds(10)); util::fb2::Fiber ExpectConditionWithSuspension(const std::function& condition); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index d4c57091d..5d8227ee7 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -83,9 +83,8 @@ uint16_t trans_id(const Transaction* ptr) { } bool CheckLocks(const DbSlice& db_slice, IntentLock::Mode mode, const KeyLockArgs& lock_args) { - for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { - string_view s = lock_args.args[i]; - if (!db_slice.CheckLock(mode, lock_args.db_index, s)) + for (LockFp fp : lock_args.fps) { + if (!db_slice.CheckLock(mode, lock_args.db_index, fp)) return false; } return true; @@ -206,8 +205,9 @@ void Transaction::BuildShardIndex(const KeyIndex& key_index, std::vector shard_index, size_t num_args, bool rev_mapping) { kv_args_.reserve(num_args); + DCHECK(kv_fp_.empty()); + kv_fp_.reserve(num_args); + if (rev_mapping) reverse_index_.reserve(num_args); @@ -229,6 +232,8 @@ void Transaction::InitShardData(absl::Span shard_index, siz sd.arg_count = si.args.size(); sd.arg_start = kv_args_.size(); + sd.fp_start = kv_fp_.size(); + sd.fp_count = 0; // Multi transactions can re-initialize on different shards, so clear ACTIVE flag. DCHECK_EQ(sd.local_mask & ACTIVE, 0); @@ -242,7 +247,12 @@ void Transaction::InitShardData(absl::Span shard_index, siz unique_shard_id_ = i; for (size_t j = 0; j < si.args.size(); ++j) { - kv_args_.push_back(si.args[j]); + string_view arg = si.args[j]; + kv_args_.push_back(arg); + if (si.key_step == 1 || j % si.key_step == 0) { + kv_fp_.push_back(LockTag(arg).Fingerprint()); + sd.fp_count++; + } if (rev_mapping) reverse_index_.push_back(si.original_index[j]); } @@ -251,38 +261,34 @@ void Transaction::InitShardData(absl::Span shard_index, siz DCHECK_EQ(kv_args_.size(), num_args); } -void Transaction::LaunderKeyStorage(CmdArgVec* keys) { +void Transaction::PrepareMultiFps(CmdArgList keys) { DCHECK_EQ(multi_->mode, LOCK_AHEAD); - DCHECK_GT(keys->size(), 0u); + DCHECK_GT(keys.size(), 0u); - auto& m_keys = multi_->frozen_keys; - auto& m_keys_set = multi_->frozen_keys_set; + auto& tag_fps = multi_->tag_fps; - // Reserve enough space, so pointers from frozen_keys_set are not invalidated - m_keys.reserve(keys->size()); - - for (MutableSlice key : *keys) { - string_view key_s = string_view(LockTag{facade::ToSV(key)}); - // Insert copied string view, not original. This is why "try insert" is not allowed - if (!m_keys_set.contains(key_s)) - m_keys_set.insert(m_keys.emplace_back(key_s)); + tag_fps.reserve(keys.size()); + for (MutableSlice key : keys) { + string_view sv = facade::ToSV(key); + ShardId sid = Shard(sv, shard_set->size()); + tag_fps.emplace(sid, LockTag(sv).Fingerprint()); } - - // Copy mutable pointers into keys - keys->clear(); - for (string& key : m_keys) - keys->emplace_back(key.data(), key.size()); } void Transaction::StoreKeysInArgs(const KeyIndex& key_index) { DCHECK(!key_index.bonus); DCHECK(key_index.step == 1u || key_index.step == 2u); + DCHECK(kv_fp_.empty()); // even for a single key we may have multiple arguments per key (MSET). for (unsigned j = key_index.start; j < key_index.end; j++) { - kv_args_.push_back(ArgS(full_args_, j)); - if (key_index.step == 2) + string_view arg = ArgS(full_args_, j); + kv_args_.push_back(arg); + kv_fp_.push_back(LockTag(arg).Fingerprint()); + + if (key_index.step == 2) { kv_args_.push_back(ArgS(full_args_, ++j)); + } } if (key_index.has_reverse_mapping) { @@ -339,7 +345,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) { // Initialize shard data based on distributed arguments. InitShardData(shard_index, key_index.num_args(), key_index.has_reverse_mapping); - DCHECK(!multi_ || multi_->mode != LOCK_AHEAD || !multi_->frozen_keys.empty()); + DCHECK(!multi_ || multi_->mode != LOCK_AHEAD || !multi_->tag_fps.empty()); DVLOG(1) << "InitByArgs " << DebugId() << " " << kv_args_.front(); @@ -394,6 +400,7 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) { DCHECK_EQ(unique_shard_cnt_, 0u); DCHECK(kv_args_.empty()); + DCHECK(kv_fp_.empty()); OpResult key_index = DetermineKeys(cid_, args); if (!key_index) @@ -442,7 +449,7 @@ void Transaction::StartMultiGlobal(DbIndex dbid) { ScheduleInternal(); } -void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgVec keys, bool skip_scheduling) { +void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgList keys, bool skip_scheduling) { DVLOG(1) << "StartMultiLockedAhead on " << keys.size() << " keys"; DCHECK(multi_); @@ -451,9 +458,9 @@ void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgVec keys, bool skip_ multi_->mode = LOCK_AHEAD; multi_->lock_mode = LockMode(); - LaunderKeyStorage(&keys); // Filter uniques and normalize + PrepareMultiFps(keys); - InitBase(dbid, absl::MakeSpan(keys)); + InitBase(dbid, keys); InitByKeys(KeyIndex::Range(0, keys.size())); if (!skip_scheduling) @@ -482,6 +489,7 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) { unique_shard_cnt_ = 0; kv_args_.clear(); + kv_fp_.clear(); reverse_index_.clear(); cid_ = cid; @@ -632,7 +640,6 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { // of the queue and notify the next one. if (auto* bcontroller = shard->blocking_controller(); bcontroller) { if (awaked_prerun || was_suspended) { - CHECK_EQ(largs.key_step, 1u); bcontroller->FinalizeWatched(GetShardArgs(idx), this); } @@ -796,12 +803,9 @@ void Transaction::UnlockMulti() { if ((coordinator_state_ & COORD_SCHED) == 0 || (coordinator_state_ & COORD_CONCLUDING) > 0) return; - multi_->frozen_keys_set.clear(); - - auto sharded_keys = make_shared>>(shard_set->size()); - for (string& key : multi_->frozen_keys) { - ShardId sid = Shard(key, sharded_keys->size()); - (*sharded_keys)[sid].emplace_back(key); + vector> sharded_keys(shard_set->size()); + for (const auto& [sid, fp] : multi_->tag_fps) { + sharded_keys[sid].emplace_back(fp); } multi_->shard_journal_cnt = ServerState::tlocal()->journal() ? CalcMultiNumOfShardJournals() : 0; @@ -810,8 +814,9 @@ void Transaction::UnlockMulti() { DCHECK_EQ(shard_data_.size(), shard_set->size()); for (ShardId i = 0; i < shard_data_.size(); ++i) { - shard_set->Add(i, [this, sharded_keys, i]() { - this->UnlockMultiShardCb((*sharded_keys)[i], EngineShard::tlocal()); + vector fps = std::move(sharded_keys[i]); + shard_set->Add(i, [this, fps = std::move(fps)]() { + this->UnlockMultiShardCb(fps, EngineShard::tlocal()); intrusive_ptr_release(this); }); } @@ -935,9 +940,9 @@ void Transaction::Refurbish() { cb_ptr_ = nullptr; } -const absl::flat_hash_set& Transaction::GetMultiKeys() const { +const absl::flat_hash_set>& Transaction::GetMultiFps() const { DCHECK(multi_); - return multi_->frozen_keys_set; + return multi_->tag_fps; } void Transaction::FIX_ConcludeJournalExec() { @@ -1011,10 +1016,14 @@ optional Transaction::GetUniqueSlotId() const { KeyLockArgs Transaction::GetLockArgs(ShardId sid) const { KeyLockArgs res; res.db_index = db_index_; - res.key_step = cid_->opt_mask() & CO::INTERLEAVED_KEYS ? 2 : 1; - res.args = GetShardArgs(sid); - DCHECK(!res.args.empty() || (cid_->opt_mask() & CO::NO_KEY_TRANSACTIONAL)); + if (unique_shard_cnt_ == 1) { + res.fps = {kv_fp_.data(), kv_fp_.size()}; + } else { + const auto& sd = shard_data_[sid]; + DCHECK_LE(sd.fp_start + sd.fp_count, kv_fp_.size()); + res.fps = {kv_fp_.data() + sd.fp_start, sd.fp_count}; + } return res; } @@ -1159,7 +1168,7 @@ bool Transaction::CancelShardCb(EngineShard* shard) { } else { auto lock_args = GetLockArgs(shard->shard_id()); DCHECK(sd.local_mask & KEYLOCK_ACQUIRED); - DCHECK(!lock_args.args.empty()); + DCHECK(!lock_args.fps.empty()); shard->db_slice().Release(LockMode(), lock_args); sd.local_mask &= ~KEYLOCK_ACQUIRED; } @@ -1296,8 +1305,7 @@ void Transaction::MultiReportJournalOnShard(EngineShard* shard) const { } } -void Transaction::UnlockMultiShardCb(absl::Span sharded_keys, - EngineShard* shard) { +void Transaction::UnlockMultiShardCb(absl::Span fps, EngineShard* shard) { DCHECK(multi_ && multi_->lock_mode); MultiReportJournalOnShard(shard); @@ -1305,9 +1313,7 @@ void Transaction::UnlockMultiShardCb(absl::Span sharded_ if (multi_->mode == GLOBAL) { shard->shard_lock()->Release(IntentLock::EXCLUSIVE); } else { - for (const auto& key : sharded_keys) { - shard->db_slice().ReleaseNormalized(*multi_->lock_mode, db_index_, LockTag{key}); - } + shard->db_slice().Release(*multi_->lock_mode, KeyLockArgs{db_index_, fps}); } ShardId sid = shard->shard_id(); diff --git a/src/server/transaction.h b/src/server/transaction.h index ab6926598..c22bb4763 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -223,7 +223,7 @@ class Transaction { void StartMultiGlobal(DbIndex dbid); // Start multi in LOCK_AHEAD mode with given keys. - void StartMultiLockedAhead(DbIndex dbid, CmdArgVec keys, bool skip_scheduling = false); + void StartMultiLockedAhead(DbIndex dbid, CmdArgList keys, bool skip_scheduling = false); // Start multi in NON_ATOMIC mode. void StartMultiNonAtomic(); @@ -339,7 +339,7 @@ class Transaction { void Refurbish(); // Get keys multi transaction was initialized with, normalized and unique - const absl::flat_hash_set& GetMultiKeys() const; + const absl::flat_hash_set>& GetMultiFps() const; // Send journal EXEC opcode after a series of MULTI commands on the currently active shard void FIX_ConcludeJournalExec(); @@ -389,6 +389,10 @@ class Transaction { uint32_t arg_start = 0; // Subspan in kv_args_ with local arguments. uint32_t arg_count = 0; + // span into kv_fp_ + uint32_t fp_start = 0; + uint32_t fp_count = 0; + // Position in the tx queue. OOO or cancelled schedules remove themselves by this index. TxQueue::Iterator pq_pos = TxQueue::kEnd; @@ -401,7 +405,7 @@ class Transaction { } stats; // Prevent "false sharing" between cache lines: occupy a full cache line (64 bytes) - char pad[64 - 5 * sizeof(uint32_t) - sizeof(Stats)]; + char pad[64 - 7 * sizeof(uint32_t) - sizeof(Stats)]; }; static_assert(sizeof(PerShardData) == 64); // cacheline @@ -412,9 +416,8 @@ class Transaction { MultiMode mode; std::optional lock_mode; - // Unique normalized keys used for scheduling the multi transaction. - std::vector frozen_keys; - absl::flat_hash_set frozen_keys_set; // point to frozen_keys + // Unique normalized fingerprints used for scheduling the multi transaction. + absl::flat_hash_set> tag_fps; // Set if the multi command is concluding to avoid ambiguity with COORD_CONCLUDING bool concluding = false; @@ -439,6 +442,7 @@ class Transaction { struct PerShardCache { std::vector args; std::vector original_index; + unsigned key_step = 1; void Clear() { args.clear(); @@ -487,9 +491,8 @@ class Transaction { // Store all key index keys in args_. Used only for single shard initialization. void StoreKeysInArgs(const KeyIndex& key_index); - // Multi transactions unlock asynchronously, so they need to keep a copy of all they keys. - // "Launder" keys by filtering uniques and replacing pointers with same lifetime as transaction. - void LaunderKeyStorage(CmdArgVec* keys); + // Multi transactions unlock asynchronously, so they need to keep fingerprints of keys. + void PrepareMultiFps(CmdArgList keys); void ScheduleInternal(); @@ -526,7 +529,7 @@ class Transaction { // If journaling is enabled, report final exec opcode to finish the chain of commands. void MultiReportJournalOnShard(EngineShard* shard) const; - void UnlockMultiShardCb(absl::Span sharded_keys, EngineShard* shard); + void UnlockMultiShardCb(absl::Span fps, EngineShard* shard); // In a multi-command transaction, we determine the number of shard journals that we wrote entries // to by updating the shard_journal_write vector during command execution. The total number of @@ -588,6 +591,9 @@ class Transaction { // to what key. absl::InlinedVector kv_args_; + // Fingerprints of keys, precomputed once during the transaction initialization. + absl::InlinedVector kv_fp_; + // Stores the full undivided command. CmdArgList full_args_;