mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
bug(rdb loader): When reading from zstd uncompressed buf skip ensure … (#525)
* bug(rdb loader): When reading from zstd uncompressed buf skip ensure read flow Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
74d1839f97
commit
c7974a4e80
3 changed files with 38 additions and 13 deletions
|
@ -38,12 +38,14 @@ ABSL_DECLARE_FLAG(int32_t, list_compress_depth);
|
||||||
ABSL_DECLARE_FLAG(uint32_t, dbnum);
|
ABSL_DECLARE_FLAG(uint32_t, dbnum);
|
||||||
ABSL_DECLARE_FLAG(bool, use_set2);
|
ABSL_DECLARE_FLAG(bool, use_set2);
|
||||||
|
|
||||||
#define SET_OR_RETURN(expr, dest) \
|
#define SET_OR_RETURN(expr, dest) \
|
||||||
do { \
|
do { \
|
||||||
auto exp_val = (expr); \
|
auto exp_val = (expr); \
|
||||||
if (!exp_val) \
|
if (!exp_val) { \
|
||||||
return exp_val.error(); \
|
VLOG(1) << "Error while calling " #expr; \
|
||||||
dest = exp_val.value(); \
|
return exp_val.error(); \
|
||||||
|
} \
|
||||||
|
dest = exp_val.value(); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define SET_OR_UNEXPECT(expr, dest) \
|
#define SET_OR_UNEXPECT(expr, dest) \
|
||||||
|
@ -230,6 +232,7 @@ io::Result<base::IoBuf*> ZstdDecompressImpl::Decompress(std::string_view str) {
|
||||||
LOG(ERROR) << "Invalid ZSTD compressed string";
|
LOG(ERROR) << "Invalid ZSTD compressed string";
|
||||||
return Unexpected(errc::invalid_encoding);
|
return Unexpected(errc::invalid_encoding);
|
||||||
}
|
}
|
||||||
|
|
||||||
uncompressed_mem_buf_.Reserve(uncomp_size + 1);
|
uncompressed_mem_buf_.Reserve(uncomp_size + 1);
|
||||||
|
|
||||||
// Uncompress string to membuf
|
// Uncompress string to membuf
|
||||||
|
@ -1705,6 +1708,19 @@ error_code RdbLoader::Load(io::Source* src) {
|
||||||
return kOk;
|
return kOk;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
std::error_code RdbLoaderBase::EnsureRead(size_t min_sz) {
|
||||||
|
// In the flow of reading compressed data, we store the uncompressed data to in uncompressed
|
||||||
|
// buffer. When parsing entries we call ensure read with 9 bytes to read the length of key/value.
|
||||||
|
// If the key/value is very small (less than 9 bytes) the remainded data in uncompressed buffer
|
||||||
|
// might contain less than 9 bytes. We need to make sure that we dont read from sink to the
|
||||||
|
// uncompressed buffer and therefor in this flow we return here.
|
||||||
|
if (mem_buf_ != &origin_mem_buf_)
|
||||||
|
return std::error_code{};
|
||||||
|
if (mem_buf_->InputLen() >= min_sz)
|
||||||
|
return std::error_code{};
|
||||||
|
return EnsureReadInternal(min_sz);
|
||||||
|
}
|
||||||
|
|
||||||
error_code RdbLoaderBase::EnsureReadInternal(size_t min_sz) {
|
error_code RdbLoaderBase::EnsureReadInternal(size_t min_sz) {
|
||||||
DCHECK_LT(mem_buf_->InputLen(), min_sz);
|
DCHECK_LT(mem_buf_->InputLen(), min_sz);
|
||||||
|
|
||||||
|
@ -1792,7 +1808,8 @@ error_code RdbLoaderBase::HandleCompressedBlob() {
|
||||||
}
|
}
|
||||||
|
|
||||||
error_code RdbLoaderBase::HandleCompressedBlobFinish() {
|
error_code RdbLoaderBase::HandleCompressedBlobFinish() {
|
||||||
// TODO validate that all uncompressed data was fetched
|
CHECK_NE(&origin_mem_buf_, mem_buf_);
|
||||||
|
CHECK_EQ(mem_buf_->InputLen(), size_t(0));
|
||||||
mem_buf_ = &origin_mem_buf_;
|
mem_buf_ = &origin_mem_buf_;
|
||||||
return kOk;
|
return kOk;
|
||||||
}
|
}
|
||||||
|
|
|
@ -132,12 +132,7 @@ class RdbLoaderBase {
|
||||||
|
|
||||||
static size_t StrLen(const RdbVariant& tset);
|
static size_t StrLen(const RdbVariant& tset);
|
||||||
|
|
||||||
std::error_code EnsureRead(size_t min_sz) {
|
std::error_code EnsureRead(size_t min_sz);
|
||||||
if (mem_buf_->InputLen() >= min_sz)
|
|
||||||
return std::error_code{};
|
|
||||||
|
|
||||||
return EnsureReadInternal(min_sz);
|
|
||||||
}
|
|
||||||
|
|
||||||
std::error_code EnsureReadInternal(size_t min_sz);
|
std::error_code EnsureReadInternal(size_t min_sz);
|
||||||
|
|
||||||
|
|
|
@ -159,6 +159,19 @@ TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(RdbTest, RdbLoaderOnReadCompressedDataShouldNotEnterEnsureReadFlow) {
|
||||||
|
SetFlag(&FLAGS_compression_mode, 2);
|
||||||
|
for (int i = 0; i < 1000; ++i) {
|
||||||
|
Run({"set", StrCat(i), "1"});
|
||||||
|
}
|
||||||
|
RespExpr resp = Run({"save", "df"});
|
||||||
|
ASSERT_EQ(resp, "OK");
|
||||||
|
|
||||||
|
auto save_info = service_->server_family().GetLastSaveInfo();
|
||||||
|
resp = Run({"debug", "load", save_info->file_name});
|
||||||
|
ASSERT_EQ(resp, "OK");
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(RdbTest, Reload) {
|
TEST_F(RdbTest, Reload) {
|
||||||
absl::FlagSaver fs;
|
absl::FlagSaver fs;
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue