mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
chore: expose SBF via compact_object (#2797)
* chore: expose SBF via compact_object --------- Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
6d87acfc43
commit
9e23f85e6b
11 changed files with 114 additions and 36 deletions
|
@ -146,6 +146,17 @@ SBF::~SBF() {
|
|||
f.Destroy(mr);
|
||||
}
|
||||
|
||||
// Move-assigns `src` into this filter and returns *this.
//
// The filter lists are exchanged rather than clearing ours first: ~SBF releases filters
// explicitly through Destroy(mr), so discarding them with clear() would skip that release.
// After the swap `src` holds our previous filters and frees them when it is destroyed
// (it keeps its own scalar settings; treat it as moved-from).
// Self-assignment is a no-op.
SBF& SBF::operator=(SBF&& src) {
  if (this == &src)
    return *this;

  filters_.swap(src.filters_);
  grow_factor_ = src.grow_factor_;
  fp_prob_ = src.fp_prob_;
  current_size_ = src.current_size_;
  max_capacity_ = src.max_capacity_;

  return *this;
}
|
||||
|
||||
bool SBF::Add(std::string_view str) {
|
||||
DCHECK_LT(current_size_, max_capacity_);
|
||||
|
||||
|
|
|
@ -72,10 +72,14 @@ class Bloom {
|
|||
* TODO: to test the actual rate of this filter.
|
||||
*/
|
||||
class SBF {
|
||||
SBF(const SBF&) = delete;
|
||||
|
||||
public:
|
||||
SBF(uint64_t initial_capacity, double fp_prob, double grow_factor, PMR_NS::memory_resource* mr);
|
||||
~SBF();
|
||||
|
||||
SBF& operator=(SBF&& src);
|
||||
|
||||
bool Add(std::string_view str);
|
||||
bool Exists(std::string_view str) const;
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ extern "C" {
|
|||
#include "base/flags.h"
|
||||
#include "base/logging.h"
|
||||
#include "base/pod_array.h"
|
||||
#include "core/bloom.h"
|
||||
#include "core/detail/bitpacking.h"
|
||||
#include "core/sorted_map.h"
|
||||
#include "core/string_map.h"
|
||||
|
@ -631,6 +632,10 @@ unsigned CompactObj::ObjType() const {
|
|||
return OBJ_JSON;
|
||||
}
|
||||
|
||||
if (taglen_ == SBF_TAG) {
|
||||
return OBJ_SBF;
|
||||
}
|
||||
|
||||
LOG(FATAL) << "TBD " << int(taglen_);
|
||||
return 0;
|
||||
}
|
||||
|
@ -649,6 +654,7 @@ string_view CompactObj::ObjTypeToString(unsigned type) {
|
|||
OBJECT_TYPE_CASE(OBJ_MODULE);
|
||||
OBJECT_TYPE_CASE(OBJ_STREAM);
|
||||
OBJECT_TYPE_CASE(OBJ_JSON);
|
||||
OBJECT_TYPE_CASE(OBJ_SBF);
|
||||
default:
|
||||
DCHECK(false) << "Unknown object type " << type;
|
||||
return "OTHER";
|
||||
|
@ -707,6 +713,16 @@ void CompactObj::SetJson(JsonType&& j) {
|
|||
}
|
||||
}
|
||||
|
||||
// Makes this object hold an SBF constructed from the given parameters.
// The SBF is always built on top of the thread-local memory resource (tl.local_mr).
void CompactObj::SetSBF(uint64_t initial_capacity, double fp_prob, double grow_factor) {
  if (taglen_ != SBF_TAG) {
    // Not an SBF yet: switch the tag, then construct a new SBF in storage taken
    // from the thread-local memory resource.
    SetMeta(SBF_TAG);
    void* mem = tl.local_mr->allocate(sizeof(SBF), alignof(SBF));
    u_.sbf = new (mem) SBF(initial_capacity, fp_prob, grow_factor, tl.local_mr);
    return;
  }

  // Already an SBF: replace the existing filter's contents via move-assignment.
  *u_.sbf = SBF(initial_capacity, fp_prob, grow_factor, tl.local_mr);
}
|
||||
|
||||
void CompactObj::SetString(std::string_view str) {
|
||||
uint8_t mask = mask_ & ~kEncMask;
|
||||
CHECK(!IsExternal());
|
||||
|
@ -876,7 +892,7 @@ bool CompactObj::HasAllocated() const {
|
|||
(taglen_ == ROBJ_TAG && u_.r_obj.inner_obj() == nullptr))
|
||||
return false;
|
||||
|
||||
DCHECK(taglen_ == ROBJ_TAG || taglen_ == SMALL_TAG || taglen_ == JSON_TAG);
|
||||
DCHECK(taglen_ == ROBJ_TAG || taglen_ == SMALL_TAG || taglen_ == JSON_TAG || taglen_ == SBF_TAG);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -991,6 +1007,8 @@ void CompactObj::Free() {
|
|||
VLOG(1) << "Freeing JSON object";
|
||||
u_.json_obj.json_ptr->~JsonType();
|
||||
tl.local_mr->deallocate(u_.json_obj.json_ptr, sizeof(JsonType), kAlignSize);
|
||||
} else if (taglen_ == SBF_TAG) {
|
||||
DeleteMR<SBF>(u_.sbf);
|
||||
} else {
|
||||
LOG(FATAL) << "Unsupported tag " << int(taglen_);
|
||||
}
|
||||
|
@ -1015,6 +1033,9 @@ size_t CompactObj::MallocUsed() const {
|
|||
return u_.small_str.MallocUsed();
|
||||
}
|
||||
|
||||
if (taglen_ == SBF_TAG) {
|
||||
return 0; // TODO: to track SBF memory utilization.
|
||||
}
|
||||
LOG(DFATAL) << "should not reach";
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@ constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
|
|||
constexpr unsigned kEncodingStrMap2 = 2; // for set/map encodings of strings using DenseSet
|
||||
constexpr unsigned kEncodingListPack = 3;
|
||||
|
||||
class SBF;
|
||||
|
||||
namespace detail {
|
||||
|
||||
// redis objects or blobs of upto 4GB size.
|
||||
|
@ -101,6 +103,7 @@ class CompactObj {
|
|||
ROBJ_TAG = 19,
|
||||
EXTERNAL_TAG = 20,
|
||||
JSON_TAG = 21,
|
||||
SBF_TAG = 22,
|
||||
};
|
||||
|
||||
enum MaskBit {
|
||||
|
@ -297,6 +300,8 @@ class CompactObj {
|
|||
// pre condition - the type here is OBJ_JSON and was set with SetJson
|
||||
JsonType* GetJson() const;
|
||||
|
||||
void SetSBF(uint64_t initial_capacity, double fp_prob, double grow_factor);
|
||||
|
||||
// dest must have at least Size() bytes available
|
||||
void GetString(char* dest) const;
|
||||
|
||||
|
@ -388,6 +393,7 @@ class CompactObj {
|
|||
SmallString small_str;
|
||||
detail::RobjWrapper r_obj;
|
||||
JsonWrapper json_obj;
|
||||
SBF* sbf __attribute__((packed));
|
||||
int64_t ival __attribute__((packed));
|
||||
ExternalPtr ext_ptr;
|
||||
|
||||
|
|
|
@ -366,6 +366,12 @@ TEST_F(CompactObjectTest, Hash) {
|
|||
EXPECT_EQ(1, cobj_.Size());
|
||||
}
|
||||
|
||||
// Checks that SetSBF() switches the object type to OBJ_SBF and that MallocUsed()
// reports 0 for such an object.
TEST_F(CompactObjectTest, SBF) {
  cobj_.SetSBF(/*initial_capacity=*/1000, /*fp_prob=*/0.001, /*grow_factor=*/2);

  EXPECT_EQ(cobj_.ObjType(), OBJ_SBF);
  EXPECT_EQ(0, cobj_.MallocUsed());
}
|
||||
|
||||
TEST_F(CompactObjectTest, MimallocUnderutilzation) {
|
||||
// We are testing with the same object size allocation here
|
||||
// This test is for https://github.com/dragonflydb/dragonfly/issues/448
|
||||
|
|
|
@ -7,31 +7,10 @@
|
|||
/* the last one in object.h is OBJ_STREAM and it is 6,
|
||||
* this will add enough place for Redis types to grow */
|
||||
#define OBJ_JSON 15U
|
||||
#define OBJ_SBF 16U
|
||||
|
||||
/* How many types of objects exist */
|
||||
#define OBJ_TYPE_MAX 16U
|
||||
|
||||
#define HASHTABLE_MIN_FILL 10 /* Minimal hash table fill 10% */
|
||||
#define HASHTABLE_MAX_LOAD_FACTOR 1.618 /* Maximum hash table load factor. */
|
||||
|
||||
/* Redis maxmemory strategies. Instead of using just incremental number
|
||||
* for this defines, we use a set of flags so that testing for certain
|
||||
* properties common to multiple policies is faster. */
|
||||
#define MAXMEMORY_FLAG_LRU (1 << 0)
|
||||
#define MAXMEMORY_FLAG_LFU (1 << 1)
|
||||
#define MAXMEMORY_FLAG_ALLKEYS (1 << 2)
|
||||
#define MAXMEMORY_FLAG_NO_SHARED_INTEGERS (MAXMEMORY_FLAG_LRU | MAXMEMORY_FLAG_LFU)
|
||||
|
||||
#define LFU_INIT_VAL 5
|
||||
|
||||
#define MAXMEMORY_VOLATILE_LRU ((0 << 8) | MAXMEMORY_FLAG_LRU)
|
||||
#define MAXMEMORY_VOLATILE_LFU ((1 << 8) | MAXMEMORY_FLAG_LFU)
|
||||
#define MAXMEMORY_VOLATILE_TTL (2 << 8)
|
||||
#define MAXMEMORY_VOLATILE_RANDOM (3 << 8)
|
||||
#define MAXMEMORY_ALLKEYS_LRU ((4 << 8) | MAXMEMORY_FLAG_LRU | MAXMEMORY_FLAG_ALLKEYS)
|
||||
#define MAXMEMORY_ALLKEYS_LFU ((5 << 8) | MAXMEMORY_FLAG_LFU | MAXMEMORY_FLAG_ALLKEYS)
|
||||
#define MAXMEMORY_ALLKEYS_RANDOM ((6 << 8) | MAXMEMORY_FLAG_ALLKEYS)
|
||||
#define MAXMEMORY_NO_EVICTION (7 << 8)
|
||||
#define OBJ_TYPE_MAX 17U
|
||||
|
||||
#define CONFIG_RUN_ID_SIZE 40U
|
||||
|
||||
|
|
|
@ -8,21 +8,64 @@
|
|||
#include "facade/error.h"
|
||||
#include "server/command_registry.h"
|
||||
#include "server/conn_context.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/transaction.h"
|
||||
|
||||
namespace dfly {
|
||||
|
||||
// Bloom interface based on SBFs:
|
||||
// https://www.sciencedirect.com/science/article/abs/pii/S0020019006003127 See
|
||||
// https://www.alibabacloud.com/help/en/tair/developer-reference/bloom for the API documentation.
|
||||
// See c-project for the implementation of bloom filters
|
||||
// https://github.com/armon/bloomd as well as https://github.com/jvirkki/libbloom
|
||||
|
||||
using namespace facade;
|
||||
using namespace std;
|
||||
|
||||
namespace {} // namespace
|
||||
namespace {
|
||||
|
||||
// Parameters used to create an SBF.
// Every member has a default initializer so that a default-constructed instance never
// carries indeterminate values; the zero error rate of such an instance is rejected by ok().
struct SbfParams {
  uint32_t init_capacity = 0;  // passed on as the filter's initial capacity
  double error = 0.0;          // requested error rate, validated by ok()
  double grow_factor = 2.0;

  // Returns true iff the error rate lies strictly between 0 and 0.5.
  bool ok() const {
    return error > 0 && error < 0.5;
  }
};
|
||||
|
||||
// Creates an SBF under `key` using `params`.
// Returns the lookup status if AddOrFind fails, KEY_EXISTS if the key was already
// present, and OK once the newly added value has been turned into an SBF.
OpStatus OpReserve(const SbfParams& params, const OpArgs& op_args, string_view key) {
  OpResult add_res = op_args.shard->db_slice().AddOrFind(op_args.db_cntx, key);
  if (!add_res) {
    return add_res.status();
  }

  // Only a freshly inserted key may be reserved.
  if (!add_res->is_new) {
    return OpStatus::KEY_EXISTS;
  }

  add_res->it->second.SetSBF(params.init_capacity, params.error, params.grow_factor);
  return OpStatus::OK;
}
|
||||
|
||||
} // namespace
|
||||
|
||||
// Handles the reserve command. Arguments are parsed as: key, error rate (double),
// initial capacity (uint32).
//
// Replies with a syntax error when parsing fails, with "error rate is out of range" when
// the error rate fails SbfParams::ok(), with "item exists" when the key is already
// present, and otherwise with the status returned by the shard callback.
//
// Exactly one reply is sent per invocation: nothing is written to the client before the
// arguments have been parsed and validated.
void BloomFamily::Reserve(CmdArgList args, ConnectionContext* cntx) {
  CmdArgParser parser(args);
  string_view key = parser.Next();
  SbfParams params;

  tie(params.error, params.init_capacity) = parser.Next<double, uint32_t>();

  if (parser.Error())
    return cntx->SendError(kSyntaxErr);

  if (!params.ok())
    return cntx->SendError("error rate is out of range", kSyntaxErrType);

  // Captures by reference: the callback is run within the ScheduleSingleHop call below,
  // while `params` and `key` are still alive. Non-const so that std::move is meaningful.
  auto cb = [&](Transaction* t, EngineShard* shard) {
    return OpReserve(params, t->GetOpArgs(shard), key);
  };

  OpStatus res = cntx->transaction->ScheduleSingleHop(std::move(cb));
  if (res == OpStatus::KEY_EXISTS) {
    return cntx->SendError("item exists");
  }
  return cntx->SendError(res);
}
|
||||
|
||||
void BloomFamily::Add(CmdArgList args, ConnectionContext* cntx) {
|
||||
|
|
|
@ -4,6 +4,7 @@
|
|||
|
||||
#include "server/bloom_family.h"
|
||||
|
||||
#include "facade/facade_test.h"
|
||||
#include "server/test_utils.h"
|
||||
|
||||
namespace dfly {
|
||||
|
@ -13,6 +14,10 @@ class BloomFamilyTest : public BaseFamilyTest {
|
|||
};
|
||||
|
||||
// Reserving a new filter replies OK, and TYPE then reports the bloom type name.
TEST_F(BloomFamilyTest, Basic) {
  EXPECT_EQ(Run({"bf.reserve", "b1", "0.1", "32"}), "OK");
  EXPECT_EQ(Run({"type", "b1"}), "MBbloom--");
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -99,6 +99,9 @@ const char* ObjTypeName(int type) {
|
|||
return "stream";
|
||||
case OBJ_JSON:
|
||||
return "rejson-rl";
|
||||
case OBJ_SBF:
|
||||
return "MBbloom--";
|
||||
|
||||
default:
|
||||
LOG(ERROR) << "Unsupported type " << type;
|
||||
}
|
||||
|
|
|
@ -218,8 +218,8 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
|
|||
#define ADD(x) (x) += o.x
|
||||
|
||||
DbStats& DbStats::operator+=(const DbStats& o) {
|
||||
constexpr size_t kDbSz = sizeof(DbStats);
|
||||
static_assert(kDbSz == 208);
|
||||
constexpr size_t kDbSz = sizeof(DbStats) - sizeof(DbTableStats);
|
||||
static_assert(kDbSz == 32);
|
||||
|
||||
DbTableStats::operator+=(o);
|
||||
|
||||
|
|
|
@ -30,8 +30,8 @@ void DbTableStats::AddTypeMemoryUsage(unsigned type, int64_t delta) {
|
|||
}
|
||||
|
||||
DbTableStats& DbTableStats::operator+=(const DbTableStats& o) {
|
||||
constexpr size_t kDbSz = sizeof(DbTableStats);
|
||||
static_assert(kDbSz == 176);
|
||||
constexpr size_t kDbSz = sizeof(DbTableStats) - sizeof(memory_usage_by_type);
|
||||
static_assert(kDbSz == 48);
|
||||
|
||||
ADD(inline_keys);
|
||||
ADD(obj_memory_usage);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue