mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
feat(setfamily): Add SADDEX command (#348)
Add SADDEX <key> <seconds> member member .... Provides expiry semantics for set members with seconds resolution. Important things to note: 1. The expiry is passive only, so if nobody touches that set then its members are being kept. 2. SCARD provides an upper bound estimation of set size for sets holding the expiring members. The reason for this is because SCARD returns a cached size and does not go over all members to check whether they expired. For regular sets it's exact, of course. Fixes #335
This commit is contained in:
parent
cf779c08a4
commit
d2b216077d
4 changed files with 334 additions and 208 deletions
|
@ -492,21 +492,35 @@ uint32_t DenseSet::Scan(uint32_t cursor, const ItemCb& cb) const {
|
|||
|
||||
uint32_t entries_idx = cursor >> (32 - capacity_log_);
|
||||
|
||||
auto& entries = const_cast<DenseSet*>(this)->entries_;
|
||||
|
||||
// skip empty entries
|
||||
while (entries_idx < entries_.size() && entries_[entries_idx].IsEmpty()) {
|
||||
++entries_idx;
|
||||
}
|
||||
do {
|
||||
while (entries_idx < entries_.size() && entries_[entries_idx].IsEmpty()) {
|
||||
++entries_idx;
|
||||
}
|
||||
|
||||
if (entries_idx == entries_.size()) {
|
||||
return 0;
|
||||
}
|
||||
if (entries_idx == entries_.size()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
const DensePtr* curr = &entries_[entries_idx];
|
||||
ExpireIfNeeded(nullptr, &entries[entries_idx]);
|
||||
} while (entries_[entries_idx].IsEmpty());
|
||||
|
||||
DensePtr* curr = &entries[entries_idx];
|
||||
|
||||
// when scanning add all entries in a given chain
|
||||
while (curr != nullptr && !curr->IsEmpty()) {
|
||||
while (true) {
|
||||
cb(curr->GetObject());
|
||||
curr = curr->Next();
|
||||
if (!curr->IsLink())
|
||||
break;
|
||||
|
||||
DensePtr* mcurr = const_cast<DensePtr*>(curr);
|
||||
|
||||
if (ExpireIfNeeded(mcurr, &mcurr->AsLink()->next) && !mcurr->IsLink()) {
|
||||
break;
|
||||
}
|
||||
curr = &curr->AsLink()->next;
|
||||
}
|
||||
|
||||
// move to the next index for the next scan and check if we are done
|
||||
|
@ -515,6 +529,14 @@ uint32_t DenseSet::Scan(uint32_t cursor, const ItemCb& cb) const {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// In case of displacement, we want to fully cover the bucket we traversed, therefore
|
||||
// we check if the bucket on the right belongs to the home bucket.
|
||||
ExpireIfNeeded(nullptr, &entries[entries_idx]);
|
||||
|
||||
if (entries[entries_idx].GetDisplacedDirection() == 1) { // right of the home bucket
|
||||
cb(entries[entries_idx].GetObject());
|
||||
}
|
||||
|
||||
return entries_idx << (32 - capacity_log_);
|
||||
}
|
||||
|
||||
|
|
|
@ -263,9 +263,8 @@ TEST_F(StringSetTest, IntOnly) {
|
|||
|
||||
size_t expected_seen = 0;
|
||||
auto scan_callback = [&](const sds ptr) {
|
||||
sds s = (sds)ptr;
|
||||
string_view str{s, sdslen(s)};
|
||||
EXPECT_FALSE(removed.count(string(str)));
|
||||
string str{ptr, sdslen(ptr)};
|
||||
EXPECT_FALSE(removed.count(str));
|
||||
|
||||
if (numbers.count(atoi(str.data()))) {
|
||||
++expected_seen;
|
||||
|
@ -281,7 +280,7 @@ TEST_F(StringSetTest, IntOnly) {
|
|||
ss_->Add(to_string(val));
|
||||
} while (cursor != 0);
|
||||
|
||||
EXPECT_TRUE(expected_seen + removed.size() == num_ints);
|
||||
EXPECT_GE(expected_seen + removed.size(), num_ints);
|
||||
}
|
||||
|
||||
TEST_F(StringSetTest, XtremeScanGrow) {
|
||||
|
|
|
@ -17,12 +17,11 @@ extern "C" {
|
|||
#include "core/string_set.h"
|
||||
#include "server/command_registry.h"
|
||||
#include "server/conn_context.h"
|
||||
#include "server/container_utils.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/error.h"
|
||||
#include "server/transaction.h"
|
||||
|
||||
#include "server/container_utils.h"
|
||||
|
||||
ABSL_DECLARE_FLAG(bool, use_set2);
|
||||
|
||||
namespace dfly {
|
||||
|
@ -30,7 +29,7 @@ namespace dfly {
|
|||
using namespace std;
|
||||
using absl::GetFlag;
|
||||
|
||||
using ResultStringVec = vector<OpResult<vector<string>>>;
|
||||
using ResultStringVec = vector<OpResult<StringVec>>;
|
||||
using ResultSetView = OpResult<absl::flat_hash_set<std::string_view>>;
|
||||
using SvArray = vector<std::string_view>;
|
||||
|
||||
|
@ -38,6 +37,13 @@ namespace {
|
|||
|
||||
constexpr uint32_t kMaxIntSetEntries = 256;
|
||||
|
||||
// I use relative time from Oct 1, 2022
|
||||
constexpr uint64_t kNowBase = 1664582400ULL;
|
||||
|
||||
uint32_t TimeNowSecRel(uint64_t now_ms) {
|
||||
return (now_ms / 1000) - kNowBase;
|
||||
}
|
||||
|
||||
intset* IntsetAddSafe(string_view val, intset* is, bool* success, bool* added) {
|
||||
long long llval;
|
||||
*added = false;
|
||||
|
@ -80,14 +86,15 @@ bool dictContains(const dict* d, string_view key) {
|
|||
return false;
|
||||
}
|
||||
|
||||
pair<unsigned, bool> RemoveStrSet(ArgSlice vals, CompactObj* set) {
|
||||
pair<unsigned, bool> RemoveStrSet(uint32_t now_sec, ArgSlice vals, CompactObj* set) {
|
||||
unsigned removed = 0;
|
||||
bool isempty = false;
|
||||
auto* shard = EngineShard::tlocal();
|
||||
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
DCHECK_EQ(set->Encoding(), kEncodingStrMap2);
|
||||
if (set->Encoding() == kEncodingStrMap2) {
|
||||
StringSet* ss = ((StringSet*)set->RObjPtr());
|
||||
ss->set_time(now_sec);
|
||||
|
||||
for (auto member : vals) {
|
||||
removed += ss->Erase(member);
|
||||
}
|
||||
|
@ -108,17 +115,17 @@ pair<unsigned, bool> RemoveStrSet(ArgSlice vals, CompactObj* set) {
|
|||
return make_pair(removed, isempty);
|
||||
}
|
||||
|
||||
unsigned AddStrSet(ArgSlice vals, CompactObj* dest) {
|
||||
unsigned AddStrSet(const DbContext& db_context, ArgSlice vals, uint32_t ttl_sec, CompactObj* dest) {
|
||||
unsigned res = 0;
|
||||
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
DCHECK_EQ(dest->Encoding(), kEncodingStrMap2);
|
||||
if (dest->Encoding() == kEncodingStrMap2) {
|
||||
StringSet* ss = (StringSet*)dest->RObjPtr();
|
||||
uint32_t time_now = TimeNowSecRel(db_context.time_now_ms);
|
||||
|
||||
ss->set_time(time_now);
|
||||
|
||||
for (auto member : vals) {
|
||||
// the temporary variable in the engine shard is not used here since the
|
||||
// SDS pointer allocated is stored in the set and will outlive this function
|
||||
// call
|
||||
res += ss->Add(member);
|
||||
res += ss->Add(member, ttl_sec);
|
||||
}
|
||||
} else {
|
||||
DCHECK_EQ(dest->Encoding(), kEncodingStrMap);
|
||||
|
@ -134,6 +141,7 @@ unsigned AddStrSet(ArgSlice vals, CompactObj* dest) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -148,7 +156,7 @@ void InitStrSet(CompactObj* set) {
|
|||
}
|
||||
|
||||
// returns (removed, isempty)
|
||||
pair<unsigned, bool> RemoveSet(ArgSlice vals, CompactObj* set) {
|
||||
pair<unsigned, bool> RemoveSet(const DbContext& db_context, ArgSlice vals, CompactObj* set) {
|
||||
bool isempty = false;
|
||||
unsigned removed = 0;
|
||||
|
||||
|
@ -168,7 +176,7 @@ pair<unsigned, bool> RemoveSet(ArgSlice vals, CompactObj* set) {
|
|||
isempty = (intsetLen(is) == 0);
|
||||
set->SetRObjPtr(is);
|
||||
} else {
|
||||
return RemoveStrSet(vals, set);
|
||||
return RemoveStrSet(TimeNowSecRel(db_context.time_now_ms), vals, set);
|
||||
}
|
||||
return make_pair(removed, isempty);
|
||||
}
|
||||
|
@ -198,12 +206,14 @@ void ScanCallback(void* privdata, const dictEntry* de) {
|
|||
sv->push_back(string(key, sdslen(key)));
|
||||
}
|
||||
|
||||
uint64_t ScanStrSet(const CompactObj& co, uint64_t curs, unsigned count, StringVec* res) {
|
||||
uint64_t ScanStrSet(const DbContext& db_context, const CompactObj& co, uint64_t curs,
|
||||
unsigned count, StringVec* res) {
|
||||
long maxiterations = count * 10;
|
||||
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
DCHECK_EQ(co.Encoding(), kEncodingStrMap2);
|
||||
if (co.Encoding() == kEncodingStrMap2) {
|
||||
StringSet* set = (StringSet*)co.RObjPtr();
|
||||
set->set_time(TimeNowSecRel(db_context.time_now_ms));
|
||||
|
||||
do {
|
||||
curs = set->Scan(curs, [&](const sds ptr) { res->push_back(std::string(ptr, sdslen(ptr))); });
|
||||
} while (curs && maxiterations-- && res->size() < count);
|
||||
|
@ -220,37 +230,40 @@ uint64_t ScanStrSet(const CompactObj& co, uint64_t curs, unsigned count, StringV
|
|||
|
||||
using SetType = pair<void*, unsigned>;
|
||||
|
||||
uint32_t SetTypeLen(const SetType& set) {
|
||||
uint32_t SetTypeLen(const DbContext& db_context, const SetType& set) {
|
||||
if (set.second == kEncodingIntSet) {
|
||||
return intsetLen((const intset*)set.first);
|
||||
}
|
||||
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
DCHECK_EQ(set.second, kEncodingStrMap2);
|
||||
return ((StringSet*)set.first)->Size();
|
||||
} else {
|
||||
DCHECK_EQ(set.second, kEncodingStrMap);
|
||||
return dictSize((const dict*)set.first);
|
||||
if (set.second == kEncodingStrMap2) {
|
||||
StringSet* ss = (StringSet*)set.first;
|
||||
ss->set_time(TimeNowSecRel(db_context.time_now_ms));
|
||||
return ss->Size();
|
||||
}
|
||||
|
||||
DCHECK_EQ(set.second, kEncodingStrMap);
|
||||
return dictSize((const dict*)set.first);
|
||||
}
|
||||
|
||||
bool IsInSet(const SetType& st, int64_t val) {
|
||||
bool IsInSet(const DbContext& db_context, const SetType& st, int64_t val) {
|
||||
if (st.second == kEncodingIntSet)
|
||||
return intsetFind((intset*)st.first, val);
|
||||
|
||||
char buf[32];
|
||||
char* next = absl::numbers_internal::FastIntToBuffer(val, buf);
|
||||
string_view str{buf, size_t(next - buf)};
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
DCHECK_EQ(st.second, kEncodingStrMap2);
|
||||
return ((StringSet*)st.first)->Contains(str);
|
||||
} else {
|
||||
DCHECK_EQ(st.second, kEncodingStrMap);
|
||||
return dictContains((dict*)st.first, str);
|
||||
|
||||
if (st.second == kEncodingStrMap2) {
|
||||
StringSet* ss = (StringSet*)st.first;
|
||||
ss->set_time(TimeNowSecRel(db_context.time_now_ms));
|
||||
return ss->Contains(str);
|
||||
}
|
||||
|
||||
DCHECK_EQ(st.second, kEncodingStrMap);
|
||||
return dictContains((dict*)st.first, str);
|
||||
}
|
||||
|
||||
bool IsInSet(const SetType& st, string_view member) {
|
||||
bool IsInSet(const DbContext& db_context, const SetType& st, string_view member) {
|
||||
if (st.second == kEncodingIntSet) {
|
||||
long long llval;
|
||||
if (!string2ll(member.data(), member.size(), &llval))
|
||||
|
@ -259,9 +272,11 @@ bool IsInSet(const SetType& st, string_view member) {
|
|||
return intsetFind((intset*)st.first, llval);
|
||||
}
|
||||
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
DCHECK_EQ(st.second, kEncodingStrMap2);
|
||||
return ((StringSet*)st.first)->Contains(member);
|
||||
if (st.second == kEncodingStrMap2) {
|
||||
StringSet* ss = (StringSet*)st.first;
|
||||
ss->set_time(TimeNowSecRel(db_context.time_now_ms));
|
||||
|
||||
return ss->Contains(member);
|
||||
} else {
|
||||
DCHECK_EQ(st.second, kEncodingStrMap);
|
||||
return dictContains((dict*)st.first, member);
|
||||
|
@ -269,10 +284,12 @@ bool IsInSet(const SetType& st, string_view member) {
|
|||
}
|
||||
|
||||
// Removes arg from result.
|
||||
void DiffStrSet(const SetType& st, absl::flat_hash_set<string>* result) {
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
DCHECK_EQ(st.second, kEncodingStrMap2);
|
||||
for (const sds ptr : *(StringSet*)st.first) {
|
||||
void DiffStrSet(const DbContext& db_context, const SetType& st,
|
||||
absl::flat_hash_set<string>* result) {
|
||||
if (st.second == kEncodingStrMap2) {
|
||||
StringSet* ss = (StringSet*)st.first;
|
||||
ss->set_time(TimeNowSecRel(db_context.time_now_ms));
|
||||
for (sds ptr : *ss) {
|
||||
result->erase(string_view{ptr, sdslen(ptr)});
|
||||
}
|
||||
} else {
|
||||
|
@ -288,15 +305,15 @@ void DiffStrSet(const SetType& st, absl::flat_hash_set<string>* result) {
|
|||
}
|
||||
}
|
||||
|
||||
void InterStrSet(const vector<SetType>& vec, StringVec* result) {
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
DCHECK_EQ(vec.front().second, kEncodingStrMap2);
|
||||
StringSet* set = (StringSet*)vec.front().first;
|
||||
for (const sds ptr : *set) {
|
||||
void InterStrSet(const DbContext& db_context, const vector<SetType>& vec, StringVec* result) {
|
||||
if (vec.front().second == kEncodingStrMap2) {
|
||||
StringSet* ss = (StringSet*)vec.front().first;
|
||||
ss->set_time(TimeNowSecRel(db_context.time_now_ms));
|
||||
for (const sds ptr : *ss) {
|
||||
std::string_view str{ptr, sdslen(ptr)};
|
||||
size_t j = 1;
|
||||
for (j = 1; j < vec.size(); ++j) {
|
||||
if (vec[j].first != set && !IsInSet(vec[j], str)) {
|
||||
if (vec[j].first != ss && !IsInSet(db_context, vec[j], str)) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -316,7 +333,7 @@ void InterStrSet(const vector<SetType>& vec, StringVec* result) {
|
|||
string_view member{key, sdslen(key)};
|
||||
|
||||
for (j = 1; j < vec.size(); j++) {
|
||||
if (vec[j].first != ds && !IsInSet(vec[j], member))
|
||||
if (vec[j].first != ds && !IsInSet(db_context, vec[j], member))
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -329,14 +346,16 @@ void InterStrSet(const vector<SetType>& vec, StringVec* result) {
|
|||
}
|
||||
}
|
||||
|
||||
StringVec PopStrSet(unsigned count, const SetType& st) {
|
||||
StringVec PopStrSet(const DbContext& db_context, unsigned count, const SetType& st) {
|
||||
StringVec result;
|
||||
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
DCHECK_EQ(st.second, kEncodingStrMap2);
|
||||
StringSet* set = (StringSet*)st.first;
|
||||
for (unsigned i = 0; i < count && !set->Empty(); ++i) {
|
||||
result.push_back(set->Pop().value());
|
||||
if (st.second == kEncodingStrMap2) {
|
||||
StringSet* ss = (StringSet*)st.first;
|
||||
ss->set_time(TimeNowSecRel(db_context.time_now_ms));
|
||||
|
||||
// TODO: this loop is inefficient because Pop searches again and again an occupied bucket.
|
||||
for (unsigned i = 0; i < count && !ss->Empty(); ++i) {
|
||||
result.push_back(ss->Pop().value());
|
||||
}
|
||||
} else {
|
||||
DCHECK_EQ(st.second, kEncodingStrMap);
|
||||
|
@ -534,6 +553,7 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
|
|||
if (!SetFamily::ConvertToStrSet(is, intsetLen(is), &tmp)) {
|
||||
return OpStatus::OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
// frees 'is' on a way.
|
||||
if (GetFlag(FLAGS_use_set2)) {
|
||||
co.InitRobj(OBJ_SET, kEncodingStrMap2, tmp.ptr);
|
||||
|
@ -551,7 +571,7 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
|
|||
}
|
||||
|
||||
if (co.Encoding() != kEncodingIntSet) {
|
||||
res = AddStrSet(std::move(vals), &co);
|
||||
res = AddStrSet(op_args.db_cntx, std::move(vals), UINT32_MAX, &co);
|
||||
}
|
||||
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !new_key);
|
||||
|
@ -559,7 +579,52 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
|
|||
return res;
|
||||
}
|
||||
|
||||
OpResult<uint32_t> OpRem(const OpArgs& op_args, std::string_view key, const ArgSlice& vals) {
|
||||
OpResult<uint32_t> OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_sec,
|
||||
ArgSlice vals) {
|
||||
auto* es = op_args.shard;
|
||||
auto& db_slice = es->db_slice();
|
||||
|
||||
PrimeIterator it;
|
||||
bool new_key = false;
|
||||
|
||||
try {
|
||||
tie(it, new_key) = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
} catch (bad_alloc& e) {
|
||||
return OpStatus::OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
CompactObj& co = it->second;
|
||||
|
||||
if (new_key) {
|
||||
CHECK(absl::GetFlag(FLAGS_use_set2));
|
||||
InitStrSet(&co);
|
||||
} else {
|
||||
// for non-overwrite case it must be set.
|
||||
if (co.ObjType() != OBJ_SET)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
// Update stats and trigger any handle the old value if needed.
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
|
||||
if (co.Encoding() == kEncodingIntSet) {
|
||||
intset* is = (intset*)co.RObjPtr();
|
||||
robj tmp;
|
||||
if (!SetFamily::ConvertToStrSet(is, intsetLen(is), &tmp)) {
|
||||
return OpStatus::OUT_OF_MEMORY;
|
||||
}
|
||||
co.InitRobj(OBJ_SET, kEncodingStrMap2, tmp.ptr);
|
||||
}
|
||||
|
||||
CHECK(co.Encoding() == kEncodingStrMap2);
|
||||
}
|
||||
|
||||
uint32_t res = AddStrSet(op_args.db_cntx, std::move(vals), ttl_sec, &co);
|
||||
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !new_key);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, const ArgSlice& vals) {
|
||||
auto* es = op_args.shard;
|
||||
auto& db_slice = es->db_slice();
|
||||
OpResult<PrimeIterator> find_res = db_slice.Find(op_args.db_cntx, key, OBJ_SET);
|
||||
|
@ -570,7 +635,7 @@ OpResult<uint32_t> OpRem(const OpArgs& op_args, std::string_view key, const ArgS
|
|||
db_slice.PreUpdate(op_args.db_cntx.db_index, *find_res);
|
||||
|
||||
CompactObj& co = find_res.value()->second;
|
||||
auto [removed, isempty] = RemoveSet(vals, &co);
|
||||
auto [removed, isempty] = RemoveSet(op_args.db_cntx, vals, &co);
|
||||
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, *find_res, key);
|
||||
|
||||
|
@ -586,7 +651,7 @@ OpResult<uint32_t> OpRem(const OpArgs& op_args, std::string_view key, const ArgS
|
|||
// and reports the result.
|
||||
class Mover {
|
||||
public:
|
||||
Mover(std::string_view src, std::string_view dest, std::string_view member)
|
||||
Mover(string_view src, string_view dest, string_view member)
|
||||
: src_(src), dest_(dest), member_(member) {
|
||||
}
|
||||
|
||||
|
@ -597,7 +662,7 @@ class Mover {
|
|||
OpStatus OpFind(Transaction* t, EngineShard* es);
|
||||
OpStatus OpMutate(Transaction* t, EngineShard* es);
|
||||
|
||||
std::string_view src_, dest_, member_;
|
||||
string_view src_, dest_, member_;
|
||||
OpResult<bool> found_[2];
|
||||
};
|
||||
|
||||
|
@ -614,7 +679,7 @@ OpStatus Mover::OpFind(Transaction* t, EngineShard* es) {
|
|||
DCHECK(!res->is_done());
|
||||
const CompactObj& val = res.value()->second;
|
||||
SetType st{val.RObjPtr(), val.Encoding()};
|
||||
found_[0] = IsInSet(st, member_);
|
||||
found_[0] = IsInSet(t->db_context(), st, member_);
|
||||
} else {
|
||||
found_[index] = res.status();
|
||||
}
|
||||
|
@ -674,10 +739,16 @@ OpResult<StringVec> OpUnion(const OpArgs& op_args, ArgSlice keys) {
|
|||
DCHECK(!keys.empty());
|
||||
absl::flat_hash_set<string> uniques;
|
||||
|
||||
for (std::string_view key : keys) {
|
||||
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_SET);
|
||||
for (string_view key : keys) {
|
||||
OpResult<PrimeIterator> find_res =
|
||||
op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_SET);
|
||||
if (find_res) {
|
||||
container_utils::IterateSet(find_res.value()->second, [&uniques](container_utils::ContainerEntry ce){
|
||||
PrimeValue& pv = find_res.value()->second;
|
||||
if (pv.Encoding() == kEncodingStrMap2) {
|
||||
StringSet* ss = (StringSet*)pv.RObjPtr();
|
||||
ss->set_time(TimeNowSecRel(op_args.db_cntx.time_now_ms));
|
||||
}
|
||||
container_utils::IterateSet(pv, [&uniques](container_utils::ContainerEntry ce) {
|
||||
uniques.emplace(ce.ToString());
|
||||
return true;
|
||||
});
|
||||
|
@ -704,7 +775,13 @@ OpResult<StringVec> OpDiff(const OpArgs& op_args, ArgSlice keys) {
|
|||
}
|
||||
|
||||
absl::flat_hash_set<string> uniques;
|
||||
container_utils::IterateSet(find_res.value()->second, [&uniques](container_utils::ContainerEntry ce) {
|
||||
PrimeValue& pv = find_res.value()->second;
|
||||
if (pv.Encoding() == kEncodingStrMap2) {
|
||||
StringSet* ss = (StringSet*)pv.RObjPtr();
|
||||
ss->set_time(TimeNowSecRel(op_args.db_cntx.time_now_ms));
|
||||
}
|
||||
|
||||
container_utils::IterateSet(pv, [&uniques](container_utils::ContainerEntry ce) {
|
||||
uniques.emplace(ce.ToString());
|
||||
return true;
|
||||
});
|
||||
|
@ -732,7 +809,7 @@ OpResult<StringVec> OpDiff(const OpArgs& op_args, ArgSlice keys) {
|
|||
uniques.erase(string_view{buf, size_t(next - buf)});
|
||||
}
|
||||
} else {
|
||||
DiffStrSet(st2, &uniques);
|
||||
DiffStrSet(op_args.db_cntx, st2, &uniques);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -753,10 +830,17 @@ OpResult<StringVec> OpInter(const Transaction* t, EngineShard* es, bool remove_f
|
|||
if (!find_res)
|
||||
return find_res.status();
|
||||
|
||||
container_utils::IterateSet(find_res.value()->second, [&result](container_utils::ContainerEntry ce) {
|
||||
result.push_back(ce.ToString());
|
||||
return true;
|
||||
});
|
||||
PrimeValue& pv = find_res.value()->second;
|
||||
if (pv.Encoding() == kEncodingStrMap2) {
|
||||
StringSet* ss = (StringSet*)pv.RObjPtr();
|
||||
ss->set_time(TimeNowSecRel(t->db_context().time_now_ms));
|
||||
}
|
||||
|
||||
container_utils::IterateSet(find_res.value()->second,
|
||||
[&result](container_utils::ContainerEntry ce) {
|
||||
result.push_back(ce.ToString());
|
||||
return true;
|
||||
});
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -782,8 +866,8 @@ OpResult<StringVec> OpInter(const Transaction* t, EngineShard* es, bool remove_f
|
|||
if (status != OpStatus::OK)
|
||||
return status;
|
||||
|
||||
auto comp = [](const SetType& left, const SetType& right) {
|
||||
return SetTypeLen(left) < SetTypeLen(right);
|
||||
auto comp = [db_contx = t->db_context()](const SetType& left, const SetType& right) {
|
||||
return SetTypeLen(db_contx, left) < SetTypeLen(db_contx, right);
|
||||
};
|
||||
|
||||
std::sort(sets.begin(), sets.end(), comp);
|
||||
|
@ -797,7 +881,7 @@ OpResult<StringVec> OpInter(const Transaction* t, EngineShard* es, bool remove_f
|
|||
while (intsetGet(is, ii++, &intele)) {
|
||||
size_t j = 1;
|
||||
for (j = 1; j < sets.size(); j++) {
|
||||
if (sets[j].first != is && !IsInSet(sets[j], intele))
|
||||
if (sets[j].first != is && !IsInSet(t->db_context(), sets[j], intele))
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -807,17 +891,94 @@ OpResult<StringVec> OpInter(const Transaction* t, EngineShard* es, bool remove_f
|
|||
}
|
||||
}
|
||||
} else {
|
||||
InterStrSet(sets, &result);
|
||||
InterStrSet(t->db_context(), sets, &result);
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
// count - how many elements to pop.
|
||||
OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, unsigned count) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
OpResult<PrimeIterator> find_res = db_slice.Find(op_args.db_cntx, key, OBJ_SET);
|
||||
if (!find_res)
|
||||
return find_res.status();
|
||||
|
||||
void SetFamily::SAdd(CmdArgList args, ConnectionContext* cntx) {
|
||||
std::string_view key = ArgS(args, 1);
|
||||
vector<std::string_view> vals(args.size() - 2);
|
||||
StringVec result;
|
||||
if (count == 0)
|
||||
return result;
|
||||
|
||||
PrimeIterator it = find_res.value();
|
||||
size_t slen = it->second.Size();
|
||||
|
||||
/* CASE 1:
|
||||
* The number of requested elements is greater than or equal to
|
||||
* the number of elements inside the set: simply return the whole set. */
|
||||
if (count >= slen) {
|
||||
PrimeValue& pv = it->second;
|
||||
if (pv.Encoding() == kEncodingStrMap2) {
|
||||
StringSet* ss = (StringSet*)pv.RObjPtr();
|
||||
ss->set_time(TimeNowSecRel(op_args.db_cntx.time_now_ms));
|
||||
}
|
||||
|
||||
container_utils::IterateSet(it->second, [&result](container_utils::ContainerEntry ce) {
|
||||
result.push_back(ce.ToString());
|
||||
return true;
|
||||
});
|
||||
|
||||
/* Delete the set as it is now empty */
|
||||
CHECK(db_slice.Del(op_args.db_cntx.db_index, it));
|
||||
} else {
|
||||
SetType st{it->second.RObjPtr(), it->second.Encoding()};
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
|
||||
if (st.second == kEncodingIntSet) {
|
||||
intset* is = (intset*)st.first;
|
||||
int64_t val = 0;
|
||||
|
||||
// copy last count values.
|
||||
for (uint32_t i = slen - count; i < slen; ++i) {
|
||||
intsetGet(is, i, &val);
|
||||
result.push_back(absl::StrCat(val));
|
||||
}
|
||||
|
||||
is = intsetTrimTail(is, count); // now remove last count items
|
||||
it->second.SetRObjPtr(is);
|
||||
} else {
|
||||
result = PopStrSet(op_args.db_cntx, count, st);
|
||||
}
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
OpResult<StringVec> OpScan(const OpArgs& op_args, string_view key, uint64_t* cursor) {
|
||||
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_SET);
|
||||
|
||||
if (!find_res)
|
||||
return find_res.status();
|
||||
|
||||
PrimeIterator it = find_res.value();
|
||||
StringVec res;
|
||||
uint32_t count = 10;
|
||||
|
||||
if (it->second.Encoding() == kEncodingIntSet) {
|
||||
intset* is = (intset*)it->second.RObjPtr();
|
||||
int64_t intele;
|
||||
uint32_t pos = 0;
|
||||
while (intsetGet(is, pos++, &intele)) {
|
||||
res.push_back(absl::StrCat(intele));
|
||||
}
|
||||
*cursor = 0;
|
||||
} else {
|
||||
*cursor = ScanStrSet(op_args.db_cntx, it->second, *cursor, count, &res);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
void SAdd(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view key = ArgS(args, 1);
|
||||
vector<string_view> vals(args.size() - 2);
|
||||
for (size_t i = 2; i < args.size(); ++i) {
|
||||
vals[i - 2] = ArgS(args, i);
|
||||
}
|
||||
|
@ -835,16 +996,16 @@ void SetFamily::SAdd(CmdArgList args, ConnectionContext* cntx) {
|
|||
(*cntx)->SendError(result.status());
|
||||
}
|
||||
|
||||
void SetFamily::SIsMember(CmdArgList args, ConnectionContext* cntx) {
|
||||
std::string_view key = ArgS(args, 1);
|
||||
std::string_view val = ArgS(args, 2);
|
||||
void SIsMember(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view key = ArgS(args, 1);
|
||||
string_view val = ArgS(args, 2);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->db_context(), key, OBJ_SET);
|
||||
|
||||
if (find_res) {
|
||||
SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()};
|
||||
return IsInSet(st, val) ? OpStatus::OK : OpStatus::KEY_NOTFOUND;
|
||||
return IsInSet(t->db_context(), st, val) ? OpStatus::OK : OpStatus::KEY_NOTFOUND;
|
||||
}
|
||||
|
||||
return find_res.status();
|
||||
|
@ -859,10 +1020,10 @@ void SetFamily::SIsMember(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
void SetFamily::SMove(CmdArgList args, ConnectionContext* cntx) {
|
||||
std::string_view src = ArgS(args, 1);
|
||||
std::string_view dest = ArgS(args, 2);
|
||||
std::string_view member = ArgS(args, 3);
|
||||
void SMove(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view src = ArgS(args, 1);
|
||||
string_view dest = ArgS(args, 2);
|
||||
string_view member = ArgS(args, 3);
|
||||
|
||||
Mover mover{src, dest, member};
|
||||
cntx->transaction->Schedule();
|
||||
|
@ -878,9 +1039,9 @@ void SetFamily::SMove(CmdArgList args, ConnectionContext* cntx) {
|
|||
(*cntx)->SendLong(result.value());
|
||||
}
|
||||
|
||||
void SetFamily::SRem(CmdArgList args, ConnectionContext* cntx) {
|
||||
std::string_view key = ArgS(args, 1);
|
||||
vector<std::string_view> vals(args.size() - 2);
|
||||
void SRem(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view key = ArgS(args, 1);
|
||||
vector<string_view> vals(args.size() - 2);
|
||||
for (size_t i = 2; i < args.size(); ++i) {
|
||||
vals[i - 2] = ArgS(args, i);
|
||||
}
|
||||
|
@ -901,8 +1062,8 @@ void SetFamily::SRem(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
void SetFamily::SCard(CmdArgList args, ConnectionContext* cntx) {
|
||||
std::string_view key = ArgS(args, 1);
|
||||
void SCard(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view key = ArgS(args, 1);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<uint32_t> {
|
||||
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->db_context(), key, OBJ_SET);
|
||||
|
@ -925,11 +1086,11 @@ void SetFamily::SCard(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
void SetFamily::SPop(CmdArgList args, ConnectionContext* cntx) {
|
||||
std::string_view key = ArgS(args, 1);
|
||||
void SPop(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view key = ArgS(args, 1);
|
||||
unsigned count = 1;
|
||||
if (args.size() > 2) {
|
||||
std::string_view arg = ArgS(args, 2);
|
||||
string_view arg = ArgS(args, 2);
|
||||
if (!absl::SimpleAtoi(arg, &count)) {
|
||||
(*cntx)->SendError(kInvalidIntErr);
|
||||
return;
|
||||
|
@ -958,9 +1119,9 @@ void SetFamily::SPop(CmdArgList args, ConnectionContext* cntx) {
|
|||
(*cntx)->SendError(result.status());
|
||||
}
|
||||
|
||||
void SetFamily::SDiff(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SDiff(CmdArgList args, ConnectionContext* cntx) {
|
||||
ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED);
|
||||
std::string_view src_key = ArgS(args, 1);
|
||||
string_view src_key = ArgS(args, 1);
|
||||
ShardId src_shard = Shard(src_key, result_set.size());
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
|
@ -989,11 +1150,11 @@ void SetFamily::SDiff(CmdArgList args, ConnectionContext* cntx) {
|
|||
(*cntx)->SendStringArr(arr);
|
||||
}
|
||||
|
||||
void SetFamily::SDiffStore(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SDiffStore(CmdArgList args, ConnectionContext* cntx) {
|
||||
ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED);
|
||||
std::string_view dest_key = ArgS(args, 1);
|
||||
string_view dest_key = ArgS(args, 1);
|
||||
ShardId dest_shard = Shard(dest_key, result_set.size());
|
||||
std::string_view src_key = ArgS(args, 2);
|
||||
string_view src_key = ArgS(args, 2);
|
||||
ShardId src_shard = Shard(src_key, result_set.size());
|
||||
|
||||
VLOG(1) << "SDiffStore " << src_key << " " << src_shard;
|
||||
|
@ -1043,7 +1204,7 @@ void SetFamily::SDiffStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
(*cntx)->SendLong(result.size());
|
||||
}
|
||||
|
||||
void SetFamily::SMembers(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SMembers(CmdArgList args, ConnectionContext* cntx) {
|
||||
auto cb = [](Transaction* t, EngineShard* shard) { return OpInter(t, shard, false); };
|
||||
|
||||
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -1060,7 +1221,7 @@ void SetFamily::SMembers(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
void SetFamily::SInter(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SInter(CmdArgList args, ConnectionContext* cntx) {
|
||||
ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
|
@ -1082,9 +1243,9 @@ void SetFamily::SInter(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
void SetFamily::SInterStore(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SInterStore(CmdArgList args, ConnectionContext* cntx) {
|
||||
ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED);
|
||||
std::string_view dest_key = ArgS(args, 1);
|
||||
string_view dest_key = ArgS(args, 1);
|
||||
ShardId dest_shard = Shard(dest_key, result_set.size());
|
||||
atomic_uint32_t inter_shard_cnt{0};
|
||||
|
||||
|
@ -1122,7 +1283,7 @@ void SetFamily::SInterStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
(*cntx)->SendLong(result->size());
|
||||
}
|
||||
|
||||
void SetFamily::SUnion(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SUnion(CmdArgList args, ConnectionContext* cntx) {
|
||||
ResultStringVec result_set(shard_set->size());
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
|
@ -1145,9 +1306,9 @@ void SetFamily::SUnion(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
void SetFamily::SUnionStore(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SUnionStore(CmdArgList args, ConnectionContext* cntx) {
|
||||
ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED);
|
||||
std::string_view dest_key = ArgS(args, 1);
|
||||
string_view dest_key = ArgS(args, 1);
|
||||
ShardId dest_shard = Shard(dest_key, result_set.size());
|
||||
|
||||
auto union_cb = [&](Transaction* t, EngineShard* shard) {
|
||||
|
@ -1186,9 +1347,9 @@ void SetFamily::SUnionStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
(*cntx)->SendLong(result.size());
|
||||
}
|
||||
|
||||
void SetFamily::SScan(CmdArgList args, ConnectionContext* cntx) {
|
||||
std::string_view key = ArgS(args, 1);
|
||||
std::string_view token = ArgS(args, 2);
|
||||
void SScan(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view key = ArgS(args, 1);
|
||||
string_view token = ArgS(args, 2);
|
||||
|
||||
uint64_t cursor = 0;
|
||||
|
||||
|
@ -1217,79 +1378,38 @@ void SetFamily::SScan(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
OpResult<StringVec> SetFamily::OpPop(const OpArgs& op_args, std::string_view key, unsigned count) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
OpResult<PrimeIterator> find_res = db_slice.Find(op_args.db_cntx, key, OBJ_SET);
|
||||
if (!find_res)
|
||||
return find_res.status();
|
||||
// Syntax: saddex key ttl_sec member [member...]
|
||||
void SAddEx(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view key = ArgS(args, 1);
|
||||
string_view ttl_str = ArgS(args, 2);
|
||||
uint32_t ttl_sec;
|
||||
constexpr uint32_t kMaxTtl = (1UL << 26);
|
||||
|
||||
StringVec result;
|
||||
if (count == 0)
|
||||
return result;
|
||||
|
||||
PrimeIterator it = find_res.value();
|
||||
size_t slen = it->second.Size();
|
||||
|
||||
/* CASE 1:
|
||||
* The number of requested elements is greater than or equal to
|
||||
* the number of elements inside the set: simply return the whole set. */
|
||||
if (count >= slen) {
|
||||
container_utils::IterateSet(it->second, [&result](container_utils::ContainerEntry ce) {
|
||||
result.push_back(ce.ToString());
|
||||
return true;
|
||||
});
|
||||
|
||||
/* Delete the set as it is now empty */
|
||||
CHECK(db_slice.Del(op_args.db_cntx.db_index, it));
|
||||
} else {
|
||||
SetType st{it->second.RObjPtr(), it->second.Encoding()};
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
|
||||
if (st.second == kEncodingIntSet) {
|
||||
intset* is = (intset*)st.first;
|
||||
int64_t val = 0;
|
||||
|
||||
// copy last count values.
|
||||
for (uint32_t i = slen - count; i < slen; ++i) {
|
||||
intsetGet(is, i, &val);
|
||||
result.push_back(absl::StrCat(val));
|
||||
}
|
||||
|
||||
is = intsetTrimTail(is, count); // now remove last count items
|
||||
it->second.SetRObjPtr(is);
|
||||
} else {
|
||||
result = PopStrSet(count, st);
|
||||
}
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
OpResult<StringVec> SetFamily::OpScan(const OpArgs& op_args, std::string_view key,
|
||||
uint64_t* cursor) {
|
||||
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_SET);
|
||||
|
||||
if (!find_res)
|
||||
return find_res.status();
|
||||
|
||||
PrimeIterator it = find_res.value();
|
||||
StringVec res;
|
||||
uint32_t count = 10;
|
||||
|
||||
if (it->second.Encoding() == kEncodingIntSet) {
|
||||
intset* is = (intset*)it->second.RObjPtr();
|
||||
int64_t intele;
|
||||
uint32_t pos = 0;
|
||||
while (intsetGet(is, pos++, &intele)) {
|
||||
res.push_back(absl::StrCat(intele));
|
||||
}
|
||||
*cursor = 0;
|
||||
} else {
|
||||
*cursor = ScanStrSet(it->second, *cursor, count, &res);
|
||||
if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) {
|
||||
return (*cntx)->SendError(kInvalidIntErr);
|
||||
}
|
||||
|
||||
return res;
|
||||
vector<string_view> vals(args.size() - 3);
|
||||
for (size_t i = 3; i < args.size(); ++i) {
|
||||
vals[i - 3] = ArgS(args, i);
|
||||
}
|
||||
|
||||
ArgSlice arg_slice{vals.data(), vals.size()};
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpAddEx(t->GetOpArgs(shard), key, ttl_sec, arg_slice);
|
||||
};
|
||||
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
if (result) {
|
||||
return (*cntx)->SendLong(result.value());
|
||||
}
|
||||
|
||||
(*cntx)->SendError(result.status());
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
bool SetFamily::ConvertToStrSet(const intset* is, size_t expected_len, robj* dest) {
|
||||
int64_t intele;
|
||||
char buf[32];
|
||||
|
@ -1334,7 +1454,7 @@ bool SetFamily::ConvertToStrSet(const intset* is, size_t expected_len, robj* des
|
|||
|
||||
using CI = CommandId;
|
||||
|
||||
#define HFUNC(x) SetHandler(&SetFamily::x)
|
||||
#define HFUNC(x) SetHandler(&x)
|
||||
|
||||
void SetFamily::Register(CommandRegistry* registry) {
|
||||
*registry << CI{"SADD", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(SAdd)
|
||||
|
@ -1351,6 +1471,10 @@ void SetFamily::Register(CommandRegistry* registry) {
|
|||
<< CI{"SUNION", CO::READONLY, -2, 1, -1, 1}.HFUNC(SUnion)
|
||||
<< CI{"SUNIONSTORE", CO::WRITE | CO::DENYOOM, -3, 1, -1, 1}.HFUNC(SUnionStore)
|
||||
<< CI{"SSCAN", CO::READONLY, -3, 1, 1, 1}.HFUNC(SScan);
|
||||
|
||||
if (absl::GetFlag(FLAGS_use_set2)) {
|
||||
*registry << CI{"SADDEX", CO::WRITE | CO::FAST | CO::DENYOOM, -4, 1, 1, 1}.HFUNC(SAddEx);
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t SetFamily::MaxIntsetEntries() {
|
||||
|
|
|
@ -32,25 +32,6 @@ class SetFamily {
|
|||
static bool ConvertToStrSet(const intset* is, size_t expected_len, robj* dest);
|
||||
|
||||
private:
|
||||
static void SAdd(CmdArgList args, ConnectionContext* cntx);
|
||||
static void SIsMember(CmdArgList args, ConnectionContext* cntx);
|
||||
static void SRem(CmdArgList args, ConnectionContext* cntx);
|
||||
static void SCard(CmdArgList args, ConnectionContext* cntx);
|
||||
static void SPop(CmdArgList args, ConnectionContext* cntx);
|
||||
static void SUnion(CmdArgList args, ConnectionContext* cntx);
|
||||
static void SUnionStore(CmdArgList args, ConnectionContext* cntx);
|
||||
static void SDiff(CmdArgList args, ConnectionContext* cntx);
|
||||
static void SDiffStore(CmdArgList args, ConnectionContext* cntx);
|
||||
static void SMembers(CmdArgList args, ConnectionContext* cntx);
|
||||
static void SMove(CmdArgList args, ConnectionContext* cntx);
|
||||
static void SInter(CmdArgList args, ConnectionContext* cntx);
|
||||
static void SInterStore(CmdArgList args, ConnectionContext* cntx);
|
||||
static void SScan(CmdArgList args, ConnectionContext* cntx);
|
||||
|
||||
// count - how many elements to pop.
|
||||
static OpResult<StringVec> OpPop(const OpArgs& op_args, std::string_view key, unsigned count);
|
||||
static OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor);
|
||||
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue