mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
chore: lock keys for optimistic transactions (#3865)
We do not acquire any locks for transactions that are executing optimistically. However, this is problematic for callbacks that need to preempt (e.g. because a journal is active). --------- Signed-off-by: kostas <kostas@dragonflydb.io>
This commit is contained in:
parent
dac1b0f3ca
commit
c1e9d510a3
1 changed files with 23 additions and 24 deletions
|
@ -83,14 +83,6 @@ uint16_t trans_id(const Transaction* ptr) {
|
||||||
return (intptr_t(ptr) >> 8) & 0xFFFF;
|
return (intptr_t(ptr) >> 8) & 0xFFFF;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool CheckLocks(const DbSlice& db_slice, IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
|
||||||
for (LockFp fp : lock_args.fps) {
|
|
||||||
if (!db_slice.CheckLock(mode, lock_args.db_index, fp))
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
struct ScheduleContext {
|
struct ScheduleContext {
|
||||||
Transaction* trans;
|
Transaction* trans;
|
||||||
bool optimistic_execution = false;
|
bool optimistic_execution = false;
|
||||||
|
@ -1052,25 +1044,18 @@ bool Transaction::ScheduleInShard(EngineShard* shard, bool execute_optimistic) {
|
||||||
if (txid_ > 0 && shard->committed_txid() >= txid_)
|
if (txid_ > 0 && shard->committed_txid() >= txid_)
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
|
auto release_fp_locks = [&]() {
|
||||||
|
GetDbSlice(shard->shard_id()).Release(mode, lock_args);
|
||||||
|
sd.local_mask &= ~KEYLOCK_ACQUIRED;
|
||||||
|
};
|
||||||
|
|
||||||
// Acquire intent locks. Intent locks are always acquired, even if already locked by others.
|
// Acquire intent locks. Intent locks are always acquired, even if already locked by others.
|
||||||
if (!IsGlobal()) {
|
if (!IsGlobal()) {
|
||||||
lock_args = GetLockArgs(shard->shard_id());
|
lock_args = GetLockArgs(shard->shard_id());
|
||||||
bool shard_unlocked = shard->shard_lock()->Check(mode);
|
bool shard_unlocked = shard->shard_lock()->Check(mode);
|
||||||
|
|
||||||
// Check if we can run immediately
|
// We need to acquire the fp locks because the executing callback
|
||||||
if (shard_unlocked && execute_optimistic &&
|
// within RunCallback below might preempt.
|
||||||
CheckLocks(GetDbSlice(shard->shard_id()), mode, lock_args)) {
|
|
||||||
sd.local_mask |= OPTIMISTIC_EXECUTION;
|
|
||||||
shard->stats().tx_optimistic_total++;
|
|
||||||
|
|
||||||
RunCallback(shard);
|
|
||||||
|
|
||||||
// Check state again, it could've been updated if the callback returned AVOID_CONCLUDING flag.
|
|
||||||
// Only possible for single shard.
|
|
||||||
if (coordinator_state_ & COORD_CONCLUDING)
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool keys_unlocked = GetDbSlice(shard->shard_id()).Acquire(mode, lock_args);
|
bool keys_unlocked = GetDbSlice(shard->shard_id()).Acquire(mode, lock_args);
|
||||||
lock_granted = shard_unlocked && keys_unlocked;
|
lock_granted = shard_unlocked && keys_unlocked;
|
||||||
|
|
||||||
|
@ -1080,6 +1065,21 @@ bool Transaction::ScheduleInShard(EngineShard* shard, bool execute_optimistic) {
|
||||||
}
|
}
|
||||||
|
|
||||||
DVLOG(3) << "Lock granted " << lock_granted << " for trans " << DebugId();
|
DVLOG(3) << "Lock granted " << lock_granted << " for trans " << DebugId();
|
||||||
|
|
||||||
|
// Check if we can run immediately
|
||||||
|
if (shard_unlocked && execute_optimistic && lock_granted) {
|
||||||
|
sd.local_mask |= OPTIMISTIC_EXECUTION;
|
||||||
|
shard->stats().tx_optimistic_total++;
|
||||||
|
|
||||||
|
RunCallback(shard);
|
||||||
|
|
||||||
|
// Check state again, it could've been updated if the callback returned AVOID_CONCLUDING flag.
|
||||||
|
// Only possible for single shard.
|
||||||
|
if (coordinator_state_ & COORD_CONCLUDING) {
|
||||||
|
release_fp_locks();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Single shard operations might have delayed acquiring txid unless neccessary.
|
// Single shard operations might have delayed acquiring txid unless neccessary.
|
||||||
|
@ -1095,8 +1095,7 @@ bool Transaction::ScheduleInShard(EngineShard* shard, bool execute_optimistic) {
|
||||||
// fail this scheduling attempt for trans.
|
// fail this scheduling attempt for trans.
|
||||||
if (!txq->Empty() && txid_ < txq->TailScore() && !lock_granted) {
|
if (!txq->Empty() && txid_ < txq->TailScore() && !lock_granted) {
|
||||||
if (sd.local_mask & KEYLOCK_ACQUIRED) {
|
if (sd.local_mask & KEYLOCK_ACQUIRED) {
|
||||||
GetDbSlice(shard->shard_id()).Release(mode, lock_args);
|
release_fp_locks();
|
||||||
sd.local_mask &= ~KEYLOCK_ACQUIRED;
|
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue