mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
fix(rdb_load): fix partial reads dropping elements (#3853)
This commit is contained in:
parent
a066579930
commit
44fbe09108
2 changed files with 16 additions and 13 deletions
|
@ -504,7 +504,8 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
|
|||
size_t len = ltrace->blob_count();
|
||||
|
||||
bool is_intset = true;
|
||||
if (rdb_type_ == RDB_TYPE_HASH && ltrace->blob_count() <= SetFamily::MaxIntsetEntries()) {
|
||||
if (!config_.streamed && rdb_type_ == RDB_TYPE_HASH &&
|
||||
ltrace->blob_count() <= SetFamily::MaxIntsetEntries()) {
|
||||
Iterate(*ltrace, [&](const LoadBlob& blob) {
|
||||
if (!holds_alternative<long long>(blob.rdb_var)) {
|
||||
is_intset = false;
|
||||
|
@ -513,7 +514,8 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
|
|||
return true;
|
||||
});
|
||||
} else {
|
||||
/* Use a regular set when there are too many entries. */
|
||||
/* Use a regular set when there are too many entries, or when the
|
||||
* set is being streamed. */
|
||||
is_intset = false;
|
||||
}
|
||||
|
||||
|
@ -550,9 +552,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
|
|||
} else {
|
||||
StringSet* set;
|
||||
if (config_.append) {
|
||||
// Note we only use append_ when the set size exceeds kMaxBlobLen,
|
||||
// which is greater than SetFamily::MaxIntsetEntries so we'll always use
|
||||
// a string set not an int set.
|
||||
// Note we always use StringSet when the object is being streamed.
|
||||
if (!EnsureObjEncoding(OBJ_SET, kEncodingStrMap2)) {
|
||||
return;
|
||||
}
|
||||
|
@ -619,7 +619,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
|
|||
size_t len = ltrace->blob_count() / increment;
|
||||
|
||||
/* Too many entries? Use a hash table right from the start. */
|
||||
bool keep_lp = (len <= 64) && (rdb_type_ != RDB_TYPE_HASH_WITH_EXPIRY);
|
||||
bool keep_lp = !config_.streamed && (len <= 64) && (rdb_type_ != RDB_TYPE_HASH_WITH_EXPIRY);
|
||||
|
||||
size_t lp_size = 0;
|
||||
if (keep_lp) {
|
||||
|
@ -660,9 +660,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
|
|||
} else {
|
||||
StringMap* string_map;
|
||||
if (config_.append) {
|
||||
// Note we only use append_ when the map size exceeds kMaxBlobLen,
|
||||
// which is greater than 64 so we'll always use a StringMap set not
|
||||
// listpack.
|
||||
// Note we always use StringMap when the object is being streamed.
|
||||
if (!EnsureObjEncoding(OBJ_HASH, kEncodingStrMap2)) {
|
||||
return;
|
||||
}
|
||||
|
@ -815,9 +813,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
|
|||
unsigned encoding = OBJ_ENCODING_SKIPLIST;
|
||||
detail::SortedMap* zs;
|
||||
if (config_.append) {
|
||||
// Note we only use append_ when the set size exceeds kMaxBlobLen,
|
||||
// which is greater than server.zset_max_listpack_entries so we'll always
|
||||
// use a SortedMap set not listpack.
|
||||
// Note we always use SortedMap when the object is being streamed.
|
||||
if (!EnsureObjEncoding(OBJ_ZSET, OBJ_ENCODING_SKIPLIST)) {
|
||||
return;
|
||||
}
|
||||
|
@ -868,7 +864,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
|
|||
return;
|
||||
|
||||
void* inner = zs;
|
||||
if (zs->Size() <= server.zset_max_listpack_entries &&
|
||||
if (!config_.streamed && zs->Size() <= server.zset_max_listpack_entries &&
|
||||
maxelelen <= server.zset_max_listpack_value && lpSafeToAdd(NULL, totelelen)) {
|
||||
encoding = OBJ_ENCODING_LISTPACK;
|
||||
inner = zs->ToListPack();
|
||||
|
@ -2723,6 +2719,7 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
|
|||
std::string key;
|
||||
SET_OR_RETURN(ReadKey(), key);
|
||||
|
||||
bool streamed = false;
|
||||
do {
|
||||
// If there is a cached Item in the free pool, take it, otherwise allocate
|
||||
// a new Item (LoadItemsBuffer returns free items).
|
||||
|
@ -2749,11 +2746,13 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
|
|||
|
||||
if (pending_read_.remaining > 0) {
|
||||
item->key = key;
|
||||
streamed = true;
|
||||
} else {
|
||||
// Avoid copying the key if this is the last read of the object.
|
||||
item->key = std::move(key);
|
||||
}
|
||||
|
||||
item->load_config.streamed = streamed;
|
||||
item->load_config.reserve = pending_read_.reserve;
|
||||
// Clear 'reserve' as we must only set when the object is first
|
||||
// initialized.
|
||||
|
|
|
@ -128,6 +128,10 @@ class RdbLoaderBase {
|
|||
};
|
||||
|
||||
struct LoadConfig {
|
||||
// Whether the loaded item is being streamed incrementally in partial
|
||||
// reads.
|
||||
bool streamed = false;
|
||||
|
||||
// Number of elements in the object to reserve.
|
||||
//
|
||||
// Used to reserve the elements in a huge object up front, then append
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue