fix: remove NotifyPending from UnwatchShardCb (#1402)

NotifyPending was being called when a blocked transaction expires, which
meant other blocked transactions could be woken up even though another
transaction could be in progress. NotifyPending has no affect on the
blocked transaction.

Signed-off-by: Andy Dunstall <andydunstall@hotmail.co.uk>
This commit is contained in:
Andy Dunstall 2023-06-13 06:59:34 +01:00 committed by GitHub
parent 3f27bd3eba
commit d6e97fcf6d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 2 deletions

View file

@ -797,4 +797,34 @@ TEST_F(ListFamilyTest, LInsert) {
EXPECT_THAT(Run({"linsert", "mylist", "after", "notfound", "x"}), IntArg(-1)); EXPECT_THAT(Run({"linsert", "mylist", "after", "notfound", "x"}), IntArg(-1));
} }
TEST_F(ListFamilyTest, BLPopUnwakesInScript) {
const string_view SCRIPT = R"(
for i = 1, 1000 do
redis.call('MGET', 'a', 'b', 'c', 'd')
redis.call('LPUSH', 'l', tostring(i))
end
)";
// Start blpop with without timeout
auto f1 = pp_->at(1)->LaunchFiber(Launch::dispatch, [&]() {
auto resp = Run("blpop", {"BLPOP", "l", "0"});
// blpop should only be awakened after the script has completed, so the
// last element added in the script should be returned.
EXPECT_THAT(resp, ArgType(RespExpr::ARRAY));
EXPECT_THAT(resp.GetVec(), ElementsAre("l", "1000"));
});
// Start long running script that intends to wake up blpop
auto f2 = pp_->at(2)->LaunchFiber([&]() {
Run("script", {"EVAL", SCRIPT, "5", "a", "b", "c", "d", "l"});
});
// Run blpop that times out
auto resp = Run({"blpop", "g", "0.01"});
EXPECT_THAT(resp, ArgType(RespExpr::NIL_ARRAY));
f1.Join();
f2.Join();
}
} // namespace dfly } // namespace dfly

View file

@ -1171,8 +1171,6 @@ void Transaction::UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard
sd.local_mask &= ~KEYLOCK_ACQUIRED; sd.local_mask &= ~KEYLOCK_ACQUIRED;
shard->blocking_controller()->FinalizeWatched(wkeys, this); shard->blocking_controller()->FinalizeWatched(wkeys, this);
DCHECK(!shard->blocking_controller()->awakened_transactions().contains(this)); DCHECK(!shard->blocking_controller()->awakened_transactions().contains(this));
shard->blocking_controller()->NotifyPending();
} }
// Need to see why I decided to call this. // Need to see why I decided to call this.