From 675b3889a49961f9ed0d8d20501d98d86268cdec Mon Sep 17 00:00:00 2001 From: Vladislav Date: Sat, 27 Jan 2024 13:24:42 +0300 Subject: [PATCH] chore(transaction): Launder copied keys in multi transactions (#2478) * chore(transaction): Launder copied keys in multi transactions --------- Signed-off-by: Vladislav Oleshko --- src/server/conn_context.cc | 2 +- src/server/engine_shard_set.cc | 32 ++++++++--------- src/server/main_service.cc | 14 ++++---- src/server/transaction.cc | 63 ++++++++++++++++++---------------- src/server/transaction.h | 18 ++++++---- 5 files changed, 68 insertions(+), 61 deletions(-) diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index 9fc161b89..0eb1d0407 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -234,7 +234,7 @@ size_t ConnectionState::ExecInfo::UsedMemory() const { } size_t ConnectionState::ScriptInfo::UsedMemory() const { - return dfly::HeapSize(keys); + return dfly::HeapSize(keys) + async_cmds_heap_mem; } size_t ConnectionState::SubscribeInfo::UsedMemory() const { diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index e6df641d9..03c291c3e 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -166,31 +166,27 @@ class RoundRobinSharder { }; bool HasContendedLocks(unsigned shard_id, Transaction* trx, const DbTable* table) { - bool has_contended_locks = false; + auto is_contended = [table](string_view key) { + auto it = table->trans_locks.find(key); + DCHECK(it != table->trans_locks.end()); + return it->second.IsContended(); + }; if (trx->IsMulti()) { - trx->IterateMultiLocks(shard_id, [&](const string& key) { - auto it = table->trans_locks.find(key); - DCHECK(it != table->trans_locks.end()); - if (it->second.IsContended()) { - has_contended_locks = true; - } - }); + auto keys = trx->GetMultiKeys(); + for (string_view key : keys) { + if (Shard(key, shard_set->size()) == shard_id && is_contended(key)) + return true; + } } else { 
KeyLockArgs lock_args = trx->GetLockArgs(shard_id); for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { - string_view s = KeyLockArgs::GetLockKey(lock_args.args[i]); - auto it = table->trans_locks.find(s); - DCHECK(it != table->trans_locks.end()); - if (it != table->trans_locks.end()) { - if (it->second.IsContended()) { - has_contended_locks = true; - break; - } - } + if (is_contended(KeyLockArgs::GetLockKey(lock_args.args[i]))) + return true; } } - return has_contended_locks; + + return false; } thread_local string RoundRobinSharder::round_robin_prefix_; diff --git a/src/server/main_service.cc b/src/server/main_service.cc index cf21add4d..bd1f3b3c1 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -902,17 +902,21 @@ OpStatus CheckKeysDeclared(const ConnectionState::ScriptInfo& eval_info, const C if (!key_index_res) return key_index_res.status(); + // TODO: Switch to transaction internal locked keys once single hop multi transactions are merged + // const auto& locked_keys = trans->GetMultiKeys(); + const auto& locked_keys = eval_info.keys; + const auto& key_index = *key_index_res; for (unsigned i = key_index.start; i < key_index.end; ++i) { string_view key = KeyLockArgs::GetLockKey(ArgS(args, i)); - if (!eval_info.keys.contains(key)) { + if (!locked_keys.contains(key)) { VLOG(1) << "Key " << key << " is not declared for command " << cid->name(); return OpStatus::KEY_NOTFOUND; } } if (key_index.bonus && - !eval_info.keys.contains(KeyLockArgs::GetLockKey(ArgS(args, *key_index.bonus)))) + !locked_keys.contains(KeyLockArgs::GetLockKey(ArgS(args, *key_index.bonus)))) return OpStatus::KEY_NOTFOUND; return OpStatus::OK; @@ -1714,7 +1718,7 @@ optional StartMultiEval(DbIndex dbid, CmdArgList keys, ScriptMgr::ScriptPa trans->StartMultiGlobal(dbid); return true; case Transaction::LOCK_AHEAD: - trans->StartMultiLockedAhead(dbid, keys); + trans->StartMultiLockedAhead(dbid, CmdArgVec{keys.begin(), keys.end()}); return true; case 
Transaction::NON_ATOMIC: trans->StartMultiNonAtomic(); @@ -1985,14 +1989,12 @@ CmdArgVec CollectAllKeys(ConnectionState::ExecInfo* exec_info) { // Return true if transaction was scheduled, false if scheduling was not required. void StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo* exec_info, Transaction::MultiMode multi_mode) { - CmdArgVec tmp_keys; switch (multi_mode) { case Transaction::GLOBAL: trans->StartMultiGlobal(dbid); break; case Transaction::LOCK_AHEAD: - tmp_keys = CollectAllKeys(exec_info); - trans->StartMultiLockedAhead(dbid, CmdArgList{tmp_keys}); + trans->StartMultiLockedAhead(dbid, CollectAllKeys(exec_info)); break; case Transaction::NON_ATOMIC: trans->StartMultiNonAtomic(); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index d2e0c7e09..1c6558b60 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -204,23 +204,27 @@ void Transaction::InitShardData(absl::Span shard_index, siz CHECK_EQ(args_.size(), num_args); } -void Transaction::RecordMultiLocks(const KeyIndex& key_index) { - DCHECK(multi_); - DCHECK(!multi_->lock_mode); +void Transaction::LaunderKeyStorage(CmdArgVec* keys) { + DCHECK_EQ(multi_->mode, LOCK_AHEAD); + DCHECK_GT(keys->size(), 0u); - if (multi_->mode == NON_ATOMIC) - return; + auto& m_keys = multi_->frozen_keys; + auto& m_keys_set = multi_->frozen_keys_set; - auto lock_key = [this](string_view key) { multi_->locks.emplace(KeyLockArgs::GetLockKey(key)); }; + // Reserve enough space, so pointers from frozen_keys_set are not invalidated + m_keys.reserve(keys->size()); - multi_->lock_mode.emplace(LockMode()); - for (size_t i = key_index.start; i < key_index.end; i += key_index.step) - lock_key(ArgS(full_args_, i)); - if (key_index.bonus) - lock_key(ArgS(full_args_, *key_index.bonus)); + for (MutableSlice key : *keys) { + string_view key_s = KeyLockArgs::GetLockKey(facade::ToSV(key)); + // Insert copied string view, not original. 
This is why "try insert" is not allowed + if (!m_keys_set.contains(key_s)) + m_keys_set.insert(m_keys.emplace_back(key_s)); + } - DCHECK(IsAtomicMulti()); - DCHECK(multi_->mode == GLOBAL || !multi_->locks.empty()); + // Copy mutable pointers into keys + keys->clear(); + for (string& key : m_keys) + keys->emplace_back(key.data(), key.size()); } void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) { @@ -308,8 +312,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) { // Initialize shard data based on distributed arguments. InitShardData(shard_index, key_index.num_args(), needs_reverse_mapping); - if (multi_ && !multi_->lock_mode) - RecordMultiLocks(key_index); + DCHECK(!multi_ || multi_->mode != LOCK_AHEAD || !multi_->frozen_keys.empty()); DVLOG(1) << "InitByArgs " << DebugId() << " " << args_.front(); @@ -411,17 +414,23 @@ void Transaction::StartMultiGlobal(DbIndex dbid) { ScheduleInternal(); } -void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgList keys) { +void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgVec keys) { DVLOG(1) << "StartMultiLockedAhead on " << keys.size() << " keys"; DCHECK(multi_); DCHECK(shard_data_.empty()); // Make sure default InitByArgs didn't run. multi_->mode = LOCK_AHEAD; - InitBase(dbid, keys); + multi_->lock_mode = LockMode(); + + LaunderKeyStorage(&keys); // Filter uniques and normalize + + InitBase(dbid, absl::MakeSpan(keys)); InitByKeys(KeyIndex::Range(0, keys.size())); ScheduleInternal(); + + full_args_ = {nullptr, 0}; // InitBase set it to temporary keys, now we reset it. 
} void Transaction::StartMultiNonAtomic() { @@ -794,11 +803,12 @@ void Transaction::UnlockMulti() { if (multi_->mode == NON_ATOMIC) return; + multi_->frozen_keys_set.clear(); + auto sharded_keys = make_shared>(shard_set->size()); - while (!multi_->locks.empty()) { - auto entry = multi_->locks.extract(multi_->locks.begin()); - ShardId sid = Shard(entry.value(), sharded_keys->size()); - (*sharded_keys)[sid].emplace_back(std::move(entry.value())); + for (string& key : multi_->frozen_keys) { + ShardId sid = Shard(key, sharded_keys->size()); + (*sharded_keys)[sid].emplace_back(std::move(key)); } unsigned shard_journals_cnt = @@ -922,14 +932,9 @@ void Transaction::Refurbish() { cb_ptr_ = nullptr; } -void Transaction::IterateMultiLocks(ShardId sid, std::function cb) const { - unsigned shard_num = shard_set->size(); - for (const auto& key : multi_->locks) { - ShardId key_sid = Shard(key, shard_num); - if (key_sid == sid) { - cb(key); - } - } +const absl::flat_hash_set& Transaction::GetMultiKeys() const { + DCHECK(multi_); + return multi_->frozen_keys_set; } void Transaction::EnableShard(ShardId sid) { diff --git a/src/server/transaction.h b/src/server/transaction.h index 67d14d6d4..128a4ac80 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -233,7 +233,7 @@ class Transaction { void StartMultiGlobal(DbIndex dbid); // Start multi in LOCK_AHEAD mode with given keys. - void StartMultiLockedAhead(DbIndex dbid, CmdArgList keys); + void StartMultiLockedAhead(DbIndex dbid, CmdArgVec keys); // Start multi in NON_ATOMIC mode. void StartMultiNonAtomic(); @@ -344,7 +344,8 @@ class Transaction { void Refurbish(); - void IterateMultiLocks(ShardId sid, std::function cb) const; + // Get keys multi transaction was initialized with, normalized and unique + const absl::flat_hash_set& GetMultiKeys() const; private: // Holds number of locks for each IntentLock::Mode: shared and exlusive. 
@@ -397,9 +398,11 @@ class Transaction { struct MultiData { MultiRole role; MultiMode mode; - std::optional lock_mode; - absl::flat_hash_set locks; + + // Unique normalized keys used for scheduling the multi transaction. + std::vector frozen_keys; + absl::flat_hash_set frozen_keys_set; // point to frozen_keys // Set if the multi command is concluding to avoid ambiguity with COORD_CONCLUDING bool concluding = false; @@ -449,12 +452,13 @@ class Transaction { void InitShardData(absl::Span shard_index, size_t num_args, bool rev_mapping); - // Init multi. Record locks if needed. - void RecordMultiLocks(const KeyIndex& keys); - // Store all key index keys in args_. Used only for single shard initialization. void StoreKeysInArgs(KeyIndex keys, bool rev_mapping); + // Multi transactions unlock asynchronously, so they need to keep a copy of all their keys. + // "Launder" keys by filtering uniques and replacing pointers with same lifetime as transaction. + void LaunderKeyStorage(CmdArgVec* keys); + // Generic schedule used from Schedule() and ScheduleSingleHop() on slow path. void ScheduleInternal();