mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
fix: simplify ScheduleInShard (#1610)
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
977fc18e25
commit
3a4b3c97c8
1 changed files with 21 additions and 37 deletions
|
@ -1008,68 +1008,52 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
|
|||
|
||||
// This function should not block since it's run via RunBriefInParallel.
|
||||
pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
|
||||
DCHECK(!shard_data_.empty());
|
||||
DCHECK(shard_data_[SidToId(shard->shard_id())].local_mask & ACTIVE);
|
||||
|
||||
// schedule_success, lock_granted.
|
||||
pair<bool, bool> result{false, false};
|
||||
|
||||
if (shard->committed_txid() >= txid_) {
|
||||
return result;
|
||||
}
|
||||
// If a more recent transaction already commited, we abort
|
||||
if (shard->committed_txid() >= txid_)
|
||||
return {false, false};
|
||||
|
||||
TxQueue* txq = shard->txq();
|
||||
KeyLockArgs lock_args;
|
||||
IntentLock::Mode mode = Mode();
|
||||
|
||||
bool spans_all = IsGlobal();
|
||||
bool lock_granted = false;
|
||||
ShardId sid = SidToId(shard->shard_id());
|
||||
|
||||
ShardId sid = SidToId(shard->shard_id());
|
||||
auto& sd = shard_data_[sid];
|
||||
|
||||
if (!spans_all) {
|
||||
bool shard_unlocked = shard->shard_lock()->Check(mode);
|
||||
// Acquire intent locks
|
||||
if (!IsGlobal()) {
|
||||
lock_args = GetLockArgs(shard->shard_id());
|
||||
|
||||
// we need to acquire the lock unrelated to shard_unlocked since we register into Tx queue.
|
||||
// All transactions in the queue must acquire the intent lock.
|
||||
lock_granted = shard->db_slice().Acquire(mode, lock_args) && shard_unlocked;
|
||||
// Key locks are acquired even if the shard is locked since intent locks are always acquired
|
||||
bool shard_unlocked = shard->shard_lock()->Check(mode);
|
||||
bool keys_unlocked = shard->db_slice().Acquire(mode, lock_args);
|
||||
|
||||
lock_granted = keys_unlocked && shard_unlocked;
|
||||
sd.local_mask |= KEYLOCK_ACQUIRED;
|
||||
DVLOG(3) << "Lock granted " << lock_granted << " for trans " << DebugId();
|
||||
}
|
||||
|
||||
if (!txq->Empty()) {
|
||||
// If the new transaction requires reordering of the pending queue (i.e. it comes before tail)
|
||||
// and some other transaction already locked its keys we can not reorder 'trans' because
|
||||
// that other transaction could have deduced that it can run OOO and eagerly execute. Hence, we
|
||||
// fail this scheduling attempt for trans.
|
||||
// However, when we schedule span-all transactions we can still reorder them. The reason is
|
||||
// before we start scheduling them we lock the shards and disable OOO.
|
||||
// We may record when they disable OOO via barrier_ts so if the queue contains transactions
|
||||
// that were only scheduled afterwards we know they are not free so we can still
|
||||
// reorder the queue. Currently, this optimization is disabled: barrier_ts < pq->HeadScore().
|
||||
bool to_proceed = lock_granted || txq->TailScore() < txid_;
|
||||
if (!to_proceed) {
|
||||
if (sd.local_mask & KEYLOCK_ACQUIRED) { // rollback the lock.
|
||||
shard->db_slice().Release(mode, lock_args);
|
||||
sd.local_mask &= ~KEYLOCK_ACQUIRED;
|
||||
}
|
||||
|
||||
return result; // false, false
|
||||
// If the new transaction requires reordering of the pending queue (i.e. it comes before tail)
|
||||
// and some other transaction already locked its keys we can not reorder 'trans' because
|
||||
// that other transaction could have deduced that it can run OOO and eagerly execute. Hence, we
|
||||
// fail this scheduling attempt for trans.
|
||||
if (!txq->Empty() && txid_ < txq->TailScore() && !lock_granted) {
|
||||
if (sd.local_mask & KEYLOCK_ACQUIRED) {
|
||||
shard->db_slice().Release(mode, lock_args);
|
||||
sd.local_mask &= ~KEYLOCK_ACQUIRED;
|
||||
}
|
||||
return {false, false};
|
||||
}
|
||||
|
||||
result.second = lock_granted;
|
||||
result.first = true;
|
||||
|
||||
TxQueue::Iterator it = txq->Insert(this);
|
||||
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
|
||||
sd.pq_pos = it;
|
||||
|
||||
DVLOG(1) << "Insert into tx-queue, sid(" << sid << ") " << DebugId() << ", qlen " << txq->size();
|
||||
|
||||
return result;
|
||||
return {true, lock_granted};
|
||||
}
|
||||
|
||||
bool Transaction::CancelShardCb(EngineShard* shard) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue