mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
fix(server): Fix a bug with brpoplpush (#677)
fix(server): Fix a bug when an expired transaction stays in watched queue. Now we remove the transaction from the watched queues in a consistent manner based on the keys it was assigned to watch. Signed-off-by: Roman Gershman <roman@dragonflydb.io> Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
dd218fa037
commit
613e3b8741
9 changed files with 79 additions and 80 deletions
|
@ -402,10 +402,6 @@ bool Transaction::RunInShard(EngineShard* shard) {
|
|||
if (was_suspended || !become_suspended) {
|
||||
shard->db_slice().Release(mode, largs);
|
||||
sd.local_mask &= ~KEYLOCK_ACQUIRED;
|
||||
|
||||
if (was_suspended || (sd.local_mask & AWAKED_Q)) {
|
||||
shard->blocking_controller()->RemoveWatched(ShardArgsInShard(shard->shard_id()), this);
|
||||
}
|
||||
}
|
||||
sd.local_mask &= ~OUT_OF_ORDER;
|
||||
}
|
||||
|
@ -813,13 +809,18 @@ void Transaction::RunQuickie(EngineShard* shard) {
|
|||
|
||||
// runs in coordinator thread.
|
||||
// Marks the transaction as expired and removes it from the waiting queue.
|
||||
void Transaction::ExpireBlocking() {
|
||||
DVLOG(1) << "ExpireBlocking " << DebugId();
|
||||
void Transaction::UnwatchBlocking(bool should_expire, WaitKeysPovider wcb) {
|
||||
DVLOG(1) << "UnwatchBlocking " << DebugId();
|
||||
DCHECK(!IsGlobal());
|
||||
|
||||
run_count_.store(unique_shard_cnt_, memory_order_release);
|
||||
|
||||
auto expire_cb = [this] { ExpireShardCb(EngineShard::tlocal()); };
|
||||
auto expire_cb = [&] {
|
||||
EngineShard* es = EngineShard::tlocal();
|
||||
ArgSlice wkeys = wcb(this, es);
|
||||
|
||||
UnwatchShardCb(wkeys, should_expire, es);
|
||||
};
|
||||
|
||||
if (unique_shard_cnt_ == 1) {
|
||||
DCHECK_LT(unique_shard_id_, shard_set->size());
|
||||
|
@ -837,7 +838,7 @@ void Transaction::ExpireBlocking() {
|
|||
|
||||
// Wait for all callbacks to conclude.
|
||||
WaitForShardCallbacks();
|
||||
DVLOG(1) << "ExpireBlocking finished " << DebugId();
|
||||
DVLOG(1) << "UnwatchBlocking finished " << DebugId();
|
||||
}
|
||||
|
||||
const char* Transaction::Name() const {
|
||||
|
@ -1009,12 +1010,17 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
|
|||
return reverse_index_[sd.arg_start + arg_index];
|
||||
}
|
||||
|
||||
bool Transaction::WaitOnWatch(const time_point& tp, RunnableType cb) {
|
||||
bool Transaction::WaitOnWatch(const time_point& tp, WaitKeysPovider wkeys_provider) {
|
||||
// Assumes that transaction is pending and scheduled. TODO: To verify it with state machine.
|
||||
VLOG(2) << "WaitOnWatch Start use_count(" << use_count() << ")";
|
||||
using namespace chrono;
|
||||
|
||||
Execute(cb, true);
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
auto keys = wkeys_provider(t, shard);
|
||||
return t->WatchInShard(keys, shard);
|
||||
};
|
||||
|
||||
Execute(move(cb), true);
|
||||
|
||||
coordinator_state_ |= COORD_BLOCKED;
|
||||
|
||||
|
@ -1038,40 +1044,11 @@ bool Transaction::WaitOnWatch(const time_point& tp, RunnableType cb) {
|
|||
DVLOG(1) << "WaitOnWatch await_until " << int(status);
|
||||
}
|
||||
|
||||
if ((coordinator_state_ & COORD_CANCELLED) || status == cv_status::timeout) {
|
||||
ExpireBlocking();
|
||||
coordinator_state_ &= ~COORD_BLOCKED;
|
||||
return false;
|
||||
}
|
||||
|
||||
#if 0
|
||||
// We were notified by a shard, so lets make sure that our notifications converged to a stable
|
||||
// form.
|
||||
if (unique_shard_cnt_ > 1) {
|
||||
run_count_.store(unique_shard_cnt_, memory_order_release);
|
||||
|
||||
auto converge_cb = [this] {
|
||||
this->CheckForConvergence(EngineShard::tlocal());
|
||||
};
|
||||
|
||||
for (ShardId i = 0; i < shard_data_.size(); ++i) {
|
||||
auto& sd = shard_data_[i];
|
||||
DCHECK_EQ(0, sd.local_mask & ARMED);
|
||||
if (sd.arg_count == 0)
|
||||
continue;
|
||||
shard_set->Add(i, converge_cb);
|
||||
}
|
||||
|
||||
// Wait for all callbacks to conclude.
|
||||
WaitForShardCallbacks();
|
||||
DVLOG(1) << "Convergence finished " << DebugId();
|
||||
}
|
||||
#endif
|
||||
|
||||
// Lift blocking mask.
|
||||
bool is_expired = (coordinator_state_ & COORD_CANCELLED) || status == cv_status::timeout;
|
||||
UnwatchBlocking(is_expired, wkeys_provider);
|
||||
coordinator_state_ &= ~COORD_BLOCKED;
|
||||
|
||||
return true;
|
||||
return !is_expired;
|
||||
}
|
||||
|
||||
// Runs only in the shard thread.
|
||||
|
@ -1091,22 +1068,24 @@ OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard) {
|
|||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
void Transaction::ExpireShardCb(EngineShard* shard) {
|
||||
auto lock_args = GetLockArgs(shard->shard_id());
|
||||
shard->db_slice().Release(Mode(), lock_args);
|
||||
void Transaction::UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard* shard) {
|
||||
if (should_expire) {
|
||||
auto lock_args = GetLockArgs(shard->shard_id());
|
||||
shard->db_slice().Release(Mode(), lock_args);
|
||||
|
||||
unsigned sd_idx = SidToId(shard->shard_id());
|
||||
auto& sd = shard_data_[sd_idx];
|
||||
sd.local_mask |= EXPIRED_Q;
|
||||
sd.local_mask &= ~KEYLOCK_ACQUIRED;
|
||||
unsigned sd_idx = SidToId(shard->shard_id());
|
||||
auto& sd = shard_data_[sd_idx];
|
||||
sd.local_mask |= EXPIRED_Q;
|
||||
sd.local_mask &= ~KEYLOCK_ACQUIRED;
|
||||
}
|
||||
|
||||
shard->blocking_controller()->RemoveWatched(ShardArgsInShard(shard->shard_id()), this);
|
||||
shard->blocking_controller()->RemoveWatched(wkeys, this);
|
||||
|
||||
// Need to see why I decided to call this.
|
||||
// My guess - probably to trigger the run of stalled transactions in case
|
||||
// this shard concurrently awoke this transaction and stalled the processing
|
||||
// of TxQueue.
|
||||
shard->PollExecution("expirecb", nullptr);
|
||||
shard->PollExecution("unwatchcb", nullptr);
|
||||
|
||||
CHECK_GE(DecreaseRunCnt(), 1u);
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue