From 56a7f85e39868ee2c96c5163c758ab57a07ec288 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Wed, 10 Apr 2024 16:59:33 +0300 Subject: [PATCH] fix: brpoplpush single shard to wake up blocked transactions (#2875) * wake up blocked transactions on single shard --- src/server/list_family.cc | 16 +++++++++++++--- src/server/list_family_test.cc | 17 +++++++++++++++++ 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 9b1354ca3..10a0115de 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -850,10 +850,20 @@ OpResult BPopPusher::RunSingle(ConnectionContext* cntx, time_point tp) { auto cb_move = [&](Transaction* t, EngineShard* shard) { OpArgs op_args = t->GetOpArgs(shard); op_res = OpMoveSingleShard(op_args, pop_key_, push_key_, popdir_, pushdir_); - if (op_res && op_args.shard->journal()) { - std::array arr = {pop_key_, push_key_, DirToSv(popdir_), DirToSv(pushdir_)}; - RecordJournal(op_args, "LMOVE", arr, 1); + if (op_res) { + if (op_args.shard->journal()) { + std::array arr = {pop_key_, push_key_, DirToSv(popdir_), DirToSv(pushdir_)}; + RecordJournal(op_args, "LMOVE", arr, 1); + } + if (shard->blocking_controller()) { + string tmp; + + shard->blocking_controller()->AwakeWatched(op_args.db_cntx.db_index, push_key_); + absl::StrAppend(debugMessages.Next(), "OpPush AwakeWatched: ", push_key_, " by ", + op_args.tx->DebugId()); + } } + return OpStatus::OK; }; t->Execute(cb_move, false); diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index 3284ead5e..817b54256 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -642,6 +642,23 @@ TEST_F(ListFamilyTest, BRPopLPushSingleShard) { ASSERT_EQ(0, NumWatched()); } +TEST_F(ListFamilyTest, BRPopLPushSingleShardBug2857) { + Run({"lpush", "src", "val1"}); + RespExpr resp; + auto blpop = [&]() { resp = Run("id", {"blpop", "dest", "4"}); }; + auto f = pp_->at(1)->LaunchFiber(Launch::dispatch, blpop); + EXPECT_THAT(Run({"brpoplpush", "src", "dest", "1"}), "val1"); + f.Join(); + EXPECT_THAT(resp, ArgType(RespExpr::ARRAY)); + EXPECT_THAT(resp.GetVec(), ElementsAre("dest", "val1")); + + // Timeout + f = pp_->at(1)->LaunchFiber(Launch::dispatch, blpop); + EXPECT_THAT(Run({"brpoplpush", "src", "dest", "1"}), ArgType(RespExpr::NIL)); + f.Join(); + EXPECT_THAT(resp, ArgType(RespExpr::NIL_ARRAY)); +} + TEST_F(ListFamilyTest, BRPopLPushSingleShardBlocking) { RespExpr resp;