From 2b87088121432f9db97aa6fdeedbcfdd8cda8514 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Sat, 5 Nov 2022 14:43:11 +0200 Subject: [PATCH] fix(list): Fixes blpop failure. (#462) The bug was that if two push operations were queued together in the tx queue, and the first push awakes pending blpop, then the PollExecution function would continue with the second push before switching to blpop, which contradicts the spec. Signed-off-by: Roman Gershman --- src/server/engine_shard_set.cc | 15 ++++++++----- src/server/list_family_test.cc | 39 ++++++++++++++++++++++++++++++++++ src/server/transaction.cc | 3 ++- 3 files changed, 51 insertions(+), 6 deletions(-) diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 5fa46ba3f..a98a219c0 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -164,12 +164,18 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { } } - bool has_awaked_trans = blocking_controller_ && blocking_controller_->HasAwakedTransaction(); Transaction* head = nullptr; string dbg_id; - if (continuation_trans_ == nullptr && !has_awaked_trans) { + if (continuation_trans_ == nullptr) { while (!txq_.Empty()) { + // we must check every iteration so that if the current transaction awakens + // another transaction, the loop won't proceed further and will break, because we must run + // the notified transaction before all other transactions in the queue can proceed. 
+ bool has_awaked_trans = blocking_controller_ && blocking_controller_->HasAwakedTransaction(); + if (has_awaked_trans) + break; + auto val = txq_.Front(); head = absl::get(val); @@ -212,13 +218,12 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { break; } } // while(!txq_.Empty()) - } else { // if (continuation_trans_ == nullptr && !has_awaked_trans) - DVLOG(1) << "Skipped TxQueue " << continuation_trans_ << " " << has_awaked_trans; + } else { // if (continuation_trans_ == nullptr) + DVLOG(1) << "Skipped TxQueue " << continuation_trans_; } // we need to run trans if it's OOO or when trans is blocked in this shard and should // be treated here as noop. - // trans is OOO, it it locked keys that previous transactions have not locked yet. bool should_run = trans_mask & (Transaction::OUT_OF_ORDER | Transaction::SUSPENDED_Q); // It may be that there are other transactions that touch those keys but they necessary ordered diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index 01e074c0c..4e2896bc6 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -622,4 +622,43 @@ TEST_F(ListFamilyTest, LMove) { ASSERT_THAT(Run({"lmove", kKey1, kKey1, "LEFT", "R"}), ArgType(RespExpr::ERROR)); } +TEST_F(ListFamilyTest, TwoQueueBug451) { + // The bug was that if 2 push operations were queued together in the tx queue, + // and the first awoke pending blpop, then the PollExecution function would continue with the + // second push before switching to blpop, which contradicts the spec. 
+ std::atomic_bool running{true}; + std::atomic_int it_cnt{0}; + + auto pop_fiber = [&]() { + auto id = "t-" + std::to_string(it_cnt.fetch_add(1)); + while (running.load()) { + Run(id, {"blpop", "a", "0.1"}); + } + }; + + auto push_fiber = [&]() { + auto id = "t-" + std::to_string(it_cnt.fetch_add(1)); + for (int i = 0; i < 1000; i++) { + Run(id, {"rpush", "a", "DATA"}); + } + ::boost::this_fiber::sleep_for(100ms); + running = false; + }; + + vector fbs; + + // more likely to reproduce the bug if we start pop_fiber first. + for (int i = 0; i < 2; i++) { + fbs.push_back(pp_->at(i)->LaunchFiber(pop_fiber)); + } + + + for (int i = 0; i < 2; i++) { + fbs.push_back(pp_->at(i)->LaunchFiber(push_fiber)); + } + + for (auto& f : fbs) + f.join(); +} + } // namespace dfly diff --git a/src/server/transaction.cc b/src/server/transaction.cc index f0219be8f..7f38c2aaf 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1137,7 +1137,8 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) { return false; } - DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask; + DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask + << " by " << committed_txid; // local_mask could be awaked (i.e. not suspended) if the transaction has been // awakened by another key or awakened by the same key multiple times.