From d9886024d3f12b0bf5616a231ecefebe8f5bf8d3 Mon Sep 17 00:00:00 2001 From: Shahar Mike Date: Tue, 2 Jan 2024 15:48:50 +0200 Subject: [PATCH] refactor(rdb): Expose default compression mode without direct flag (#2360) * refactor(rdb): Expose default compression mode without direct flag * fixes --- src/server/generic_family.cc | 10 +++--- src/server/rdb_save.cc | 69 ++++++++++++++++++++++++++---------- src/server/rdb_save.h | 3 +- src/server/rdb_test.cc | 10 +++--- 4 files changed, 64 insertions(+), 28 deletions(-) diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 7e1e61489..58ece2435 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -31,7 +31,6 @@ extern "C" { ABSL_FLAG(uint32_t, dbnum, 16, "Number of databases"); ABSL_FLAG(uint32_t, keys_output_limit, 8192, "Maximum number of keys output by keys command"); -ABSL_DECLARE_FLAG(int, compression_mode); namespace dfly { using namespace std; @@ -450,10 +449,11 @@ OpResult OpDump(const OpArgs& op_args, string_view key) { if (IsValid(it)) { DVLOG(1) << "Dump: key '" << key << "' successfully found, going to dump it"; io::StringSink sink; - int compression_mode = absl::GetFlag(FLAGS_compression_mode); - CompressionMode serializer_compression_mode = - compression_mode == 0 ? CompressionMode::NONE : CompressionMode::SINGLE_ENTRY; - RdbSerializer serializer(serializer_compression_mode); + CompressionMode compression_mode = GetDefaultCompressionMode(); + if (compression_mode != CompressionMode::NONE) { + compression_mode = CompressionMode::SINGLE_ENTRY; + } + RdbSerializer serializer(compression_mode); // According to Redis code we need to // 1. Save the value itself - without the key diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index a6fe8f49e..9ca4338b5 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -39,7 +39,7 @@ extern "C" { #include "server/snapshot.h" #include "util/fibers/simple_channel.h" -ABSL_FLAG(int, compression_mode, 3, +ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_ENTRY_LZ4, "set 0 for no compression," "set 1 for single entry lzf compression," "set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot," @@ -109,6 +109,47 @@ constexpr size_t kAmask = 4_KB - 1; } // namespace +bool AbslParseFlag(std::string_view in, dfly::CompressionMode* flag, std::string* err) { + if (in == "0" || in == "NONE") { + *flag = dfly::CompressionMode::NONE; + return true; + } + if (in == "1" || in == "SINGLE_ENTRY") { + *flag = dfly::CompressionMode::SINGLE_ENTRY; + return true; + } + if (in == "2" || in == "MULTI_ENTRY_ZSTD") { + *flag = dfly::CompressionMode::MULTI_ENTRY_ZSTD; + return true; + } + if (in == "3" || in == "MULTI_ENTRY_LZ4") { + *flag = dfly::CompressionMode::MULTI_ENTRY_LZ4; + return true; + } + + *err = absl::StrCat("Unknown value ", in, " for compression_mode flag"); + return false; +} + +std::string AbslUnparseFlag(dfly::CompressionMode flag) { + switch (flag) { + case dfly::CompressionMode::NONE: + return "NONE"; + case dfly::CompressionMode::SINGLE_ENTRY: + return "SINGLE_ENTRY"; + case dfly::CompressionMode::MULTI_ENTRY_ZSTD: + return "MULTI_ENTRY_ZSTD"; + case dfly::CompressionMode::MULTI_ENTRY_LZ4: + return "MULTI_ENTRY_LZ4"; + } + DCHECK(false) << "Unknown compression_mode flag value " << int(flag); + return "NONE"; +} + +dfly::CompressionMode GetDefaultCompressionMode() { + return absl::GetFlag(FLAGS_compression_mode); +} + uint8_t RdbObjectType(const PrimeValue& pv) { unsigned type = pv.ObjType(); unsigned compact_enc = pv.Encoding(); @@ -761,8 +802,8 @@ io::Bytes RdbSerializer::PrepareFlush() { if (sz == 0) return mem_buf_.InputBuffer(); - if (compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD || - compression_mode_ == CompressionMode::MULTY_ENTRY_LZ4) { + if (compression_mode_ == CompressionMode::MULTI_ENTRY_ZSTD || + compression_mode_ == CompressionMode::MULTI_ENTRY_LZ4) { CompressBlob(); } @@ -1208,12 +1249,12 @@ unique_ptr& RdbSaver::Impl::GetSnapshot(EngineShard* shard) { RdbSaver::RdbSaver(::io::Sink* sink, SaveMode save_mode, bool align_writes) { CHECK_NOTNULL(sink); - int compression_mode = absl::GetFlag(FLAGS_compression_mode); + CompressionMode compression_mode = GetDefaultCompressionMode(); int producer_count = 0; switch (save_mode) { case SaveMode::SUMMARY: producer_count = 0; - if (compression_mode >= 1) { + if (compression_mode >= CompressionMode::SINGLE_ENTRY) { compression_mode_ = CompressionMode::SINGLE_ENTRY; } else { compression_mode_ = CompressionMode::NONE; @@ -1222,19 +1263,11 @@ RdbSaver::RdbSaver(::io::Sink* sink, SaveMode save_mode, bool align_writes) { case SaveMode::SINGLE_SHARD: case SaveMode::SINGLE_SHARD_WITH_SUMMARY: producer_count = 1; - if (compression_mode == 3) { - compression_mode_ = CompressionMode::MULTY_ENTRY_LZ4; - } else if (compression_mode == 2) { - compression_mode_ = CompressionMode::MULTY_ENTRY_ZSTD; - } else if (compression_mode == 1) { - compression_mode_ = CompressionMode::SINGLE_ENTRY; - } else { - compression_mode_ = CompressionMode::NONE; - } + compression_mode_ = compression_mode; break; case SaveMode::RDB: producer_count = shard_set->size(); - if (compression_mode >= 1) { + if (compression_mode >= CompressionMode::SINGLE_ENTRY) { compression_mode_ = CompressionMode::SINGLE_ENTRY; } else { compression_mode_ = CompressionMode::NONE; @@ -1374,9 +1407,9 @@ void RdbSerializer::AllocateCompressorOnce() { if (compressor_impl_) { return; } - if (compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD) { + if (compression_mode_ == CompressionMode::MULTI_ENTRY_ZSTD) { compressor_impl_.reset(new ZstdCompressor()); - } else if (compression_mode_ == CompressionMode::MULTY_ENTRY_LZ4) { + } else if (compression_mode_ == CompressionMode::MULTI_ENTRY_LZ4) { compressor_impl_.reset(new Lz4Compressor()); } else { CHECK(false) << "Compressor allocation should not be done"; @@ -1413,7 +1446,7 @@ void RdbSerializer::CompressBlob() { // First write opcode for compressed string auto dest = mem_buf_.AppendBuffer(); - uint8_t opcode = compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD + uint8_t opcode = compression_mode_ == CompressionMode::MULTI_ENTRY_ZSTD ? RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START : RDB_OPCODE_COMPRESSED_LZ4_BLOB_START; dest[0] = opcode; diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 507e923b9..202684cf1 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -66,7 +66,8 @@ enum class SaveMode { RDB, // Save .rdb file. Expected to read all shards. }; -enum class CompressionMode { NONE, SINGLE_ENTRY, MULTY_ENTRY_ZSTD, MULTY_ENTRY_LZ4 }; +enum class CompressionMode { NONE, SINGLE_ENTRY, MULTI_ENTRY_ZSTD, MULTI_ENTRY_LZ4 }; +CompressionMode GetDefaultCompressionMode(); class RdbSaver { public: diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index cc6c7259a..6755a7649 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -20,6 +20,7 @@ extern "C" { #include "io/file.h" #include "server/engine_shard_set.h" #include "server/rdb_load.h" +#include "server/rdb_save.h" #include "server/test_utils.h" using namespace testing; @@ -31,7 +32,7 @@ using absl::StrCat; ABSL_DECLARE_FLAG(int32, list_compress_depth); ABSL_DECLARE_FLAG(int32, list_max_listpack_size); -ABSL_DECLARE_FLAG(int, compression_mode); +ABSL_DECLARE_FLAG(dfly::CompressionMode, compression_mode); namespace dfly { @@ -158,8 +159,9 @@ TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) { auto resp = Run({"keys", "key:[5-9][0-9][0-9][0-9][0-9]*"}); EXPECT_EQ(resp.GetVec().size(), 0); - for (int i = 0; i <= 3; ++i) { - SetFlag(&FLAGS_compression_mode, i); + for (auto mode : {CompressionMode::NONE, CompressionMode::SINGLE_ENTRY, + CompressionMode::MULTI_ENTRY_ZSTD, CompressionMode::MULTI_ENTRY_LZ4}) { + SetFlag(&FLAGS_compression_mode, mode); RespExpr resp = Run({"save", "df"}); ASSERT_EQ(resp, "OK"); @@ -171,7 +173,7 @@ TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) { } TEST_F(RdbTest, RdbLoaderOnReadCompressedDataShouldNotEnterEnsureReadFlow) { - SetFlag(&FLAGS_compression_mode, 2); + SetFlag(&FLAGS_compression_mode, CompressionMode::MULTI_ENTRY_ZSTD); for (int i = 0; i < 1000; ++i) { Run({"set", StrCat(i), "1"}); }