From e434f623367e58021a8188a0338fb68fa2171cd2 Mon Sep 17 00:00:00 2001 From: Borys Date: Mon, 27 Jan 2025 18:32:53 +0200 Subject: [PATCH] fix: crash for delconsumer during stream reading (#4513) fix: crash for delconsumer during reading stream --- src/server/stream_family.cc | 10 +++++++--- src/server/stream_family_test.cc | 19 +++++++++++++++++++ 2 files changed, 26 insertions(+), 3 deletions(-) diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 802c4265f..c64ddde04 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -2335,10 +2335,11 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder, if (sitem.group) { range_opts.consumer = FindOrAddConsumer(opts->consumer_name, sitem.group, GetCurrentTimeMs()); - } + sitem.consumer = range_opts.consumer; + if (!sitem.consumer) { + return OpStatus::OUT_OF_MEMORY; + } - range_opts.noack = opts->noack; - if (sitem.consumer) { if (sitem.consumer->pel->numnodes == 0) { LOG(DFATAL) << "Internal error when accessing consumer data, seen_time " << sitem.consumer->seen_time; @@ -2346,6 +2347,9 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder, return OpStatus::OK; } } + + range_opts.noack = opts->noack; + result = OpRange(t->GetOpArgs(shard), *wake_key, range_opts); key = *wake_key; } diff --git a/src/server/stream_family_test.cc b/src/server/stream_family_test.cc index 12edae158..0872b7953 100644 --- a/src/server/stream_family_test.cc +++ b/src/server/stream_family_test.cc @@ -400,6 +400,25 @@ TEST_F(StreamFamilyTest, XReadGroupBlock) { EXPECT_THAT(resp0, ErrArg("consumer group this client was blocked on no longer exists")); } +TEST_F(StreamFamilyTest, XReadGroupBlockDelconsumer) { + Run({"XGROUP", "CREATE", "foo", "group", "0", "MKSTREAM"}); + + RespExpr resp0; + auto fb0 = pp_->at(1)->LaunchFiber(Launch::dispatch, [&] { + resp0 = Run({"XREADGROUP", "GROUP", "group", "alice", "BLOCK", "0", "streams", "foo", ">"}); + }); + ThisFiber::SleepFor(50us); + + // Del consumer while it's blocked + RespExpr resp_del_consumer = Run({"XGROUP", "DELCONSUMER", "foo", "group", "alice"}); + + pp_->at(1)->Await([&] { return Run("xadd", {"XADD", "foo", "1-0", "k1", "v1"}); }); + fb0.Join(); + + EXPECT_THAT(resp0.GetVec(), ElementsAre("foo", ArrLen(1))); + EXPECT_THAT(resp_del_consumer, IntArg(0)); +} + TEST_F(StreamFamilyTest, XReadInvalidArgs) { // Invalid COUNT value. auto resp = Run({"xread", "count", "invalid", "streams", "s1", "s2", "0", "0"});