diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 5fa46ba3f..a98a219c0 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -164,12 +164,18 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { } } - bool has_awaked_trans = blocking_controller_ && blocking_controller_->HasAwakedTransaction(); Transaction* head = nullptr; string dbg_id; - if (continuation_trans_ == nullptr && !has_awaked_trans) { + if (continuation_trans_ == nullptr) { while (!txq_.Empty()) { + // we must check every iteration so that if the current transaction awakens + // another transaction, the loop won't proceed further and will break, because we must run + // the notified transaction before all other transactions in the queue can proceed. + bool has_awaked_trans = blocking_controller_ && blocking_controller_->HasAwakedTransaction(); + if (has_awaked_trans) + break; + auto val = txq_.Front(); head = absl::get(val); @@ -212,13 +218,12 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { break; } } // while(!txq_.Empty()) - } else { // if (continuation_trans_ == nullptr && !has_awaked_trans) - DVLOG(1) << "Skipped TxQueue " << continuation_trans_ << " " << has_awaked_trans; + } else { // if (continuation_trans_ == nullptr) + DVLOG(1) << "Skipped TxQueue " << continuation_trans_; } // we need to run trans if it's OOO or when trans is blocked in this shard and should // be treated here as noop. - // trans is OOO, it it locked keys that previous transactions have not locked yet. bool should_run = trans_mask & (Transaction::OUT_OF_ORDER | Transaction::SUSPENDED_Q); // It may be that there are other transactions that touch those keys but they necessary ordered diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index 01e074c0c..4e2896bc6 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -622,4 +622,43 @@ TEST_F(ListFamilyTest, LMove) { ASSERT_THAT(Run({"lmove", kKey1, kKey1, "LEFT", "R"}), ArgType(RespExpr::ERROR)); } +TEST_F(ListFamilyTest, TwoQueueBug451) { + // The bug was that if 2 push operations where queued together in the tx queue, + // and the first awoke pending blpop, then the PollExecution function would continue with the + // second push before switching to blpop, which contradicts the spec. + std::atomic_bool running{true}; + std::atomic_int it_cnt{0}; + + auto pop_fiber = [&]() { + auto id = "t-" + std::to_string(it_cnt.fetch_add(1)); + while (running.load()) { + Run(id, {"blpop", "a", "0.1"}); + } + }; + + auto push_fiber = [&]() { + auto id = "t-" + std::to_string(it_cnt.fetch_add(1)); + for (int i = 0; i < 1000; i++) { + Run(id, {"rpush", "a", "DATA"}); + } + ::boost::this_fiber::sleep_for(100ms); + running = false; + }; + + vector fbs; + + // more likely to reproduce the bug if we start pop_fiber first. + for (int i = 0; i < 2; i++) { + fbs.push_back(pp_->at(i)->LaunchFiber(pop_fiber)); + } + + + for (int i = 0; i < 2; i++) { + fbs.push_back(pp_->at(i)->LaunchFiber(push_fiber)); + } + + for (auto& f : fbs) + f.join(); +} + } // namespace dfly diff --git a/src/server/transaction.cc b/src/server/transaction.cc index f0219be8f..7f38c2aaf 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1137,7 +1137,8 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) { return false; } - DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask; + DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask + << " by " << committed_txid; // local_mask could be awaked (i.e. not suspended) if the transaction has been // awakened by another key or awakened by the same key multiple times.