chore(transaction): Avoid COORD_SCHED_EXEC ambiguity with multi transactions (#2392)

* chore(transaction): Avoid COORD_SCHED_EXEC ambiguity

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Authored by Vladislav on 2024-01-10 11:31:11 +03:00; committed by GitHub
parent 0c96f83d1d
commit b8af49cfe5
2 changed files with 28 additions and 29 deletions
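
Background: before this change the coordinator tracked conclusion through the COORD_EXEC and COORD_EXEC_CONCLUDING bits of coordinator_state_, but atomic multi transactions set those bits once per sub-command, so "concluding" could mean either the last hop of the current command or of the whole transaction. The change gives each meaning its own home: COORD_CONCLUDING stays transaction-level, while the per-command flag moves into MultiData::concluding. A minimal self-contained sketch of the resulting split (Tx and CommandConcluding are illustrative names, not Dragonfly API):

    #include <cstdint>
    #include <memory>

    enum CoordinatorState : uint8_t {
      COORD_SCHED = 1,
      COORD_CONCLUDING = 1 << 1,  // last hop of the *transaction*
    };

    struct MultiData {
      bool concluding = false;  // last hop of the *current sub-command* only
    };

    struct Tx {
      uint8_t coordinator_state_ = 0;
      std::unique_ptr<MultiData> multi_;  // present only for multi transactions

      bool IsAtomicMulti() const { return bool(multi_); }

      // Exactly one source of truth per meaning after the change:
      bool CommandConcluding() const {
        return IsAtomicMulti() ? multi_->concluding
                               : bool(coordinator_state_ & COORD_CONCLUDING);
      }
    };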

src/server/transaction.cc

@@ -451,9 +451,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
   bool was_suspended = sd.local_mask & SUSPENDED_Q;
   bool awaked_prerun = sd.local_mask & AWAKED_Q;
 
-  bool is_concluding = (coordinator_state_ & COORD_EXEC_CONCLUDING);
-  bool tx_stop_runnig = is_concluding && !IsAtomicMulti();
+  bool is_concluding = coordinator_state_ & COORD_CONCLUDING;
 
   IntentLock::Mode mode = LockMode();
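
The tx_stop_runnig helper disappears because its !IsAtomicMulti() term moved to where the flag is written: per the hunks below, COORD_CONCLUDING is never set on an atomic multi transaction (those record conclusion in multi_->concluding instead), so is_concluding alone now means "the transaction stops running after this hop". Illustrative reduction:

    #include <cstdint>

    enum : uint8_t { COORD_CONCLUDING = 1 << 1 };

    // Old readers had to re-check IsAtomicMulti(); new readers can trust
    // the bit because writers never set it for atomic multi (sketch).
    bool TxStopsRunning(uint8_t coordinator_state) {
      return coordinator_state & COORD_CONCLUDING;
    }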
@@ -493,7 +491,8 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
   /*************************************************************************/
 
-  if (is_concluding)  // Check last hop
+  // Log to journal only once the command finished running
+  if (is_concluding || (multi_ && multi_->concluding))
     LogAutoJournalOnShard(shard);
 
   shard->db_slice().OnCbFinish();
@@ -503,11 +502,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
   // If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation
   // and successive hops are run by continuation_trans_ in engine shard.
   // Otherwise we can remove ourselves only when we're concluding (so no more hops will follow).
-  // In case of multi transaction is_concluding represents only if the current running op is
-  // concluding, therefore we remove from txq in unlock multi function which is when the transaction
-  // is concluding.
-  bool remove_txq = tx_stop_runnig || !txq_ooo;
-  if (remove_txq && sd.pq_pos != TxQueue::kEnd) {
+  if ((is_concluding || !txq_ooo) && sd.pq_pos != TxQueue::kEnd) {
     VLOG(2) << "Remove from txq " << this->DebugId();
     shard->txq()->Remove(sd.pq_pos);
     sd.pq_pos = TxQueue::kEnd;
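
The deleted comment block is now redundant: since COORD_CONCLUDING is never set for atomic multi transactions, is_concluding stays false on every hop of a multi, and removal from the tx queue is naturally deferred to UnlockMulti(). A sketch of the removal predicate (hypothetical helper name):

    // - head of the queue (txq_ooo == false): remove on first invocation,
    //   later hops are driven by continuation_trans_ in the engine shard;
    // - out-of-order (txq_ooo == true): remove only on the concluding hop;
    // - atomic multi: is_concluding is always false here, so removal waits
    //   for UnlockMulti().
    bool ShouldRemoveFromTxq(bool is_concluding, bool txq_ooo, bool still_queued) {
      return (is_concluding || !txq_ooo) && still_queued;
    }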
@@ -515,7 +510,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
 
   // For multi we unlock transaction (i.e. its keys) in UnlockMulti() call.
   // If it's a final hop we should release the locks.
-  if (tx_stop_runnig) {
+  if (is_concluding) {
     bool became_suspended = sd.local_mask & SUSPENDED_Q;
     KeyLockArgs largs;
@@ -560,7 +555,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
   CHECK_GE(DecreaseRunCnt(), 1u);
   // From this point on we can not access 'this'.
 
-  return !tx_stop_runnig;
+  return !is_concluding;
 }
 
 void Transaction::ScheduleInternal() {
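
The return value is computed from a flag read before DecreaseRunCnt() because dropping the run count may let another thread destroy the transaction. A minimal sketch of that lifetime pattern (Job is illustrative, not a Dragonfly type):

    #include <atomic>

    struct Job {
      std::atomic<unsigned> run_cnt{1};
      bool concluding = false;

      bool RunOnce() {
        bool keep = !concluding;   // read members while 'this' is still valid
        run_cnt.fetch_sub(1);      // after this, 'this' may be freed elsewhere
        return keep;               // safe: 'keep' lives on the stack
      }
    };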
@@ -673,22 +668,26 @@ void Transaction::ScheduleInternal() {
 // transactions like set/mset/mget etc. Does not apply for more complicated cases like RENAME or
 // BLPOP where a data must be read from multiple shards before performing another hop.
 OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
-  DCHECK(!cb_ptr_);
-
   if (multi_ && multi_->role == SQUASHED_STUB) {
     return RunSquashedMultiCb(cb);
   }
 
+  DCHECK(!cb_ptr_);
+  DCHECK(IsAtomicMulti() || (coordinator_state_ & COORD_SCHED) == 0);  // Multi schedule in advance.
+
   cb_ptr_ = &cb;
 
-  DCHECK(IsAtomicMulti() || (coordinator_state_ & COORD_SCHED) == 0);  // Multi schedule in advance.
-  coordinator_state_ |= (COORD_EXEC | COORD_EXEC_CONCLUDING);  // Single hop means we conclude.
-
-  bool was_ooo = false;
+  if (IsAtomicMulti()) {
+    multi_->concluding = true;
+  } else {
+    coordinator_state_ |= COORD_CONCLUDING;  // Single hop means we conclude.
+  }
 
   // If we run only on one shard and conclude, we can avoid scheduling at all
   // and directly dispatch the task to its destination shard.
   bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !IsAtomicMulti();
 
+  bool was_ooo = false;
   bool run_inline = false;
   ServerState* ss = nullptr;
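
Single-hop scheduling now records "concluding" in the slot that matches the transaction kind. Sketch of the new prologue's branch (hypothetical free function):

    #include <cstdint>

    struct MultiData { bool concluding = false; };
    enum : uint8_t { COORD_CONCLUDING = 1 << 1 };

    void MarkSingleHopConcluding(MultiData* multi, uint8_t& state) {
      if (multi)                    // atomic multi: per-command flag
        multi->concluding = true;
      else                          // regular tx: transaction-level bit
        state |= COORD_CONCLUDING;  // single hop means we conclude
    }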
@@ -729,9 +728,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
     } else {
       shard_set->Add(unique_shard_id_, std::move(schedule_cb));  // serves as a barrier.
     }
-  } else {
-    // This transaction either spans multiple shards and/or is multi.
-
+  } else {  // This transaction either spans multiple shards and/or is multi.
     if (!IsAtomicMulti())  // Multi schedule in advance.
       ScheduleInternal();
@@ -824,12 +821,12 @@ void Transaction::Execute(RunnableType cb, bool conclude) {
   DCHECK(!cb_ptr_);
   cb_ptr_ = &cb;
 
-  coordinator_state_ |= COORD_EXEC;
-  if (conclude) {
-    coordinator_state_ |= COORD_EXEC_CONCLUDING;
+  if (IsAtomicMulti()) {
+    multi_->concluding = conclude;
   } else {
-    coordinator_state_ &= ~COORD_EXEC_CONCLUDING;
+    coordinator_state_ = conclude ? (coordinator_state_ | COORD_CONCLUDING)
+                                  : (coordinator_state_ & ~COORD_CONCLUDING);
   }
 
   ExecuteAsync();
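
Execute() no longer touches COORD_EXEC (the flag is deleted from the enum below) and assigns the concluding state outright instead of only OR-ing it in, so a non-concluding hop correctly clears a bit left by an earlier hop. Illustrative helper for the non-multi branch:

    #include <cstdint>

    enum : uint8_t { COORD_CONCLUDING = 1 << 1 };

    uint8_t UpdateConcluding(uint8_t state, bool conclude) {
      return conclude ? uint8_t(state | COORD_CONCLUDING)
                      : uint8_t(state & ~COORD_CONCLUDING);
    }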

src/server/transaction.h

@@ -382,6 +382,9 @@ class Transaction {
     std::optional<IntentLock::Mode> lock_mode;
     absl::flat_hash_set<std::string> locks;
 
+    // Set if the multi command is concluding to avoid ambiguity with COORD_CONCLUDING
+    bool concluding = false;
+
     // The shard_journal_write vector variable is used to determine the number of shards
     // involved in a multi-command transaction. This information is utilized by replicas when
     // executing multi-command. For every write to a shard journal, the corresponding index in the
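
MultiData::concluding mirrors COORD_CONCLUDING one level down: it marks the last hop of the current sub-command while the multi transaction keeps running, and RunInShard consults both for journaling. Sketch of that condition from the first file (hypothetical helper name):

    struct MultiDataView { bool concluding = false; };

    bool ShouldLogToJournal(bool is_concluding, const MultiDataView* multi) {
      // Regular tx: log on its final hop. Atomic multi: log once the current
      // sub-command finished, even though the transaction itself goes on.
      return is_concluding || (multi && multi->concluding);
    }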
@@ -392,12 +395,11 @@ class Transaction {
   enum CoordinatorState : uint8_t {
     COORD_SCHED = 1,
-    COORD_EXEC = 2,
-
-    COORD_EXEC_CONCLUDING = 1 << 2,  // Whether it's the last hop of a transaction
-    COORD_BLOCKED = 1 << 3,
-    COORD_CANCELLED = 1 << 4,
-    COORD_OOO = 1 << 5,
+    COORD_CONCLUDING = 1 << 1,  // Whether it's the last hop of a transaction
+    COORD_BLOCKED = 1 << 2,
+    COORD_CANCELLED = 1 << 3,
+    COORD_OOO = 1 << 4,
   };
 
   struct PerShardCache {
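
With COORD_EXEC gone, the remaining flags are renumbered to stay dense single bits in the uint8_t state. A compile-time check of that invariant (the static_asserts are illustrative):

    #include <cstdint>

    enum CoordinatorState : uint8_t {
      COORD_SCHED = 1,
      COORD_CONCLUDING = 1 << 1,
      COORD_BLOCKED = 1 << 2,
      COORD_CANCELLED = 1 << 3,
      COORD_OOO = 1 << 4,
    };

    static_assert((COORD_SCHED & COORD_CONCLUDING) == 0, "flags must not overlap");
    static_assert(COORD_OOO < 0x100, "flags must fit in uint8_t");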