feat(server): Add support for PFADD and PFCOUNT (#1152)

* feat(server): Add support for PFADD and PFCOUNT

This version does not create sparse-encoded HLLs, however it is fully compatible with such ones created by Redis as it converts them to the dense encoding.

Note that PFMERGE is not yet implemented.

* Set small string optimization to be 2^13 instead of 2^15.

This will allow dense-encoded HLL to *not* fit within the small string,
which will make it contiguous in memory, thus GetSlice() will not
allocate.

---------

Signed-off-by: chakaz <chakaz@chakaz>
Co-authored-by: chakaz <chakaz@chakaz>
This commit is contained in:
Chaka 2023-04-30 00:50:11 +03:00 committed by GitHub
parent b09a36d553
commit fa39c1890d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 1509 additions and 5 deletions

View file

@ -678,7 +678,7 @@ void CompactObj::SetString(std::string_view str) {
} }
if (kUseSmallStrings) { if (kUseSmallStrings) {
if ((taglen_ == 0 && encoded.size() < (1 << 15))) { if ((taglen_ == 0 && encoded.size() < (1 << 13))) {
SetMeta(SMALL_TAG, mask); SetMeta(SMALL_TAG, mask);
tl.small_str_bytes += u_.small_str.Assign(encoded); tl.small_str_bytes += u_.small_str.Assign(encoded);
return; return;

View file

@ -11,7 +11,7 @@ endif()
add_library(redis_lib crc64.c crcspeed.c debug.c dict.c intset.c add_library(redis_lib crc64.c crcspeed.c debug.c dict.c intset.c
listpack.c mt19937-64.c object.c lzf_c.c lzf_d.c sds.c listpack.c mt19937-64.c object.c lzf_c.c lzf_d.c sds.c
quicklist.c rax.c redis_aux.c siphash.c t_hash.c t_stream.c t_zset.c quicklist.c rax.c redis_aux.c siphash.c t_hash.c t_stream.c t_zset.c
util.c ziplist.c ${ZMALLOC_SRC}) util.c ziplist.c hyperloglog.c ${ZMALLOC_SRC})
cxx_link(redis_lib ${ZMALLOC_DEPS}) cxx_link(redis_lib ${ZMALLOC_DEPS})

1070
src/redis/hyperloglog.c Normal file

File diff suppressed because it is too large Load diff

54
src/redis/hyperloglog.h Normal file
View file

@ -0,0 +1,54 @@
#ifndef __REDIS_HYPERLOGLOG_H
#define __REDIS_HYPERLOGLOG_H
#include <stddef.h>
#include <stdint.h>
#include "redis/sds.h"
/* This version of hyperloglog, forked from Redis, only supports using the dense format of HLL.
* The reason is that it is of a fixed size, which makes it easier to integrate into Dragonfly.
* We do support converting of existing sprase-encoded HLL into dense-encoded, which can be useful
* for replication, serialization, etc. */
enum HllValidness {
HLL_INVALID,
HLL_VALID_SPARSE,
HLL_VALID_DENSE,
};
/* Convenience struct for pointing to an Hll buffer along with its size */
struct HllBufferPtr {
unsigned char* hll;
size_t size;
};
enum HllValidness isValidHLL(struct HllBufferPtr hll_ptr);
size_t getDenseHllSize();
/* Writes into `hll_ptr` an empty dense-encoded HLL.
* Returns 0 upon success, or a negative number when `hll_ptr.size` is different from
* getDenseHllSize() */
int createDenseHll(struct HllBufferPtr hll_ptr);
/* Converts an existing sparse-encoded HLL pointed by `in_hll`, and writes the converted result into
* `out_hll`.
* Returns 0 upon success, otherwise a negative number.
* Failures can occur when `out_hll.size` is different from getDenseHllSize() or when input is not a
* valid sparse-encoded HLL. */
int convertSparseToDenseHll(struct HllBufferPtr in_hll, struct HllBufferPtr out_hll);
/* Adds `value` of size `size`, to `hll_ptr`.
* If `obj` does not have an underlying type of HLL a negative number is returned. */
int pfadd(struct HllBufferPtr hll_ptr, unsigned char* value, size_t size);
/* Returns the estimated count of elements for `hll_ptr`.
* If `hll_ptr` is not a valid dense-encoded HLL, a negative number is returned. */
int64_t pfcountSingle(struct HllBufferPtr hll_ptr);
/* Returns the estimated count for all HLLs in `hlls` array of size `hlls_count`.
* All `hlls` elements must be valid, dense-encoded HLLs. */
int64_t pfcountMulti(struct HllBufferPtr* hlls, size_t hlls_count);
#endif

View file

@ -22,7 +22,7 @@ add_library(dragonfly_lib channel_store.cc command_registry.cc
set_family.cc stream_family.cc string_family.cc set_family.cc stream_family.cc string_family.cc
zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc
serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc
top_keys.cc multi_command_squasher.cc) top_keys.cc multi_command_squasher.cc hll_family.cc)
cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib http_client_lib cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib http_client_lib
absl::random_random TRDP::jsoncons zstd TRDP::lz4) absl::random_random TRDP::jsoncons zstd TRDP::lz4)
@ -40,7 +40,7 @@ cxx_test(stream_family_test dfly_test_lib LABELS DFLY)
cxx_test(string_family_test dfly_test_lib LABELS DFLY) cxx_test(string_family_test dfly_test_lib LABELS DFLY)
cxx_test(bitops_family_test dfly_test_lib LABELS DFLY) cxx_test(bitops_family_test dfly_test_lib LABELS DFLY)
cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rdb cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rdb
testdata/redis6_stream.rdb LABELS DFLY) testdata/redis6_stream.rdb testdata/hll.rdb LABELS DFLY)
cxx_test(zset_family_test dfly_test_lib LABELS DFLY) cxx_test(zset_family_test dfly_test_lib LABELS DFLY)
cxx_test(blocking_controller_test dragonfly_lib LABELS DFLY) cxx_test(blocking_controller_test dragonfly_lib LABELS DFLY)
cxx_test(snapshot_test dragonfly_lib LABELS DFLY) cxx_test(snapshot_test dragonfly_lib LABELS DFLY)
@ -48,8 +48,10 @@ cxx_test(json_family_test dfly_test_lib LABELS DFLY)
cxx_test(journal_test dfly_test_lib LABELS DFLY) cxx_test(journal_test dfly_test_lib LABELS DFLY)
cxx_test(tiered_storage_test dfly_test_lib LABELS DFLY) cxx_test(tiered_storage_test dfly_test_lib LABELS DFLY)
cxx_test(top_keys_test dfly_test_lib LABELS DFLY) cxx_test(top_keys_test dfly_test_lib LABELS DFLY)
cxx_test(hll_family_test dfly_test_lib LABELS DFLY)
add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY) add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY)
add_dependencies(check_dfly dragonfly_test json_family_test list_family_test add_dependencies(check_dfly dragonfly_test json_family_test list_family_test
generic_family_test memcache_parser_test rdb_test journal_test generic_family_test memcache_parser_test rdb_test journal_test
redis_parser_test snapshot_test stream_family_test string_family_test bitops_family_test set_family_test zset_family_test) redis_parser_test snapshot_test stream_family_test string_family_test
bitops_family_test set_family_test zset_family_test hll_family_test)

225
src/server/hll_family.cc Normal file
View file

@ -0,0 +1,225 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/hll_family.h"
extern "C" {
#include "redis/hyperloglog.h"
}
#include "base/logging.h"
#include "base/stl_util.h"
#include "facade/error.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/container_utils.h"
#include "server/engine_shard_set.h"
#include "server/transaction.h"
namespace dfly {
using namespace std;
using namespace facade;
namespace {
template <typename T> void HandleOpValueResult(const OpResult<T>& result, ConnectionContext* cntx) {
static_assert(std::is_integral<T>::value,
"we are only handling types that are integral types in the return types from "
"here");
if (result) {
(*cntx)->SendLong(result.value());
} else {
switch (result.status()) {
case OpStatus::WRONG_TYPE:
(*cntx)->SendError(kWrongTypeErr);
break;
case OpStatus::OUT_OF_MEMORY:
(*cntx)->SendError(kOutOfMemory);
break;
case OpStatus::INVALID_VALUE:
(*cntx)->SendError(HllFamily::kInvalidHllErr);
break;
default:
(*cntx)->SendLong(0); // in case we don't have the value we should just send 0
break;
}
}
}
HllBufferPtr StringToHllPtr(string_view hll) {
return {.hll = (unsigned char*)hll.data(), .size = hll.size()};
}
void ConvertToDenseIfNeeded(string* hll) {
if (isValidHLL(StringToHllPtr(*hll)) == HLL_VALID_SPARSE) {
string new_hll;
new_hll.resize(getDenseHllSize());
int result = convertSparseToDenseHll(StringToHllPtr(*hll), StringToHllPtr(new_hll));
DCHECK_EQ(result, 0);
*hll = std::move(new_hll);
}
}
OpResult<int> AddToHll(const OpArgs& op_args, string_view key, CmdArgList values) {
auto& db_slice = op_args.shard->db_slice();
string hll;
try {
auto [it, inserted] = db_slice.AddOrFind(op_args.db_cntx, key);
if (inserted) {
hll.resize(getDenseHllSize());
createDenseHll(StringToHllPtr(hll));
} else if (it->second.ObjType() != OBJ_STRING) {
return OpStatus::WRONG_TYPE;
} else {
it->second.GetString(&hll);
ConvertToDenseIfNeeded(&hll);
}
int updated = 0;
for (const auto& value : values) {
int added = pfadd(StringToHllPtr(hll), (unsigned char*)value.data(), value.size());
if (added < 0) {
return OpStatus::INVALID_VALUE;
}
updated += added;
}
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
it->second.SetString(hll);
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !inserted);
return std::min(updated, 1);
} catch (const std::bad_alloc&) {
return OpStatus::OUT_OF_MEMORY;
}
}
void PFAdd(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);
auto cb = [&](Transaction* t, EngineShard* shard) {
return AddToHll(t->GetOpArgs(shard), key, args);
};
Transaction* trans = cntx->transaction;
OpResult<int> res = trans->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, cntx);
}
OpResult<int64_t> CountHllsSingle(const OpArgs& op_args, string_view key) {
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> it = db_slice.Find(op_args.db_cntx, key, OBJ_STRING);
if (it.ok()) {
string hll;
string_view hll_view = it.value()->second.GetSlice(&hll);
switch (isValidHLL(StringToHllPtr(hll_view))) {
case HLL_VALID_DENSE:
break;
case HLL_VALID_SPARSE:
// Even in the case of a read - we still want to convert the hll to dense format, as it
// could originate in Redis (like in replication or rdb load).
hll = hll_view;
ConvertToDenseIfNeeded(&hll);
hll_view = hll;
break;
case HLL_INVALID:
default:
return OpStatus::INVALID_VALUE;
}
return pfcountSingle(StringToHllPtr(hll_view));
} else if (it.status() == OpStatus::WRONG_TYPE) {
return it.status();
} else {
// Non existing keys count as 0.
return 0;
}
}
vector<OpResult<string>> ReadValues(const OpArgs& op_args, ArgSlice keys) {
vector<OpResult<string>> values;
for (size_t i = 0; i < keys.size(); ++i) {
OpResult<PrimeIterator> it =
op_args.shard->db_slice().Find(op_args.db_cntx, keys[i], OBJ_STRING);
if (it.ok()) {
string hll;
it.value()->second.GetString(&hll);
ConvertToDenseIfNeeded(&hll);
if (isValidHLL(StringToHllPtr(hll)) != HLL_VALID_DENSE) {
values.push_back(OpStatus::INVALID_VALUE);
} else {
values.push_back(std::move(hll));
}
} else if (it.status() == OpStatus::WRONG_TYPE) {
values.push_back(OpStatus::WRONG_TYPE);
}
}
return values;
}
OpResult<int64_t> PFCountMulti(CmdArgList args, ConnectionContext* cntx) {
vector<vector<OpResult<string>>> hlls;
hlls.resize(shard_set->size());
auto cb = [&](Transaction* t, EngineShard* shard) {
ShardId sid = shard->shard_id();
ArgSlice shard_args = t->GetShardArgs(shard->shard_id());
hlls[sid] = ReadValues(t->GetOpArgs(shard), shard_args);
return OpStatus::OK;
};
Transaction* trans = cntx->transaction;
trans->ScheduleSingleHop(std::move(cb));
vector<HllBufferPtr> ptrs;
ptrs.reserve(hlls.size());
for (auto& shard_hlls : hlls) {
for (auto& hll : shard_hlls) {
if (!hll.ok()) {
return hll.status();
}
ptrs.push_back(StringToHllPtr(hll.value()));
}
}
int64_t pf_count = pfcountMulti(ptrs.data(), ptrs.size());
if (pf_count < 0) {
return OpStatus::INVALID_VALUE;
} else {
return pf_count;
}
}
void PFCount(CmdArgList args, ConnectionContext* cntx) {
if (args.size() == 1) {
string_view key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) {
return CountHllsSingle(t->GetOpArgs(shard), key);
};
Transaction* trans = cntx->transaction;
OpResult<int64_t> res = trans->ScheduleSingleHopT(std::move(cb));
HandleOpValueResult(res, cntx);
} else {
HandleOpValueResult(PFCountMulti(args, cntx), cntx);
}
}
} // namespace
void HllFamily::Register(CommandRegistry* registry) {
using CI = CommandId;
*registry << CI{"PFADD", CO::WRITE, -3, 1, 1, 1}.SetHandler(PFAdd)
<< CI{"PFCOUNT", CO::WRITE, -2, 1, -1, 1}.SetHandler(PFCount);
}
const char HllFamily::kInvalidHllErr[] = "Key is not a valid HyperLogLog string value.";
} // namespace dfly

27
src/server/hll_family.h Normal file
View file

@ -0,0 +1,27 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
/// @brief This would implement HLL (HyperLogLog, aka PF) related commands: PFADD, PFCOUNT, PFMERGE
/// For more details about these command see:
/// PFADD: https://redis.io/commands/pfadd/
/// PFCOUNT: https://redis.io/commands/pfcount/
/// PFMERGE: https://redis.io/commands/pfmerge/
namespace dfly {
class CommandRegistry;
class HllFamily {
public:
/// @brief Register the function that would be called to operate on user commands.
/// @param registry The location to which the handling functions would be registered.
///
/// We are assuming that this would have a valid registry to work on (i.e this do not point to
/// null!).
static void Register(CommandRegistry* registry);
static const char kInvalidHllErr[];
};
} // namespace dfly

View file

@ -0,0 +1,98 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/hll_family.h"
#include "base/gtest.h"
#include "base/logging.h"
#include "facade/facade_test.h"
#include "server/command_registry.h"
#include "server/test_utils.h"
using namespace testing;
using namespace std;
using namespace util;
namespace dfly {
class HllFamilyTest : public BaseFamilyTest {
protected:
};
TEST_F(HllFamilyTest, Simple) {
EXPECT_EQ(CheckedInt({"pfadd", "key", "1"}), 1);
EXPECT_EQ(CheckedInt({"pfadd", "key", "1"}), 0);
EXPECT_EQ(CheckedInt({"pfcount", "key"}), 1);
}
TEST_F(HllFamilyTest, MultipleValues) {
EXPECT_EQ(CheckedInt({"pfadd", "key", "1", "2", "3"}), 1);
EXPECT_EQ(CheckedInt({"pfcount", "key"}), 3);
EXPECT_EQ(CheckedInt({"pfadd", "key", "1", "2", "3"}), 0);
EXPECT_EQ(CheckedInt({"pfcount", "key"}), 3);
EXPECT_EQ(CheckedInt({"pfadd", "key", "1"}), 0);
EXPECT_EQ(CheckedInt({"pfcount", "key"}), 3);
EXPECT_EQ(CheckedInt({"pfadd", "key", "2"}), 0);
EXPECT_EQ(CheckedInt({"pfcount", "key"}), 3);
EXPECT_EQ(CheckedInt({"pfadd", "key", "3"}), 0);
EXPECT_EQ(CheckedInt({"pfcount", "key"}), 3);
EXPECT_EQ(CheckedInt({"pfadd", "key", "3", "4"}), 1);
EXPECT_EQ(CheckedInt({"pfcount", "key"}), 4);
EXPECT_EQ(CheckedInt({"pfadd", "key", "5"}), 1);
EXPECT_EQ(CheckedInt({"pfcount", "key"}), 5);
EXPECT_EQ(CheckedInt({"pfadd", "key", "1", "2", "3", "4", "5"}), 0);
EXPECT_EQ(CheckedInt({"pfcount", "key"}), 5);
}
TEST_F(HllFamilyTest, AddInvalid) {
EXPECT_EQ(Run({"set", "key", "..."}), "OK");
EXPECT_THAT(Run({"pfadd", "key", "1"}), ErrArg(HllFamily::kInvalidHllErr));
EXPECT_THAT(Run({"pfcount", "key"}), ErrArg(HllFamily::kInvalidHllErr));
}
TEST_F(HllFamilyTest, OtherType) {
Run({"zadd", "key", "1", "a"});
EXPECT_THAT(Run({"pfadd", "key", "1"}),
ErrArg("Operation against a key holding the wrong kind of value"));
EXPECT_THAT(Run({"pfcount", "key"}),
ErrArg("Operation against a key holding the wrong kind of value"));
}
TEST_F(HllFamilyTest, CountEmpty) {
EXPECT_EQ(CheckedInt({"pfcount", "nonexisting"}), 0);
}
TEST_F(HllFamilyTest, CountInvalid) {
EXPECT_EQ(Run({"set", "key", "..."}), "OK");
EXPECT_THAT(Run({"pfcount", "key"}), ErrArg(HllFamily::kInvalidHllErr));
}
TEST_F(HllFamilyTest, CountMultiple) {
EXPECT_EQ(CheckedInt({"pfadd", "key1", "1", "2", "3"}), 1);
EXPECT_EQ(CheckedInt({"pfcount", "key1"}), 3);
EXPECT_EQ(CheckedInt({"pfadd", "key2", "1", "2", "3"}), 1);
EXPECT_EQ(CheckedInt({"pfcount", "key2"}), 3);
EXPECT_EQ(CheckedInt({"pfadd", "key3", "2", "3"}), 1);
EXPECT_EQ(CheckedInt({"pfcount", "key3"}), 2);
EXPECT_EQ(CheckedInt({"pfadd", "key4", "4", "5"}), 1);
EXPECT_EQ(CheckedInt({"pfcount", "key4"}), 2);
EXPECT_EQ(CheckedInt({"pfcount", "key1", "key4"}), 5);
EXPECT_EQ(CheckedInt({"pfcount", "non-existing-key1", "non-existing-key2"}), 0);
EXPECT_EQ(CheckedInt({"pfcount", "key1", "non-existing-key"}), 3);
EXPECT_EQ(CheckedInt({"pfcount", "key1", "key2"}), 3);
EXPECT_EQ(CheckedInt({"pfcount", "key1", "key3"}), 3);
EXPECT_EQ(CheckedInt({"pfcount", "key1", "key2", "key3"}), 3);
EXPECT_EQ(CheckedInt({"pfcount", "key1", "key2", "key3", "key4"}), 5);
EXPECT_EQ(CheckedInt({"pfcount", "key1", "key2", "key3", "key4", "non-existing"}), 5);
EXPECT_EQ(CheckedInt({"pfcount", "key1", "key4"}), 5);
}
} // namespace dfly

