diff --git a/src/facade/facade_test.h b/src/facade/facade_test.h
index 94cada6dc..76b6e090e 100644
--- a/src/facade/facade_test.h
+++ b/src/facade/facade_test.h
@@ -91,6 +91,10 @@ inline bool operator==(const RespExpr& left, std::string_view s) {
   return left.type == RespExpr::STRING && ToSV(left.GetBuf()) == s;
 }
 
+inline bool operator==(const RespExpr& left, int64_t val) {
+  return left.type == RespExpr::INT64 && left.GetInt() == val;
+}
+
 inline bool operator!=(const RespExpr& left, std::string_view s) {
   return !(left == s);
 }
diff --git a/src/server/rdb_extensions.h b/src/server/rdb_extensions.h
index 4152d3c9e..f8d48d4ef 100644
--- a/src/server/rdb_extensions.h
+++ b/src/server/rdb_extensions.h
@@ -9,7 +9,6 @@ extern "C" {
 }
 
 // Custom types: Range 30-35 is used by DF RDB types.
-constexpr uint8_t RDB_TYPE_JSON_OLD = 20;
 constexpr uint8_t RDB_TYPE_JSON = 30;
 constexpr uint8_t RDB_TYPE_HASH_WITH_EXPIRY = 31;
 constexpr uint8_t RDB_TYPE_SET_WITH_EXPIRY = 32;
diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc
index a39dd6e67..8eefe11bd 100644
--- a/src/server/rdb_load.cc
+++ b/src/server/rdb_load.cc
@@ -477,6 +477,8 @@ void RdbLoaderBase::OpaqueObjLoader::operator()(const unique_ptr<LoadTrace>& ptr
       CreateZSet(ptr.get());
       break;
     case RDB_TYPE_STREAM_LISTPACKS:
+    case RDB_TYPE_STREAM_LISTPACKS_2:
+    case RDB_TYPE_STREAM_LISTPACKS_3:
       CreateStream(ptr.get());
       break;
     default:
@@ -955,8 +957,16 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
   }
 
   s->length = ltrace->stream_trace->stream_len;
-  s->last_id.ms = ltrace->stream_trace->ms;
-  s->last_id.seq = ltrace->stream_trace->seq;
+  CopyStreamId(ltrace->stream_trace->last_id, &s->last_id);
+  CopyStreamId(ltrace->stream_trace->first_id, &s->first_id);
+  CopyStreamId(ltrace->stream_trace->max_deleted_entry_id, &s->max_deleted_entry_id);
+  s->entries_added = ltrace->stream_trace->entries_added;
+
+  if (rdb_type_ == RDB_TYPE_STREAM_LISTPACKS) {
+    /* Since the rax is already loaded, we can find the first entry's
+     * ID. */
+    streamGetEdgeID(s, 1, 1, &s->first_id);
+  }
 
   for (const auto& cg : ltrace->stream_trace->cgroup) {
     string_view cgname = ToSV(cg.name);
@@ -964,7 +974,12 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
     cg_id.ms = cg.ms;
     cg_id.seq = cg.seq;
 
-    streamCG* cgroup = streamCreateCG(s, cgname.data(), cgname.size(), &cg_id, 0);
+    uint64_t entries_read = cg.entries_read;
+    if (rdb_type_ == RDB_TYPE_STREAM_LISTPACKS) {
+      entries_read = streamEstimateDistanceFromFirstEverEntry(s, &cg_id);
+    }
+
+    streamCG* cgroup = streamCreateCG(s, cgname.data(), cgname.size(), &cg_id, entries_read);
     if (cgroup == NULL) {
       LOG(ERROR) << "Duplicated consumer group name " << cgname;
       ec_ = RdbError(errc::duplicate_key);
@@ -1512,7 +1527,7 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
       // RDB_TYPE_JSON == 20. On newer versions > 9 we bumped up RDB_TYPE_JSON to 30
      // because it overlapped with the new type RDB_TYPE_SET_LISTPACK
      if (rdb_version_ < 10) {
-        // consider it RDB_TYPE_JSON_OLD
+        // consider it RDB_TYPE_JSON_OLD (20)
        iores = ReadJson();
      } else {
        iores = ReadGeneric(rdbtype);
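Note on the CreateStream hunks above: a V1 (RDB_TYPE_STREAM_LISTPACKS) payload carries no first_id, no max_deleted_entry_id, no entries_added and no per-group read counters, so the loader reconstructs them once the entries rax is fully in memory. A minimal sketch of that recovery path, assuming the vendored redis stream API (streamGetEdgeID and streamEstimateDistanceFromFirstEverEntry are the real redis helpers the diff calls; the wrapper function itself is hypothetical):

    extern "C" {
    #include "redis/stream.h"  // stream, streamCG, streamID and the rax-backed helpers
    }

    // Hypothetical helper: backfill the metadata that V1 payloads lack.
    static void BackfillV1StreamMeta(stream* s, streamCG* cg, streamID* cg_last_id) {
      // The rax is already loaded, so the first live entry's ID is a cheap
      // edge lookup on the radix tree rather than a deserialized field.
      streamGetEdgeID(s, 1 /*first*/, 1 /*skip_tombstones*/, &s->first_id);

      // entries-read can only be estimated for old files: measure how far the
      // group's last-delivered ID sits from the first entry ever added.
      cg->entries_read = streamEstimateDistanceFromFirstEverEntry(s, cg_last_id);
    }

This mirrors what redis does when upgrading old stream payloads; the estimate is exact when the measured range contains no tombstones.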
@@ -1876,21 +1891,20 @@ auto RdbLoaderBase::ReadStreams(int rdbtype) -> io::Result<OpaqueObj> {
   // so if there are still unread elements return the partial stream.
   if (listpacks > n) {
     pending_read_.remaining = listpacks - n;
-    return OpaqueObj{std::move(load_trace), RDB_TYPE_STREAM_LISTPACKS};
-  } else if (pending_read_.remaining > 0) {
-    pending_read_.remaining = 0;
+    return OpaqueObj{std::move(load_trace), rdbtype};
   }
 
-  // Load stream metadata.
+  pending_read_.remaining = 0;
 
+  // Load stream metadata.
   load_trace->stream_trace.reset(new StreamTrace);
 
   /* Load total number of items inside the stream. */
   SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->stream_len);
 
   /* Load the last entry ID. */
-  SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->ms);
-  SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->seq);
+  SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->last_id.ms);
+  SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->last_id.seq);
 
   if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
     /* Load the first entry ID. */
@@ -1907,13 +1921,7 @@
     /* During migration the offset can be initialized to the stream's
      * length. At this point, we also don't care about tombstones
      * because CG offsets will be later initialized as well. */
-    load_trace->stream_trace->max_deleted_entry_id.ms = 0;
-    load_trace->stream_trace->max_deleted_entry_id.seq = 0;
     load_trace->stream_trace->entries_added = load_trace->stream_trace->stream_len;
-
-    // TODO add implementation, we need to find the first entry's ID.
-    // The redis code is next
-    // streamGetEdgeID(s,1,1,&s->first_id);
   }
 
   /* Consumer groups loading */
@@ -1937,24 +1945,11 @@
     SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.ms);
     SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.seq);
 
-    uint64_t cg_offset;
+    cgroup.entries_read = 0;
     if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
-      SET_OR_UNEXPECT(LoadLen(nullptr), cg_offset);
-      (void)cg_offset;
-    } else {
-      // TODO implement
-      // cg_offset = should be calculated like streamEstimateDistanceFromFirstEverEntry();
+      SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.entries_read);
     }
 
-    // TODO add our implementation for the next Redis logic
-    // streamCG* cgroup = streamCreateCG(s, cgname, sdslen(cgname), &cg_id, cg_offset);
-    // if (cgroup == NULL) {
-    //   rdbReportCorruptRDB("Duplicated consumer group name %s", cgname);
-    //   decrRefCount(o);
-    //   sdsfree(cgname);
-    //   return NULL;
-    // }
-
     /* Load the global PEL for this consumer group, however we'll
      * not yet populate the NACK structures with the message
      * owner, since consumers for this group and their messages will
@@ -2724,6 +2719,11 @@ std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, LoadConfig co
   return visitor.ec();
 }
 
+void RdbLoaderBase::CopyStreamId(const StreamID& src, struct streamID* dest) {
+  dest->ms = src.ms;
+  dest->seq = src.seq;
+}
+
 void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
   EngineShard* es = EngineShard::tlocal();
   DbContext db_cntx{&namespaces->GetDefaultNamespace(), db_ind, GetCurrentTimeMs()};
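For orientation before the header diff: the version gate in ReadStreams reduces to the following field order (a condensed restatement of the hunks above, not a separate implementation; LoadLen64() is a hypothetical stand-in for SET_OR_UNEXPECT(LoadLen(nullptr), ...)):

    // Stream header, parsed after the listpack payloads have been consumed.
    st.stream_len = LoadLen64();   // all versions
    st.last_id.ms = LoadLen64();   // all versions
    st.last_id.seq = LoadLen64();
    if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
      st.first_id.ms = LoadLen64();              // V2/V3 only
      st.first_id.seq = LoadLen64();
      st.max_deleted_entry_id.ms = LoadLen64();  // V2/V3 only
      st.max_deleted_entry_id.seq = LoadLen64();
      st.entries_added = LoadLen64();            // V2/V3 only
    } else {
      // V1: no tombstone bookkeeping, so every stored entry counts as added.
      st.entries_added = st.stream_len;
    }
    // Per consumer group: name, last-delivered ID (ms/seq), then the
    // entries_read counter (V2/V3 only), the global PEL, and the consumers.

Leaving the missing V1 fields at zero is safe because StreamID now zero-initializes its members (see the rdb_load.h hunk below) and CreateStream backfills the real values afterwards.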
diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h
index cebdabf1d..0ece10089 100644
--- a/src/server/rdb_load.h
+++ b/src/server/rdb_load.h
@@ -16,6 +16,8 @@ extern "C" {
 #include "server/common.h"
 #include "server/journal/serializer.h"
 
+struct streamID;
+
 namespace dfly {
 
 class EngineShardSet;
@@ -84,15 +86,15 @@ class RdbLoaderBase {
   };
 
   struct StreamID {
-    uint64_t ms;
-    uint64_t seq;
+    uint64_t ms = 0;
+    uint64_t seq = 0;
   };
 
   struct StreamCGTrace {
     RdbVariant name;
     uint64_t ms;
     uint64_t seq;
-
+    uint64_t entries_read;
     std::vector<StreamPelTrace> pel_arr;
     std::vector<StreamConsumerTrace> cons_arr;
   };
@@ -100,10 +102,10 @@ class RdbLoaderBase {
   struct StreamTrace {
     size_t lp_len;
     size_t stream_len;
-    uint64_t ms, seq;
+    StreamID last_id;
     StreamID first_id;             /* The first non-tombstone entry, zero if empty. */
     StreamID max_deleted_entry_id; /* The maximal ID that was deleted. */
-    uint64_t entries_added; /* All time count of elements added. */
+    uint64_t entries_added = 0; /* All time count of elements added. */
     std::vector<StreamCGTrace> cgroup;
   };
@@ -192,6 +194,8 @@ class RdbLoaderBase {
 
   std::error_code EnsureReadInternal(size_t min_to_read);
 
+  static void CopyStreamId(const StreamID& src, struct streamID* dest);
+
   base::IoBuf* mem_buf_ = nullptr;
   base::IoBuf origin_mem_buf_;
   ::io::Source* src_ = nullptr;
diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc
index c9b7dbbf1..3b23be01c 100644
--- a/src/server/rdb_save.cc
+++ b/src/server/rdb_save.cc
@@ -51,10 +51,16 @@ ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_
           "set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot,"
           "set 3 for multi entry lz4 compression on df snapshot and single entry on rdb snapshot");
 ABSL_FLAG(int, compression_level, 2, "The compression level to use on zstd/lz4 compression");
+
+// TODO: retire both flags in v1.27 (Jan 2025).
 ABSL_FLAG(bool, list_rdb_encode_v2, true,
           "V2 rdb encoding of list uses listpack encoding format, compatible with redis 7. V1 rdb "
           "encoding of list uses ziplist encoding compatible with redis 6");
+ABSL_FLAG(bool, stream_rdb_encode_v2, false,
+          "V2 uses a format compatible with redis 7.2 and Dragonfly v1.26+, while the V1 format "
+          "is compatible with redis 6");
+
 
 namespace dfly {
 
 using namespace std;
@@ -209,12 +215,12 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
       }
       break;
     case OBJ_STREAM:
-      return RDB_TYPE_STREAM_LISTPACKS;
+      return absl::GetFlag(FLAGS_stream_rdb_encode_v2) ? RDB_TYPE_STREAM_LISTPACKS_2
+                                                       : RDB_TYPE_STREAM_LISTPACKS;
     case OBJ_MODULE:
       return RDB_TYPE_MODULE_2;
     case OBJ_JSON:
-      return RDB_TYPE_JSON;  // save with RDB_TYPE_JSON, deprecate RDB_TYPE_JSON_OLD after July
-                             // 2024.
+      return RDB_TYPE_JSON;
     case OBJ_SBF:
       return RDB_TYPE_SBF;
   }
@@ -657,6 +663,22 @@ error_code RdbSerializer::SaveStreamObject(const PrimeValue& pv) {
   RETURN_ON_ERR(SaveLen(s->last_id.ms));
   RETURN_ON_ERR(SaveLen(s->last_id.seq));
 
+  uint8_t rdb_type = RdbObjectType(pv);
+
+  // 'first_id', 'max_deleted_entry_id' and 'entries_added' are added
+  // in RDB_TYPE_STREAM_LISTPACKS_2.
+  if (rdb_type >= RDB_TYPE_STREAM_LISTPACKS_2) {
+    /* Save the first entry ID. */
+    RETURN_ON_ERR(SaveLen(s->first_id.ms));
+    RETURN_ON_ERR(SaveLen(s->first_id.seq));
+
+    /* Save the maximal tombstone ID. */
+    RETURN_ON_ERR(SaveLen(s->max_deleted_entry_id.ms));
+    RETURN_ON_ERR(SaveLen(s->max_deleted_entry_id.seq));
+
+    /* Save the offset. */
+    RETURN_ON_ERR(SaveLen(s->entries_added));
+  }
 
   /* The consumer groups and their clients are part of the stream
    * type, so serialize every consumer group. */
@@ -678,9 +700,14 @@ error_code RdbSerializer::SaveStreamObject(const PrimeValue& pv) {
       RETURN_ON_ERR(SaveString((uint8_t*)ri.key, ri.key_len));
 
       /* Last ID. */
-      RETURN_ON_ERR(SaveLen(s->last_id.ms));
+      RETURN_ON_ERR(SaveLen(cg->last_id.ms));
 
-      RETURN_ON_ERR(SaveLen(s->last_id.seq));
+      RETURN_ON_ERR(SaveLen(cg->last_id.seq));
+
+      if (rdb_type >= RDB_TYPE_STREAM_LISTPACKS_2) {
+        /* Save the group's logical reads counter. */
+        RETURN_ON_ERR(SaveLen(cg->entries_read));
+      }
 
       /* Save the global PEL. */
       RETURN_ON_ERR(SaveStreamPEL(cg->pel, true));
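The writer derives rdb_type from RdbObjectType(pv) once and gates every new field on it, which keeps the save path symmetric with ReadStreams. The type selection itself reduces to the flag check below (a sketch mirroring the RdbObjectType hunk above; the standalone function is illustrative):

    // The type byte decides the stream wire format for the whole snapshot.
    // The default (false) keeps RDB_TYPE_STREAM_LISTPACKS, so snapshots remain
    // loadable by Dragonfly versions that predate the V2 reader.
    uint8_t StreamTypeByte() {
      return absl::GetFlag(FLAGS_stream_rdb_encode_v2) ? RDB_TYPE_STREAM_LISTPACKS_2
                                                       : RDB_TYPE_STREAM_LISTPACKS;
    }

Keeping load support for _2/_3 unconditional while the writer stays behind a default-off flag is what makes the rollout safe in mixed-version replication setups.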
@@ -836,10 +863,15 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
     /* Consumer name. */
     RETURN_ON_ERR(SaveString(ri.key, ri.key_len));
 
-    /* Last seen time. */
+    /* Seen time. */
     absl::little_endian::Store64(buf, consumer->seen_time);
     RETURN_ON_ERR(WriteRaw(buf));
 
+    // TODO: enable this when we switch to RDB_TYPE_STREAM_LISTPACKS_3.
+    /* Active time. */
+    // absl::little_endian::Store64(buf, consumer->active_time);
+    // RETURN_ON_ERR(WriteRaw(buf));
+
     /* Consumer PEL, without the ACKs (see last parameter of the function
      * passed with value of 0), at loading time we'll lookup the ID
      * in the consumer group global PEL and will put a reference in the
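The commented-out Active time block above is deliberate: emitting active_time would commit the file to the RDB_TYPE_STREAM_LISTPACKS_3 consumer record, which carries two timestamps per consumer. A loader handling both layouts would presumably follow redis' fallback, where files older than _3 initialize active_time from seen_time (a sketch; the function and its parameters are hypothetical, but seen_time and active_time are the actual streamConsumer fields):

    // Per-consumer timestamps across stream RDB versions.
    void SetConsumerTimes(streamConsumer* c, int rdbtype, int64_t seen_ms, int64_t active_ms) {
      c->seen_time = seen_ms;  // serialized since RDB_TYPE_STREAM_LISTPACKS
      if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_3) {
        c->active_time = active_ms;  // the extra field that _3 adds per consumer
      } else {
        c->active_time = seen_ms;  // best guess for older formats, as in redis
      }
    }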
diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc
index 8e3ff241b..a74e94189 100644
--- a/src/server/rdb_test.cc
+++ b/src/server/rdb_test.cc
@@ -36,6 +36,8 @@ ABSL_DECLARE_FLAG(dfly::CompressionMode, compression_mode);
 
 namespace dfly {
 
+static const auto kMatchNil = ArgType(RespExpr::NIL);
+
 class RdbTest : public BaseFamilyTest {
  protected:
   void SetUp();
@@ -136,13 +138,20 @@ TEST_F(RdbTest, Stream) {
   auto resp = Run({"type", "key:10"});
   EXPECT_EQ(resp, "stream");
+
   resp = Run({"xinfo", "groups", "key:0"});
   EXPECT_THAT(resp, ArrLen(2));
+  EXPECT_THAT(resp.GetVec()[0],
+              RespElementsAre("name", "g1", "consumers", 0, "pending", 0, "last-delivered-id",
+                              "1655444851524-3", "entries-read", 128, "lag", 0));
+  EXPECT_THAT(resp.GetVec()[1],
+              RespElementsAre("name", "g2", "consumers", 1, "pending", 0, "last-delivered-id",
+                              "1655444851523-1", "entries-read", kMatchNil, "lag", kMatchNil));
 
   resp = Run({"xinfo", "groups", "key:1"});  // test dereferences array of size 1
-  EXPECT_THAT(resp, RespArray(ElementsAre("name", "g2", "consumers", IntArg(0), "pending",
-                                          IntArg(0), "last-delivered-id", "1655444851523-1",
-                                          "entries-read", IntArg(0), "lag", IntArg(0))));
+  EXPECT_THAT(resp, RespElementsAre("name", "g2", "consumers", IntArg(0), "pending", IntArg(0),
+                                    "last-delivered-id", "1655444851523-1", "entries-read",
+                                    kMatchNil, "lag", kMatchNil));
 
   resp = Run({"xinfo", "groups", "key:2"});
   EXPECT_THAT(resp, ArrLen(0));
@@ -629,14 +638,30 @@ TEST_F(RdbTest, LoadHugeList) {
 // Tests loading a huge stream, where the stream is loaded in multiple partial
 // reads.
 TEST_F(RdbTest, LoadHugeStream) {
+  TEST_current_time_ms = 1000;
+
   // Add a huge stream (test:0) with 2000 entries, and 4 1k elements per entry
   // (note must be more than 512*4kb elements to test partial reads).
-  for (int i = 0; i != 2000; i++) {
+  // The loop runs 2000 times because each "debug populate" call appends only a
+  // single entry (with multiple elements) to the stream.
+  for (unsigned i = 0; i < 2000; i++) {
     Run({"debug", "populate", "1", "test", "2000", "rand", "type", "stream", "elements", "4"});
   }
   ASSERT_EQ(2000, CheckedInt({"xlen", "test:0"}));
+  Run({"XGROUP", "CREATE", "test:0", "grp1", "0"});
+  Run({"XGROUP", "CREATE", "test:0", "grp2", "0"});
+  Run({"XREADGROUP", "GROUP", "grp1", "Alice", "COUNT", "1", "STREAMS", "test:0", ">"});
+  Run({"XREADGROUP", "GROUP", "grp2", "Alice", "COUNT", "1", "STREAMS", "test:0", ">"});
 
-  RespExpr resp = Run({"save", "df"});
+  auto resp = Run({"xinfo", "stream", "test:0"});
+
+  EXPECT_THAT(
+      resp, RespElementsAre("length", 2000, "radix-tree-keys", 2000, "radix-tree-nodes", 2010,
+                            "last-generated-id", "1000-1999", "max-deleted-entry-id", "0-0",
+                            "entries-added", 2000, "recorded-first-entry-id", "1000-0", "groups", 2,
+                            "first-entry", ArrLen(2), "last-entry", ArrLen(2)));
+
+  resp = Run({"save", "df"});
   ASSERT_EQ(resp, "OK");
 
   auto save_info = service_->server_family().GetLastSaveInfo();
@@ -644,18 +669,29 @@ TEST_F(RdbTest, LoadHugeStream) {
   ASSERT_EQ(resp, "OK");
 
   ASSERT_EQ(2000, CheckedInt({"xlen", "test:0"}));
+  resp = Run({"xinfo", "stream", "test:0"});
+  EXPECT_THAT(
+      resp, RespElementsAre("length", 2000, "radix-tree-keys", 2000, "radix-tree-nodes", 2010,
+                            "last-generated-id", "1000-1999", "max-deleted-entry-id", "0-0",
+                            "entries-added", 2000, "recorded-first-entry-id", "1000-0", "groups", 2,
+                            "first-entry", ArrLen(2), "last-entry", ArrLen(2)));
+  resp = Run({"xinfo", "groups", "test:0"});
+  EXPECT_THAT(resp, RespElementsAre(RespElementsAre("name", "grp1", "consumers", 1, "pending", 1,
+                                                    "last-delivered-id", "1000-0", "entries-read",
+                                                    1, "lag", 1999),
+                                    _));
 }
 
 TEST_F(RdbTest, LoadStream2) {
   auto ec = LoadRdb("RDB_TYPE_STREAM_LISTPACKS_2.rdb");
   ASSERT_FALSE(ec) << ec.message();
   auto res = Run({"XINFO", "STREAM", "mystream"});
-  ASSERT_THAT(
-      res.GetVec(),
-      ElementsAre("length", IntArg(2), "radix-tree-keys", IntArg(1), "radix-tree-nodes", IntArg(2),
-                  "last-generated-id", "1732613360686-0", "max-deleted-entry-id", "0-0",
-                  "entries-added", IntArg(0), "recorded-first-entry-id", "0-0", "groups", IntArg(1),
-                  "first-entry", ArgType(RespExpr::ARRAY), "last-entry", ArgType(RespExpr::ARRAY)));
+  ASSERT_THAT(res.GetVec(),
+              ElementsAre("length", 2, "radix-tree-keys", 1, "radix-tree-nodes", 2,
+                          "last-generated-id", "1732613360686-0", "max-deleted-entry-id", "0-0",
+                          "entries-added", 2, "recorded-first-entry-id", "1732613352350-0",
+                          "groups", 1, "first-entry", RespElementsAre("1732613352350-0", _),
+                          "last-entry", RespElementsAre("1732613360686-0", _)));
 }
 
 TEST_F(RdbTest, LoadStream3) {
@@ -664,10 +700,10 @@ TEST_F(RdbTest, LoadStream3) {
   auto res = Run({"XINFO", "STREAM", "mystream"});
   ASSERT_THAT(
       res.GetVec(),
-      ElementsAre("length", IntArg(2), "radix-tree-keys", IntArg(1), "radix-tree-nodes", IntArg(2),
-                  "last-generated-id", "1732614679549-0", "max-deleted-entry-id", "0-0",
-                  "entries-added", IntArg(0), "recorded-first-entry-id", "0-0", "groups", IntArg(1),
-                  "first-entry", ArgType(RespExpr::ARRAY), "last-entry", ArgType(RespExpr::ARRAY)));
+      ElementsAre("length", 2, "radix-tree-keys", 1, "radix-tree-nodes", 2, "last-generated-id",
+                  "1732614679549-0", "max-deleted-entry-id", "0-0", "entries-added", 2,
+                  "recorded-first-entry-id", "1732614676541-0", "groups", 1, "first-entry",
+                  ArgType(RespExpr::ARRAY), "last-entry", ArgType(RespExpr::ARRAY)));
 }
 
 TEST_F(RdbTest, SnapshotTooBig) {
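A closing note on test ergonomics: the operator==(const RespExpr&, int64_t) overload added in facade_test.h is what lets the expectations above mix bare integers with strings. Illustratively, before and after:

    // Before: every integer position needed an explicit matcher.
    EXPECT_THAT(resp, RespElementsAre("consumers", IntArg(0), "pending", IntArg(0)));

    // After: RespExpr::INT64 payloads compare directly against integer literals.
    EXPECT_THAT(resp, RespElementsAre("consumers", 0, "pending", 0));

kMatchNil plays the same role for the nil replies asserted in the entries-read and lag positions above.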