chore: improve doubles encoding for listpacks (#1938)

chore: reduce double encoding for listpacks

Following memory improvements by Redis 7, use double convrsion library to represent
double values with less space for listpacks.

The change is to use double conversion library instead of plain sprintf inside
zzlInsertAt. This requires to move zzlInsertAt to Dragonfly codebase.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-09-26 22:24:37 +03:00 committed by GitHub
parent 589ee1cf40
commit b67e4d8bb6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 144 additions and 171 deletions

View file

@ -8,7 +8,7 @@ add_library(dfly_core compact_object.cc dragonfly_core.cc extent_tree.cc
string_set.cc string_map.cc detail/bitpacking.cc)
cxx_link(dfly_core base absl::flat_hash_map absl::str_format redis_lib TRDP::lua lua_modules
fibers2 ${SEARCH_LIB} TRDP::jsoncons OpenSSL::Crypto)
fibers2 ${SEARCH_LIB} TRDP::jsoncons OpenSSL::Crypto TRDP::dconv)
add_executable(dash_bench dash_bench.cc)
cxx_link(dash_bench dfly_core)

View file

@ -86,7 +86,7 @@ size_t MallocUsedSet(unsigned encoding, void* ptr) {
size_t MallocUsedHSet(unsigned encoding, void* ptr) {
switch (encoding) {
case kEncodingListPack:
return lpBytes(reinterpret_cast<uint8_t*>(ptr));
return zmalloc_usable_size(reinterpret_cast<uint8_t*>(ptr));
case kEncodingStrMap2: {
StringMap* sm = (StringMap*)ptr;
return sm->ObjMallocUsed() + sm->SetMallocUsed();
@ -99,7 +99,7 @@ size_t MallocUsedHSet(unsigned encoding, void* ptr) {
size_t MallocUsedZSet(unsigned encoding, void* ptr) {
switch (encoding) {
case OBJ_ENCODING_LISTPACK:
return lpBytes(reinterpret_cast<uint8_t*>(ptr));
return zmalloc_usable_size(reinterpret_cast<uint8_t*>(ptr));
case OBJ_ENCODING_SKIPLIST: {
detail::SortedMap* ss = (detail::SortedMap*)ptr;
return ss->MallocSize(); // DictMallocSize(zs->dict);
@ -458,7 +458,7 @@ int RobjWrapper::ZsetAdd(double score, sds ele, int in_flags, int* out_flags, do
/* Remove and re-insert when score changed. */
if (score != curscore) {
lp = lpDeleteRangeWithEntry(lp, &eptr, 2);
lp = zzlInsert(lp, ele, score);
lp = detail::ZzlInsert(lp, ele, score);
inner_obj_ = lp;
*out_flags |= ZADD_OUT_UPDATED;
}
@ -476,7 +476,8 @@ int RobjWrapper::ZsetAdd(double score, sds ele, int in_flags, int* out_flags, do
inner_obj_ = ss.release();
encoding_ = OBJ_ENCODING_SKIPLIST;
} else {
inner_obj_ = zzlInsert(lp, ele, score);
lp = detail::ZzlInsert(lp, ele, score);
inner_obj_ = lp;
if (newscore)
*newscore = score;
*out_flags |= ZADD_OUT_ADDED;

View file

@ -13,6 +13,8 @@ extern "C" {
#include "redis/zmalloc.h"
}
#include <double-conversion/double-to-string.h>
#include "base/endian.h"
#include "base/flags.h"
#include "base/logging.h"
@ -21,9 +23,6 @@ using namespace std;
ABSL_FLAG(bool, use_zset_tree, true, "If true use b+tree for zset implementation");
extern "C" unsigned char* zzlInsertAt(unsigned char* zl, unsigned char* eptr, sds ele,
double score);
namespace dfly {
namespace detail {
@ -80,8 +79,141 @@ void* BuildScoredKey(double score, bool is_str_inf, char buf[]) {
return key;
}
// Copied from t_zset.c
/* Returns 1 if the double value can safely be represented in long long without
* precision loss, in which case the corresponding long long is stored in the out variable. */
static int double2ll(double d, long long* out) {
#if (DBL_MANT_DIG >= 52) && (DBL_MANT_DIG <= 63) && (LLONG_MAX == 0x7fffffffffffffffLL)
/* Check if the float is in a safe range to be casted into a
* long long. We are assuming that long long is 64 bit here.
* Also we are assuming that there are no implementations around where
* double has precision < 52 bit.
*
* Under this assumptions we test if a double is inside a range
* where casting to long long is safe. Then using two castings we
* make sure the decimal part is zero. If all this is true we can use
* integer without precision loss.
*
* Note that numbers above 2^52 and below 2^63 use all the fraction bits as real part,
* and the exponent bits are positive, which means the "decimal" part must be 0.
* i.e. all double values in that range are representable as a long without precision loss,
* but not all long values in that range can be represented as a double.
* we only care about the first part here. */
if (d < (double)(-LLONG_MAX / 2) || d > (double)(LLONG_MAX / 2))
return 0;
long long ll = d;
if (ll == d) {
*out = ll;
return 1;
}
#endif
return 0;
}
/* Compare element in sorted set with given element. */
int zzlCompareElements(unsigned char* eptr, unsigned char* cstr, unsigned int clen) {
unsigned char* vstr;
unsigned int vlen;
long long vlong;
unsigned char vbuf[32];
int minlen, cmp;
vstr = lpGetValue(eptr, &vlen, &vlong);
if (vstr == NULL) {
/* Store string representation of long long in buf. */
vlen = ll2string((char*)vbuf, sizeof(vbuf), vlong);
vstr = vbuf;
}
minlen = (vlen < clen) ? vlen : clen;
cmp = memcmp(vstr, cstr, minlen);
if (cmp == 0)
return vlen - clen;
return cmp;
}
using double_conversion::DoubleToStringConverter;
constexpr unsigned kConvFlags = DoubleToStringConverter::UNIQUE_ZERO;
DoubleToStringConverter score_conv(kConvFlags, "inf", "nan", 'e', -6, 21, 6, 0);
// Copied from redis code but uses double_conversion to encode double values.
unsigned char* zzlInsertAt(unsigned char* zl, unsigned char* eptr, sds ele, double score) {
unsigned char* sptr;
char scorebuf[128];
unsigned scorelen = 0;
long long lscore;
int score_is_long = double2ll(score, &lscore);
if (!score_is_long) {
// Use double converter to get the shortest representation.
double_conversion::StringBuilder sb(scorebuf, sizeof(scorebuf));
score_conv.ToShortest(score, &sb);
scorelen = sb.position();
sb.Finalize();
DCHECK_EQ(scorelen, strlen(scorebuf));
}
if (eptr == NULL) {
zl = lpAppend(zl, (unsigned char*)ele, sdslen(ele));
if (score_is_long)
zl = lpAppendInteger(zl, lscore);
else
zl = lpAppend(zl, (unsigned char*)scorebuf, scorelen);
} else {
/* Insert member before the element 'eptr'. */
zl = lpInsertString(zl, (unsigned char*)ele, sdslen(ele), eptr, LP_BEFORE, &sptr);
/* Insert score after the member. */
if (score_is_long)
zl = lpInsertInteger(zl, lscore, sptr, LP_AFTER, NULL);
else
zl = lpInsertString(zl, (unsigned char*)scorebuf, scorelen, sptr, LP_AFTER, NULL);
}
return zl;
}
} // namespace
/* Insert (element,score) pair in listpack. This function assumes the element is
* not yet present in the list. */
unsigned char* ZzlInsert(unsigned char* zl, sds ele, double score) {
unsigned char *eptr = NULL, *sptr = lpSeek(zl, -1);
double s;
// Optimization: check first whether the new element should be the last.
if (sptr != NULL) {
s = zzlGetScore(sptr);
if (s >= score) {
// It should not be the last, so fallback to the forward iteration.
eptr = lpSeek(zl, 0);
}
}
while (eptr != NULL) {
sptr = lpNext(zl, eptr);
serverAssert(sptr != NULL);
s = zzlGetScore(sptr);
if (s > score) {
/* First element with score larger than score for element to be
* inserted. This means we should take its spot in the list to
* maintain ordering. */
return zzlInsertAt(zl, eptr, ele, score);
} else if (s == score) {
/* Ensure lexicographical ordering for elements. */
if (zzlCompareElements(eptr, (unsigned char*)ele, sdslen(ele)) > 0) {
return zzlInsertAt(zl, eptr, ele, score);
}
}
/* Move to next element. */
eptr = lpNext(zl, sptr);
}
/* Push on tail of list when it was not yet inserted. */
return zzlInsertAt(zl, NULL, ele, score);
}
void SortedMap::RdImpl::Init() {
dict = dictCreate(&zsetDictType);
zsl = zslCreate();

View file

@ -270,5 +270,8 @@ class SortedMap {
PMR_NS::memory_resource* mr_res_;
};
// Used by CompactObject.
unsigned char* ZzlInsert(unsigned char* zl, sds ele, double score);
} // namespace detail
} // namespace dfly

View file

@ -56,7 +56,6 @@
* pointers being only at "level 1". This allows to traverse the list
* from tail to head, useful for ZREVRANGE. */
#include <float.h>
#include <math.h>
#include <stdlib.h>
#include <string.h>
@ -80,37 +79,6 @@ static char kMaxStrData[] = "\110" "maxstring";
sds cminstring = kMinStrData + 1;
sds cmaxstring = kMaxStrData + 1;
/* Returns 1 if the double value can safely be represented in long long without
* precision loss, in which case the corresponding long long is stored in the out variable. */
static int double2ll(double d, long long *out) {
#if (DBL_MANT_DIG >= 52) && (DBL_MANT_DIG <= 63) && (LLONG_MAX == 0x7fffffffffffffffLL)
/* Check if the float is in a safe range to be casted into a
* long long. We are assuming that long long is 64 bit here.
* Also we are assuming that there are no implementations around where
* double has precision < 52 bit.
*
* Under this assumptions we test if a double is inside a range
* where casting to long long is safe. Then using two castings we
* make sure the decimal part is zero. If all this is true we can use
* integer without precision loss.
*
* Note that numbers above 2^52 and below 2^63 use all the fraction bits as real part,
* and the exponent bits are positive, which means the "decimal" part must be 0.
* i.e. all double values in that range are representable as a long without precision loss,
* but not all long values in that range can be represented as a double.
* we only care about the first part here. */
if (d < (double)(-LLONG_MAX/2) || d > (double)(LLONG_MAX/2))
return 0;
long long ll = d;
if (ll == d) {
*out = ll;
return 1;
}
#endif
return 0;
}
int zslLexValueGteMin(sds value, const zlexrangespec *spec);
int zslLexValueLteMax(sds value, const zlexrangespec *spec);
@ -772,27 +740,6 @@ sds lpGetObject(unsigned char *sptr) {
}
}
/* Compare element in sorted set with given element. */
int zzlCompareElements(unsigned char *eptr, unsigned char *cstr, unsigned int clen) {
unsigned char *vstr;
unsigned int vlen;
long long vlong;
unsigned char vbuf[32];
int minlen, cmp;
vstr = lpGetValue(eptr,&vlen,&vlong);
if (vstr == NULL) {
/* Store string representation of long long in buf. */
vlen = ll2string((char*)vbuf,sizeof(vbuf),vlong);
vstr = vbuf;
}
minlen = (vlen < clen) ? vlen : clen;
cmp = memcmp(vstr,cstr,minlen);
if (cmp == 0) return vlen-clen;
return cmp;
}
unsigned int zzlLength(unsigned char *zl) {
return lpLength(zl)/2;
}
@ -1035,73 +982,6 @@ unsigned char *zzlDelete(unsigned char *zl, unsigned char *eptr) {
return lpDeleteRangeWithEntry(zl,&eptr,2);
}
unsigned char *zzlInsertAt(unsigned char *zl, unsigned char *eptr, sds ele, double score) {
unsigned char *sptr;
char scorebuf[128];
int scorelen;
long long lscore;
int score_is_long = double2ll(score, &lscore);
if (!score_is_long)
scorelen = d2string(scorebuf,sizeof(scorebuf),score);
if (eptr == NULL) {
zl = lpAppend(zl,(unsigned char*)ele,sdslen(ele));
if (score_is_long)
zl = lpAppendInteger(zl,lscore);
else
zl = lpAppend(zl,(unsigned char*)scorebuf,scorelen);
} else {
/* Insert member before the element 'eptr'. */
zl = lpInsertString(zl,(unsigned char*)ele,sdslen(ele),eptr,LP_BEFORE,&sptr);
/* Insert score after the member. */
if (score_is_long)
zl = lpInsertInteger(zl,lscore,sptr,LP_AFTER,NULL);
else
zl = lpInsertString(zl,(unsigned char*)scorebuf,scorelen,sptr,LP_AFTER,NULL);
}
return zl;
}
/* Insert (element,score) pair in listpack. This function assumes the element is
* not yet present in the list. */
unsigned char *zzlInsert(unsigned char *zl, sds ele, double score) {
unsigned char *eptr = NULL, *sptr = lpSeek(zl,-1);
double s;
// Optimization: check first whether the new element should be the last.
if (sptr != NULL) {
s = zzlGetScore(sptr);
if (s >= score) {
// It should not be the last, so fallback to the forward iteration.
eptr = lpSeek(zl,0);
}
}
while (eptr != NULL) {
sptr = lpNext(zl,eptr);
serverAssert(sptr != NULL);
s = zzlGetScore(sptr);
if (s > score) {
/* First element with score larger than score for element to be
* inserted. This means we should take its spot in the list to
* maintain ordering. */
return zzlInsertAt(zl,eptr,ele,score);
} else if (s == score) {
/* Ensure lexicographical ordering for elements. */
if (zzlCompareElements(eptr,(unsigned char*)ele,sdslen(ele)) > 0) {
return zzlInsertAt(zl,eptr,ele,score);
}
}
/* Move to next element. */
eptr = lpNext(zl,sptr);
}
/* Push on tail of list when it was not yet inserted. */
return zzlInsertAt(zl,NULL,ele,score);
}
unsigned char *zzlDeleteRangeByScore(unsigned char *zl, const zrangespec *range, unsigned long *deleted) {
unsigned char *eptr, *sptr;
double score;

View file

@ -40,7 +40,6 @@
#include <stdint.h>
#include <errno.h>
#include <time.h>
#include "util.h"
@ -513,48 +512,6 @@ int string2ld(const char *s, size_t slen, long double *dp) {
return 1;
}
/* Convert a double to a string representation. Returns the number of bytes
* required. The representation should always be parsable by strtod(3).
* This function does not support human-friendly formatting like ld2string
* does. It is intended mainly to be used inside t_zset.c when writing scores
* into a ziplist representing a sorted set. */
int d2string(char *buf, size_t len, double value) {
if (isnan(value)) {
len = snprintf(buf,len,"nan");
} else if (isinf(value)) {
if (value < 0)
len = snprintf(buf,len,"-inf");
else
len = snprintf(buf,len,"inf");
} else if (value == 0) {
/* See: http://en.wikipedia.org/wiki/Signed_zero, "Comparisons". */
if (1.0/value < 0)
len = snprintf(buf,len,"-0");
else
len = snprintf(buf,len,"0");
} else {
#if (DBL_MANT_DIG >= 52) && (LLONG_MAX == 0x7fffffffffffffffLL)
/* Check if the float is in a safe range to be casted into a
* long long. We are assuming that long long is 64 bit here.
* Also we are assuming that there are no implementations around where
* double has precision < 52 bit.
*
* Under this assumptions we test if a double is inside an interval
* where casting to long long is safe. Then using two castings we
* make sure the decimal part is zero. If all this is true we use
* integer printing function that is much faster. */
double min = -4503599627370495; /* (2^52)-1 */
double max = 4503599627370496; /* -(2^52) */
if (value > min && value < max && value == ((double)((long long)value)))
len = ll2string(buf,len,(long long)value);
else
#endif
len = snprintf(buf,len,"%.17g",value);
}
return len;
}
/* Create a string object from a long double.
* If mode is humanfriendly it does not use exponential format and trims trailing
* zeroes at the end (may result in loss of precision).