From adeac6bd27013f49b44467b7e9178aef8d8ba4c5 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 1 Feb 2024 14:19:08 +0200 Subject: [PATCH] Pr1 (#2517) * fix: Remove a stale reference to blocking watch queue 1. Remove the duplicated FinalizeWatched function 2. Identify the case where we delete the watched queue while we may still have awakened_keys pointing to it. 3. Add a test reproducing the issue of having in awakened_keys an untangled key. Properly fixes #2514 Signed-off-by: Roman Gershman --------- Signed-off-by: Roman Gershman --- src/server/blocking_controller.cc | 47 +++++-------------------------- src/server/blocking_controller.h | 4 +-- src/server/list_family_test.cc | 28 ++++++++++++++++-- src/server/test_utils.cc | 2 +- src/server/transaction.cc | 3 +- 5 files changed, 36 insertions(+), 48 deletions(-) diff --git a/src/server/blocking_controller.cc b/src/server/blocking_controller.cc index 5922e0eb5..a795dc10b 100644 --- a/src/server/blocking_controller.cc +++ b/src/server/blocking_controller.cc @@ -93,13 +93,14 @@ bool BlockingController::DbWatchTable::UnwatchTx(string_view key, Transaction* t // the suspended item does not have to be the first one in the queue. // This shard has not been awakened and in case this transaction in the queue // we must clean it up. - if (auto it = wq->Find(tx); it != wq->items.end()) { wq->items.erase(it); } } if (wq->items.empty()) { + DVLOG(1) << "queue_map.erase"; + awakened_keys.erase(wq_it->first); queue_map.erase(wq_it); } return res; @@ -120,45 +121,7 @@ bool BlockingController::DbWatchTable::AddAwakeEvent(string_view key) { return awakened_keys.insert(it->first).second; } -// Optionally removes tx from the front of the watch queues. 
-void BlockingController::FinalizeWatched(KeyLockArgs lock_args, Transaction* tx) { - DCHECK(tx); - - ShardId sid = owner_->shard_id(); - - uint16_t local_mask = tx->GetLocalMask(sid); - VLOG(1) << "FinalizeBlocking [" << sid << "]" << tx->DebugId() << " " << local_mask; - - bool is_awakened = local_mask & Transaction::AWAKED_Q; - - if (is_awakened) - awakened_transactions_.erase(tx); - - auto dbit = watched_dbs_.find(tx->GetDbIndex()); - - // Can happen if it was the only transaction in the queue and it was notified and removed. - if (dbit == watched_dbs_.end()) - return; - - DbWatchTable& wt = *dbit->second; - - // Add keys of processed transaction so we could awake the next one in the queue - // in case those keys still exist. - for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { - string_view key = lock_args.args[i]; - bool removed_awakened = wt.UnwatchTx(key, tx); - if (removed_awakened) { - CHECK(is_awakened) << tx->DebugId() << " " << key << " " << local_mask; - } - } - - if (wt.queue_map.empty()) { - watched_dbs_.erase(dbit); - } - awakened_indices_.emplace(tx->GetDbIndex()); -} - -// Similar function but with ArgSlice. TODO: to fix the duplication. +// Removes tx from its watch queues if tx appears there. void BlockingController::FinalizeWatched(ArgSlice args, Transaction* tx) { DCHECK(tx); @@ -195,6 +158,7 @@ void BlockingController::FinalizeWatched(ArgSlice args, Transaction* tx) { awakened_indices_.emplace(tx->GetDbIndex()); } +// Runs on the shard thread. void BlockingController::NotifyPending() { const Transaction* tx = owner_->GetContTx(); CHECK(tx == nullptr) << tx->DebugId(); @@ -214,6 +178,8 @@ void BlockingController::NotifyPending() { DVLOG(1) << "Processing awakened key " << sv_key; auto w_it = wt.queue_map.find(sv_key); if (w_it == wt.queue_map.end()) { + // This should not happen because we remove keys from awakened_keys every time we remove + // the entry from queue_map. 
TODO: to make it a CHECK after Dec 2024 LOG(ERROR) << "Internal error: Key " << sv_key << " was not found in the watch queue, wt.awakened_keys len is " << wt.awakened_keys.size() << " wt.queue_map len is " << wt.queue_map.size(); @@ -229,6 +195,7 @@ void BlockingController::NotifyPending() { WatchQueue* wq = w_it->second.get(); NotifyWatchQueue(sv_key, wq, context); if (wq->items.empty()) { + // we erase awakened_keys right after this loop finishes running. wt.queue_map.erase(w_it); } } diff --git a/src/server/blocking_controller.h b/src/server/blocking_controller.h index 0a835625c..081aff9f4 100644 --- a/src/server/blocking_controller.h +++ b/src/server/blocking_controller.h @@ -28,10 +28,8 @@ class BlockingController { return awakened_transactions_; } - void FinalizeWatched(KeyLockArgs lock_args, Transaction* tx); - - // A mirror reflection but with ArgSlice. Yeah, I know.... void FinalizeWatched(ArgSlice args, Transaction* tx); + // go over potential wakened keys, verify them and activate watch queues. void NotifyPending(); diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index 4303e2316..3284ead5e 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -815,7 +815,7 @@ TEST_F(ListFamilyTest, BLPopUnwakesInScript) { }); // Start long running script that intends to wake up blpop - auto f2 = pp_->at(2)->LaunchFiber([&]() { + auto f2 = pp_->at(2)->LaunchFiber([&] { Run("script", {"EVAL", SCRIPT, "5", "a", "b", "c", "d", "l"}); }); @@ -841,7 +841,7 @@ TEST_F(ListFamilyTest, OtherMultiWakesBLpop) { )"; // Start BLPOP with infinite timeout - auto f1 = pp_->at(1)->LaunchFiber(Launch::dispatch, [&]() { + auto f1 = pp_->at(1)->LaunchFiber(Launch::dispatch, [&] { auto resp = Run("blpop", {"BLPOP", "l", "0"}); // blpop should only be awakened after the script has completed, so the // last element added in the script should be returned. 
@@ -851,7 +851,7 @@ TEST_F(ListFamilyTest, OtherMultiWakesBLpop) { // Start long running script that accesses the list, but should wake up blpop only after it // finished - auto f2 = pp_->at(2)->LaunchFiber(Launch::dispatch, [&]() { + auto f2 = pp_->at(2)->LaunchFiber(Launch::dispatch, [&] { Run("script", {"EVAL", SCRIPT, "5", "a", "b", "c", "d", "l"}); }); @@ -862,4 +862,26 @@ TEST_F(ListFamilyTest, OtherMultiWakesBLpop) { f2.Join(); } +TEST_F(ListFamilyTest, ContendExpire) { + vector blpop_fibers; + for (unsigned i = 0; i < num_threads_; ++i) { + for (unsigned j = 0; j < 30; ++j) { + blpop_fibers.emplace_back(pp_->at(i)->LaunchFiber(Launch::post, [&, i, j] { + string keys[2] = {"key0", "key1"}; + thread_local unsigned cur = 0; + for (unsigned n = 0; n < 30; n++) { + string k = keys[cur]; + cur ^= 1; + Run(StrCat("push", i, "_", j), {"lpush", k, "foo"}); + Run(StrCat("blpop", i, "_", j), {"blpop", keys[cur], "a", "0.001"}); + } + })); + } + } + + for (auto& f : blpop_fibers) { + f.Join(); + } +} + } // namespace dfly diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index 70f66f348..5b3d200ad 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -228,7 +228,7 @@ void BaseFamilyTest::ResetService() { watchdog_fiber_ = pp_->GetNextProactor()->LaunchFiber([this] { ThisFiber::SetName("Watchdog"); - if (!watchdog_done_.WaitFor(120s)) { + if (!watchdog_done_.WaitFor(60s)) { LOG(ERROR) << "Deadlock detected!!!!"; absl::SetFlag(&FLAGS_alsologtostderr, true); fb2::Mutex m; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index e9db7b5a2..874970ef3 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -634,7 +634,8 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { // of the queue and notify the next one. 
if (auto* bcontroller = shard->blocking_controller(); bcontroller) { if (awaked_prerun || was_suspended) { - bcontroller->FinalizeWatched(largs, this); + CHECK_EQ(largs.key_step, 1u); + bcontroller->FinalizeWatched(largs.args, this); } // Wake only if no tx queue head is currently running