From 2c04311cc368b01cd55432e644d87ca3dcdba310 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Mon, 17 Jul 2023 09:34:12 +0300 Subject: [PATCH] chore: Remove robj reference from zset_family (#1554) This is pure refactoring PR that does not change any functionality besides prohibiting using AsRobj/SyncRobj functions for compact objects of type OBJ_ZSET. This is needed in case we decide in the future to implement our own zset type. Signed-off-by: Roman Gershman --- src/core/compact_object.cc | 23 ++- src/core/compact_object.h | 26 +++- src/core/compact_object_test.cc | 3 +- src/redis/object.c | 18 --- src/redis/object.h | 2 - src/redis/t_zset.c | 70 ++------- src/redis/zset.h | 4 + src/server/container_utils.cc | 16 +-- src/server/container_utils.h | 5 +- src/server/generic_family.cc | 3 +- src/server/rdb_load.cc | 115 ++++++++++++--- src/server/rdb_save.cc | 15 +- src/server/rdb_save.h | 2 +- src/server/zset_family.cc | 243 ++++++++++++++++++-------------- 14 files changed, 319 insertions(+), 226 deletions(-) diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index d1bff9ca4..b6dbf12c1 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -137,9 +137,7 @@ inline void FreeObjZset(unsigned encoding, void* ptr) { switch (encoding) { case OBJ_ENCODING_SKIPLIST: zs = (zset*)ptr; - dictRelease(zs->dict); - zslFree(zs->zsl); - zfree(zs); + zsetFree(zs); break; case OBJ_ENCODING_LISTPACK: zfree(ptr); @@ -361,6 +359,19 @@ bool RobjWrapper::DefragIfNeeded(float ratio) { return false; } +int RobjWrapper::ZsetAdd(double score, sds ele, int in_flags, int* out_flags, double* newscore) { + robj self{.type = type_, + .encoding = encoding_, + .lru = 0, + .refcount = OBJ_STATIC_REFCOUNT, + .ptr = inner_obj_}; + + int res = zsetAdd(&self, score, ele, in_flags, out_flags, newscore); + inner_obj_ = self.ptr; + encoding_ = self.encoding; + return res; +} + bool RobjWrapper::Reallocate(MemoryResource* mr) { void* old_ptr = inner_obj_; inner_obj_ = mr->allocate(sz_, kAlignSize); @@ -531,6 +542,7 @@ unsigned CompactObj::Encoding() const { void CompactObj::ImportRObj(robj* o) { CHECK(1 == o->refcount || o->refcount == OBJ_STATIC_REFCOUNT); CHECK_NE(o->encoding, OBJ_ENCODING_EMBSTR); // need regular one + CHECK_NE(o->type, OBJ_ZSET); SetMeta(ROBJ_TAG); @@ -563,7 +575,7 @@ robj* CompactObj::AsRObj() const { unsigned enc = u_.r_obj.encoding(); res->type = u_.r_obj.type(); - if (res->type == OBJ_SET || res->type == OBJ_HASH) { + if (res->type == OBJ_SET || res->type == OBJ_HASH || res->type == OBJ_ZSET) { LOG(DFATAL) << "Should not call AsRObj for type " << res->type; } @@ -585,7 +597,8 @@ void CompactObj::SyncRObj() { DCHECK_EQ(ROBJ_TAG, taglen_); DCHECK_EQ(u_.r_obj.type(), obj->type); - CHECK_NE(OBJ_SET, obj->type) << "sets should be handled without robj"; + DCHECK_NE(OBJ_SET, obj->type) << "sets should be handled without robj"; + CHECK_NE(OBJ_ZSET, obj->type) << "zsets should be handled without robj"; unsigned enc = obj->encoding; u_.r_obj.Init(obj->type, enc, obj->ptr); diff --git a/src/core/compact_object.h b/src/core/compact_object.h index d179df6fe..035c99565 100644 --- a/src/core/compact_object.h +++ b/src/core/compact_object.h @@ -58,12 +58,19 @@ class RobjWrapper { return inner_obj_; } + void set_inner_obj(void* ptr) { + inner_obj_ = ptr; + } + std::string_view AsView() const { return std::string_view{reinterpret_cast(inner_obj_), sz_}; } bool DefragIfNeeded(float ratio); + // as defined in zset.h + int ZsetAdd(double score, char* ele, int in_flags, int* out_flags, double* newscore); + private: bool Reallocate(MemoryResource* mr); size_t InnerObjMallocUsed() const; @@ -94,7 +101,13 @@ class CompactObj { CompactObj(const CompactObj&) = delete; // 0-16 is reserved for inline lengths of string type. - enum TagEnum { INT_TAG = 17, SMALL_TAG = 18, ROBJ_TAG = 19, EXTERNAL_TAG = 20, JSON_TAG = 21 }; + enum TagEnum { + INT_TAG = 17, + SMALL_TAG = 18, + ROBJ_TAG = 19, + EXTERNAL_TAG = 20, + JSON_TAG = 21, + }; enum MaskBit { REF_BIT = 1, @@ -245,7 +258,7 @@ class CompactObj { robj* AsRObj() const; - // takes ownership over obj. + // takes ownership over obj_inner. // type should not be OBJ_STRING. void InitRobj(unsigned type, unsigned encoding, void* obj_inner); @@ -257,6 +270,15 @@ class CompactObj { void SetInt(int64_t val); std::optional TryGetInt() const; + // We temporary expose this function to avoid passing around robj objects. + detail::RobjWrapper* GetRobjWrapper() { + return &u_.r_obj; + } + + const detail::RobjWrapper* GetRobjWrapper() const { + return &u_.r_obj; + } + // For STR object. void SetString(std::string_view str); void GetString(std::string* res) const; diff --git a/src/core/compact_object_test.cc b/src/core/compact_object_test.cc index 0caca6dcd..d582ff665 100644 --- a/src/core/compact_object_test.cc +++ b/src/core/compact_object_test.cc @@ -377,8 +377,7 @@ TEST_F(CompactObjectTest, ZSet) { "minstring"; EXPECT_EQ(9, sdslen(kMinStrData + 1)); - robj* src = createZsetListpackObject(); - cobj_.ImportRObj(src); + cobj_.InitRobj(OBJ_ZSET, OBJ_ENCODING_LISTPACK, lpNew(0)); EXPECT_EQ(OBJ_ZSET, cobj_.ObjType()); EXPECT_EQ(OBJ_ENCODING_LISTPACK, cobj_.Encoding()); diff --git a/src/redis/object.c b/src/redis/object.c index 8763d26e0..a97eb61dd 100644 --- a/src/redis/object.c +++ b/src/redis/object.c @@ -269,24 +269,6 @@ robj *createHashObject(void) { return o; } -robj *createZsetObject(void) { - zset *zs = zmalloc(sizeof(*zs)); - robj *o; - - zs->dict = dictCreate(&zsetDictType); - zs->zsl = zslCreate(); - o = createObject(OBJ_ZSET,zs); - o->encoding = OBJ_ENCODING_SKIPLIST; - return o; -} - -robj *createZsetListpackObject(void) { - unsigned char *lp = lpNew(0); - robj *o = createObject(OBJ_ZSET,lp); - o->encoding = OBJ_ENCODING_LISTPACK; - return o; -} - robj *createStreamObject(void) { stream *s = streamNew(); robj *o = createObject(OBJ_STREAM,s); diff --git a/src/redis/object.h b/src/redis/object.h index c02ae8d2d..8a6852f4c 100644 --- a/src/redis/object.h +++ b/src/redis/object.h @@ -96,8 +96,6 @@ robj *createQuicklistObject(void); robj *createSetObject(void); robj *createIntsetObject(void); robj *createHashObject(void); -robj *createZsetObject(void); -robj *createZsetListpackObject(void); unsigned long long estimateObjectIdleTime(const robj *o); uint8_t LFUDecrAndReturn(time_t epoch_sec, const robj *o); void listTypeConvert(robj *subject, int enc); diff --git a/src/redis/t_zset.c b/src/redis/t_zset.c index 92402888a..143f414bf 100644 --- a/src/redis/t_zset.c +++ b/src/redis/t_zset.c @@ -1123,6 +1123,19 @@ unsigned char *zzlDeleteRangeByRank(unsigned char *zl, unsigned int start, unsig * Common sorted set API *----------------------------------------------------------------------------*/ +zset* zsetCreate(void) { + zset *zs = zmalloc(sizeof(*zs)); + zs->dict = dictCreate(&zsetDictType); + zs->zsl = zslCreate(); + return zs; +} + +void zsetFree(zset *zs) { + dictRelease(zs->dict); + zslFree(zs->zsl); + zfree(zs); +} + unsigned long zsetLength(const robj *zobj) { unsigned long length = 0; if (zobj->encoding == OBJ_ENCODING_LISTPACK) { @@ -1423,7 +1436,7 @@ int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, dou * returning 1 if the element existed and was deleted, 0 otherwise (the * element was not there). It does not resize the dict after deleting the * element. */ -static int zsetRemoveFromSkiplist(zset *zs, sds ele) { +int zsetRemoveFromSkiplist(zset *zs, sds ele) { dictEntry *de; double score; @@ -1536,58 +1549,3 @@ long zsetRank(robj *zobj, sds ele, int reverse) { serverPanic("Unknown sorted set encoding"); } } - -/* This is a helper function for the COPY command. - * Duplicate a sorted set object, with the guarantee that the returned object - * has the same encoding as the original one. - * - * The resulting object always has refcount set to 1 */ -robj *zsetDup(robj *o) { - robj *zobj; - zset *zs; - zset *new_zs; - - serverAssert(o->type == OBJ_ZSET); - - /* Create a new sorted set object that have the same encoding as the original object's encoding */ - if (o->encoding == OBJ_ENCODING_LISTPACK) { - unsigned char *zl = o->ptr; - size_t sz = lpBytes(zl); - unsigned char *new_zl = zmalloc(sz); - memcpy(new_zl, zl, sz); - zobj = createObject(OBJ_ZSET, new_zl); - zobj->encoding = OBJ_ENCODING_LISTPACK; - } else if (o->encoding == OBJ_ENCODING_SKIPLIST) { - zobj = createZsetObject(); - zs = o->ptr; - new_zs = zobj->ptr; - dictExpand(new_zs->dict,dictSize(zs->dict)); - zskiplist *zsl = zs->zsl; - zskiplistNode *ln; - sds ele; - long llen = zsetLength(o); - - /* We copy the skiplist elements from the greatest to the - * smallest (that's trivial since the elements are already ordered in - * the skiplist): this improves the load process, since the next loaded - * element will always be the smaller, so adding to the skiplist - * will always immediately stop at the head, making the insertion - * O(1) instead of O(log(N)). */ - ln = zsl->tail; - while (llen--) { - ele = ln->ele; - sds new_ele = sdsdup(ele); - zskiplistNode *znode = zslInsert(new_zs->zsl,ln->score,new_ele); - dictAdd(new_zs->dict,new_ele,&znode->score); - ln = ln->backward; - } - } else { - serverPanic("Unknown sorted set encoding"); - } - return zobj; -} - -/* Create a new sds string from the listpack entry. */ -sds zsetSdsFromListpackEntry(listpackEntry *e) { - return e->sval ? sdsnewlen(e->sval, e->slen) : sdsfromlonglong(e->lval); -} diff --git a/src/redis/zset.h b/src/redis/zset.h index ef2469889..361d5f3e8 100644 --- a/src/redis/zset.h +++ b/src/redis/zset.h @@ -69,9 +69,13 @@ zskiplistNode* zslLastInRange(zskiplist* zsl, const zrangespec* range); // double zzlGetScore(unsigned char *sptr); // void zzlNext(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); // void zzlPrev(unsigned char *zl, unsigned char **eptr, unsigned char **sptr); +unsigned char *zzlFind(unsigned char *lp, sds ele, double *score); unsigned char* zzlFirstInRange(unsigned char* zl, const zrangespec* range); unsigned char* zzlLastInRange(unsigned char* zl, const zrangespec* range); +zset* zsetCreate(void); +void zsetFree(zset *o); unsigned long zsetLength(const robj* zobj); +int zsetRemoveFromSkiplist(zset *zs, sds ele); void zsetConvert(robj* zobj, int encoding); void zsetConvertToZiplistIfNeeded(robj* zobj, size_t maxelelen); int zsetScore(robj* zobj, sds member, double* score); diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc index 9577db33a..e9e711f39 100644 --- a/src/server/container_utils.cc +++ b/src/server/container_utils.cc @@ -90,16 +90,16 @@ bool IterateSet(const PrimeValue& pv, const IterateFunc& func) { return success; } -bool IterateSortedSet(robj* zobj, const IterateSortedFunc& func, int32_t start, int32_t end, - bool reverse, bool use_score) { - unsigned long llen = zsetLength(zobj); +bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSortedFunc& func, + int32_t start, int32_t end, bool reverse, bool use_score) { + unsigned long llen = robj_wrapper->Size(); if (end < 0 || unsigned(end) >= llen) end = llen - 1; unsigned rangelen = unsigned(end - start) + 1; - if (zobj->encoding == OBJ_ENCODING_LISTPACK) { - uint8_t* zl = static_cast(zobj->ptr); + if (robj_wrapper->encoding() == OBJ_ENCODING_LISTPACK) { + uint8_t* zl = static_cast(robj_wrapper->inner_obj()); uint8_t *eptr, *sptr; uint8_t* vstr; unsigned int vlen; @@ -138,15 +138,15 @@ bool IterateSortedSet(robj* zobj, const IterateSortedFunc& func, int32_t start, } return success; } else { - CHECK_EQ(zobj->encoding, OBJ_ENCODING_SKIPLIST); - zset* zs = static_cast(zobj->ptr); + CHECK_EQ(robj_wrapper->encoding(), OBJ_ENCODING_SKIPLIST); + zset* zs = static_cast(robj_wrapper->inner_obj()); zskiplist* zsl = zs->zsl; zskiplistNode* ln; /* Check if starting point is trivial, before doing log(N) lookup. */ if (reverse) { ln = zsl->tail; - unsigned long llen = zsetLength(zobj); + unsigned long llen = robj_wrapper->Size(); if (start > 0) ln = zslGetElementByRank(zsl, llen - start); } else { diff --git a/src/server/container_utils.h b/src/server/container_utils.h index 87e19226f..1d8e2aab3 100644 --- a/src/server/container_utils.h +++ b/src/server/container_utils.h @@ -69,8 +69,9 @@ bool IterateSet(const PrimeValue& pv, const IterateFunc& func); // Iterate over all values and call func(val). Iteration stops as soon // as func return false. Returns true if it successfully processed all elements // without stopping. -bool IterateSortedSet(robj* zobj, const IterateSortedFunc& func, int32_t start = 0, - int32_t end = -1, bool reverse = false, bool use_score = false); +bool IterateSortedSet(const detail::RobjWrapper* robj_wrapper, const IterateSortedFunc& func, + int32_t start = 0, int32_t end = -1, bool reverse = false, + bool use_score = false); // Get StringMap pointer from primetable value. Sets expire time from db_context StringMap* GetStringMap(const PrimeValue& pv, const DbContext& db_context); diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 7b70a3f17..a53aed507 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -923,7 +923,8 @@ template bool Iterate(const PrimeValue& pv, F&& func) { return container_utils::IterateSet(pv, cb); case OBJ_ZSET: return container_utils::IterateSortedSet( - pv.AsRObj(), [&cb](container_utils::ContainerEntry ce, double) { return cb(ce); }); + pv.GetRobjWrapper(), + [&cb](container_utils::ContainerEntry ce, double) { return cb(ce); }); default: return false; } diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 06d4b95ce..f7aa4ba44 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -69,6 +69,76 @@ inline void YieldIfNeeded(size_t i) { } } +// taken from zset.c +unsigned char* zzlInsertAt(unsigned char* zl, unsigned char* eptr, sds ele, double score) { + unsigned char* sptr; + char scorebuf[128]; + int scorelen; + + scorelen = d2string(scorebuf, sizeof(scorebuf), score); + if (eptr == NULL) { + zl = lpAppend(zl, (unsigned char*)ele, sdslen(ele)); + zl = lpAppend(zl, (unsigned char*)scorebuf, scorelen); + } else { + /* Insert member before the element 'eptr'. */ + zl = lpInsertString(zl, (unsigned char*)ele, sdslen(ele), eptr, LP_BEFORE, &sptr); + + /* Insert score after the member. */ + zl = lpInsertString(zl, (unsigned char*)scorebuf, scorelen, sptr, LP_AFTER, NULL); + } + return zl; +} + +// taken from zset.c +uint8_t* ToListPack(const zskiplist* zsl) { + uint8_t* lp = lpNew(0); + + /* Approach similar to zslFree(), since we want to free the skiplist at + * the same time as creating the listpack. */ + zskiplistNode* node = zsl->header->level[0].forward; + + while (node) { + lp = zzlInsertAt(lp, NULL, node->ele, node->score); + node = node->level[0].forward; + } + + return lp; +} + +// taken from zsetConvert +zset* FromListPack(const uint8_t* lp) { + uint8_t* zl = (uint8_t*)lp; + unsigned char *eptr, *sptr; + unsigned char* vstr; + unsigned int vlen; + long long vlong; + sds ele; + + eptr = lpSeek(zl, 0); + if (eptr != NULL) { + sptr = lpNext(zl, eptr); + CHECK(sptr != NULL); + } + + zset* zs = zsetCreate(); + + while (eptr != NULL) { + double score = zzlGetScore(sptr); + vstr = lpGetValue(eptr, &vlen, &vlong); + if (vstr == NULL) + ele = sdsfromlonglong(vlong); + else + ele = sdsnewlen((char*)vstr, vlen); + + zskiplistNode* node = zslInsert(zs->zsl, score, ele); + CHECK_EQ(DICT_OK, dictAdd(zs->dict, ele, &node->score)); + + zzlNext(zl, &eptr, &sptr); + } + + return zs; +} + class error_category : public std::error_category { public: const char* name() const noexcept final { @@ -693,10 +763,8 @@ void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) { } void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) { - robj* res = createZsetObject(); - zset* zs = (zset*)res->ptr; - - auto cleanup = absl::Cleanup([&] { decrRefCount(res); }); + zset* zs = zsetCreate(); + auto cleanup = absl::Cleanup([&] { zsetFree(zs); }); size_t zsetlen = ltrace->blob_count(); if (zsetlen > DICT_HT_INITIAL_SIZE && dictTryExpand(zs->dict, zsetlen) != DICT_OK) { @@ -734,15 +802,18 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) { if (ec_) return; - /* Convert *after* loading, since sorted sets are not stored ordered. */ - if (zsetLength(res) <= server.zset_max_listpack_entries && + unsigned enc = OBJ_ENCODING_SKIPLIST; + void* inner = zs; + + if (zs->zsl->length <= server.zset_max_listpack_entries && maxelelen <= server.zset_max_listpack_value && lpSafeToAdd(NULL, totelelen)) { - zsetConvert(res, OBJ_ENCODING_LISTPACK); + enc = OBJ_ENCODING_LISTPACK; + inner = ToListPack(zs->zsl); + } else { + std::move(cleanup).Cancel(); } - std::move(cleanup).Cancel(); - - pv_->ImportRObj(res); + pv_->InitRobj(OBJ_ZSET, enc, inner); } void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) { @@ -866,7 +937,6 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) { return; } - robj* res = nullptr; if (rdb_type_ == RDB_TYPE_SET_INTSET) { if (!intsetValidateIntegrity((const uint8_t*)blob.data(), blob.size(), 0)) { LOG(ERROR) << "Intset integrity check failed."; @@ -877,6 +947,8 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) { const intset* is = (const intset*)blob.data(); unsigned len = intsetLen(is); + robj* res = nullptr; + if (len > SetFamily::MaxIntsetEntries()) { res = createSetObject(); if (!SetFamily::ConvertToStrSet(is, len, res)) { @@ -891,6 +963,7 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) { res = createObject(OBJ_SET, mine); res->encoding = OBJ_ENCODING_INTSET; } + pv_->ImportRObj(res); } else if (rdb_type_ == RDB_TYPE_HASH_ZIPLIST) { unsigned char* lp = lpNew(blob.size()); if (!ziplistPairsConvertAndValidateIntegrity((const uint8_t*)blob.data(), blob.size(), &lp)) { @@ -930,18 +1003,20 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) { return; } - res = createObject(OBJ_ZSET, lp); - res->encoding = OBJ_ENCODING_LISTPACK; - - if (lpBytes(lp) > server.zset_max_listpack_entries) - zsetConvert(res, OBJ_ENCODING_SKIPLIST); - else - res->ptr = lpShrinkToFit(lp); + unsigned encoding = OBJ_ENCODING_LISTPACK; + void* inner; + if (lpBytes(lp) > server.zset_max_listpack_entries) { + inner = FromListPack(lp); + zfree(lp); + encoding = OBJ_ENCODING_SKIPLIST; + } else { + inner = lpShrinkToFit(lp); + } + pv_->InitRobj(OBJ_ZSET, encoding, inner); + return; } else { LOG(FATAL) << "Unsupported rdb type " << rdb_type_; } - - pv_->ImportRObj(res); } sds RdbLoaderBase::OpaqueObjLoader::ToSds(const RdbVariant& obj) { diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index dbd913fa1..e6fcbb81a 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -321,7 +321,7 @@ error_code RdbSerializer::SaveObject(const PrimeValue& pv) { } if (obj_type == OBJ_ZSET) { - return SaveZSetObject(pv.AsRObj()); + return SaveZSetObject(pv); } if (obj_type == OBJ_STREAM) { @@ -441,10 +441,11 @@ error_code RdbSerializer::SaveHSetObject(const PrimeValue& pv) { return error_code{}; } -error_code RdbSerializer::SaveZSetObject(const robj* obj) { - DCHECK_EQ(OBJ_ZSET, obj->type); - if (obj->encoding == OBJ_ENCODING_SKIPLIST) { - zset* zs = (zset*)obj->ptr; +error_code RdbSerializer::SaveZSetObject(const PrimeValue& pv) { + DCHECK_EQ(OBJ_ZSET, pv.ObjType()); + const detail::RobjWrapper* robj_wrapper = pv.GetRobjWrapper(); + if (pv.Encoding() == OBJ_ENCODING_SKIPLIST) { + zset* zs = (zset*)robj_wrapper->inner_obj(); zskiplist* zsl = zs->zsl; RETURN_ON_ERR(SaveLen(zsl->length)); @@ -462,8 +463,8 @@ error_code RdbSerializer::SaveZSetObject(const robj* obj) { zn = zn->backward; } } else { - CHECK_EQ(obj->encoding, unsigned(OBJ_ENCODING_LISTPACK)) << "Unknown zset encoding"; - uint8_t* lp = (uint8_t*)obj->ptr; + CHECK_EQ(pv.Encoding(), unsigned(OBJ_ENCODING_LISTPACK)) << "Unknown zset encoding"; + uint8_t* lp = (uint8_t*)robj_wrapper->inner_obj(); RETURN_ON_ERR(SaveListPackAsZiplist(lp)); } diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 12ba39fb1..4ebc78dc4 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -168,7 +168,7 @@ class RdbSerializer { std::error_code SaveListObject(const robj* obj); std::error_code SaveSetObject(const PrimeValue& pv); std::error_code SaveHSetObject(const PrimeValue& pv); - std::error_code SaveZSetObject(const robj* obj); + std::error_code SaveZSetObject(const PrimeValue& pv); std::error_code SaveStreamObject(const robj* obj); std::error_code SaveJsonObject(const PrimeValue& pv); diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 632016a50..b5c0e24a2 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -9,6 +9,7 @@ extern "C" { #include "redis/geohash_helper.h" #include "redis/listpack.h" #include "redis/object.h" +#include "redis/redis_aux.h" #include "redis/util.h" #include "redis/zset.h" } @@ -82,6 +83,53 @@ zlexrangespec GetLexRange(bool reverse, const ZSetFamily::LexInterval& li) { return range; } +/* Delete the element 'ele' from the sorted set, returning 1 if the element + * existed and was deleted, 0 otherwise (the element was not there). + * taken from t_zset.c + */ + +int ZsetDel(detail::RobjWrapper* robj_wrapper, sds ele) { + if (robj_wrapper->encoding() == OBJ_ENCODING_LISTPACK) { + unsigned char* eptr; + uint8_t* lp = (uint8_t*)robj_wrapper->inner_obj(); + if ((eptr = zzlFind(lp, ele, NULL)) != NULL) { + lp = lpDeleteRangeWithEntry(lp, &eptr, 2); + robj_wrapper->set_inner_obj(lp); + return 1; + } + } else if (robj_wrapper->encoding() == OBJ_ENCODING_SKIPLIST) { + zset* zs = (zset*)robj_wrapper->inner_obj(); + if (zsetRemoveFromSkiplist(zs, ele)) { + if (htNeedsResize(zs->dict)) + dictResize(zs->dict); + return 1; + } + } + return 0; /* No such element found. */ +} + +// taken from t_zset.c +std::optional GetZsetScore(detail::RobjWrapper* robj_wrapper, sds member) { + if (robj_wrapper->encoding() == OBJ_ENCODING_LISTPACK) { + double score; + if (zzlFind((uint8_t*)robj_wrapper->inner_obj(), member, &score) == NULL) + return std::nullopt; + return score; + } + + if (robj_wrapper->encoding() == OBJ_ENCODING_SKIPLIST) { + zset* zs = (zset*)robj_wrapper->inner_obj(); + dictEntry* de = dictFind(zs->dict, member); + if (de == NULL) + return std::nullopt; + + return *(double*)dictGetVal(de); + } + + LOG(FATAL) << "Unknown sorted set encoding"; + return 0; +} + struct ZParams { unsigned flags = 0; // mask of ZADD_IN_ macros. bool ch = false; // Corresponds to CH option. @@ -104,20 +152,20 @@ OpResult FindZEntry(const ZParams& zparams, const OpArgs& op_args } PrimeIterator& it = add_res.first; - if (add_res.second || zparams.override) { - robj* zobj = nullptr; + PrimeValue& pv = it->second; + if (add_res.second || zparams.override) { if (member_len > kMaxListPackValue) { - zobj = createZsetObject(); + zset* zs = zsetCreate(); + pv.InitRobj(OBJ_ZSET, OBJ_ENCODING_SKIPLIST, zs); } else { - zobj = createZsetListpackObject(); + unsigned char* lp = lpNew(0); + pv.InitRobj(OBJ_ZSET, OBJ_ENCODING_LISTPACK, lp); } - DVLOG(2) << "Created zset " << zobj->ptr; if (!add_res.second) { db_slice.PreUpdate(op_args.db_cntx.db_index, it); } - it->second.ImportRObj(zobj); } else { if (it->second.ObjType() != OBJ_ZSET) return OpStatus::WRONG_TYPE; @@ -176,8 +224,8 @@ enum class Action { RANGE = 0, REMOVE = 1, POP = 2 }; class IntervalVisitor { public: - IntervalVisitor(Action action, const ZSetFamily::RangeParams& params, robj* o) - : action_(action), params_(params), zobj_(o) { + IntervalVisitor(Action action, const ZSetFamily::RangeParams& params, PrimeValue* pv) + : action_(action), params_(params), robj_wrapper_(pv->GetRobjWrapper()) { } void operator()(const ZSetFamily::IndexInterval& ii); @@ -236,14 +284,14 @@ class IntervalVisitor { Action action_; ZSetFamily::RangeParams params_; - robj* zobj_; + detail::RobjWrapper* robj_wrapper_; ZSetFamily::ScoredArray result_; unsigned removed_ = 0; }; void IntervalVisitor::operator()(const ZSetFamily::IndexInterval& ii) { - unsigned long llen = zsetLength(zobj_); + unsigned long llen = robj_wrapper_->Size(); int32_t start = ii.first; int32_t end = ii.second; @@ -322,7 +370,7 @@ void IntervalVisitor::ActionRange(unsigned start, unsigned end) { end = static_cast(min(1ULL * start + params_.limit - 1, 1ULL * end)); container_utils::IterateSortedSet( - zobj_, + robj_wrapper_, [this](container_utils::ContainerEntry ce, double score) { result_.emplace_back(ce.ToString(), score); return true; @@ -331,76 +379,76 @@ void IntervalVisitor::ActionRange(unsigned start, unsigned end) { } void IntervalVisitor::ActionRange(const zrangespec& range) { - if (zobj_->encoding == OBJ_ENCODING_LISTPACK) { + if (robj_wrapper_->encoding() == OBJ_ENCODING_LISTPACK) { ExtractListPack(range); } else { - CHECK_EQ(zobj_->encoding, OBJ_ENCODING_SKIPLIST); + CHECK_EQ(robj_wrapper_->encoding(), OBJ_ENCODING_SKIPLIST); ExtractSkipList(range); } } void IntervalVisitor::ActionRange(const zlexrangespec& range) { - if (zobj_->encoding == OBJ_ENCODING_LISTPACK) { + if (robj_wrapper_->encoding() == OBJ_ENCODING_LISTPACK) { ExtractListPack(range); } else { - CHECK_EQ(zobj_->encoding, OBJ_ENCODING_SKIPLIST); + CHECK_EQ(robj_wrapper_->encoding(), OBJ_ENCODING_SKIPLIST); ExtractSkipList(range); } } void IntervalVisitor::ActionRem(unsigned start, unsigned end) { - if (zobj_->encoding == OBJ_ENCODING_LISTPACK) { - uint8_t* zl = (uint8_t*)zobj_->ptr; + if (robj_wrapper_->encoding() == OBJ_ENCODING_LISTPACK) { + uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj(); removed_ = (end - start) + 1; zl = lpDeleteRange(zl, 2 * start, 2 * removed_); - zobj_->ptr = zl; + robj_wrapper_->set_inner_obj(zl); } else { - CHECK_EQ(OBJ_ENCODING_SKIPLIST, zobj_->encoding); - zset* zs = (zset*)zobj_->ptr; + CHECK_EQ(OBJ_ENCODING_SKIPLIST, robj_wrapper_->encoding()); + zset* zs = (zset*)robj_wrapper_->inner_obj(); removed_ = zslDeleteRangeByRank(zs->zsl, start + 1, end + 1, zs->dict); } } void IntervalVisitor::ActionRem(const zrangespec& range) { - if (zobj_->encoding == OBJ_ENCODING_LISTPACK) { - uint8_t* zl = (uint8_t*)zobj_->ptr; + if (robj_wrapper_->encoding() == OBJ_ENCODING_LISTPACK) { + uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj(); unsigned long deleted = 0; zl = zzlDeleteRangeByScore(zl, &range, &deleted); - zobj_->ptr = zl; + robj_wrapper_->set_inner_obj(zl); removed_ = deleted; } else { - CHECK_EQ(OBJ_ENCODING_SKIPLIST, zobj_->encoding); - zset* zs = (zset*)zobj_->ptr; + CHECK_EQ(OBJ_ENCODING_SKIPLIST, robj_wrapper_->encoding()); + zset* zs = (zset*)robj_wrapper_->inner_obj(); removed_ = zslDeleteRangeByScore(zs->zsl, &range, zs->dict); } } void IntervalVisitor::ActionRem(const zlexrangespec& range) { - if (zobj_->encoding == OBJ_ENCODING_LISTPACK) { - uint8_t* zl = (uint8_t*)zobj_->ptr; + if (robj_wrapper_->encoding() == OBJ_ENCODING_LISTPACK) { + uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj(); unsigned long deleted = 0; zl = zzlDeleteRangeByLex(zl, &range, &deleted); - zobj_->ptr = zl; + robj_wrapper_->set_inner_obj(zl); removed_ = deleted; } else { - CHECK_EQ(OBJ_ENCODING_SKIPLIST, zobj_->encoding); - zset* zs = (zset*)zobj_->ptr; + CHECK_EQ(OBJ_ENCODING_SKIPLIST, robj_wrapper_->encoding()); + zset* zs = (zset*)robj_wrapper_->inner_obj(); removed_ = zslDeleteRangeByLex(zs->zsl, &range, zs->dict); } } void IntervalVisitor::ActionPop(ZSetFamily::TopNScored sc) { - if (zobj_->encoding == OBJ_ENCODING_LISTPACK) { + if (robj_wrapper_->encoding() == OBJ_ENCODING_LISTPACK) { PopListPack(sc); } else { - CHECK_EQ(zobj_->encoding, OBJ_ENCODING_SKIPLIST); + CHECK_EQ(robj_wrapper_->encoding(), OBJ_ENCODING_SKIPLIST); PopSkipList(sc); } } void IntervalVisitor::ExtractListPack(const zrangespec& range) { - uint8_t* zl = (uint8_t*)zobj_->ptr; + uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj(); uint8_t *eptr, *sptr; uint8_t* vstr; unsigned int vlen = 0; @@ -444,7 +492,7 @@ void IntervalVisitor::ExtractListPack(const zrangespec& range) { } void IntervalVisitor::ExtractSkipList(const zrangespec& range) { - zset* zs = (zset*)zobj_->ptr; + zset* zs = (zset*)robj_wrapper_->inner_obj(); zskiplist* zsl = zs->zsl; zskiplistNode* ln; unsigned offset = params_.offset; @@ -476,7 +524,7 @@ void IntervalVisitor::ExtractSkipList(const zrangespec& range) { } void IntervalVisitor::ExtractListPack(const zlexrangespec& range) { - uint8_t* zl = (uint8_t*)zobj_->ptr; + uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj(); uint8_t *eptr, *sptr = nullptr; uint8_t* vstr = nullptr; unsigned int vlen = 0; @@ -524,7 +572,7 @@ void IntervalVisitor::ExtractListPack(const zlexrangespec& range) { } void IntervalVisitor::ExtractSkipList(const zlexrangespec& range) { - zset* zs = (zset*)zobj_->ptr; + zset* zs = (zset*)robj_wrapper_->inner_obj(); zskiplist* zsl = zs->zsl; zskiplistNode* ln; unsigned offset = params_.offset; @@ -561,7 +609,7 @@ void IntervalVisitor::ExtractSkipList(const zlexrangespec& range) { } void IntervalVisitor::PopListPack(ZSetFamily::TopNScored sc) { - uint8_t* zl = (uint8_t*)zobj_->ptr; + uint8_t* zl = (uint8_t*)robj_wrapper_->inner_obj(); uint8_t *eptr, *sptr; uint8_t* vstr; unsigned int vlen = 0; @@ -596,11 +644,11 @@ void IntervalVisitor::PopListPack(ZSetFamily::TopNScored sc) { } /* We can finally delete the elements */ - zobj_->ptr = lpDeleteRange(zl, start, 2 * sc); + robj_wrapper_->set_inner_obj(lpDeleteRange(zl, start, 2 * sc)); } void IntervalVisitor::PopSkipList(ZSetFamily::TopNScored sc) { - zset* zs = (zset*)zobj_->ptr; + zset* zs = (zset*)robj_wrapper_->inner_obj(); zskiplist* zsl = zs->zsl; zskiplistNode* ln; @@ -615,7 +663,7 @@ void IntervalVisitor::PopSkipList(ZSetFamily::TopNScored sc) { result_.emplace_back(string{ln->ele, sdslen(ln->ele)}, ln->score); /* we can delete the element now */ - zsetDel(zobj_, ln->ele); + ZsetDel(robj_wrapper_, ln->ele); ln = Next(ln); } @@ -687,11 +735,10 @@ void SendAtLeastOneKeyError(ConnectionContext* cntx) { enum class AggType : uint8_t { SUM, MIN, MAX, NOOP }; using ScoredMap = absl::flat_hash_map; -ScoredMap FromObject(const CompactObj& co, double weight) { - robj* obj = co.AsRObj(); +ScoredMap FromObject(CompactObj& co, double weight) { ZSetFamily::RangeParams params; params.with_scores = true; - IntervalVisitor vis(Action::RANGE, params, obj); + IntervalVisitor vis(Action::RANGE, params, &co); vis(ZSetFamily::IndexInterval(0, -1)); ZSetFamily::ScoredArray arr = vis.PopResult(); @@ -935,8 +982,6 @@ OpResult OpAdd(const OpArgs& op_args, const ZParams& zparams, string_ if (!res_it) return res_it.status(); - robj* zobj = res_it.value()->second.AsRObj(); - unsigned added = 0; unsigned updated = 0; unsigned processed = 0; @@ -947,12 +992,12 @@ OpResult OpAdd(const OpArgs& op_args, const ZParams& zparams, string_ OpStatus op_status = OpStatus::OK; AddResult aresult; - + detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper(); for (size_t j = 0; j < members.size(); j++) { const auto& m = members[j]; tmp_str = sdscpylen(tmp_str, m.second.data(), m.second.size()); - int retval = zsetAdd(zobj, m.first, tmp_str, zparams.flags, &retflags, &new_score); + int retval = robj_wrapper->ZsetAdd(m.first, tmp_str, zparams.flags, &retflags, &new_score); if (zparams.flags & ZADD_IN_INCR) { if (retval == 0) { @@ -975,9 +1020,6 @@ OpResult OpAdd(const OpArgs& op_args, const ZParams& zparams, string_ processed++; } - DVLOG(2) << "ZAdd " << zobj->ptr; - - res_it.value()->second.SyncRObj(); op_args.shard->db_slice().PostUpdate(op_args.db_cntx.db_index, *res_it, key); if (zparams.flags & ZADD_IN_INCR) { @@ -1191,15 +1233,14 @@ ZSetFamily::ScoredArray OpBZPop(Transaction* t, EngineShard* shard, std::string_ DVLOG(2) << "popping from " << key << " " << t->DebugId(); db_slice.PreUpdate(t->GetDbIndex(), it); - robj* zobj = it_res.value()->second.AsRObj(); - IntervalVisitor iv{Action::POP, range_spec.params, zobj}; + PrimeValue& pv = it->second; + IntervalVisitor iv{Action::POP, range_spec.params, &pv}; std::visit(iv, range_spec.interval); - it_res.value()->second.SyncRObj(); db_slice.PostUpdate(t->GetDbIndex(), *it_res, key); - auto zlen = zsetLength(zobj); + auto zlen = pv.Size(); if (zlen == 0) { DVLOG(1) << "deleting key " << key << " " << t->DebugId(); CHECK(db_slice.Del(t->GetDbIndex(), *it_res)); @@ -1292,15 +1333,14 @@ auto OpPopCount(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args, db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it); - robj* zobj = res_it.value()->second.AsRObj(); + PrimeValue& pv = res_it.value()->second; - IntervalVisitor iv{Action::POP, range_spec.params, zobj}; + IntervalVisitor iv{Action::POP, range_spec.params, &pv}; std::visit(iv, range_spec.interval); - res_it.value()->second.SyncRObj(); db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key); - auto zlen = zsetLength(zobj); + auto zlen = pv.Size(); if (zlen == 0) { CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it.value())); } @@ -1314,8 +1354,8 @@ auto OpRange(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args, st if (!res_it) return res_it.status(); - robj* zobj = res_it.value()->second.AsRObj(); - IntervalVisitor iv{Action::RANGE, range_spec.params, zobj}; + PrimeValue& pv = res_it.value()->second; + IntervalVisitor iv{Action::RANGE, range_spec.params, &pv}; std::visit(iv, range_spec.interval); @@ -1331,15 +1371,13 @@ OpResult OpRemRange(const OpArgs& op_args, string_view key, db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it); - robj* zobj = res_it.value()->second.AsRObj(); - - IntervalVisitor iv{Action::REMOVE, range_spec.params, zobj}; + PrimeValue& pv = res_it.value()->second; + IntervalVisitor iv{Action::REMOVE, range_spec.params, &pv}; std::visit(iv, range_spec.interval); - res_it.value()->second.SyncRObj(); db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key); - auto zlen = zsetLength(zobj); + auto zlen = pv.Size(); if (zlen == 0) { CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it.value())); } @@ -1353,10 +1391,17 @@ OpResult OpRank(const OpArgs& op_args, string_view key, string_view me if (!res_it) return res_it.status(); - robj* zobj = res_it.value()->second.AsRObj(); op_args.shard->tmp_str1 = sdscpylen(op_args.shard->tmp_str1, member.data(), member.size()); + detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper(); + robj self{ + .type = OBJ_ZSET, + .encoding = robj_wrapper->encoding(), + .lru = 0, + .refcount = OBJ_STATIC_REFCOUNT, + .ptr = robj_wrapper->inner_obj(), + }; - long res = zsetRank(zobj, op_args.shard->tmp_str1, reverse); + long res = zsetRank(&self, op_args.shard->tmp_str1, reverse); if (res < 0) return OpStatus::KEY_NOTFOUND; return res; @@ -1368,12 +1413,12 @@ OpResult OpCount(const OpArgs& op_args, std::string_view key, if (!res_it) return res_it.status(); - robj* zobj = res_it.value()->second.AsRObj(); + detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper(); zrangespec range = GetZrangeSpec(false, interval); unsigned count = 0; - if (zobj->encoding == OBJ_ENCODING_LISTPACK) { - uint8_t* zl = (uint8_t*)zobj->ptr; + if (robj_wrapper->encoding() == OBJ_ENCODING_LISTPACK) { + uint8_t* zl = (uint8_t*)robj_wrapper->inner_obj(); uint8_t *eptr, *sptr; double score; @@ -1404,8 +1449,8 @@ OpResult OpCount(const OpArgs& op_args, std::string_view key, } } } else { - CHECK_EQ(unsigned(OBJ_ENCODING_SKIPLIST), zobj->encoding); - zset* zs = (zset*)zobj->ptr; + CHECK_EQ(unsigned(OBJ_ENCODING_SKIPLIST), robj_wrapper->encoding()); + zset* zs = (zset*)robj_wrapper->inner_obj(); zskiplist* zsl = zs->zsl; zskiplistNode* zn; unsigned long rank; @@ -1439,11 +1484,12 @@ OpResult OpLexCount(const OpArgs& op_args, string_view key, if (!res_it) return res_it.status(); - robj* zobj = res_it.value()->second.AsRObj(); zlexrangespec range = GetLexRange(false, interval); unsigned count = 0; - if (zobj->encoding == OBJ_ENCODING_LISTPACK) { - uint8_t* zl = (uint8_t*)zobj->ptr; + detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper(); + + if (robj_wrapper->encoding() == OBJ_ENCODING_LISTPACK) { + uint8_t* zl = (uint8_t*)robj_wrapper->inner_obj(); uint8_t *eptr, *sptr; /* Use the first element in range as the starting point */ @@ -1453,7 +1499,7 @@ OpResult OpLexCount(const OpArgs& op_args, string_view key, if (eptr) { /* First element is in range */ sptr = lpNext(zl, eptr); - serverAssertWithInfo(c, zobj, zzlLexValueLteMax(eptr, &range)); + serverAssertWithInfo(c, robj_wrapper, zzlLexValueLteMax(eptr, &range)); /* Iterate over elements in range */ while (eptr) { @@ -1467,8 +1513,8 @@ OpResult OpLexCount(const OpArgs& op_args, string_view key, } } } else { - DCHECK_EQ(OBJ_ENCODING_SKIPLIST, zobj->encoding); - zset* zs = (zset*)zobj->ptr; + DCHECK_EQ(OBJ_ENCODING_SKIPLIST, robj_wrapper->encoding()); + zset* zs = (zset*)robj_wrapper->inner_obj(); zskiplist* zsl = zs->zsl; zskiplistNode* zn; unsigned long rank; @@ -1503,15 +1549,14 @@ OpResult OpRem(const OpArgs& op_args, string_view key, ArgSlice member return res_it.status(); db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it); - robj* zobj = res_it.value()->second.AsRObj(); + detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper(); sds& tmp_str = op_args.shard->tmp_str1; unsigned deleted = 0; for (string_view member : members) { tmp_str = sdscpylen(tmp_str, member.data(), member.size()); - deleted += zsetDel(zobj, tmp_str); + deleted += ZsetDel(robj_wrapper, tmp_str); } - auto zlen = zsetLength(zobj); - res_it.value()->second.SyncRObj(); + auto zlen = robj_wrapper->Size(); db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key); if (zlen == 0) { @@ -1526,15 +1571,15 @@ OpResult OpScore(const OpArgs& op_args, string_view key, string_view mem if (!res_it) return res_it.status(); - robj* zobj = res_it.value()->second.AsRObj(); + PrimeValue& pv = res_it.value()->second; sds& tmp_str = op_args.shard->tmp_str1; tmp_str = sdscpylen(tmp_str, member.data(), member.size()); - double score; - int retval = zsetScore(zobj, tmp_str, &score); - if (retval != C_OK) { + + detail::RobjWrapper* robj_wrapper = pv.GetRobjWrapper(); + auto res = GetZsetScore(robj_wrapper, tmp_str); + if (!res) return OpStatus::KEY_NOTFOUND; - } - return score; + return *res; } OpResult OpMScore(const OpArgs& op_args, string_view key, ArgSlice members) { @@ -1544,20 +1589,14 @@ OpResult OpMScore(const OpArgs& op_args, string_view key, ArgSli MScoreResponse scores(members.size()); - robj* zobj = res_it.value()->second.AsRObj(); + detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper(); sds& tmp_str = op_args.shard->tmp_str1; for (size_t i = 0; i < members.size(); i++) { const auto& m = members[i]; tmp_str = sdscpylen(tmp_str, m.data(), m.size()); - double score; - int retval = zsetScore(zobj, tmp_str, &score); - if (retval == C_OK) { - scores[i] = score; - } else { - scores[i] = std::nullopt; - } + scores[i] = GetZsetScore(robj_wrapper, tmp_str); } return scores; @@ -1571,14 +1610,14 @@ OpResult OpScan(const OpArgs& op_args, std::string_view key, uint64_t return find_res.status(); PrimeIterator it = find_res.value(); + PrimeValue& pv = it->second; StringVec res; - robj* zobj = it->second.AsRObj(); char buf[128]; - if (zobj->encoding == OBJ_ENCODING_LISTPACK) { + if (pv.Encoding() == OBJ_ENCODING_LISTPACK) { ZSetFamily::RangeParams params; params.with_scores = true; - IntervalVisitor iv{Action::RANGE, params, zobj}; + IntervalVisitor iv{Action::RANGE, params, &pv}; iv(ZSetFamily::IndexInterval{0, kuint32max}); ZSetFamily::ScoredArray arr = iv.PopResult(); @@ -1593,9 +1632,9 @@ OpResult OpScan(const OpArgs& op_args, std::string_view key, uint64_t } *cursor = 0; } else { - CHECK_EQ(unsigned(OBJ_ENCODING_SKIPLIST), zobj->encoding); + CHECK_EQ(unsigned(OBJ_ENCODING_SKIPLIST), pv.Encoding()); uint32_t count = scan_op.limit; - zset* zs = (zset*)zobj->ptr; + zset* zs = (zset*)pv.RObjPtr(); dict* ht = zs->dict; long maxiterations = count * 10; @@ -1749,7 +1788,7 @@ void ZSetFamily::ZCard(CmdArgList args, ConnectionContext* cntx) { return find_res.status(); } - return zsetLength(find_res.value()->second.AsRObj()); + return find_res.value()->second.Size(); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb));