diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 78c19e4d5..0e0422d53 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -88,7 +88,8 @@ cxx_test(stream_family_test dfly_test_lib LABELS DFLY) cxx_test(string_family_test dfly_test_lib LABELS DFLY) cxx_test(bitops_family_test dfly_test_lib LABELS DFLY) cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rdb - testdata/redis6_stream.rdb testdata/hll.rdb testdata/redis7_small.rdb LABELS DFLY) + testdata/redis6_stream.rdb testdata/hll.rdb testdata/redis7_small.rdb + testdata/redis_json.rdb LABELS DFLY) cxx_test(zset_family_test dfly_test_lib LABELS DFLY) cxx_test(blocking_controller_test dfly_test_lib LABELS DFLY) cxx_test(json_family_test dfly_test_lib LABELS DFLY) diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 9c3fd1136..de2046b0c 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1364,6 +1364,9 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) { iores = ReadGeneric(rdbtype); } break; + case RDB_TYPE_MODULE_2: + iores = ReadRedisJson(); + break; default: LOG(ERROR) << "Unsupported rdb type " << rdbtype; @@ -1777,6 +1780,44 @@ auto RdbLoaderBase::ReadStreams() -> io::Result { return OpaqueObj{std::move(load_trace), RDB_TYPE_STREAM_LISTPACKS}; } +auto RdbLoaderBase::ReadRedisJson() -> io::Result { + auto json_magic_number = LoadLen(nullptr); + if (!json_magic_number) { + return Unexpected(errc::rdb_file_corrupted); + } + + constexpr string_view kJsonModule = "ReJSON-RL"sv; + string module_name = ModuleTypeName(*json_magic_number); + if (module_name != kJsonModule) { + LOG(ERROR) << "Unsupported module: " << module_name; + return Unexpected(errc::unsupported_operation); + } + + int encver = *json_magic_number & 1023; + if (encver != 3) { + LOG(ERROR) << "Unsupported ReJSON version: " << encver; + return Unexpected(errc::unsupported_operation); + } + + auto opcode = FetchInt(); + if (!opcode || *opcode != RDB_MODULE_OPCODE_STRING) { + return Unexpected(errc::rdb_file_corrupted); + } + + RdbVariant dest; + error_code ec = ReadStringObj(&dest); + if (ec) { + return make_unexpected(ec); + } + + opcode = FetchInt(); + if (!opcode || *opcode != RDB_MODULE_OPCODE_EOF) { + return Unexpected(errc::rdb_file_corrupted); + } + + return OpaqueObj{std::move(dest), RDB_TYPE_JSON}; +} + auto RdbLoaderBase::ReadJson() -> io::Result { RdbVariant dest; error_code ec = ReadStringObj(&dest); @@ -2001,8 +2042,9 @@ error_code RdbLoader::Load(io::Source* src) { SET_OR_RETURN(LoadLen(nullptr), module_id); string module_name = ModuleTypeName(module_id); - LOG(ERROR) << "Modules are not supported, error loading module " << module_name; - return RdbError(errc::feature_not_supported); + LOG(WARNING) << "WARNING: Skipping data for module " << module_name; + RETURN_ON_ERR(SkipModuleData()); + continue; } if (type == RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START || @@ -2142,6 +2184,50 @@ void RdbLoaderBase::AllocateDecompressOnce(int op_type) { } } +error_code RdbLoaderBase::SkipModuleData() { + uint64_t opcode; + SET_OR_RETURN(LoadLen(nullptr), opcode); // ignore field 'when_opcode' + if (opcode != RDB_MODULE_OPCODE_UINT) + return RdbError(errc::rdb_file_corrupted); + SET_OR_RETURN(LoadLen(nullptr), opcode); // ignore field 'when' + + while (true) { + SET_OR_RETURN(LoadLen(nullptr), opcode); + + switch (opcode) { + case RDB_MODULE_OPCODE_EOF: + return kOk; // Module data end + + case RDB_MODULE_OPCODE_SINT: + case RDB_MODULE_OPCODE_UINT: { + [[maybe_unused]] uint64_t _; + SET_OR_RETURN(LoadLen(nullptr), _); + break; + } + + case RDB_MODULE_OPCODE_STRING: { + RdbVariant dest; + error_code ec = ReadStringObj(&dest); + if (ec) { + return ec; + } + break; + } + + case RDB_MODULE_OPCODE_DOUBLE: { + [[maybe_unused]] double _; + SET_OR_RETURN(FetchBinaryDouble(), _); + break; + } + + default: + // TODO: handle RDB_MODULE_OPCODE_FLOAT + LOG(ERROR) << "Unsupported module section: " << opcode; + return RdbError(errc::rdb_file_corrupted); + } + } +} + error_code RdbLoaderBase::HandleCompressedBlob(int op_type) { AllocateDecompressOnce(op_type); // Fetch uncompress blob diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index b73df6347..871c37533 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -129,8 +129,10 @@ class RdbLoaderBase { ::io::Result ReadZSetZL(); ::io::Result ReadListQuicklist(int rdbtype); ::io::Result ReadStreams(); + ::io::Result ReadRedisJson(); ::io::Result ReadJson(); + std::error_code SkipModuleData(); std::error_code HandleCompressedBlob(int op_type); std::error_code HandleCompressedBlobFinish(); void AllocateDecompressOnce(int op_type); diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index 0f59deefa..0dbb75d7b 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -509,4 +509,29 @@ TEST_F(RdbTest, LoadSmall7) { ASSERT_THAT(resp, ArgType(RespExpr::ARRAY)); EXPECT_THAT(resp.GetVec(), ElementsAre("einstein", "schrodinger")); } + +TEST_F(RdbTest, RedisJson) { + // RDB file generated via: + // ./redis-server --save "" --appendonly no --loadmodule ../lib/rejson.so + // and then: + // JSON.SET json-str $ '"hello"' + // JSON.SET json-arr $ "[1, true, \"hello\", 3.14]" + // JSON.SET json-obj $ + // '{"company":"DragonflyDB","product":"Dragonfly","website":"https://dragondlydb.io","years-active":[2021,2022,2023,2024,"and + // more!"]}' + io::FileSource fs = GetSource("redis_json.rdb"); + RdbLoader loader{service_.get()}; + + // must run in proactor thread in order to avoid polluting the serverstate + // in the main, testing thread. + auto ec = pp_->at(0)->Await([&] { return loader.Load(&fs); }); + + ASSERT_FALSE(ec) << ec.message(); + + EXPECT_EQ(Run({"JSON.GET", "json-str"}), "\"hello\""); + EXPECT_EQ(Run({"JSON.GET", "json-arr"}), "[1,true,\"hello\",3.14]"); + EXPECT_EQ(Run({"JSON.GET", "json-obj"}), + "{\"company\":\"DragonflyDB\",\"product\":\"Dragonfly\",\"website\":\"https://" + "dragondlydb.io\",\"years-active\":[2021,2022,2023,2024,\"and more!\"]}"); +} } // namespace dfly diff --git a/src/server/testdata/redis_json.rdb b/src/server/testdata/redis_json.rdb new file mode 100644 index 000000000..b1f6ba6b3 Binary files /dev/null and b/src/server/testdata/redis_json.rdb differ