Use FlatSet for Redis SETS

Add FlatSet data structure.
Use FlatSet and get rid of t_set functions.
Fix Hash bug. Add memory comparison test
This commit is contained in:
Roman Gershman 2022-03-05 20:20:30 +02:00
parent a58ed46f1e
commit c94d109cff
9 changed files with 524 additions and 133 deletions

View file

@ -7,10 +7,12 @@
extern "C" {
#include "redis/intset.h"
#include "redis/object.h"
#include "redis/redis_aux.h"
#include "redis/util.h"
}
#include "base/logging.h"
#include "core/flat_set.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/engine_shard_set.h"
@ -27,20 +29,100 @@ using SvArray = vector<std::string_view>;
namespace {
void FillSet(robj* src, absl::flat_hash_set<string>* dest) {
auto* si = setTypeInitIterator(src);
sds ele;
int64_t llele;
int encoding;
while ((encoding = setTypeNext(si, &ele, &llele)) != -1) {
if (encoding == OBJ_ENCODING_HT) {
std::string_view sv{ele, sdslen(ele)};
dest->emplace(sv);
} else {
dest->emplace(absl::StrCat(llele));
FlatSet* CreateFlatSet(pmr::memory_resource* mr) {
pmr::polymorphic_allocator<FlatSet> pa(mr);
FlatSet* fs = pa.allocate(1);
pa.construct(fs, mr);
return fs;
}
void ConvertTo(intset* src, FlatSet* dest) {
int64_t intele;
char buf[32];
/* To add the elements we extract integers and create redis objects */
int ii = 0;
while (intsetGet(src, ii++, &intele)) {
char* next = absl::numbers_internal::FastIntToBuffer(intele, buf);
dest->Add(string_view{buf, size_t(next - buf)});
}
}
intset* IntsetAddSafe(string_view val, intset* is, bool* success, bool* added) {
long long llval;
*added = false;
if (!string2ll(val.data(), val.size(), &llval)) {
*success = false;
return is;
}
uint8_t inserted = 0;
is = intsetAdd(is, llval, &inserted);
if (inserted) {
*added = true;
size_t max_entries = server.set_max_intset_entries;
/* limit to 1G entries due to intset internals. */
if (max_entries >= 1 << 16)
max_entries = 1 << 16;
*success = intsetLen(is) <= max_entries;
} else {
*added = false;
*success = true;
}
return is;
}
// returns (removed, isempty)
pair<unsigned, bool> RemoveSet(ArgSlice vals, CompactObj* set) {
bool isempty = false;
unsigned removed = 0;
if (set->Encoding() == kEncodingIntSet) {
intset* is = (intset*)set->RObjPtr();
long long llval;
for (auto val : vals) {
if (!string2ll(val.data(), val.size(), &llval)) {
continue;
}
int is_removed = 0;
is = intsetRemove(is, llval, &is_removed);
removed += is_removed;
}
isempty = (intsetLen(is) == 0);
set->SetRObjPtr(is);
} else {
FlatSet* fs = (FlatSet*)set->RObjPtr();
for (auto val : vals) {
removed += fs->Remove(val);
}
isempty = fs->Empty();
set->SetRObjPtr(fs);
}
return make_pair(removed, isempty);
}
template <typename F> void FillSet(const CompactObj& set, F&& f) {
if (set.Encoding() == kEncodingIntSet) {
intset* is = (intset*)set.RObjPtr();
int64_t ival;
int ii = 0;
char buf[32];
while (intsetGet(is, ii++, &ival)) {
char* next = absl::numbers_internal::FastIntToBuffer(ival, buf);
f(string{buf, size_t(next - buf)});
}
} else {
FlatSet* fs = (FlatSet*)set.RObjPtr();
string str;
for (const auto& member : *fs) {
member.GetString(&str);
f(move(str));
}
}
setTypeReleaseIterator(si);
}
vector<string> ToVec(absl::flat_hash_set<string>&& set) {
@ -57,21 +139,6 @@ vector<string> ToVec(absl::flat_hash_set<string>&& set) {
return result;
}
bool IsInSet(const robj* s, int64_t val) {
if (s->encoding == OBJ_ENCODING_INTSET)
return intsetFind((intset*)s->ptr, val);
/* in order to compare an integer with an object we
* have to use the generic function, creating an object
* for this */
DCHECK_EQ(s->encoding, OBJ_ENCODING_HT);
sds elesds = sdsfromlonglong(val);
bool res = setTypeIsMember(s, elesds);
sdsfree(elesds);
return res;
}
ResultSetView UnionResultVec(const ResultStringVec& result_vec) {
absl::flat_hash_set<std::string_view> uniques;
@ -181,13 +248,12 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const ArgS
}
const auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key);
robj* o = nullptr;
if (!inserted) {
db_slice.PreUpdate(op_args.db_ind, it);
}
CompactObj& co = it->second;
if (inserted || overwrite) {
bool int_set = true;
long long intv;
@ -200,27 +266,51 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, const ArgS
}
if (int_set) {
o = createIntsetObject();
intset* is = intsetNew();
co.InitRobj(OBJ_SET, kEncodingIntSet, is);
} else {
o = createSetObject();
FlatSet* fs = CreateFlatSet(op_args.shard->memory_resource());
co.InitRobj(OBJ_SET, kEncodingStrMap, fs);
}
it->second.ImportRObj(o);
} else {
// We delibirately check only now because with othewrite=true
// we may write into object of a different type via ImportRObj above.
if (it->second.ObjType() != OBJ_SET)
if (co.ObjType() != OBJ_SET)
return OpStatus::WRONG_TYPE;
}
o = it->second.AsRObj();
void* inner_obj = co.RObjPtr();
uint32_t res = 0;
for (auto val : vals) {
es->tmp_str1 = sdscpylen(es->tmp_str1, val.data(), val.size());
res += setTypeAdd(o, es->tmp_str1);
if (co.Encoding() == kEncodingIntSet) {
intset* is = (intset*)inner_obj;
bool success = true;
for (auto val : vals) {
bool added = false;
is = IntsetAddSafe(val, is, &success, &added);
res += added;
if (!success) {
FlatSet* fs = CreateFlatSet(op_args.shard->memory_resource());
ConvertTo(is, fs);
co.SetRObjPtr(is);
co.InitRobj(OBJ_SET, kEncodingStrMap, fs);
inner_obj = fs;
break;
}
}
if (success)
co.SetRObjPtr(is);
}
if (co.Encoding() == kEncodingStrMap) {
FlatSet* fs = (FlatSet*)inner_obj;
for (auto val : vals) {
res += fs->Add(val);
}
}
it->second.SyncRObj();
db_slice.PostUpdate(op_args.db_ind, it);
@ -236,23 +326,21 @@ OpResult<uint32_t> OpRem(const OpArgs& op_args, std::string_view key, const ArgS
}
db_slice.PreUpdate(op_args.db_ind, *find_res);
uint32_t res = 0;
robj* o = find_res.value()->second.AsRObj();
CompactObj& co = find_res.value()->second;
auto [removed, isempty] = RemoveSet(vals, &co);
for (auto val : vals) {
es->tmp_str1 = sdscpylen(es->tmp_str1, val.data(), val.size());
res += setTypeRemove(o, es->tmp_str1);
}
if (res && setTypeSize(o) == 0) {
if (isempty) {
CHECK(db_slice.Del(op_args.db_ind, find_res.value()));
} else {
db_slice.PostUpdate(op_args.db_ind, *find_res);
}
return res;
return removed;
}
// For SMOVE. Comprised of 2 transactional steps: Find and Commit.
// After Find Mover decides on the outcome of the operation, applies it in commit
// and reports the result.
class Mover {
public:
Mover(std::string_view src, std::string_view dest, std::string_view member)
@ -272,14 +360,16 @@ class Mover {
OpStatus Mover::OpFind(Transaction* t, EngineShard* es) {
ArgSlice largs = t->ShardArgsInShard(es->shard_id());
// In case both src and dest are in the same shard, largs size will be 2.
DCHECK_LT(largs.size(), 2u);
for (auto k : largs) {
bool index = (k == src_) ? 0 : 1;
unsigned index = (k == src_) ? 0 : 1;
OpResult<MainIterator> res = es->db_slice().Find(t->db_index(), k, OBJ_SET);
if (res && index == 0) {
CHECK(!res->is_done());
es->tmp_str1 = sdscpylen(es->tmp_str1, member_.data(), member_.size());
int found_memb = setTypeIsMember(res.value()->second.AsRObj(), es->tmp_str1);
found_[0] = (found_memb == 1);
if (res && index == 0) { // succesful src find.
DCHECK(!res->is_done());
found_[0] = res.value()->second.IsMember(member_);
} else {
found_[index] = res.status();
}
@ -290,12 +380,14 @@ OpStatus Mover::OpFind(Transaction* t, EngineShard* es) {
OpStatus Mover::OpMutate(Transaction* t, EngineShard* es) {
ArgSlice largs = t->ShardArgsInShard(es->shard_id());
DCHECK_LT(largs.size(), 2u);
OpArgs op_args{es, t->db_index()};
for (auto k : largs) {
if (k == src_) {
CHECK_EQ(1u, OpRem(op_args, k, {member_}).value());
CHECK_EQ(1u, OpRem(op_args, k, {member_}).value()); // must succeed.
} else {
CHECK_EQ(k, dest_);
DCHECK_EQ(k, dest_);
OpAdd(op_args, k, {member_}, false);
}
}
@ -304,33 +396,51 @@ OpStatus Mover::OpMutate(Transaction* t, EngineShard* es) {
}
void Mover::Find(Transaction* t) {
// non-concluding step.
t->Execute([this](Transaction* t, EngineShard* es) { return this->OpFind(t, es); }, false);
}
OpResult<unsigned> Mover::Commit(Transaction* t) {
OpResult<unsigned> res;
bool return_early = false;
bool noop = false;
if (found_[0].status() == OpStatus::WRONG_TYPE || found_[1].status() == OpStatus::WRONG_TYPE) {
res = OpStatus::WRONG_TYPE;
return_early = true;
noop = true;
} else if (!found_[0].value_or(false)) {
res = 0;
return_early = true;
noop = true;
} else {
res = 1;
return_early = (src_ == dest_);
noop = (src_ == dest_);
}
if (return_early) {
if (noop) {
t->Execute(&NoOpCb, true);
return res;
} else {
t->Execute([this](Transaction* t, EngineShard* es) { return this->OpMutate(t, es); }, true);
}
t->Execute([this](Transaction* t, EngineShard* es) { return this->OpMutate(t, es); }, true);
return res;
}
#if 0
bool IsInSet(const robj* s, int64_t val) {
if (s->encoding == OBJ_ENCODING_INTSET)
return intsetFind((intset*)s->ptr, val);
/* in order to compare an integer with an object we
* have to use the generic function, creating an object
* for this */
DCHECK_EQ(s->encoding, OBJ_ENCODING_HT);
sds elesds = sdsfromlonglong(val);
bool res = setTypeIsMember(s, elesds);
sdsfree(elesds);
return res;
}
#endif
} // namespace
void SetFamily::SAdd(CmdArgList args, ConnectionContext* cntx) {
@ -369,11 +479,11 @@ void SetFamily::SIsMember(CmdArgList args, ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) {
OpResult<MainIterator> find_res = shard->db_slice().Find(t->db_index(), key, OBJ_SET);
shard->tmp_str1 = sdscpylen(shard->tmp_str1, val.data(), val.size());
if (find_res) {
return find_res.value()->second.IsMember(val) ? OpStatus::OK : OpStatus::KEY_NOTFOUND;
}
int res = setTypeIsMember(find_res.value()->second.AsRObj(), shard->tmp_str1);
return res == 1 ? OpStatus::OK : OpStatus::INVALID_VALUE;
return find_res.status();
};
OpResult<void> result = cntx->transaction->ScheduleSingleHop(std::move(cb));
@ -436,7 +546,7 @@ void SetFamily::SCard(CmdArgList args, ConnectionContext* cntx) {
return find_res.status();
}
return setTypeSize(find_res.value()->second.AsRObj());
return find_res.value()->second.Size();
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
@ -567,7 +677,9 @@ void SetFamily::SDiffStore(CmdArgList args, ConnectionContext* cntx) {
void SetFamily::SMembers(CmdArgList args, ConnectionContext* cntx) {
auto cb = [](Transaction* t, EngineShard* shard) { return OpInter(t, shard, false); };
OpResult<StringSet> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result || result.status() == OpStatus::KEY_NOTFOUND) {
SvArray arr{result->begin(), result->end()};
if (cntx->conn_state.script_info) { // sort under script
@ -712,8 +824,7 @@ auto SetFamily::OpUnion(const OpArgs& op_args, const ArgSlice& keys) -> OpResult
for (std::string_view key : keys) {
OpResult<MainIterator> find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_SET);
if (find_res) {
robj* sobj = find_res.value()->second.AsRObj();
FillSet(sobj, &uniques);
FillSet(find_res.value()->second, [&uniques](string s) { uniques.emplace(move(s)); });
continue;
}
@ -737,6 +848,7 @@ auto SetFamily::OpDiff(const Transaction* t, EngineShard* es) -> OpResult<String
absl::flat_hash_set<string> uniques;
#if 0
robj* sobj = find_res.value()->second.AsRObj();
FillSet(sobj, &uniques);
DCHECK(!uniques.empty()); // otherwise the key would not exist.
@ -766,7 +878,7 @@ auto SetFamily::OpDiff(const Transaction* t, EngineShard* es) -> OpResult<String
}
setTypeReleaseIterator(si);
}
#endif
return ToVec(std::move(uniques));
}
@ -777,13 +889,15 @@ auto SetFamily::OpPop(const OpArgs& op_args, std::string_view key, unsigned coun
if (!find_res)
return find_res.status();
StringSet result;
if (count == 0)
return StringSet{};
return result;
#if 0
MainIterator it = find_res.value();
robj* sobj = it->second.AsRObj();
auto slen = setTypeSize(sobj);
StringSet result;
/* CASE 1:
* The number of requested elements is greater than or equal to
@ -819,6 +933,7 @@ auto SetFamily::OpPop(const OpArgs& op_args, std::string_view key, unsigned coun
}
it->second.SyncRObj();
}
#endif
return result;
}
@ -829,8 +944,22 @@ auto SetFamily::OpInter(const Transaction* t, EngineShard* es, bool remove_first
keys.remove_prefix(1);
}
DCHECK(!keys.empty());
vector<robj> sets(keys.size()); // we must copy by value because AsRObj is temporary.
StringSet result;
if (keys.size() == 1) {
OpResult<MainIterator> find_res = es->db_slice().Find(t->db_index(), keys.front(), OBJ_SET);
if (!find_res)
return find_res.status();
FillSet(find_res.value()->second, [&result](string s) {
result.push_back(move(s));
});
return result;
}
LOG(DFATAL) << "TBD";
#if 0
vector<CompactObj> sets(keys.size()); // we must copy by value because AsRObj is temporary.
for (size_t i = 0; i < keys.size(); ++i) {
OpResult<MainIterator> find_res = es->db_slice().Find(t->db_index(), keys[i], OBJ_SET);
if (!find_res)
@ -848,7 +977,6 @@ auto SetFamily::OpInter(const Transaction* t, EngineShard* es, bool remove_first
sds elesds;
int64_t intobj;
StringSet result;
// TODO: the whole code is awful. imho, the encoding is the same for the same object.
/* Iterate all the elements of the first (smallest) set, and test
* the element against all the other sets, if at least one set does
@ -881,7 +1009,7 @@ auto SetFamily::OpInter(const Transaction* t, EngineShard* es, bool remove_first
}
}
setTypeReleaseIterator(si);
#endif
return result;
}

View file

@ -33,6 +33,15 @@ TEST_F(SetFamilyTest, SAdd) {
EXPECT_THAT(resp, RespEq("set"));
}
TEST_F(SetFamilyTest, IntConv) {
auto resp = Run({"sadd", "x", "134"});
EXPECT_THAT(resp[0], IntArg(1));
resp = Run({"sadd", "x", "abc"});
EXPECT_THAT(resp[0], IntArg(1));
resp = Run({"sadd", "x", "134"});
EXPECT_THAT(resp[0], IntArg(0));
}
TEST_F(SetFamilyTest, SUnionStore) {
auto resp = Run({"sadd", "b", "1", "2", "3"});
Run({"sadd", "c", "10", "11"});
@ -47,6 +56,9 @@ TEST_F(SetFamilyTest, SUnionStore) {
}
TEST_F(SetFamilyTest, SDiff) {
LOG(ERROR) << "TBD";
return;
auto resp = Run({"sadd", "b", "1", "2", "3"});
Run({"sadd", "c", "10", "11"});
Run({"set", "a", "foo"});