mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
bug(server): multi atomicity fix (#1593)
* bug(server): multi atomicity fix. The bug: when a multi transaction ran out of order (OOO), we removed it from the transaction queue, causing non-atomic execution. The fix: when we run a multi transaction, unless it is the head of the txq, we remove it from the txq inside UnlockMulti. Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
1a65d5684b
commit
366f50230b
3 changed files with 12 additions and 13 deletions
|
@ -726,7 +726,6 @@ TEST_F(MultiTest, TestSquashing) {
|
||||||
f1.Join();
|
f1.Join();
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
TEST_F(MultiTest, MultiLeavesTxQueue) {
|
TEST_F(MultiTest, MultiLeavesTxQueue) {
|
||||||
if (auto mode = absl::GetFlag(FLAGS_multi_exec_mode); mode == Transaction::NON_ATOMIC) {
|
if (auto mode = absl::GetFlag(FLAGS_multi_exec_mode); mode == Transaction::NON_ATOMIC) {
|
||||||
GTEST_SKIP() << "Skipped MultiLeavesTxQueue test because multi_exec_mode is non atomic";
|
GTEST_SKIP() << "Skipped MultiLeavesTxQueue test because multi_exec_mode is non atomic";
|
||||||
|
@ -806,7 +805,6 @@ TEST_F(MultiTest, MultiLeavesTxQueue) {
|
||||||
fb2.Join();
|
fb2.Join();
|
||||||
ASSERT_TRUE(success);
|
ASSERT_TRUE(success);
|
||||||
}
|
}
|
||||||
#endif
|
|
||||||
|
|
||||||
class MultiEvalTest : public BaseFamilyTest {
|
class MultiEvalTest : public BaseFamilyTest {
|
||||||
protected:
|
protected:
|
||||||
|
|
|
@ -422,7 +422,7 @@ string Transaction::DebugId() const {
|
||||||
return StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")");
|
return StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Runs in the dbslice thread. Returns true if transaction needs to be kept in the queue.
|
// Runs in the dbslice thread. Returns true if the transaction continues running in the thread.
|
||||||
bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
|
bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
|
||||||
DCHECK_GT(run_count_.load(memory_order_relaxed), 0u);
|
DCHECK_GT(run_count_.load(memory_order_relaxed), 0u);
|
||||||
CHECK(cb_ptr_) << DebugId();
|
CHECK(cb_ptr_) << DebugId();
|
||||||
|
@ -444,13 +444,9 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
|
||||||
bool was_suspended = sd.local_mask & SUSPENDED_Q;
|
bool was_suspended = sd.local_mask & SUSPENDED_Q;
|
||||||
bool awaked_prerun = sd.local_mask & AWAKED_Q;
|
bool awaked_prerun = sd.local_mask & AWAKED_Q;
|
||||||
|
|
||||||
// For multi we unlock transaction (i.e. its keys) in UnlockMulti() call.
|
|
||||||
// Therefore we differentiate between concluding, which says that this specific
|
|
||||||
// runnable concludes current operation, and should_release which tells
|
|
||||||
// whether we should unlock the keys. should_release is false for multi and
|
|
||||||
// equal to concluding otherwise.
|
|
||||||
bool is_concluding = (coordinator_state_ & COORD_EXEC_CONCLUDING);
|
bool is_concluding = (coordinator_state_ & COORD_EXEC_CONCLUDING);
|
||||||
bool should_release = is_concluding && !IsAtomicMulti();
|
bool tx_stop_runnig = is_concluding && !IsAtomicMulti();
|
||||||
|
|
||||||
IntentLock::Mode mode = Mode();
|
IntentLock::Mode mode = Mode();
|
||||||
|
|
||||||
DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED) || (multi_ && multi_->mode == GLOBAL));
|
DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED) || (multi_ && multi_->mode == GLOBAL));
|
||||||
|
@ -498,14 +494,19 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
|
||||||
// If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation
|
// If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation
|
||||||
// and successive hops are run by continuation_trans_ in engine shard.
|
// and successive hops are run by continuation_trans_ in engine shard.
|
||||||
// Otherwise we can remove ourselves only when we're concluding (so no more hops will follow).
|
// Otherwise we can remove ourselves only when we're concluding (so no more hops will follow).
|
||||||
bool remove_txq = is_concluding || !txq_ooo;
|
// In case of multi transaction is_concluding represents only if the current running op is
|
||||||
|
// concluding, therefore we remove from txq in unlock multi function which is when the transaction
|
||||||
|
// is concluding.
|
||||||
|
bool remove_txq = tx_stop_runnig || !txq_ooo;
|
||||||
if (remove_txq && sd.pq_pos != TxQueue::kEnd) {
|
if (remove_txq && sd.pq_pos != TxQueue::kEnd) {
|
||||||
|
VLOG(2) << "Remove from txq" << this->DebugId();
|
||||||
shard->txq()->Remove(sd.pq_pos);
|
shard->txq()->Remove(sd.pq_pos);
|
||||||
sd.pq_pos = TxQueue::kEnd;
|
sd.pq_pos = TxQueue::kEnd;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// For multi we unlock transaction (i.e. its keys) in UnlockMulti() call.
|
||||||
// If it's a final hop we should release the locks.
|
// If it's a final hop we should release the locks.
|
||||||
if (should_release) {
|
if (tx_stop_runnig) {
|
||||||
bool became_suspended = sd.local_mask & SUSPENDED_Q;
|
bool became_suspended = sd.local_mask & SUSPENDED_Q;
|
||||||
KeyLockArgs largs;
|
KeyLockArgs largs;
|
||||||
|
|
||||||
|
@ -550,7 +551,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
|
||||||
CHECK_GE(DecreaseRunCnt(), 1u);
|
CHECK_GE(DecreaseRunCnt(), 1u);
|
||||||
// From this point on we can not access 'this'.
|
// From this point on we can not access 'this'.
|
||||||
|
|
||||||
return !should_release; // keep
|
return !tx_stop_runnig;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Transaction::ScheduleInternal() {
|
void Transaction::ScheduleInternal() {
|
||||||
|
|
|
@ -170,7 +170,7 @@ class Transaction {
|
||||||
// Called by engine shard to execute a transaction hop.
|
// Called by engine shard to execute a transaction hop.
|
||||||
// txq_ooo is set to true if the transaction is running out of order
|
// txq_ooo is set to true if the transaction is running out of order
|
||||||
// not as the tx queue head.
|
// not as the tx queue head.
|
||||||
// Returns true if transaction should be kept in the queue.
|
// Returns true if the transaction continues running in the thread
|
||||||
bool RunInShard(EngineShard* shard, bool txq_ooo);
|
bool RunInShard(EngineShard* shard, bool txq_ooo);
|
||||||
|
|
||||||
// Registers transaction into watched queue and blocks until a) either notification is received.
|
// Registers transaction into watched queue and blocks until a) either notification is received.
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue