From da40b6343328c90b600f0cee9f6bc968ae70daaa Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Sun, 5 Feb 2023 10:55:20 +0200 Subject: [PATCH] feat(list): Add BLMove command (#753) Also update api_status with ZUNION and BRPOPLPUSH Fixes #751 Signed-off-by: Roman Gershman --- docs/api_status.md | 7 +-- src/server/list_family.cc | 92 ++++++++++++++++++++++++++-------- src/server/list_family_test.cc | 13 +++++ 3 files changed, 89 insertions(+), 23 deletions(-) diff --git a/docs/api_status.md b/docs/api_status.md index 4f40f0fb0..5de962ee3 100644 --- a/docs/api_status.md +++ b/docs/api_status.md @@ -111,7 +111,7 @@ with respect to Memcached and Redis APIs. - [X] List Family - [X] BLPOP - [X] BRPOP - - [ ] BRPOPLPUSH + - [X] BRPOPLPUSH - [X] LINSERT - [X] LPUSHX - [X] RPUSHX @@ -250,12 +250,13 @@ with respect to Memcached and Redis APIs. - [X] List Family - [X] LMOVE - [X] LPOS + - [X] BLMOVE - [ ] Stream Family - [ ] XAUTOCLAIM -- [ ] Sorted Set Family - - [ ] ZUNION +- [X] Sorted Set Family + - [X] ZUNION ## Notes Some commands were implemented as decorators along the way: diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 5332b7d54..e5ff2c8d8 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -97,6 +97,27 @@ string ListPop(ListDir dir, quicklist* ql) { return res; } +optional ParseDir(string_view arg) { + if (arg == "LEFT") { + return ListDir::LEFT; + } + if (arg == "RIGHT") { + return ListDir::RIGHT; + } + + return nullopt; +} + +string_view DirToSv(ListDir dir) { + switch (dir) { + case ListDir::LEFT: + return "LEFT"sv; + case ListDir::RIGHT: + return "RIGHT"sv; + } + return ""sv; +} + bool ElemCompare(const quicklistEntry& entry, string_view elem) { if (entry.value) { return entry.sz == elem.size() && @@ -867,6 +888,47 @@ void BRPopLPush(CmdArgList args, ConnectionContext* cntx) { } } +void BLMove(CmdArgList args, ConnectionContext* cntx) { + string_view src = ArgS(args, 1); + string_view dest = ArgS(args, 2); + string_view timeout_str = ArgS(args, 5); + + float timeout; + if (!absl::SimpleAtof(timeout_str, &timeout)) { + return (*cntx)->SendError("timeout is not a float or out of range"); + } + + if (timeout < 0) { + return (*cntx)->SendError("timeout is negative"); + } + + ToUpper(&args[3]); + ToUpper(&args[4]); + + optional src_dir = ParseDir(ArgS(args, 3)); + optional dest_dir = ParseDir(ArgS(args, 4)); + if (!src_dir || !dest_dir) { + return (*cntx)->SendError(kSyntaxErr); + } + + BPopPusher bpop_pusher(src, dest, *src_dir, *dest_dir); + OpResult op_res = bpop_pusher.Run(cntx->transaction, unsigned(timeout * 1000)); + + if (op_res) { + return (*cntx)->SendBulkString(*op_res); + } + + switch (op_res.status()) { + case OpStatus::TIMED_OUT: + return (*cntx)->SendNull(); + break; + + default: + return (*cntx)->SendError(op_res.status()); + break; + } +} + BPopPusher::BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir) : pop_key_(pop_key), push_key_(push_key), popdir_(popdir), pushdir_(pushdir) { } @@ -890,10 +952,9 @@ OpResult BPopPusher::RunSingle(Transaction* t, time_point tp) { auto cb_move = [&](Transaction* t, EngineShard* shard) { OpArgs op_args = t->GetOpArgs(shard); op_res = OpMoveSingleShard(op_args, pop_key_, push_key_, popdir_, pushdir_); - if (op_res) { - if (op_args.shard->journal()) { - RecordJournal(op_args, "RPOPLPUSH", ArgSlice{pop_key_, push_key_}, 1); - } + if (op_res && op_args.shard->journal()) { + ArgSlice args{pop_key_, push_key_, DirToSv(popdir_), DirToSv(pushdir_)}; + RecordJournal(op_args, "LMOVE", args, 1); } return OpStatus::OK; }; @@ -1207,24 +1268,13 @@ void ListFamily::LMove(CmdArgList args, ConnectionContext* cntx) { ToUpper(&args[3]); ToUpper(&args[4]); - ListDir src_dir; - ListDir dest_dir; - if (src_dir_str == "LEFT") { - src_dir = ListDir::LEFT; - } else if (src_dir_str == "RIGHT") { - src_dir = ListDir::RIGHT; - } else { - return (*cntx)->SendError(kSyntaxErr); - } - if (dest_dir_str == "LEFT") { - dest_dir = ListDir::LEFT; - } else if (dest_dir_str == "RIGHT") { - dest_dir = ListDir::RIGHT; - } else { + optional src_dir = ParseDir(src_dir_str); + optional dest_dir = ParseDir(dest_dir_str); + if (!src_dir || !dest_dir) { return (*cntx)->SendError(kSyntaxErr); } - MoveGeneric(cntx, src, dest, src_dir, dest_dir); + MoveGeneric(cntx, src, dest, *src_dir, *dest_dir); } void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx) { @@ -1364,7 +1414,9 @@ void ListFamily::Register(CommandRegistry* registry) { << CI{"LSET", CO::WRITE | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(LSet) << CI{"LTRIM", CO::WRITE, 4, 1, 1, 1}.HFUNC(LTrim) << CI{"LREM", CO::WRITE, 4, 1, 1, 1}.HFUNC(LRem) - << CI{"LMOVE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, 5, 1, 2, 1}.HFUNC(LMove); + << CI{"LMOVE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, 5, 1, 2, 1}.HFUNC(LMove) + << CI{"BLMOVE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL | CO::BLOCKING, 6, 1, 2, 1} + .SetHandler(BLMove); } } // namespace dfly diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index e643abeb0..c7827dd60 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -759,4 +759,17 @@ TEST_F(ListFamilyTest, BRPopLPushTwoShards) { // the atomicity and causes the first bug as well. } +TEST_F(ListFamilyTest, BLMove) { + EXPECT_THAT(Run({"blmove", "x", "y", "right", "right", "0.05"}), ArgType(RespExpr::NIL)); + ASSERT_EQ(0, NumWatched()); + + EXPECT_THAT(Run({"lpush", "x", "val1"}), IntArg(1)); + EXPECT_THAT(Run({"lpush", "y", "val2"}), IntArg(1)); + + EXPECT_EQ(Run({"blmove", "x", "y", "right", "left", "0.01"}), "val1"); + auto resp = Run({"lrange", "y", "0", "-1"}); + ASSERT_THAT(resp, ArrLen(2)); + ASSERT_THAT(resp.GetVec(), ElementsAre("val1", "val2")); +} + } // namespace dfly