Support lz4 compression (#545)

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2022-12-08 09:41:55 +02:00 committed by GitHub
parent 2a67dc307e
commit ae9e3c45d9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 156 additions and 59 deletions

View file

@ -18,6 +18,7 @@ extern "C" {
}
#include <absl/cleanup/cleanup.h>
#include <absl/strings/str_cat.h>
#include <lz4frame.h>
#include <zstd.h>
#include "base/endian.h"
@ -205,12 +206,24 @@ bool resizeStringSet(robj* set, size_t size, bool use_set2) {
} // namespace
class ZstdDecompressImpl {
class DecompressImpl {
public:
ZstdDecompressImpl() : uncompressed_mem_buf_{16_KB} {
DecompressImpl() : uncompressed_mem_buf_{16_KB} {
}
// Virtual because instances are owned and destroyed through a DecompressImpl pointer
// (std::unique_ptr<DecompressImpl>); without it, deleting a derived decompressor through
// the base is undefined behaviour and the derived destructor releasing the codec context
// would not be invoked.
virtual ~DecompressImpl() = default;
virtual io::Result<base::IoBuf*> Decompress(std::string_view str) = 0;
protected:
base::IoBuf uncompressed_mem_buf_;
};
class ZstdDecompress : public DecompressImpl {
public:
ZstdDecompress() {
dctx_ = ZSTD_createDCtx();
}
~ZstdDecompressImpl() {
~ZstdDecompress() {
ZSTD_freeDCtx(dctx_);
}
@ -218,10 +231,9 @@ class ZstdDecompressImpl {
private:
ZSTD_DCtx* dctx_;
base::IoBuf uncompressed_mem_buf_;
};
io::Result<base::IoBuf*> ZstdDecompressImpl::Decompress(std::string_view str) {
io::Result<base::IoBuf*> ZstdDecompress::Decompress(std::string_view str) {
// Prepare membuf memory to uncompressed string.
auto uncomp_size = ZSTD_getFrameContentSize(str.data(), str.size());
if (uncomp_size == ZSTD_CONTENTSIZE_UNKNOWN) {
@ -259,6 +271,87 @@ io::Result<base::IoBuf*> ZstdDecompressImpl::Decompress(std::string_view str) {
return &uncompressed_mem_buf_;
}
// DecompressImpl implementation for blobs stored in the LZ4 frame format.
// Holds one LZ4F decompression context from construction to destruction.
class Lz4Decompress : public DecompressImpl {
 public:
  // Creates the LZ4F decompression context; CHECK-fails if lz4 reports an error.
  Lz4Decompress() {
    auto result = LZ4F_createDecompressionContext(&dctx_, LZ4F_VERSION);
    CHECK(!LZ4F_isError(result));
  }

  // Frees the LZ4F decompression context; CHECK-fails if lz4 reports an error.
  ~Lz4Decompress() {
    auto result = LZ4F_freeDecompressionContext(dctx_);
    CHECK(!LZ4F_isError(result));
  }

  // Decompresses the LZ4 frame in `str`. On success returns a pointer to the internal
  // buffer inherited from DecompressImpl; otherwise returns an error.
  // `override` makes the compiler reject any drift from the base-class signature.
  io::Result<base::IoBuf*> Decompress(std::string_view str) override;

 private:
  // Set by LZ4F_createDecompressionContext in the constructor.
  LZ4F_dctx* dctx_ = nullptr;
};
// Decompresses one complete LZ4 frame held in `data` into uncompressed_mem_buf_.
//
// The frame header must declare a non-zero content size; it is used to reserve the
// output space up front (plus one byte for the trailing end-of-blob opcode).
//
// Returns a pointer to uncompressed_mem_buf_ whose newly written bytes are the
// decompressed payload followed by a single RDB_OPCODE_COMPRESSED_BLOB_END byte.
//
// Errors:
//  - an lz4 error code, forwarded as error_code{int(code), generic_category()};
//  - errc::rdb_file_corrupted if the content size is missing, the frame is truncated or
//    decodes to more than the reserved space, bytes remain after the frame, or the
//    buffered length differs from the declared content size;
//  - errc::out_of_memory if the buffer cannot provide the required room.
io::Result<base::IoBuf*> Lz4Decompress::Decompress(std::string_view data) {
  LZ4F_frameInfo_t frame_info;
  const size_t frame_size = data.size();

  // Get content size from frame data.
  // In: number of bytes available in data. Out: number of header bytes consumed.
  size_t consumed = frame_size;
  size_t res = LZ4F_getFrameInfo(dctx_, &frame_info, data.data(), &consumed);
  if (LZ4F_isError(res)) {
    return make_unexpected(error_code{int(res), generic_category()});
  }
  if (frame_info.contentSize == 0) {
    LOG(ERROR) << "Missing frame content size";
    return Unexpected(errc::rdb_file_corrupted);
  }

  // Reserve place for uncompressed data and end opcode.
  size_t reserve = frame_info.contentSize + 1;
  uncompressed_mem_buf_.Reserve(reserve);
  IoBuf::Bytes reserved = uncompressed_mem_buf_.AppendBuffer();
  if (reserved.size() < reserve) {
    return Unexpected(errc::out_of_memory);
  }

  // Uncompress data to membuf.
  string_view src = data.substr(consumed);
  size_t ret = 1;  // LZ4F_decompress returns 0 once the frame is fully decoded.
  while (ret != 0) {
    IoBuf::Bytes chunk = uncompressed_mem_buf_.AppendBuffer();
    // In: capacity of chunk / bytes available in src.
    // Out: bytes written into chunk / bytes read from src.
    size_t produced = chunk.size();
    size_t src_size = src.size();
    ret = LZ4F_decompress(dctx_, chunk.data(), &produced, src.data(), &src_size, nullptr);
    if (LZ4F_isError(ret)) {
      return make_unexpected(error_code{int(ret), generic_category()});
    }

    // The decoder wants more work but neither read input nor wrote output: the source is
    // exhausted (truncated frame) or the destination is full (payload larger than the
    // declared content size). Without this guard the loop would spin forever.
    if (ret != 0 && src_size == 0 && produced == 0) {
      LOG(ERROR) << "LZ4 frame is truncated or larger than its declared content size";
      return Unexpected(errc::rdb_file_corrupted);
    }

    consumed += src_size;
    uncompressed_mem_buf_.CommitWrite(produced);
    src = src.substr(src_size);
  }

  // The whole input must have been exactly one frame.
  if (consumed != frame_size) {
    return Unexpected(errc::rdb_file_corrupted);
  }
  if (uncompressed_mem_buf_.InputLen() != frame_info.contentSize) {
    return Unexpected(errc::rdb_file_corrupted);
  }

  // Add opcode of compressed blob end to membuf.
  IoBuf::Bytes tail = uncompressed_mem_buf_.AppendBuffer();
  if (tail.size() < 1) {
    return Unexpected(errc::out_of_memory);
  }
  tail[0] = RDB_OPCODE_COMPRESSED_BLOB_END;
  uncompressed_mem_buf_.CommitWrite(1);
  return &uncompressed_mem_buf_;
}
class RdbLoaderBase::OpaqueObjLoader {
public:
OpaqueObjLoader(int rdb_type, PrimeValue* pv) : rdb_type_(rdb_type), pv_(pv) {
@ -1666,14 +1759,11 @@ error_code RdbLoader::Load(io::Source* src) {
return RdbError(errc::feature_not_supported);
}
if (type == RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START) {
RETURN_ON_ERR(HandleCompressedBlob());
if (type == RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START ||
type == RDB_OPCODE_COMPRESSED_LZ4_BLOB_START) {
RETURN_ON_ERR(HandleCompressedBlob(type));
continue;
}
if (type == RDB_OPCODE_COMPRESSED_LZ4_BLOB_START) {
LOG(ERROR) << "LZ4 not supported yet";
return RdbError(errc::feature_not_supported);
}
if (type == RDB_OPCODE_COMPRESSED_BLOB_END) {
RETURN_ON_ERR(HandleCompressedBlobFinish());
@ -1794,11 +1884,21 @@ auto RdbLoaderBase::LoadLen(bool* is_encoded) -> io::Result<uint64_t> {
return res;
}
error_code RdbLoaderBase::HandleCompressedBlob() {
if (!zstd_decompress_) {
zstd_decompress_.reset(new ZstdDecompressImpl());
// Lazily creates decompress_impl_ for the compressed-blob opcode `op_type`.
// Once a decompressor exists the call does nothing and op_type is not inspected.
// NOTE(review): a decompressor created for one opcode is therefore reused even if a later
// blob carries the other opcode - presumably one file never mixes both; confirm.
void RdbLoaderBase::AlocateDecompressOnce(int op_type) {
  if (!decompress_impl_) {
    const bool is_zstd = op_type == RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START;
    const bool is_lz4 = op_type == RDB_OPCODE_COMPRESSED_LZ4_BLOB_START;

    if (is_zstd) {
      decompress_impl_.reset(new ZstdDecompress());
    } else if (is_lz4) {
      decompress_impl_.reset(new Lz4Decompress());
    } else {
      // Only the two blob-start opcodes are valid here.
      CHECK(false) << "Decompressor allocation should not be done";
    }
  }
}
error_code RdbLoaderBase::HandleCompressedBlob(int op_type) {
AlocateDecompressOnce(op_type);
// Fetch uncompress blob
string res;
SET_OR_RETURN(FetchGenericString(), res);
@ -1807,7 +1907,7 @@ error_code RdbLoaderBase::HandleCompressedBlob() {
// Last type in the compressed blob is RDB_OPCODE_COMPRESSED_BLOB_END
// in which we will switch back to the origin membuf (HandleCompressedBlobFinish)
string_view uncompressed_blob;
SET_OR_RETURN(zstd_decompress_->Decompress(res), mem_buf_);
SET_OR_RETURN(decompress_impl_->Decompress(res), mem_buf_);
return kOk;
}

View file

@ -21,7 +21,7 @@ class EngineShardSet;
class ScriptMgr;
class CompactObj;
class ZstdDecompressImpl;
class DecompressImpl;
class RdbLoaderBase {
protected:
@ -127,8 +127,9 @@ class RdbLoaderBase {
::io::Result<OpaqueObj> ReadZSetZL();
::io::Result<OpaqueObj> ReadListQuicklist(int rdbtype);
::io::Result<OpaqueObj> ReadStreams();
std::error_code HandleCompressedBlob();
std::error_code HandleCompressedBlob(int op_type);
std::error_code HandleCompressedBlobFinish();
void AlocateDecompressOnce(int op_type);
static size_t StrLen(const RdbVariant& tset);
@ -144,7 +145,7 @@ class RdbLoaderBase {
size_t bytes_read_ = 0;
size_t source_limit_ = SIZE_MAX;
base::PODArray<uint8_t> compr_buf_;
std::unique_ptr<ZstdDecompressImpl> zstd_decompress_;
std::unique_ptr<DecompressImpl> decompress_impl_;
};
class RdbLoader : protected RdbLoaderBase {

View file

@ -31,7 +31,7 @@ extern "C" {
#include "server/snapshot.h"
#include "util/fibers/simple_channel.h"
ABSL_FLAG(int, compression_mode, 2,
ABSL_FLAG(int, compression_mode, 3,
"set 0 for no compression,"
"set 1 for single entry lzf compression,"
"set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot,"
@ -190,7 +190,6 @@ class ZstdCompressor : public CompressorImpl {
public:
ZstdCompressor() {
cctx_ = ZSTD_createCCtx();
compression_level_ = absl::GetFlag(FLAGS_compression_level);
}
~ZstdCompressor() {
ZSTD_freeCCtx(cctx_);
@ -200,9 +199,6 @@ class ZstdCompressor : public CompressorImpl {
private:
ZSTD_CCtx* cctx_;
int compression_level_ = 1;
size_t compressed_size_total_ = 0;
size_t uncompressed_size_total_ = 0;
base::PODArray<uint8_t> compr_buf_;
};
@ -224,54 +220,35 @@ io::Result<io::Bytes> ZstdCompressor::Compress(io::Bytes data) {
class Lz4Compressor : public CompressorImpl {
public:
// create a compression context
Lz4Compressor() {
LZ4F_errorCode_t result = LZ4F_createCompressionContext(&ctx_, LZ4F_VERSION);
CHECK(!LZ4F_isError(result));
lz4_pref_.compressionLevel = compression_level_;
}
// destroy the compression context
~Lz4Compressor() {
LZ4F_freeCompressionContext(ctx_);
}
// compress a string of data
io::Result<io::Bytes> Compress(io::Bytes data);
private:
LZ4F_cctx* ctx_;
LZ4F_preferences_t lz4_pref_;
LZ4F_preferences_t lz4_pref_ = LZ4F_INIT_PREFERENCES;
};
io::Result<io::Bytes> Lz4Compressor::Compress(io::Bytes data) {
size_t buf_size = LZ4F_compressFrameBound(data.size(), NULL);
lz4_pref_.frameInfo.contentSize = data.size();
size_t buf_size = LZ4F_compressFrameBound(data.size(), &lz4_pref_);
if (compr_buf_.capacity() < buf_size) {
compr_buf_.reserve(buf_size);
}
// initialize the compression context
size_t compressed_size =
LZ4F_compressBegin(ctx_, compr_buf_.data(), compr_buf_.capacity(), &lz4_pref_);
if (LZ4F_isError(compressed_size)) {
return make_unexpected(error_code{int(compressed_size), generic_category()});
}
// compress the data
compressed_size = LZ4F_compressUpdate(ctx_, compr_buf_.data(), compr_buf_.capacity(), data.data(),
data.size(), NULL);
if (LZ4F_isError(compressed_size)) {
return make_unexpected(error_code{int(compressed_size), generic_category()});
size_t frame_size = LZ4F_compressFrame(compr_buf_.data(), compr_buf_.capacity(), data.data(),
data.size(), &lz4_pref_);
if (LZ4F_isError(frame_size)) {
return make_unexpected(error_code{int(frame_size), generic_category()});
}
// finish the compression process
compressed_size = LZ4F_compressEnd(ctx_, compr_buf_.data(), compr_buf_.capacity(), NULL);
if (LZ4F_isError(compressed_size)) {
return make_unexpected(error_code{int(compressed_size), generic_category()});
}
compressed_size_total_ += compressed_size;
compressed_size_total_ += frame_size;
uncompressed_size_total_ += data.size();
return io::Bytes(compr_buf_.data(), compressed_size);
return io::Bytes(compr_buf_.data(), frame_size);
}
RdbSerializer::RdbSerializer(CompressionMode compression_mode)
@ -283,6 +260,8 @@ RdbSerializer::~RdbSerializer() {
if (compression_stats_) {
VLOG(1) << "compression not effective: " << compression_stats_->compression_no_effective;
VLOG(1) << "small string none compression applied: " << compression_stats_->small_str_count;
VLOG(1) << "compression failed: " << compression_stats_->compression_failed;
VLOG(1) << "compressed blobs:" << compression_stats_->compressed_blobs;
}
}
@ -724,7 +703,8 @@ error_code RdbSerializer::FlushToSink(io::Sink* s) {
DVLOG(2) << "FlushToSink " << sz << " bytes";
if (compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD) {
if (compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD ||
compression_mode_ == CompressionMode::MULTY_ENTRY_LZ4) {
CompressBlob();
// After blob was compressed membuf was overwritten with compressed data
sz = mem_buf_.InputLen();
@ -1040,7 +1020,6 @@ RdbSaver::RdbSaver(::io::Sink* sink, SaveMode save_mode, bool align_writes) {
producer_count = 1;
if (compression_mode == 3) {
compression_mode_ = CompressionMode::MULTY_ENTRY_LZ4;
LOG(ERROR) << "Lz4 compression not supported yet";
} else if (compression_mode == 2) {
compression_mode_ = CompressionMode::MULTY_ENTRY_ZSTD;
} else if (compression_mode == 1) {
@ -1167,6 +1146,19 @@ void RdbSaver::Cancel() {
impl_->Cancel();
}
// Lazily creates compressor_impl_ according to compression_mode_.
// Does nothing when a compressor already exists; CHECK-fails for any mode other than
// the two multi-entry modes.
void RdbSerializer::AllocateCompressorOnce() {
  if (compressor_impl_) {
    return;
  }

  switch (compression_mode_) {
    case CompressionMode::MULTY_ENTRY_ZSTD:
      compressor_impl_.reset(new ZstdCompressor());
      break;
    case CompressionMode::MULTY_ENTRY_LZ4:
      compressor_impl_.reset(new Lz4Compressor());
      break;
    default:
      CHECK(false) << "Compressor allocation should not be done";
  }
}
void RdbSerializer::CompressBlob() {
if (!compression_stats_) {
compression_stats_.emplace();
@ -1178,11 +1170,8 @@ void RdbSerializer::CompressBlob() {
return;
}
AllocateCompressorOnce();
// Compress the data
if (!compressor_impl_) {
compressor_impl_.reset(new ZstdCompressor()); // TODO(Adi): add support for lz4
}
auto ec = compressor_impl_->Compress(blob_to_compress);
if (!ec) {
++compression_stats_->compression_failed;
@ -1200,7 +1189,10 @@ void RdbSerializer::CompressBlob() {
// First write opcode for compressed string
auto dest = mem_buf_.AppendBuffer();
dest[0] = RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START; // TODO(Adi): add support for lz4
uint8_t opcode = compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD
? RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START
: RDB_OPCODE_COMPRESSED_LZ4_BLOB_START;
dest[0] = opcode;
mem_buf_.CommitWrite(1);
// Write encoded compressed blob len
@ -1212,6 +1204,7 @@ void RdbSerializer::CompressBlob() {
dest = mem_buf_.AppendBuffer();
memcpy(dest.data(), compressed_blob.data(), compressed_blob.length());
mem_buf_.CommitWrite(compressed_blob.length());
++compression_stats_->compressed_blobs;
}
} // namespace dfly

View file

@ -160,6 +160,7 @@ class RdbSerializer {
std::error_code SaveStreamConsumers(streamCG* cg);
// If membuf data is compressable use compression impl to compress the data and write it to membuf
void CompressBlob();
void AllocateCompressorOnce();
std::unique_ptr<LZF_HSLOT[]> lzf_;
base::IoBuf mem_buf_;
@ -175,6 +176,7 @@ class RdbSerializer {
uint32_t compression_no_effective = 0;
uint32_t small_str_count = 0;
uint32_t compression_failed = 0;
uint32_t compressed_blobs = 0;
};
std::optional<CompressionStats> compression_stats_;
};

View file

@ -148,7 +148,7 @@ TEST_F(RdbTest, Stream) {
TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) {
Run({"debug", "populate", "500000"});
for (int i = 0; i <= 2; ++i) {
for (int i = 0; i <= 3; ++i) {
SetFlag(&FLAGS_compression_mode, i);
RespExpr resp = Run({"save", "df"});
ASSERT_EQ(resp, "OK");
@ -156,6 +156,7 @@ TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) {
auto save_info = service_->server_family().GetLastSaveInfo();
resp = Run({"debug", "load", save_info->file_name});
ASSERT_EQ(resp, "OK");
ASSERT_EQ(500000, CheckedInt({"dbsize"}));
}
}