feat(rdb save): Create compressor interface (#538)

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2022-12-07 10:12:18 +02:00 committed by GitHub
parent 9aa9a78ea5
commit 90bc3ab8b8
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 52 additions and 35 deletions

View file

@ -10,5 +10,6 @@
// to notify that it finished streaming static data and is ready
// to switch to the stable state replication phase.
const uint8_t RDB_OPCODE_FULLSYNC_END = 200;
const uint8_t RDB_OPCODE_COMPRESSED_BLOB_START = 201;
const uint8_t RDB_OPCODE_COMPRESSED_BLOB_END = 202;
const uint8_t RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START = 201;
const uint8_t RDB_OPCODE_COMPRESSED_LZ4_BLOB_START = 202;
const uint8_t RDB_OPCODE_COMPRESSED_BLOB_END = 203;

View file

@ -223,7 +223,7 @@ class ZstdDecompressImpl {
io::Result<base::IoBuf*> ZstdDecompressImpl::Decompress(std::string_view str) {
// Prepare membuf memory to uncompressed string.
unsigned long long const uncomp_size = ZSTD_getFrameContentSize(str.data(), str.size());
auto uncomp_size = ZSTD_getFrameContentSize(str.data(), str.size());
if (uncomp_size == ZSTD_CONTENTSIZE_UNKNOWN) {
LOG(ERROR) << "Zstd compression missing frame content size";
return Unexpected(errc::invalid_encoding);
@ -1666,10 +1666,15 @@ error_code RdbLoader::Load(io::Source* src) {
return RdbError(errc::feature_not_supported);
}
if (type == RDB_OPCODE_COMPRESSED_BLOB_START) {
if (type == RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START) {
RETURN_ON_ERR(HandleCompressedBlob());
continue;
}
if (type == RDB_OPCODE_COMPRESSED_LZ4_BLOB_START) {
LOG(ERROR) << "LZ4 not supported yet";
return RdbError(errc::feature_not_supported);
}
if (type == RDB_OPCODE_COMPRESSED_BLOB_END) {
RETURN_ON_ERR(HandleCompressedBlobFinish());
continue;

View file

@ -34,8 +34,9 @@ extern "C" {
ABSL_FLAG(int, compression_mode, 2,
"set 0 for no compression,"
"set 1 for single entry lzf compression,"
"set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot");
ABSL_FLAG(int, zstd_compression_level, 2, "Compression level to use on zstd compression");
"set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot,"
"set 3 for multi entry lz4 compression on df snapshot and single entry on rdb snapshot");
ABSL_FLAG(int, compression_level, 2, "The compression level to use on zstd/lz4 compression");
namespace dfly {
@ -167,17 +168,32 @@ uint8_t RdbObjectType(unsigned type, unsigned encoding) {
return 0; /* avoid warning */
}
class ZstdCompressor {
// Abstract interface for the blob compressors used when saving snapshots.
// Concrete compressors implement Compress(); this base holds the state they
// share: the compression level, the size counters and the output buffer.
class CompressorImpl {
 public:
  // Takes the compression level from the --compression_level flag.
  CompressorImpl() : compression_level_(absl::GetFlag(FLAGS_compression_level)) {
  }

  // Must be virtual: compressors are held in a std::unique_ptr<CompressorImpl>
  // and therefore deleted through a base-class pointer. Without a virtual
  // destructor that delete is undefined behaviour and the derived destructors,
  // which release the codec contexts, would not run.
  // Logs the size counters at verbosity level 1.
  virtual ~CompressorImpl() {
    VLOG(1) << "compressed size: " << compressed_size_total_;
    VLOG(1) << "uncompressed size: " << uncompressed_size_total_;
  }

  // Compresses `data` and returns the compressed bytes, or an error.
  virtual io::Result<io::Bytes> Compress(io::Bytes data) = 0;

 protected:
  int compression_level_ = 1;           // Overwritten from the flag in the constructor.
  size_t compressed_size_total_ = 0;    // Reported by the destructor.
  size_t uncompressed_size_total_ = 0;  // Reported by the destructor.
  base::PODArray<uint8_t> compr_buf_;   // Output buffer available to implementations.
};
class ZstdCompressor : public CompressorImpl {
public:
ZstdCompressor() {
cctx_ = ZSTD_createCCtx();
compression_level_ = absl::GetFlag(FLAGS_zstd_compression_level);
compression_level_ = absl::GetFlag(FLAGS_compression_level);
}
~ZstdCompressor() {
ZSTD_freeCCtx(cctx_);
VLOG(1) << "zstd compressed size: " << compressed_size_total_;
VLOG(1) << "zstd uncompressed size: " << uncompressed_size_total_;
}
io::Result<io::Bytes> Compress(io::Bytes data);
@ -206,21 +222,18 @@ io::Result<io::Bytes> ZstdCompressor::Compress(io::Bytes data) {
return io::Bytes(compr_buf_.data(), compressed_size);
}
class Lz4Compressor {
class Lz4Compressor : public CompressorImpl {
public:
// create a compression context
Lz4Compressor() {
LZ4F_errorCode_t result = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION);
if (LZ4F_isError(result)) {
// TODO this can fail on memory allocation. What should we do in this case?
}
CHECK(!LZ4F_isError(result));
lz4_pref_.compressionLevel = compression_level_;
}
// destroy the compression context
~Lz4Compressor() {
LZ4F_freeCompressionContext(ctx_);
VLOG(1) << "lz4 compressed size: " << compressed_size_total_;
VLOG(1) << "lz4 uncompressed size: " << uncompressed_size_total_;
}
// compress a string of data
@ -228,9 +241,7 @@ class Lz4Compressor {
private:
LZ4F_cctx* ctx_;
base::PODArray<uint8_t> compr_buf_;
size_t compressed_size_total_ = 0;
size_t uncompressed_size_total_ = 0;
LZ4F_preferences_t lz4_pref_;
};
io::Result<io::Bytes> Lz4Compressor::Compress(io::Bytes data) {
@ -239,7 +250,8 @@ io::Result<io::Bytes> Lz4Compressor::Compress(io::Bytes data) {
compr_buf_.reserve(buf_size);
}
// initialize the compression context
size_t compressed_size = LZ4F_compressBegin(ctx_, compr_buf_.data(), compr_buf_.capacity(), NULL);
size_t compressed_size =
LZ4F_compressBegin(ctx_, compr_buf_.data(), compr_buf_.capacity(), &lz4_pref_);
if (LZ4F_isError(compressed_size)) {
return make_unexpected(error_code{int(compressed_size), generic_category()});
}
@ -712,7 +724,7 @@ error_code RdbSerializer::FlushToSink(io::Sink* s) {
DVLOG(2) << "FlushToSink " << sz << " bytes";
if (compression_mode_ == CompressionMode::MULTY_ENTRY) {
if (compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD) {
CompressBlob();
// After blob was compressed membuf was overwritten with compressed data
sz = mem_buf_.InputLen();
@ -1018,7 +1030,7 @@ RdbSaver::RdbSaver(::io::Sink* sink, SaveMode save_mode, bool align_writes) {
switch (save_mode) {
case SaveMode::SUMMARY:
producer_count = 0;
if (compression_mode == 1 || compression_mode == 2) {
if (compression_mode >= 1) {
compression_mode_ = CompressionMode::SINGLE_ENTRY;
} else {
compression_mode_ = CompressionMode::NONE;
@ -1026,8 +1038,11 @@ RdbSaver::RdbSaver(::io::Sink* sink, SaveMode save_mode, bool align_writes) {
break;
case SaveMode::SINGLE_SHARD:
producer_count = 1;
if (compression_mode == 2) {
compression_mode_ = CompressionMode::MULTY_ENTRY;
if (compression_mode == 3) {
compression_mode_ = CompressionMode::MULTY_ENTRY_LZ4;
LOG(ERROR) << "Lz4 compression not supported yet";
} else if (compression_mode == 2) {
compression_mode_ = CompressionMode::MULTY_ENTRY_ZSTD;
} else if (compression_mode == 1) {
compression_mode_ = CompressionMode::SINGLE_ENTRY;
} else {
@ -1036,7 +1051,7 @@ RdbSaver::RdbSaver(::io::Sink* sink, SaveMode save_mode, bool align_writes) {
break;
case SaveMode::RDB:
producer_count = shard_set->size();
if (compression_mode == 1 || compression_mode == 2) {
if (compression_mode >= 1) {
compression_mode_ = CompressionMode::SINGLE_ENTRY;
} else {
compression_mode_ = CompressionMode::NONE;
@ -1165,7 +1180,7 @@ void RdbSerializer::CompressBlob() {
// Compress the data
if (!compressor_impl_) {
compressor_impl_.reset(new ZstdCompressor());
compressor_impl_.reset(new ZstdCompressor()); // TODO(Adi): add support for lz4
}
auto ec = compressor_impl_->Compress(blob_to_compress);
@ -1185,7 +1200,7 @@ void RdbSerializer::CompressBlob() {
// First write opcode for compressed string
auto dest = mem_buf_.AppendBuffer();
dest[0] = RDB_OPCODE_COMPRESSED_BLOB_START;
dest[0] = RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START; // TODO(Adi): add support for lz4
mem_buf_.CommitWrite(1);
// Write encoded compressed blob len

View file

@ -61,11 +61,7 @@ enum class SaveMode {
RDB, // Save .rdb file. Expected to read all shards.
};
enum class CompressionMode {
NONE,
SINGLE_ENTRY,
MULTY_ENTRY,
};
// Compression strategy applied while serializing a snapshot.
// RdbSaver picks one of these from the --compression_mode flag and the save mode.
enum class CompressionMode {
  NONE,              // No compression (flag value 0).
  SINGLE_ENTRY,      // Single-entry compression (flag value 1, and any non-zero
                     // value for summary / rdb saves).
  MULTY_ENTRY_ZSTD,  // Multi-entry zstd compression (flag value 2, single-shard save).
  MULTY_ENTRY_LZ4    // Multi-entry lz4 compression (flag value 3, single-shard save);
                     // logged as not supported yet when selected.
};
class RdbSaver {
public:
@ -112,7 +108,7 @@ class RdbSaver {
CompressionMode compression_mode_;
};
class ZstdCompressor;
class CompressorImpl;
class RdbSerializer {
public:
@ -171,7 +167,7 @@ class RdbSerializer {
std::string tmp_str_;
CompressionMode compression_mode_;
// TODO : This compressor impl should support different compression algorithms zstd/lz4 etc.
std::unique_ptr<ZstdCompressor> compressor_impl_;
std::unique_ptr<CompressorImpl> compressor_impl_;
static constexpr size_t kMinStrSizeToCompress = 256;
static constexpr double kMinCompressionReductionPrecentage = 0.95;