mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
Add SSCAN command.
Update README with 2.x API commands.
This commit is contained in:
parent
f1ea69c0b4
commit
cb14df0e6b
5 changed files with 156 additions and 17 deletions
|
@ -28,6 +28,8 @@ using SvArray = vector<std::string_view>;
|
|||
|
||||
namespace {
|
||||
|
||||
constexpr uint32_t kMaxIntSetEntries = 256;
|
||||
|
||||
void ConvertTo(intset* src, dict* dest) {
|
||||
int64_t intele;
|
||||
char buf[32];
|
||||
|
@ -53,11 +55,7 @@ intset* IntsetAddSafe(string_view val, intset* is, bool* success, bool* added) {
|
|||
is = intsetAdd(is, llval, &inserted);
|
||||
if (inserted) {
|
||||
*added = true;
|
||||
size_t max_entries = server.set_max_intset_entries;
|
||||
/* limit to 1G entries due to intset internals. */
|
||||
if (max_entries >= 1 << 16)
|
||||
max_entries = 1 << 16;
|
||||
*success = intsetLen(is) <= max_entries;
|
||||
*success = intsetLen(is) <= kMaxIntSetEntries;
|
||||
} else {
|
||||
*added = false;
|
||||
*success = true;
|
||||
|
@ -482,6 +480,12 @@ OpResult<unsigned> Mover::Commit(Transaction* t) {
|
|||
return res;
|
||||
}
|
||||
|
||||
void ScanCallback(void* privdata, const dictEntry* de) {
|
||||
StringVec* sv = (StringVec*)privdata;
|
||||
sds key = (sds)de->key;
|
||||
sv->push_back(string(key, sdslen(key)));
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
void SetFamily::SAdd(CmdArgList args, ConnectionContext* cntx) {
|
||||
|
@ -859,6 +863,37 @@ void SetFamily::SUnionStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
(*cntx)->SendLong(result.size());
|
||||
}
|
||||
|
||||
void SetFamily::SScan(CmdArgList args, ConnectionContext* cntx) {
|
||||
std::string_view key = ArgS(args, 1);
|
||||
std::string_view token = ArgS(args, 2);
|
||||
|
||||
uint64_t cursor = 0;
|
||||
|
||||
if (!absl::SimpleAtoi(token, &cursor)) {
|
||||
return (*cntx)->SendError("invalid cursor");
|
||||
}
|
||||
|
||||
if (args.size() > 3) {
|
||||
return (*cntx)->SendError("scan options are not supported yet");
|
||||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpScan(OpArgs{shard, t->db_index()}, key, &cursor);
|
||||
};
|
||||
|
||||
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
if (result.status() != OpStatus::WRONG_TYPE) {
|
||||
(*cntx)->StartArray(2);
|
||||
(*cntx)->SendSimpleString(absl::StrCat(cursor));
|
||||
(*cntx)->StartArray(result->size());
|
||||
for (const auto& k : *result) {
|
||||
(*cntx)->SendBulkString(k);
|
||||
}
|
||||
} else {
|
||||
(*cntx)->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
OpResult<StringVec> SetFamily::OpUnion(const OpArgs& op_args, const ArgSlice& keys) {
|
||||
DCHECK(!keys.empty());
|
||||
absl::flat_hash_set<string> uniques;
|
||||
|
@ -944,7 +979,7 @@ OpResult<StringVec> SetFamily::OpPop(const OpArgs& op_args, std::string_view key
|
|||
|
||||
MainIterator it = find_res.value();
|
||||
size_t slen = it->second.Size();
|
||||
SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()};
|
||||
SetType st{it->second.RObjPtr(), it->second.Encoding()};
|
||||
|
||||
/* CASE 1:
|
||||
* The number of requested elements is greater than or equal to
|
||||
|
@ -1061,6 +1096,40 @@ OpResult<StringVec> SetFamily::OpInter(const Transaction* t, EngineShard* es, bo
|
|||
return result;
|
||||
}
|
||||
|
||||
OpResult<StringVec> SetFamily::OpScan(const OpArgs& op_args, std::string_view key,
|
||||
uint64_t* cursor) {
|
||||
OpResult<MainIterator> find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_SET);
|
||||
|
||||
if (!find_res)
|
||||
return find_res.status();
|
||||
|
||||
MainIterator it = find_res.value();
|
||||
StringVec res;
|
||||
uint32_t count = 10;
|
||||
|
||||
if (it->second.Encoding() == kEncodingIntSet) {
|
||||
intset* is = (intset*)it->second.RObjPtr();
|
||||
int64_t intele;
|
||||
uint32_t pos = 0;
|
||||
while (intsetGet(is, pos++, &intele)) {
|
||||
res.push_back(absl::StrCat(intele));
|
||||
}
|
||||
*cursor = 0;
|
||||
} else {
|
||||
DCHECK_EQ(kEncodingStrMap, it->second.Encoding());
|
||||
long maxiterations = count * 10;
|
||||
|
||||
dict* ds = (dict*)it->second.RObjPtr();
|
||||
uint64_t cur = *cursor;
|
||||
do {
|
||||
cur = dictScan(ds, cur, ScanCallback, NULL, &res);
|
||||
} while (cur && maxiterations-- && res.size() < count);
|
||||
*cursor = cur;
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
using CI = CommandId;
|
||||
|
||||
#define HFUNC(x) SetHandler(&SetFamily::x)
|
||||
|
@ -1078,7 +1147,8 @@ void SetFamily::Register(CommandRegistry* registry) {
|
|||
<< CI{"SCARD", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(SCard)
|
||||
<< CI{"SPOP", CO::WRITE | CO::RANDOM | CO::FAST, -2, 1, 1, 1}.HFUNC(SPop)
|
||||
<< CI{"SUNION", CO::READONLY, -2, 1, -1, 1}.HFUNC(SUnion)
|
||||
<< CI{"SUNIONSTORE", CO::WRITE | CO::DENYOOM, -3, 1, -1, 1}.HFUNC(SUnionStore);
|
||||
<< CI{"SUNIONSTORE", CO::WRITE | CO::DENYOOM, -3, 1, -1, 1}.HFUNC(SUnionStore)
|
||||
<< CI{"SSCAN", CO::READONLY | CO::RANDOM, -3, 1, 1, 1}.HFUNC(SScan);
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -33,7 +33,7 @@ class SetFamily {
|
|||
static void SMove(CmdArgList args, ConnectionContext* cntx);
|
||||
static void SInter(CmdArgList args, ConnectionContext* cntx);
|
||||
static void SInterStore(CmdArgList args, ConnectionContext* cntx);
|
||||
|
||||
static void SScan(CmdArgList args, ConnectionContext* cntx);
|
||||
|
||||
static OpResult<StringVec> OpUnion(const OpArgs& op_args, const ArgSlice& args);
|
||||
static OpResult<StringVec> OpDiff(const Transaction* t, EngineShard* es);
|
||||
|
@ -41,6 +41,7 @@ class SetFamily {
|
|||
|
||||
// count - how many elements to pop.
|
||||
static OpResult<StringVec> OpPop(const OpArgs& op_args, std::string_view key, unsigned count);
|
||||
static OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor);
|
||||
|
||||
};
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue