mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
fix: remove empty hop for non-expiring transactions (#1605)
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
3c345c1226
commit
da17a39410
2 changed files with 23 additions and 29 deletions
|
@ -926,24 +926,20 @@ void Transaction::RunQuickie(EngineShard* shard) {
|
||||||
|
|
||||||
// runs in coordinator thread.
|
// runs in coordinator thread.
|
||||||
// Marks the transaction as expired and removes it from the waiting queue.
|
// Marks the transaction as expired and removes it from the waiting queue.
|
||||||
void Transaction::UnwatchBlocking(bool should_expire, WaitKeysProvider wcb) {
|
void Transaction::ExpireBlocking(WaitKeysProvider wcb) {
|
||||||
DVLOG(1) << "UnwatchBlocking " << DebugId() << " expire: " << should_expire;
|
|
||||||
DCHECK(!IsGlobal());
|
DCHECK(!IsGlobal());
|
||||||
|
DVLOG(1) << "ExpireBlocking " << DebugId();
|
||||||
|
|
||||||
run_count_.store(unique_shard_cnt_, memory_order_release);
|
run_count_.store(unique_shard_cnt_, memory_order_release);
|
||||||
|
|
||||||
auto expire_cb = [this, &wcb, should_expire] {
|
auto expire_cb = [this, &wcb] {
|
||||||
EngineShard* es = EngineShard::tlocal();
|
EngineShard* es = EngineShard::tlocal();
|
||||||
ArgSlice wkeys = wcb(this, es);
|
ExpireShardCb(wcb(this, es), es);
|
||||||
|
|
||||||
UnwatchShardCb(wkeys, should_expire, es);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
IterateActiveShards([&expire_cb](PerShardData& sd, auto i) { shard_set->Add(i, expire_cb); });
|
IterateActiveShards([&expire_cb](PerShardData& sd, auto i) { shard_set->Add(i, expire_cb); });
|
||||||
|
|
||||||
// Wait for all callbacks to conclude.
|
|
||||||
WaitForShardCallbacks();
|
WaitForShardCallbacks();
|
||||||
DVLOG(1) << "UnwatchBlocking finished " << DebugId();
|
DVLOG(1) << "ExpireBlocking finished " << DebugId();
|
||||||
}
|
}
|
||||||
|
|
||||||
string_view Transaction::Name() const {
|
string_view Transaction::Name() const {
|
||||||
|
@ -1141,9 +1137,10 @@ bool Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provi
|
||||||
--stats->num_blocked_clients;
|
--stats->num_blocked_clients;
|
||||||
|
|
||||||
bool is_expired = (coordinator_state_ & COORD_CANCELLED) || status == cv_status::timeout;
|
bool is_expired = (coordinator_state_ & COORD_CANCELLED) || status == cv_status::timeout;
|
||||||
UnwatchBlocking(is_expired, wkeys_provider);
|
if (is_expired)
|
||||||
coordinator_state_ &= ~COORD_BLOCKED;
|
ExpireBlocking(wkeys_provider);
|
||||||
|
|
||||||
|
coordinator_state_ &= ~COORD_BLOCKED;
|
||||||
return !is_expired;
|
return !is_expired;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1165,23 +1162,19 @@ OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard) {
|
||||||
return OpStatus::OK;
|
return OpStatus::OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Transaction::UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard* shard) {
|
void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) {
|
||||||
if (should_expire) {
|
auto lock_args = GetLockArgs(shard->shard_id());
|
||||||
auto lock_args = GetLockArgs(shard->shard_id());
|
shard->db_slice().Release(Mode(), lock_args);
|
||||||
shard->db_slice().Release(Mode(), lock_args);
|
|
||||||
|
|
||||||
unsigned sd_idx = SidToId(shard->shard_id());
|
unsigned sd_idx = SidToId(shard->shard_id());
|
||||||
auto& sd = shard_data_[sd_idx];
|
auto& sd = shard_data_[sd_idx];
|
||||||
sd.local_mask |= EXPIRED_Q;
|
sd.local_mask |= EXPIRED_Q;
|
||||||
sd.local_mask &= ~KEYLOCK_ACQUIRED;
|
sd.local_mask &= ~KEYLOCK_ACQUIRED;
|
||||||
shard->blocking_controller()->FinalizeWatched(wkeys, this);
|
|
||||||
DCHECK(!shard->blocking_controller()->awakened_transactions().contains(this));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Need to see why I decided to call this.
|
shard->blocking_controller()->FinalizeWatched(wkeys, this);
|
||||||
// My guess - probably to trigger the run of stalled transactions in case
|
DCHECK(!shard->blocking_controller()->awakened_transactions().contains(this));
|
||||||
// this shard concurrently awoke this transaction and stalled the processing
|
|
||||||
// of TxQueue.
|
// Resume processing of transaction queue
|
||||||
shard->PollExecution("unwatchcb", nullptr);
|
shard->PollExecution("unwatchcb", nullptr);
|
||||||
|
|
||||||
CHECK_GE(DecreaseRunCnt(), 1u);
|
CHECK_GE(DecreaseRunCnt(), 1u);
|
||||||
|
|
|
@ -437,13 +437,14 @@ class Transaction {
|
||||||
// Adds itself to watched queue in the shard. Must run in that shard thread.
|
// Adds itself to watched queue in the shard. Must run in that shard thread.
|
||||||
OpStatus WatchInShard(ArgSlice keys, EngineShard* shard);
|
OpStatus WatchInShard(ArgSlice keys, EngineShard* shard);
|
||||||
|
|
||||||
void UnwatchBlocking(bool should_expire, WaitKeysProvider wcb);
|
// Expire blocking transaction, unlock keys and unregister it from the blocking controller
|
||||||
|
void ExpireBlocking(WaitKeysProvider wcb);
|
||||||
|
|
||||||
|
void ExpireShardCb(ArgSlice wkeys, EngineShard* shard);
|
||||||
|
|
||||||
// Returns true if we need to follow up with PollExecution on this shard.
|
// Returns true if we need to follow up with PollExecution on this shard.
|
||||||
bool CancelShardCb(EngineShard* shard);
|
bool CancelShardCb(EngineShard* shard);
|
||||||
|
|
||||||
void UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard* shard);
|
|
||||||
|
|
||||||
// Run callback inline as part of multi stub.
|
// Run callback inline as part of multi stub.
|
||||||
OpStatus RunSquashedMultiCb(RunnableType cb);
|
OpStatus RunSquashedMultiCb(RunnableType cb);
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue