From 1fb0a486aca6a571d07b559eafbb87c05296295c Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Sun, 31 Dec 2023 17:02:12 +0200 Subject: [PATCH] chore: transaction simplification (#2347) chore: simplify transaction multi-locking Also, add the ananlysis routine that determines whether the schewduled transaction is contended with other transaction in a shard thread. Signed-off-by: Roman Gershman --- src/server/db_slice.cc | 5 -- src/server/db_slice.h | 10 ++-- src/server/engine_shard_set.cc | 83 ++++++++++------------------ src/server/multi_test.cc | 3 +- src/server/transaction.cc | 98 +++++++++++++++------------------- src/server/transaction.h | 19 ++++--- 6 files changed, 87 insertions(+), 131 deletions(-) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index c4f75e6c8..b3b18d931 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -941,11 +941,6 @@ bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) { return lock_acquired; } -void DbSlice::Release(IntentLock::Mode mode, DbIndex db_index, std::string_view key, - unsigned count) { - return ReleaseNormalized(mode, db_index, KeyLockArgs::GetLockKey(key), count); -} - void DbSlice::ReleaseNormalized(IntentLock::Mode mode, DbIndex db_index, std::string_view key, unsigned count) { DCHECK_EQ(key, KeyLockArgs::GetLockKey(key)); diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 502bbf99d..21016458e 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -283,8 +283,6 @@ class DbSlice { void Release(IntentLock::Mode m, const KeyLockArgs& lock_args); - void Release(IntentLock::Mode m, DbIndex db_index, std::string_view key, unsigned count); - // Returns true if the key can be locked under m. Does not lock. bool CheckLock(IntentLock::Mode m, DbIndex dbid, std::string_view key) const; @@ -391,14 +389,14 @@ class DbSlice { // Delete a key referred by its iterator. void PerformDeletion(PrimeIterator del_it, EngineShard* shard, DbTable* table); - private: - void PreUpdate(DbIndex db_ind, PrimeIterator it); - void PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, size_t orig_size); - // Releases a single key. `key` must have been normalized by GetLockKey(). void ReleaseNormalized(IntentLock::Mode m, DbIndex db_index, std::string_view key, unsigned count); + private: + void PreUpdate(DbIndex db_ind, PrimeIterator it); + void PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, size_t orig_size); + AddOrFindResult AddOrUpdateInternal(const Context& cntx, std::string_view key, PrimeValue obj, uint64_t expire_at_ms, bool force_update) noexcept(false); diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 4c3122e44..577c9a9e9 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -158,6 +158,34 @@ class RoundRobinSharder { static Mutex mutex_; }; +bool HasContendedLocks(unsigned shard_id, Transaction* trx, DbTable* table) { + bool has_contended_locks = false; + + if (trx->IsMulti()) { + trx->IterateMultiLocks(shard_id, [&](const string& key) { + auto it = table->trans_locks.find(key); + DCHECK(it != table->trans_locks.end()); + if (it->second.IsContended()) { + has_contended_locks = true; + } + }); + } else { + KeyLockArgs lock_args = trx->GetLockArgs(shard_id); + for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { + string_view s = KeyLockArgs::GetLockKey(lock_args.args[i]); + auto it = table->trans_locks.find(s); + DCHECK(it != table->trans_locks.end()); + if (it != table->trans_locks.end()) { + if (it->second.IsContended()) { + has_contended_locks = true; + break; + } + } + } + } + return has_contended_locks; +} + thread_local string RoundRobinSharder::round_robin_prefix_; thread_local vector RoundRobinSharder::round_robin_shards_tl_cache_; vector RoundRobinSharder::round_robin_shards_; @@ -533,44 +561,6 @@ void EngineShard::RemoveContTx(Transaction* tx) { } } -#if 0 -// There are several cases that contain proof of convergence for this shard: -// 1. txq_ empty - it means that anything that is goonna be scheduled will already be scheduled -// with txid > notifyid. -// 2. committed_txid_ > notifyid - similarly, this shard can not affect the result with timestamp -// notifyid. -// 3. committed_txid_ == notifyid, then if a transaction in progress (continuation_trans_ != NULL) -// the this transaction can still affect the result, hence we require continuation_trans_ is null -// which will point to converged result @notifyid. However, we never awake a transaction -// when there is a multi-hop transaction in progress to avoid false positives. -// Therefore, continuation_trans_ must always be null when calling this function. -// 4. Finally with committed_txid_ < notifyid. -// we can check if the next in line (HeadScore) is after notifyid in that case we can also -// conclude regarding the result convergence for this shard. -// -bool EngineShard::HasResultConverged(TxId notifyid) const { - CHECK(continuation_trans_ == nullptr); - - if (committed_txid_ >= notifyid) - return true; - - // This could happen if a single lpush (not in transaction) woke multi-shard blpop. - DVLOG(1) << "HasResultConverged: cmtxid - " << committed_txid_ << " vs " << notifyid; - - // We must check for txq head - it's not an optimization - we need it for correctness. - // If a multi-transaction has been scheduled and it does not have any presence in - // this shard (no actual keys) and we won't check for it HasResultConverged will - // return false. The blocked transaction will wait for this shard to progress and - // will also block other shards from progressing (where it has been notified). - // If this multi-transaction has presence in those shards, it won't progress there as well. - // Therefore, we will get a deadlock. By checking txid of the head we will avoid this situation: - // if the head.txid is after notifyid then this shard obviously converged. - // if the head.txid <= notifyid that transaction will be able to progress in other shards. - // and we must wait for it to finish. - return txq_.Empty() || txq_.HeadScore() > notifyid; -} -#endif - void EngineShard::Heartbeat() { CacheStats(); @@ -736,22 +726,7 @@ auto EngineShard::AnalyzeTxQueue() -> TxQueueInfo { info.tx_global++; } else { DbTable* table = db_slice().GetDBTable(trx->GetDbIndex()); - bool can_run = true; - - if (!trx->IsMulti()) { - KeyLockArgs lock_args = trx->GetLockArgs(sid); - for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { - string_view s = KeyLockArgs::GetLockKey(lock_args.args[i]); - auto it = table->trans_locks.find(s); - DCHECK(it != table->trans_locks.end()); - if (it != table->trans_locks.end()) { - if (it->second.IsContended()) { - can_run = false; - break; - } - } - } - } + bool can_run = !HasContendedLocks(sid, trx, table); if (can_run) { info.tx_runnable++; } diff --git a/src/server/multi_test.cc b/src/server/multi_test.cc index 2a2c41ddc..aa9d2dae4 100644 --- a/src/server/multi_test.cc +++ b/src/server/multi_test.cc @@ -904,7 +904,8 @@ TEST_F(MultiTest, TestLockedKeys) { EXPECT_EQ(Run({"multi"}), "OK"); EXPECT_EQ(Run({"set", "key1", "val1"}), "QUEUED"); EXPECT_EQ(Run({"set", "key2", "val2"}), "QUEUED"); - EXPECT_THAT(Run({"exec"}), RespArray(ElementsAre("OK", "OK"))); + EXPECT_EQ(Run({"mset", "key1", "val3", "key1", "val4"}), "QUEUED"); + EXPECT_THAT(Run({"exec"}), RespArray(ElementsAre("OK", "OK", "OK"))); fb.Join(); EXPECT_FALSE(service_->IsLocked(0, "key1")); EXPECT_FALSE(service_->IsLocked(0, "key2")); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index f069c98d4..3e35e8b5e 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -90,7 +90,7 @@ void Transaction::InitGlobal() { EnableAllShards(); } -void Transaction::BuildShardIndex(KeyIndex key_index, bool rev_mapping, +void Transaction::BuildShardIndex(const KeyIndex& key_index, bool rev_mapping, std::vector* out) { auto args = full_args_; @@ -157,38 +157,23 @@ void Transaction::InitShardData(absl::Span shard_index, siz CHECK_EQ(args_.size(), num_args); } -void Transaction::InitMultiData(KeyIndex key_index) { +void Transaction::RecordMultiLocks(const KeyIndex& key_index) { DCHECK(multi_); + DCHECK(!multi_->lock_mode); if (multi_->mode == NON_ATOMIC) return; - IntentLock::Mode mode = Mode(); + auto lock_key = [this](string_view key) { multi_->locks.emplace(KeyLockArgs::GetLockKey(key)); }; - auto& tmp_uniques = tmp_space.uniq_keys; - - auto lock_key = [this, mode, &tmp_uniques](string_view key) { - if (auto [_, inserted] = tmp_uniques.insert(KeyLockArgs::GetLockKey(key)); !inserted) - return; - - multi_->lock_counts[key][mode]++; - }; - - // With EVAL, we call this function for EVAL itself as well as for each command - // for eval. currently, we lock everything only during the eval call. - if (!multi_->locks_recorded) { - tmp_uniques.clear(); - - for (size_t i = key_index.start; i < key_index.end; i += key_index.step) - lock_key(ArgS(full_args_, i)); - if (key_index.bonus) - lock_key(ArgS(full_args_, *key_index.bonus)); - - multi_->locks_recorded = true; - } + multi_->lock_mode.emplace(Mode()); + for (size_t i = key_index.start; i < key_index.end; i += key_index.step) + lock_key(ArgS(full_args_, i)); + if (key_index.bonus) + lock_key(ArgS(full_args_, *key_index.bonus)); DCHECK(IsAtomicMulti()); - DCHECK(multi_->mode == GLOBAL || !multi_->lock_counts.empty()); + DCHECK(multi_->mode == GLOBAL || !multi_->locks.empty()); } void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) { @@ -230,15 +215,13 @@ void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) { * **/ -void Transaction::InitByKeys(KeyIndex key_index) { - auto args = full_args_; - - if (key_index.start == args.size()) { // eval with 0 keys. +void Transaction::InitByKeys(const KeyIndex& key_index) { + if (key_index.start == full_args_.size()) { // eval with 0 keys. CHECK(absl::StartsWith(cid_->name(), "EVAL")) << cid_->name(); return; } - DCHECK_LT(key_index.start, args.size()); + DCHECK_LT(key_index.start, full_args_.size()); bool needs_reverse_mapping = cid_->opt_mask() & CO::REVERSE_MAPPING; @@ -265,7 +248,7 @@ void Transaction::InitByKeys(KeyIndex key_index) { shard_data_.resize(shard_set->size()); // shard_data isn't sparse, so we must allocate for all :( DCHECK(key_index.step == 1 || key_index.step == 2); - DCHECK(key_index.step != 2 || (args.size() % 2) == 0); + DCHECK(key_index.step != 2 || (full_args_.size() % 2) == 0); // Safe, because flow below is not preemptive. auto& shard_index = tmp_space.GetShardIndex(shard_data_.size()); @@ -276,8 +259,8 @@ void Transaction::InitByKeys(KeyIndex key_index) { // Initialize shard data based on distributed arguments. InitShardData(shard_index, key_index.num_args(), needs_reverse_mapping); - if (multi_) - InitMultiData(key_index); + if (multi_ && !multi_->lock_mode) + RecordMultiLocks(key_index); DVLOG(1) << "InitByArgs " << DebugId() << " " << args_.front(); @@ -298,7 +281,7 @@ void Transaction::InitByKeys(KeyIndex key_index) { // Validation. Check reverse mapping was built correctly. if (needs_reverse_mapping) { for (size_t i = 0; i < args_.size(); ++i) { - DCHECK_EQ(args_[i], ArgS(args, reverse_index_[i])) << args; + DCHECK_EQ(args_[i], ArgS(full_args_, reverse_index_[i])) << full_args_; } } @@ -373,7 +356,7 @@ void Transaction::StartMultiGlobal(DbIndex dbid) { multi_->mode = GLOBAL; InitBase(dbid, {}); InitGlobal(); - multi_->locks_recorded = true; + multi_->lock_mode = IntentLock::EXCLUSIVE; ScheduleInternal(); } @@ -519,7 +502,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { // is concluding. bool remove_txq = tx_stop_runnig || !txq_ooo; if (remove_txq && sd.pq_pos != TxQueue::kEnd) { - VLOG(2) << "Remove from txq" << this->DebugId(); + VLOG(2) << "Remove from txq " << this->DebugId(); shard->txq()->Remove(sd.pq_pos); sd.pq_pos = TxQueue::kEnd; } @@ -782,10 +765,10 @@ void Transaction::UnlockMulti() { return; auto sharded_keys = make_shared>(shard_set->size()); - while (!multi_->lock_counts.empty()) { - auto entry = multi_->lock_counts.extract(multi_->lock_counts.begin()); - ShardId sid = Shard(entry.key(), sharded_keys->size()); - (*sharded_keys)[sid].emplace_back(std::move(entry.key()), entry.mapped()); + while (!multi_->locks.empty()) { + auto entry = multi_->locks.extract(multi_->locks.begin()); + ShardId sid = Shard(entry.value(), sharded_keys->size()); + (*sharded_keys)[sid].emplace_back(std::move(entry.value())); } unsigned shard_journals_cnt = @@ -796,8 +779,8 @@ void Transaction::UnlockMulti() { use_count_.fetch_add(shard_data_.size(), std::memory_order_relaxed); for (ShardId i = 0; i < shard_data_.size(); ++i) { - shard_set->Add(i, [this, sharded_keys, shard_journals_cnt]() { - this->UnlockMultiShardCb(*sharded_keys, EngineShard::tlocal(), shard_journals_cnt); + shard_set->Add(i, [this, sharded_keys, i, shard_journals_cnt]() { + this->UnlockMultiShardCb((*sharded_keys)[i], EngineShard::tlocal(), shard_journals_cnt); intrusive_ptr_release(this); }); } @@ -857,7 +840,7 @@ void Transaction::ExecuteAsync() { DCHECK_GT(unique_shard_cnt_, 0u); DCHECK_GT(use_count_.load(memory_order_relaxed), 0u); - DCHECK(!IsAtomicMulti() || multi_->locks_recorded); + DCHECK(!IsAtomicMulti() || multi_->lock_mode.has_value()); // We do not necessarily Execute this transaction in 'cb' below. It well may be that it will be // executed by the engine shard once it has been armed and coordinator thread will finish the @@ -941,6 +924,16 @@ void Transaction::Refurbish() { cb_ptr_ = nullptr; } +void Transaction::IterateMultiLocks(ShardId sid, std::function cb) const { + unsigned shard_num = shard_set->size(); + for (const auto& key : multi_->locks) { + ShardId key_sid = Shard(key, shard_num); + if (key_sid == sid) { + cb(key); + } + } +} + void Transaction::EnableShard(ShardId sid) { unique_shard_cnt_ = 1; unique_shard_id_ = sid; @@ -1290,8 +1283,10 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) { return status; } -void Transaction::UnlockMultiShardCb(const std::vector& sharded_keys, EngineShard* shard, +void Transaction::UnlockMultiShardCb(const KeyList& sharded_keys, EngineShard* shard, uint32_t shard_journals_cnt) { + DCHECK(multi_ && multi_->lock_mode); + auto journal = shard->journal(); if (journal != nullptr && multi_->shard_journal_write[shard->shard_id()]) { @@ -1301,20 +1296,13 @@ void Transaction::UnlockMultiShardCb(const std::vector& sharded_keys, E if (multi_->mode == GLOBAL) { shard->shard_lock()->Release(IntentLock::EXCLUSIVE); } else { - ShardId sid = shard->shard_id(); - for (const auto& k_v : sharded_keys[sid]) { - auto release = [&](IntentLock::Mode mode) { - if (k_v.second[mode]) { - shard->db_slice().Release(mode, db_index_, k_v.first, k_v.second[mode]); - } - }; - - release(IntentLock::SHARED); - release(IntentLock::EXCLUSIVE); + for (const auto& key : sharded_keys) { + shard->db_slice().ReleaseNormalized(*multi_->lock_mode, db_index_, key, 1); } } - auto& sd = shard_data_[SidToId(shard->shard_id())]; + ShardId sid = shard->shard_id(); + auto& sd = shard_data_[SidToId(sid)]; sd.local_mask |= UNLOCK_MULTI; // It does not have to be that all shards in multi transaction execute this tx. diff --git a/src/server/transaction.h b/src/server/transaction.h index 6ff5f75c4..403ab127d 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -325,6 +325,8 @@ class Transaction { void Refurbish(); + void IterateMultiLocks(ShardId sid, std::function cb) const; + private: // Holds number of locks for each IntentLock::Mode: shared and exlusive. struct LockCnt { @@ -341,7 +343,7 @@ class Transaction { }; // owned std::string because callbacks its used in run fully async and can outlive the entries. - using KeyList = std::vector>; + using KeyList = std::vector; struct alignas(64) PerShardData { PerShardData(PerShardData&&) noexcept { @@ -377,15 +379,14 @@ class Transaction { MultiRole role; MultiMode mode; - absl::flat_hash_map lock_counts; + std::optional lock_mode; + absl::flat_hash_set locks; // The shard_journal_write vector variable is used to determine the number of shards // involved in a multi-command transaction. This information is utilized by replicas when // executing multi-command. For every write to a shard journal, the corresponding index in the // vector is marked as true. absl::InlinedVector shard_journal_write; - - bool locks_recorded = false; }; enum CoordinatorState : uint8_t { @@ -416,20 +417,20 @@ class Transaction { void InitGlobal(); // Init with a set of keys. - void InitByKeys(KeyIndex keys); + void InitByKeys(const KeyIndex& keys); void EnableShard(ShardId sid); void EnableAllShards(); // Build shard index by distributing the arguments by shards based on the key index. - void BuildShardIndex(KeyIndex keys, bool rev_mapping, std::vector* out); + void BuildShardIndex(const KeyIndex& keys, bool rev_mapping, std::vector* out); // Init shard data from shard index. void InitShardData(absl::Span shard_index, size_t num_args, bool rev_mapping); // Init multi. Record locks if needed. - void InitMultiData(KeyIndex keys); + void RecordMultiLocks(const KeyIndex& keys); // Store all key index keys in args_. Used only for single shard initialization. void StoreKeysInArgs(KeyIndex keys, bool rev_mapping); @@ -467,7 +468,7 @@ class Transaction { // Run callback inline as part of multi stub. OpStatus RunSquashedMultiCb(RunnableType cb); - void UnlockMultiShardCb(const std::vector& sharded_keys, EngineShard* shard, + void UnlockMultiShardCb(const KeyList& sharded_keys, EngineShard* shard, uint32_t shard_journals_cnt); // In a multi-command transaction, we determine the number of shard journals that we wrote entries @@ -585,8 +586,6 @@ class Transaction { private: struct TLTmpSpace { - absl::flat_hash_set uniq_keys; - std::vector& GetShardIndex(unsigned size); private: