diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index b837379bb..e51f7033d 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -50,6 +50,7 @@ add_library(dragonfly_lib bloom_family.cc list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc protocol_client.cc snapshot.cc script_mgr.cc server_family.cc + detail/compressor.cc detail/decompress.cc detail/save_stages_controller.cc detail/snapshot_storage.cc diff --git a/src/server/detail/compressor.cc b/src/server/detail/compressor.cc new file mode 100644 index 000000000..331eb6349 --- /dev/null +++ b/src/server/detail/compressor.cc @@ -0,0 +1,103 @@ +// Copyright 2024, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/detail/compressor.h" + +#include +#include +#include + +#include "base/logging.h" + +ABSL_FLAG(int, compression_level, 2, "The compression level to use on zstd/lz4 compression"); + +namespace dfly::detail { + +using namespace std; + +class ZstdCompressor : public CompressorImpl { + public: + ZstdCompressor() { + cctx_ = ZSTD_createCCtx(); + } + ~ZstdCompressor() { + ZSTD_freeCCtx(cctx_); + } + + io::Result Compress(io::Bytes data); + + private: + ZSTD_CCtx* cctx_; + base::PODArray compr_buf_; +}; + +io::Result ZstdCompressor::Compress(io::Bytes data) { + size_t buf_size = ZSTD_compressBound(data.size()); + if (compr_buf_.capacity() < buf_size) { + compr_buf_.reserve(buf_size); + } + size_t compressed_size = ZSTD_compressCCtx(cctx_, compr_buf_.data(), compr_buf_.capacity(), + data.data(), data.size(), compression_level_); + + if (ZSTD_isError(compressed_size)) { + LOG(ERROR) << "ZSTD_compressCCtx failed with error " << ZSTD_getErrorName(compressed_size); + return nonstd::make_unexpected(make_error_code(errc::operation_not_supported)); + } + compressed_size_total_ += compressed_size; + uncompressed_size_total_ += data.size(); + return io::Bytes(compr_buf_.data(), compressed_size); +} + +class Lz4Compressor : public CompressorImpl { + public: + Lz4Compressor() { + lz4_pref_.compressionLevel = compression_level_; + } + + ~Lz4Compressor() { + } + + // compress a string of data + io::Result Compress(io::Bytes data); + + private: + LZ4F_preferences_t lz4_pref_ = LZ4F_INIT_PREFERENCES; +}; + +io::Result Lz4Compressor::Compress(io::Bytes data) { + lz4_pref_.frameInfo.contentSize = data.size(); + size_t buf_size = LZ4F_compressFrameBound(data.size(), &lz4_pref_); + if (compr_buf_.capacity() < buf_size) { + compr_buf_.reserve(buf_size); + } + + size_t frame_size = LZ4F_compressFrame(compr_buf_.data(), compr_buf_.capacity(), data.data(), + data.size(), &lz4_pref_); + if (LZ4F_isError(frame_size)) { + LOG(ERROR) << "LZ4F_compressFrame failed with error " << LZ4F_getErrorName(frame_size); + return nonstd::make_unexpected(make_error_code(errc::operation_not_supported)); + } + compressed_size_total_ += frame_size; + uncompressed_size_total_ += data.size(); + return io::Bytes(compr_buf_.data(), frame_size); +} + +CompressorImpl::CompressorImpl() { + compression_level_ = absl::GetFlag(FLAGS_compression_level); +} + +CompressorImpl::~CompressorImpl() { + VLOG(1) << "compressed size: " << compressed_size_total_; + VLOG(1) << "uncompressed size: " << uncompressed_size_total_; +} + +unique_ptr CompressorImpl::CreateZstd() { + return make_unique(); +} + +unique_ptr CompressorImpl::CreateLZ4() { + return make_unique(); +} + +} // namespace dfly::detail diff --git a/src/server/detail/compressor.h b/src/server/detail/compressor.h new file mode 100644 index 000000000..38fb8cca0 --- /dev/null +++ b/src/server/detail/compressor.h @@ -0,0 +1,30 @@ +// Copyright 2024, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include + +#include "base/pod_array.h" +#include "io/io.h" + +namespace dfly::detail { + +class CompressorImpl { + public: + static std::unique_ptr CreateZstd(); + static std::unique_ptr CreateLZ4(); + + CompressorImpl(); + virtual ~CompressorImpl(); + virtual io::Result Compress(io::Bytes data) = 0; + + protected: + int compression_level_ = 1; + size_t compressed_size_total_ = 0; + size_t uncompressed_size_total_ = 0; + base::PODArray compr_buf_; +}; + +} // namespace dfly::detail diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 72f977c36..516b5d67f 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -7,8 +7,6 @@ #include #include #include -#include -#include #include @@ -50,7 +48,6 @@ ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_ "set 1 for single entry lzf compression," "set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot," "set 3 for multi entry lz4 compression on df snapshot and single entry on rdb snapshot"); -ABSL_FLAG(int, compression_level, 2, "The compression level to use on zstd/lz4 compression"); // TODO: to retire both flags in v1.27 (Jan 2025) ABSL_FLAG(bool, list_rdb_encode_v2, true, @@ -228,89 +225,6 @@ uint8_t RdbObjectType(const PrimeValue& pv) { return 0; /* avoid warning */ } -class CompressorImpl { - public: - CompressorImpl() { - compression_level_ = absl::GetFlag(FLAGS_compression_level); - } - virtual ~CompressorImpl() { - VLOG(1) << "compressed size: " << compressed_size_total_; - VLOG(1) << "uncompressed size: " << uncompressed_size_total_; - } - virtual io::Result Compress(io::Bytes data) = 0; - - protected: - int compression_level_ = 1; - size_t compressed_size_total_ = 0; - size_t uncompressed_size_total_ = 0; - base::PODArray compr_buf_; -}; - -class ZstdCompressor : public CompressorImpl { - public: - ZstdCompressor() { - cctx_ = ZSTD_createCCtx(); - } - ~ZstdCompressor() { - ZSTD_freeCCtx(cctx_); - } - - io::Result Compress(io::Bytes data); - - private: - ZSTD_CCtx* cctx_; - base::PODArray compr_buf_; -}; - -io::Result ZstdCompressor::Compress(io::Bytes data) { - size_t buf_size = ZSTD_compressBound(data.size()); - if (compr_buf_.capacity() < buf_size) { - compr_buf_.reserve(buf_size); - } - size_t compressed_size = ZSTD_compressCCtx(cctx_, compr_buf_.data(), compr_buf_.capacity(), - data.data(), data.size(), compression_level_); - - if (ZSTD_isError(compressed_size)) { - return make_unexpected(error_code{int(compressed_size), generic_category()}); - } - compressed_size_total_ += compressed_size; - uncompressed_size_total_ += data.size(); - return io::Bytes(compr_buf_.data(), compressed_size); -} - -class Lz4Compressor : public CompressorImpl { - public: - Lz4Compressor() { - lz4_pref_.compressionLevel = compression_level_; - } - - ~Lz4Compressor() { - } - - // compress a string of data - io::Result Compress(io::Bytes data); - - private: - LZ4F_preferences_t lz4_pref_ = LZ4F_INIT_PREFERENCES; -}; - -io::Result Lz4Compressor::Compress(io::Bytes data) { - lz4_pref_.frameInfo.contentSize = data.size(); - size_t buf_size = LZ4F_compressFrameBound(data.size(), &lz4_pref_); - if (compr_buf_.capacity() < buf_size) { - compr_buf_.reserve(buf_size); - } - - size_t frame_size = LZ4F_compressFrame(compr_buf_.data(), compr_buf_.capacity(), data.data(), - data.size(), &lz4_pref_); - if (LZ4F_isError(frame_size)) { - return make_unexpected(error_code{int(frame_size), generic_category()}); - } - compressed_size_total_ += frame_size; - uncompressed_size_total_ += data.size(); - return io::Bytes(compr_buf_.data(), frame_size); -} - SerializerBase::SerializerBase(CompressionMode compression_mode) : compression_mode_(compression_mode), mem_buf_{4_KB}, tmp_buf_(nullptr) { } @@ -1668,11 +1582,11 @@ void SerializerBase::AllocateCompressorOnce() { return; } if (compression_mode_ == CompressionMode::MULTI_ENTRY_ZSTD) { - compressor_impl_.reset(new ZstdCompressor()); + compressor_impl_ = detail::CompressorImpl::CreateZstd(); } else if (compression_mode_ == CompressionMode::MULTI_ENTRY_LZ4) { - compressor_impl_.reset(new Lz4Compressor()); + compressor_impl_ = detail::CompressorImpl::CreateLZ4(); } else { - CHECK(false) << "Compressor allocation should not be done"; + LOG(FATAL) << "Invalid compression mode " << unsigned(compression_mode_); } } diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index bed9ebfdb..532557070 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -16,6 +16,7 @@ extern "C" { #include "io/io.h" #include "io/io_buf.h" #include "server/common.h" +#include "server/detail/compressor.h" #include "server/journal/serializer.h" #include "server/journal/types.h" #include "server/table.h" @@ -67,7 +68,8 @@ enum class SaveMode { RDB, // Save .rdb file. Expected to read all shards. }; -enum class CompressionMode { NONE, SINGLE_ENTRY, MULTI_ENTRY_ZSTD, MULTI_ENTRY_LZ4 }; +enum class CompressionMode : uint8_t { NONE, SINGLE_ENTRY, MULTI_ENTRY_ZSTD, MULTI_ENTRY_LZ4 }; + CompressionMode GetDefaultCompressionMode(); class RdbSaver { @@ -147,8 +149,6 @@ class RdbSaver { CompressionMode compression_mode_; }; -class CompressorImpl; - class SerializerBase { public: enum class FlushState { kFlushMidEntry, kFlushEndEntry }; @@ -196,7 +196,7 @@ class SerializerBase { CompressionMode compression_mode_; io::IoBuf mem_buf_; - std::unique_ptr compressor_impl_; + std::unique_ptr compressor_impl_; static constexpr size_t kMinStrSizeToCompress = 256; static constexpr double kMinCompressionReductionPrecentage = 0.95;