feat(list): Add BLMove command (#753)

Also update api_status with ZUNION and BRPOPLPUSH
Fixes #751

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-02-05 10:55:20 +02:00 committed by GitHub
parent 8d095d00fa
commit da40b63433
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 89 additions and 23 deletions

View file

@ -111,7 +111,7 @@ with respect to Memcached and Redis APIs.
- [X] List Family
- [X] BLPOP
- [X] BRPOP
- [ ] BRPOPLPUSH
- [X] BRPOPLPUSH
- [X] LINSERT
- [X] LPUSHX
- [X] RPUSHX
@ -250,12 +250,13 @@ with respect to Memcached and Redis APIs.
- [X] List Family
- [X] LMOVE
- [X] LPOS
- [X] BLMOVE
- [ ] Stream Family
- [ ] XAUTOCLAIM
- [ ] Sorted Set Family
- [ ] ZUNION
- [X] Sorted Set Family
- [X] ZUNION
## Notes
Some commands were implemented as decorators along the way:

View file

@ -97,6 +97,27 @@ string ListPop(ListDir dir, quicklist* ql) {
return res;
}
optional<ListDir> ParseDir(string_view arg) {
if (arg == "LEFT") {
return ListDir::LEFT;
}
if (arg == "RIGHT") {
return ListDir::RIGHT;
}
return nullopt;
}
string_view DirToSv(ListDir dir) {
switch (dir) {
case ListDir::LEFT:
return "LEFT"sv;
case ListDir::RIGHT:
return "RIGHT"sv;
}
return ""sv;
}
bool ElemCompare(const quicklistEntry& entry, string_view elem) {
if (entry.value) {
return entry.sz == elem.size() &&
@ -867,6 +888,47 @@ void BRPopLPush(CmdArgList args, ConnectionContext* cntx) {
}
}
void BLMove(CmdArgList args, ConnectionContext* cntx) {
string_view src = ArgS(args, 1);
string_view dest = ArgS(args, 2);
string_view timeout_str = ArgS(args, 5);
float timeout;
if (!absl::SimpleAtof(timeout_str, &timeout)) {
return (*cntx)->SendError("timeout is not a float or out of range");
}
if (timeout < 0) {
return (*cntx)->SendError("timeout is negative");
}
ToUpper(&args[3]);
ToUpper(&args[4]);
optional<ListDir> src_dir = ParseDir(ArgS(args, 3));
optional<ListDir> dest_dir = ParseDir(ArgS(args, 4));
if (!src_dir || !dest_dir) {
return (*cntx)->SendError(kSyntaxErr);
}
BPopPusher bpop_pusher(src, dest, *src_dir, *dest_dir);
OpResult<string> op_res = bpop_pusher.Run(cntx->transaction, unsigned(timeout * 1000));
if (op_res) {
return (*cntx)->SendBulkString(*op_res);
}
switch (op_res.status()) {
case OpStatus::TIMED_OUT:
return (*cntx)->SendNull();
break;
default:
return (*cntx)->SendError(op_res.status());
break;
}
}
BPopPusher::BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir)
: pop_key_(pop_key), push_key_(push_key), popdir_(popdir), pushdir_(pushdir) {
}
@ -890,10 +952,9 @@ OpResult<string> BPopPusher::RunSingle(Transaction* t, time_point tp) {
auto cb_move = [&](Transaction* t, EngineShard* shard) {
OpArgs op_args = t->GetOpArgs(shard);
op_res = OpMoveSingleShard(op_args, pop_key_, push_key_, popdir_, pushdir_);
if (op_res) {
if (op_args.shard->journal()) {
RecordJournal(op_args, "RPOPLPUSH", ArgSlice{pop_key_, push_key_}, 1);
}
if (op_res && op_args.shard->journal()) {
ArgSlice args{pop_key_, push_key_, DirToSv(popdir_), DirToSv(pushdir_)};
RecordJournal(op_args, "LMOVE", args, 1);
}
return OpStatus::OK;
};
@ -1207,24 +1268,13 @@ void ListFamily::LMove(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[3]);
ToUpper(&args[4]);
ListDir src_dir;
ListDir dest_dir;
if (src_dir_str == "LEFT") {
src_dir = ListDir::LEFT;
} else if (src_dir_str == "RIGHT") {
src_dir = ListDir::RIGHT;
} else {
return (*cntx)->SendError(kSyntaxErr);
}
if (dest_dir_str == "LEFT") {
dest_dir = ListDir::LEFT;
} else if (dest_dir_str == "RIGHT") {
dest_dir = ListDir::RIGHT;
} else {
optional<ListDir> src_dir = ParseDir(src_dir_str);
optional<ListDir> dest_dir = ParseDir(dest_dir_str);
if (!src_dir || !dest_dir) {
return (*cntx)->SendError(kSyntaxErr);
}
MoveGeneric(cntx, src, dest, src_dir, dest_dir);
MoveGeneric(cntx, src, dest, *src_dir, *dest_dir);
}
void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx) {
@ -1364,7 +1414,9 @@ void ListFamily::Register(CommandRegistry* registry) {
<< CI{"LSET", CO::WRITE | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(LSet)
<< CI{"LTRIM", CO::WRITE, 4, 1, 1, 1}.HFUNC(LTrim)
<< CI{"LREM", CO::WRITE, 4, 1, 1, 1}.HFUNC(LRem)
<< CI{"LMOVE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, 5, 1, 2, 1}.HFUNC(LMove);
<< CI{"LMOVE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, 5, 1, 2, 1}.HFUNC(LMove)
<< CI{"BLMOVE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL | CO::BLOCKING, 6, 1, 2, 1}
.SetHandler(BLMove);
}
} // namespace dfly

View file

@ -759,4 +759,17 @@ TEST_F(ListFamilyTest, BRPopLPushTwoShards) {
// the atomicity and causes the first bug as well.
}
TEST_F(ListFamilyTest, BLMove) {
EXPECT_THAT(Run({"blmove", "x", "y", "right", "right", "0.05"}), ArgType(RespExpr::NIL));
ASSERT_EQ(0, NumWatched());
EXPECT_THAT(Run({"lpush", "x", "val1"}), IntArg(1));
EXPECT_THAT(Run({"lpush", "y", "val2"}), IntArg(1));
EXPECT_EQ(Run({"blmove", "x", "y", "right", "left", "0.01"}), "val1");
auto resp = Run({"lrange", "y", "0", "-1"});
ASSERT_THAT(resp, ArrLen(2));
ASSERT_THAT(resp.GetVec(), ElementsAre("val1", "val2"));
}
} // namespace dfly