From ae9e3c45d951ccb6ae5671ba2d22aacad4e6e163 Mon Sep 17 00:00:00 2001 From: adiholden Date: Thu, 8 Dec 2022 09:41:55 +0200 Subject: [PATCH] Support lz4 compression (#545) Signed-off-by: adi_holden --- src/server/rdb_load.cc | 130 ++++++++++++++++++++++++++++++++++++----- src/server/rdb_load.h | 7 ++- src/server/rdb_save.cc | 73 +++++++++++------------ src/server/rdb_save.h | 2 + src/server/rdb_test.cc | 3 +- 5 files changed, 156 insertions(+), 59 deletions(-) diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 8dd18042f..ded7846b5 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -18,6 +18,7 @@ extern "C" { } #include #include +#include #include #include "base/endian.h" @@ -205,12 +206,24 @@ bool resizeStringSet(robj* set, size_t size, bool use_set2) { } // namespace -class ZstdDecompressImpl { +class DecompressImpl { public: - ZstdDecompressImpl() : uncompressed_mem_buf_{16_KB} { + DecompressImpl() : uncompressed_mem_buf_{16_KB} { + } + ~DecompressImpl() { + } + virtual io::Result Decompress(std::string_view str) = 0; + + protected: + base::IoBuf uncompressed_mem_buf_; +}; + +class ZstdDecompress : public DecompressImpl { + public: + ZstdDecompress() { dctx_ = ZSTD_createDCtx(); } - ~ZstdDecompressImpl() { + ~ZstdDecompress() { ZSTD_freeDCtx(dctx_); } @@ -218,10 +231,9 @@ class ZstdDecompressImpl { private: ZSTD_DCtx* dctx_; - base::IoBuf uncompressed_mem_buf_; }; -io::Result ZstdDecompressImpl::Decompress(std::string_view str) { +io::Result ZstdDecompress::Decompress(std::string_view str) { // Prepare membuf memory to uncompressed string. auto uncomp_size = ZSTD_getFrameContentSize(str.data(), str.size()); if (uncomp_size == ZSTD_CONTENTSIZE_UNKNOWN) { @@ -259,6 +271,87 @@ io::Result ZstdDecompressImpl::Decompress(std::string_view str) { return &uncompressed_mem_buf_; } +class Lz4Decompress : public DecompressImpl { + public: + Lz4Decompress() { + auto result = LZ4F_createDecompressionContext(&dctx_, LZ4F_VERSION); + CHECK(!LZ4F_isError(result)); + } + ~Lz4Decompress() { + auto result = LZ4F_freeDecompressionContext(dctx_); + CHECK(!LZ4F_isError(result)); + } + + io::Result Decompress(std::string_view str); + + private: + LZ4F_dctx* dctx_; +}; + +io::Result Lz4Decompress::Decompress(std::string_view data) { + LZ4F_frameInfo_t frame_info; + size_t frame_size = data.size(); + + // Get content size from frame data + size_t consumed = frame_size; // The nb of bytes consumed from data will be written into consumed + size_t res = LZ4F_getFrameInfo(dctx_, &frame_info, data.data(), &consumed); + if (LZ4F_isError(res)) { + return make_unexpected(error_code{int(res), generic_category()}); + } + if (frame_info.contentSize == 0) { + LOG(ERROR) << "Missing frame content size"; + return Unexpected(errc::rdb_file_corrupted); + } + + // reserve place for uncompressed data and end opcode + size_t reserve = frame_info.contentSize + 1; + uncompressed_mem_buf_.Reserve(reserve); + IoBuf::Bytes dest = uncompressed_mem_buf_.AppendBuffer(); + if (dest.size() < reserve) { + return Unexpected(errc::out_of_memory); + } + + // Uncompress data to membuf + string_view src = data.substr(consumed); + size_t src_size = src.size(); + + size_t ret = 1; + while (ret != 0) { + IoBuf::Bytes dest = uncompressed_mem_buf_.AppendBuffer(); + size_t dest_capacity = dest.size(); + + // It will read up to src_size bytes from src, + // and decompress data into dest, of capacity dest_capacity + // The nb of bytes consumed from src will be written into src_size + // The nb of bytes decompressed into dest will be written into dest_capacity + ret = LZ4F_decompress(dctx_, dest.data(), &dest_capacity, src.data(), &src_size, nullptr); + if (LZ4F_isError(ret)) { + return make_unexpected(error_code{int(ret), generic_category()}); + } + consumed += src_size; + + uncompressed_mem_buf_.CommitWrite(dest_capacity); + src = src.substr(src_size); + src_size = src.size(); + } + if (consumed != frame_size) { + return Unexpected(errc::rdb_file_corrupted); + } + if (uncompressed_mem_buf_.InputLen() != frame_info.contentSize) { + return Unexpected(errc::rdb_file_corrupted); + } + + // Add opcode of compressed blob end to membuf. + dest = uncompressed_mem_buf_.AppendBuffer(); + if (dest.size() < 1) { + return Unexpected(errc::out_of_memory); + } + dest[0] = RDB_OPCODE_COMPRESSED_BLOB_END; + uncompressed_mem_buf_.CommitWrite(1); + + return &uncompressed_mem_buf_; +} + class RdbLoaderBase::OpaqueObjLoader { public: OpaqueObjLoader(int rdb_type, PrimeValue* pv) : rdb_type_(rdb_type), pv_(pv) { @@ -1666,14 +1759,11 @@ error_code RdbLoader::Load(io::Source* src) { return RdbError(errc::feature_not_supported); } - if (type == RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START) { - RETURN_ON_ERR(HandleCompressedBlob()); + if (type == RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START || + type == RDB_OPCODE_COMPRESSED_LZ4_BLOB_START) { + RETURN_ON_ERR(HandleCompressedBlob(type)); continue; } - if (type == RDB_OPCODE_COMPRESSED_LZ4_BLOB_START) { - LOG(ERROR) << "LZ4 not supported yet"; - return RdbError(errc::feature_not_supported); - } if (type == RDB_OPCODE_COMPRESSED_BLOB_END) { RETURN_ON_ERR(HandleCompressedBlobFinish()); @@ -1794,11 +1884,21 @@ auto RdbLoaderBase::LoadLen(bool* is_encoded) -> io::Result { return res; } -error_code RdbLoaderBase::HandleCompressedBlob() { - if (!zstd_decompress_) { - zstd_decompress_.reset(new ZstdDecompressImpl()); +void RdbLoaderBase::AlocateDecompressOnce(int op_type) { + if (decompress_impl_) { + return; } + if (op_type == RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START) { + decompress_impl_.reset(new ZstdDecompress()); + } else if (op_type == RDB_OPCODE_COMPRESSED_LZ4_BLOB_START) { + decompress_impl_.reset(new Lz4Decompress()); + } else { + CHECK(false) << "Decompressor allocation should not be done"; + } +} +error_code RdbLoaderBase::HandleCompressedBlob(int op_type) { + AlocateDecompressOnce(op_type); // Fetch uncompress blob string res; SET_OR_RETURN(FetchGenericString(), res); @@ -1807,7 +1907,7 @@ error_code RdbLoaderBase::HandleCompressedBlob() { // Last type in the compressed blob is RDB_OPCODE_COMPRESSED_BLOB_END // in which we will switch back to the origin membuf (HandleCompressedBlobFinish) string_view uncompressed_blob; - SET_OR_RETURN(zstd_decompress_->Decompress(res), mem_buf_); + SET_OR_RETURN(decompress_impl_->Decompress(res), mem_buf_); return kOk; } diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index b181ade4b..2acf46ea0 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -21,7 +21,7 @@ class EngineShardSet; class ScriptMgr; class CompactObj; -class ZstdDecompressImpl; +class DecompressImpl; class RdbLoaderBase { protected: @@ -127,8 +127,9 @@ class RdbLoaderBase { ::io::Result ReadZSetZL(); ::io::Result ReadListQuicklist(int rdbtype); ::io::Result ReadStreams(); - std::error_code HandleCompressedBlob(); + std::error_code HandleCompressedBlob(int op_type); std::error_code HandleCompressedBlobFinish(); + void AlocateDecompressOnce(int op_type); static size_t StrLen(const RdbVariant& tset); @@ -144,7 +145,7 @@ class RdbLoaderBase { size_t bytes_read_ = 0; size_t source_limit_ = SIZE_MAX; base::PODArray compr_buf_; - std::unique_ptr zstd_decompress_; + std::unique_ptr decompress_impl_; }; class RdbLoader : protected RdbLoaderBase { diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 2eca192f2..dc09f3e04 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -31,7 +31,7 @@ extern "C" { #include "server/snapshot.h" #include "util/fibers/simple_channel.h" -ABSL_FLAG(int, compression_mode, 2, +ABSL_FLAG(int, compression_mode, 3, "set 0 for no compression," "set 1 for single entry lzf compression," "set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot," @@ -190,7 +190,6 @@ class ZstdCompressor : public CompressorImpl { public: ZstdCompressor() { cctx_ = ZSTD_createCCtx(); - compression_level_ = absl::GetFlag(FLAGS_compression_level); } ~ZstdCompressor() { ZSTD_freeCCtx(cctx_); @@ -200,9 +199,6 @@ class ZstdCompressor : public CompressorImpl { private: ZSTD_CCtx* cctx_; - int compression_level_ = 1; - size_t compressed_size_total_ = 0; - size_t uncompressed_size_total_ = 0; base::PODArray compr_buf_; }; @@ -224,54 +220,35 @@ io::Result ZstdCompressor::Compress(io::Bytes data) { class Lz4Compressor : public CompressorImpl { public: - // create a compression context Lz4Compressor() { - LZ4F_errorCode_t result = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION); - CHECK(!LZ4F_isError(result)); lz4_pref_.compressionLevel = compression_level_; } - // destroy the compression context ~Lz4Compressor() { - LZ4F_freeCompressionContext(ctx_); } // compress a string of data io::Result Compress(io::Bytes data); private: - LZ4F_cctx* ctx_; - LZ4F_preferences_t lz4_pref_; + LZ4F_preferences_t lz4_pref_ = LZ4F_INIT_PREFERENCES; }; io::Result Lz4Compressor::Compress(io::Bytes data) { - size_t buf_size = LZ4F_compressFrameBound(data.size(), NULL); + lz4_pref_.frameInfo.contentSize = data.size(); + size_t buf_size = LZ4F_compressFrameBound(data.size(), &lz4_pref_); if (compr_buf_.capacity() < buf_size) { compr_buf_.reserve(buf_size); } - // initialize the compression context - size_t compressed_size = - LZ4F_compressBegin(ctx_, compr_buf_.data(), compr_buf_.capacity(), &lz4_pref_); - if (LZ4F_isError(compressed_size)) { - return make_unexpected(error_code{int(compressed_size), generic_category()}); - } - // compress the data - compressed_size = LZ4F_compressUpdate(ctx_, compr_buf_.data(), compr_buf_.capacity(), data.data(), - data.size(), NULL); - if (LZ4F_isError(compressed_size)) { - return make_unexpected(error_code{int(compressed_size), generic_category()}); + size_t frame_size = LZ4F_compressFrame(compr_buf_.data(), compr_buf_.capacity(), data.data(), + data.size(), &lz4_pref_); + if (LZ4F_isError(frame_size)) { + return make_unexpected(error_code{int(frame_size), generic_category()}); } - - // finish the compression process - compressed_size = LZ4F_compressEnd(ctx_, compr_buf_.data(), compr_buf_.capacity(), NULL); - if (LZ4F_isError(compressed_size)) { - return make_unexpected(error_code{int(compressed_size), generic_category()}); - } - - compressed_size_total_ += compressed_size; + compressed_size_total_ += frame_size; uncompressed_size_total_ += data.size(); - return io::Bytes(compr_buf_.data(), compressed_size); + return io::Bytes(compr_buf_.data(), frame_size); } RdbSerializer::RdbSerializer(CompressionMode compression_mode) @@ -283,6 +260,8 @@ RdbSerializer::~RdbSerializer() { if (compression_stats_) { VLOG(1) << "compression not effective: " << compression_stats_->compression_no_effective; VLOG(1) << "small string none compression applied: " << compression_stats_->small_str_count; + VLOG(1) << "compression failed: " << compression_stats_->compression_failed; + VLOG(1) << "compressed blobs:" << compression_stats_->compressed_blobs; } } @@ -724,7 +703,8 @@ error_code RdbSerializer::FlushToSink(io::Sink* s) { DVLOG(2) << "FlushToSink " << sz << " bytes"; - if (compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD) { + if (compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD || + compression_mode_ == CompressionMode::MULTY_ENTRY_LZ4) { CompressBlob(); // After blob was compressed membuf was overwirten with compressed data sz = mem_buf_.InputLen(); @@ -1040,7 +1020,6 @@ RdbSaver::RdbSaver(::io::Sink* sink, SaveMode save_mode, bool align_writes) { producer_count = 1; if (compression_mode == 3) { compression_mode_ = CompressionMode::MULTY_ENTRY_LZ4; - LOG(ERROR) << "Lz4 compression not supported yet"; } else if (compression_mode == 2) { compression_mode_ = CompressionMode::MULTY_ENTRY_ZSTD; } else if (compression_mode == 1) { @@ -1167,6 +1146,19 @@ void RdbSaver::Cancel() { impl_->Cancel(); } +void RdbSerializer::AllocateCompressorOnce() { + if (compressor_impl_) { + return; + } + if (compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD) { + compressor_impl_.reset(new ZstdCompressor()); + } else if (compression_mode_ == CompressionMode::MULTY_ENTRY_LZ4) { + compressor_impl_.reset(new Lz4Compressor()); + } else { + CHECK(false) << "Compressor allocation should not be done"; + } +} + void RdbSerializer::CompressBlob() { if (!compression_stats_) { compression_stats_.emplace(); @@ -1178,11 +1170,8 @@ void RdbSerializer::CompressBlob() { return; } + AllocateCompressorOnce(); // Compress the data - if (!compressor_impl_) { - compressor_impl_.reset(new ZstdCompressor()); // TODO(Adi): add support for lz4 - } - auto ec = compressor_impl_->Compress(blob_to_compress); if (!ec) { ++compression_stats_->compression_failed; @@ -1200,7 +1189,10 @@ void RdbSerializer::CompressBlob() { // First write opcode for compressed string auto dest = mem_buf_.AppendBuffer(); - dest[0] = RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START; // TODO(Adi): add support for lz4 + uint8_t opcode = compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD + ? RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START + : RDB_OPCODE_COMPRESSED_LZ4_BLOB_START; + dest[0] = opcode; mem_buf_.CommitWrite(1); // Write encoded compressed blob len @@ -1212,6 +1204,7 @@ void RdbSerializer::CompressBlob() { dest = mem_buf_.AppendBuffer(); memcpy(dest.data(), compressed_blob.data(), compressed_blob.length()); mem_buf_.CommitWrite(compressed_blob.length()); + ++compression_stats_->compressed_blobs; } } // namespace dfly diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 8b696d298..55a775dd4 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -160,6 +160,7 @@ class RdbSerializer { std::error_code SaveStreamConsumers(streamCG* cg); // If membuf data is compressable use compression impl to compress the data and write it to membuf void CompressBlob(); + void AllocateCompressorOnce(); std::unique_ptr lzf_; base::IoBuf mem_buf_; @@ -175,6 +176,7 @@ class RdbSerializer { uint32_t compression_no_effective = 0; uint32_t small_str_count = 0; uint32_t compression_failed = 0; + uint32_t compressed_blobs = 0; }; std::optional compression_stats_; }; diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index fafbc5e1e..cc82dec95 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -148,7 +148,7 @@ TEST_F(RdbTest, Stream) { TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) { Run({"debug", "populate", "500000"}); - for (int i = 0; i <= 2; ++i) { + for (int i = 0; i <= 3; ++i) { SetFlag(&FLAGS_compression_mode, i); RespExpr resp = Run({"save", "df"}); ASSERT_EQ(resp, "OK"); @@ -156,6 +156,7 @@ TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) { auto save_info = service_->server_family().GetLastSaveInfo(); resp = Run({"debug", "load", save_info->file_name}); ASSERT_EQ(resp, "OK"); + ASSERT_EQ(500000, CheckedInt({"dbsize"})); } }