feat(server): Add RestoreSerializer (#2366)

* feat(server): Add `RestoreSerializer`

This utility class serializes `CompactObj`s as `RESTORE` commands, and
has a similar interface (and a common base class) as `RdbSerializer`

* RETURN_ON_ERR

* fixes
This commit is contained in:
Shahar Mike 2024-01-04 15:03:14 +02:00 committed by GitHub
parent b15109d4d2
commit 13c2e672b0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 125 additions and 50 deletions

View file

@ -419,7 +419,9 @@ OpResult<std::string> OpDump(const OpArgs& op_args, string_view key) {
if (IsValid(it)) {
DVLOG(1) << "Dump: key '" << key << "' successfully found, going to dump it";
return RdbSerializer::DumpObject(it->second);
io::StringSink sink;
SerializerBase::DumpObject(it->second, &sink);
return sink.str(); // TODO: Add rvalue overload to str()
}
// fallback
DVLOG(1) << "Dump: '" << key << "' Not found";

View file

@ -281,8 +281,11 @@ io::Result<io::Bytes> Lz4Compressor::Compress(io::Bytes data) {
return io::Bytes(compr_buf_.data(), frame_size);
}
SerializerBase::SerializerBase(CompressionMode compression_mode)
: compression_mode_(compression_mode), mem_buf_{4_KB} {
}
RdbSerializer::RdbSerializer(CompressionMode compression_mode)
: mem_buf_{4_KB}, tmp_buf_(nullptr), compression_mode_(compression_mode) {
: SerializerBase(compression_mode), tmp_buf_(nullptr) {
}
RdbSerializer::~RdbSerializer() {
@ -764,11 +767,11 @@ error_code RdbSerializer::SendFullSyncCut() {
return WriteRaw(buf);
}
size_t RdbSerializer::GetTotalBufferCapacity() const {
size_t SerializerBase::GetTotalBufferCapacity() const {
return mem_buf_.Capacity();
}
error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {
error_code SerializerBase::WriteRaw(const io::Bytes& buf) {
mem_buf_.Reserve(mem_buf_.InputLen() + buf.size());
IoBuf::Bytes dest = mem_buf_.AppendBuffer();
memcpy(dest.data(), buf.data(), buf.size());
@ -776,7 +779,7 @@ error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {
return error_code{};
}
error_code RdbSerializer::FlushToSink(io::Sink* s) {
error_code SerializerBase::FlushToSink(io::Sink* s) {
auto bytes = PrepareFlush();
if (bytes.empty())
return error_code{};
@ -786,12 +789,19 @@ error_code RdbSerializer::FlushToSink(io::Sink* s) {
// interrupt point.
RETURN_ON_ERR(s->Write(bytes));
mem_buf_.ConsumeInput(bytes.size());
return error_code{};
}
error_code RdbSerializer::FlushToSink(io::Sink* s) {
RETURN_ON_ERR(SerializerBase::FlushToSink(s));
// After every flush we should write the DB index again because the blobs in the channel are
// interleaved and multiple savers can correspond to a single writer (in case of single file rdb
// snapshot)
last_entry_db_index_ = kInvalidDbId;
return error_code{};
return {};
}
namespace {
@ -812,7 +822,11 @@ CrcBuffer MakeCheckSum(std::string_view dump_res) {
return buf;
}
void AppendFooter(std::string* dump_res) {
void AppendFooter(io::StringSink* dump_res) {
auto to_bytes = [](const auto& buf) {
return io::Bytes(reinterpret_cast<const uint8_t*>(buf.data()), buf.size());
};
/* Write the footer, this is how it looks like:
* ----------------+---------------------+---------------+
* ... RDB payload | 2 bytes RDB version | 8 bytes CRC64 |
@ -820,14 +834,13 @@ void AppendFooter(std::string* dump_res) {
* RDB version and CRC are both in little endian.
*/
const auto ver = MakeRdbVersion();
dump_res->append(ver.data(), ver.size());
const auto crc = MakeCheckSum(*dump_res);
dump_res->append(crc.data(), crc.size());
dump_res->Write(to_bytes(ver));
const auto crc = MakeCheckSum(dump_res->str());
dump_res->Write(to_bytes(crc));
}
} // namespace
std::string RdbSerializer::DumpObject(const CompactObj& obj) {
io::StringSink sink;
void SerializerBase::DumpObject(const CompactObj& obj, io::StringSink* out) {
CompressionMode compression_mode = GetDefaultCompressionMode();
if (compression_mode != CompressionMode::NONE) {
compression_mode = CompressionMode::SINGLE_ENTRY;
@ -843,19 +856,17 @@ std::string RdbSerializer::DumpObject(const CompactObj& obj) {
CHECK(!ec);
ec = serializer.SaveValue(obj);
CHECK(!ec); // make sure that fully was successful
ec = serializer.FlushToSink(&sink);
CHECK(!ec); // make sure that fully was successful
std::string dump_payload(sink.str());
AppendFooter(&dump_payload); // version and crc
CHECK_GT(dump_payload.size(), 10u);
return dump_payload;
ec = serializer.FlushToSink(out);
CHECK(!ec); // make sure that fully was successful
AppendFooter(out); // version and crc
CHECK_GT(out->str().size(), 10u);
}
size_t RdbSerializer::SerializedLen() const {
size_t SerializerBase::SerializedLen() const {
return mem_buf_.InputLen();
}
io::Bytes RdbSerializer::PrepareFlush() {
io::Bytes SerializerBase::PrepareFlush() {
size_t sz = mem_buf_.InputLen();
if (sz == 0)
return mem_buf_.InputBuffer();
@ -1461,7 +1472,7 @@ size_t RdbSaver::GetTotalBuffersSize() const {
return impl_->GetTotalBuffersSize();
}
void RdbSerializer::AllocateCompressorOnce() {
void SerializerBase::AllocateCompressorOnce() {
if (compressor_impl_) {
return;
}
@ -1474,7 +1485,7 @@ void RdbSerializer::AllocateCompressorOnce() {
}
}
void RdbSerializer::CompressBlob() {
void SerializerBase::CompressBlob() {
if (!compression_stats_) {
compression_stats_.emplace(CompressionStats{});
}
@ -1522,4 +1533,38 @@ void RdbSerializer::CompressBlob() {
++compression_stats_->compressed_blobs;
}
RestoreSerializer::RestoreSerializer(CompressionMode compression_mode)
: SerializerBase(compression_mode) {
}
error_code RestoreSerializer::SaveEntry(const PrimeKey& pk, const PrimeValue& pv,
uint64_t expire_ms, DbIndex dbid) {
absl::InlinedVector<string_view, 4> args;
key_buffer_.clear();
string_view key = pk.GetSlice(&key_buffer_);
args.push_back(key);
string expire_str = absl::StrCat(expire_ms);
args.push_back(expire_str);
value_dump_sink_.Clear();
DumpObject(pv, &value_dump_sink_);
args.push_back(value_dump_sink_.str());
args.push_back("ABSTTL"); // Means expire string is since epoch
journal::Entry entry(0, // txid
journal::Op::COMMAND, // single command
dbid, //
1, // shard count
make_pair("RESTORE", ArgSlice{args}));
sink_.Clear();
JournalWriter writer{&sink_};
writer.Write(entry);
return WriteRaw(io::Buffer(sink_.str()));
}
} // namespace dfly

View file

@ -131,7 +131,48 @@ class RdbSaver {
class CompressorImpl;
class RdbSerializer {
class SerializerBase {
public:
explicit SerializerBase(CompressionMode compression_mode);
virtual ~SerializerBase() = default;
// Dumps `obj` in DUMP command format into `out`. Uses default compression mode.
static void DumpObject(const CompactObj& obj, io::StringSink* out);
// Internal buffer size. Might shrink after flush due to compression.
size_t SerializedLen() const;
// Flush internal buffer to sink.
virtual std::error_code FlushToSink(io::Sink* s);
size_t GetTotalBufferCapacity() const;
std::error_code WriteRaw(const ::io::Bytes& buf);
protected:
// Prepare internal buffer for flush. Compress it.
io::Bytes PrepareFlush();
// If membuf data is compressable use compression impl to compress the data and write it to membuf
void CompressBlob();
void AllocateCompressorOnce();
CompressionMode compression_mode_;
base::IoBuf mem_buf_;
std::unique_ptr<CompressorImpl> compressor_impl_;
static constexpr size_t kMinStrSizeToCompress = 256;
static constexpr double kMinCompressionReductionPrecentage = 0.95;
struct CompressionStats {
uint32_t compression_no_effective = 0;
uint32_t small_str_count = 0;
uint32_t compression_failed = 0;
uint32_t compressed_blobs = 0;
};
std::optional<CompressionStats> compression_stats_;
};
class RdbSerializer : public SerializerBase {
public:
explicit RdbSerializer(CompressionMode compression_mode);
@ -140,12 +181,7 @@ class RdbSerializer {
// Dumps `obj` in DUMP command format. Uses default compression mode.
static std::string DumpObject(const CompactObj& obj);
// Internal buffer size. Might shrink after flush due to compression.
size_t SerializedLen() const;
// Flush internal buffer to sink.
std::error_code FlushToSink(io::Sink* s);
std::error_code FlushToSink(io::Sink* s) override;
std::error_code SelectDb(uint32_t dbid);
// Must be called in the thread to which `it` belongs.
@ -166,7 +202,6 @@ class RdbSerializer {
// for the dump command - thus it is public function
std::error_code SaveValue(const PrimeValue& pv);
std::error_code WriteRaw(const ::io::Bytes& buf);
std::error_code WriteOpcode(uint8_t opcode) {
return WriteRaw(::io::Bytes{&opcode, 1});
}
@ -179,12 +214,7 @@ class RdbSerializer {
// Send FULL_SYNC_CUT opcode to notify that all static data was sent.
std::error_code SendFullSyncCut();
size_t GetTotalBufferCapacity() const;
private:
// Prepare internal buffer for flush. Compress it.
io::Bytes PrepareFlush();
std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len);
std::error_code SaveObject(const PrimeValue& pv);
std::error_code SaveListObject(const robj* obj);
@ -199,29 +229,27 @@ class RdbSerializer {
std::error_code SaveListPackAsZiplist(uint8_t* lp);
std::error_code SaveStreamPEL(rax* pel, bool nacks);
std::error_code SaveStreamConsumers(streamCG* cg);
// If membuf data is compressable use compression impl to compress the data and write it to membuf
void CompressBlob();
void AllocateCompressorOnce();
base::IoBuf mem_buf_;
std::string tmp_str_;
base::PODArray<uint8_t> tmp_buf_;
DbIndex last_entry_db_index_ = kInvalidDbId;
std::unique_ptr<LZF_HSLOT[]> lzf_;
};
CompressionMode compression_mode_;
std::unique_ptr<CompressorImpl> compressor_impl_;
// Serializes CompactObj as RESTORE commands.
class RestoreSerializer : public SerializerBase {
public:
explicit RestoreSerializer(CompressionMode compression_mode);
static constexpr size_t kMinStrSizeToCompress = 256;
static constexpr double kMinCompressionReductionPrecentage = 0.95;
struct CompressionStats {
uint32_t compression_no_effective = 0;
uint32_t small_str_count = 0;
uint32_t compression_failed = 0;
uint32_t compressed_blobs = 0;
};
std::optional<CompressionStats> compression_stats_;
std::error_code SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms,
DbIndex dbid);
private:
// All members are used for saving allocations.
std::string key_buffer_;
io::StringSink value_dump_sink_;
io::StringSink sink_;
};
} // namespace dfly