fix: improve performance of listpack sorted sets (#1885)

1. Cherry-pick changes from Redis 7 that encode integer scores more efficiently
2. Introduces optimization that first checks if the new element should be the last
   for listpack sorted sets.
3. Consolidates listpack related constants and tightens usage for listpack.
4. Introduce MEMORY USAGE command.
5. Introduce a small delay before decommitting memory pages back to OS.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-09-19 19:52:34 +03:00 committed by GitHub
parent cfd83a66a2
commit 8d1474aeac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
18 changed files with 124 additions and 279 deletions

View file

@ -469,8 +469,8 @@ int RobjWrapper::ZsetAdd(double score, sds ele, int in_flags, int* out_flags, do
/* check if the element is too large or the list
* becomes too long *before* executing zzlInsert. */
if (zl_len + 1 > server.zset_max_listpack_entries ||
sdslen(ele) > server.zset_max_listpack_value || !lpSafeToAdd(lp, sdslen(ele))) {
if (zl_len >= server.zset_max_listpack_entries ||
sdslen(ele) > server.zset_max_listpack_value) {
unique_ptr<SortedMap> ss = SortedMap::FromListPack(tl.local_mr, lp);
lpFree(lp);
inner_obj_ = ss.release();

View file

@ -63,6 +63,8 @@ unsigned char *lpPrepend(unsigned char *lp, const unsigned char *s, uint32_t sle
unsigned char *lpPrependInteger(unsigned char *lp, long long lval);
unsigned char *lpAppend(unsigned char *lp, const unsigned char *s, uint32_t slen);
unsigned char *lpAppendInteger(unsigned char *lp, long long lval);
unsigned char *lpInsertInteger(unsigned char *lp, long long lval, unsigned char *p, int where,
unsigned char **newp);
unsigned char *lpReplace(unsigned char *lp, unsigned char **p, const unsigned char *s, uint32_t slen);
unsigned char *lpReplaceInteger(unsigned char *lp, unsigned char **p, long long lval);
unsigned char *lpDelete(unsigned char *lp, unsigned char *p, unsigned char **newp);

View file

@ -48,18 +48,6 @@
#define strtold(a,b) ((long double)strtod((a),(b)))
#endif
/* Try to release pages back to the OS directly (bypassing the allocator),
* in an effort to decrease CoW during fork. For small allocations, we can't
* release any full page, so in an effort to avoid getting the size of the
* allocation from the allocator (malloc_size) when we already know it's small,
* we check the size_hint. If the size is not already known, passing a size_hint
* of 0 will lead the checking the real size of the allocation.
* Also please note that the size may be not accurate, so in order to make this
* solution effective, the judgement for releasing memory pages should not be
* too strict. */
static void dismissMemory(void* ptr, size_t size_hint) {
}
/* ===================== Creation and parsing of objects ==================== */
robj *createObject(int type, void *ptr) {
@ -375,97 +363,6 @@ void decrRefCount(robj *o) {
}
}
/* See dismissObject() */
void dismissSds(sds s) {
dismissMemory(sdsAllocPtr(s), sdsAllocSize(s));
}
/* See dismissObject() */
void dismissStringObject(robj *o) {
if (o->encoding == OBJ_ENCODING_RAW) {
dismissSds(o->ptr);
}
}
/* See dismissObject() */
void dismissListObject(robj *o, size_t size_hint) {
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklist *ql = o->ptr;
serverAssert(ql->len != 0);
/* We iterate all nodes only when average node size is bigger than a
* page size, and there's a high chance we'll actually dismiss something. */
if (size_hint / ql->len >= server.page_size) {
quicklistNode *node = ql->head;
while (node) {
if (quicklistNodeIsCompressed(node)) {
dismissMemory(node->entry, ((quicklistLZF*)node->entry)->sz);
} else {
dismissMemory(node->entry, node->sz);
}
node = node->next;
}
}
} else {
serverPanic("Unknown list encoding type");
}
}
/* See dismissObject() */
void dismissSetObject(robj *o, size_t size_hint) {
if (o->encoding == OBJ_ENCODING_HT) {
dict *set = o->ptr;
serverAssert(dictSize(set) != 0);
/* We iterate all nodes only when average member size is bigger than a
* page size, and there's a high chance we'll actually dismiss something. */
if (size_hint / dictSize(set) >= server.page_size) {
dictEntry *de;
dictIterator *di = dictGetIterator(set);
while ((de = dictNext(di)) != NULL) {
dismissSds(dictGetKey(de));
}
dictReleaseIterator(di);
}
/* Dismiss hash table memory. */
dismissMemory(set->ht_table[0], DICTHT_SIZE(set->ht_size_exp[0])*sizeof(dictEntry*));
dismissMemory(set->ht_table[1], DICTHT_SIZE(set->ht_size_exp[1])*sizeof(dictEntry*));
} else if (o->encoding == OBJ_ENCODING_INTSET) {
dismissMemory(o->ptr, intsetBlobLen((intset*)o->ptr));
} else {
serverPanic("Unknown set encoding type");
}
}
/* See dismissObject() */
void dismissHashObject(robj *o, size_t size_hint) {
if (o->encoding == OBJ_ENCODING_HT) {
dict *d = o->ptr;
serverAssert(dictSize(d) != 0);
/* We iterate all fields only when average field/value size is bigger than
* a page size, and there's a high chance we'll actually dismiss something. */
if (size_hint / dictSize(d) >= server.page_size) {
dictEntry *de;
dictIterator *di = dictGetIterator(d);
while ((de = dictNext(di)) != NULL) {
/* Only dismiss values memory since the field size
* usually is small. */
dismissSds(dictGetVal(de));
}
dictReleaseIterator(di);
}
/* Dismiss hash table memory. */
dismissMemory(d->ht_table[0], DICTHT_SIZE(d->ht_size_exp[0])*sizeof(dictEntry*));
dismissMemory(d->ht_table[1], DICTHT_SIZE(d->ht_size_exp[1])*sizeof(dictEntry*));
} else if (o->encoding == OBJ_ENCODING_LISTPACK) {
dismissMemory(o->ptr, lpBytes((unsigned char*)o->ptr));
} else {
serverPanic("Unknown hash encoding type");
}
}
/* This variant of decrRefCount() gets its argument as void, and is useful
* as free method in data structures that expect a 'void free_object(void*)'
* prototype for the free method. */

View file

@ -180,7 +180,6 @@ void hashTypeCurrentObject(hashTypeIterator *hi, int what, unsigned char **vstr,
sds hashTypeCurrentObjectNewSds(hashTypeIterator *hi, int what);
int hashTypeGetValue(robj *o, sds field, unsigned char **vstr, unsigned int *vlen, long long *vll);
robj *hashTypeGetValueObject(robj *o, sds field);
int hashTypeSet(robj *o, sds field, sds value, int flags);
robj *hashTypeDup(robj *o);
int hashTypeGetFromListpack(robj *o, sds field,
unsigned char **vstr,

View file

@ -14,20 +14,16 @@ void InitRedisTables() {
crc64_init();
memset(&server, 0, sizeof(server));
server.page_size = sysconf(_SC_PAGESIZE);
server.maxmemory_policy = 0;
server.lfu_decay_time = 0;
// been used by t_zset routines that convert listpack to skiplist for cases
// above these thresholds.
server.zset_max_listpack_entries = 128;
server.zset_max_listpack_value = 64;
server.zset_max_listpack_entries = 100;
server.zset_max_listpack_value = 32;
// Present so that redis code compiles. However, we ignore this field and instead check against
// listpack total size in hset_family.cc
server.hash_max_listpack_entries = 512;
server.hash_max_listpack_value = 32; // decreased from redis default 64.
server.max_map_field_len = 64;
server.max_listpack_map_bytes = 1024;
server.stream_node_max_bytes = 4096;
server.stream_node_max_entries = 100;

View file

@ -71,8 +71,8 @@ typedef struct ServerStub {
// unused - left so that object.c will compile.
int maxmemory_policy; /* Policy for key eviction */
unsigned long page_size;
size_t hash_max_listpack_entries, hash_max_listpack_value;
size_t max_map_field_len, max_listpack_map_bytes;
size_t zset_max_listpack_entries;
size_t zset_max_listpack_value;

View file

@ -41,28 +41,6 @@
* Hash type API
*----------------------------------------------------------------------------*/
/* Check the length of a number of objects to see if we need to convert a
* listpack to a real hash. Note that we only check string encoded objects
* as their string length can be queried in constant time. */
void hashTypeTryConversion(robj *o, robj **argv, int start, int end) {
int i;
size_t sum = 0;
if (o->encoding != OBJ_ENCODING_LISTPACK) return;
for (i = start; i <= end; i++) {
if (!sdsEncodedObject(argv[i]))
continue;
size_t len = sdslen(argv[i]->ptr);
if (len > server.hash_max_listpack_value) {
hashTypeConvert(o, OBJ_ENCODING_HT);
return;
}
sum += len;
}
if (!lpSafeToAdd(o->ptr, sum))
hashTypeConvert(o, OBJ_ENCODING_HT);
}
/* Get the value from a listpack encoded hash, identified by field.
* Returns -1 when the field cannot be found. */
@ -188,96 +166,6 @@ int hashTypeExists(robj *o, sds field) {
return 0;
}
/* Add a new field, overwrite the old with the new value if it already exists.
* Return 0 on insert and 1 on update.
*
* By default, the key and value SDS strings are copied if needed, so the
* caller retains ownership of the strings passed. However this behavior
* can be effected by passing appropriate flags (possibly bitwise OR-ed):
*
* HASH_SET_TAKE_FIELD -- The SDS field ownership passes to the function.
* HASH_SET_TAKE_VALUE -- The SDS value ownership passes to the function.
*
* When the flags are used the caller does not need to release the passed
* SDS string(s). It's up to the function to use the string to create a new
* entry or to free the SDS string before returning to the caller.
*
* HASH_SET_COPY corresponds to no flags passed, and means the default
* semantics of copying the values if needed.
*
*/
#define HASH_SET_TAKE_FIELD (1<<0)
#define HASH_SET_TAKE_VALUE (1<<1)
#define HASH_SET_COPY 0
int hashTypeSet(robj *o, sds field, sds value, int flags) {
int update = 0;
if (o->encoding == OBJ_ENCODING_LISTPACK) {
unsigned char *zl, *fptr, *vptr;
zl = o->ptr;
fptr = lpFirst(zl);
if (fptr != NULL) {
fptr = lpFind(zl, fptr, (unsigned char*)field, sdslen(field), 1);
if (fptr != NULL) {
/* Grab pointer to the value (fptr points to the field) */
vptr = lpNext(zl, fptr);
serverAssert(vptr != NULL);
update = 1;
/* Replace value */
zl = lpReplace(zl, &vptr, (unsigned char*)value, sdslen(value));
}
}
if (!update) {
/* Push new field/value pair onto the tail of the listpack */
zl = lpAppend(zl, (unsigned char*)field, sdslen(field));
zl = lpAppend(zl, (unsigned char*)value, sdslen(value));
}
o->ptr = zl;
/* Check if the listpack needs to be converted to a hash table */
if (hashTypeLength(o) > server.hash_max_listpack_entries)
hashTypeConvert(o, OBJ_ENCODING_HT);
} else if (o->encoding == OBJ_ENCODING_HT) {
dictEntry *de = dictFind(o->ptr,field);
if (de) {
sdsfree(dictGetVal(de));
if (flags & HASH_SET_TAKE_VALUE) {
dictGetVal(de) = value;
value = NULL;
} else {
dictGetVal(de) = sdsdup(value);
}
update = 1;
} else {
sds f,v;
if (flags & HASH_SET_TAKE_FIELD) {
f = field;
field = NULL;
} else {
f = sdsdup(field);
}
if (flags & HASH_SET_TAKE_VALUE) {
v = value;
value = NULL;
} else {
v = sdsdup(value);
}
dictAdd(o->ptr,f,v);
}
} else {
serverPanic("Unknown hash encoding");
}
/* Free SDS strings we did not referenced elsewhere if the flags
* want this function to be responsible. */
if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field);
if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value);
return update;
}
/* Delete an element from a hash.
* Return 1 on deleted and 0 on not found. */
int hashTypeDelete(robj *o, sds field) {

View file

@ -56,6 +56,7 @@
* pointers being only at "level 1". This allows to traverse the list
* from tail to head, useful for ZREVRANGE. */
#include <float.h>
#include <math.h>
#include <stdlib.h>
#include <string.h>
@ -80,6 +81,36 @@ sds cminstring = kMinStrData + 1;
sds cmaxstring = kMaxStrData + 1;
/* Returns 1 if the double value can safely be represented in long long without
* precision loss, in which case the corresponding long long is stored in the out variable. */
static int double2ll(double d, long long *out) {
#if (DBL_MANT_DIG >= 52) && (DBL_MANT_DIG <= 63) && (LLONG_MAX == 0x7fffffffffffffffLL)
/* Check if the float is in a safe range to be casted into a
* long long. We are assuming that long long is 64 bit here.
* Also we are assuming that there are no implementations around where
* double has precision < 52 bit.
*
* Under this assumptions we test if a double is inside a range
* where casting to long long is safe. Then using two castings we
* make sure the decimal part is zero. If all this is true we can use
* integer without precision loss.
*
* Note that numbers above 2^52 and below 2^63 use all the fraction bits as real part,
* and the exponent bits are positive, which means the "decimal" part must be 0.
* i.e. all double values in that range are representable as a long without precision loss,
* but not all long values in that range can be represented as a double.
* we only care about the first part here. */
if (d < (double)(-LLONG_MAX/2) || d > (double)(LLONG_MAX/2))
return 0;
long long ll = d;
if (ll == d) {
*out = ll;
return 1;
}
#endif
return 0;
}
int zslLexValueGteMin(sds value, const zlexrangespec *spec);
int zslLexValueLteMax(sds value, const zlexrangespec *spec);
@ -1008,17 +1039,25 @@ unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, doub
unsigned char *sptr;
char scorebuf[128];
int scorelen;
scorelen = d2string(scorebuf,sizeof(scorebuf),score);
long long lscore;
int score_is_long = double2ll(score, &lscore);
if (!score_is_long)
scorelen = d2string(scorebuf,sizeof(scorebuf),score);
if (eptr == NULL) {
zl = lpAppend(zl,(unsigned char*)ele,sdslen(ele));
zl = lpAppend(zl,(unsigned char*)scorebuf,scorelen);
if (score_is_long)
zl = lpAppendInteger(zl,lscore);
else
zl = lpAppend(zl,(unsigned char*)scorebuf,scorelen);
} else {
/* Insert member before the element 'eptr'. */
zl = lpInsertString(zl,(unsigned char*)ele,sdslen(ele),eptr,LP_BEFORE,&sptr);
/* Insert score after the member. */
zl = lpInsertString(zl,(unsigned char*)scorebuf,scorelen,sptr,LP_AFTER,NULL);
if (score_is_long)
zl = lpInsertInteger(zl,lscore,sptr,LP_AFTER,NULL);
else
zl = lpInsertString(zl,(unsigned char*)scorebuf,scorelen,sptr,LP_AFTER,NULL);
}
return zl;
}
@ -1026,9 +1065,18 @@ unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, doub
/* Insert (element,score) pair in listpack. This function assumes the element is
* not yet present in the list. */
unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
unsigned char *eptr = lpSeek(zl,0), *sptr;
unsigned char *eptr = NULL, *sptr = lpSeek(zl,-1);
double s;
// Optimization: check first whether the new element should be the last.
if (sptr != NULL) {
s = zzlGetScore(sptr);
if (s >= score) {
// It should not be the last, so fallback to the forward iteration.
eptr = lpSeek(zl,0);
}
}
while (eptr != NULL) {
sptr = lpNext(zl,eptr);
serverAssert(sptr != NULL);
@ -1038,13 +1086,11 @@ unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
/* First element with score larger than score for element to be
* inserted. This means we should take its spot in the list to
* maintain ordering. */
zl = zzlInsertAt(zl,eptr,ele,score);
break;
return zzlInsertAt(zl,eptr,ele,score);
} else if (s == score) {
/* Ensure lexicographical ordering for elements. */
if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) {
zl = zzlInsertAt(zl,eptr,ele,score);
break;
return zzlInsertAt(zl,eptr,ele,score);
}
}
@ -1053,9 +1099,7 @@ unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
}
/* Push on tail of list when it was not yet inserted. */
if (eptr == NULL)
zl = zzlInsertAt(zl,NULL,ele,score);
return zl;
return zzlInsertAt(zl,NULL,ele,score);
}
unsigned char *zzlDeleteRangeByScore(unsigned char *zl, const zrangespec *range, unsigned long *deleted) {
@ -1108,4 +1152,4 @@ unsigned char *zzlDeleteRangeByLex(unsigned char *zl, const zlexrangespec *range
if (deleted != NULL) *deleted = num;
return zl;
}
}

View file

@ -59,8 +59,13 @@ void PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, EngineShard* s
size_t value_heap_size = pv.MallocUsed();
stats.inline_keys -= del_it->first.IsInline();
stats.obj_memory_usage -= (del_it->first.MallocUsed() + value_heap_size);
if (pv.ObjType() == OBJ_STRING)
if (pv.ObjType() == OBJ_STRING) {
stats.strval_memory_usage -= value_heap_size;
} else if (pv.ObjType() == OBJ_HASH && pv.Encoding() == kEncodingListPack) {
--stats.listpack_blob_cnt;
} else if (pv.ObjType() == OBJ_ZSET && pv.Encoding() == OBJ_ENCODING_LISTPACK) {
--stats.listpack_blob_cnt;
}
if (ClusterConfig::IsEnabled()) {
string tmp;

View file

@ -858,7 +858,7 @@ Usage: dragonfly [FLAGS]
mi_option_enable(mi_option_show_errors);
mi_option_set(mi_option_max_warnings, 0);
mi_option_set(mi_option_decommit_delay, 0);
mi_option_set(mi_option_decommit_delay, 1);
unique_ptr<util::ProactorPool> pool;

View file

@ -31,7 +31,6 @@ using absl::SimpleAtoi;
namespace {
constexpr size_t kMaxListPackLen = 1024;
using IncrByParam = std::variant<double, int64_t>;
using OptStr = std::optional<std::string>;
enum GetAllMode : uint8_t { FIELDS = 1, VALUES = 2 };
@ -39,12 +38,12 @@ enum GetAllMode : uint8_t { FIELDS = 1, VALUES = 2 };
bool IsGoodForListpack(CmdArgList args, const uint8_t* lp) {
size_t sum = 0;
for (auto s : args) {
if (s.size() > server.hash_max_listpack_value)
if (s.size() > server.max_map_field_len)
return false;
sum += s.size();
}
return lpBytes(const_cast<uint8_t*>(lp)) + sum < kMaxListPackLen;
return lpBytes(const_cast<uint8_t*>(lp)) + sum < server.max_listpack_map_bytes;
}
using container_utils::GetStringMap;
@ -182,7 +181,7 @@ OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, Inc
lpb = lpBytes(lp);
stats->listpack_bytes -= lpb;
if (lpb >= kMaxListPackLen) {
if (lpb >= server.max_listpack_map_bytes) {
stats->listpack_blob_cnt--;
StringMap* sm = HSetFamily::ConvertToStrMap(lp);
pv.InitRobj(OBJ_HASH, kEncodingStrMap2, sm);
@ -1164,10 +1163,6 @@ void HSetFamily::Register(CommandRegistry* registry) {
<< CI{"HVALS", CO::READONLY, 2, 1, 1, 1, acl::kHVals}.HFUNC(HVals);
}
uint32_t HSetFamily::MaxListPackLen() {
return kMaxListPackLen;
}
StringMap* HSetFamily::ConvertToStrMap(uint8_t* lp) {
StringMap* sm = new StringMap(CompactObj::memory_resource());
size_t lplen = lpLength(lp);

View file

@ -21,7 +21,6 @@ using facade::OpStatus;
class HSetFamily {
public:
static void Register(CommandRegistry* registry);
static uint32_t MaxListPackLen();
// Does not free lp.
static StringMap* ConvertToStrMap(uint8_t* lp);

View file

@ -35,16 +35,6 @@ class HestFamilyTestProtocolVersioned : public HSetFamilyTest,
INSTANTIATE_TEST_CASE_P(HestFamilyTestProtocolVersioned, HestFamilyTestProtocolVersioned,
::testing::Values("2", "3"));
TEST_F(HSetFamilyTest, Hash) {
robj* obj = createHashObject();
sds field = sdsnew("field");
sds val = sdsnew("value");
hashTypeSet(obj, field, val, 0);
sdsfree(field);
sdsfree(val);
decrRefCount(obj);
}
TEST_F(HSetFamilyTest, Basic) {
auto resp = Run({"hset", "x", "a"});
EXPECT_THAT(resp, ErrArg("wrong number"));

View file

@ -69,6 +69,10 @@ std::string MallocStats(bool backing, unsigned tid) {
return str;
}
size_t MemoryUsage(PrimeIterator it) {
return it->first.MallocUsed() + it->second.MallocUsed();
}
} // namespace
MemoryCmd::MemoryCmd(ServerFamily* owner, ConnectionContext* cntx) : cntx_(cntx) {
@ -83,15 +87,14 @@ void MemoryCmd::Run(CmdArgList args) {
"MALLOC-STATS [BACKING] [thread-id]",
" Show malloc stats for a heap residing in specified thread-id. 0 by default.",
" If BACKING is specified, show stats for the backing heap.",
"USAGE",
" (not implemented).",
"USAGE <key>",
};
return (*cntx_)->SendSimpleStrArr(help_arr);
};
if (sub_cmd == "USAGE") {
// dummy output, in practice not implemented yet.
return (*cntx_)->SendLong(1);
if (sub_cmd == "USAGE" && args.size() > 1) {
string_view key = ArgS(args, 1);
return Usage(key);
}
if (sub_cmd == "MALLOC-STATS") {
@ -121,4 +124,22 @@ void MemoryCmd::Run(CmdArgList args) {
return (*cntx_)->SendError(err, kSyntaxErrType);
}
void MemoryCmd::Usage(std::string_view key) {
ShardId sid = Shard(key, shard_set->size());
ssize_t memory_usage = shard_set->pool()->at(sid)->AwaitBrief([key, this]() -> ssize_t {
auto& db_slice = EngineShard::tlocal()->db_slice();
auto [pt, exp_t] = db_slice.GetTables(cntx_->db_index());
PrimeIterator it = pt->Find(key);
if (IsValid(it)) {
return MemoryUsage(it);
} else {
return -1;
}
});
if (memory_usage < 0)
return cntx_->SendError(kKeyNotFoundErr);
(*cntx_)->SendLong(memory_usage);
}
} // namespace dfly

View file

@ -17,6 +17,8 @@ class MemoryCmd {
void Run(CmdArgList args);
private:
void Usage(std::string_view key);
ConnectionContext* cntx_;
};

View file

@ -561,7 +561,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
size_t len = ltrace->blob_count() / 2;
/* Too many entries? Use a hash table right from the start. */
bool keep_lp = (len <= server.hash_max_listpack_entries);
bool keep_lp = (len <= 64);
size_t lp_size = 0;
if (keep_lp) {
@ -569,7 +569,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
size_t str_len = StrLen(blob.rdb_var);
lp_size += str_len;
if (str_len > server.hash_max_listpack_value) {
if (str_len > server.max_map_field_len) {
keep_lp = false;
return false;
}
@ -971,7 +971,7 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
return;
}
if (lpBytes(lp) > HSetFamily::MaxListPackLen()) {
if (lpBytes(lp) > server.max_listpack_map_bytes) {
StringMap* sm = HSetFamily::ConvertToStrMap(lp);
lpFree(lp);
pv_->InitRobj(OBJ_HASH, kEncodingStrMap2, sm);
@ -997,7 +997,7 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
unsigned encoding = OBJ_ENCODING_LISTPACK;
void* inner;
if (lpBytes(lp) > server.zset_max_listpack_entries) {
if (lpBytes(lp) >= server.max_listpack_map_bytes) {
inner = detail::SortedMap::FromListPack(CompactObj::memory_resource(), lp).release();
lpFree(lp);
encoding = OBJ_ENCODING_SKIPLIST;

View file

@ -332,11 +332,11 @@ TEST_F(RdbTest, SaveManyDbs) {
TEST_F(RdbTest, HMapBugs) {
// Force kEncodingStrMap2 encoding.
server.hash_max_listpack_value = 0;
server.max_map_field_len = 0;
Run({"hset", "hmap1", "key1", "val", "key2", "val2"});
Run({"hset", "hmap2", "key1", string(690557, 'a')});
server.hash_max_listpack_value = 32;
server.max_map_field_len = 32;
Run({"debug", "reload"});
EXPECT_EQ(2, CheckedInt({"hlen", "hmap1"}));
}
@ -353,10 +353,10 @@ TEST_F(RdbTest, Issue1305) {
*/
// Force kEncodingStrMap2 encoding.
server.hash_max_listpack_value = 0;
server.max_map_field_len = 0;
Run({"hset", "hmap", "key1", "val", "key2", ""});
server.hash_max_listpack_value = 32;
server.max_map_field_len = 32;
Run({"debug", "reload"});
EXPECT_EQ(2, CheckedInt({"hlen", "hmap"}));
}

View file

@ -42,7 +42,6 @@ static const char kFloatRangeErr[] = "min or max is not a float";
static const char kLexRangeErr[] = "min or max not valid string range item";
constexpr string_view kGeoAlphabet = "0123456789bcdefghjkmnpqrstuvwxyz"sv;
constexpr unsigned kMaxListPackValue = 64;
using MScoreResponse = std::vector<std::optional<double>>;
using ScoredMember = std::pair<std::string, double>;
@ -163,14 +162,15 @@ OpResult<PrimeIterator> FindZEntry(const ZParams& zparams, const OpArgs& op_args
PrimeIterator& it = add_res.first;
PrimeValue& pv = it->second;
DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index);
if (add_res.second || zparams.override) {
if (member_len > kMaxListPackValue) {
if (member_len > server.max_map_field_len) {
detail::SortedMap* zs = new detail::SortedMap(CompactObj::memory_resource());
pv.InitRobj(OBJ_ZSET, OBJ_ENCODING_SKIPLIST, zs);
} else {
unsigned char* lp = lpNew(0);
pv.InitRobj(OBJ_ZSET, OBJ_ENCODING_LISTPACK, lp);
stats->listpack_blob_cnt++;
}
if (!add_res.second) {
@ -942,7 +942,7 @@ OpResult<AddResult> OpAdd(const OpArgs& op_args, const ZParams& zparams, string_
OpStatus op_status = OpStatus::OK;
AddResult aresult;
detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper();
bool is_list_pack = robj_wrapper->encoding() == OBJ_ENCODING_LISTPACK;
for (size_t j = 0; j < members.size(); j++) {
const auto& m = members[j];
tmp_str = sdscpylen(tmp_str, m.second.data(), m.second.size());
@ -970,6 +970,12 @@ OpResult<AddResult> OpAdd(const OpArgs& op_args, const ZParams& zparams, string_
processed++;
}
// if we migrated to skip_list - update listpack stats.
if (is_list_pack && robj_wrapper->encoding() != OBJ_ENCODING_LISTPACK) {
DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index);
--stats->listpack_blob_cnt;
}
op_args.shard->db_slice().PostUpdate(op_args.db_cntx.db_index, *res_it, key);
if (zparams.flags & ZADD_IN_INCR) {
@ -1707,6 +1713,7 @@ void ZSetFamily::ZAdd(CmdArgList args, ConnectionContext* cntx) {
}
DCHECK(cntx->transaction);
std::sort(members.begin(), members.end());
absl::Span memb_sp{members.data(), members.size()};
ZAddGeneric(key, zparams, memb_sp, cntx);
}