From c1e9d510a3786d97f6d9d09362d41bd418affe49 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Tue, 8 Oct 2024 14:25:36 +0300 Subject: [PATCH] chore: lock keys for optimistic transactions (#3865) We do not acquire any locks for transactions that are executing optimistically. However, this is problematic for callbacks that need to preempt (e.g. because a journal is active). --------- Signed-off-by: kostas --- src/server/transaction.cc | 47 +++++++++++++++++++-------------------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/src/server/transaction.cc b/src/server/transaction.cc index ef6cf8a3f..2fe35e740 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -83,14 +83,6 @@ uint16_t trans_id(const Transaction* ptr) { return (intptr_t(ptr) >> 8) & 0xFFFF; } -bool CheckLocks(const DbSlice& db_slice, IntentLock::Mode mode, const KeyLockArgs& lock_args) { - for (LockFp fp : lock_args.fps) { - if (!db_slice.CheckLock(mode, lock_args.db_index, fp)) - return false; - } - return true; -} - struct ScheduleContext { Transaction* trans; bool optimistic_execution = false; @@ -1052,25 +1044,18 @@ bool Transaction::ScheduleInShard(EngineShard* shard, bool execute_optimistic) { if (txid_ > 0 && shard->committed_txid() >= txid_) return false; + auto release_fp_locks = [&]() { + GetDbSlice(shard->shard_id()).Release(mode, lock_args); + sd.local_mask &= ~KEYLOCK_ACQUIRED; + }; + // Acquire intent locks. Intent locks are always acquired, even if already locked by others. if (!IsGlobal()) { lock_args = GetLockArgs(shard->shard_id()); bool shard_unlocked = shard->shard_lock()->Check(mode); - // Check if we can run immediately - if (shard_unlocked && execute_optimistic && - CheckLocks(GetDbSlice(shard->shard_id()), mode, lock_args)) { - sd.local_mask |= OPTIMISTIC_EXECUTION; - shard->stats().tx_optimistic_total++; - - RunCallback(shard); - - // Check state again, it could've been updated if the callback returned AVOID_CONCLUDING flag. - // Only possible for single shard. - if (coordinator_state_ & COORD_CONCLUDING) - return true; - } - + // We need to acquire the fp locks because the executing callback + // within RunCallback below might preempt. bool keys_unlocked = GetDbSlice(shard->shard_id()).Acquire(mode, lock_args); lock_granted = shard_unlocked && keys_unlocked; @@ -1080,6 +1065,21 @@ bool Transaction::ScheduleInShard(EngineShard* shard, bool execute_optimistic) { } DVLOG(3) << "Lock granted " << lock_granted << " for trans " << DebugId(); + + // Check if we can run immediately + if (shard_unlocked && execute_optimistic && lock_granted) { + sd.local_mask |= OPTIMISTIC_EXECUTION; + shard->stats().tx_optimistic_total++; + + RunCallback(shard); + + // Check state again, it could've been updated if the callback returned AVOID_CONCLUDING flag. + // Only possible for single shard. + if (coordinator_state_ & COORD_CONCLUDING) { + release_fp_locks(); + return true; + } + } } // Single shard operations might have delayed acquiring txid unless neccessary. @@ -1095,8 +1095,7 @@ bool Transaction::ScheduleInShard(EngineShard* shard, bool execute_optimistic) { // fail this scheduling attempt for trans. if (!txq->Empty() && txid_ < txq->TailScore() && !lock_granted) { if (sd.local_mask & KEYLOCK_ACQUIRED) { - GetDbSlice(shard->shard_id()).Release(mode, lock_args); - sd.local_mask &= ~KEYLOCK_ACQUIRED; + release_fp_locks(); } return false; }