mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
fix: avoid on stack allocation of lz4 compression context (#4322)
Fixes #4245 Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
1fa9a47a86
commit
28848d0be2
4 changed files with 43 additions and 10 deletions
|
@ -84,7 +84,7 @@ add_third_party(
|
|||
|
||||
add_third_party(
|
||||
lz4
|
||||
URL https://github.com/lz4/lz4/archive/refs/tags/v1.9.4.tar.gz
|
||||
URL https://github.com/lz4/lz4/archive/refs/tags/v1.10.0.tar.gz
|
||||
|
||||
BUILD_IN_SOURCE 1
|
||||
CONFIGURE_COMMAND echo skip
|
||||
|
|
|
@ -52,32 +52,47 @@ io::Result<io::Bytes> ZstdCompressor::Compress(io::Bytes data) {
|
|||
class Lz4Compressor : public CompressorImpl {
|
||||
public:
|
||||
Lz4Compressor() {
|
||||
lz4_pref_.compressionLevel = compression_level_;
|
||||
LZ4F_errorCode_t code = LZ4F_createCompressionContext(&cctx_, LZ4F_VERSION);
|
||||
CHECK(!LZ4F_isError(code));
|
||||
}
|
||||
|
||||
~Lz4Compressor() {
|
||||
LZ4F_errorCode_t code = LZ4F_freeCompressionContext(cctx_);
|
||||
CHECK(!LZ4F_isError(code));
|
||||
}
|
||||
|
||||
// compress a string of data
|
||||
io::Result<io::Bytes> Compress(io::Bytes data);
|
||||
|
||||
private:
|
||||
LZ4F_preferences_t lz4_pref_ = LZ4F_INIT_PREFERENCES;
|
||||
LZ4F_cctx* cctx_;
|
||||
};
|
||||
|
||||
io::Result<io::Bytes> Lz4Compressor::Compress(io::Bytes data) {
|
||||
lz4_pref_.frameInfo.contentSize = data.size();
|
||||
size_t buf_size = LZ4F_compressFrameBound(data.size(), &lz4_pref_);
|
||||
LZ4F_preferences_t lz4_pref = LZ4F_INIT_PREFERENCES;
|
||||
lz4_pref.compressionLevel = compression_level_;
|
||||
lz4_pref.frameInfo.contentSize = data.size();
|
||||
|
||||
size_t buf_size = LZ4F_compressFrameBound(data.size(), &lz4_pref);
|
||||
if (compr_buf_.capacity() < buf_size) {
|
||||
compr_buf_.reserve(buf_size);
|
||||
}
|
||||
|
||||
// TODO: to remove LZ4F_compressFrame code once we confirm this code actually works.
|
||||
#if 1
|
||||
size_t frame_size =
|
||||
LZ4F_compressFrame_usingCDict(cctx_, compr_buf_.data(), compr_buf_.capacity(), data.data(),
|
||||
data.size(), nullptr /* dict */, &lz4_pref);
|
||||
#else
|
||||
size_t frame_size = LZ4F_compressFrame(compr_buf_.data(), compr_buf_.capacity(), data.data(),
|
||||
data.size(), &lz4_pref_);
|
||||
data.size(), &lz4_pref);
|
||||
#endif
|
||||
|
||||
if (LZ4F_isError(frame_size)) {
|
||||
LOG(ERROR) << "LZ4F_compressFrame failed with error " << LZ4F_getErrorName(frame_size);
|
||||
return nonstd::make_unexpected(make_error_code(errc::operation_not_supported));
|
||||
}
|
||||
|
||||
compressed_size_total_ += frame_size;
|
||||
uncompressed_size_total_ += data.size();
|
||||
return io::Bytes(compr_buf_.data(), frame_size);
|
||||
|
|
|
@ -1604,13 +1604,20 @@ void SerializerBase::CompressBlob() {
|
|||
}
|
||||
|
||||
AllocateCompressorOnce();
|
||||
// Compress the data
|
||||
auto ec = compressor_impl_->Compress(blob_to_compress);
|
||||
if (!ec) {
|
||||
|
||||
// Compress the data. We copy compressed data once into the internal buffer of compressor_impl_
|
||||
// and then we copy it again into the mem_buf_.
|
||||
//
|
||||
// TODO: it is possible to avoid double copying here by changing the compressor interface,
|
||||
// so that the compressor will accept the output buffer and return the final size. This requires
|
||||
// exposing the additional compress bound interface as well.
|
||||
io::Result<io::Bytes> res = compressor_impl_->Compress(blob_to_compress);
|
||||
if (!res) {
|
||||
++compression_stats_->compression_failed;
|
||||
return;
|
||||
}
|
||||
Bytes compressed_blob = *ec;
|
||||
|
||||
Bytes compressed_blob = *res;
|
||||
if (compressed_blob.length() > blob_size * kMinCompressionReductionPrecentage) {
|
||||
++compression_stats_->compression_no_effective;
|
||||
return;
|
||||
|
|
|
@ -42,6 +42,14 @@ class RdbTest : public BaseFamilyTest {
|
|||
protected:
|
||||
void SetUp();
|
||||
|
||||
static void SetUpTestSuite() {
|
||||
static bool init = true;
|
||||
if (exchange(init, false)) {
|
||||
fb2::SetDefaultStackResource(&fb2::std_malloc_resource, 32_KB);
|
||||
}
|
||||
BaseFamilyTest::SetUpTestSuite();
|
||||
}
|
||||
|
||||
io::FileSource GetSource(string name);
|
||||
|
||||
std::error_code LoadRdb(const string& filename) {
|
||||
|
@ -56,6 +64,7 @@ class RdbTest : public BaseFamilyTest {
|
|||
|
||||
void RdbTest::SetUp() {
|
||||
InitWithDbFilename();
|
||||
CHECK_EQ(zmalloc_used_memory_tl, 0);
|
||||
max_memory_limit = 40000000;
|
||||
}
|
||||
|
||||
|
@ -457,6 +466,8 @@ TEST_F(RdbTest, JsonTest) {
|
|||
class HllRdbTest : public RdbTest, public testing::WithParamInterface<string> {};
|
||||
|
||||
TEST_P(HllRdbTest, Hll) {
|
||||
LOG(INFO) << " max memory: " << max_memory_limit
|
||||
<< " used_mem_current: " << used_mem_current.load();
|
||||
auto ec = LoadRdb("hll.rdb");
|
||||
|
||||
ASSERT_FALSE(ec) << ec.message();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue