Mirror of https://github.com/dragonflydb/dragonfly.git (synced 2025-05-10 18:05:44 +02:00)
feat: add ability reading stream_listpacks_2/3 rdb types (#4192)

* feat: add ability reading stream_listpacks_2/3 rdb types
* refactor: address comments

Commit: 3327e1a908 (parent: f84e1eeac8)
6 changed files with 89 additions and 4 deletions
@@ -109,7 +109,8 @@ cxx_test(string_family_test dfly_test_lib LABELS DFLY)
 cxx_test(bitops_family_test dfly_test_lib LABELS DFLY)
 cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rdb
          testdata/redis6_stream.rdb testdata/hll.rdb testdata/redis7_small.rdb
-         testdata/redis_json.rdb LABELS DFLY)
+         testdata/redis_json.rdb testdata/RDB_TYPE_STREAM_LISTPACKS_2.rdb
+         testdata/RDB_TYPE_STREAM_LISTPACKS_3.rdb LABELS DFLY)
 cxx_test(zset_family_test dfly_test_lib LABELS DFLY)
 cxx_test(blocking_controller_test dfly_test_lib LABELS DFLY)
 cxx_test(json_family_test dfly_test_lib LABELS DFLY)

@@ -1502,7 +1502,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
      iores = ReadListQuicklist(rdbtype);
      break;
    case RDB_TYPE_STREAM_LISTPACKS:
-     iores = ReadStreams();
+   case RDB_TYPE_STREAM_LISTPACKS_2:
+   case RDB_TYPE_STREAM_LISTPACKS_3:
+     iores = ReadStreams(rdbtype);
      break;
    case RDB_TYPE_JSON:
      iores = ReadJson();

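For context, the case labels above map to distinct RDB opcodes that Redis assigned to successive stream encodings. A minimal sketch of those constants follows, with values assumed from upstream Redis's rdb.h (Dragonfly mirrors them); the newer encodings having strictly larger values is what makes the rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2 comparisons used later in the loader work as version gates:

// Sketch only: stream-related RDB opcodes, values assumed from upstream Redis rdb.h.
constexpr int RDB_TYPE_STREAM_LISTPACKS = 15;    // Redis 5.0 stream encoding
constexpr int RDB_TYPE_STREAM_LISTPACKS_2 = 19;  // Redis 7.0: adds first_id, max_deleted_entry_id, entries_added
constexpr int RDB_TYPE_STREAM_LISTPACKS_3 = 21;  // Redis 7.2: adds per-consumer active_time

// Because the opcode values grow with the encoding version, a version check
// reduces to a single comparison, e.g. if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2).
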
@@ -1828,7 +1830,7 @@ auto RdbLoaderBase::ReadListQuicklist(int rdbtype) -> io::Result<OpaqueObj> {
   return OpaqueObj{std::move(load_trace), rdbtype};
 }
 
-auto RdbLoaderBase::ReadStreams() -> io::Result<OpaqueObj> {
+auto RdbLoaderBase::ReadStreams(int rdbtype) -> io::Result<OpaqueObj> {
   size_t listpacks;
   if (pending_read_.remaining > 0) {
     listpacks = pending_read_.remaining;

@@ -1892,6 +1894,30 @@ auto RdbLoaderBase::ReadStreams() -> io::Result<OpaqueObj> {
   SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->ms);
   SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->seq);
 
+  if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
+    /* Load the first entry ID. */
+    SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->first_id.ms);
+    SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->first_id.seq);
+
+    /* Load the maximal deleted entry ID. */
+    SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->max_deleted_entry_id.ms);
+    SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->max_deleted_entry_id.seq);
+
+    /* Load the offset. */
+    SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->entries_added);
+  } else {
+    /* During migration the offset can be initialized to the stream's
+     * length. At this point, we also don't care about tombstones
+     * because CG offsets will be later initialized as well. */
+    load_trace->stream_trace->max_deleted_entry_id.ms = 0;
+    load_trace->stream_trace->max_deleted_entry_id.seq = 0;
+    load_trace->stream_trace->entries_added = load_trace->stream_trace->stream_len;
+
+    // TODO add implementation, we need to find the first entry's ID.
+    // The redis code is next
+    // streamGetEdgeID(s,1,1,&s->first_id);
+  }
+
   /* Consumer groups loading */
   uint64_t cgroups_count;
   SET_OR_UNEXPECT(LoadLen(nullptr), cgroups_count);

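Summarizing the read order that ReadStreams() follows across the three encodings (this grouping is editorial, derived from the hunks above and below, not from separate documentation):

// Per-stream RDB payload, in the order the loader consumes it:
//   all versions  : listpack count, listpack blobs, stream length, last ID (ms, seq)
//   LISTPACKS_2/3 : first_id (ms, seq), max_deleted_entry_id (ms, seq), entries_added
//   all versions  : cgroups_count; per group: name, last-delivered ID (ms, seq)
//   LISTPACKS_2/3 : per group: entries-read offset (cg_offset)
//   all versions  : global PEL, consumer count; per consumer: name, seen_time
//   LISTPACKS_3   : per consumer: active_time
//   all versions  : per-consumer PEL
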
@@ -1913,6 +1939,24 @@ auto RdbLoaderBase::ReadStreams() -> io::Result<OpaqueObj> {
     SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.ms);
     SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.seq);
 
+    uint64_t cg_offset;
+    if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
+      SET_OR_UNEXPECT(LoadLen(nullptr), cg_offset);
+      (void)cg_offset;
+    } else {
+      // TODO implement
+      // cg_offset = should be calculated like streamEstimateDistanceFromFirstEverEntry();
+    }
+
+    // TODO add our implementation for the next Redis logic
+    // streamCG* cgroup = streamCreateCG(s, cgname, sdslen(cgname), &cg_id, cg_offset);
+    // if (cgroup == NULL) {
+    //   rdbReportCorruptRDB("Duplicated consumer group name %s", cgname);
+    //   decrRefCount(o);
+    //   sdsfree(cgname);
+    //   return NULL;
+    // }
+
     /* Load the global PEL for this consumer group, however we'll
      * not yet populate the NACK structures with the message
      * owner, since consumers for this group and their messages will

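The TODO above refers to Redis's streamEstimateDistanceFromFirstEverEntry(), used when a legacy RDB carries no per-group offset. Below is only a hypothetical sketch of such an estimate, written for illustration; it is neither the upstream algorithm nor Dragonfly's API, and every name in it is invented:

#include <cstdint>

struct SketchStreamID {  // hypothetical stand-in for a stream entry ID
  uint64_t ms, seq;
};

static bool IdLess(const SketchStreamID& a, const SketchStreamID& b) {
  return a.ms != b.ms ? a.ms < b.ms : a.seq < b.seq;
}

// Roughly estimate how many entries had ever been added up to the group's
// last-delivered ID, using only the stream-level counters persisted by the
// _2 encoding. Exact at the edges, a rough guess in between.
static uint64_t EstimateGroupOffset(SketchStreamID group_last, SketchStreamID first_id,
                                    SketchStreamID last_id, uint64_t entries_added,
                                    uint64_t stream_len) {
  if (!IdLess(group_last, last_id))
    return entries_added;               // group already consumed everything ever added
  if (IdLess(group_last, first_id))
    return entries_added - stream_len;  // group predates every live entry
  return entries_added - stream_len;    // middle case: guess without scanning listpacks
}
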
@@ -1958,6 +2002,13 @@ auto RdbLoaderBase::ReadStreams() -> io::Result<OpaqueObj> {
 
       SET_OR_UNEXPECT(FetchInt<int64_t>(), consumer.seen_time);
 
+      if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_3) {
+        SET_OR_UNEXPECT(FetchInt<int64_t>(), consumer.active_time);
+      } else {
+        /* That's the best estimate we got */
+        consumer.active_time = consumer.seen_time;
+      }
+
       /* Load the PEL about entries owned by this specific
        * consumer. */
       SET_OR_UNEXPECT(LoadLen(nullptr), pel_size);

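Why both timestamps are persisted: at runtime they back two different idle metrics. A small illustrative sketch of the assumed semantics (mirroring how Redis 7.2 distinguishes a consumer's idle time from its inactive time; the names below are not Dragonfly's API):

#include <cstdint>

struct ConsumerTimesSketch {
  int64_t seen_time;    // last time the consumer attempted any interaction (ms)
  int64_t active_time;  // last time the consumer successfully read entries (ms)
};

// Illustrative only: how the two persisted timestamps translate into the
// idle/inactive figures a client would observe.
struct ConsumerIdleSketch {
  int64_t idle;      // now - seen_time
  int64_t inactive;  // now - active_time
};

static ConsumerIdleSketch ComputeIdle(const ConsumerTimesSketch& t, int64_t now_ms) {
  return {now_ms - t.seen_time, now_ms - t.active_time};
}
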
@@ -79,9 +79,15 @@ class RdbLoaderBase {
   struct StreamConsumerTrace {
     RdbVariant name;
     int64_t seen_time;
+    int64_t active_time;
     std::vector<std::array<uint8_t, 16>> nack_arr;
   };
 
+  struct StreamID {
+    uint64_t ms;
+    uint64_t seq;
+  };
+
   struct StreamCGTrace {
     RdbVariant name;
     uint64_t ms;

@@ -95,6 +101,9 @@ class RdbLoaderBase {
     size_t lp_len;
     size_t stream_len;
     uint64_t ms, seq;
+    StreamID first_id;             /* The first non-tombstone entry, zero if empty. */
+    StreamID max_deleted_entry_id; /* The maximal ID that was deleted. */
+    uint64_t entries_added;        /* All time count of elements added. */
     std::vector<StreamCGTrace> cgroup;
   };
 

@@ -165,7 +174,7 @@ class RdbLoaderBase {
   ::io::Result<OpaqueObj> ReadZSet(int rdbtype);
   ::io::Result<OpaqueObj> ReadZSetZL();
   ::io::Result<OpaqueObj> ReadListQuicklist(int rdbtype);
-  ::io::Result<OpaqueObj> ReadStreams();
+  ::io::Result<OpaqueObj> ReadStreams(int rdbtype);
   ::io::Result<OpaqueObj> ReadRedisJson();
   ::io::Result<OpaqueObj> ReadJson();
   ::io::Result<OpaqueObj> ReadSBF();

@@ -655,4 +655,28 @@ TEST_F(RdbTest, SnapshotTooBig) {
   ASSERT_THAT(resp, ErrArg("Out of memory"));
 }
 
+TEST_F(RdbTest, LoadStream2) {
+  auto ec = LoadRdb("RDB_TYPE_STREAM_LISTPACKS_2.rdb");
+  ASSERT_FALSE(ec) << ec.message();
+  auto res = Run({"XINFO", "STREAM", "mystream"});
+  EXPECT_THAT(
+      res.GetVec(),
+      ElementsAre("length", IntArg(2), "radix-tree-keys", IntArg(1), "radix-tree-nodes", IntArg(2),
+                  "last-generated-id", "1732613360686-0", "max-deleted-entry-id", "0-0",
+                  "entries-added", IntArg(0), "recorded-first-entry-id", "0-0", "groups", IntArg(1),
+                  "first-entry", ArgType(RespExpr::ARRAY), "last-entry", ArgType(RespExpr::ARRAY)));
+}
+
+TEST_F(RdbTest, LoadStream3) {
+  auto ec = LoadRdb("RDB_TYPE_STREAM_LISTPACKS_3.rdb");
+  ASSERT_FALSE(ec) << ec.message();
+  auto res = Run({"XINFO", "STREAM", "mystream"});
+  EXPECT_THAT(
+      res.GetVec(),
+      ElementsAre("length", IntArg(2), "radix-tree-keys", IntArg(1), "radix-tree-nodes", IntArg(2),
+                  "last-generated-id", "1732613360686-0", "max-deleted-entry-id", "0-0",
+                  "entries-added", IntArg(0), "recorded-first-entry-id", "0-0", "groups", IntArg(1),
+                  "first-entry", ArgType(RespExpr::ARRAY), "last-entry", ArgType(RespExpr::ARRAY)));
+}
+
 } // namespace dfly

BIN  src/server/testdata/RDB_TYPE_STREAM_LISTPACKS_2.rdb  (new binary file, not shown)
BIN  src/server/testdata/RDB_TYPE_STREAM_LISTPACKS_3.rdb  (new binary file, not shown)