fix: stream bugs (#4240)

This PR syncs some of the improvements that were introduced in streams in Redis 7.2.3 OSS.

1. Verify XSETID against the max deleted ID in the stream.
2. Implement precise memory measurement of streams for "memory usage" command.
3. Always compact nodes in stream listpacks after creating new nodes.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-12-03 16:12:54 +02:00 committed by GitHub
parent 95f2320825
commit 8d343bfd69
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 148 additions and 30 deletions

View file

@ -125,6 +125,95 @@ size_t MallocUsedZSet(unsigned encoding, void* ptr) {
return 0;
}
/* Helper that estimates the memory footprint of a radix tree used to
 * store Stream IDs.
 *
 * Guessing the size of a radix tree is not trivial, so the result is an
 * approximation built from the following terms:
 *   - the size of the rax header itself,
 *   - sizeof(streamID) of data overhead for each key (the ID),
 *   - sizeof(raxNode) for each bare node,
 *   - a fixed per-node overhead for the aux data pointer, the child
 *     pointers, and so on.
 * The constants were obtained by checking the average radix tree created
 * by real workloads and adjusting them until the numbers more or less
 * matched the real memory usage.
 *
 * The number of nodes and keys may differ depending on the insertion
 * speed and thus on the ability of the radix tree to compress prefixes.
 *
 * rax: the tree to measure; it is dereferenced unconditionally, so it
 *      must not be NULL.
 * Returns the estimated size in bytes. */
size_t streamRadixTreeMemoryUsage(rax* rax) {
  /* Start from the tree header. Every following term is accumulated with
   * "+=" so that this base amount is not thrown away. */
  size_t size = sizeof(*rax);
  size += rax->numele * sizeof(streamID);
  size += rax->numnodes * sizeof(raxNode);
  /* Add a fixed overhead due to the aux data pointer, children, ... */
  size += rax->numnodes * sizeof(long) * 30;
  return size;
}
/* Computes the memory used by a stream: the stream struct, the estimated
 * size of its radix tree, the allocated size of the listpacks stored in
 * that tree, and the overhead of consumer groups, consumers and their
 * pending entries lists (PELs).
 *
 * s: the stream to measure; it is dereferenced unconditionally.
 * Returns the size in bytes. */
size_t MallocUsedStream(stream* s) {
  size_t total = sizeof(*s) + streamRadixTreeMemoryUsage(s->rax_tree);

  /* Listpacks: walk the radix tree from its first element and add up the
   * allocated size of each node's data. The allocated size is used because
   * the node is overprovisioned initially. */
  raxIterator it;
  raxStart(&it, s->rax_tree);
  raxSeek(&it, "^", NULL, 0);

  size_t lp_bytes = 0;
  size_t visited = 0;
  for (; raxNext(&it); ++visited) {
    lp_bytes += zmalloc_size((uint8_t*)it.data);
  }

  if (visited >= s->rax_tree->numele) {
    /* Every element was visited: the accumulated sum is used as is. */
    total += lp_bytes;
  } else {
    /* Fewer listpacks were visited than the tree reports: extrapolate
     * using the average of the visited ones for all but the last element,
     * then add the real allocated size of the last node. */
    if (visited != 0) {
      lp_bytes /= visited; /* Average listpack size. */
    }
    total += lp_bytes * (s->rax_tree->numele - 1);
    /* The result of the seek is not checked: this branch is entered only
     * when the tree reports more elements than were visited. */
    raxSeek(&it, "$", NULL, 0);
    raxNext(&it);
    total += zmalloc_size(it.data);
  }
  raxStop(&it);

  /* Without a consumer groups tree there is nothing more to count. */
  if (!s->cgroups) {
    return total;
  }

  /* Consumer groups have a non trivial overhead when there are many groups
   * and consumers: count at least the groups, the consumers, and the
   * pending entries of their PELs. */
  raxStart(&it, s->cgroups);
  raxSeek(&it, "^", NULL, 0);
  while (raxNext(&it)) {
    streamCG* group = (streamCG*)it.data;
    total += sizeof(*group);
    total += streamRadixTreeMemoryUsage(group->pel);
    total += sizeof(streamNACK) * raxSize(group->pel);

    /* Each consumer adds its own struct, its name and its PEL tree. The
     * NACKs are shared with the group PEL, so they are not counted again. */
    raxIterator consumer_it;
    raxStart(&consumer_it, group->consumers);
    raxSeek(&consumer_it, "^", NULL, 0);
    while (raxNext(&consumer_it)) {
      const streamConsumer* member = (const streamConsumer*)consumer_it.data;
      total += sizeof(*member) + sdslen(member->name);
      total += streamRadixTreeMemoryUsage(member->pel);
    }
    raxStop(&consumer_it);
  }
  raxStop(&it);

  return total;
}
inline void FreeObjHash(unsigned encoding, void* ptr) {
switch (encoding) {
case kEncodingStrMap2:
@ -311,7 +400,7 @@ size_t RobjWrapper::MallocUsed(bool slow) const {
case OBJ_ZSET:
return MallocUsedZSet(encoding_, inner_obj_);
case OBJ_STREAM:
return sz_;
return slow ? MallocUsedStream((stream*)inner_obj_) : sz_;
default:
LOG(FATAL) << "Not supported " << type_;