From d74b076e18d987175ffe4429fc3d348af6f8e53d Mon Sep 17 00:00:00 2001 From: adiholden Date: Tue, 6 Dec 2022 13:28:19 +0200 Subject: [PATCH] feat(rdb save): refactor move zstd serializer under rdb serializer (#533) Signed-off-by: adi_holden --- src/server/generic_family.cc | 4 +- src/server/rdb_save.cc | 147 ++++++++++++++++++++--------------- src/server/rdb_save.h | 40 ++++------ src/server/snapshot.cc | 35 ++++----- src/server/snapshot.h | 4 +- 5 files changed, 115 insertions(+), 115 deletions(-) diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 525f89d13..506765aea 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -431,7 +431,9 @@ OpResult OpDump(const OpArgs& op_args, string_view key) { DVLOG(1) << "Dump: key '" << key << "' successfully found, going to dump it"; std::unique_ptr<::io::StringSink> sink = std::make_unique<::io::StringSink>(); int compression_mode = absl::GetFlag(FLAGS_compression_mode); - RdbSerializer serializer(compression_mode != 0); + CompressionMode serializer_compression_mode = + compression_mode == 0 ? CompressionMode::NONE : CompressionMode::SINGLE_ENTRY; + RdbSerializer serializer(serializer_compression_mode); // According to Redis code we need to // 1. 
Save the value itself - without the key diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 84feadaca..2905cb6d6 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -166,11 +166,52 @@ uint8_t RdbObjectType(unsigned type, unsigned encoding) { return 0; /* avoid warning */ } -RdbSerializer::RdbSerializer(bool do_compression) - : mem_buf_{4_KB}, tmp_buf_(nullptr), do_entry_level_compression_(do_compression) { +class ZstdCompressImpl { + public: + ZstdCompressImpl() { + cctx_ = ZSTD_createCCtx(); + compression_level_ = absl::GetFlag(FLAGS_zstd_compression_level); + } + ~ZstdCompressImpl() { + ZSTD_freeCCtx(cctx_); + + VLOG(1) << "zstd compressed size: " << compressed_size_total_; + VLOG(1) << "zstd uncompressed size: " << uncompressed_size_total_; + } + + io::Bytes Compress(io::Bytes str); + + private: + ZSTD_CCtx* cctx_; + int compression_level_ = 1; + size_t compressed_size_total_ = 0; + size_t uncompressed_size_total_ = 0; + base::PODArray compr_buf_; +}; + +io::Bytes ZstdCompressImpl::Compress(io::Bytes str) { + size_t buf_size = ZSTD_compressBound(str.size()); + if (compr_buf_.capacity() < buf_size) { + compr_buf_.reserve(buf_size); + } + size_t compressed_size = ZSTD_compressCCtx(cctx_, compr_buf_.data(), compr_buf_.capacity(), + str.data(), str.size(), compression_level_); + + compressed_size_total_ += compressed_size; + uncompressed_size_total_ += str.size(); + return io::Bytes(compr_buf_.data(), compressed_size); +} + +RdbSerializer::RdbSerializer(CompressionMode compression_mode) + : mem_buf_{4_KB}, tmp_buf_(nullptr), compression_mode_(compression_mode) { } RdbSerializer::~RdbSerializer() { + VLOG(1) << "compression mode: " << uint32_t(compression_mode_); + if (compression_stats_) { + VLOG(1) << "compression not effective: " << compression_stats_->compression_no_effective; + VLOG(1) << "small string none compression applied: " << compression_stats_->small_str_count; + } } std::error_code RdbSerializer::SaveValue(const 
PrimeValue& pv) { @@ -611,6 +652,12 @@ error_code RdbSerializer::FlushToSink(io::Sink* s) { DVLOG(2) << "FlushToSink " << sz << " bytes"; + if (compression_mode_ == CompressionMode::MULTY_ENTRY) { + CompressBlob(); + // After blob was compressed membuf was overwritten with compressed data + sz = mem_buf_.InputLen(); + } + // interrupt point. RETURN_ON_ERR(s->Write(mem_buf_.InputBuffer())); mem_buf_.ConsumeInput(sz); @@ -636,7 +683,7 @@ error_code RdbSerializer::SaveString(string_view val) { /* Try LZF compression - under 20 bytes it's unable to compress even * aaaaaaaaaaaaaaaaaa so skip it */ size_t len = val.size(); - if (do_entry_level_compression_ && len > 20) { + if ((compression_mode_ == CompressionMode::SINGLE_ENTRY) && (len > 20)) { size_t comprlen, outlen = len; tmp_buf_.resize(outlen + 1); @@ -786,8 +833,9 @@ class RdbSaver::Impl { RdbSaver::Impl::Impl(bool align_writes, unsigned producers_len, CompressionMode compression_mode, io::Sink* sink) : sink_(sink), shard_snapshots_(producers_len), - meta_serializer_(compression_mode != CompressionMode::NONE), channel_{128, producers_len}, - compression_mode_(compression_mode) { + meta_serializer_(CompressionMode::NONE), // Note: I think there is no need for compression + // at all in meta serializer + channel_{128, producers_len}, compression_mode_(compression_mode) { if (align_writes) { aligned_buf_.emplace(kBufLen, sink); sink_ = &aligned_buf_.value(); @@ -1044,75 +1092,46 @@ void RdbSaver::Cancel() { impl_->Cancel(); } -class ZstdCompressSerializer::ZstdCompressImpl { - public: - ZstdCompressImpl() { - cctx_ = ZSTD_createCCtx(); - compression_level_ = absl::GetFlag(FLAGS_zstd_compression_level); +void RdbSerializer::CompressBlob() { + if (!compression_stats_) { + compression_stats_.emplace(); } - ~ZstdCompressImpl() { - ZSTD_freeCCtx(cctx_); - - VLOG(1) << "zstd compressed size: " << compressed_size_total_; - VLOG(1) << "zstd uncompressed size: " << uncompressed_size_total_; + Bytes blob_to_compress = 
mem_buf_.InputBuffer(); + size_t blob_size = blob_to_compress.size(); + if (blob_size < kMinStrSizeToCompress) { + ++compression_stats_->small_str_count; + return; } - std::string_view Compress(std::string_view str); - - private: - ZSTD_CCtx* cctx_; - int compression_level_ = 1; - base::PODArray compr_buf_; - uint32_t compressed_size_total_ = 0; - uint32_t uncompressed_size_total_ = 0; -}; - -std::string_view ZstdCompressSerializer::ZstdCompressImpl::Compress(string_view str) { - size_t buf_size = ZSTD_compressBound(str.size()); - if (compr_buf_.size() < buf_size) { - compr_buf_.reserve(buf_size); - } - size_t compressed_size = ZSTD_compressCCtx(cctx_, compr_buf_.data(), compr_buf_.capacity(), - str.data(), str.size(), compression_level_); - - compressed_size_total_ += compressed_size; - uncompressed_size_total_ += str.size(); - return string_view(reinterpret_cast(compr_buf_.data()), compressed_size); -} - -ZstdCompressSerializer::ZstdCompressSerializer() { - impl_.reset(new ZstdCompressImpl()); -} - -optional ZstdCompressSerializer::Compress(std::string_view str) { - if (str.size() < kMinStrSizeToCompress) { - ++small_str_count_; - return nullopt; + // Compress the data + if (!compressor_impl_) { + compressor_impl_.reset(new ZstdCompressImpl()); } - // Compress the string - string_view compressed_res = impl_->Compress(str); - if (compressed_res.size() > str.size() * kMinCompressionReductionPrecentage) { - ++compression_no_effective_; - return nullopt; + Bytes compressed_blob = compressor_impl_->Compress(blob_to_compress); + if (compressed_blob.length() > blob_size * kMinCompressionReductionPrecentage) { + ++compression_stats_->compression_no_effective; + return; } - string serialized_compressed_blob; + // Clear membuf and write the compressed blob to it + mem_buf_.ConsumeInput(blob_size); + mem_buf_.Reserve(compressed_blob.length() + 1 + 9); // reserve space for blob + opcode + len + // First write opcode for compressed string - 
serialized_compressed_blob.push_back(RDB_OPCODE_COMPRESSED_BLOB_START); - // Get compressed string len encoded - uint8_t buf[9]; - unsigned enclen = SerializeLen(compressed_res.size(), buf); + auto dest = mem_buf_.AppendBuffer(); + dest[0] = RDB_OPCODE_COMPRESSED_BLOB_START; + mem_buf_.CommitWrite(1); - // Write encoded compressed string len and than the compressed string - serialized_compressed_blob.append(reinterpret_cast(buf), enclen); - serialized_compressed_blob.append(compressed_res); - return serialized_compressed_blob; -} + // Write encoded compressed blob len + dest = mem_buf_.AppendBuffer(); + unsigned enclen = SerializeLen(compressed_blob.length(), dest.data()); + mem_buf_.CommitWrite(enclen); -ZstdCompressSerializer::~ZstdCompressSerializer() { - VLOG(1) << "zstd compression not effective: " << compression_no_effective_; - VLOG(1) << "small string none compression applied: " << small_str_count_; + // Write compressed blob + dest = mem_buf_.AppendBuffer(); + memcpy(dest.data(), compressed_blob.data(), compressed_blob.length()); + mem_buf_.CommitWrite(compressed_blob.length()); } } // namespace dfly diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 23090939f..7ac8e6dfe 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -112,32 +112,11 @@ class RdbSaver { CompressionMode compression_mode_; }; -class ZstdCompressSerializer { - public: - ZstdCompressSerializer(); - ZstdCompressSerializer(const ZstdCompressSerializer&) = delete; - void operator=(const ZstdCompressSerializer&) = delete; - - ~ZstdCompressSerializer(); - - // Returns string if compression was applied, null otherwise - std::optional Compress(std::string_view str); - - private: - class ZstdCompressImpl; - std::unique_ptr impl_; - static constexpr size_t kMinStrSizeToCompress = 256; - static constexpr double kMinCompressionReductionPrecentage = 0.95; - uint32_t compression_no_effective_ = 0; - uint32_t small_str_count_ = 0; -}; +class ZstdCompressImpl; class 
RdbSerializer { public: - // TODO: for aligned cased, it does not make sense that RdbSerializer buffers into unaligned - // mem_buf_ and then flush it into the next level. We should probably use AlignedBuffer - // directly. - RdbSerializer(bool do_entry_level_compression); + RdbSerializer(CompressionMode compression_mode); ~RdbSerializer(); @@ -158,7 +137,6 @@ class RdbSerializer { return SaveString(std::string_view{reinterpret_cast(buf), len}); } - // TODO(Adi) : add flag to flush compressed blob to sink, move zstd serializer under RdbSerializer std::error_code FlushToSink(io::Sink* s); std::error_code SaveLen(size_t len); @@ -184,12 +162,24 @@ class RdbSerializer { std::error_code SaveListPackAsZiplist(uint8_t* lp); std::error_code SaveStreamPEL(rax* pel, bool nacks); std::error_code SaveStreamConsumers(streamCG* cg); + // If membuf data is compressible, use compression impl to compress the data and write it to membuf + void CompressBlob(); std::unique_ptr lzf_; base::IoBuf mem_buf_; base::PODArray tmp_buf_; std::string tmp_str_; - bool do_entry_level_compression_; + CompressionMode compression_mode_; + // TODO : This compressor impl should support different compression algorithms zstd/lz4 etc. 
+ std::unique_ptr compressor_impl_; + + static constexpr size_t kMinStrSizeToCompress = 256; + static constexpr double kMinCompressionReductionPrecentage = 0.95; + struct CompressionStats { + uint32_t compression_no_effective = 0; + uint32_t small_str_count = 0; + }; + std::optional compression_stats_; }; } // namespace dfly diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 612c68527..cb0be6771 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -51,9 +51,8 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) { journal_cb_id_ = journal->RegisterOnChange(move(journal_cb)); } - bool do_compression = (compression_mode_ == CompressionMode::SINGLE_ENTRY); default_buffer_.reset(new io::StringFile); - default_serializer_.reset(new RdbSerializer(do_compression)); + default_serializer_.reset(new RdbSerializer(compression_mode_)); VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_; @@ -203,7 +202,10 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite optional tmp_serializer; RdbSerializer* serializer_ptr = default_serializer_.get(); if (db_index != current_db_) { - tmp_serializer.emplace(compression_mode_ != CompressionMode::NONE); + CompressionMode compression_mode = compression_mode_ == CompressionMode::NONE + ? 
CompressionMode::NONE + : CompressionMode::SINGLE_ENTRY; + tmp_serializer.emplace(compression_mode); serializer_ptr = &*tmp_serializer; } @@ -235,21 +237,8 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr ++type_freq_map_[*res]; } -void SliceSnapshot::PushFileToChannel(DbIndex db_index, bool should_compress, - io::StringFile* sfile) { - string payload = std::move(sfile->val); - - if (should_compress) { - if (!zstd_serializer_) { - zstd_serializer_.reset(new ZstdCompressSerializer()); - } - - if (auto comp = zstd_serializer_->Compress(payload); comp) { - payload = std::move(*comp); - } - } - - dest_->Push(GetDbRecord(db_index, std::move(payload))); +void SliceSnapshot::PushFileToChannel(DbIndex db_index, io::StringFile* sfile) { + dest_->Push(GetDbRecord(db_index, std::move(sfile->val))); } bool SliceSnapshot::FlushDefaultBuffer(bool force) { @@ -263,8 +252,7 @@ bool SliceSnapshot::FlushDefaultBuffer(bool force) { VLOG(2) << "FlushDefaultBuffer " << default_buffer_->val.size() << " bytes"; - bool multi_entries_compression = (compression_mode_ == CompressionMode::MULTY_ENTRY); - PushFileToChannel(current_db_, multi_entries_compression, default_buffer_.get()); + PushFileToChannel(current_db_, default_buffer_.get()); return true; } @@ -290,7 +278,10 @@ void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) { optional tmp_serializer; RdbSerializer* serializer_ptr = default_serializer_.get(); if (entry.db_ind != current_db_) { - tmp_serializer.emplace(compression_mode_ != CompressionMode::NONE); + CompressionMode compression_mode = compression_mode_ == CompressionMode::NONE + ? 
CompressionMode::NONE + : CompressionMode::SINGLE_ENTRY; + tmp_serializer.emplace(compression_mode); serializer_ptr = &*tmp_serializer; } @@ -327,6 +318,6 @@ void SliceSnapshot::FlushTmpSerializer(DbIndex db_index, RdbSerializer* serializ io::StringFile sfile{}; error_code ec = serializer->FlushToSink(&sfile); CHECK(!ec && !sfile.val.empty()); - PushFileToChannel(db_index, false, &sfile); + PushFileToChannel(db_index, &sfile); } } // namespace dfly diff --git a/src/server/snapshot.h b/src/server/snapshot.h index c06ed021b..1f2a45e66 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -21,7 +21,6 @@ struct Entry; } // namespace journal class RdbSerializer; -class ZstdCompressSerializer; //┌────────────────┐ ┌─────────────┐ //│IterateBucketsFb│ │ OnDbChange │ @@ -97,7 +96,7 @@ class SliceSnapshot { std::optional expire, RdbSerializer* serializer); // Push StringFile buffer to channel. - void PushFileToChannel(DbIndex db_index, bool should_compress, io::StringFile* sfile); + void PushFileToChannel(DbIndex db_index, io::StringFile* sfile); // DbChange listener void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req); @@ -148,7 +147,6 @@ class SliceSnapshot { // TODO : drop default_buffer from this class, we dont realy need it. std::unique_ptr default_buffer_; // filled by default_serializer_ std::unique_ptr default_serializer_; - std::unique_ptr zstd_serializer_; ::boost::fibers::mutex mu_; ::boost::fibers::fiber snapshot_fb_; // IterateEntriesFb