mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 02:15:45 +02:00
fix: brpoplpush single shard to wake up blocked transactions (#2875)
* wake up blocked transactions on single shard
This commit is contained in:
parent
3e270fee53
commit
56a7f85e39
2 changed files with 30 additions and 3 deletions
|
@ -850,10 +850,20 @@ OpResult<string> BPopPusher::RunSingle(ConnectionContext* cntx, time_point tp) {
|
||||||
auto cb_move = [&](Transaction* t, EngineShard* shard) {
|
auto cb_move = [&](Transaction* t, EngineShard* shard) {
|
||||||
OpArgs op_args = t->GetOpArgs(shard);
|
OpArgs op_args = t->GetOpArgs(shard);
|
||||||
op_res = OpMoveSingleShard(op_args, pop_key_, push_key_, popdir_, pushdir_);
|
op_res = OpMoveSingleShard(op_args, pop_key_, push_key_, popdir_, pushdir_);
|
||||||
if (op_res && op_args.shard->journal()) {
|
if (op_res) {
|
||||||
std::array<string_view, 4> arr = {pop_key_, push_key_, DirToSv(popdir_), DirToSv(pushdir_)};
|
if (op_args.shard->journal()) {
|
||||||
RecordJournal(op_args, "LMOVE", arr, 1);
|
std::array<string_view, 4> arr = {pop_key_, push_key_, DirToSv(popdir_), DirToSv(pushdir_)};
|
||||||
|
RecordJournal(op_args, "LMOVE", arr, 1);
|
||||||
|
}
|
||||||
|
if (shard->blocking_controller()) {
|
||||||
|
string tmp;
|
||||||
|
|
||||||
|
shard->blocking_controller()->AwakeWatched(op_args.db_cntx.db_index, push_key_);
|
||||||
|
absl::StrAppend(debugMessages.Next(), "OpPush AwakeWatched: ", push_key_, " by ",
|
||||||
|
op_args.tx->DebugId());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return OpStatus::OK;
|
return OpStatus::OK;
|
||||||
};
|
};
|
||||||
t->Execute(cb_move, false);
|
t->Execute(cb_move, false);
|
||||||
|
|
|
@ -642,6 +642,23 @@ TEST_F(ListFamilyTest, BRPopLPushSingleShard) {
|
||||||
ASSERT_EQ(0, NumWatched());
|
ASSERT_EQ(0, NumWatched());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(ListFamilyTest, BRPopLPushSingleShardBug2857) {
|
||||||
|
Run({"lpush", "src", "val1"});
|
||||||
|
RespExpr resp;
|
||||||
|
auto blpop = [&]() { resp = Run("id", {"blpop", "dest", "4"}); };
|
||||||
|
auto f = pp_->at(1)->LaunchFiber(Launch::dispatch, blpop);
|
||||||
|
EXPECT_THAT(Run({"brpoplpush", "src", "dest", "1"}), "val1");
|
||||||
|
f.Join();
|
||||||
|
EXPECT_THAT(resp, ArgType(RespExpr::ARRAY));
|
||||||
|
EXPECT_THAT(resp.GetVec(), ElementsAre("dest", "val1"));
|
||||||
|
|
||||||
|
// Timeout
|
||||||
|
f = pp_->at(1)->LaunchFiber(Launch::dispatch, blpop);
|
||||||
|
EXPECT_THAT(Run({"brpoplpush", "src", "dest", "1"}), ArgType(RespExpr::NIL));
|
||||||
|
f.Join();
|
||||||
|
EXPECT_THAT(resp, ArgType(RespExpr::NIL_ARRAY));
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(ListFamilyTest, BRPopLPushSingleShardBlocking) {
|
TEST_F(ListFamilyTest, BRPopLPushSingleShardBlocking) {
|
||||||
RespExpr resp;
|
RespExpr resp;
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue