diff --git a/docs/api_status.md b/docs/api_status.md index 0d91bba54..89b48dcb7 100644 --- a/docs/api_status.md +++ b/docs/api_status.md @@ -173,7 +173,7 @@ with respect to Memcached and Redis APIs. - [X] SCAN - [X] PEXPIREAT - [ ] PEXPIRE - - [ ] DUMP + - [x] DUMP - [X] EVAL - [X] EVALSHA - [ ] OBJECT diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 535fd73e2..f58d10ef3 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -5,18 +5,21 @@ #include "server/generic_family.h" extern "C" { +#include "redis/crc64.h" #include "redis/object.h" #include "redis/util.h" } #include "base/flags.h" #include "base/logging.h" +#include "redis/rdb.h" #include "server/blocking_controller.h" #include "server/command_registry.h" #include "server/conn_context.h" #include "server/container_utils.h" #include "server/engine_shard_set.h" #include "server/error.h" +#include "server/rdb_save.h" #include "server/transaction.h" #include "util/varz.h" @@ -28,6 +31,35 @@ using namespace std; using namespace facade; namespace { +using VersionBuffer = std::array; +using CrcBuffer = std::array; + +VersionBuffer MakeRdbVersion() { + VersionBuffer buf; + buf[0] = RDB_VERSION & 0xff; + buf[1] = (RDB_VERSION >> 8) & 0xff; + return buf; +} + +CrcBuffer MakeCheckSum(std::string_view dump_res) { + uint64_t chksum = crc64(0, reinterpret_cast(dump_res.data()), dump_res.size()); + CrcBuffer buf; + absl::little_endian::Store64(buf.data(), chksum); + return buf; +} + +void AppendFooter(std::string* dump_res) { + /* Write the footer, this is how it looks like: + * ----------------+---------------------+---------------+ + * ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 | + * ----------------+---------------------+---------------+ + * RDB version and CRC are both in little endian. + */ + const auto ver = MakeRdbVersion(); + dump_res->append(ver.data(), ver.size()); + const auto crc = MakeCheckSum(*dump_res); + dump_res->append(crc.data(), crc.size()); +} OpStatus OpPersist(const OpArgs& op_args, string_view key); @@ -195,6 +227,38 @@ OpStatus OpPersist(const OpArgs& op_args, string_view key) { } } +OpResult OpDump(const OpArgs& op_args, string_view key) { + auto& db_slice = op_args.shard->db_slice(); + auto [it, expire_it] = db_slice.FindExt(op_args.db_cntx, key); + + if (IsValid(it)) { + DVLOG(1) << "Dump: key '" << key << "' successfully found, going to dump it"; + std::unique_ptr<::io::StringSink> sink = std::make_unique<::io::StringSink>(); + RdbSerializer serializer(sink.get()); + + // According to Redis code we need to + // 1. Save the value itself - without the key + // 2. Save footer: this include the RDB version and the CRC value for the message + unsigned obj_type = it->second.ObjType(); + unsigned encoding = it->second.Encoding(); + auto type = RdbObjectType(obj_type, encoding); + DVLOG(1) << "We are going to dump object type: " << type; + std::error_code ec = serializer.WriteOpcode(type); + CHECK(!ec); + ec = serializer.SaveValue(it->second); + CHECK(!ec); // make sure that fully was successful + ec = serializer.FlushMem(); + CHECK(!ec); // make sure that fully was successful + std::string dump_payload(sink->str()); + AppendFooter(&dump_payload); // version and crc + CHECK_GT(dump_payload.size(), 10u); + return dump_payload; + } + // fallback + DVLOG(1) << "Dump: '" << key << "' Not found"; + return OpStatus::KEY_NOTFOUND; +} + bool ScanCb(const OpArgs& op_args, PrimeIterator it, const ScanOpts& opts, StringVec* res) { auto& db_slice = op_args.shard->db_slice(); if (it->second.HasExpire()) { @@ -754,6 +818,23 @@ void GenericFamily::Select(CmdArgList args, ConnectionContext* cntx) { return (*cntx)->SendOk(); } +void GenericFamily::Dump(CmdArgList args, ConnectionContext* cntx) { + std::string_view key = ArgS(args, 1); + DVLOG(1) << "Dumping before ::ScheduleSingleHopT " << key; + auto cb = [&](Transaction* t, EngineShard* shard) { return OpDump(t->GetOpArgs(shard), key); }; + + Transaction* trans = cntx->transaction; + OpResult result = trans->ScheduleSingleHopT(std::move(cb)); + if (result) { + DVLOG(1) << "Dump " << trans->DebugId() << ": " << key << ", dump size " + << result.value().size(); + (*cntx)->SendBulkString(*result); + } else { + DVLOG(1) << "Dump failed: " << result.DebugFormat() << key << " nil"; + (*cntx)->SendNull(); + } +} + void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) { std::string_view key = ArgS(args, 1); @@ -1074,6 +1155,7 @@ void GenericFamily::Register(CommandRegistry* registry) { << CI{"PTTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Pttl) << CI{"TIME", CO::LOADING | CO::FAST, 1, 0, 0, 0}.HFUNC(Time) << CI{"TYPE", CO::READONLY | CO::FAST | CO::LOADING, 2, 1, 1, 1}.HFUNC(Type) + << CI{"DUMP", CO::READONLY, 2, 1, 1, 1}.HFUNC(Dump) << CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del) << CI{"STICK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Stick) << CI{"SORT", CO::READONLY, -2, 1, 1, 1}.HFUNC(Sort) diff --git a/src/server/generic_family.h b/src/server/generic_family.h index 37d009492..a7d2da81b 100644 --- a/src/server/generic_family.h +++ b/src/server/generic_family.h @@ -63,6 +63,7 @@ class GenericFamily { static void Scan(CmdArgList args, ConnectionContext* cntx); static void Time(CmdArgList args, ConnectionContext* cntx); static void Type(CmdArgList args, ConnectionContext* cntx); + static void Dump(CmdArgList args, ConnectionContext* cntx); static OpResult RenameGeneric(CmdArgList args, bool skip_exist_dest, ConnectionContext* cntx); diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index 91f08d21b..b5fcfc1ec 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -4,6 +4,10 @@ #include "server/generic_family.h" +extern "C" { +#include "redis/rdb.h" +} + #include "base/gtest.h" #include "base/logging.h" #include "facade/facade_test.h" @@ -376,4 +380,41 @@ TEST_F(GenericFamilyTest, Persist) { EXPECT_EQ(-1, CheckedInt({"TTL", "mykey"})); } +TEST_F(GenericFamilyTest, Dump) { + // The following would only work for RDB version 9 + // The format was changed at version 10 + // The expected results were taken from running the same with Redis branch 6.2 + ASSERT_THAT(RDB_VERSION, 9); + uint8_t EXPECTED_STRING_DUMP[13] = {0x00, 0xc0, 0x13, 0x09, 0x00, 0x23, 0x13, + 0x6f, 0x4d, 0x68, 0xf6, 0x35, 0x6e}; + uint8_t EXPECTED_HASH_DUMP[] = {0x0d, 0x12, 0x12, 0x00, 0x00, 0x00, 0x0d, 0x00, 0x00, 0x00, + 0x02, 0x00, 0x00, 0xfe, 0x13, 0x03, 0xc0, 0xd2, 0x04, 0xff, + 0x09, 0x00, 0xb1, 0x0b, 0xae, 0x6c, 0x23, 0x5d, 0x17, 0xaa}; + uint8_t EXPECTED_LIST_DUMP[] = {0x0e, 0x01, 0x0e, 0x0e, 0x00, 0x00, 0x00, 0x0a, 0x00, + 0x00, 0x00, 0x01, 0x00, 0x00, 0xfe, 0x14, 0xff, 0x09, + 0x00, 0xba, 0x1e, 0xa9, 0x6b, 0xba, 0xfe, 0x2d, 0x3f}; + + // Check string dump + auto resp = Run({"set", "z", "19"}); + EXPECT_EQ(resp, "OK"); + resp = Run({"dump", "z"}); + auto dump = resp.GetBuf(); + CHECK_EQ(ToSV(dump), ToSV(EXPECTED_STRING_DUMP)); + + // Check list dump + EXPECT_EQ(1, CheckedInt({"rpush", "l", "20"})); + resp = Run({"dump", "l"}); + dump = resp.GetBuf(); + CHECK_EQ(ToSV(dump), ToSV(EXPECTED_LIST_DUMP)); + + // Check for hash dump + EXPECT_EQ(1, CheckedInt({"hset", "z2", "19", "1234"})); + resp = Run({"dump", "z2"}); + dump = resp.GetBuf(); + CHECK_EQ(ToSV(dump), ToSV(EXPECTED_HASH_DUMP)); + + // Check that when running with none existing key we're getting nil + resp = Run({"dump", "foo"}); + EXPECT_EQ(resp.type, RespExpr::NIL); +} } // namespace dfly diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 711439c9f..99e9ad0e2 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -117,6 +117,11 @@ inline unsigned SerializeLen(uint64_t len, uint8_t* buf) { return 1 + 8; } +constexpr size_t kBufLen = 64_KB; +constexpr size_t kAmask = 4_KB - 1; + +} // namespace + uint8_t RdbObjectType(unsigned type, unsigned encoding) { switch (type) { case OBJ_STRING: @@ -152,17 +157,27 @@ uint8_t RdbObjectType(unsigned type, unsigned encoding) { return 0; /* avoid warning */ } -constexpr size_t kBufLen = 64_KB; -constexpr size_t kAmask = 4_KB - 1; - -} // namespace - RdbSerializer::RdbSerializer(io::Sink* s) : sink_(s), mem_buf_{4_KB}, tmp_buf_(nullptr) { } RdbSerializer::~RdbSerializer() { } +std::error_code RdbSerializer::SaveValue(const PrimeValue& pv) { + std::error_code ec; + if (pv.ObjType() == OBJ_STRING) { + auto opt_int = pv.TryGetInt(); + if (opt_int) { + ec = SaveLongLongAsString(*opt_int); + } else { + ec = SaveString(pv.GetSlice(&tmp_str_)); + } + } else { + ec = SaveObject(pv); + } + return ec; +} + error_code RdbSerializer::SelectDb(uint32_t dbid) { uint8_t buf[16]; buf[0] = RDB_OPCODE_SELECTDB; @@ -189,7 +204,7 @@ io::Result RdbSerializer::SaveEntry(const PrimeKey& pk, const PrimeValu unsigned encoding = pv.Encoding(); uint8_t rdb_type = RdbObjectType(obj_type, encoding); - DVLOG(3) << "Saving keyval start " << key; + DVLOG(3) << "Saving key/val start " << key; ec = WriteOpcode(rdb_type); if (ec) @@ -198,18 +213,7 @@ io::Result RdbSerializer::SaveEntry(const PrimeKey& pk, const PrimeValu ec = SaveString(key); if (ec) return make_unexpected(ec); - - if (obj_type == OBJ_STRING) { - auto opt_int = pv.TryGetInt(); - if (opt_int) { - ec = SaveLongLongAsString(*opt_int); - } else { - ec = SaveString(pv.GetSlice(&tmp_str_)); - } - } else { - ec = SaveObject(pv); - } - + ec = SaveValue(pv); if (ec) return make_unexpected(ec); return rdb_type; diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 009aa50d6..79a51f7d8 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -21,6 +21,8 @@ typedef struct streamCG streamCG; namespace dfly { +uint8_t RdbObjectType(unsigned type, unsigned encoding); + class EngineShard; class AlignedBuffer : public ::io::Sink { @@ -118,6 +120,12 @@ class RdbSerializer { std::error_code FlushMem(); + // This would work for either string or an object. + // The arg pv is taken from it->second if accessing + // this by finding the key. This function is used + // for the dump command - thus it is public function + std::error_code SaveValue(const PrimeValue& pv); + private: std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len); std::error_code SaveObject(const PrimeValue& pv);