feat: add bf.(m)add and bf.(m)exists commands (#2801)

Adresses #1275

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-03-31 15:50:21 +03:00 committed by GitHub
parent 1d04683c48
commit 7f0f624699
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 158 additions and 6 deletions

View file

@ -723,6 +723,11 @@ void CompactObj::SetSBF(uint64_t initial_capacity, double fp_prob, double grow_f
}
}
SBF* CompactObj::GetSBF() const {
DCHECK_EQ(SBF_TAG, taglen_);
return u_.sbf;
}
void CompactObj::SetString(std::string_view str) {
uint8_t mask = mask_ & ~kEncMask;
CHECK(!IsExternal());

View file

@ -301,6 +301,7 @@ class CompactObj {
JsonType* GetJson() const;
void SetSBF(uint64_t initial_capacity, double fp_prob, double grow_factor);
SBF* GetSBF() const;
// dest must have at least Size() bytes available
void GetString(char* dest) const;

View file

@ -4,6 +4,7 @@
#include "server/bloom_family.h"
#include "core/bloom.h"
#include "facade/cmd_arg_parser.h"
#include "facade/error.h"
#include "server/command_registry.h"
@ -18,16 +19,21 @@ using namespace std;
namespace {
constexpr double kDefaultFpProb = 0.01;
constexpr double kDefaultGrowFactor = 2;
struct SbfParams {
uint32_t init_capacity;
double error;
double grow_factor = 2.0;
double grow_factor = kDefaultGrowFactor;
bool ok() const {
return error > 0 and error < 0.5;
}
};
using AddResult = absl::InlinedVector<OpResult<bool>, 4>;
using ExistsResult = absl::InlinedVector<bool, 4>;
OpStatus OpReserve(const SbfParams& params, const OpArgs& op_args, string_view key) {
auto& db_slice = op_args.shard->db_slice();
OpResult op_res = db_slice.AddOrFind(op_args.db_cntx, key);
@ -42,6 +48,47 @@ OpStatus OpReserve(const SbfParams& params, const OpArgs& op_args, string_view k
return OpStatus::OK;
}
// Returns true, if item was added, false if it was already "present".
OpResult<AddResult> OpAdd(const OpArgs& op_args, string_view key, CmdArgList items) {
auto& db_slice = op_args.shard->db_slice();
OpResult op_res = db_slice.AddOrFind(op_args.db_cntx, key);
if (!op_res)
return op_res.status();
PrimeValue& pv = op_res->it->second;
if (op_res->is_new) {
pv.SetSBF(0, kDefaultFpProb, kDefaultGrowFactor);
} else {
if (op_res->it->second.ObjType() != OBJ_SBF)
return OpStatus::WRONG_TYPE;
}
SBF* sbf = pv.GetSBF();
AddResult result(items.size());
for (size_t i = 0; i < items.size(); ++i) {
result[i] = sbf->Add(ToSV(items[i]));
}
return result;
}
OpResult<ExistsResult> OpExists(const OpArgs& op_args, string_view key, CmdArgList items) {
auto& db_slice = op_args.shard->db_slice();
OpResult op_res = db_slice.FindReadOnly(op_args.db_cntx, key, OBJ_SBF);
if (!op_res)
return op_res.status();
auto it = (*op_res);
const SBF* sbf = it->second.GetSBF();
ExistsResult result(items.size());
for (size_t i = 0; i < items.size(); ++i) {
result[i] = sbf->Exists(ToSV(items[i]));
}
return result;
}
} // namespace
void BloomFamily::Reserve(CmdArgList args, ConnectionContext* cntx) {
@ -69,11 +116,75 @@ void BloomFamily::Reserve(CmdArgList args, ConnectionContext* cntx) {
}
void BloomFamily::Add(CmdArgList args, ConnectionContext* cntx) {
cntx->SendError(kSyntaxErr);
string_view key = ArgS(args, 0);
args.remove_prefix(1);
const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpAdd(t->GetOpArgs(shard), key, args);
};
OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
OpStatus status = res.status();
if (res) {
if (res->front())
return cntx->SendLong(*res->front());
else
status = res->front().status();
}
return cntx->SendError(status);
}
void BloomFamily::Exists(CmdArgList args, ConnectionContext* cntx) {
cntx->SendError(kSyntaxErr);
string_view key = ArgS(args, 0);
args.remove_prefix(1);
const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExists(t->GetOpArgs(shard), key, args);
};
OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
return cntx->SendLong(res ? res->front() : 0);
}
void BloomFamily::MAdd(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);
const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpAdd(t->GetOpArgs(shard), key, args);
};
OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (!res) {
return cntx->SendError(res.status());
}
const AddResult& add_res = *res;
RedisReplyBuilder* rb = (RedisReplyBuilder*)cntx->reply_builder();
rb->StartArray(add_res.size());
for (const OpResult<bool>& val : add_res) {
if (val) {
cntx->SendLong(*val);
} else {
cntx->SendError(val.status());
}
}
}
void BloomFamily::MExists(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
args.remove_prefix(1);
const auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExists(t->GetOpArgs(shard), key, args);
};
OpResult res = cntx->transaction->ScheduleSingleHopT(std::move(cb));
RedisReplyBuilder* rb = (RedisReplyBuilder*)cntx->reply_builder();
rb->StartArray(args.size());
for (size_t i = 0; i < args.size(); ++i) {
cntx->SendLong(res ? res->at(i) : 0);
}
}
using CI = CommandId;
@ -86,7 +197,9 @@ void BloomFamily::Register(CommandRegistry* registry) {
*registry << CI{"BF.RESERVE", CO::WRITE | CO::DENYOOM | CO::FAST, -4, 1, 1, acl::BLOOM}.HFUNC(
Reserve)
<< CI{"BF.ADD", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, acl::BLOOM}.HFUNC(Add)
<< CI{"BF.EXISTS", CO::READONLY | CO::FAST, 3, 1, 1, acl::BLOOM}.HFUNC(Exists);
<< CI{"BF.MADD", CO::WRITE | CO::DENYOOM | CO::FAST, -3, 1, 1, acl::BLOOM}.HFUNC(MAdd)
<< CI{"BF.EXISTS", CO::READONLY | CO::FAST, 3, 1, 1, acl::BLOOM}.HFUNC(Exists)
<< CI{"BF.MEXISTS", CO::READONLY | CO::FAST, -3, 1, 1, acl::BLOOM}.HFUNC(MExists);
};
} // namespace dfly

View file

@ -18,7 +18,9 @@ class BloomFamily {
private:
static void Reserve(CmdArgList args, ConnectionContext* cntx);
static void Add(CmdArgList args, ConnectionContext* cntx);
static void MAdd(CmdArgList args, ConnectionContext* cntx);
static void Exists(CmdArgList args, ConnectionContext* cntx);
static void MExists(CmdArgList args, ConnectionContext* cntx);
};
} // namespace dfly

View file

@ -9,6 +9,8 @@
namespace dfly {
using testing::ElementsAre;
class BloomFamilyTest : public BaseFamilyTest {
protected:
};
@ -16,8 +18,37 @@ class BloomFamilyTest : public BaseFamilyTest {
TEST_F(BloomFamilyTest, Basic) {
auto resp = Run({"bf.reserve", "b1", "0.1", "32"});
EXPECT_EQ(resp, "OK");
resp = Run({"type", "b1"});
EXPECT_EQ(resp, "MBbloom--");
EXPECT_EQ(Run({"type", "b1"}), "MBbloom--");
EXPECT_THAT(Run({"bf.add", "b1", "a"}), IntArg(1));
EXPECT_THAT(Run({"bf.add", "b1", "b"}), IntArg(1));
EXPECT_THAT(Run({"bf.add", "b1", "b"}), IntArg(0));
EXPECT_THAT(Run({"bf.add", "b2", "b"}), IntArg(1));
EXPECT_EQ(Run({"type", "b2"}), "MBbloom--");
EXPECT_THAT(Run({"bf.exists", "b2", "c"}), IntArg(0));
EXPECT_THAT(Run({"bf.exists", "b3", "c"}), IntArg(0));
EXPECT_THAT(Run({"bf.exists", "b2", "b"}), IntArg(1));
Run({"set", "str", "foo"});
EXPECT_THAT(Run({"bf.exists", "str", "b"}), IntArg(0));
}
TEST_F(BloomFamilyTest, Multiple) {
auto resp = Run({"bf.mexists", "bf1", "a", "b", "c"});
EXPECT_THAT(resp, RespArray(ElementsAre(IntArg(0), IntArg(0), IntArg(0))));
Run({"set", "str", "foo"});
resp = Run({"bf.mexists", "str", "a", "b", "c"});
EXPECT_THAT(resp, RespArray(ElementsAre(IntArg(0), IntArg(0), IntArg(0))));
resp = Run({"bf.madd", "str", "a"});
EXPECT_THAT(resp, ErrArg("WRONG"));
resp = Run({"bf.madd", "bf1", "a", "b", "c"});
EXPECT_THAT(resp, RespArray(ElementsAre(IntArg(1), IntArg(1), IntArg(1))));
resp = Run({"bf.madd", "bf1", "a", "b", "c"});
EXPECT_THAT(resp, RespArray(ElementsAre(IntArg(0), IntArg(0), IntArg(0))));
resp = Run({"bf.mexists", "bf1", "a", "b", "c"});
EXPECT_THAT(resp, RespArray(ElementsAre(IntArg(1), IntArg(1), IntArg(1))));
}
} // namespace dfly