diff --git a/src/core/bloom.cc b/src/core/bloom.cc
index 91a4615eb..9fc91025c 100644
--- a/src/core/bloom.cc
+++ b/src/core/bloom.cc
@@ -146,6 +146,20 @@ SBF::~SBF() {
     f.Destroy(mr);
 }
 
+// Move-assignment: takes over src's filters and copies its sizing parameters.
+// The filters previously owned by *this are swapped into src instead of being dropped with
+// clear(): ~SBF() releases filters through an explicit Destroy(mr) call, which clear() would
+// skip. They are released when src is destroyed (src is a temporary in CompactObj::SetSBF).
+SBF& SBF::operator=(SBF&& src) {
+  filters_.swap(src.filters_);
+  grow_factor_ = src.grow_factor_;
+  fp_prob_ = src.fp_prob_;
+  current_size_ = src.current_size_;
+  max_capacity_ = src.max_capacity_;
+
+  return *this;
+}
+
 bool SBF::Add(std::string_view str) {
   DCHECK_LT(current_size_, max_capacity_);
 
diff --git a/src/core/bloom.h b/src/core/bloom.h
index f8a73afe7..98111adb1 100644
--- a/src/core/bloom.h
+++ b/src/core/bloom.h
@@ -72,10 +72,14 @@ class Bloom {
  * TODO: to test the actual rate of this filter.
  */
 class SBF {
+  SBF(const SBF&) = delete;
+
  public:
   SBF(uint64_t initial_capacity, double fp_prob, double grow_factor, PMR_NS::memory_resource* mr);
   ~SBF();
 
+  SBF& operator=(SBF&& src);
+
   bool Add(std::string_view str);
   bool Exists(std::string_view str) const;
 
diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc
index 77852ec25..cf2aaff7c 100644
--- a/src/core/compact_object.cc
+++ b/src/core/compact_object.cc
@@ -25,6 +25,7 @@ extern "C" {
 #include "base/flags.h"
 #include "base/logging.h"
 #include "base/pod_array.h"
+#include "core/bloom.h"
 #include "core/detail/bitpacking.h"
 #include "core/sorted_map.h"
 #include "core/string_map.h"
@@ -631,6 +632,10 @@ unsigned CompactObj::ObjType() const {
     return OBJ_JSON;
   }
 
+  if (taglen_ == SBF_TAG) {
+    return OBJ_SBF;
+  }
+
   LOG(FATAL) << "TBD " << int(taglen_);
   return 0;
 }
@@ -649,6 +654,7 @@ string_view CompactObj::ObjTypeToString(unsigned type) {
     OBJECT_TYPE_CASE(OBJ_MODULE);
     OBJECT_TYPE_CASE(OBJ_STREAM);
     OBJECT_TYPE_CASE(OBJ_JSON);
+    OBJECT_TYPE_CASE(OBJ_SBF);
     default:
       DCHECK(false) << "Unknown object type " << type;
       return "OTHER";
@@ -707,6 +713,16 @@ void CompactObj::SetJson(JsonType&& j) {
   }
 }
 
+void CompactObj::SetSBF(uint64_t initial_capacity, double fp_prob, double grow_factor) {
+  if (taglen_ == SBF_TAG) {  // already an SBF: move-assign a fresh filter over it
+    *u_.sbf = SBF(initial_capacity, fp_prob, grow_factor, tl.local_mr);
+  } else {
+    SetMeta(SBF_TAG);
+    void* ptr = tl.local_mr->allocate(sizeof(SBF), alignof(SBF));
+    u_.sbf = new (ptr) SBF(initial_capacity, fp_prob, grow_factor, tl.local_mr);
+  }
+}
+
 void CompactObj::SetString(std::string_view str) {
   uint8_t mask = mask_ & ~kEncMask;
   CHECK(!IsExternal());
@@ -876,7 +892,7 @@ bool CompactObj::HasAllocated() const {
       (taglen_ == ROBJ_TAG && u_.r_obj.inner_obj() == nullptr))
     return false;
 
-  DCHECK(taglen_ == ROBJ_TAG || taglen_ == SMALL_TAG || taglen_ == JSON_TAG);
+  DCHECK(taglen_ == ROBJ_TAG || taglen_ == SMALL_TAG || taglen_ == JSON_TAG || taglen_ == SBF_TAG);
   return true;
 }
 
@@ -991,6 +1007,8 @@ void CompactObj::Free() {
     VLOG(1) << "Freeing JSON object";
     u_.json_obj.json_ptr->~JsonType();
     tl.local_mr->deallocate(u_.json_obj.json_ptr, sizeof(JsonType), kAlignSize);
+  } else if (taglen_ == SBF_TAG) {
+    DeleteMR(u_.sbf);
   } else {
     LOG(FATAL) << "Unsupported tag " << int(taglen_);
   }
@@ -1015,6 +1033,9 @@ size_t CompactObj::MallocUsed() const {
     return u_.small_str.MallocUsed();
   }
 
+  if (taglen_ == SBF_TAG) {
+    return 0;  // TODO: to track SBF memory utilization.
+  }
   LOG(DFATAL) << "should not reach";
   return 0;
 }
diff --git a/src/core/compact_object.h b/src/core/compact_object.h
index c1c0ab25b..2393ee676 100644
--- a/src/core/compact_object.h
+++ b/src/core/compact_object.h
@@ -19,6 +19,8 @@ constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
 constexpr unsigned kEncodingStrMap2 = 2;  // for set/map encodings of strings using DenseSet
 constexpr unsigned kEncodingListPack = 3;
 
+class SBF;
+
 namespace detail {
 
 // redis objects or blobs of upto 4GB size.
@@ -101,6 +103,7 @@ class CompactObj {
     ROBJ_TAG = 19,
     EXTERNAL_TAG = 20,
     JSON_TAG = 21,
+    SBF_TAG = 22,
   };
 
   enum MaskBit {
@@ -297,6 +300,8 @@ class CompactObj {
   // pre condition - the type here is OBJ_JSON and was set with SetJson
   JsonType* GetJson() const;
 
+  void SetSBF(uint64_t initial_capacity, double fp_prob, double grow_factor);
+
   // dest must have at least Size() bytes available
   void GetString(char* dest) const;
 
@@ -388,6 +393,7 @@ class CompactObj {
     SmallString small_str;
     detail::RobjWrapper r_obj;
     JsonWrapper json_obj;
+    SBF* sbf __attribute__((packed));
     int64_t ival __attribute__((packed));
     ExternalPtr ext_ptr;
 
diff --git a/src/core/compact_object_test.cc b/src/core/compact_object_test.cc
index 3dd7db9c4..18d94daf4 100644
--- a/src/core/compact_object_test.cc
+++ b/src/core/compact_object_test.cc
@@ -366,6 +366,12 @@ TEST_F(CompactObjectTest, Hash) {
   EXPECT_EQ(1, cobj_.Size());
 }
 
+TEST_F(CompactObjectTest, SBF) {
+  cobj_.SetSBF(1000, 0.001, 2);
+  EXPECT_EQ(cobj_.ObjType(), OBJ_SBF);
+  EXPECT_EQ(0, cobj_.MallocUsed());
+}
+
 TEST_F(CompactObjectTest, MimallocUnderutilzation) {
   // We are testing with the same object size allocation here
   // This test is for https://github.com/dragonflydb/dragonfly/issues/448
diff --git a/src/redis/redis_aux.h b/src/redis/redis_aux.h
index 621ec5de0..e10ec55c2 100644
--- a/src/redis/redis_aux.h
+++ b/src/redis/redis_aux.h
@@ -7,31 +7,10 @@
 /* the last one in object.h is OBJ_STREAM and it is 6,
  * this will add enough place for Redis types to grow */
 #define OBJ_JSON 15U
+#define OBJ_SBF 16U
 
 /* How many types of objects exist */
-#define OBJ_TYPE_MAX 16U
-
-#define HASHTABLE_MIN_FILL 10 /* Minimal hash table fill 10% */
-#define HASHTABLE_MAX_LOAD_FACTOR 1.618 /* Maximum hash table load factor. */
-
-/* Redis maxmemory strategies. Instead of using just incremental number
- * for this defines, we use a set of flags so that testing for certain
- * properties common to multiple policies is faster. */
-#define MAXMEMORY_FLAG_LRU (1 << 0)
-#define MAXMEMORY_FLAG_LFU (1 << 1)
-#define MAXMEMORY_FLAG_ALLKEYS (1 << 2)
-#define MAXMEMORY_FLAG_NO_SHARED_INTEGERS (MAXMEMORY_FLAG_LRU | MAXMEMORY_FLAG_LFU)
-
-#define LFU_INIT_VAL 5
-
-#define MAXMEMORY_VOLATILE_LRU ((0 << 8) | MAXMEMORY_FLAG_LRU)
-#define MAXMEMORY_VOLATILE_LFU ((1 << 8) | MAXMEMORY_FLAG_LFU)
-#define MAXMEMORY_VOLATILE_TTL (2 << 8)
-#define MAXMEMORY_VOLATILE_RANDOM (3 << 8)
-#define MAXMEMORY_ALLKEYS_LRU ((4 << 8) | MAXMEMORY_FLAG_LRU | MAXMEMORY_FLAG_ALLKEYS)
-#define MAXMEMORY_ALLKEYS_LFU ((5 << 8) | MAXMEMORY_FLAG_LFU | MAXMEMORY_FLAG_ALLKEYS)
-#define MAXMEMORY_ALLKEYS_RANDOM ((6 << 8) | MAXMEMORY_FLAG_ALLKEYS)
-#define MAXMEMORY_NO_EVICTION (7 << 8)
+#define OBJ_TYPE_MAX 17U
 
 #define CONFIG_RUN_ID_SIZE 40U
 
diff --git a/src/server/bloom_family.cc b/src/server/bloom_family.cc
index d63cb78b4..59bcac4ba 100644
--- a/src/server/bloom_family.cc
+++ b/src/server/bloom_family.cc
@@ -8,21 +8,64 @@
 #include "facade/error.h"
 #include "server/command_registry.h"
 #include "server/conn_context.h"
+#include "server/engine_shard_set.h"
+#include "server/transaction.h"
 
 namespace dfly {
 
-// Bloom interface based on SBFs:
-// https://www.sciencedirect.com/science/article/abs/pii/S0020019006003127 See
-// https://www.alibabacloud.com/help/en/tair/developer-reference/bloom for the API documentation.
-// See c-project for the implementation of bloom filters
-// https://github.com/armon/bloomd as well as https://github.com/jvirkki/libbloom
-
 using namespace facade;
+using namespace std;
 
-namespace {}  // namespace
+namespace {
+
+struct SbfParams {
+  uint32_t init_capacity = 0;  // passed to CompactObj::SetSBF as initial_capacity
+  double error = 0;            // passed as fp_prob; ok() requires it to be in (0, 0.5)
+  double grow_factor = 2.0;    // passed to CompactObj::SetSBF as grow_factor
+
+  bool ok() const {
+    return error > 0 && error < 0.5;
+  }
+};
+
+OpStatus OpReserve(const SbfParams& params, const OpArgs& op_args, string_view key) {
+  auto& db_slice = op_args.shard->db_slice();
+  OpResult op_res = db_slice.AddOrFind(op_args.db_cntx, key);
+  if (!op_res)
+    return op_res.status();
+  if (!op_res->is_new)  // reserve only creates new keys
+    return OpStatus::KEY_EXISTS;
+
+  PrimeValue& pv = op_res->it->second;
+  pv.SetSBF(params.init_capacity, params.error, params.grow_factor);
+
+  return OpStatus::OK;
+}
+
+}  // namespace
 
 void BloomFamily::Reserve(CmdArgList args, ConnectionContext* cntx) {
-  cntx->SendError(kSyntaxErr);
+  CmdArgParser parser(args);
+  string_view key = parser.Next();
+  SbfParams params;
+
+  tie(params.error, params.init_capacity) = parser.Next();
+
+  if (parser.Error())
+    return cntx->SendError(kSyntaxErr);
+
+  if (!params.ok())
+    return cntx->SendError("error rate is out of range", kSyntaxErrType);
+
+  auto cb = [&](Transaction* t, EngineShard* shard) {  // non-const: std::move(cb) below needs it
+    return OpReserve(params, t->GetOpArgs(shard), key);
+  };
+
+  OpStatus res = cntx->transaction->ScheduleSingleHop(std::move(cb));
+  if (res == OpStatus::KEY_EXISTS) {
+    return cntx->SendError("item exists");
+  }
+  return cntx->SendError(res);
 }
 
 void BloomFamily::Add(CmdArgList args, ConnectionContext* cntx) {
diff --git a/src/server/bloom_family_test.cc b/src/server/bloom_family_test.cc
index 62df5fbcd..2c1fd01a9 100644
--- a/src/server/bloom_family_test.cc
+++ b/src/server/bloom_family_test.cc
@@ -4,6 +4,7 @@
 
 #include "server/bloom_family.h"
 
+#include "facade/facade_test.h"
 #include "server/test_utils.h"
 
 namespace dfly {
@@ -13,6 +14,10 @@ class BloomFamilyTest : public BaseFamilyTest {
 };
 
 TEST_F(BloomFamilyTest, Basic) {
+  auto resp = Run({"bf.reserve", "b1", "0.1", "32"});
+  EXPECT_EQ(resp, "OK");
+  resp = Run({"type", "b1"});
+  EXPECT_EQ(resp, "MBbloom--");
 }
 
 }  // namespace dfly
diff --git a/src/server/common.cc b/src/server/common.cc
index 87f37c5c5..c2fbc5179 100644
--- a/src/server/common.cc
+++ b/src/server/common.cc
@@ -99,6 +99,9 @@ const char* ObjTypeName(int type) {
       return "stream";
     case OBJ_JSON:
       return "rejson-rl";
+    case OBJ_SBF:
+      return "MBbloom--";
+
     default:
       LOG(ERROR) << "Unsupported type " << type;
   }
diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc
index 0461c30f9..206dab698 100644
--- a/src/server/db_slice.cc
+++ b/src/server/db_slice.cc
@@ -218,8 +218,8 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
 #define ADD(x) (x) += o.x
 
 DbStats& DbStats::operator+=(const DbStats& o) {
-  constexpr size_t kDbSz = sizeof(DbStats);
-  static_assert(kDbSz == 208);
+  constexpr size_t kDbSz = sizeof(DbStats) - sizeof(DbTableStats);
+  static_assert(kDbSz == 32);
 
   DbTableStats::operator+=(o);
 
diff --git a/src/server/table.cc b/src/server/table.cc
index 1d5d2205d..64de53de9 100644
--- a/src/server/table.cc
+++ b/src/server/table.cc
@@ -30,8 +30,8 @@ void DbTableStats::AddTypeMemoryUsage(unsigned type, int64_t delta) {
 }
 
 DbTableStats& DbTableStats::operator+=(const DbTableStats& o) {
-  constexpr size_t kDbSz = sizeof(DbTableStats);
-  static_assert(kDbSz == 176);
+  constexpr size_t kDbSz = sizeof(DbTableStats) - sizeof(memory_usage_by_type);
+  static_assert(kDbSz == 48);
 
   ADD(inline_keys);
   ADD(obj_memory_usage);