fix(server): Added RDB load / save for DenseSet (#297)

Signed-off-by: Braydn <braydn.moore@uwaterloo.ca>
This commit is contained in:
Braydn 2022-09-14 13:09:32 -04:00 committed by GitHub
parent 231ef3d367
commit 7cc761134b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 52 additions and 17 deletions

View file

@ -23,6 +23,7 @@ extern "C" {
#include "base/endian.h" #include "base/endian.h"
#include "base/flags.h" #include "base/flags.h"
#include "base/logging.h" #include "base/logging.h"
#include "core/string_set.h"
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
#include "server/error.h" #include "server/error.h"
#include "server/hset_family.h" #include "server/hset_family.h"
@ -34,6 +35,7 @@ extern "C" {
ABSL_DECLARE_FLAG(int32_t, list_max_listpack_size); ABSL_DECLARE_FLAG(int32_t, list_max_listpack_size);
ABSL_DECLARE_FLAG(int32_t, list_compress_depth); ABSL_DECLARE_FLAG(int32_t, list_compress_depth);
ABSL_DECLARE_FLAG(uint32_t, dbnum); ABSL_DECLARE_FLAG(uint32_t, dbnum);
ABSL_DECLARE_FLAG(bool, use_set2);
namespace dfly { namespace dfly {
@ -173,6 +175,15 @@ int ziplistPairsConvertAndValidateIntegrity(const uint8_t* zl, size_t size, unsi
return ret; return ret;
} }
bool resizeStringSet(robj* set, size_t size, bool use_set2) {
if (use_set2) {
((dfly::StringSet*)set->ptr)->Reserve(size);
return true;
} else {
return dictTryExpand((dict*)set->ptr, size) != DICT_OK;
}
}
} // namespace } // namespace
class RdbLoader::OpaqueObjLoader { class RdbLoader::OpaqueObjLoader {
@ -295,27 +306,48 @@ void RdbLoader::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
} }
} }
} else { } else {
res = createSetObject(); bool use_set2 = GetFlag(FLAGS_use_set2);
if (use_set2) {
StringSet* set = new StringSet{CompactObj::memory_resource()};
res = createObject(OBJ_SET, set);
res->encoding = OBJ_ENCODING_HT;
} else {
res = createSetObject();
}
// TODO: to move this logic to set_family similarly to ConvertToStrSet. // TODO: to move this logic to set_family similarly to ConvertToStrSet.
/* It's faster to expand the dict to the right size asap in order /* It's faster to expand the dict to the right size asap in order
* to avoid rehashing */ * to avoid rehashing */
if (len > DICT_HT_INITIAL_SIZE && dictTryExpand((dict*)res->ptr, len) != DICT_OK) { if (len > DICT_HT_INITIAL_SIZE && !resizeStringSet(res, len, use_set2)) {
LOG(ERROR) << "OOM in dictTryExpand " << len; LOG(ERROR) << "OOM in dictTryExpand " << len;
ec_ = RdbError(errc::out_of_memory); ec_ = RdbError(errc::out_of_memory);
return; return;
} }
for (size_t i = 0; i < len; i++) { if (use_set2) {
sdsele = ToSds(ltrace->arr[i].rdb_var); for (size_t i = 0; i < len; i++) {
if (!sdsele) sdsele = ToSds(ltrace->arr[i].rdb_var);
return; if (!sdsele)
return;
if (dictAdd((dict*)res->ptr, sdsele, NULL) != DICT_OK) { if (!((StringSet*)res->ptr)->AddSds(sdsele)) {
LOG(ERROR) << "Duplicate set members detected"; LOG(ERROR) << "Duplicate set members detected";
ec_ = RdbError(errc::duplicate_key); ec_ = RdbError(errc::duplicate_key);
return; return;
}
}
} else {
for (size_t i = 0; i < len; i++) {
sdsele = ToSds(ltrace->arr[i].rdb_var);
if (!sdsele)
return;
if (dictAdd((dict*)res->ptr, sdsele, NULL) != DICT_OK) {
LOG(ERROR) << "Duplicate set members detected";
ec_ = RdbError(errc::duplicate_key);
return;
}
} }
} }
} }

View file

@ -2,6 +2,7 @@
// See LICENSE for licensing terms. // See LICENSE for licensing terms.
// //
#include "core/string_set.h"
#include "server/rdb_save.h" #include "server/rdb_save.h"
#include <absl/cleanup/cleanup.h> #include <absl/cleanup/cleanup.h>
@ -126,7 +127,7 @@ uint8_t RdbObjectType(unsigned type, unsigned encoding) {
case OBJ_SET: case OBJ_SET:
if (encoding == kEncodingIntSet) if (encoding == kEncodingIntSet)
return RDB_TYPE_SET_INTSET; return RDB_TYPE_SET_INTSET;
else if (encoding == kEncodingStrMap) else if (encoding == kEncodingStrMap || encoding == kEncodingStrMap2)
return RDB_TYPE_SET; return RDB_TYPE_SET;
break; break;
case OBJ_ZSET: case OBJ_ZSET:
@ -307,6 +308,14 @@ error_code RdbSerializer::SaveSetObject(const PrimeValue& obj) {
while ((de = dictNext(di)) != NULL) { while ((de = dictNext(di)) != NULL) {
sds ele = (sds)de->key; sds ele = (sds)de->key;
RETURN_ON_ERR(SaveString(string_view{ele, sdslen(ele)}));
}
} else if (obj.Encoding() == kEncodingStrMap2) {
StringSet *set = (StringSet*)obj.RObjPtr();
RETURN_ON_ERR(SaveLen(set->Size()));
for (sds ele : *set) {
RETURN_ON_ERR(SaveString(string_view{ele, sdslen(ele)})); RETURN_ON_ERR(SaveString(string_view{ele, sdslen(ele)}));
} }
} else { } else {

View file

@ -30,7 +30,6 @@ using absl::StrCat;
ABSL_DECLARE_FLAG(int32, list_compress_depth); ABSL_DECLARE_FLAG(int32, list_compress_depth);
ABSL_DECLARE_FLAG(int32, list_max_listpack_size); ABSL_DECLARE_FLAG(int32, list_max_listpack_size);
ABSL_DECLARE_FLAG(bool, use_set2);
namespace dfly { namespace dfly {
@ -38,11 +37,6 @@ class RdbTest : public BaseFamilyTest {
protected: protected:
protected: protected:
io::FileSource GetSource(string name); io::FileSource GetSource(string name);
// disable usage of DenseSet until the RDB patches are added
RdbTest() : BaseFamilyTest() {
SetFlag(&FLAGS_use_set2, false);
}
}; };
inline const uint8_t* to_byte(const void* s) { inline const uint8_t* to_byte(const void* s) {