chore: factor out CompressorImpl into separate files (#4319)

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-12-16 14:11:44 +02:00 committed by GitHub
parent 8237d8fa81
commit 03516c2752
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 141 additions and 93 deletions

View file

@ -50,6 +50,7 @@ add_library(dragonfly_lib bloom_family.cc
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
protocol_client.cc
snapshot.cc script_mgr.cc server_family.cc
detail/compressor.cc
detail/decompress.cc
detail/save_stages_controller.cc
detail/snapshot_storage.cc

View file

@ -0,0 +1,103 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/detail/compressor.h"
#include <absl/flags/flag.h>
#include <lz4frame.h>
#include <zstd.h>
#include "base/logging.h"
ABSL_FLAG(int, compression_level, 2, "The compression level to use on zstd/lz4 compression");
namespace dfly::detail {
using namespace std;
class ZstdCompressor : public CompressorImpl {
public:
ZstdCompressor() {
cctx_ = ZSTD_createCCtx();
}
~ZstdCompressor() {
ZSTD_freeCCtx(cctx_);
}
io::Result<io::Bytes> Compress(io::Bytes data);
private:
ZSTD_CCtx* cctx_;
base::PODArray<uint8_t> compr_buf_;
};
// Compresses `data` with the zstd context held by this object.
// On success returns a view of the compressed bytes stored in compr_buf_ and
// adds the input/output sizes to the running totals. On failure logs the zstd
// error name and returns errc::operation_not_supported.
io::Result<io::Bytes> ZstdCompressor::Compress(io::Bytes data) {
  // Make sure the output buffer can hold the worst-case compressed size.
  const size_t max_out = ZSTD_compressBound(data.size());
  if (max_out > compr_buf_.capacity()) {
    compr_buf_.reserve(max_out);
  }

  const size_t res = ZSTD_compressCCtx(cctx_, compr_buf_.data(), compr_buf_.capacity(),
                                       data.data(), data.size(), compression_level_);
  if (ZSTD_isError(res)) {
    LOG(ERROR) << "ZSTD_compressCCtx failed with error " << ZSTD_getErrorName(res);
    return nonstd::make_unexpected(make_error_code(errc::operation_not_supported));
  }

  uncompressed_size_total_ += data.size();
  compressed_size_total_ += res;

  return io::Bytes(compr_buf_.data(), res);
}
// CompressorImpl implementation that emits LZ4 frames (lz4frame API).
class Lz4Compressor : public CompressorImpl {
 public:
  Lz4Compressor() {
    // The CompressorImpl constructor has already run at this point, so
    // compression_level_ holds the value it assigned.
    lz4_pref_.compressionLevel = compression_level_;
  }

  // Compresses `data` into a single LZ4 frame.
  io::Result<io::Bytes> Compress(io::Bytes data) override;

 private:
  // Frame preferences reused across calls; contentSize is updated per call.
  LZ4F_preferences_t lz4_pref_ = LZ4F_INIT_PREFERENCES;
};
// Compresses `data` into one LZ4 frame whose header records the content size.
// On success returns a view of the frame stored in compr_buf_ and adds the
// input/output sizes to the running totals. On failure logs the LZ4F error name
// and returns errc::operation_not_supported.
io::Result<io::Bytes> Lz4Compressor::Compress(io::Bytes data) {
  lz4_pref_.frameInfo.contentSize = data.size();

  // Make sure the output buffer can hold the worst-case frame size.
  const size_t max_out = LZ4F_compressFrameBound(data.size(), &lz4_pref_);
  if (max_out > compr_buf_.capacity()) {
    compr_buf_.reserve(max_out);
  }

  const size_t res = LZ4F_compressFrame(compr_buf_.data(), compr_buf_.capacity(), data.data(),
                                        data.size(), &lz4_pref_);
  if (LZ4F_isError(res)) {
    LOG(ERROR) << "LZ4F_compressFrame failed with error " << LZ4F_getErrorName(res);
    return nonstd::make_unexpected(make_error_code(errc::operation_not_supported));
  }

  uncompressed_size_total_ += data.size();
  compressed_size_total_ += res;

  return io::Bytes(compr_buf_.data(), res);
}
// Reads the compression_level flag once, when the compressor is constructed.
CompressorImpl::CompressorImpl() : compression_level_(absl::GetFlag(FLAGS_compression_level)) {
}
// Logs, at verbosity level 1, the compressed and uncompressed byte totals
// accumulated by this compressor.
CompressorImpl::~CompressorImpl() {
VLOG(1) << "compressed size: " << compressed_size_total_;
VLOG(1) << "uncompressed size: " << uncompressed_size_total_;
}
// Factory: returns a newly allocated ZstdCompressor behind the CompressorImpl
// interface; the concrete type stays private to this translation unit.
unique_ptr<CompressorImpl> CompressorImpl::CreateZstd() {
return make_unique<ZstdCompressor>();
}
// Factory: returns a newly allocated Lz4Compressor behind the CompressorImpl
// interface; the concrete type stays private to this translation unit.
unique_ptr<CompressorImpl> CompressorImpl::CreateLZ4() {
return make_unique<Lz4Compressor>();
}
} // namespace dfly::detail

View file

@ -0,0 +1,30 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <memory>
#include "base/pod_array.h"
#include "io/io.h"
namespace dfly::detail {
// Abstract interface for the buffer compressors. Concrete implementations are
// obtained through the Create*() factories below.
class CompressorImpl {
 public:
  // Returns a compressor that uses zstd.
  static std::unique_ptr<CompressorImpl> CreateZstd();

  // Returns a compressor that uses LZ4.
  static std::unique_ptr<CompressorImpl> CreateLZ4();

  CompressorImpl();
  virtual ~CompressorImpl();

  // Instances are handled through unique_ptr and implementations may own
  // non-copyable resources, so copying is disabled.
  CompressorImpl(const CompressorImpl&) = delete;
  CompressorImpl& operator=(const CompressorImpl&) = delete;

  // Compresses `data` and returns the compressed bytes, or an error code on failure.
  // NOTE(review): the implementations in compressor.cc return a view into
  // compr_buf_, so the result is presumably only valid until the next Compress()
  // call on the same object - confirm before holding on to it.
  virtual io::Result<io::Bytes> Compress(io::Bytes data) = 0;

 protected:
  // Compression level handed to the underlying library.
  int compression_level_ = 1;

  // Running totals of bytes produced / consumed by successful Compress() calls.
  size_t compressed_size_total_ = 0;
  size_t uncompressed_size_total_ = 0;

  // Reusable output buffer for implementations.
  base::PODArray<uint8_t> compr_buf_;
};
} // namespace dfly::detail

View file

@ -7,8 +7,6 @@
#include <absl/cleanup/cleanup.h>
#include <absl/strings/str_cat.h>
#include <absl/strings/str_format.h>
#include <lz4frame.h>
#include <zstd.h>
#include <queue>
@ -50,7 +48,6 @@ ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_
"set 1 for single entry lzf compression,"
"set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot,"
"set 3 for multi entry lz4 compression on df snapshot and single entry on rdb snapshot");
ABSL_FLAG(int, compression_level, 2, "The compression level to use on zstd/lz4 compression");
// TODO: to retire both flags in v1.27 (Jan 2025)
ABSL_FLAG(bool, list_rdb_encode_v2, true,
@ -228,89 +225,6 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
return 0; /* avoid warning */
}
// Base class for the multi-entry compressors: holds the compression level taken
// from the compression_level flag, the running size totals and the output buffer.
class CompressorImpl {
 public:
  CompressorImpl() : compression_level_(absl::GetFlag(FLAGS_compression_level)) {
  }

  // Logs the accumulated totals at verbosity level 1.
  virtual ~CompressorImpl() {
    VLOG(1) << "compressed size: " << compressed_size_total_;
    VLOG(1) << "uncompressed size: " << uncompressed_size_total_;
  }

  // Compresses `data` and returns the compressed bytes, or an error code.
  virtual io::Result<io::Bytes> Compress(io::Bytes data) = 0;

 protected:
  int compression_level_ = 1;
  size_t compressed_size_total_ = 0;
  size_t uncompressed_size_total_ = 0;
  base::PODArray<uint8_t> compr_buf_;
};
class ZstdCompressor : public CompressorImpl {
public:
ZstdCompressor() {
cctx_ = ZSTD_createCCtx();
}
~ZstdCompressor() {
ZSTD_freeCCtx(cctx_);
}
io::Result<io::Bytes> Compress(io::Bytes data);
private:
ZSTD_CCtx* cctx_;
base::PODArray<uint8_t> compr_buf_;
};
// Compresses `data` with the zstd context held by this object.
// On success returns a view of the compressed bytes stored in compr_buf_ and
// adds the input/output sizes to the running totals. On failure logs the zstd
// error name and returns errc::operation_not_supported.
io::Result<io::Bytes> ZstdCompressor::Compress(io::Bytes data) {
  // Make sure the output buffer can hold the worst-case compressed size.
  size_t buf_size = ZSTD_compressBound(data.size());
  if (compr_buf_.capacity() < buf_size) {
    compr_buf_.reserve(buf_size);
  }

  size_t compressed_size = ZSTD_compressCCtx(cctx_, compr_buf_.data(), compr_buf_.capacity(),
                                             data.data(), data.size(), compression_level_);
  if (ZSTD_isError(compressed_size)) {
    // A zstd failure is a library-specific code carried in a size_t, not an errno
    // value, so it must not be reinterpreted through generic_category(). Report
    // the readable name in the log and hand callers a well-defined error code.
    LOG(ERROR) << "ZSTD_compressCCtx failed with error " << ZSTD_getErrorName(compressed_size);
    return make_unexpected(std::make_error_code(std::errc::operation_not_supported));
  }

  compressed_size_total_ += compressed_size;
  uncompressed_size_total_ += data.size();
  return io::Bytes(compr_buf_.data(), compressed_size);
}
// CompressorImpl implementation that emits LZ4 frames (lz4frame API).
class Lz4Compressor : public CompressorImpl {
 public:
  Lz4Compressor() {
    // The CompressorImpl constructor has already run at this point, so
    // compression_level_ holds the value it assigned.
    lz4_pref_.compressionLevel = compression_level_;
  }

  // Compresses `data` into a single LZ4 frame.
  io::Result<io::Bytes> Compress(io::Bytes data) override;

 private:
  // Frame preferences reused across calls; contentSize is updated per call.
  LZ4F_preferences_t lz4_pref_ = LZ4F_INIT_PREFERENCES;
};
// Compresses `data` into one LZ4 frame whose header records the content size.
// On success returns a view of the frame stored in compr_buf_ and adds the
// input/output sizes to the running totals. On failure logs the LZ4F error name
// and returns errc::operation_not_supported.
io::Result<io::Bytes> Lz4Compressor::Compress(io::Bytes data) {
  lz4_pref_.frameInfo.contentSize = data.size();

  // Make sure the output buffer can hold the worst-case frame size.
  size_t buf_size = LZ4F_compressFrameBound(data.size(), &lz4_pref_);
  if (compr_buf_.capacity() < buf_size) {
    compr_buf_.reserve(buf_size);
  }

  size_t frame_size = LZ4F_compressFrame(compr_buf_.data(), compr_buf_.capacity(), data.data(),
                                         data.size(), &lz4_pref_);
  if (LZ4F_isError(frame_size)) {
    // An LZ4F failure is a library-specific code carried in a size_t, not an
    // errno value, so it must not be reinterpreted through generic_category().
    // Report the readable name in the log and hand callers a well-defined code.
    LOG(ERROR) << "LZ4F_compressFrame failed with error " << LZ4F_getErrorName(frame_size);
    return make_unexpected(std::make_error_code(std::errc::operation_not_supported));
  }

  compressed_size_total_ += frame_size;
  uncompressed_size_total_ += data.size();
  return io::Bytes(compr_buf_.data(), frame_size);
}
// Stores the requested compression mode, constructs mem_buf_ with 4_KB and
// initializes tmp_buf_ to nullptr. No compressor is created here.
SerializerBase::SerializerBase(CompressionMode compression_mode)
: compression_mode_(compression_mode), mem_buf_{4_KB}, tmp_buf_(nullptr) {
}
@ -1668,11 +1582,11 @@ void SerializerBase::AllocateCompressorOnce() {
return;
}
if (compression_mode_ == CompressionMode::MULTI_ENTRY_ZSTD) {
compressor_impl_.reset(new ZstdCompressor());
compressor_impl_ = detail::CompressorImpl::CreateZstd();
} else if (compression_mode_ == CompressionMode::MULTI_ENTRY_LZ4) {
compressor_impl_.reset(new Lz4Compressor());
compressor_impl_ = detail::CompressorImpl::CreateLZ4();
} else {
CHECK(false) << "Compressor allocation should not be done";
LOG(FATAL) << "Invalid compression mode " << unsigned(compression_mode_);
}
}

