From b8af49cfe5a7a1d7137b21f67a2d0e1b45331240 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Wed, 10 Jan 2024 11:31:11 +0300 Subject: [PATCH] chore(transaction): Avoid COORD_SCHED_EXEC ambiguity with multi transactions (#2392) * chore(transaction): Avoid COORD_SCHED_EXEC ambiguity Signed-off-by: Vladislav Oleshko --- src/server/transaction.cc | 45 ++++++++++++++++++--------------------- src/server/transaction.h | 12 ++++++----- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 7a5dc96c9..32a9ec369 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -451,9 +451,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { bool was_suspended = sd.local_mask & SUSPENDED_Q; bool awaked_prerun = sd.local_mask & AWAKED_Q; - - bool is_concluding = (coordinator_state_ & COORD_EXEC_CONCLUDING); - bool tx_stop_runnig = is_concluding && !IsAtomicMulti(); + bool is_concluding = coordinator_state_ & COORD_CONCLUDING; IntentLock::Mode mode = LockMode(); @@ -493,7 +491,8 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { /*************************************************************************/ - if (is_concluding) // Check last hop + // Log to journal only once the command has finished running + if (is_concluding || (multi_ && multi_->concluding)) LogAutoJournalOnShard(shard); shard->db_slice().OnCbFinish(); @@ -503,11 +502,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { // If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation // and successive hops are run by continuation_trans_ in engine shard. // Otherwise we can remove ourselves only when we're concluding (so no more hops will follow). - // In case of multi transaction is_concluding represents only if the current running op is - // concluding, therefore we remove from txq in unlock multi function which is when the transaction - // is concluding. 
- bool remove_txq = tx_stop_runnig || !txq_ooo; - if (remove_txq && sd.pq_pos != TxQueue::kEnd) { + if ((is_concluding || !txq_ooo) && sd.pq_pos != TxQueue::kEnd) { VLOG(2) << "Remove from txq " << this->DebugId(); shard->txq()->Remove(sd.pq_pos); sd.pq_pos = TxQueue::kEnd; @@ -515,7 +510,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { // For multi we unlock transaction (i.e. its keys) in UnlockMulti() call. // If it's a final hop we should release the locks. - if (tx_stop_runnig) { + if (is_concluding) { bool became_suspended = sd.local_mask & SUSPENDED_Q; KeyLockArgs largs; @@ -560,7 +555,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { CHECK_GE(DecreaseRunCnt(), 1u); // From this point on we can not access 'this'. - return !tx_stop_runnig; + return !is_concluding; } void Transaction::ScheduleInternal() { @@ -673,22 +668,26 @@ void Transaction::ScheduleInternal() { // transactions like set/mset/mget etc. Does not apply for more complicated cases like RENAME or // BLPOP where a data must be read from multiple shards before performing another hop. OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { - DCHECK(!cb_ptr_); - if (multi_ && multi_->role == SQUASHED_STUB) { return RunSquashedMultiCb(cb); } + DCHECK(!cb_ptr_); + DCHECK(IsAtomicMulti() || (coordinator_state_ & COORD_SCHED) == 0); // Multi schedule in advance. + cb_ptr_ = &cb; - DCHECK(IsAtomicMulti() || (coordinator_state_ & COORD_SCHED) == 0); // Multi schedule in advance. - coordinator_state_ |= (COORD_EXEC | COORD_EXEC_CONCLUDING); // Single hop means we conclude. - - bool was_ooo = false; + if (IsAtomicMulti()) { + multi_->concluding = true; + } else { + coordinator_state_ |= COORD_CONCLUDING; // Single hop means we conclude. + } // If we run only on one shard and conclude, we can avoid scheduling at all // and directly dispatch the task to its destination shard. 
bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !IsAtomicMulti(); + + bool was_ooo = false; bool run_inline = false; ServerState* ss = nullptr; @@ -729,9 +728,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { } else { shard_set->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier. } - } else { - // This transaction either spans multiple shards and/or is multi. - + } else { // This transaction either spans multiple shards and/or is multi. if (!IsAtomicMulti()) // Multi schedule in advance. ScheduleInternal(); @@ -824,12 +821,12 @@ void Transaction::Execute(RunnableType cb, bool conclude) { DCHECK(!cb_ptr_); cb_ptr_ = &cb; - coordinator_state_ |= COORD_EXEC; - if (conclude) { - coordinator_state_ |= COORD_EXEC_CONCLUDING; + if (IsAtomicMulti()) { + multi_->concluding = conclude; } else { - coordinator_state_ &= ~COORD_EXEC_CONCLUDING; + coordinator_state_ = conclude ? (coordinator_state_ | COORD_CONCLUDING) + : (coordinator_state_ & ~COORD_CONCLUDING); } ExecuteAsync(); diff --git a/src/server/transaction.h b/src/server/transaction.h index eb9c6b2a4..18e4ff466 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -382,6 +382,9 @@ class Transaction { std::optional lock_mode; absl::flat_hash_set locks; + // Set if the multi command is concluding to avoid ambiguity with COORD_CONCLUDING + bool concluding = false; + // The shard_journal_write vector variable is used to determine the number of shards // involved in a multi-command transaction. This information is utilized by replicas when // executing multi-command. 
For every write to a shard journal, the corresponding index in the @@ -392,12 +395,11 @@ enum CoordinatorState : uint8_t { COORD_SCHED = 1, - COORD_EXEC = 2, - COORD_EXEC_CONCLUDING = 1 << 2, // Whether its the last hop of a transaction - COORD_BLOCKED = 1 << 3, - COORD_CANCELLED = 1 << 4, - COORD_OOO = 1 << 5, + COORD_CONCLUDING = 1 << 1, // Whether it's the last hop of a transaction + COORD_BLOCKED = 1 << 2, + COORD_CANCELLED = 1 << 3, + COORD_OOO = 1 << 4, }; struct PerShardCache {