fix(stream_family): Fix stream memory tracking issues (#5024)

This commit is contained in:
Stepan Bagritsevich 2025-05-02 11:29:52 +02:00 committed by GitHub
parent 291b2622c6
commit 74524415c3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 96 additions and 54 deletions

View file

@ -16,6 +16,21 @@ sds WrapSds(std::string_view s) {
return tmp_sds = sdscpylen(tmp_sds, s.data(), s.length());
}
SdsWrapper::SdsWrapper(std::string_view str) {
sds_ = sdsempty();
sds_ = sdscpylen(sds_, str.data(), str.length());
}
SdsWrapper::~SdsWrapper() {
if (sds_) {
sdsfree(sds_);
}
}
SdsWrapper::operator sds() {
return sds_;
}
NonUniquePicksGenerator::NonUniquePicksGenerator(RandomPick max_range) : max_range_(max_range) {
CHECK_GT(max_range, RandomPick(0));
}

View file

@ -43,6 +43,26 @@ static std::vector<long> ExpireElements(DenseSet* owner, const facade::CmdArgLis
// Copy str to thread local sds instance. Valid until next WrapSds call on thread
sds WrapSds(std::string_view str);
// Clears sds on destruction
// This is a wrapper around sds to avoid using sdsfree() directly
class SdsWrapper {
public:
explicit SdsWrapper(std::string_view str);
SdsWrapper(const SdsWrapper&) = delete;
SdsWrapper(SdsWrapper&&) = delete;
SdsWrapper& operator=(SdsWrapper&&) = delete;
SdsWrapper& operator=(const SdsWrapper&) = delete;
~SdsWrapper();
operator sds();
private:
sds sds_{nullptr};
};
using RandomPick = uint32_t;
class PicksGenerator {

View file

@ -772,41 +772,52 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const AddOpts&
return result_id;
}
OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const RangeOpts& opts) {
/* This method modifies opts->group and opts->consumer by inserting data into them.
Since these structures are stored within the stream, the stream itself is also modified.
Therefore, we accept opts as a pointer to make this mutation explicit. */
OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, RangeOpts* opts) {
auto& db_slice = op_args.GetDbSlice();
auto res_it = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_STREAM);
auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM);
if (!res_it)
return res_it.status();
RecordVec result;
if (opts.count == 0)
if (!opts->count)
return result;
CompactObj& cobj = res_it->it->second;
stream* s = (stream*)cobj.RObjPtr();
StreamMemTracker mem_tracker;
streamIterator si;
absl::Cleanup cleanup = [&]() {
streamIteratorStop(&si);
mem_tracker.UpdateStreamSize(cobj);
};
int64_t numfields;
streamID id;
const CompactObj& cobj = (*res_it)->second;
stream* s = (stream*)cobj.RObjPtr();
streamID sstart = opts.start.val, send = opts.end.val;
streamID sstart = opts->start.val, send = opts->end.val;
streamIteratorStart(&si, s, &sstart, &send, opts.is_rev);
streamIteratorStart(&si, s, &sstart, &send, opts->is_rev);
while (streamIteratorGetID(&si, &id, &numfields)) {
Record rec;
rec.id = id;
rec.kv_arr.reserve(numfields);
if (opts.group && streamCompareID(&id, &opts.group->last_id) > 0) {
if (opts.group->entries_read != SCG_INVALID_ENTRIES_READ &&
if (opts->group && streamCompareID(&id, &opts->group->last_id) > 0) {
if (opts->group->entries_read != SCG_INVALID_ENTRIES_READ &&
!streamRangeHasTombstones(s, &id, NULL)) {
/* A valid counter and no future tombstones mean we can
* increment the read counter to keep tracking the group's
* progress. */
opts.group->entries_read++;
opts->group->entries_read++;
} else if (s->entries_added) {
/* The group's counter may be invalid, so we try to obtain it. */
opts.group->entries_read = streamEstimateDistanceFromFirstEverEntry(s, &id);
opts->group->entries_read = streamEstimateDistanceFromFirstEverEntry(s, &id);
}
opts.group->last_id = id;
opts->group->last_id = id;
}
/* Emit the field-value pairs. */
@ -822,7 +833,7 @@ OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const RangeO
result.push_back(std::move(rec));
if (opts.group && !opts.noack) {
if (opts->group && !opts->noack) {
unsigned char buf[sizeof(streamID)];
StreamEncodeID(buf, &id);
uint64_t now_ms = op_args.db_cntx.time_now_ms;
@ -830,39 +841,37 @@ OpResult<RecordVec> OpRange(const OpArgs& op_args, string_view key, const RangeO
/* Try to add a new NACK. Most of the time this will work and
* will not require extra lookups. We'll fix the problem later
* if we find that there is already an entry for this ID. */
streamNACK* nack = StreamCreateNACK(opts.consumer, now_ms);
int group_inserted = raxTryInsert(opts.group->pel, buf, sizeof(buf), nack, nullptr);
streamNACK* nack = StreamCreateNACK(opts->consumer, now_ms);
int group_inserted = raxTryInsert(opts->group->pel, buf, sizeof(buf), nack, nullptr);
int consumer_inserted = raxTryInsert(opts.consumer->pel, buf, sizeof(buf), nack, nullptr);
int consumer_inserted = raxTryInsert(opts->consumer->pel, buf, sizeof(buf), nack, nullptr);
/* Now we can check if the entry was already busy, and
* in that case reassign the entry to the new consumer,
* or update it if the consumer is the same as before. */
if (group_inserted == 0) {
streamFreeNACK(nack);
nack = static_cast<streamNACK*>(raxFind(opts.group->pel, buf, sizeof(buf)));
nack = static_cast<streamNACK*>(raxFind(opts->group->pel, buf, sizeof(buf)));
DCHECK(nack != raxNotFound);
raxRemove(nack->consumer->pel, buf, sizeof(buf), NULL);
LOG_IF(DFATAL, nack->consumer->pel->numnodes == 0) << "Invalid rax state";
/* Update the consumer and NACK metadata. */
nack->consumer = opts.consumer;
nack->consumer = opts->consumer;
nack->delivery_time = now_ms;
nack->delivery_count = 1;
/* Add the entry in the new consumer local PEL. */
raxInsert(opts.consumer->pel, buf, sizeof(buf), nack, NULL);
raxInsert(opts->consumer->pel, buf, sizeof(buf), nack, NULL);
} else if (group_inserted == 1 && consumer_inserted == 0) {
LOG(DFATAL) << "Internal error";
return OpStatus::SKIPPED; // ("NACK half-created. Should not be possible.");
}
opts.consumer->active_time = now_ms;
opts->consumer->active_time = now_ms;
}
if (opts.count == result.size())
if (opts->count == result.size())
break;
}
streamIteratorStop(&si);
return result;
}
@ -894,7 +903,7 @@ OpResult<RecordVec> OpRangeFromConsumerPEL(const OpArgs& op_args, string_view ke
RangeOpts ropts;
ropts.start.val = id;
ropts.end.val = id;
auto op_result = OpRange(op_args, key, ropts);
auto op_result = OpRange(op_args, key, &ropts);
if (!op_result || !op_result.value().size()) {
result.push_back(Record{id, vector<pair<string, string>>()});
} else {
@ -918,13 +927,16 @@ stream* GetReadOnlyStream(const CompactObj& cobj) {
} // namespace
// Returns the range response for each stream on this shard in order of
// GetShardArgs.
vector<RecordVec> OpRead(const OpArgs& op_args, const ShardArgs& shard_args, const ReadOpts& opts) {
/* Returns the range response for each stream on this shard in order of GetShardArgs.
It modifies opts->stream_ids.group and opts->stream_ids.consumer by inserting data into them.
Since these structures are stored within the stream, the stream itself is also modified.
Therefore, we accept opts as a pointer to make this mutation explicit. */
vector<RecordVec> OpRead(const OpArgs& op_args, const ShardArgs& shard_args, ReadOpts* opts) {
DCHECK(!shard_args.Empty());
RangeOpts range_opts;
range_opts.count = opts.count;
range_opts.count = opts->count;
range_opts.end = ParsedStreamId{.val = streamID{
.ms = UINT64_MAX,
.seq = UINT64_MAX,
@ -933,22 +945,22 @@ vector<RecordVec> OpRead(const OpArgs& op_args, const ShardArgs& shard_args, con
vector<RecordVec> response(shard_args.Size());
unsigned index = 0;
for (string_view key : shard_args) {
const auto& sitem = opts.stream_ids.at(key);
const auto& sitem = opts->stream_ids.at(key);
auto& dest = response[index++];
if (!sitem.group && opts.read_group) {
if (!sitem.group && opts->read_group) {
continue;
}
range_opts.start = sitem.id;
range_opts.group = sitem.group;
range_opts.consumer = sitem.consumer;
range_opts.noack = opts.noack;
range_opts.noack = opts->noack;
OpResult<RecordVec> range_res;
if (opts.serve_history)
if (opts->serve_history)
range_res = OpRangeFromConsumerPEL(op_args, key, range_opts);
else
range_res = OpRange(op_args, key, range_opts);
range_res = OpRange(op_args, key, &range_opts);
if (range_res) {
dest = std::move(range_res.value());
}
@ -1174,7 +1186,7 @@ OpResult<vector<ConsumerInfo>> OpConsumers(const DbContext& db_cntx, EngineShard
vector<ConsumerInfo> result;
const CompactObj& cobj = (*res_it)->second;
stream* s = GetReadOnlyStream(cobj);
streamCG* cg = streamLookupCG(s, WrapSds(group_name));
streamCG* cg = streamLookupCG(s, SdsWrapper(group_name));
if (cg == NULL) {
return OpStatus::INVALID_VALUE;
}
@ -1201,12 +1213,10 @@ OpResult<vector<ConsumerInfo>> OpConsumers(const DbContext& db_cntx, EngineShard
return result;
}
constexpr uint8_t kCreateOptMkstream = 1 << 0;
struct CreateOpts {
string_view gname;
string_view id;
uint8_t flags = 0;
bool create_stream = false;
};
OpStatus OpCreate(const OpArgs& op_args, string_view key, const CreateOpts& opts) {
@ -1216,7 +1226,7 @@ OpStatus OpCreate(const OpArgs& op_args, string_view key, const CreateOpts& opts
StreamMemTracker mem_tracker;
if (!res_it) {
if (opts.flags & kCreateOptMkstream) {
if (opts.create_stream) {
// MKSTREAM is enabled, so create the stream
res_it = db_slice.AddNew(op_args.db_cntx, key, PrimeValue{}, 0);
if (!res_it)
@ -1266,7 +1276,7 @@ OpResult<FindGroupResult> FindGroup(const OpArgs& op_args, string_view key, stri
CompactObj& cobj = res_it->it->second;
auto* s = static_cast<stream*>(cobj.RObjPtr());
auto* cg = streamLookupCG(s, WrapSds(gname));
auto* cg = streamLookupCG(s, SdsWrapper(gname));
if (skip_group && !cg)
return OpStatus::SKIPPED;
@ -1276,8 +1286,7 @@ OpResult<FindGroupResult> FindGroup(const OpArgs& op_args, string_view key, stri
// Try to get the consumer. If not found, create a new one.
streamConsumer* FindOrAddConsumer(string_view name, streamCG* cg, uint64_t now_ms) {
// Try to get the consumer. If not found, create a new one.
auto cname = WrapSds(name);
streamConsumer* consumer = streamLookupConsumer(cg, cname);
streamConsumer* consumer = streamLookupConsumer(cg, SdsWrapper(name));
if (consumer)
consumer->seen_time = now_ms;
else // TODO: notify xgroup-createconsumer event once we support stream events.
@ -1482,7 +1491,7 @@ OpResult<uint32_t> OpDelConsumer(const OpArgs& op_args, string_view key, string_
StreamMemTracker mem_tracker;
long long pending = 0;
streamConsumer* consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(consumer_name));
streamConsumer* consumer = streamLookupConsumer(cgroup_res->cg, SdsWrapper(consumer_name));
if (consumer) {
pending = raxSize(consumer->pel);
streamDelConsumer(cgroup_res->cg, consumer);
@ -1857,7 +1866,7 @@ OpResult<PendingResult> OpPending(const OpArgs& op_args, string_view key, const
streamConsumer* consumer = nullptr;
if (!opts.consumer_name.empty()) {
consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(opts.consumer_name));
consumer = streamLookupConsumer(cgroup_res->cg, SdsWrapper(opts.consumer_name));
}
PendingResult result;
@ -1878,9 +1887,7 @@ void CreateGroup(facade::CmdArgParser* parser, Transaction* tx, SinkReplyBuilder
CreateOpts opts;
std::tie(opts.gname, opts.id) = parser->Next<string_view, string_view>();
if (parser->Check("MKSTREAM")) {
opts.flags |= kCreateOptMkstream;
}
opts.create_stream = parser->Check("MKSTREAM");
if (auto err = parser->Error(); err)
return builder->SendError(err->MakeReply());
@ -2362,7 +2369,7 @@ void XRangeGeneric(std::string_view key, std::string_view start, std::string_vie
range_opts.is_rev = is_rev;
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpRange(t->GetOpArgs(shard), key, range_opts);
return OpRange(t->GetOpArgs(shard), key, &range_opts);
};
OpResult<RecordVec> result = tx->ScheduleSingleHopT(cb);
@ -2413,7 +2420,7 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder,
// Update group pointer and check it's validity
if (opts->read_group) {
sitem.group = streamLookupCG(s, WrapSds(opts->group_name));
sitem.group = streamLookupCG(s, SdsWrapper(opts->group_name));
if (!sitem.group)
return true; // abort
}
@ -2485,7 +2492,7 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder,
range_opts.noack = opts->noack;
result = OpRange(t->GetOpArgs(shard), *wake_key, range_opts);
result = OpRange(t->GetOpArgs(shard), *wake_key, &range_opts);
key = *wake_key;
}
return OpStatus::OK;
@ -2533,7 +2540,7 @@ void XReadGeneric2(CmdArgList args, bool read_group, Transaction* tx, SinkReplyB
if (try_fastread) {
if (have_entries.load(memory_order_relaxed))
fastread_prefetched = OpRead(tx->GetOpArgs(es), tx->GetShardArgs(es->shard_id()), *opts);
fastread_prefetched = OpRead(tx->GetOpArgs(es), tx->GetShardArgs(es->shard_id()), &*opts);
else
return {OpStatus::OK, Transaction::RunnableResult::AVOID_CONCLUDING};
}
@ -2556,7 +2563,7 @@ void XReadGeneric2(CmdArgList args, bool read_group, Transaction* tx, SinkReplyB
xread_resp.resize(shard_set->size());
auto read_cb = [&](Transaction* t, EngineShard* shard) {
ShardId sid = shard->shard_id();
xread_resp[sid] = OpRead(t->GetOpArgs(shard), t->GetShardArgs(sid), *opts);
xread_resp[sid] = OpRead(t->GetOpArgs(shard), t->GetShardArgs(sid), &*opts);
return OpStatus::OK;
};
tx->Execute(std::move(read_cb), true);
@ -3235,8 +3242,8 @@ variant<bool, facade::ErrorReply> HasEntries2(const OpArgs& op_args, string_view
StreamMemTracker mem_tracker;
absl::Cleanup update_size_cb([&]() { mem_tracker.UpdateStreamSize(res_it->it->second); });
const CompactObj& cobj = res_it->it->second;
stream* s = GetReadOnlyStream(cobj);
CompactObj& cobj = res_it->it->second;
stream* s = (stream*)cobj.RObjPtr();
// Fetch last id
streamID last_id = s->last_id;
@ -3250,7 +3257,7 @@ variant<bool, facade::ErrorReply> HasEntries2(const OpArgs& op_args, string_view
streamCG* group = nullptr;
streamConsumer* consumer = nullptr;
if (opts->read_group) {
group = streamLookupCG(s, WrapSds(opts->group_name));
group = streamLookupCG(s, SdsWrapper(opts->group_name));
if (!group)
return facade::ErrorReply{
NoGroupOrKey(skey, opts->group_name, " in XREADGROUP with GROUP option")};