chore: remove ImportRObj routine and reduce reliance on object.c (#2607)

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-02-18 10:07:00 +02:00 committed by GitHub
parent 6aafe3dc5e
commit 1ba59e9179
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 101 additions and 201 deletions

View file

@ -205,11 +205,6 @@ int ziplistPairsConvertAndValidateIntegrity(const uint8_t* zl, size_t size, unsi
return ret;
}
bool resizeStringSet(robj* set, size_t size) {
((dfly::StringSet*)set->ptr)->Reserve(size);
return true;
}
} // namespace
class DecompressImpl {
@ -363,10 +358,6 @@ class RdbLoaderBase::OpaqueObjLoader {
OpaqueObjLoader(int rdb_type, PrimeValue* pv) : rdb_type_(rdb_type), pv_(pv) {
}
void operator()(robj* o) {
pv_->ImportRObj(o);
}
void operator()(long long val) {
pv_->SetInt(val);
}
@ -475,22 +466,27 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
is_intset = false;
}
robj* res = nullptr;
sds sdsele = nullptr;
void* inner_obj = nullptr;
auto cleanup = absl::MakeCleanup([&] {
if (sdsele)
sdsfree(sdsele);
decrRefCount(res);
if (is_intset) {
zfree(inner_obj);
} else {
CompactObj::DeleteMR<StringSet>(inner_obj);
}
});
if (is_intset) {
res = createIntsetObject();
inner_obj = intsetNew();
long long llval;
Iterate(*ltrace, [&](const LoadBlob& blob) {
llval = get<long long>(blob.rdb_var);
uint8_t success;
res->ptr = intsetAdd((intset*)res->ptr, llval, &success);
inner_obj = intsetAdd((intset*)inner_obj, llval, &success);
if (!success) {
LOG(ERROR) << "Duplicate set members detected";
ec_ = RdbError(errc::duplicate_key);
@ -499,57 +495,47 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
return true;
});
} else {
if (true) {
StringSet* set = CompactObj::AllocateMR<StringSet>();
set->set_time(MemberTimeSeconds(GetCurrentTimeMs()));
res = createObject(OBJ_SET, set);
res->encoding = OBJ_ENCODING_HT;
}
StringSet* set = CompactObj::AllocateMR<StringSet>();
set->set_time(MemberTimeSeconds(GetCurrentTimeMs()));
inner_obj = set;
// TODO: to move this logic to set_family similarly to ConvertToStrSet.
/* It's faster to expand the dict to the right size asap in order
* to avoid rehashing */
if (len > 2 && !resizeStringSet(res, len)) {
LOG(ERROR) << "OOM in dictTryExpand " << len;
ec_ = RdbError(errc::out_of_memory);
return;
set->Reserve(len);
size_t increment = 1;
if (rdb_type_ == RDB_TYPE_SET_WITH_EXPIRY) {
increment = 2;
}
if (true) {
size_t increment = 1;
if (rdb_type_ == RDB_TYPE_SET_WITH_EXPIRY) {
increment = 2;
}
for (const auto& seg : ltrace->arr) {
for (size_t i = 0; i < seg.size(); i += increment) {
string_view element = ToSV(seg[i].rdb_var);
auto set = (StringSet*)res->ptr;
for (const auto& seg : ltrace->arr) {
for (size_t i = 0; i < seg.size(); i += increment) {
string_view element = ToSV(seg[i].rdb_var);
uint32_t ttl_sec = UINT32_MAX;
if (increment == 2) {
int64_t ttl_time = -1;
string_view ttl_str = ToSV(seg[i + 1].rdb_var);
if (!absl::SimpleAtoi(ttl_str, &ttl_time)) {
LOG(ERROR) << "Can't parse set TTL " << ttl_str;
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
if (ttl_time != -1) {
if (ttl_time < set->time_now()) {
continue;
}
ttl_sec = ttl_time - set->time_now();
}
}
if (!set->Add(element, ttl_sec)) {
LOG(ERROR) << "Duplicate set members detected";
ec_ = RdbError(errc::duplicate_key);
uint32_t ttl_sec = UINT32_MAX;
if (increment == 2) {
int64_t ttl_time = -1;
string_view ttl_str = ToSV(seg[i + 1].rdb_var);
if (!absl::SimpleAtoi(ttl_str, &ttl_time)) {
LOG(ERROR) << "Can't parse set TTL " << ttl_str;
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
if (ttl_time != -1) {
if (ttl_time < set->time_now()) {
continue;
}
ttl_sec = ttl_time - set->time_now();
}
}
if (!set->Add(element, ttl_sec)) {
LOG(ERROR) << "Duplicate set members detected";
ec_ = RdbError(errc::duplicate_key);
return;
}
}
}
@ -557,7 +543,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
if (ec_)
return;
pv_->ImportRObj(res);
pv_->InitRobj(OBJ_SET, is_intset ? kEncodingIntSet : kEncodingStrMap2, inner_obj);
std::move(cleanup).Cancel();
}
@ -719,11 +705,9 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
return;
}
robj* res = createObject(OBJ_LIST, ql);
res->encoding = OBJ_ENCODING_QUICKLIST;
std::move(cleanup).Cancel();
pv_->ImportRObj(res);
pv_->InitRobj(OBJ_LIST, OBJ_ENCODING_QUICKLIST, ql);
}
void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
@ -781,10 +765,9 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
CHECK(ltrace->stream_trace);
robj* res = createStreamObject();
stream* s = (stream*)res->ptr;
stream* s = streamNew();
auto cleanup = absl::Cleanup([&] { decrRefCount(res); });
auto cleanup = absl::Cleanup([&] { freeStream(s); });
for (const auto& seg : ltrace->arr) {
for (size_t i = 0; i < seg.size(); i += 2) {
@ -890,7 +873,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
}
std::move(cleanup).Cancel();
pv_->ImportRObj(res);
pv_->InitRobj(OBJ_STREAM, OBJ_ENCODING_STREAM, s);
}
void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
@ -909,23 +892,21 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
const intset* is = (const intset*)blob.data();
unsigned len = intsetLen(is);
robj* res = nullptr;
if (len > SetFamily::MaxIntsetEntries()) {
res = createSetObject();
if (!SetFamily::ConvertToStrSet(is, len, res)) {
StringSet* set = SetFamily::ConvertToStrSet(is, len);
if (!set) {
LOG(ERROR) << "OOM in ConvertToStrSet " << len;
decrRefCount(res);
ec_ = RdbError(errc::out_of_memory);
return;
}
pv_->InitRobj(OBJ_SET, kEncodingStrMap2, set);
} else {
intset* mine = (intset*)zmalloc(blob.size());
::memcpy(mine, blob.data(), blob.size());
res = createObject(OBJ_SET, mine);
res->encoding = OBJ_ENCODING_INTSET;
pv_->InitRobj(OBJ_SET, kEncodingIntSet, mine);
}
pv_->ImportRObj(res);
} else if (rdb_type_ == RDB_TYPE_SET_LISTPACK) {
if (!lpValidateIntegrity((uint8_t*)blob.data(), blob.size(), 0, nullptr, nullptr)) {
LOG(ERROR) << "ListPack integrity check failed.";
@ -933,33 +914,24 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
return;
}
unsigned char* lp = (unsigned char*)blob.data();
auto iterate_and_apply_f = [lp](auto f) {
for (unsigned char* cur = lpFirst(lp); cur != nullptr; cur = lpNext(lp, cur)) {
unsigned int slen = 0;
long long lval = 0;
unsigned char* res = lpGetValue(cur, &slen, &lval);
f(res, slen, lval);
StringSet* set = CompactObj::AllocateMR<StringSet>();
for (unsigned char* cur = lpFirst(lp); cur != nullptr; cur = lpNext(lp, cur)) {
unsigned int slen = 0;
long long lval = 0;
unsigned char* res = lpGetValue(cur, &slen, &lval);
sds sdsele = res ? sdsnewlen(res, slen) : sdsfromlonglong(lval);
if (!set->AddSds(sdsele)) {
LOG(ERROR) << "Duplicate member " << sdsele;
sdsfree(sdsele);
ec_ = RdbError(errc::duplicate_key);
break;
}
};
robj* res = nullptr;
if (true) {
StringSet* set = CompactObj::AllocateMR<StringSet>();
res = createObject(OBJ_SET, set);
res->encoding = OBJ_ENCODING_HT;
auto f = [this, res](unsigned char* val, unsigned int slen, long long lval) {
sds sdsele = (val) ? sdsnewlen(val, slen) : sdsfromlonglong(lval);
if (!((StringSet*)res->ptr)->AddSds(sdsele)) {
LOG(ERROR) << "Error adding to member set2";
ec_ = RdbError(errc::duplicate_key);
}
};
iterate_and_apply_f(f);
}
if (ec_) {
decrRefCount(res);
CompactObj::DeleteMR<StringSet>(set);
return;
}
pv_->ImportRObj(res);
pv_->InitRobj(OBJ_SET, kEncodingStrMap2, set);
} else if (rdb_type_ == RDB_TYPE_HASH_ZIPLIST || rdb_type_ == RDB_TYPE_HASH_LISTPACK) {
unsigned char* lp = lpNew(blob.size());
switch (rdb_type_) {