From d6e97fcf6ded0d70508b04819c4bbf45d3808cc0 Mon Sep 17 00:00:00 2001 From: Andy Dunstall Date: Tue, 13 Jun 2023 06:59:34 +0100 Subject: [PATCH] fix: remove NotifyPending from UnwatchShardCb (#1402) NotifyPending was being called when a blocked transaction expires, which meant other blocked transactions could be woken up even though another transaction could be in progress. NotifyPending has no affect on the blocked transaction. Signed-off-by: Andy Dunstall --- src/server/list_family_test.cc | 30 ++++++++++++++++++++++++++++++ src/server/transaction.cc | 2 -- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index 5963bce04..0151dc15a 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -797,4 +797,34 @@ TEST_F(ListFamilyTest, LInsert) { EXPECT_THAT(Run({"linsert", "mylist", "after", "notfound", "x"}), IntArg(-1)); } +TEST_F(ListFamilyTest, BLPopUnwakesInScript) { + const string_view SCRIPT = R"( + for i = 1, 1000 do + redis.call('MGET', 'a', 'b', 'c', 'd') + redis.call('LPUSH', 'l', tostring(i)) + end + )"; + + // Start blpop with without timeout + auto f1 = pp_->at(1)->LaunchFiber(Launch::dispatch, [&]() { + auto resp = Run("blpop", {"BLPOP", "l", "0"}); + // blpop should only be awakened after the script has completed, so the + // last element added in the script should be returned. + EXPECT_THAT(resp, ArgType(RespExpr::ARRAY)); + EXPECT_THAT(resp.GetVec(), ElementsAre("l", "1000")); + }); + + // Start long running script that intends to wake up blpop + auto f2 = pp_->at(2)->LaunchFiber([&]() { + Run("script", {"EVAL", SCRIPT, "5", "a", "b", "c", "d", "l"}); + }); + + // Run blpop that times out + auto resp = Run({"blpop", "g", "0.01"}); + EXPECT_THAT(resp, ArgType(RespExpr::NIL_ARRAY)); + + f1.Join(); + f2.Join(); +} + } // namespace dfly diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 35f9d4c76..b51867d92 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1171,8 +1171,6 @@ void Transaction::UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard sd.local_mask &= ~KEYLOCK_ACQUIRED; shard->blocking_controller()->FinalizeWatched(wkeys, this); DCHECK(!shard->blocking_controller()->awakened_transactions().contains(this)); - - shard->blocking_controller()->NotifyPending(); } // Need to see why I decided to call this.