Mirror of https://github.com/dragonflydb/dragonfly.git, synced 2025-05-10 18:05:44 +02:00
fix(transaction): Improve ACTIVE flags management (#2458)
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
parent 517be2005e
commit aeb2b00ac8

2 changed files with 42 additions and 23 deletions
@@ -184,8 +184,7 @@ void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, siz
     sd.arg_start = args_.size();
 
     // Multi transactions can re-initialize on different shards, so clear ACTIVE flag.
-    if (multi_)
-      sd.local_mask &= ~ACTIVE;
+    DCHECK_EQ(sd.local_mask & ACTIVE, 0);
 
     if (sd.arg_count == 0)
       continue;
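The shape of this first hunk is worth noting: the init path used to clear ACTIVE defensively, and now it merely asserts that the flag is already clear, since (per the MultiSwitchCmd hunks below) the reset path owns the clearing. A minimal sketch of that pattern, using a plain assert and hypothetical stand-in types rather than Dragonfly's actual PerShardData:

#include <cassert>
#include <cstdint>

// Hypothetical stand-ins for illustration; not Dragonfly's real definitions.
enum : uint32_t { ACTIVE = 1 << 0 };

struct PerShardData {
  uint32_t local_mask = 0;
};

void InitShard(PerShardData& sd) {
  // Before: init cleared the flag itself (sd.local_mask &= ~ACTIVE).
  // After: the reset path clears it, so init only asserts the invariant.
  assert((sd.local_mask & ACTIVE) == 0);
}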
@@ -277,20 +276,22 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
   bool is_stub = multi_ && multi_->role == SQUASHED_STUB;
 
   if ((key_index.HasSingleKey() && !IsAtomicMulti()) || is_stub) {
     DCHECK(!IsActiveMulti() || multi_->mode == NON_ATOMIC);
 
     // We don't have to split the arguments by shards, so we can copy them directly.
     StoreKeysInArgs(key_index, needs_reverse_mapping);
 
-    // Multi transactions that execute commands on their own (not stubs) can't shrink the backing
-    // array, as it still might be read by leftover callbacks.
-    shard_data_.resize(IsActiveMulti() ? shard_set->size() : 1);
-    shard_data_.front().local_mask |= ACTIVE;
-
     unique_shard_cnt_ = 1;
     if (is_stub)  // stub transactions don't migrate
       DCHECK_EQ(unique_shard_id_, Shard(args_.front(), shard_set->size()));
     else
       unique_shard_id_ = Shard(args_.front(), shard_set->size());
 
+    // Multi transactions that execute commands on their own (not stubs) can't shrink the backing
+    // array, as it still might be read by leftover callbacks.
+    shard_data_.resize(IsActiveMulti() ? shard_set->size() : 1);
+    shard_data_[SidToId(unique_shard_id_)].local_mask |= ACTIVE;
+
     return;
   }
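This hunk defers flagging the shard slot until unique_shard_id_ is known, and indexes through SidToId() instead of always using front(). The diff does not show SidToId()'s body; a plausible sketch of such a mapping, assuming a single-slot array stores its data at index 0 while a full-size array is indexed by shard id:

#include <cstdint>
#include <vector>

using ShardId = uint16_t;

// Hypothetical sketch, not Dragonfly's actual SidToId() implementation.
struct TxSketch {
  std::vector<uint32_t> shard_data_;  // stand-in for vector<PerShardData>

  unsigned SidToId(ShardId sid) const {
    // One slot: everything lives at index 0. Full size: index by shard id.
    return shard_data_.size() == 1 ? 0 : sid;
  }
};

Under that assumption, flagging front() was only correct for the single-slot case; an active multi transaction whose key hashes to shard 3 must flag slot 3, which is why the flag is now set only after the shard id has been computed.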
@@ -316,12 +317,13 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
   if (unique_shard_cnt_ == 1) {
     PerShardData* sd;
     if (IsActiveMulti()) {
-      sd = &shard_data_[unique_shard_id_];
+      sd = &shard_data_[SidToId(unique_shard_id_)];
+      DCHECK(sd->local_mask & ACTIVE);
     } else {
       shard_data_.resize(1);
       sd = &shard_data_.front();
+      sd->local_mask |= ACTIVE;
     }
-    sd->local_mask |= ACTIVE;
     sd->arg_count = -1;
     sd->arg_start = -1;
   }
@@ -431,9 +433,10 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
   DCHECK(multi_);
   DCHECK(!cb_ptr_);
 
+  multi_->cmd_seq_num++;
+
   if (multi_->role != SQUASHED_STUB)  // stub transactions don't migrate between threads
     unique_shard_id_ = 0;
-  multi_->cmd_seq_num++;
   unique_shard_cnt_ = 0;
 
   args_.clear();
@ -442,18 +445,25 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
|
|||
cid_ = cid;
|
||||
cb_ptr_ = nullptr;
|
||||
|
||||
if (multi_->mode == NON_ATOMIC || multi_->role == SQUASHED_STUB) {
|
||||
// Reset shard data without resizing because armed might be read from cancelled callbacks.
|
||||
for (auto& sd : shard_data_) {
|
||||
sd.arg_count = sd.arg_start = sd.local_mask = 0;
|
||||
sd.pq_pos = TxQueue::kEnd;
|
||||
DCHECK_EQ(sd.is_armed.load(memory_order_relaxed), false);
|
||||
for (auto& sd : shard_data_) {
|
||||
sd.arg_count = sd.arg_start = 0;
|
||||
|
||||
if (multi_->mode == NON_ATOMIC) {
|
||||
sd.local_mask = 0; // Non atomic transactions schedule each time, so remove all flags
|
||||
DCHECK_EQ(sd.pq_pos, TxQueue::kEnd);
|
||||
} else {
|
||||
DCHECK(IsAtomicMulti()); // Every command determines it's own active shards
|
||||
sd.local_mask &= ~ACTIVE; // so remove ACTIVE flags, but keep KEYLOCK_ACQUIRED
|
||||
}
|
||||
coordinator_state_ = 0;
|
||||
DCHECK(!sd.is_armed.load(memory_order_relaxed));
|
||||
}
|
||||
|
||||
if (multi_->mode == NON_ATOMIC)
|
||||
if (multi_->mode == NON_ATOMIC) {
|
||||
coordinator_state_ = 0;
|
||||
txid_ = 0;
|
||||
} else if (multi_->role == SQUASHED_STUB) {
|
||||
DCHECK_EQ(coordinator_state_, 0u);
|
||||
}
|
||||
|
||||
if (multi_->role == SQUASHER)
|
||||
multi_->role = DEFAULT;
|
||||
|
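The rewritten reset loop separates the two multi modes: non-atomic transactions schedule from scratch for every command, so all flags are wiped, while atomic ones hold their key locks across commands and drop only ACTIVE. A small demonstration of the masking arithmetic, with hypothetical flag values standing in for the real constants:

#include <cassert>
#include <cstdint>

// Hypothetical flag values; the real constants live in Dragonfly's headers.
enum : uint32_t { ACTIVE = 1 << 0, KEYLOCK_ACQUIRED = 1 << 1 };

int main() {
  uint32_t local_mask = ACTIVE | KEYLOCK_ACQUIRED;

  // Atomic multi between commands: drop ACTIVE, keep the lock bit.
  local_mask &= ~ACTIVE;
  assert(local_mask == KEYLOCK_ACQUIRED);

  // Non-atomic multi between commands: everything is re-acquired, wipe all bits.
  local_mask = 0;
  assert(local_mask == 0);
}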
@@ -710,6 +720,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
 
   if (schedule_fast) {
     DCHECK_NE(unique_shard_id_, kInvalidSid);
+    DCHECK(IsActive(unique_shard_id_));
     DCHECK(shard_data_.size() == 1 || multi_->mode == NON_ATOMIC);
 
     // IsArmedInShard() first checks run_count_ before shard_data, so use release ordering.
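The trailing context comment says IsArmedInShard() checks run_count_ before reading shard data, hence the release ordering on the writer side. A generic release/acquire pairing of that shape (member names here are illustrative, not Dragonfly's actual ones):

#include <atomic>
#include <cstdint>

// Illustrative sketch of a publish/observe pair with release/acquire ordering.
struct ArmedState {
  std::atomic<uint32_t> run_count{0};
  std::atomic<bool> is_armed{false};

  void Arm() {
    is_armed.store(true, std::memory_order_relaxed);
    // Release: a reader that observes this increment with acquire
    // ordering is guaranteed to also observe the store above.
    run_count.fetch_add(1, std::memory_order_release);
  }

  bool IsArmed() const {
    // Acquire pairs with the release increment in Arm().
    if (run_count.load(std::memory_order_acquire) == 0)
      return false;
    return is_armed.load(std::memory_order_relaxed);
  }
};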
@@ -1044,6 +1055,18 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const {
   return res;
 }
 
+bool Transaction::IsActive(ShardId sid) const {
+  // If we have only one shard, we often don't store information about all shards, so determine it
+  // solely by id.
+  if (unique_shard_cnt_ == 1) {
+    // However, the active flag is still supposed to be set for our unique shard.
+    DCHECK((shard_data_[SidToId(unique_shard_id_)].local_mask & ACTIVE));
+    return sid == unique_shard_id_;
+  }
+
+  return shard_data_[SidToId(sid)].local_mask & ACTIVE;
+}
+
 // Runs within an engine shard thread.
 // Optimized path that schedules and runs transactions out of order if possible.
 // Returns true if eagerly executed, false if the callback will be handled by the transaction
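The new IsActive() accessor gives call sites such as the ScheduleSingleHop() DCHECK above a single answer to "does this transaction touch shard X". A self-contained mirror of its contract, under the same hypothetical single-slot assumption as the SidToId() sketch earlier:

#include <cassert>
#include <cstdint>
#include <vector>

using ShardId = uint16_t;
enum : uint32_t { ACTIVE = 1 << 0 };

// Hypothetical standalone mirror of Transaction::IsActive(); illustration only.
struct TxView {
  std::vector<uint32_t> shard_data;  // local_mask per shard slot
  ShardId unique_shard_id = 0;
  unsigned unique_shard_cnt = 0;

  unsigned SidToId(ShardId sid) const {
    return shard_data.size() == 1 ? 0 : sid;
  }

  bool IsActive(ShardId sid) const {
    if (unique_shard_cnt == 1)
      return sid == unique_shard_id;  // single shard: decide by id alone
    return shard_data[SidToId(sid)] & ACTIVE;
  }
};

int main() {
  // Single-shard transaction pinned to shard 3, data stored in slot 0.
  TxView tx{{ACTIVE}, /*unique_shard_id=*/3, /*unique_shard_cnt=*/1};
  assert(tx.IsActive(3) && !tx.IsActive(1));
}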