From 366f50230b1089871d118e4f32556492f167f033 Mon Sep 17 00:00:00 2001 From: adiholden Date: Mon, 31 Jul 2023 14:50:33 +0300 Subject: [PATCH] bug(server): multi atomicity fix (#1593) * bug(server): multi atomicity fix The bug: when a multi transaction ran OOO (out of order) we removed it from the transaction queue, causing non-atomic execution. The fix: when we run a multi transaction that is not the head of the txq, we no longer remove it from the txq at that point; it is removed inside unlock multi instead. Signed-off-by: adi_holden --- src/server/multi_test.cc | 2 -- src/server/transaction.cc | 21 +++++++++++---------- src/server/transaction.h | 2 +- 3 files changed, 12 insertions(+), 13 deletions(-) diff --git a/src/server/multi_test.cc b/src/server/multi_test.cc index 66ffe225a..b6fbab932 100644 --- a/src/server/multi_test.cc +++ b/src/server/multi_test.cc @@ -726,7 +726,6 @@ TEST_F(MultiTest, TestSquashing) { f1.Join(); } -#if 0 TEST_F(MultiTest, MultiLeavesTxQueue) { if (auto mode = absl::GetFlag(FLAGS_multi_exec_mode); mode == Transaction::NON_ATOMIC) { GTEST_SKIP() << "Skipped MultiLeavesTxQueue test because multi_exec_mode is non atomic"; @@ -806,7 +805,6 @@ TEST_F(MultiTest, MultiLeavesTxQueue) { fb2.Join(); ASSERT_TRUE(success); } -#endif class MultiEvalTest : public BaseFamilyTest { protected: diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 928655c0e..ec74648d0 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -422,7 +422,7 @@ string Transaction::DebugId() const { return StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")"); } -// Runs in the dbslice thread. Returns true if transaction needs to be kept in the queue. +// Runs in the dbslice thread. Returns true if the transaction continues running in the thread. 
bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { DCHECK_GT(run_count_.load(memory_order_relaxed), 0u); CHECK(cb_ptr_) << DebugId(); @@ -444,13 +444,9 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { bool was_suspended = sd.local_mask & SUSPENDED_Q; bool awaked_prerun = sd.local_mask & AWAKED_Q; - // For multi we unlock transaction (i.e. its keys) in UnlockMulti() call. - // Therefore we differentiate between concluding, which says that this specific - // runnable concludes current operation, and should_release which tells - // whether we should unlock the keys. should_release is false for multi and - // equal to concluding otherwise. bool is_concluding = (coordinator_state_ & COORD_EXEC_CONCLUDING); - bool should_release = is_concluding && !IsAtomicMulti(); + bool tx_stop_runnig = is_concluding && !IsAtomicMulti(); + IntentLock::Mode mode = Mode(); DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED) || (multi_ && multi_->mode == GLOBAL)); @@ -498,14 +494,19 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { // If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation // and successive hops are run by continuation_trans_ in engine shard. // Otherwise we can remove ourselves only when we're concluding (so no more hops will follow). - bool remove_txq = is_concluding || !txq_ooo; + // In case of multi transaction is_concluding represents only if the current running op is + // concluding, therefore we remove from txq in unlock multi function which is when the transaction + // is concluding. + bool remove_txq = tx_stop_runnig || !txq_ooo; if (remove_txq && sd.pq_pos != TxQueue::kEnd) { + VLOG(2) << "Remove from txq" << this->DebugId(); shard->txq()->Remove(sd.pq_pos); sd.pq_pos = TxQueue::kEnd; } + // For multi we unlock transaction (i.e. its keys) in UnlockMulti() call. // If it's a final hop we should release the locks. 
- if (should_release) { + if (tx_stop_runnig) { bool became_suspended = sd.local_mask & SUSPENDED_Q; KeyLockArgs largs; @@ -550,7 +551,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { CHECK_GE(DecreaseRunCnt(), 1u); // From this point on we can not access 'this'. - return !should_release; // keep + return !tx_stop_runnig; } void Transaction::ScheduleInternal() { diff --git a/src/server/transaction.h b/src/server/transaction.h index 24a0a99da..945054c21 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -170,7 +170,7 @@ class Transaction { // Called by engine shard to execute a transaction hop. // txq_ooo is set to true if the transaction is running out of order // not as the tx queue head. - // Returns true if transaction should be kept in the queue. + // Returns true if the transaction continues running in the thread bool RunInShard(EngineShard* shard, bool txq_ooo); // Registers transaction into watched queue and blocks until a) either notification is received.