mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
fix: crash for delconsumer during stream reading (#4513)
fix: crash for delconsumer during reading stream
This commit is contained in:
parent
2d85f59a74
commit
e434f62336
2 changed files with 26 additions and 3 deletions
|
@ -2335,10 +2335,11 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder,
|
|||
if (sitem.group) {
|
||||
range_opts.consumer =
|
||||
FindOrAddConsumer(opts->consumer_name, sitem.group, GetCurrentTimeMs());
|
||||
}
|
||||
sitem.consumer = range_opts.consumer;
|
||||
if (!sitem.consumer) {
|
||||
return OpStatus::OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
range_opts.noack = opts->noack;
|
||||
if (sitem.consumer) {
|
||||
if (sitem.consumer->pel->numnodes == 0) {
|
||||
LOG(DFATAL) << "Internal error when accessing consumer data, seen_time "
|
||||
<< sitem.consumer->seen_time;
|
||||
|
@ -2346,6 +2347,9 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder,
|
|||
return OpStatus::OK;
|
||||
}
|
||||
}
|
||||
|
||||
range_opts.noack = opts->noack;
|
||||
|
||||
result = OpRange(t->GetOpArgs(shard), *wake_key, range_opts);
|
||||
key = *wake_key;
|
||||
}
|
||||
|
|
|
@ -400,6 +400,25 @@ TEST_F(StreamFamilyTest, XReadGroupBlock) {
|
|||
EXPECT_THAT(resp0, ErrArg("consumer group this client was blocked on no longer exists"));
|
||||
}
|
||||
|
||||
TEST_F(StreamFamilyTest, XReadGroupBlockDelconsumer) {
|
||||
Run({"XGROUP", "CREATE", "foo", "group", "0", "MKSTREAM"});
|
||||
|
||||
RespExpr resp0;
|
||||
auto fb0 = pp_->at(1)->LaunchFiber(Launch::dispatch, [&] {
|
||||
resp0 = Run({"XREADGROUP", "GROUP", "group", "alice", "BLOCK", "0", "streams", "foo", ">"});
|
||||
});
|
||||
ThisFiber::SleepFor(50us);
|
||||
|
||||
// Del consumer while it's blocked
|
||||
RespExpr resp_del_consumer = Run({"XGROUP", "DELCONSUMER", "foo", "group", "alice"});
|
||||
|
||||
pp_->at(1)->Await([&] { return Run("xadd", {"XADD", "foo", "1-0", "k1", "v1"}); });
|
||||
fb0.Join();
|
||||
|
||||
EXPECT_THAT(resp0.GetVec(), ElementsAre("foo", ArrLen(1)));
|
||||
EXPECT_THAT(resp_del_consumer, IntArg(0));
|
||||
}
|
||||
|
||||
TEST_F(StreamFamilyTest, XReadInvalidArgs) {
|
||||
// Invalid COUNT value.
|
||||
auto resp = Run({"xread", "count", "invalid", "streams", "s1", "s2", "0", "0"});
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue