mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
feat(json): Deserialize ReJSON format (#2725)
* feat(json): Deserialize ReJSON format This PR adds support for Redis-based JSON RDB format deserialization. Since Redis uses ReJSON as a module, serialization is slightly different from other types, but overall it's not a big change once we know where all bits should be. While this change knows how to _read_ Redis-based JSON keys, it does not _save_ them in Redis format. That will be in a different PR. This PR also ignores unknown (non-keys) module data instead of failing the load. Fixes #2718 * Cleanup * Add tests * Skip unsupported modules * Small refactor
This commit is contained in:
parent
9c6e6a96b7
commit
e2b5d48837
5 changed files with 117 additions and 3 deletions
|
@ -1364,6 +1364,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
|
|||
iores = ReadGeneric(rdbtype);
|
||||
}
|
||||
break;
|
||||
case RDB_TYPE_MODULE_2:
|
||||
iores = ReadRedisJson();
|
||||
break;
|
||||
default:
|
||||
LOG(ERROR) << "Unsupported rdb type " << rdbtype;
|
||||
|
||||
|
@ -1777,6 +1780,44 @@ auto RdbLoaderBase::ReadStreams() -> io::Result<OpaqueObj> {
|
|||
return OpaqueObj{std::move(load_trace), RDB_TYPE_STREAM_LISTPACKS};
|
||||
}
|
||||
|
||||
auto RdbLoaderBase::ReadRedisJson() -> io::Result<OpaqueObj> {
|
||||
auto json_magic_number = LoadLen(nullptr);
|
||||
if (!json_magic_number) {
|
||||
return Unexpected(errc::rdb_file_corrupted);
|
||||
}
|
||||
|
||||
constexpr string_view kJsonModule = "ReJSON-RL"sv;
|
||||
string module_name = ModuleTypeName(*json_magic_number);
|
||||
if (module_name != kJsonModule) {
|
||||
LOG(ERROR) << "Unsupported module: " << module_name;
|
||||
return Unexpected(errc::unsupported_operation);
|
||||
}
|
||||
|
||||
int encver = *json_magic_number & 1023;
|
||||
if (encver != 3) {
|
||||
LOG(ERROR) << "Unsupported ReJSON version: " << encver;
|
||||
return Unexpected(errc::unsupported_operation);
|
||||
}
|
||||
|
||||
auto opcode = FetchInt<uint8_t>();
|
||||
if (!opcode || *opcode != RDB_MODULE_OPCODE_STRING) {
|
||||
return Unexpected(errc::rdb_file_corrupted);
|
||||
}
|
||||
|
||||
RdbVariant dest;
|
||||
error_code ec = ReadStringObj(&dest);
|
||||
if (ec) {
|
||||
return make_unexpected(ec);
|
||||
}
|
||||
|
||||
opcode = FetchInt<uint8_t>();
|
||||
if (!opcode || *opcode != RDB_MODULE_OPCODE_EOF) {
|
||||
return Unexpected(errc::rdb_file_corrupted);
|
||||
}
|
||||
|
||||
return OpaqueObj{std::move(dest), RDB_TYPE_JSON};
|
||||
}
|
||||
|
||||
auto RdbLoaderBase::ReadJson() -> io::Result<OpaqueObj> {
|
||||
RdbVariant dest;
|
||||
error_code ec = ReadStringObj(&dest);
|
||||
|
@ -2001,8 +2042,9 @@ error_code RdbLoader::Load(io::Source* src) {
|
|||
SET_OR_RETURN(LoadLen(nullptr), module_id);
|
||||
string module_name = ModuleTypeName(module_id);
|
||||
|
||||
LOG(ERROR) << "Modules are not supported, error loading module " << module_name;
|
||||
return RdbError(errc::feature_not_supported);
|
||||
LOG(WARNING) << "WARNING: Skipping data for module " << module_name;
|
||||
RETURN_ON_ERR(SkipModuleData());
|
||||
continue;
|
||||
}
|
||||
|
||||
if (type == RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START ||
|
||||
|
@ -2142,6 +2184,50 @@ void RdbLoaderBase::AllocateDecompressOnce(int op_type) {
|
|||
}
|
||||
}
|
||||
|
||||
error_code RdbLoaderBase::SkipModuleData() {
|
||||
uint64_t opcode;
|
||||
SET_OR_RETURN(LoadLen(nullptr), opcode); // ignore field 'when_opcode'
|
||||
if (opcode != RDB_MODULE_OPCODE_UINT)
|
||||
return RdbError(errc::rdb_file_corrupted);
|
||||
SET_OR_RETURN(LoadLen(nullptr), opcode); // ignore field 'when'
|
||||
|
||||
while (true) {
|
||||
SET_OR_RETURN(LoadLen(nullptr), opcode);
|
||||
|
||||
switch (opcode) {
|
||||
case RDB_MODULE_OPCODE_EOF:
|
||||
return kOk; // Module data end
|
||||
|
||||
case RDB_MODULE_OPCODE_SINT:
|
||||
case RDB_MODULE_OPCODE_UINT: {
|
||||
[[maybe_unused]] uint64_t _;
|
||||
SET_OR_RETURN(LoadLen(nullptr), _);
|
||||
break;
|
||||
}
|
||||
|
||||
case RDB_MODULE_OPCODE_STRING: {
|
||||
RdbVariant dest;
|
||||
error_code ec = ReadStringObj(&dest);
|
||||
if (ec) {
|
||||
return ec;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case RDB_MODULE_OPCODE_DOUBLE: {
|
||||
[[maybe_unused]] double _;
|
||||
SET_OR_RETURN(FetchBinaryDouble(), _);
|
||||
break;
|
||||
}
|
||||
|
||||
default:
|
||||
// TODO: handle RDB_MODULE_OPCODE_FLOAT
|
||||
LOG(ERROR) << "Unsupported module section: " << opcode;
|
||||
return RdbError(errc::rdb_file_corrupted);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
error_code RdbLoaderBase::HandleCompressedBlob(int op_type) {
|
||||
AllocateDecompressOnce(op_type);
|
||||
// Fetch uncompress blob
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue