chore: introduce sorted_map (#1558)

Consolidate skiplist based zset functionality into a dedicated class
called SortedMap. The code is adapted from t_zset.c
The old code in t_zset.c is deleted.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-07-18 09:56:45 +03:00 committed by GitHub
parent 6d2fcba168
commit e5be30cc79
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 666 additions and 835 deletions

View file

@ -28,6 +28,7 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "core/json_object.h"
#include "core/sorted_map.h"
#include "core/string_map.h"
#include "core/string_set.h"
#include "server/engine_shard_set.h"
@ -69,76 +70,6 @@ inline void YieldIfNeeded(size_t i) {
}
}
// taken from zset.c
unsigned char* zzlInsertAt(unsigned char* zl, unsigned char* eptr, sds ele, double score) {
unsigned char* sptr;
char scorebuf[128];
int scorelen;
scorelen = d2string(scorebuf, sizeof(scorebuf), score);
if (eptr == NULL) {
zl = lpAppend(zl, (unsigned char*)ele, sdslen(ele));
zl = lpAppend(zl, (unsigned char*)scorebuf, scorelen);
} else {
/* Insert member before the element 'eptr'. */
zl = lpInsertString(zl, (unsigned char*)ele, sdslen(ele), eptr, LP_BEFORE, &sptr);
/* Insert score after the member. */
zl = lpInsertString(zl, (unsigned char*)scorebuf, scorelen, sptr, LP_AFTER, NULL);
}
return zl;
}
// taken from zset.c
uint8_t* ToListPack(const zskiplist* zsl) {
uint8_t* lp = lpNew(0);
/* Approach similar to zslFree(), since we want to free the skiplist at
* the same time as creating the listpack. */
zskiplistNode* node = zsl->header->level[0].forward;
while (node) {
lp = zzlInsertAt(lp, NULL, node->ele, node->score);
node = node->level[0].forward;
}
return lp;
}
// taken from zsetConvert
zset* FromListPack(const uint8_t* lp) {
uint8_t* zl = (uint8_t*)lp;
unsigned char *eptr, *sptr;
unsigned char* vstr;
unsigned int vlen;
long long vlong;
sds ele;
eptr = lpSeek(zl, 0);
if (eptr != NULL) {
sptr = lpNext(zl, eptr);
CHECK(sptr != NULL);
}
zset* zs = zsetCreate();
while (eptr != NULL) {
double score = zzlGetScore(sptr);
vstr = lpGetValue(eptr, &vlen, &vlong);
if (vstr == NULL)
ele = sdsfromlonglong(vlong);
else
ele = sdsnewlen((char*)vstr, vlen);
zskiplistNode* node = zslInsert(zs->zsl, score, ele);
CHECK_EQ(DICT_OK, dictAdd(zs->dict, ele, &node->score));
zzlNext(zl, &eptr, &sptr);
}
return zs;
}
class error_category : public std::error_category {
public:
const char* name() const noexcept final {
@ -763,11 +694,12 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
}
void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
zset* zs = zsetCreate();
auto cleanup = absl::Cleanup([&] { zsetFree(zs); });
size_t zsetlen = ltrace->blob_count();
if (zsetlen > DICT_HT_INITIAL_SIZE && dictTryExpand(zs->dict, zsetlen) != DICT_OK) {
detail::SortedMap* zs = new detail::SortedMap;
unsigned encoding = OBJ_ENCODING_SKIPLIST;
auto cleanup = absl::MakeCleanup([&] { delete zs; });
if (zsetlen > DICT_HT_INITIAL_SIZE && !zs->Reserve(zsetlen)) {
LOG(ERROR) << "OOM in dictTryExpand " << zsetlen;
ec_ = RdbError(errc::out_of_memory);
return;
@ -787,9 +719,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
maxelelen = sdslen(sdsele);
totelelen += sdslen(sdsele);
zskiplistNode* znode = zslInsert(zs->zsl, score, sdsele);
int ret = dictAdd(zs->dict, sdsele, &znode->score);
if (ret != DICT_OK) {
if (!zs->Insert(score, sdsele)) {
LOG(ERROR) << "Duplicate zset fields detected";
sdsfree(sdsele);
ec_ = RdbError(errc::rdb_file_corrupted);
@ -802,18 +732,17 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
if (ec_)
return;
unsigned enc = OBJ_ENCODING_SKIPLIST;
void* inner = zs;
if (zs->zsl->length <= server.zset_max_listpack_entries &&
if (zs->Size() <= server.zset_max_listpack_entries &&
maxelelen <= server.zset_max_listpack_value && lpSafeToAdd(NULL, totelelen)) {
enc = OBJ_ENCODING_LISTPACK;
inner = ToListPack(zs->zsl);
} else {
std::move(cleanup).Cancel();
encoding = OBJ_ENCODING_LISTPACK;
inner = zs->ToListPack();
delete zs;
}
pv_->InitRobj(OBJ_ZSET, enc, inner);
std::move(cleanup).Cancel();
pv_->InitRobj(OBJ_ZSET, encoding, inner);
}
void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
@ -1006,11 +935,12 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
unsigned encoding = OBJ_ENCODING_LISTPACK;
void* inner;
if (lpBytes(lp) > server.zset_max_listpack_entries) {
inner = FromListPack(lp);
zfree(lp);
inner = detail::SortedMap::FromListPack(lp).release();
lpFree(lp);
encoding = OBJ_ENCODING_SKIPLIST;
} else {
inner = lpShrinkToFit(lp);
lp = lpShrinkToFit(lp);
inner = lp;
}
pv_->InitRobj(OBJ_ZSET, encoding, inner);
return;