From 3327e1a9083677d3f3289533c37022b0434c20ce Mon Sep 17 00:00:00 2001 From: Borys Date: Tue, 26 Nov 2024 16:43:30 +0200 Subject: [PATCH] feat: add ability reading stream_listpacks_2/3 rdb types (#4192) * feat: add ability reading stream_listpacks_2/3 rdb types * refactor: address comments --- src/server/CMakeLists.txt | 3 +- src/server/rdb_load.cc | 55 +++++++++++++++++- src/server/rdb_load.h | 11 +++- src/server/rdb_test.cc | 24 ++++++++ .../testdata/RDB_TYPE_STREAM_LISTPACKS_2.rdb | Bin 0 -> 274 bytes .../testdata/RDB_TYPE_STREAM_LISTPACKS_3.rdb | Bin 0 -> 280 bytes 6 files changed, 89 insertions(+), 4 deletions(-) create mode 100644 src/server/testdata/RDB_TYPE_STREAM_LISTPACKS_2.rdb create mode 100644 src/server/testdata/RDB_TYPE_STREAM_LISTPACKS_3.rdb diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 96c1a5437..b7d862c21 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -109,7 +109,8 @@ cxx_test(string_family_test dfly_test_lib LABELS DFLY) cxx_test(bitops_family_test dfly_test_lib LABELS DFLY) cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rdb testdata/redis6_stream.rdb testdata/hll.rdb testdata/redis7_small.rdb - testdata/redis_json.rdb LABELS DFLY) + testdata/redis_json.rdb testdata/RDB_TYPE_STREAM_LISTPACKS_2.rdb + testdata/RDB_TYPE_STREAM_LISTPACKS_3.rdb LABELS DFLY) cxx_test(zset_family_test dfly_test_lib LABELS DFLY) cxx_test(blocking_controller_test dfly_test_lib LABELS DFLY) cxx_test(json_family_test dfly_test_lib LABELS DFLY) diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index fe42407eb..81ca45bd9 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1502,7 +1502,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) { iores = ReadListQuicklist(rdbtype); break; case RDB_TYPE_STREAM_LISTPACKS: - iores = ReadStreams(); + case RDB_TYPE_STREAM_LISTPACKS_2: + case RDB_TYPE_STREAM_LISTPACKS_3: + iores = ReadStreams(rdbtype); break; case RDB_TYPE_JSON: iores = ReadJson(); @@ -1828,7 +1830,7 @@ auto RdbLoaderBase::ReadListQuicklist(int rdbtype) -> io::Result { return OpaqueObj{std::move(load_trace), rdbtype}; } -auto RdbLoaderBase::ReadStreams() -> io::Result { +auto RdbLoaderBase::ReadStreams(int rdbtype) -> io::Result { size_t listpacks; if (pending_read_.remaining > 0) { listpacks = pending_read_.remaining; @@ -1892,6 +1894,30 @@ auto RdbLoaderBase::ReadStreams() -> io::Result { SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->ms); SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->seq); + if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) { + /* Load the first entry ID. */ + SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->first_id.ms); + SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->first_id.seq); + + /* Load the maximal deleted entry ID. */ + SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->max_deleted_entry_id.ms); + SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->max_deleted_entry_id.seq); + + /* Load the offset. */ + SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->entries_added); + } else { + /* During migration the offset can be initialized to the stream's + * length. At this point, we also don't care about tombstones + * because CG offsets will be later initialized as well. */ + load_trace->stream_trace->max_deleted_entry_id.ms = 0; + load_trace->stream_trace->max_deleted_entry_id.seq = 0; + load_trace->stream_trace->entries_added = load_trace->stream_trace->stream_len; + + // TODO add implementation, we need to find the first entry's ID. + // The redis code is next + // streamGetEdgeID(s,1,1,&s->first_id); + } + /* Consumer groups loading */ uint64_t cgroups_count; SET_OR_UNEXPECT(LoadLen(nullptr), cgroups_count); @@ -1913,6 +1939,24 @@ auto RdbLoaderBase::ReadStreams() -> io::Result { SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.ms); SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.seq); + uint64_t cg_offset; + if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) { + SET_OR_UNEXPECT(LoadLen(nullptr), cg_offset); + (void)cg_offset; + } else { + // TODO implement + // cg_offset = should be calculated like streamEstimateDistanceFromFirstEverEntry(); + } + + // TODO add our implementation for the next Redis logic + // streamCG* cgroup = streamCreateCG(s, cgname, sdslen(cgname), &cg_id, cg_offset); + // if (cgroup == NULL) { + // rdbReportCorruptRDB("Duplicated consumer group name %s", cgname); + // decrRefCount(o); + // sdsfree(cgname); + // return NULL; + // } + /* Load the global PEL for this consumer group, however we'll * not yet populate the NACK structures with the message * owner, since consumers for this group and their messages will @@ -1958,6 +2002,13 @@ auto RdbLoaderBase::ReadStreams() -> io::Result { SET_OR_UNEXPECT(FetchInt(), consumer.seen_time); + if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_3) { + SET_OR_UNEXPECT(FetchInt(), consumer.active_time); + } else { + /* That's the best estimate we got */ + consumer.active_time = consumer.seen_time; + } + /* Load the PEL about entries owned by this specific * consumer. */ SET_OR_UNEXPECT(LoadLen(nullptr), pel_size); diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index 78733912b..cebdabf1d 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -79,9 +79,15 @@ class RdbLoaderBase { struct StreamConsumerTrace { RdbVariant name; int64_t seen_time; + int64_t active_time; std::vector> nack_arr; }; + struct StreamID { + uint64_t ms; + uint64_t seq; + }; + struct StreamCGTrace { RdbVariant name; uint64_t ms; @@ -95,6 +101,9 @@ class RdbLoaderBase { size_t lp_len; size_t stream_len; uint64_t ms, seq; + StreamID first_id; /* The first non-tombstone entry, zero if empty. */ + StreamID max_deleted_entry_id; /* The maximal ID that was deleted. */ + uint64_t entries_added; /* All time count of elements added. */ std::vector cgroup; }; @@ -165,7 +174,7 @@ class RdbLoaderBase { ::io::Result ReadZSet(int rdbtype); ::io::Result ReadZSetZL(); ::io::Result ReadListQuicklist(int rdbtype); - ::io::Result ReadStreams(); + ::io::Result ReadStreams(int rdbtype); ::io::Result ReadRedisJson(); ::io::Result ReadJson(); ::io::Result ReadSBF(); diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index 890946fd5..a74ee5046 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -655,4 +655,28 @@ TEST_F(RdbTest, SnapshotTooBig) { ASSERT_THAT(resp, ErrArg("Out of memory")); } +TEST_F(RdbTest, LoadStream2) { + auto ec = LoadRdb("RDB_TYPE_STREAM_LISTPACKS_2.rdb"); + ASSERT_FALSE(ec) << ec.message(); + auto res = Run({"XINFO", "STREAM", "mystream"}); + EXPECT_THAT( + res.GetVec(), + ElementsAre("length", IntArg(2), "radix-tree-keys", IntArg(1), "radix-tree-nodes", IntArg(2), + "last-generated-id", "1732613360686-0", "max-deleted-entry-id", "0-0", + "entries-added", IntArg(0), "recorded-first-entry-id", "0-0", "groups", IntArg(1), + "first-entry", ArgType(RespExpr::ARRAY), "last-entry", ArgType(RespExpr::ARRAY))); +} + +TEST_F(RdbTest, LoadStream3) { + auto ec = LoadRdb("RDB_TYPE_STREAM_LISTPACKS_2.rdb"); + ASSERT_FALSE(ec) << ec.message(); + auto res = Run({"XINFO", "STREAM", "mystream"}); + EXPECT_THAT( + res.GetVec(), + ElementsAre("length", IntArg(2), "radix-tree-keys", IntArg(1), "radix-tree-nodes", IntArg(2), + "last-generated-id", "1732613360686-0", "max-deleted-entry-id", "0-0", + "entries-added", IntArg(0), "recorded-first-entry-id", "0-0", "groups", IntArg(1), + "first-entry", ArgType(RespExpr::ARRAY), "last-entry", ArgType(RespExpr::ARRAY))); +} + } // namespace dfly diff --git a/src/server/testdata/RDB_TYPE_STREAM_LISTPACKS_2.rdb b/src/server/testdata/RDB_TYPE_STREAM_LISTPACKS_2.rdb new file mode 100644 index 0000000000000000000000000000000000000000..3fee85280d62615e134dac76f37ce372b64373ab GIT binary patch literal 274 zcmWG?b@2=~FfcUu#aWb^l3A=s=c%sg zzc@;ZQ&V(vQ*#d;Nattx#gUkwrkj*loO*!aAH#1(24Rle%Hoou)Wlpy0R{%f$?0cT z&trfBLqi~gpMi;yfsv81IW0e*g@F;oVPI@dN-Sbw0dhW0P+(?YYyonOSXmhVGc`iY zIHSh^Wr0m&Vr0*)OfSkWEkKAeGNM~oaq(RGWRNH$t7A@Paw?pUuJHfq+>M2ss(B^> E0OuO-d|IJ;3mf;Ws0LC`WE(aY<2XVlJZq0|Vpa^oI{@ z8KA(x0Lb8HU}9uoWMphk%g<+FU<7d(7@Lz4i&$8IoYM!G7#LfCJR?>X#{W!>5F=jy zWq`84hA}a+=T@c{<(C#9L>U>;ZF5n0ls*|G%E;=NlbM_fxtC!9>rpyNb Di*`^) literal 0 HcmV?d00001