Add ZREMRANGEBYLEX and ZREVRANGEBYSCORE commands.

This commit is contained in:
Roman Gershman 2022-04-20 23:51:48 +03:00
parent d8697463dc
commit 69911a95ac
6 changed files with 81 additions and 10 deletions

View file

@ -208,9 +208,9 @@ API 2.0
- [X] ZLEXCOUNT
- [X] ZRANGEBYLEX
- [X] ZRANK
- [ ] ZREMRANGEBYLEX
- [X] ZREMRANGEBYLEX
- [X] ZREMRANGEBYRANK
- [ ] ZREVRANGEBYSCORE
- [X] ZREVRANGEBYSCORE
- [X] ZREVRANK
- [ ] ZUNIONSTORE
- [ ] ZSCAN

View file

@ -426,7 +426,7 @@ unsigned long zslDeleteRangeByScore(zskiplist *zsl, const zrangespec *range, dic
return removed;
}
unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *dict) {
unsigned long zslDeleteRangeByLex(zskiplist *zsl, const zlexrangespec *range, dict *dict) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
unsigned long removed = 0;
int i;
@ -1125,7 +1125,7 @@ unsigned char *zzlDeleteRangeByScore(unsigned char *zl, const zrangespec *range,
return zl;
}
unsigned char *zzlDeleteRangeByLex(unsigned char *zl, zlexrangespec *range, unsigned long *deleted) {
unsigned char *zzlDeleteRangeByLex(unsigned char *zl, const zlexrangespec *range, unsigned long *deleted) {
unsigned char *eptr, *sptr;
unsigned long num = 0;

View file

@ -102,6 +102,10 @@ unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned
unsigned long zslDeleteRangeByScore(zskiplist *zsl, const zrangespec *range, dict *dict);
unsigned char *zzlDeleteRangeByScore(unsigned char *zl, const zrangespec *range, unsigned long *deleted);
unsigned long zslDeleteRangeByLex(zskiplist *zsl, const zlexrangespec *range, dict *dict);
unsigned char *zzlDeleteRangeByLex(unsigned char *zl, const zlexrangespec *range, unsigned long *deleted);
extern sds cmaxstring;
extern sds cminstring;

View file

@ -22,7 +22,7 @@ namespace dfly {
using namespace std;
using namespace facade;
using absl::SimpleAtoi;
namespace {
using CI = CommandId;
@ -321,7 +321,17 @@ void IntervalVisitor::ActionRem(const zrangespec& range) {
}
void IntervalVisitor::ActionRem(const zlexrangespec& range) {
LOG(FATAL) << "TBD";
if (zobj_->encoding == OBJ_ENCODING_LISTPACK) {
uint8_t* zl = (uint8_t*)zobj_->ptr;
unsigned long deleted = 0;
zl = zzlDeleteRangeByLex(zl, &range, &deleted);
zobj_->ptr = zl;
removed_ = deleted;
} else {
CHECK_EQ(OBJ_ENCODING_SKIPLIST, zobj_->encoding);
zset* zs = (zset*)zobj_->ptr;
removed_ = zslDeleteRangeByLex(zs->zsl, &range, zs->dict);
}
}
void IntervalVisitor::ExtractListPack(const zrangespec& range) {
@ -764,6 +774,38 @@ void ZSetFamily::ZRevRange(CmdArgList args, ConnectionContext* cntx) {
ZRangeGeneric(std::move(args), true, cntx);
}
void ZSetFamily::ZRevRangeByScore(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view min_s = ArgS(args, 2);
string_view max_s = ArgS(args, 3);
RangeParams range_params;
range_params.reverse = true;
for (size_t i = 4; i < args.size(); ++i) {
ToUpper(&args[i]);
string_view cur_arg = ArgS(args, i);
if (cur_arg == "WITHSCORES") {
range_params.with_scores = true;
} else if (cur_arg == "LIMIT") {
if (i + 3 != args.size())
return (*cntx)->SendError(kSyntaxErr);
string_view os = ArgS(args, i + 1);
string_view cs = ArgS(args, i + 2);
if (!SimpleAtoi(os, &range_params.offset) || !SimpleAtoi(cs, &range_params.limit))
return (*cntx)->SendError(kSyntaxErr);
i += 3;
} else {
return (*cntx)->SendError(absl::StrCat("unsupported option ", cur_arg), kSyntaxErrType);
}
}
ZRangeByScoreInternal(key, min_s, max_s, range_params, cntx);
}
void ZSetFamily::ZRevRank(CmdArgList args, ConnectionContext* cntx) {
ZRankGeneric(std::move(args), true, cntx);
}
@ -784,7 +826,7 @@ void ZSetFamily::ZRangeByLex(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendError(kSyntaxErr);
string_view os = ArgS(args, 5);
string_view cs = ArgS(args, 6);
if (!absl::SimpleAtoi(os, &count) || !absl::SimpleAtoi(cs, &count)) {
if (!SimpleAtoi(os, &count) || !SimpleAtoi(cs, &count)) {
return (*cntx)->SendError(kInvalidIntErr);
}
}
@ -835,7 +877,7 @@ void ZSetFamily::ZRemRangeByRank(CmdArgList args, ConnectionContext* cntx) {
string_view max_s = ArgS(args, 3);
IndexInterval ii;
if (!absl::SimpleAtoi(min_s, &ii.first) || !absl::SimpleAtoi(max_s, &ii.second)) {
if (!SimpleAtoi(min_s, &ii.first) || !SimpleAtoi(max_s, &ii.second)) {
return (*cntx)->SendError(kInvalidIntErr);
}
@ -861,6 +903,23 @@ void ZSetFamily::ZRemRangeByScore(CmdArgList args, ConnectionContext* cntx) {
ZRemRangeGeneric(key, range_spec, cntx);
}
void ZSetFamily::ZRemRangeByLex(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view min_s = ArgS(args, 2);
string_view max_s = ArgS(args, 3);
LexInterval li;
if (!ParseLexBound(min_s, &li.first) || !ParseLexBound(max_s, &li.second)) {
return (*cntx)->SendError(kLexRangeErr);
}
ZRangeSpec range_spec;
range_spec.interval = li;
ZRemRangeGeneric(key, range_spec, cntx);
}
void ZSetFamily::ZRem(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
@ -985,7 +1044,7 @@ void ZSetFamily::ZRangeGeneric(CmdArgList args, bool reverse, ConnectionContext*
IndexInterval ii;
if (!absl::SimpleAtoi(min_s, &ii.first) || !absl::SimpleAtoi(max_s, &ii.second)) {
if (!SimpleAtoi(min_s, &ii.first) || !SimpleAtoi(max_s, &ii.second)) {
(*cntx)->SendError(kInvalidIntErr);
return;
}
@ -1317,7 +1376,9 @@ void ZSetFamily::Register(CommandRegistry* registry) {
<< CI{"ZSCORE", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(ZScore)
<< CI{"ZREMRANGEBYRANK", CO::WRITE, 4, 1, 1, 1}.HFUNC(ZRemRangeByRank)
<< CI{"ZREMRANGEBYSCORE", CO::WRITE, 4, 1, 1, 1}.HFUNC(ZRemRangeByScore)
<< CI{"ZREVRANGE", CO::WRITE, 4, 1, 1, 1}.HFUNC(ZRevRange)
<< CI{"ZREMRANGEBYLEX", CO::WRITE, 4, 1, 1, 1}.HFUNC(ZRemRangeByLex)
<< CI{"ZREVRANGE", CO::READONLY, 4, 1, 1, 1}.HFUNC(ZRevRange)
<< CI{"ZREVRANGEBYSCORE", CO::READONLY, 4, 1, 1, 1}.HFUNC(ZRevRangeByScore)
<< CI{"ZREVRANK", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(ZRevRank);
}

View file

@ -65,7 +65,9 @@ class ZSetFamily {
static void ZRangeByScore(CmdArgList args, ConnectionContext* cntx);
static void ZRemRangeByRank(CmdArgList args, ConnectionContext* cntx);
static void ZRemRangeByScore(CmdArgList args, ConnectionContext* cntx);
static void ZRemRangeByLex(CmdArgList args, ConnectionContext* cntx);
static void ZRevRange(CmdArgList args, ConnectionContext* cntx);
static void ZRevRangeByScore(CmdArgList args, ConnectionContext* cntx);
static void ZRevRank(CmdArgList args, ConnectionContext* cntx);
static void ZRangeByScoreInternal(std::string_view key, std::string_view min_s,

View file

@ -69,6 +69,7 @@ TEST_F(ZSetFamilyTest, ZRangeRank) {
Run({"zadd", "x", "1.1", "a", "2.1", "b"});
EXPECT_THAT(Run({"zrangebyscore", "x", "0", "(1.1"}), ElementsAre(ArrLen(0)));
EXPECT_THAT(Run({"zrangebyscore", "x", "-inf", "1.1"}), ElementsAre("a"));
EXPECT_THAT(Run({"zrevrangebyscore", "x", "-inf", "+inf"}), ElementsAre("b", "a"));
EXPECT_EQ(2, CheckedInt({"zcount", "x", "1.1", "2.1"}));
EXPECT_EQ(1, CheckedInt({"zcount", "x", "(1.1", "2.1"}));
@ -121,6 +122,9 @@ TEST_F(ZSetFamilyTest, ByLex) {
auto resp = Run({"zrangebylex", "key", "-", "[cool"});
EXPECT_THAT(resp, ElementsAre("alpha", "bar", "cool"));
EXPECT_EQ(3, CheckedInt({"ZLEXCOUNT", "key", "(foo", "+"}));
EXPECT_EQ(3, CheckedInt({"ZREMRANGEBYLEX", "key", "(foo", "+"}));
EXPECT_THAT(Run({"zrangebylex", "key", "[a", "+"}),
ElementsAre("alpha", "bar", "cool", "down", "elephant", "foo"));
}
} // namespace dfly