View file

@ -25,6 +25,7 @@ extern "C" {
#include "server/conn_context.h" #include "server/conn_context.h"
#include "server/error.h" #include "server/error.h"
#include "server/generic_family.h" #include "server/generic_family.h"
#include "server/hll_family.h"
#include "server/hset_family.h" #include "server/hset_family.h"
#include "server/json_family.h" #include "server/json_family.h"
#include "server/list_family.h" #include "server/list_family.h"
@ -1761,6 +1762,7 @@ void Service::RegisterCommands() {
ZSetFamily::Register(&registry_); ZSetFamily::Register(&registry_);
JsonFamily::Register(&registry_); JsonFamily::Register(&registry_);
BitOpsFamily::Register(&registry_); BitOpsFamily::Register(&registry_);
HllFamily::Register(&registry_);
server_family_.Register(&registry_); server_family_.Register(&registry_);

View file

@ -346,4 +346,30 @@ TEST_F(RdbTest, JsonTest) {
} }
} }
// hll.rdb has 2 keys: "key-dense" and "key-sparse", both are HLL with a single added value "1".
class HllRdbTest : public RdbTest, public testing::WithParamInterface<string> {};
TEST_P(HllRdbTest, Hll) {
io::FileSource fs = GetSource("hll.rdb");
RdbLoader loader{service_.get()};
// must run in proactor thread in order to avoid polluting the serverstate
// in the main, testing thread.
auto ec = pp_->at(0)->Await([&] { return loader.Load(&fs); });
ASSERT_FALSE(ec) << ec.message();
EXPECT_EQ(CheckedInt({"pfcount", GetParam()}), 1);
EXPECT_EQ(CheckedInt({"pfcount", GetParam(), "non-existing"}), 1);
EXPECT_EQ(CheckedInt({"pfadd", "key2", "2"}), 1);
EXPECT_EQ(CheckedInt({"pfcount", GetParam(), "key2"}), 2);
EXPECT_EQ(CheckedInt({"pfadd", GetParam(), "2"}), 1);
EXPECT_EQ(CheckedInt({"pfcount", GetParam()}), 2);
}
INSTANTIATE_TEST_SUITE_P(HllRdbTest, HllRdbTest, Values("key-sparse", "key-dense"));
} // namespace dfly } // namespace dfly

BIN
src/server/testdata/hll.rdb vendored Normal file

Binary file not shown.