diff --git a/src/redis/hyperloglog.c b/src/redis/hyperloglog.c index dfdb31e23..83c899d2f 100644 --- a/src/redis/hyperloglog.c +++ b/src/redis/hyperloglog.c @@ -181,8 +181,9 @@ * involved in updating the sparse representation is not justified by the * memory savings. The exact maximum length of the sparse representation * when this implementation switches to the dense representation is - * configured via the define server.hll_sparse_max_bytes. + * configured via the define HLL_SPARSE_MAX_BYTES. */ +size_t HLL_SPARSE_MAX_BYTES = 3000; struct hllhdr { char magic[4]; /* "HYLL" */ @@ -590,6 +591,353 @@ void hllDenseRegHisto(uint8_t* registers, int* reghisto) { /* ================== Sparse representation implementation ================= */ + +/* Convert the HLL with sparse representation given as input in its dense + * representation. Both representations are represented by SDS strings, and + * the input representation is freed as a side effect. + * + * The function returns C_OK if the sparse representation was valid, + * otherwise C_ERR is returned if the representation was corrupted. */ +int hllSparseToDense(sds* hll_ptr) { + sds sparse = *hll_ptr, dense; + struct hllhdr *hdr, *oldhdr = (struct hllhdr*)sparse; + int idx = 0, runlen, regval; + uint8_t *p = (uint8_t*)sparse, *end = p+sdslen(sparse); + + /* If the representation is already the right one return ASAP. */ + hdr = (struct hllhdr*) sparse; + if (hdr->encoding == HLL_DENSE) return C_OK; + + /* Create a string of the right size filled with zero bytes. + * Note that the cached cardinality is set to 0 as a side effect + * that is exactly the cardinality of an empty HLL. */ + dense = sdsnewlen(NULL,HLL_DENSE_SIZE); + hdr = (struct hllhdr*) dense; + *hdr = *oldhdr; /* This will copy the magic and cached cardinality. */ + hdr->encoding = HLL_DENSE; + + /* Now read the sparse representation and set non-zero registers + * accordingly. */ + p += HLL_HDR_SIZE; + while(p < end) { + if (HLL_SPARSE_IS_ZERO(p)) { + runlen = HLL_SPARSE_ZERO_LEN(p); + idx += runlen; + p++; + } else if (HLL_SPARSE_IS_XZERO(p)) { + runlen = HLL_SPARSE_XZERO_LEN(p); + idx += runlen; + p += 2; + } else { + runlen = HLL_SPARSE_VAL_LEN(p); + regval = HLL_SPARSE_VAL_VALUE(p); + if ((runlen + idx) > HLL_REGISTERS) break; /* Overflow. */ + while(runlen--) { + HLL_DENSE_SET_REGISTER(hdr->registers,idx,regval); + idx++; + } + p++; + } + } + + /* If the sparse representation was valid, we expect to find idx + * set to HLL_REGISTERS. */ + if (idx != HLL_REGISTERS) { + sdsfree(dense); + return C_ERR; + } + + /* Free the old representation and set the new one. */ + sdsfree(*hll_ptr); + *hll_ptr = dense; + return C_OK; +} + +/* Low level function to set the sparse HLL register at 'index' to the + * specified value if the current value is smaller than 'count'. + * + * The object 'hll' is the SDS object holding the HLL. The function requires + * a reference to the object in order to be able to enlarge the string if + * needed. + * + * On success, the function returns 1 if the cardinality changed, or 0 + * if the register for this element was not updated. + * On error (if the representation is invalid) -1 is returned. + * + * As a side effect the function may promote the HLL representation from + * sparse to dense: this happens when a register requires to be set to a value + * not representable with the sparse representation, or when the resulting + * size would be greater than HLL_SPARSE_MAX_BYTES. */ +int hllSparseSet(sds* hll_ptr, long index, uint8_t count, int* promoted) { + struct hllhdr *hdr; + uint8_t oldcount, *sparse, *end, *p, *prev, *next; + long first, span; + long is_zero = 0, is_xzero = 0, is_val = 0, runlen = 0; + + /* If the count is too big to be representable by the sparse representation + * switch to dense representation. */ + if (count > HLL_SPARSE_VAL_MAX_VALUE) goto promote; + + /* When updating a sparse representation, sometimes we may need to enlarge the + * buffer for up to 3 bytes in the worst case (XZERO split into XZERO-VAL-XZERO), + * and the following code does the enlarge job. + * Actually, we use a greedy strategy, enlarge more than 3 bytes to avoid the need + * for future reallocates on incremental growth. But we do not allocate more than + * 'HLL_SPARSE_MAX_BYTES' bytes for the sparse representation. + * If the available size of hyperloglog sds string is not enough for the increment + * we need, we promote the hypreloglog to dense representation in 'step 3'. + */ + sds hll = *hll_ptr; + if (sdsalloc(hll) < HLL_SPARSE_MAX_BYTES && sdsavail(hll) < 3) { + size_t newlen = sdslen(hll) + 3; + newlen += min(newlen, 300); /* Greediness: double 'newlen' if it is smaller than 300, or add 300 to it when it exceeds 300 */ + if (newlen > HLL_SPARSE_MAX_BYTES) + newlen = HLL_SPARSE_MAX_BYTES; + *hll_ptr = sdsResize(hll, newlen); + hll = *hll_ptr; + } + + /* Step 1: we need to locate the opcode we need to modify to check + * if a value update is actually needed. */ + sparse = p = ((uint8_t*)hll) + HLL_HDR_SIZE; + end = p + sdslen(hll) - HLL_HDR_SIZE; + + first = 0; + prev = NULL; /* Points to previous opcode at the end of the loop. */ + next = NULL; /* Points to the next opcode at the end of the loop. */ + span = 0; + while(p < end) { + long oplen; + + /* Set span to the number of registers covered by this opcode. + * + * This is the most performance critical loop of the sparse + * representation. Sorting the conditionals from the most to the + * least frequent opcode in many-bytes sparse HLLs is faster. */ + oplen = 1; + if (HLL_SPARSE_IS_ZERO(p)) { + span = HLL_SPARSE_ZERO_LEN(p); + } else if (HLL_SPARSE_IS_VAL(p)) { + span = HLL_SPARSE_VAL_LEN(p); + } else { /* XZERO. */ + span = HLL_SPARSE_XZERO_LEN(p); + oplen = 2; + } + /* Break if this opcode covers the register as 'index'. */ + if (index <= first+span-1) break; + prev = p; + p += oplen; + first += span; + } + if (span == 0 || p >= end) return -1; /* Invalid format. */ + + next = HLL_SPARSE_IS_XZERO(p) ? p+2 : p+1; + if (next >= end) next = NULL; + + /* Cache current opcode type to avoid using the macro again and + * again for something that will not change. + * Also cache the run-length of the opcode. */ + if (HLL_SPARSE_IS_ZERO(p)) { + is_zero = 1; + runlen = HLL_SPARSE_ZERO_LEN(p); + } else if (HLL_SPARSE_IS_XZERO(p)) { + is_xzero = 1; + runlen = HLL_SPARSE_XZERO_LEN(p); + } else { + is_val = 1; + runlen = HLL_SPARSE_VAL_LEN(p); + } + + /* Step 2: After the loop: + * + * 'first' stores to the index of the first register covered + * by the current opcode, which is pointed by 'p'. + * + * 'next' ad 'prev' store respectively the next and previous opcode, + * or NULL if the opcode at 'p' is respectively the last or first. + * + * 'span' is set to the number of registers covered by the current + * opcode. + * + * There are different cases in order to update the data structure + * in place without generating it from scratch: + * + * A) If it is a VAL opcode already set to a value >= our 'count' + * no update is needed, regardless of the VAL run-length field. + * In this case PFADD returns 0 since no changes are performed. + * + * B) If it is a VAL opcode with len = 1 (representing only our + * register) and the value is less than 'count', we just update it + * since this is a trivial case. */ + if (is_val) { + oldcount = HLL_SPARSE_VAL_VALUE(p); + /* Case A. */ + if (oldcount >= count) return 0; + + /* Case B. */ + if (runlen == 1) { + HLL_SPARSE_VAL_SET(p,count,1); + goto updated; + } + } + + /* C) Another trivial to handle case is a ZERO opcode with a len of 1. + * We can just replace it with a VAL opcode with our value and len of 1. */ + if (is_zero && runlen == 1) { + HLL_SPARSE_VAL_SET(p,count,1); + goto updated; + } + + /* D) General case. + * + * The other cases are more complex: our register requires to be updated + * and is either currently represented by a VAL opcode with len > 1, + * by a ZERO opcode with len > 1, or by an XZERO opcode. + * + * In those cases the original opcode must be split into multiple + * opcodes. The worst case is an XZERO split in the middle resulting into + * XZERO - VAL - XZERO, so the resulting sequence max length is + * 5 bytes. + * + * We perform the split writing the new sequence into the 'new' buffer + * with 'newlen' as length. Later the new sequence is inserted in place + * of the old one, possibly moving what is on the right a few bytes + * if the new sequence is longer than the older one. */ + uint8_t seq[5], *n = seq; + int last = first+span-1; /* Last register covered by the sequence. */ + int len; + + if (is_zero || is_xzero) { + /* Handle splitting of ZERO / XZERO. */ + if (index != first) { + len = index-first; + if (len > HLL_SPARSE_ZERO_MAX_LEN) { + HLL_SPARSE_XZERO_SET(n,len); + n += 2; + } else { + HLL_SPARSE_ZERO_SET(n,len); + n++; + } + } + HLL_SPARSE_VAL_SET(n,count,1); + n++; + if (index != last) { + len = last-index; + if (len > HLL_SPARSE_ZERO_MAX_LEN) { + HLL_SPARSE_XZERO_SET(n,len); + n += 2; + } else { + HLL_SPARSE_ZERO_SET(n,len); + n++; + } + } + } else { + /* Handle splitting of VAL. */ + int curval = HLL_SPARSE_VAL_VALUE(p); + + if (index != first) { + len = index-first; + HLL_SPARSE_VAL_SET(n,curval,len); + n++; + } + HLL_SPARSE_VAL_SET(n,count,1); + n++; + if (index != last) { + len = last-index; + HLL_SPARSE_VAL_SET(n,curval,len); + n++; + } + } + + /* Step 3: substitute the new sequence with the old one. + * + * Note that we already allocated space on the sds string + * calling sdsResize(). */ + int seqlen = n-seq; + int oldlen = is_xzero ? 2 : 1; + int deltalen = seqlen-oldlen; + + if (deltalen > 0 && + sdslen(hll) + deltalen > HLL_SPARSE_MAX_BYTES) goto promote; + serverAssert(sdslen(hll) + deltalen <= sdsalloc(hll)); + if (deltalen && next) memmove(next+deltalen,next,end-next); + sdsIncrLen(hll,deltalen); + memcpy(p,seq,seqlen); + end += deltalen; + +updated: + /* Step 4: Merge adjacent values if possible. + * + * The representation was updated, however the resulting representation + * may not be optimal: adjacent VAL opcodes can sometimes be merged into + * a single one. */ + p = prev ? prev : sparse; + int scanlen = 5; /* Scan up to 5 upcodes starting from prev. */ + while (p < end && scanlen--) { + if (HLL_SPARSE_IS_XZERO(p)) { + p += 2; + continue; + } else if (HLL_SPARSE_IS_ZERO(p)) { + p++; + continue; + } + /* We need two adjacent VAL opcodes to try a merge, having + * the same value, and a len that fits the VAL opcode max len. */ + if (p+1 < end && HLL_SPARSE_IS_VAL(p+1)) { + int v1 = HLL_SPARSE_VAL_VALUE(p); + int v2 = HLL_SPARSE_VAL_VALUE(p+1); + if (v1 == v2) { + int len = HLL_SPARSE_VAL_LEN(p)+HLL_SPARSE_VAL_LEN(p+1); + if (len <= HLL_SPARSE_VAL_MAX_LEN) { + HLL_SPARSE_VAL_SET(p+1,v1,len); + memmove(p,p+1,end-p); + sdsIncrLen(hll,-1); + end--; + /* After a merge we reiterate without incrementing 'p' + * in order to try to merge the just merged value with + * a value on its right. */ + continue; + } + } + } + p++; + } + + /* Invalidate the cached cardinality. */ + hdr = (struct hllhdr *)hll; + HLL_INVALIDATE_CACHE(hdr); + return 1; + +promote: /* Promote to dense representation. */ + if (hllSparseToDense(&hll) == C_ERR) return -1; /* Corrupted HLL. */ + *hll_ptr = hll; + hdr = (struct hllhdr *)hll; + + /* We need to call hllDenseAdd() to perform the operation after the + * conversion. However the result must be 1, since if we need to + * convert from sparse to dense a register requires to be updated. + * + * Note that this in turn means that PFADD will make sure the command + * is propagated to slaves / AOF, so if there is a sparse -> dense + * conversion, it will be performed in all the slaves as well. */ + int dense_retval = hllDenseSet(hdr->registers,index,count); + serverAssert(dense_retval == 1); + *promoted = 1; + return dense_retval; +} + +/* "Add" the element in the sparse hyperloglog data structure. + * Actually nothing is added, but the max 0 pattern counter of the subset + * the element belongs to is incremented if needed. + * + * This function is actually a wrapper for hllSparseSet(), it only performs + * the hashing of the element to obtain the index and zeros run length. */ +int hllSparseAdd(sds* hll_ptr, unsigned char *ele, size_t elesize, int* promoted) { + long index; + uint8_t count = hllPatLen(ele,elesize,&index); + /* Update the register if this element produced a longer run of zeroes. */ + return hllSparseSet(hll_ptr,index,count, promoted); +} /* Compute the register histogram in the sparse representation. */ void hllSparseRegHisto(uint8_t* sparse, int sparselen, int* invalid, int* reghisto) { int idx = 0, runlen, regval; @@ -858,6 +1206,37 @@ size_t getDenseHllSize() { return HLL_DENSE_SIZE; } +size_t getSparseHllInitSize() { + return HLL_HDR_SIZE + (((HLL_REGISTERS+(HLL_SPARSE_XZERO_MAX_LEN-1)) / + HLL_SPARSE_XZERO_MAX_LEN)*2); +} + +int initSparseHll(struct HllBufferPtr hll_ptr) { + if (hll_ptr.size != getSparseHllInitSize()) { + return C_ERR; + } + + memset(hll_ptr.hll, 0, hll_ptr.size); + + /* Populate the sparse representation with as many XZERO opcodes as + * needed to represent all the registers. */ + int aux = HLL_REGISTERS; + uint8_t* p = (uint8_t*)hll_ptr.hll + HLL_HDR_SIZE; + while(aux) { + int xzero = HLL_SPARSE_XZERO_MAX_LEN; + if (xzero > aux) xzero = aux; + HLL_SPARSE_XZERO_SET(p,xzero); + p += 2; + aux -= xzero; + } + + struct hllhdr* hdr = (struct hllhdr*)hll_ptr.hll; + + memcpy(hdr->magic, "HYLL", 4); + hdr->encoding = HLL_SPARSE; + return C_OK; +} + int createDenseHll(struct HllBufferPtr hll_ptr) { if (hll_ptr.size != getDenseHllSize()) { return C_ERR; @@ -922,7 +1301,19 @@ int convertSparseToDenseHll(struct HllBufferPtr in_hll, struct HllBufferPtr out_ return C_OK; } -int pfadd(struct HllBufferPtr hll_ptr, unsigned char* value, size_t size) { +int pfadd_sparse(sds* hll_ptr, unsigned char* value, size_t size, int* promoted) { + struct hllhdr* hdr = (struct hllhdr*)(*hll_ptr); + int retval = hllSparseAdd(hll_ptr, value, size, promoted); + switch (retval) { + case 1: + HLL_INVALIDATE_CACHE(hdr); + return 1; + default: + return retval; + } +} + +int pfadd_dense(struct HllBufferPtr hll_ptr, unsigned char* value, size_t size) { if (isValidHLL(hll_ptr) != HLL_VALID_DENSE) return C_ERR; diff --git a/src/redis/hyperloglog.h b/src/redis/hyperloglog.h index 8eeb81635..2fd3fd9ee 100644 --- a/src/redis/hyperloglog.h +++ b/src/redis/hyperloglog.h @@ -23,10 +23,15 @@ struct HllBufferPtr { size_t size; }; +extern size_t HLL_SPARSE_MAX_BYTES; + enum HllValidness isValidHLL(struct HllBufferPtr hll_ptr); size_t getDenseHllSize(); +size_t getSparseHllInitSize(); + +int initSparseHll(struct HllBufferPtr hll_ptr); /* Writes into `hll_ptr` an empty dense-encoded HLL. * Returns 0 upon success, or a negative number when `hll_ptr.size` is different from * getDenseHllSize() */ @@ -41,7 +46,8 @@ int convertSparseToDenseHll(struct HllBufferPtr in_hll, struct HllBufferPtr out_ /* Adds `value` of size `size`, to `hll_ptr`. * If `obj` does not have an underlying type of HLL a negative number is returned. */ -int pfadd(struct HllBufferPtr hll_ptr, unsigned char* value, size_t size); +int pfadd_sparse(sds* hll_ptr, unsigned char* value, size_t size, int* promoted); +int pfadd_dense(struct HllBufferPtr hll_ptr, unsigned char* value, size_t size); /* Returns the estimated count of elements for `hll_ptr`. * If `hll_ptr` is not a valid dense-encoded HLL, a negative number is returned. */ diff --git a/src/server/hll_family.cc b/src/server/hll_family.cc index 5526a72c2..71ad0b586 100644 --- a/src/server/hll_family.cc +++ b/src/server/hll_family.cc @@ -75,26 +75,52 @@ OpResult AddToHll(const OpArgs& op_args, string_view key, CmdArgList values RETURN_ON_BAD_STATUS(op_res); auto& res = *op_res; if (res.is_new) { - hll.resize(getDenseHllSize()); - createDenseHll(StringToHllPtr(hll)); + hll.resize(getSparseHllInitSize()); + initSparseHll(StringToHllPtr(hll)); } else if (res.it->second.ObjType() != OBJ_STRING) { return OpStatus::WRONG_TYPE; } else { res.it->second.GetString(&hll); - ConvertToDenseIfNeeded(&hll); + } + if (isValidHLL(StringToHllPtr(hll)) == HLL_INVALID) { + return OpStatus::INVALID_VALUE; } int updated = 0; + bool is_sparse = isValidHLL(StringToHllPtr(hll)) == HLL_VALID_SPARSE; + sds hll_sds; + if (is_sparse) { + hll_sds = sdsnewlen(hll.data(), hll.size()); + } + for (const auto& value : values) { - int added = pfadd(StringToHllPtr(hll), (unsigned char*)value.data(), value.size()); + int added; + if (is_sparse) { + // Inserting to sparse hll might extend it. + // We can't use std::string with sds + // `promoted` will be assigned 1 if sparse hll was promoted to dense + int promoted = 0; + added = pfadd_sparse(&hll_sds, (unsigned char*)value.data(), value.size(), &promoted); + if (promoted == 1) { + is_sparse = false; + hll = string{hll_sds, sdslen(hll_sds)}; + sdsfree(hll_sds); + DCHECK_EQ(isValidHLL(StringToHllPtr(hll)), HLL_VALID_DENSE); + } + } else { + added = pfadd_dense(StringToHllPtr(hll), (unsigned char*)value.data(), value.size()); + } if (added < 0) { return OpStatus::INVALID_VALUE; } updated += added; } + if (is_sparse) { + hll = string{hll_sds, sdslen(hll_sds)}; + sdsfree(hll_sds); + } res.it->second.SetString(hll); - return std::min(updated, 1); } diff --git a/src/server/hll_family_test.cc b/src/server/hll_family_test.cc index 33117ccdb..3c6dac7c3 100644 --- a/src/server/hll_family_test.cc +++ b/src/server/hll_family_test.cc @@ -18,6 +18,9 @@ namespace dfly { class HllFamilyTest : public BaseFamilyTest { protected: + std::string GenerateUniqueValue(int index) { + return "Value_{" + std::to_string(index) + "}"; + } }; TEST_F(HllFamilyTest, Simple) { @@ -26,6 +29,29 @@ TEST_F(HllFamilyTest, Simple) { EXPECT_EQ(CheckedInt({"pfcount", "key"}), 1); } +TEST_F(HllFamilyTest, Promote) { + int unique_values = 20000; + // Sparse hll is promoted to dense at the 1660th+- insertion + // This value varies if any parameter in hyperloglog.c changes. + int promote_i = 1660; + // Keep consistent with hyperloglog.c + int kHllSparseMaxBytes = 3000; + int kHllDenseSize = 12304; + for (int i = 0; i < unique_values; ++i) { + std::string newkey = GenerateUniqueValue(i); + Run({"pfadd", "key", newkey}); + if (i < promote_i) { + EXPECT_LT(CheckedInt({"strlen", "key"}), kHllSparseMaxBytes + 1); + } else { + EXPECT_EQ(CheckedInt({"strlen", "key"}), kHllDenseSize); + } + } + // HyperLogLog computations come with a + // margin of error, with a standard error rate of 0.81%. + // Set it to 5% so this test won't fail unless something went wrong badly. + EXPECT_LT(std::abs(CheckedInt({"pfcount", "key"}) - unique_values * 1.0) / unique_values, 0.05); +} + TEST_F(HllFamilyTest, MultipleValues) { EXPECT_EQ(CheckedInt({"pfadd", "key", "1", "2", "3"}), 1); EXPECT_EQ(CheckedInt({"pfcount", "key"}), 3); @@ -45,6 +71,41 @@ TEST_F(HllFamilyTest, MultipleValues) { EXPECT_EQ(CheckedInt({"pfcount", "key"}), 5); } +TEST_F(HllFamilyTest, MultipleValues_random) { + int insertions = 20000; + int unique_values = 0; + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(1, 20); + // cumulated pfadd result + for (int i = 0; i < insertions; ++i) { + // Number of values to insert + int num_values = dis(gen); + unique_values += num_values; + + // Prepare the command + std::vector values; + values.reserve(num_values + 2); + values.push_back("pfadd"); + values.push_back("key"); + + // Generate and add unique values to the command + for (int j = 0; j < num_values; ++j) { + values.push_back(GenerateUniqueValue(i * 20 + j)); + } + + std::vector commandViews; + for (const auto& val : values) { + commandViews.push_back(val); + } + Run(commandViews); + } + // HyperLogLog computations come with a + // margin of error, with a standard error rate of 0.81%. + // Set it to 5% so this test won't fail unless something went wrong badly. + EXPECT_LT(std::abs(CheckedInt({"pfcount", "key"}) - unique_values * 1.0) / unique_values, 0.05); +} + TEST_F(HllFamilyTest, AddInvalid) { EXPECT_EQ(Run({"set", "key", "..."}), "OK"); EXPECT_THAT(Run({"pfadd", "key", "1"}), ErrArg(HllFamily::kInvalidHllErr));