View file

@ -16,6 +16,7 @@ extern "C" {
#include "io/io.h"
#include "io/io_buf.h"
#include "server/common.h"
#include "server/detail/compressor.h"
#include "server/journal/serializer.h"
#include "server/journal/types.h"
#include "server/table.h"
@ -67,7 +68,8 @@ enum class SaveMode {
RDB, // Save .rdb file. Expected to read all shards.
};
enum class CompressionMode { NONE, SINGLE_ENTRY, MULTI_ENTRY_ZSTD, MULTI_ENTRY_LZ4 };
enum class CompressionMode : uint8_t { NONE, SINGLE_ENTRY, MULTI_ENTRY_ZSTD, MULTI_ENTRY_LZ4 };
CompressionMode GetDefaultCompressionMode();
class RdbSaver {
@ -147,8 +149,6 @@ class RdbSaver {
CompressionMode compression_mode_;
};
class CompressorImpl;
class SerializerBase {
public:
enum class FlushState { kFlushMidEntry, kFlushEndEntry };
@ -196,7 +196,7 @@ class SerializerBase {
CompressionMode compression_mode_;
io::IoBuf mem_buf_;
std::unique_ptr<CompressorImpl> compressor_impl_;
std::unique_ptr<detail::CompressorImpl> compressor_impl_;
static constexpr size_t kMinStrSizeToCompress = 256;
static constexpr double kMinCompressionReductionPrecentage = 0.95;