chore(rdb): support rdb version 11 (#1579)

* Bump up RDB_VERSION to 11
* Update RDB_JSON value to 30
* Fix HT being serialized to the wrong type
* Serialize HT as LIST_PACK
* Add support for deserializing SET_LISTPACK
This commit is contained in:
Kostas Kyrimis 2023-07-29 18:15:13 +03:00 committed by GitHub
parent 6213986af2
commit 82965279a2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 172 additions and 37 deletions

View file

@ -37,12 +37,13 @@
#include "object.h"
#include "redis_aux.h"
/* TBD: include only necessary headers. */
/* The current RDB version. When the format changes in a way that is no longer
* backward compatible this number gets incremented. */
// TODO: should increment to 10 once we start storing RDB_TYPE_ZSET_LISTPACK.
#define RDB_VERSION 9
#define RDB_VERSION 11
/* We would like to serialize to version 9 such that our rdb files
* can be loaded by redis version 6 (RDB_VERSION 9) */
#define RDB_SER_VERSION 9
/* Defines related to the dump file format. To store 32 bits lengths for short
* keys requires a lot of space, so we check the most significant 2 bits of
@ -83,6 +84,7 @@
#define RDB_TYPE_HASH 4
#define RDB_TYPE_ZSET_2 5 /* ZSET version 2 with doubles stored in binary. */
#define RDB_TYPE_MODULE 6
#define RDB_TYPE_MODULE_PRE_GA 6 /* Used in 4.0 release candidates */
#define RDB_TYPE_MODULE_2 7 /* Module value with annotations for parsing without
the generating module being loaded. */
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
@ -98,15 +100,20 @@
#define RDB_TYPE_HASH_LISTPACK 16
#define RDB_TYPE_ZSET_LISTPACK 17
#define RDB_TYPE_LIST_QUICKLIST_2 18
#define RDB_TYPE_STREAM_LISTPACKS_2 19
#define RDB_TYPE_SET_LISTPACK 20
#define RDB_TYPE_STREAM_LISTPACKS_3 21
/* NOTE: WHEN ADDING NEW RDB TYPE, UPDATE rdbIsObjectType() BELOW */
/* Test if a type is an object type. */
#define __rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18))
#define __rdbIsObjectType(t) (((t) >= 0 && (t) <= 7) || ((t) >= 9 && (t) <= 21))
/* Range 200-240 is used by Dragonfly specific opcodes */
/* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
#define RDB_OPCODE_FUNCTION 246 /* engine data */
#define RDB_OPCODE_FUNCTION2 245 /* function library data */
#define RDB_OPCODE_FUNCTION_PRE_GA 246 /* old function library data for 7.0 rc1 and rc2 */
#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */
#define RDB_OPCODE_IDLE 248 /* LRU idle time. */
#define RDB_OPCODE_FREQ 249 /* LFU frequency. */
@ -137,6 +144,7 @@
#define RDBFLAGS_REPLICATION (1<<1) /* Load/save for SYNC. */
#define RDBFLAGS_ALLOW_DUP (1<<2) /* Allow duplicated keys when loading.*/
#define RDBFLAGS_FEED_REPL (1<<3) /* Feed replication stream when loading.*/
#define RDBFLAGS_KEEP_CACHE (1<<4) /* Don't reclaim cache after rdb file is generated */
/* When rdbLoadObject() returns NULL, the err flag is
* set to hold the type of error that occurred */

View file

@ -49,7 +49,7 @@ cxx_test(stream_family_test dfly_test_lib LABELS DFLY)
cxx_test(string_family_test dfly_test_lib LABELS DFLY)
cxx_test(bitops_family_test dfly_test_lib LABELS DFLY)
cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rdb
testdata/redis6_stream.rdb testdata/hll.rdb LABELS DFLY)
testdata/redis6_stream.rdb testdata/hll.rdb testdata/redis7_small.rdb LABELS DFLY)
cxx_test(zset_family_test dfly_test_lib LABELS DFLY)
cxx_test(blocking_controller_test dragonfly_lib LABELS DFLY)
cxx_test(snapshot_test dragonfly_lib LABELS DFLY)

View file

@ -48,8 +48,8 @@ int64_t CalculateExpirationTime(bool seconds, bool absolute, int64_t ts, int64_t
VersionBuffer MakeRdbVersion() {
VersionBuffer buf;
buf[0] = RDB_VERSION & 0xff;
buf[1] = (RDB_VERSION >> 8) & 0xff;
buf[0] = RDB_SER_VERSION & 0xff;
buf[1] = (RDB_SER_VERSION >> 8) & 0xff;
return buf;
}
@ -73,7 +73,7 @@ void AppendFooter(std::string* dump_res) {
dump_res->append(crc.data(), crc.size());
}
bool VerifyFooter(std::string_view msg) {
bool VerifyFooter(std::string_view msg, int* rdb_version) {
if (msg.size() <= DUMP_FOOTER_SIZE) {
LOG(WARNING) << "got restore payload that is too short - " << msg.size();
return false;
@ -81,6 +81,7 @@ bool VerifyFooter(std::string_view msg) {
const uint8_t* footer =
reinterpret_cast<const uint8_t*>(msg.data()) + (msg.size() - DUMP_FOOTER_SIZE);
uint16_t version = (*(footer + 1) << 8 | (*footer));
*rdb_version = version;
if (version > RDB_VERSION) {
LOG(WARNING) << "got restore payload with illegal version - supporting version up to "
<< RDB_VERSION << " got version " << version;
@ -126,6 +127,10 @@ class InMemSource : public ::io::Source {
class RdbRestoreValue : protected RdbLoaderBase {
public:
RdbRestoreValue(int rdb_version) {
rdb_version_ = rdb_version;
}
bool Add(std::string_view payload, std::string_view key, DbSlice& db_slice, DbIndex index,
uint64_t expire_ms);
@ -481,7 +486,7 @@ OpResult<std::string> OpDump(const OpArgs& op_args, string_view key) {
}
OpResult<bool> OnRestore(const OpArgs& op_args, std::string_view key, std::string_view payload,
RestoreArgs restore_args) {
RestoreArgs restore_args, int rdb_version) {
if (!restore_args.UpdateExpiration(op_args.db_cntx.time_now_ms)) {
return OpStatus::OUT_OF_RANGE;
}
@ -508,7 +513,7 @@ OpResult<bool> OnRestore(const OpArgs& op_args, std::string_view key, std::strin
return true;
}
RdbRestoreValue loader{};
RdbRestoreValue loader(rdb_version);
return loader.Add(payload, key, db_slice, op_args.db_cntx.db_index,
restore_args.ExpirationTime());
@ -1028,8 +1033,8 @@ void GenericFamily::Sort(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Restore(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 0);
std::string_view serialized_value = ArgS(args, 2);
if (!VerifyFooter(serialized_value)) {
int rdb_version = 0;
if (!VerifyFooter(serialized_value, &rdb_version)) {
return (*cntx)->SendError("ERR DUMP payload version or checksum are wrong");
}
@ -1043,7 +1048,7 @@ void GenericFamily::Restore(CmdArgList args, ConnectionContext* cntx) {
}
auto cb = [&](Transaction* t, EngineShard* shard) {
return OnRestore(t->GetOpArgs(shard), key, serialized_value, restore_args.value());
return OnRestore(t->GetOpArgs(shard), key, serialized_value, restore_args.value(), rdb_version);
};
OpResult<bool> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));

View file

@ -428,15 +428,13 @@ TEST_F(GenericFamilyTest, Persist) {
}
TEST_F(GenericFamilyTest, Dump) {
// The following would only work for RDB version 9
// The format was changed at version 10
// The expected results were taken from running the same with Redis branch 6.2
ASSERT_THAT(RDB_VERSION, 9);
ASSERT_THAT(RDB_SER_VERSION, 9);
uint8_t EXPECTED_STRING_DUMP[13] = {0x00, 0xc0, 0x13, 0x09, 0x00, 0x23, 0x13,
0x6f, 0x4d, 0x68, 0xf6, 0x35, 0x6e};
uint8_t EXPECTED_HASH_DUMP[] = {0x0d, 0x12, 0x12, 0x00, 0x00, 0x00, 0x0d, 0x00, 0x00, 0x00,
0x02, 0x00, 0x00, 0xfe, 0x13, 0x03, 0xc0, 0xd2, 0x04, 0xff,
0x09, 0x00, 0xb1, 0x0b, 0xae, 0x6c, 0x23, 0x5d, 0x17, 0xaa};
uint8_t EXPECTED_LIST_DUMP[] = {0x0e, 0x01, 0x0e, 0x0e, 0x00, 0x00, 0x00, 0x0a, 0x00,
0x00, 0x00, 0x01, 0x00, 0x00, 0xfe, 0x14, 0xff, 0x09,
0x00, 0xba, 0x1e, 0xa9, 0x6b, 0xba, 0xfe, 0x2d, 0x3f};
@ -471,9 +469,9 @@ TEST_F(GenericFamilyTest, Restore) {
using std::chrono::seconds;
using std::chrono::system_clock;
// redis 6 with RDB_VERSION 9
uint8_t STRING_DUMP_REDIS[] = {0x00, 0xc1, 0xd2, 0x04, 0x09, 0x00, 0xd0,
0x75, 0x59, 0x6d, 0x10, 0x04, 0x3f, 0x5c};
auto resp = Run({"set", "exiting-key", "1234"});
EXPECT_EQ(resp, "OK");
// try to restore into existing key - this should fail
@ -538,6 +536,18 @@ TEST_F(GenericFamilyTest, Restore) {
resp = Run({"get", "string-key"});
EXPECT_EQ("1234", resp);
EXPECT_EQ(CheckedInt({"ttl", "string-key"}), -1);
// The following set was created in Redis 7 with rdb version 11 and it's listpack encoded.
// We should be able to read it and convert it to our own format DenseSet or HT
// sadd myset "acme"
// dump myset
uint8_t SET_LISTPACK_DUMP[] = {0x14, 0x0D, 0x0D, 0x00, 0x00, 0x00, 0x01, 0x00, 0x84,
0x61, 0x63, 0x6D, 0x65, 0x05, 0xff, 0x0b, 0x00, 0xc1,
0x37, 0x5c, 0xe5, 0xe2, 0xc0, 0xdd, 0x27};
resp = Run({"restore", "listpack-set", "0", ToSV(SET_LISTPACK_DUMP)});
resp = Run({"sismember", "listpack-set", "acme"});
EXPECT_EQ(true, resp.GetInt().has_value());
EXPECT_EQ(1, resp.GetInt());
}
TEST_F(GenericFamilyTest, Info) {

View file

@ -8,8 +8,9 @@ extern "C" {
#include "redis/rdb.h"
}
// Custom types: Range 20-25 is used by DF RDB types.
const uint8_t RDB_TYPE_JSON = 20;
// Custom types: Range 30-35 is used by DF RDB types.
const uint8_t RDB_TYPE_JSON_OLD = 20;
const uint8_t RDB_TYPE_JSON = 30;
constexpr bool rdbIsObjectTypeDF(uint8_t type) {
return __rdbIsObjectType(type) || (type == RDB_TYPE_JSON);

View file

@ -22,6 +22,7 @@ extern "C" {
#include <lz4frame.h>
#include <zstd.h>
#include <cstring>
#include <jsoncons/json.hpp>
#include "base/endian.h"
@ -644,7 +645,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
if (rdb_type_ == RDB_TYPE_LIST_QUICKLIST_2) {
uint8_t* src = (uint8_t*)sv.data();
if (!lpValidateIntegrity(src, sv.size(), 0, NULL, NULL)) {
if (!lpValidateIntegrity(src, sv.size(), 0, nullptr, nullptr)) {
LOG(ERROR) << "Listpack integrity check failed.";
ec_ = RdbError(errc::rdb_file_corrupted);
return false;
@ -893,14 +894,73 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
res->encoding = OBJ_ENCODING_INTSET;
}
pv_->ImportRObj(res);
} else if (rdb_type_ == RDB_TYPE_HASH_ZIPLIST) {
unsigned char* lp = lpNew(blob.size());
if (!ziplistPairsConvertAndValidateIntegrity((const uint8_t*)blob.data(), blob.size(), &lp)) {
LOG(ERROR) << "Zset ziplist integrity check failed.";
zfree(lp);
} else if (rdb_type_ == RDB_TYPE_SET_LISTPACK) {
if (!lpValidateIntegrity((uint8_t*)blob.data(), blob.size(), 0, nullptr, nullptr)) {
LOG(ERROR) << "ListPack integrity check failed.";
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
unsigned char* lp = (unsigned char*)blob.data();
auto iterate_and_apply_f = [lp](auto f) {
for (unsigned char* cur = lpFirst(lp); cur != nullptr; cur = lpNext(lp, cur)) {
unsigned int slen = 0;
long long lval = 0;
unsigned char* res = lpGetValue(cur, &slen, &lval);
f(res, slen, lval);
}
};
const bool use_set2 = GetFlag(FLAGS_use_set2);
robj* res = nullptr;
if (use_set2) {
StringSet* set = new StringSet{CompactObj::memory_resource()};
res = createObject(OBJ_SET, set);
res->encoding = OBJ_ENCODING_HT;
auto f = [this, res](unsigned char* val, unsigned int slen, long long lval) {
sds sdsele = (val) ? sdsnewlen(val, slen) : sdsfromlonglong(lval);
if (!((StringSet*)res->ptr)->AddSds(sdsele)) {
LOG(ERROR) << "Error adding to member set2";
ec_ = RdbError(errc::duplicate_key);
}
};
iterate_and_apply_f(f);
} else {
res = createSetObject();
auto f = [this, res](unsigned char* val, unsigned int slen, long long lval) {
sds sdsele = (val) ? sdsnewlen(val, slen) : sdsfromlonglong(lval);
if (!dictAdd((dict*)res->ptr, sdsele, nullptr)) {
LOG(ERROR) << "Error adding to member set";
ec_ = RdbError(errc::duplicate_key);
}
};
iterate_and_apply_f(f);
}
if (ec_) {
decrRefCount(res);
return;
}
pv_->ImportRObj(res);
} else if (rdb_type_ == RDB_TYPE_HASH_ZIPLIST || rdb_type_ == RDB_TYPE_HASH_LISTPACK) {
unsigned char* lp = lpNew(blob.size());
switch (rdb_type_) {
case RDB_TYPE_HASH_ZIPLIST:
if (!ziplistPairsConvertAndValidateIntegrity((const uint8_t*)blob.data(), blob.size(),
&lp)) {
LOG(ERROR) << "Zset ziplist integrity check failed.";
zfree(lp);
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
break;
case RDB_TYPE_HASH_LISTPACK:
if (!lpValidateIntegrity((uint8_t*)blob.data(), blob.size(), 0, nullptr, nullptr)) {
LOG(ERROR) << "ListPack integrity check failed.";
zfree(lp);
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
std::memcpy(lp, blob.data(), blob.size());
break;
}
if (lpLength(lp) == 0) {
lpFree(lp);
@ -1240,7 +1300,8 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
iores = ReadIntSet();
break;
case RDB_TYPE_HASH_ZIPLIST:
iores = ReadHZiplist();
case RDB_TYPE_HASH_LISTPACK:
iores = ReadGeneric(rdbtype);
break;
case RDB_TYPE_HASH:
iores = ReadHMap();
@ -1260,7 +1321,20 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
iores = ReadStreams();
break;
case RDB_TYPE_JSON:
iores = ReadJson();
case RDB_TYPE_SET_LISTPACK:
// RDB versions 9 and older use type id 20 for JSON (RDB_TYPE_JSON_OLD).
// From version 10 onwards RDB_TYPE_JSON was moved to 30, because id 20
// collides with the new type RDB_TYPE_SET_LISTPACK.
if (rdb_version_ < 10 && rdbtype == RDB_TYPE_JSON_OLD) {
iores = ReadJson();
break;
}
if (rdbtype == RDB_TYPE_JSON) {
iores = ReadJson();
break;
}
iores = ReadGeneric(rdbtype);
break;
default:
LOG(ERROR) << "Unsupported rdb type " << rdbtype;
@ -1394,7 +1468,7 @@ auto RdbLoaderBase::ReadIntSet() -> io::Result<OpaqueObj> {
return OpaqueObj{std::move(obj), RDB_TYPE_SET_INTSET};
}
auto RdbLoaderBase::ReadHZiplist() -> io::Result<OpaqueObj> {
auto RdbLoaderBase::ReadGeneric(int rdbtype) -> io::Result<OpaqueObj> {
RdbVariant str_obj;
error_code ec = ReadStringObj(&str_obj);
if (ec)
@ -1404,7 +1478,7 @@ auto RdbLoaderBase::ReadHZiplist() -> io::Result<OpaqueObj> {
return Unexpected(errc::rdb_file_corrupted);
}
return OpaqueObj{std::move(str_obj), RDB_TYPE_HASH_ZIPLIST};
return OpaqueObj{std::move(str_obj), rdbtype};
}
auto RdbLoaderBase::ReadHMap() -> io::Result<OpaqueObj> {
@ -1760,9 +1834,9 @@ error_code RdbLoader::Load(io::Source* src) {
char buf[64] = {0};
::memcpy(buf, cb.data() + 5, 4);
int rdbver = atoi(buf);
if (rdbver < 5 || rdbver > RDB_VERSION) { // We accept starting from 5.
LOG(ERROR) << "RDB Version " << rdbver << " is not supported";
rdb_version_ = atoi(buf);
if (rdb_version_ < 5 || rdb_version_ > RDB_VERSION) { // We accept starting from 5.
LOG(ERROR) << "RDB Version " << rdb_version_ << " is not supported";
return RdbError(errc::bad_version);
}

View file

@ -15,6 +15,7 @@ extern "C" {
#include "core/json_object.h"
#include "core/mpsc_intrusive_queue.h"
#include "io/io.h"
#include "redis/rdb.h"
#include "server/common.h"
#include "server/journal/serializer.h"
@ -125,7 +126,7 @@ class RdbLoaderBase {
::io::Result<OpaqueObj> ReadSet();
::io::Result<OpaqueObj> ReadIntSet();
::io::Result<OpaqueObj> ReadHZiplist();
::io::Result<OpaqueObj> ReadGeneric(int rdbtype);
::io::Result<OpaqueObj> ReadHMap();
::io::Result<OpaqueObj> ReadZSet(int rdbtype);
::io::Result<OpaqueObj> ReadZSetZL();
@ -155,6 +156,7 @@ class RdbLoaderBase {
std::unique_ptr<DecompressImpl> decompress_impl_;
JournalReader journal_reader_{nullptr, 0};
std::optional<uint64_t> journal_offset_ = std::nullopt;
int rdb_version_ = RDB_VERSION;
};
class RdbLoader : protected RdbLoaderBase {

View file

@ -138,7 +138,7 @@ uint8_t RdbObjectType(unsigned type, unsigned compact_enc) {
case OBJ_MODULE:
return RDB_TYPE_MODULE_2;
case OBJ_JSON:
return RDB_TYPE_JSON;
return RDB_TYPE_JSON_OLD;
}
LOG(FATAL) << "Unknown encoding " << compact_enc << " for type " << type;
return 0; /* avoid warning */
@ -348,6 +348,7 @@ error_code RdbSerializer::SaveListObject(const robj* obj) {
while (node) {
DVLOG(3) << "QL node (encoding/container/sz): " << node->encoding << "/" << node->container
<< "/" << node->sz;
if (QL_NODE_IS_PLAIN(node)) {
if (quicklistNodeIsCompressed(node)) {
void* data;
@ -1150,7 +1151,9 @@ void RdbSaver::StopSnapshotInShard(EngineShard* shard) {
error_code RdbSaver::SaveHeader(const StringVec& lua_scripts) {
char magic[16];
size_t sz = absl::SNPrintF(magic, sizeof(magic), "REDIS%04d", RDB_VERSION);
// We should use RDB_VERSION here from rdb.h when we ditch redis 6 support
// For now we serialize to an older version.
size_t sz = absl::SNPrintF(magic, sizeof(magic), "REDIS%04d", RDB_SER_VERSION);
CHECK_EQ(9u, sz);
RETURN_ON_ERR(impl_->serializer()->WriteRaw(Bytes{reinterpret_cast<uint8_t*>(magic), sz}));

View file

@ -408,4 +408,36 @@ TEST_P(HllRdbTest, Hll) {
INSTANTIATE_TEST_SUITE_P(HllRdbTest, HllRdbTest, Values("key-sparse", "key-dense"));
TEST_F(RdbTest, LoadSmall7) {
  // The fixture holds three keys:
  //   my-list - a list encoded as RDB_TYPE_LIST_QUICKLIST_2
  //   my-hset - a hashtable encoded as RDB_TYPE_HASH_LISTPACK
  //   my-set  - a set encoded as RDB_TYPE_SET_LISTPACK
  io::FileSource source = GetSource("redis7_small.rdb");
  RdbLoader rdb_loader{service_.get()};

  // Loading must happen on a proactor thread so that the server state of the
  // main (testing) thread is not polluted.
  auto load_ec = pp_->at(0)->Await([&] { return rdb_loader.Load(&source); });
  ASSERT_FALSE(load_ec) << load_ec.message();

  // All three keys must be present after the load.
  auto scan_resp = Run({"scan", "0"});
  ASSERT_THAT(scan_resp, ArrLen(2));
  EXPECT_THAT(StrArray(scan_resp.GetVec()[1]),
              UnorderedElementsAre("my-set", "my-hset", "my-list"));

  // Listpack-encoded set.
  auto set_resp = Run({"smembers", "my-set"});
  ASSERT_THAT(set_resp, ArgType(RespExpr::ARRAY));
  EXPECT_THAT(set_resp.GetVec(), UnorderedElementsAre("redis", "acme"));

  // Listpack-encoded hash.
  auto hash_resp = Run({"hgetall", "my-hset"});
  ASSERT_THAT(hash_resp, ArgType(RespExpr::ARRAY));
  EXPECT_THAT(hash_resp.GetVec(), UnorderedElementsAre("acme", "44", "field", "22"));

  // Quicklist-2 encoded list.
  auto list_resp = Run({"lrange", "my-list", "0", "-1"});
  ASSERT_THAT(list_resp, ArgType(RespExpr::ARRAY));
  EXPECT_THAT(list_resp.GetVec(), UnorderedElementsAre("list1", "list2"));
}
} // namespace dfly

BIN
src/server/testdata/redis7_small.rdb vendored Normal file

Binary file not shown.