fix(list): Fixes blpop failure. (#462)

The bug was that if two push operations where queued together in the tx queue,
and the first push awakes pending blpop, then the PollExecution function would continue with the
second push before switching to blpop, which contradicts the spec.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2022-11-05 14:43:11 +02:00 committed by GitHub
parent d95de3347a
commit 2b87088121
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 6 deletions

View file

@ -164,12 +164,18 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
}
}
bool has_awaked_trans = blocking_controller_ && blocking_controller_->HasAwakedTransaction();
Transaction* head = nullptr;
string dbg_id;
if (continuation_trans_ == nullptr && !has_awaked_trans) {
if (continuation_trans_ == nullptr) {
while (!txq_.Empty()) {
// we must check every iteration so that if the current transaction awakens
// another transaction, the loop won't proceed further and will break, because we must run
// the notified transaction before all other transactions in the queue can proceed.
bool has_awaked_trans = blocking_controller_ && blocking_controller_->HasAwakedTransaction();
if (has_awaked_trans)
break;
auto val = txq_.Front();
head = absl::get<Transaction*>(val);
@ -212,13 +218,12 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
break;
}
} // while(!txq_.Empty())
} else { // if (continuation_trans_ == nullptr && !has_awaked_trans)
DVLOG(1) << "Skipped TxQueue " << continuation_trans_ << " " << has_awaked_trans;
} else { // if (continuation_trans_ == nullptr)
DVLOG(1) << "Skipped TxQueue " << continuation_trans_;
}
// we need to run trans if it's OOO or when trans is blocked in this shard and should
// be treated here as noop.
// trans is OOO, it it locked keys that previous transactions have not locked yet.
bool should_run = trans_mask & (Transaction::OUT_OF_ORDER | Transaction::SUSPENDED_Q);
// It may be that there are other transactions that touch those keys but they necessary ordered

View file

@ -622,4 +622,43 @@ TEST_F(ListFamilyTest, LMove) {
ASSERT_THAT(Run({"lmove", kKey1, kKey1, "LEFT", "R"}), ArgType(RespExpr::ERROR));
}
TEST_F(ListFamilyTest, TwoQueueBug451) {
// The bug was that if 2 push operations where queued together in the tx queue,
// and the first awoke pending blpop, then the PollExecution function would continue with the
// second push before switching to blpop, which contradicts the spec.
std::atomic_bool running{true};
std::atomic_int it_cnt{0};
auto pop_fiber = [&]() {
auto id = "t-" + std::to_string(it_cnt.fetch_add(1));
while (running.load()) {
Run(id, {"blpop", "a", "0.1"});
}
};
auto push_fiber = [&]() {
auto id = "t-" + std::to_string(it_cnt.fetch_add(1));
for (int i = 0; i < 1000; i++) {
Run(id, {"rpush", "a", "DATA"});
}
::boost::this_fiber::sleep_for(100ms);
running = false;
};
vector<boost::fibers::fiber> fbs;
// more likely to reproduce the bug if we start pop_fiber first.
for (int i = 0; i < 2; i++) {
fbs.push_back(pp_->at(i)->LaunchFiber(pop_fiber));
}
for (int i = 0; i < 2; i++) {
fbs.push_back(pp_->at(i)->LaunchFiber(push_fiber));
}
for (auto& f : fbs)
f.join();
}
} // namespace dfly

View file

@ -1137,7 +1137,8 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
return false;
}
DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask;
DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask
<< " by " << committed_txid;
// local_mask could be awaked (i.e. not suspended) if the transaction has been
// awakened by another key or awakened by the same key multiple times.