From 74524415c3f5076ac82ae182f76c3af57b6cafa1 Mon Sep 17 00:00:00 2001 From: Stepan Bagritsevich <43710058+BagritsevichStepan@users.noreply.github.com> Date: Fri, 2 May 2025 11:29:52 +0200 Subject: [PATCH] fix(stream_family): Fix stream memory tracking issues (#5024) --- src/server/family_utils.cc | 15 +++++ src/server/family_utils.h | 20 +++++++ src/server/stream_family.cc | 115 +++++++++++++++++++----------------- 3 files changed, 96 insertions(+), 54 deletions(-) diff --git a/src/server/family_utils.cc b/src/server/family_utils.cc index 8f874ec6c..5041c2d6e 100644 --- a/src/server/family_utils.cc +++ b/src/server/family_utils.cc @@ -16,6 +16,21 @@ sds WrapSds(std::string_view s) { return tmp_sds = sdscpylen(tmp_sds, s.data(), s.length()); } +SdsWrapper::SdsWrapper(std::string_view str) { + sds_ = sdsempty(); + sds_ = sdscpylen(sds_, str.data(), str.length()); +} + +SdsWrapper::~SdsWrapper() { + if (sds_) { + sdsfree(sds_); + } +} + +SdsWrapper::operator sds() { + return sds_; +} + NonUniquePicksGenerator::NonUniquePicksGenerator(RandomPick max_range) : max_range_(max_range) { CHECK_GT(max_range, RandomPick(0)); } diff --git a/src/server/family_utils.h b/src/server/family_utils.h index 2bec230ad..1c0c17793 100644 --- a/src/server/family_utils.h +++ b/src/server/family_utils.h @@ -43,6 +43,26 @@ static std::vector ExpireElements(DenseSet* owner, const facade::CmdArgLis // Copy str to thread local sds instance. Valid until next WrapSds call on thread sds WrapSds(std::string_view str); +// Clears sds on destruction +// This is a wrapper around sds to avoid using sdsfree() directly +class SdsWrapper { + public: + explicit SdsWrapper(std::string_view str); + + SdsWrapper(const SdsWrapper&) = delete; + SdsWrapper(SdsWrapper&&) = delete; + + SdsWrapper& operator=(SdsWrapper&&) = delete; + SdsWrapper& operator=(const SdsWrapper&) = delete; + + ~SdsWrapper(); + + operator sds(); + + private: + sds sds_{nullptr}; +}; + using RandomPick = uint32_t; class PicksGenerator { diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 66a1bdb64..2e7eb2290 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -772,41 +772,52 @@ OpResult OpAdd(const OpArgs& op_args, string_view key, const AddOpts& return result_id; } -OpResult OpRange(const OpArgs& op_args, string_view key, const RangeOpts& opts) { +/* This method modifies opts->group and opts->consumer by inserting data into them. + Since these structures are stored within the stream, the stream itself is also modified. + Therefore, we accept opts as a pointer to make this mutation explicit. */ +OpResult OpRange(const OpArgs& op_args, string_view key, RangeOpts* opts) { auto& db_slice = op_args.GetDbSlice(); - auto res_it = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STREAM); + auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) return res_it.status(); RecordVec result; - if (opts.count == 0) + if (!opts->count) return result; + CompactObj& cobj = res_it->it->second; + stream* s = (stream*)cobj.RObjPtr(); + + StreamMemTracker mem_tracker; + streamIterator si; + absl::Cleanup cleanup = [&]() { + streamIteratorStop(&si); + mem_tracker.UpdateStreamSize(cobj); + }; + int64_t numfields; streamID id; - const CompactObj& cobj = (*res_it)->second; - stream* s = (stream*)cobj.RObjPtr(); - streamID sstart = opts.start.val, send = opts.end.val; + streamID sstart = opts->start.val, send = opts->end.val; - streamIteratorStart(&si, s, &sstart, &send, opts.is_rev); + streamIteratorStart(&si, s, &sstart, &send, opts->is_rev); while (streamIteratorGetID(&si, &id, &numfields)) { Record rec; rec.id = id; rec.kv_arr.reserve(numfields); - if (opts.group && streamCompareID(&id, &opts.group->last_id) > 0) { - if (opts.group->entries_read != SCG_INVALID_ENTRIES_READ && + if (opts->group && streamCompareID(&id, &opts->group->last_id) > 0) { + if (opts->group->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s, &id, NULL)) { /* A valid counter and no future tombstones mean we can * increment the read counter to keep tracking the group's * progress. */ - opts.group->entries_read++; + opts->group->entries_read++; } else if (s->entries_added) { /* The group's counter may be invalid, so we try to obtain it. */ - opts.group->entries_read = streamEstimateDistanceFromFirstEverEntry(s, &id); + opts->group->entries_read = streamEstimateDistanceFromFirstEverEntry(s, &id); } - opts.group->last_id = id; + opts->group->last_id = id; } /* Emit the field-value pairs. */ @@ -822,7 +833,7 @@ OpResult OpRange(const OpArgs& op_args, string_view key, const RangeO result.push_back(std::move(rec)); - if (opts.group && !opts.noack) { + if (opts->group && !opts->noack) { unsigned char buf[sizeof(streamID)]; StreamEncodeID(buf, &id); uint64_t now_ms = op_args.db_cntx.time_now_ms; @@ -830,39 +841,37 @@ OpResult OpRange(const OpArgs& op_args, string_view key, const RangeO /* Try to add a new NACK. Most of the time this will work and * will not require extra lookups. We'll fix the problem later * if we find that there is already an entry for this ID. */ - streamNACK* nack = StreamCreateNACK(opts.consumer, now_ms); - int group_inserted = raxTryInsert(opts.group->pel, buf, sizeof(buf), nack, nullptr); + streamNACK* nack = StreamCreateNACK(opts->consumer, now_ms); + int group_inserted = raxTryInsert(opts->group->pel, buf, sizeof(buf), nack, nullptr); - int consumer_inserted = raxTryInsert(opts.consumer->pel, buf, sizeof(buf), nack, nullptr); + int consumer_inserted = raxTryInsert(opts->consumer->pel, buf, sizeof(buf), nack, nullptr); /* Now we can check if the entry was already busy, and * in that case reassign the entry to the new consumer, * or update it if the consumer is the same as before. */ if (group_inserted == 0) { streamFreeNACK(nack); - nack = static_cast(raxFind(opts.group->pel, buf, sizeof(buf))); + nack = static_cast(raxFind(opts->group->pel, buf, sizeof(buf))); DCHECK(nack != raxNotFound); raxRemove(nack->consumer->pel, buf, sizeof(buf), NULL); LOG_IF(DFATAL, nack->consumer->pel->numnodes == 0) << "Invalid rax state"; /* Update the consumer and NACK metadata. */ - nack->consumer = opts.consumer; + nack->consumer = opts->consumer; nack->delivery_time = now_ms; nack->delivery_count = 1; /* Add the entry in the new consumer local PEL. */ - raxInsert(opts.consumer->pel, buf, sizeof(buf), nack, NULL); + raxInsert(opts->consumer->pel, buf, sizeof(buf), nack, NULL); } else if (group_inserted == 1 && consumer_inserted == 0) { LOG(DFATAL) << "Internal error"; return OpStatus::SKIPPED; // ("NACK half-created. Should not be possible."); } - opts.consumer->active_time = now_ms; + opts->consumer->active_time = now_ms; } - if (opts.count == result.size()) + if (opts->count == result.size()) break; } - streamIteratorStop(&si); - return result; } @@ -894,7 +903,7 @@ OpResult OpRangeFromConsumerPEL(const OpArgs& op_args, string_view ke RangeOpts ropts; ropts.start.val = id; ropts.end.val = id; - auto op_result = OpRange(op_args, key, ropts); + auto op_result = OpRange(op_args, key, &ropts); if (!op_result || !op_result.value().size()) { result.push_back(Record{id, vector>()}); } else { @@ -918,13 +927,16 @@ stream* GetReadOnlyStream(const CompactObj& cobj) { } // namespace -// Returns the range response for each stream on this shard in order of -// GetShardArgs. -vector OpRead(const OpArgs& op_args, const ShardArgs& shard_args, const ReadOpts& opts) { +/* Returns the range response for each stream on this shard in order of GetShardArgs. + + It modifies opts->stream_ids.group and opts->stream_ids.consumer by inserting data into them. + Since these structures are stored within the stream, the stream itself is also modified. + Therefore, we accept opts as a pointer to make this mutation explicit. */ +vector OpRead(const OpArgs& op_args, const ShardArgs& shard_args, ReadOpts* opts) { DCHECK(!shard_args.Empty()); RangeOpts range_opts; - range_opts.count = opts.count; + range_opts.count = opts->count; range_opts.end = ParsedStreamId{.val = streamID{ .ms = UINT64_MAX, .seq = UINT64_MAX, @@ -933,22 +945,22 @@ vector OpRead(const OpArgs& op_args, const ShardArgs& shard_args, con vector response(shard_args.Size()); unsigned index = 0; for (string_view key : shard_args) { - const auto& sitem = opts.stream_ids.at(key); + const auto& sitem = opts->stream_ids.at(key); auto& dest = response[index++]; - if (!sitem.group && opts.read_group) { + if (!sitem.group && opts->read_group) { continue; } range_opts.start = sitem.id; range_opts.group = sitem.group; range_opts.consumer = sitem.consumer; - range_opts.noack = opts.noack; + range_opts.noack = opts->noack; OpResult range_res; - if (opts.serve_history) + if (opts->serve_history) range_res = OpRangeFromConsumerPEL(op_args, key, range_opts); else - range_res = OpRange(op_args, key, range_opts); + range_res = OpRange(op_args, key, &range_opts); if (range_res) { dest = std::move(range_res.value()); } @@ -1174,7 +1186,7 @@ OpResult> OpConsumers(const DbContext& db_cntx, EngineShard vector result; const CompactObj& cobj = (*res_it)->second; stream* s = GetReadOnlyStream(cobj); - streamCG* cg = streamLookupCG(s, WrapSds(group_name)); + streamCG* cg = streamLookupCG(s, SdsWrapper(group_name)); if (cg == NULL) { return OpStatus::INVALID_VALUE; } @@ -1201,12 +1213,10 @@ OpResult> OpConsumers(const DbContext& db_cntx, EngineShard return result; } -constexpr uint8_t kCreateOptMkstream = 1 << 0; - struct CreateOpts { string_view gname; string_view id; - uint8_t flags = 0; + bool create_stream = false; }; OpStatus OpCreate(const OpArgs& op_args, string_view key, const CreateOpts& opts) { @@ -1216,7 +1226,7 @@ OpStatus OpCreate(const OpArgs& op_args, string_view key, const CreateOpts& opts StreamMemTracker mem_tracker; if (!res_it) { - if (opts.flags & kCreateOptMkstream) { + if (opts.create_stream) { // MKSTREAM is enabled, so create the stream res_it = db_slice.AddNew(op_args.db_cntx, key, PrimeValue{}, 0); if (!res_it) @@ -1266,7 +1276,7 @@ OpResult FindGroup(const OpArgs& op_args, string_view key, stri CompactObj& cobj = res_it->it->second; auto* s = static_cast(cobj.RObjPtr()); - auto* cg = streamLookupCG(s, WrapSds(gname)); + auto* cg = streamLookupCG(s, SdsWrapper(gname)); if (skip_group && !cg) return OpStatus::SKIPPED; @@ -1276,8 +1286,7 @@ OpResult FindGroup(const OpArgs& op_args, string_view key, stri // Try to get the consumer. If not found, create a new one. streamConsumer* FindOrAddConsumer(string_view name, streamCG* cg, uint64_t now_ms) { // Try to get the consumer. If not found, create a new one. - auto cname = WrapSds(name); - streamConsumer* consumer = streamLookupConsumer(cg, cname); + streamConsumer* consumer = streamLookupConsumer(cg, SdsWrapper(name)); if (consumer) consumer->seen_time = now_ms; else // TODO: notify xgroup-createconsumer event once we support stream events. @@ -1482,7 +1491,7 @@ OpResult OpDelConsumer(const OpArgs& op_args, string_view key, string_ StreamMemTracker mem_tracker; long long pending = 0; - streamConsumer* consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(consumer_name)); + streamConsumer* consumer = streamLookupConsumer(cgroup_res->cg, SdsWrapper(consumer_name)); if (consumer) { pending = raxSize(consumer->pel); streamDelConsumer(cgroup_res->cg, consumer); @@ -1857,7 +1866,7 @@ OpResult OpPending(const OpArgs& op_args, string_view key, const streamConsumer* consumer = nullptr; if (!opts.consumer_name.empty()) { - consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(opts.consumer_name)); + consumer = streamLookupConsumer(cgroup_res->cg, SdsWrapper(opts.consumer_name)); } PendingResult result; @@ -1878,9 +1887,7 @@ void CreateGroup(facade::CmdArgParser* parser, Transaction* tx, SinkReplyBuilder CreateOpts opts; std::tie(opts.gname, opts.id) = parser->Next(); - if (parser->Check("MKSTREAM")) { - opts.flags |= kCreateOptMkstream; - } + opts.create_stream = parser->Check("MKSTREAM"); if (auto err = parser->Error(); err) return builder->SendError(err->MakeReply()); @@ -2362,7 +2369,7 @@ void XRangeGeneric(std::string_view key, std::string_view start, std::string_vie range_opts.is_rev = is_rev; auto cb = [&](Transaction* t, EngineShard* shard) { - return OpRange(t->GetOpArgs(shard), key, range_opts); + return OpRange(t->GetOpArgs(shard), key, &range_opts); }; OpResult result = tx->ScheduleSingleHopT(cb); @@ -2413,7 +2420,7 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder, // Update group pointer and check it's validity if (opts->read_group) { - sitem.group = streamLookupCG(s, WrapSds(opts->group_name)); + sitem.group = streamLookupCG(s, SdsWrapper(opts->group_name)); if (!sitem.group) return true; // abort } @@ -2485,7 +2492,7 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder, range_opts.noack = opts->noack; - result = OpRange(t->GetOpArgs(shard), *wake_key, range_opts); + result = OpRange(t->GetOpArgs(shard), *wake_key, &range_opts); key = *wake_key; } return OpStatus::OK; @@ -2533,7 +2540,7 @@ void XReadGeneric2(CmdArgList args, bool read_group, Transaction* tx, SinkReplyB if (try_fastread) { if (have_entries.load(memory_order_relaxed)) - fastread_prefetched = OpRead(tx->GetOpArgs(es), tx->GetShardArgs(es->shard_id()), *opts); + fastread_prefetched = OpRead(tx->GetOpArgs(es), tx->GetShardArgs(es->shard_id()), &*opts); else return {OpStatus::OK, Transaction::RunnableResult::AVOID_CONCLUDING}; } @@ -2556,7 +2563,7 @@ void XReadGeneric2(CmdArgList args, bool read_group, Transaction* tx, SinkReplyB xread_resp.resize(shard_set->size()); auto read_cb = [&](Transaction* t, EngineShard* shard) { ShardId sid = shard->shard_id(); - xread_resp[sid] = OpRead(t->GetOpArgs(shard), t->GetShardArgs(sid), *opts); + xread_resp[sid] = OpRead(t->GetOpArgs(shard), t->GetShardArgs(sid), &*opts); return OpStatus::OK; }; tx->Execute(std::move(read_cb), true); @@ -3235,8 +3242,8 @@ variant HasEntries2(const OpArgs& op_args, string_view StreamMemTracker mem_tracker; absl::Cleanup update_size_cb([&]() { mem_tracker.UpdateStreamSize(res_it->it->second); }); - const CompactObj& cobj = res_it->it->second; - stream* s = GetReadOnlyStream(cobj); + CompactObj& cobj = res_it->it->second; + stream* s = (stream*)cobj.RObjPtr(); // Fetch last id streamID last_id = s->last_id; @@ -3250,7 +3257,7 @@ variant HasEntries2(const OpArgs& op_args, string_view streamCG* group = nullptr; streamConsumer* consumer = nullptr; if (opts->read_group) { - group = streamLookupCG(s, WrapSds(opts->group_name)); + group = streamLookupCG(s, SdsWrapper(opts->group_name)); if (!group) return facade::ErrorReply{ NoGroupOrKey(skey, opts->group_name, " in XREADGROUP with GROUP option")};