fix: bugs in rdb code (#1079)

1. Pull the newest helio, which fixes a 32-bit overflow in IoBuf.
2. Stay responsive when loading huge sets and maps by yielding periodically.
3. Break the array of blobs into segments of limited size so that
   we won't allocate billion-byte arrays when handling large (h)sets.
   This reduces pressure on the allocator when loading millions of items.

Fixes #1076.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Co-authored-by: Roy Jacobson <roy@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-04-13 13:26:58 +03:00 committed by GitHub
parent fc66dbb2cf
commit fcb1c7bd55
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 211 additions and 141 deletions

2
helio

@ -1 +1 @@
Subproject commit 7322b320c15baf6ac504d897d0a7f5b440510248
Subproject commit deccfbcff1b2aa4ede0d5d89f4cce9a1094388b9

View file

@ -57,6 +57,16 @@ using absl::GetFlag;
using rdb::errc;
namespace {
// Number of items between calls to ThisFiber::Yield() made by YieldIfNeeded().
constexpr size_t kYieldPeriod = 50000;
// Upper bound (65536) on the number of LoadBlob entries stored in a single LoadTrace segment.
// NOTE: despite the name, this caps a segment's element count, not the byte length of a blob.
constexpr size_t kMaxBlobLen = 1ULL << 16;
// Calls ThisFiber::Yield() whenever `i` is a multiple of kYieldPeriod (zero included);
// does nothing for any other value. Intended to be fed a running item counter.
inline void YieldIfNeeded(size_t i) {
  const size_t offset_in_period = i % kYieldPeriod;
  if (offset_in_period != 0)
    return;
  ThisFiber::Yield();
}
class error_category : public std::error_category {
public:
const char* name() const noexcept final {
@ -376,6 +386,18 @@ class RdbLoaderBase::OpaqueObjLoader {
sds ToSds(const RdbVariant& obj);
string_view ToSV(const RdbVariant& obj);
// Visits every blob of `ltrace`, segment by segment and in storage order, passing each one
// to `f`. Iteration stops immediately when `f` returns a value that tests false.
// After each blob that `f` accepted, the running count of visited blobs is handed to
// YieldIfNeeded(); no such call is made for the blob that stopped the iteration.
template <typename F> static void Iterate(const LoadTrace& ltrace, F&& f) {
  unsigned visited = 0;
  for (const auto& segment : ltrace.arr) {
    for (auto it = segment.begin(); it != segment.end(); ++it) {
      if (!f(*it))
        return;
      ++visited;
      YieldIfNeeded(visited);
    }
  }
}
std::error_code ec_;
int rdb_type_;
base::PODArray<char> tset_blob_;
@ -434,19 +456,19 @@ void RdbLoaderBase::OpaqueObjLoader::operator()(const JsonType& json) {
}
void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
size_t len = ltrace->arr.size();
size_t len = ltrace->blob_count();
bool is_intset = true;
if (len <= SetFamily::MaxIntsetEntries()) {
for (size_t i = 0; i < len; i++) {
if (!holds_alternative<long long>(ltrace->arr[i].rdb_var)) {
Iterate(*ltrace, [&](const LoadBlob& blob) {
if (!holds_alternative<long long>(blob.rdb_var)) {
is_intset = false;
break;
return false;
}
}
return true;
});
} else {
/* Use a regular set when there are too many entries. */
is_intset = false;
}
@ -462,16 +484,17 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
if (is_intset) {
res = createIntsetObject();
long long llval;
for (size_t i = 0; i < len; i++) {
llval = get<long long>(ltrace->arr[i].rdb_var);
Iterate(*ltrace, [&](const LoadBlob& blob) {
llval = get<long long>(blob.rdb_var);
uint8_t success;
res->ptr = intsetAdd((intset*)res->ptr, llval, &success);
if (!success) {
LOG(ERROR) << "Duplicate set members detected";
ec_ = RdbError(errc::duplicate_key);
return;
return false;
}
}
return true;
});
} else {
bool use_set2 = GetFlag(FLAGS_use_set2);
if (use_set2) {
@ -493,65 +516,73 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
}
if (use_set2) {
for (size_t i = 0; i < len; i++) {
sdsele = ToSds(ltrace->arr[i].rdb_var);
Iterate(*ltrace, [&](const LoadBlob& blob) {
sdsele = ToSds(blob.rdb_var);
if (!sdsele)
return;
return false;
if (!((StringSet*)res->ptr)->AddSds(sdsele)) {
LOG(ERROR) << "Duplicate set members detected";
ec_ = RdbError(errc::duplicate_key);
return;
return false;
}
}
return true;
});
} else {
for (size_t i = 0; i < len; i++) {
sdsele = ToSds(ltrace->arr[i].rdb_var);
Iterate(*ltrace, [&](const LoadBlob& blob) {
sdsele = ToSds(blob.rdb_var);
if (!sdsele)
return;
return false;
if (dictAdd((dict*)res->ptr, sdsele, NULL) != DICT_OK) {
LOG(ERROR) << "Duplicate set members detected";
ec_ = RdbError(errc::duplicate_key);
return;
return false;
}
}
return true;
});
}
}
if (ec_)
return;
pv_->ImportRObj(res);
std::move(cleanup).Cancel();
}
void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
size_t len = ltrace->arr.size() / 2;
size_t len = ltrace->blob_count() / 2;
/* Too many entries? Use a hash table right from the start. */
bool keep_lp = (len <= server.hash_max_listpack_entries);
size_t lp_size = 0;
if (keep_lp) {
for (const auto& str : ltrace->arr) {
size_t str_len = StrLen(str.rdb_var);
Iterate(*ltrace, [&](const LoadBlob& blob) {
size_t str_len = StrLen(blob.rdb_var);
lp_size += str_len;
if (str_len > server.hash_max_listpack_value) {
keep_lp = false;
break;
return false;
}
}
return true;
});
}
if (keep_lp) {
uint8_t* lp = lpNew(lp_size);
for (size_t i = 0; i < len; ++i) {
/* Add pair to listpack */
string_view sv = ToSV(ltrace->arr[i * 2].rdb_var);
lp = lpAppend(lp, reinterpret_cast<const uint8_t*>(sv.data()), sv.size());
for (const auto& seg : ltrace->arr) {
CHECK(seg.size() % 2 == 0);
for (size_t i = 0; i < seg.size(); i += 2) {
/* Add pair to listpack */
string_view sv = ToSV(seg[i].rdb_var);
lp = lpAppend(lp, reinterpret_cast<const uint8_t*>(sv.data()), sv.size());
sv = ToSV(ltrace->arr[i * 2 + 1].rdb_var);
lp = lpAppend(lp, reinterpret_cast<const uint8_t*>(sv.data()), sv.size());
sv = ToSV(seg[i + 1].rdb_var);
lp = lpAppend(lp, reinterpret_cast<const uint8_t*>(sv.data()), sv.size());
}
}
if (ec_) {
@ -566,19 +597,21 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
auto cleanup = absl::MakeCleanup([&] { delete string_map; });
string_map->Reserve(len);
for (size_t i = 0; i < len; ++i) {
// ToSV may reference an internal buffer, therefore we can use only before the
// next call to ToSV. To workaround, I copy the key to string.
string key(ToSV(ltrace->arr[i * 2].rdb_var));
string_view val = ToSV(ltrace->arr[i * 2 + 1].rdb_var);
for (const auto& seg : ltrace->arr) {
for (size_t i = 0; i < seg.size(); i += 2) {
// ToSV may reference an internal buffer, therefore we can use only before the
// next call to ToSV. To workaround, I copy the key to string.
string key(ToSV(seg[i].rdb_var));
string_view val = ToSV(seg[i + 1].rdb_var);
if (ec_)
return;
if (ec_)
return;
if (!string_map->AddOrSkip(key, val)) {
LOG(ERROR) << "Duplicate hash fields detected";
ec_ = RdbError(errc::rdb_file_corrupted);
return;
if (!string_map->AddOrSkip(key, val)) {
LOG(ERROR) << "Duplicate hash fields detected";
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
}
}
pv_->InitRobj(OBJ_HASH, kEncodingStrMap2, string_map);
@ -591,16 +624,16 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
quicklistNew(GetFlag(FLAGS_list_max_listpack_size), GetFlag(FLAGS_list_compress_depth));
auto cleanup = absl::Cleanup([&] { quicklistRelease(ql); });
for (size_t i = 0; i < ltrace->arr.size(); ++i) {
unsigned container = ltrace->arr[i].encoding;
string_view sv = ToSV(ltrace->arr[i].rdb_var);
Iterate(*ltrace, [&](const LoadBlob& blob) {
unsigned container = blob.encoding;
string_view sv = ToSV(blob.rdb_var);
if (ec_)
return;
return false;
if (container == QUICKLIST_NODE_CONTAINER_PLAIN) {
quicklistAppendPlainNode(ql, (uint8_t*)sv.data(), sv.size());
continue;
return true;
}
uint8_t* lp = nullptr;
@ -610,15 +643,15 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
if (!lpValidateIntegrity(src, sv.size(), 0, NULL, NULL)) {
LOG(ERROR) << "Listpack integrity check failed.";
ec_ = RdbError(errc::rdb_file_corrupted);
return;
return false;
}
if (lpLength(src) == 0) {
continue;
return true;
}
lp = (uint8_t*)zmalloc(sv.size());
memcpy(lp, src, sv.size());
::memcpy(lp, src, sv.size());
} else {
lp = lpNew(sv.size());
if (!ziplistValidateIntegrity((uint8_t*)sv.data(), sv.size(), 1,
@ -626,21 +659,24 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
LOG(ERROR) << "Ziplist integrity check failed.";
zfree(lp);
ec_ = RdbError(errc::rdb_file_corrupted);
return;
return false;
}
/* Silently skip empty ziplists, if we'll end up with empty quicklist we'll fail later. */
if (lpLength(lp) == 0) {
zfree(lp);
continue;
return true;
}
lp = lpShrinkToFit(lp);
}
quicklistAppendListpack(ql, lp);
}
return true;
});
if (ec_)
return;
if (quicklistCount(ql) == 0) {
ec_ = RdbError(errc::empty_key);
return;
@ -659,7 +695,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
auto cleanup = absl::Cleanup([&] { decrRefCount(res); });
size_t zsetlen = ltrace->arr.size();
size_t zsetlen = ltrace->blob_count();
if (zsetlen > DICT_HT_INITIAL_SIZE && dictTryExpand(zs->dict, zsetlen) != DICT_OK) {
LOG(ERROR) << "OOM in dictTryExpand " << zsetlen;
ec_ = RdbError(errc::out_of_memory);
@ -668,12 +704,12 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
size_t maxelelen = 0, totelelen = 0;
for (size_t i = 0; i < zsetlen; ++i) {
sds sdsele = ToSds(ltrace->arr[i].rdb_var);
Iterate(*ltrace, [&](const LoadBlob& blob) {
sds sdsele = ToSds(blob.rdb_var);
if (!sdsele)
return;
return false;
double score = ltrace->arr[i].score;
double score = blob.score;
/* Don't care about integer-encoded strings. */
if (sdslen(sdsele) > maxelelen)
@ -686,9 +722,14 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
LOG(ERROR) << "Duplicate zset fields detected";
sdsfree(sdsele);
ec_ = RdbError(errc::rdb_file_corrupted);
return;
return false;
}
}
return true;
});
if (ec_)
return;
/* Convert *after* loading, since sorted sets are not stored ordered. */
if (zsetLength(res) <= server.zset_max_listpack_entries &&
@ -709,41 +750,40 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
auto cleanup = absl::Cleanup([&] { decrRefCount(res); });
size_t lpcnt = ltrace->arr.size() / 2;
for (const auto& seg : ltrace->arr) {
for (size_t i = 0; i < seg.size(); i += 2) {
string_view nodekey = ToSV(seg[i].rdb_var);
string_view data = ToSV(seg[i + 1].rdb_var);
for (size_t i = 0; i < lpcnt; ++i) {
string_view nodekey = ToSV(ltrace->arr[i * 2].rdb_var);
string_view data = ToSV(ltrace->arr[i * 2 + 1].rdb_var);
uint8_t* lp = (uint8_t*)data.data();
uint8_t* lp = (uint8_t*)data.data();
if (!streamValidateListpackIntegrity(lp, data.size(), 0)) {
LOG(ERROR) << "Stream listpack integrity check failed.";
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
unsigned char* first = lpFirst(lp);
if (first == NULL) {
/* Serialized listpacks should never be empty, since on
* deletion we should remove the radix tree key if the
* resulting listpack is empty. */
LOG(ERROR) << "Empty listpack inside stream";
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
uint8_t* copy_lp = (uint8_t*)zmalloc(data.size());
memcpy(copy_lp, lp, data.size());
/* Insert the key in the radix tree. */
int retval =
raxTryInsert(s->rax_tree, (unsigned char*)nodekey.data(), nodekey.size(), copy_lp, NULL);
if (!retval) {
zfree(copy_lp);
LOG(ERROR) << "Listpack re-added with existing key";
ec_ = RdbError(errc::rdb_file_corrupted);
return;
if (!streamValidateListpackIntegrity(lp, data.size(), 0)) {
LOG(ERROR) << "Stream listpack integrity check failed.";
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
unsigned char* first = lpFirst(lp);
if (first == NULL) {
/* Serialized listpacks should never be empty, since on
* deletion we should remove the radix tree key if the
* resulting listpack is empty. */
LOG(ERROR) << "Empty listpack inside stream";
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
uint8_t* copy_lp = (uint8_t*)zmalloc(data.size());
::memcpy(copy_lp, lp, data.size());
/* Insert the key in the radix tree. */
int retval =
raxTryInsert(s->rax_tree, (unsigned char*)nodekey.data(), nodekey.size(), copy_lp, NULL);
if (!retval) {
zfree(copy_lp);
LOG(ERROR) << "Listpack re-added with existing key";
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
}
}
s->length = ltrace->stream_trace->stream_len;
s->last_id.ms = ltrace->stream_trace->ms;
s->last_id.seq = ltrace->stream_trace->seq;
@ -844,7 +884,7 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
}
} else {
intset* mine = (intset*)zmalloc(blob.size());
memcpy(mine, blob.data(), blob.size());
::memcpy(mine, blob.data(), blob.size());
res = createObject(OBJ_SET, mine);
res->encoding = OBJ_ENCODING_INTSET;
}
@ -1312,16 +1352,20 @@ auto RdbLoaderBase::ReadSet() -> io::Result<OpaqueObj> {
if (len == 0)
return Unexpected(errc::empty_key);
unique_ptr<LoadTrace> res(new LoadTrace);
res->arr.resize(len);
for (size_t i = 0; i < len; i++) {
error_code ec = ReadStringObj(&res->arr[i].rdb_var);
if (ec) {
return make_unexpected(ec);
unique_ptr<LoadTrace> load_trace(new LoadTrace);
load_trace->arr.resize((len + kMaxBlobLen - 1) / kMaxBlobLen);
for (size_t i = 0; i < load_trace->arr.size(); i++) {
size_t n = std::min(len - i * kMaxBlobLen, kMaxBlobLen);
load_trace->arr[i].resize(n);
for (size_t j = 0; j < n; j++) {
error_code ec = ReadStringObj(&load_trace->arr[i][j].rdb_var);
if (ec) {
return make_unexpected(ec);
}
}
}
return OpaqueObj{std::move(res), RDB_TYPE_SET};
return OpaqueObj{std::move(load_trace), RDB_TYPE_SET};
}
auto RdbLoaderBase::ReadIntSet() -> io::Result<OpaqueObj> {
@ -1368,12 +1412,18 @@ auto RdbLoaderBase::ReadHMap() -> io::Result<OpaqueObj> {
return Unexpected(errc::empty_key);
unique_ptr<LoadTrace> load_trace(new LoadTrace);
load_trace->arr.resize(len * 2);
len *= 2;
load_trace->arr.resize((len + kMaxBlobLen - 1) / kMaxBlobLen);
for (size_t i = 0; i < load_trace->arr.size(); ++i) {
error_code ec = ReadStringObj(&load_trace->arr[i].rdb_var);
if (ec)
return make_unexpected(ec);
size_t n = std::min(len, kMaxBlobLen);
load_trace->arr[i].resize(n);
for (size_t j = 0; j < n; ++j) {
error_code ec = ReadStringObj(&load_trace->arr[i][j].rdb_var);
if (ec)
return make_unexpected(ec);
}
len -= n;
}
return OpaqueObj{std::move(load_trace), RDB_TYPE_HASH};
@ -1388,25 +1438,30 @@ auto RdbLoaderBase::ReadZSet(int rdbtype) -> io::Result<OpaqueObj> {
return Unexpected(errc::empty_key);
unique_ptr<LoadTrace> load_trace(new LoadTrace);
load_trace->arr.resize(zsetlen);
load_trace->arr.resize((zsetlen + kMaxBlobLen - 1) / kMaxBlobLen);
double score;
for (size_t i = 0; i < load_trace->arr.size(); ++i) {
error_code ec = ReadStringObj(&load_trace->arr[i].rdb_var);
if (ec)
return make_unexpected(ec);
if (rdbtype == RDB_TYPE_ZSET_2) {
SET_OR_UNEXPECT(FetchBinaryDouble(), score);
} else {
SET_OR_UNEXPECT(FetchDouble(), score);
}
size_t n = std::min(zsetlen, kMaxBlobLen);
load_trace->arr[i].resize(n);
for (size_t j = 0; j < n; ++j) {
error_code ec = ReadStringObj(&load_trace->arr[i][j].rdb_var);
if (ec)
return make_unexpected(ec);
if (rdbtype == RDB_TYPE_ZSET_2) {
SET_OR_UNEXPECT(FetchBinaryDouble(), score);
} else {
SET_OR_UNEXPECT(FetchDouble(), score);
}
if (isnan(score)) {
LOG(ERROR) << "Zset with NAN score detected";
return Unexpected(errc::rdb_file_corrupted);
if (isnan(score)) {
LOG(ERROR) << "Zset with NAN score detected";
return Unexpected(errc::rdb_file_corrupted);
}
load_trace->arr[i][j].score = score;
}
load_trace->arr[i].score = score;
zsetlen -= n;
}
return OpaqueObj{std::move(load_trace), rdbtype};
@ -1433,30 +1488,35 @@ auto RdbLoaderBase::ReadListQuicklist(int rdbtype) -> io::Result<OpaqueObj> {
return Unexpected(errc::empty_key);
unique_ptr<LoadTrace> load_trace(new LoadTrace);
load_trace->arr.resize(len);
load_trace->arr.resize((len + kMaxBlobLen - 1) / kMaxBlobLen);
for (size_t i = 0; i < len; ++i) {
uint64_t container = QUICKLIST_NODE_CONTAINER_PACKED;
if (rdbtype == RDB_TYPE_LIST_QUICKLIST_2) {
SET_OR_UNEXPECT(LoadLen(nullptr), container);
for (size_t i = 0; i < load_trace->arr.size(); ++i) {
size_t n = std::min(len, kMaxBlobLen);
load_trace->arr[i].resize(n);
for (size_t j = 0; j < n; ++j) {
uint64_t container = QUICKLIST_NODE_CONTAINER_PACKED;
if (rdbtype == RDB_TYPE_LIST_QUICKLIST_2) {
SET_OR_UNEXPECT(LoadLen(nullptr), container);
if (container != QUICKLIST_NODE_CONTAINER_PACKED &&
container != QUICKLIST_NODE_CONTAINER_PLAIN) {
LOG(ERROR) << "Quicklist integrity check failed.";
if (container != QUICKLIST_NODE_CONTAINER_PACKED &&
container != QUICKLIST_NODE_CONTAINER_PLAIN) {
LOG(ERROR) << "Quicklist integrity check failed.";
return Unexpected(errc::rdb_file_corrupted);
}
}
RdbVariant var;
error_code ec = ReadStringObj(&var);
if (ec)
return make_unexpected(ec);
if (StrLen(var) == 0) {
return Unexpected(errc::rdb_file_corrupted);
}
load_trace->arr[i][j].rdb_var = std::move(var);
load_trace->arr[i][j].encoding = container;
}
RdbVariant var;
error_code ec = ReadStringObj(&var);
if (ec)
return make_unexpected(ec);
if (StrLen(var) == 0) {
return Unexpected(errc::rdb_file_corrupted);
}
load_trace->arr[i].rdb_var = std::move(var);
load_trace->arr[i].encoding = container;
len -= n;
}
return OpaqueObj{std::move(load_trace), rdbtype};
@ -1467,7 +1527,9 @@ auto RdbLoaderBase::ReadStreams() -> io::Result<OpaqueObj> {
SET_OR_UNEXPECT(LoadLen(nullptr), listpacks);
unique_ptr<LoadTrace> load_trace(new LoadTrace);
load_trace->arr.resize(listpacks * 2);
load_trace->arr.emplace_back();
auto& blob_arr = load_trace->arr.back();
blob_arr.resize(listpacks * 2);
error_code ec;
for (size_t i = 0; i < listpacks; ++i) {
@ -1492,8 +1554,8 @@ auto RdbLoaderBase::ReadStreams() -> io::Result<OpaqueObj> {
return Unexpected(errc::rdb_file_corrupted);
}
load_trace->arr[2 * i].rdb_var = std::move(stream_id);
load_trace->arr[2 * i + 1].rdb_var = std::move(blob);
blob_arr[2 * i].rdb_var = std::move(stream_id);
blob_arr[2 * i + 1].rdb_var = std::move(blob);
}
load_trace->stream_trace.reset(new StreamTrace);

View file

@ -85,8 +85,17 @@ class RdbLoaderBase {
};
struct LoadTrace {
std::vector<LoadBlob> arr;
// Some traces are very long. We divide them into multiple segments.
std::vector<std::vector<LoadBlob>> arr;
std::unique_ptr<StreamTrace> stream_trace;
// Returns the total number of blobs held by this trace, i.e. the sum of the sizes of
// all segments in `arr`. Walks every segment on each call.
size_t blob_count() const {
  size_t total = 0;
  for (auto seg_it = arr.begin(); seg_it != arr.end(); ++seg_it)
    total += seg_it->size();
  return total;
}
};
class OpaqueObjLoader;
@ -136,7 +145,6 @@ class RdbLoaderBase {
std::error_code EnsureReadInternal(size_t min_sz);
protected:
base::IoBuf* mem_buf_ = nullptr;
base::IoBuf origin_mem_buf_;
::io::Source* src_ = nullptr;