From 15725f49b91cb97b721464ef1ae8055ca333161c Mon Sep 17 00:00:00 2001 From: Nemo <42828324+YuxuanChen98@users.noreply.github.com> Date: Sun, 16 Oct 2022 15:06:01 +0800 Subject: [PATCH] feat(server): implement LMOVE #369 (#391) Signed-off-by: chenyuxuan.allen Co-authored-by: chenyuxuan.allen --- CONTRIBUTORS.md | 1 + src/server/list_family.cc | 195 ++++++++++++++++++++------------- src/server/list_family.h | 3 + src/server/list_family_test.cc | 162 +++++++++++++++++++++++++++ 4 files changed, 283 insertions(+), 78 deletions(-) diff --git a/CONTRIBUTORS.md b/CONTRIBUTORS.md index e8f1c4455..fb4b2f580 100644 --- a/CONTRIBUTORS.md +++ b/CONTRIBUTORS.md @@ -3,6 +3,7 @@ * **[Amir Alperin](https://github.com/iko1)** * **[Philipp Born](https://github.com/tamcore)** * Helm Chart +* **[Yuxuan Chen](https://github.com/YuxuanChen98)** * **[Redha Lhimeur](https://github.com/redhal)** * **[Braydn Moore](https://github.com/braydnm)** * **[Logan Raarup](https://github.com/logandk)** diff --git a/src/server/list_family.cc b/src/server/list_family.cc index eed2dfdf2..2eb909266 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -276,7 +276,8 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) { return OpStatus::OK; } -OpResult OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, string_view dest) { +OpResult OpMoveSingleShard(const OpArgs& op_args, string_view src, string_view dest, + ListDir src_dir, ListDir dest_dir) { auto& db_slice = op_args.shard->db_slice(); auto src_res = db_slice.Find(op_args.db_cntx, src, OBJ_LIST); if (!src_res) @@ -287,9 +288,10 @@ OpResult OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, if (src == dest) { // simple case. db_slice.PreUpdate(op_args.db_cntx.db_index, src_it); - string val = ListPop(ListDir::RIGHT, src_ql); + string val = ListPop(src_dir, src_ql); - quicklistPushHead(src_ql, val.data(), val.size()); + int pos = (dest_dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL; + quicklistPush(src_ql, val.data(), val.size(), pos); db_slice.PostUpdate(op_args.db_cntx.db_index, src_it, src); return val; @@ -323,8 +325,9 @@ OpResult OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, db_slice.PreUpdate(op_args.db_cntx.db_index, src_it); - string val = ListPop(ListDir::RIGHT, src_ql); - quicklistPushHead(dest_ql, val.data(), val.size()); + string val = ListPop(src_dir, src_ql); + int pos = (dest_dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL; + quicklistPush(dest_ql, val.data(), val.size(), pos); db_slice.PostUpdate(op_args.db_cntx.db_index, src_it, src); db_slice.PostUpdate(op_args.db_cntx.db_index, dest_it, dest, !new_key); @@ -336,9 +339,9 @@ OpResult OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, return val; } -// Read-only peek operation that determines wether the list exists and optionally -// returns the first from right value without popping it from the list. -OpResult RPeek(const OpArgs& op_args, string_view key, bool fetch) { +// Read-only peek operation that determines whether the list exists and optionally +// returns the first from left/right value without popping it from the list. +OpResult Peek(const OpArgs& op_args, string_view key, ListDir dir, bool fetch) { auto it_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST); if (!it_res) { return it_res.status(); @@ -349,7 +352,8 @@ OpResult RPeek(const OpArgs& op_args, string_view key, bool fetch) { quicklist* ql = GetQL(it_res.value()->second); quicklistEntry entry = container_utils::QLEntry(); - quicklistIter* iter = quicklistGetIterator(ql, AL_START_TAIL); + quicklistIter* iter = (dir == ListDir::LEFT) ? quicklistGetIterator(ql, AL_START_HEAD) : + quicklistGetIterator(ql, AL_START_TAIL); CHECK(quicklistNext(iter, &entry)); quicklistReleaseIterator(iter); @@ -480,74 +484,7 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) { string_view src = ArgS(args, 1); string_view dest = ArgS(args, 2); - OpResult result; - - if (cntx->transaction->unique_shard_cnt() == 1) { - auto cb = [&](Transaction* t, EngineShard* shard) { - return OpRPopLPushSingleShard(t->GetOpArgs(shard), src, dest); - }; - - result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); - } else { - CHECK_EQ(2u, cntx->transaction->unique_shard_cnt()); - - OpResult find_res[2]; - - // Transaction is comprised of 2 hops: - // 1 - check for entries existence, their types and if possible - - // read the value we may rpop from the source list. - // 2. If everything is ok, rpop from source and lpush the peeked value into - // the destination. - // - cntx->transaction->Schedule(); - auto cb = [&](Transaction* t, EngineShard* shard) { - auto args = t->ShardArgsInShard(shard->shard_id()); - DCHECK_EQ(1u, args.size()); - bool is_dest = args.front() == dest; - find_res[is_dest] = RPeek(t->GetOpArgs(shard), args.front(), !is_dest); - return OpStatus::OK; - }; - - cntx->transaction->Execute(move(cb), false); - - if (!find_res[0] || find_res[1].status() == OpStatus::WRONG_TYPE) { - auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; - cntx->transaction->Execute(move(cb), true); - result = find_res[0] ? find_res[1] : find_res[0]; - } else { - // Everything is ok, lets proceed with the mutations. - auto cb = [&](Transaction* t, EngineShard* shard) { - auto args = t->ShardArgsInShard(shard->shard_id()); - bool is_dest = args.front() == dest; - OpArgs op_args = t->GetOpArgs(shard); - - if (is_dest) { - string_view val{find_res[0].value()}; - absl::Span span{&val, 1}; - OpPush(op_args, args.front(), ListDir::LEFT, false, span); - } else { - OpPop(op_args, args.front(), ListDir::RIGHT, 1, false); - } - return OpStatus::OK; - }; - cntx->transaction->Execute(move(cb), true); - result = std::move(find_res[0].value()); - } - } - - if (result) { - return (*cntx)->SendBulkString(*result); - } - - switch (result.status()) { - case OpStatus::KEY_NOTFOUND: - (*cntx)->SendNull(); - break; - - default: - (*cntx)->SendError(result.status()); - break; - } + MoveGeneric(cntx, src, dest, ListDir::RIGHT, ListDir::LEFT); } void ListFamily::LLen(CmdArgList args, ConnectionContext* cntx) { @@ -767,6 +704,35 @@ void ListFamily::BRPop(CmdArgList args, ConnectionContext* cntx) { BPopGeneric(ListDir::RIGHT, std::move(args), cntx); } +void ListFamily::LMove(CmdArgList args, ConnectionContext* cntx) { + std::string_view src = ArgS(args, 1); + std::string_view dest = ArgS(args, 2); + std::string_view src_dir_str = ArgS(args, 3); + std::string_view dest_dir_str = ArgS(args, 4); + + ToUpper(&args[3]); + ToUpper(&args[4]); + + ListDir src_dir; + ListDir dest_dir; + if (src_dir_str == "LEFT") { + src_dir = ListDir::LEFT; + } else if (src_dir_str == "RIGHT") { + src_dir = ListDir::RIGHT; + } else { + return (*cntx)->SendError(kSyntaxErr); + } + if (dest_dir_str == "LEFT") { + dest_dir = ListDir::LEFT; + } else if (dest_dir_str == "RIGHT") { + dest_dir = ListDir::RIGHT; + } else { + return (*cntx)->SendError(kSyntaxErr); + } + + MoveGeneric(cntx, src, dest, src_dir, dest_dir); +} + void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx) { DCHECK_GE(args.size(), 3u); @@ -876,6 +842,78 @@ void ListFamily::PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cnt } } +void ListFamily::MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, + ListDir src_dir, ListDir dest_dir) { + OpResult result; + + if (cntx->transaction->unique_shard_cnt() == 1) { + auto cb = [&](Transaction* t, EngineShard* shard) { + return OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir); + }; + + result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + } else { + CHECK_EQ(2u, cntx->transaction->unique_shard_cnt()); + + OpResult find_res[2]; + + // Transaction is comprised of 2 hops: + // 1 - check for entries existence, their types and if possible - + // read the value we may move from the source list. + // 2. If everything is ok, pop from source and push the peeked value into + // the destination. + // + cntx->transaction->Schedule(); + auto cb = [&](Transaction* t, EngineShard* shard) { + auto args = t->ShardArgsInShard(shard->shard_id()); + DCHECK_EQ(1u, args.size()); + bool is_dest = args.front() == dest; + find_res[is_dest] = Peek(t->GetOpArgs(shard), args.front(), src_dir, !is_dest); + return OpStatus::OK; + }; + + cntx->transaction->Execute(move(cb), false); + + if (!find_res[0] || find_res[1].status() == OpStatus::WRONG_TYPE) { + auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; + cntx->transaction->Execute(move(cb), true); + result = find_res[0] ? find_res[1] : find_res[0]; + } else { + // Everything is ok, lets proceed with the mutations. + auto cb = [&](Transaction* t, EngineShard* shard) { + auto args = t->ShardArgsInShard(shard->shard_id()); + bool is_dest = args.front() == dest; + OpArgs op_args = t->GetOpArgs(shard); + + if (is_dest) { + string_view val{find_res[0].value()}; + absl::Span span{&val, 1}; + OpPush(op_args, args.front(), dest_dir, false, span); + } else { + OpPop(op_args, args.front(), src_dir, 1, false); + } + return OpStatus::OK; + }; + cntx->transaction->Execute(move(cb), true); + result = std::move(find_res[0].value()); + } + } + + if (result) { + return (*cntx)->SendBulkString(*result); + } + + switch (result.status()) { + case OpStatus::KEY_NOTFOUND: + (*cntx)->SendNull(); + break; + + default: + (*cntx)->SendError(result.status()); + break; + } +} + OpResult ListFamily::OpLen(const OpArgs& op_args, std::string_view key) { auto res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST); if (!res) @@ -1153,7 +1191,8 @@ void ListFamily::Register(CommandRegistry* registry) { << CI{"LRANGE", CO::READONLY, 4, 1, 1, 1}.HFUNC(LRange) << CI{"LSET", CO::WRITE | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(LSet) << CI{"LTRIM", CO::WRITE, 4, 1, 1, 1}.HFUNC(LTrim) - << CI{"LREM", CO::WRITE, 4, 1, 1, 1}.HFUNC(LRem); + << CI{"LREM", CO::WRITE, 4, 1, 1, 1}.HFUNC(LRem) + << CI{"LMOVE", CO::WRITE | CO::DENYOOM, 5, 1, 2, 1}.HFUNC(LMove); } } // namespace dfly diff --git a/src/server/list_family.h b/src/server/list_family.h index 00df8f479..9bc2b4e9d 100644 --- a/src/server/list_family.h +++ b/src/server/list_family.h @@ -37,10 +37,13 @@ class ListFamily { static void LRem(CmdArgList args, ConnectionContext* cntx); static void LSet(CmdArgList args, ConnectionContext* cntx); static void RPopLPush(CmdArgList args, ConnectionContext* cntx); + static void LMove(CmdArgList args, ConnectionContext* cntx); static void PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx); static void PushGeneric(ListDir dir, bool skip_notexist, CmdArgList args, ConnectionContext* cntx); + static void MoveGeneric(ConnectionContext* cntx, std::string_view src, std::string_view dest, + ListDir src_dir, ListDir dest_dir); static void BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx); diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index f9d9eb67a..01e074c0c 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -460,4 +460,166 @@ TEST_F(ListFamilyTest, LPos) { ASSERT_THAT(resp.GetVec(), ElementsAre(IntArg(0), IntArg(3), IntArg(4))); } +TEST_F(ListFamilyTest, RPopLPush) { + // src and dest are diffrent keys + auto resp = Run({"rpush", kKey1, "1", "a", "b", "1", "2", "3", "4"}); + ASSERT_THAT(resp, IntArg(7)); + + resp = Run({"rpoplpush", kKey1, kKey2}); + ASSERT_THAT(resp, "4"); + + resp = Run({"rpoplpush", kKey1, kKey2}); + ASSERT_THAT(resp, "3"); + + resp = Run({"rpoplpush", kKey1, kKey2}); + ASSERT_THAT(resp, "2"); + + resp = Run({"rpoplpush", kKey1, kKey2}); + ASSERT_THAT(resp, "1"); + + resp = Run({"lrange", kKey1, "0", "-1"}); + ASSERT_THAT(resp, ArrLen(3)); + ASSERT_THAT(resp.GetVec(), ElementsAre("1", "a", "b")); + + resp = Run({"lrange", kKey2, "0", "-1"}); + ASSERT_THAT(resp, ArrLen(4)); + ASSERT_THAT(resp.GetVec(), ElementsAre("1", "2", "3", "4")); + + resp = Run({"rpoplpush", kKey1, kKey2}); + ASSERT_THAT(resp, "b"); + + resp = Run({"rpoplpush", kKey1, kKey2}); + ASSERT_THAT(resp, "a"); + + resp = Run({"rpoplpush", kKey1, kKey2}); + ASSERT_THAT(resp, "1"); + + ASSERT_THAT(Run({"lrange", kKey1, "0", "-1"}), ArrLen(0)); + EXPECT_THAT(Run({"exists", kKey1}), IntArg(0)); + ASSERT_THAT(Run({"rpoplpush", kKey1, kKey2}), ArgType(RespExpr::NIL)); + + resp = Run({"lrange", kKey2, "0", "-1"}); + ASSERT_THAT(resp, ArrLen(7)); + ASSERT_THAT(resp.GetVec(), ElementsAre("1", "a", "b", "1", "2", "3", "4")); + + // src and dest are the same key + resp = Run({"rpush", kKey1, "1", "a", "b", "1", "2", "3", "4"}); + ASSERT_THAT(resp, IntArg(7)); + + resp = Run({"rpoplpush", kKey1, kKey1}); + ASSERT_THAT(resp, "4"); + + resp = Run({"rpoplpush", kKey1, kKey1}); + ASSERT_THAT(resp, "3"); + + resp = Run({"rpoplpush", kKey1, kKey1}); + ASSERT_THAT(resp, "2"); + + resp = Run({"rpoplpush", kKey1, kKey1}); + ASSERT_THAT(resp, "1"); + + resp = Run({"lrange", kKey1, "0", "-1"}); + ASSERT_THAT(resp, ArrLen(7)); + ASSERT_THAT(resp.GetVec(), ElementsAre("1", "2", "3", "4", "1", "a", "b")); + + resp = Run({"rpoplpush", kKey1, kKey1}); + ASSERT_THAT(resp, "b"); + + resp = Run({"rpoplpush", kKey1, kKey1}); + ASSERT_THAT(resp, "a"); + + resp = Run({"rpoplpush", kKey1, kKey1}); + ASSERT_THAT(resp, "1"); + + resp = Run({"lrange", kKey1, "0", "-1"}); + ASSERT_THAT(resp, ArrLen(7)); + ASSERT_THAT(resp.GetVec(), ElementsAre("1", "a", "b", "1", "2", "3", "4")); +} + +TEST_F(ListFamilyTest, LMove) { + // src and dest are different keys + auto resp = Run({"rpush", kKey1, "1", "2", "3", "4", "5"}); + ASSERT_THAT(resp, IntArg(5)); + + resp = Run({"lmove", kKey1, kKey2, "LEFT", "RIGHT"}); + ASSERT_THAT(resp, "1"); + + resp = Run({"lmove", kKey1, kKey2, "LEFT", "LEFT"}); + ASSERT_THAT(resp, "2"); + + resp = Run({"lrange", kKey2, "0", "-1"}); + ASSERT_THAT(resp, ArrLen(2)); + ASSERT_THAT(resp.GetVec(), ElementsAre("2", "1")); + + resp = Run({"lmove", kKey1, kKey2, "RIGHT", "LEFT"}); + ASSERT_THAT(resp, "5"); + + resp = Run({"lrange", kKey2, "0", "-1"}); + ASSERT_THAT(resp, ArrLen(3)); + ASSERT_THAT(resp.GetVec(), ElementsAre("5", "2", "1")); + + resp = Run({"lmove", kKey1, kKey2, "RIGHT", "RIGHT"}); + ASSERT_THAT(resp, "4"); + + resp = Run({"lrange", kKey1, "0", "-1"}); + ASSERT_EQ(resp, "3"); + + resp = Run({"lrange", kKey2, "0", "-1"}); + ASSERT_THAT(resp, ArrLen(4)); + ASSERT_THAT(resp.GetVec(), ElementsAre("5", "2", "1", "4")); + + resp = Run({"lmove", kKey1, kKey2, "RIGHT", "RIGHT"}); + ASSERT_THAT(resp, "3"); + + ASSERT_THAT(Run({"lrange", kKey1, "0", "-1"}), ArrLen(0)); + EXPECT_THAT(Run({"exists", kKey1}), IntArg(0)); + ASSERT_THAT(Run({"lmove", kKey1, kKey2, "LEFT", "RIGHT"}), ArgType(RespExpr::NIL)); + ASSERT_THAT(Run({"lmove", kKey1, kKey2, "RIGHT", "RIGHT"}), ArgType(RespExpr::NIL)); + + resp = Run({"lrange", kKey2, "0", "-1"}); + ASSERT_THAT(resp, ArrLen(5)); + ASSERT_THAT(resp.GetVec(), ElementsAre("5", "2", "1", "4", "3")); + + // src and dest are the same key + resp = Run({"rpush", kKey1, "1", "2", "3", "4", "5"}); + ASSERT_THAT(resp, IntArg(5)); + + resp = Run({"lmove", kKey1, kKey1, "LEFT", "RIGHT"}); + ASSERT_THAT(resp, "1"); + + resp = Run({"lmove", kKey1, kKey1, "LEFT", "LEFT"}); + ASSERT_THAT(resp, "2"); + + resp = Run({"lmove", kKey1, kKey1, "RIGHT", "LEFT"}); + ASSERT_THAT(resp, "1"); + + resp = Run({"lmove", kKey1, kKey1, "RIGHT", "RIGHT"}); + ASSERT_THAT(resp, "5"); + + resp = Run({"lmove", kKey1, kKey1, "LEFT", "RIGHT"}); + ASSERT_THAT(resp, "1"); + + resp = Run({"lrange", kKey1, "0", "-1"}); + ASSERT_THAT(resp, ArrLen(5)); + ASSERT_THAT(resp.GetVec(), ElementsAre("2", "3", "4", "5", "1")); + + resp = Run({"lmove", kKey1, kKey1, "LEFT", "RIGHT"}); + ASSERT_THAT(resp, "2"); + + resp = Run({"lmove", kKey1, kKey1, "LEFT", "RIGHT"}); + ASSERT_THAT(resp, "3"); + + resp = Run({"lmove", kKey1, kKey1, "RIGHT", "RIGHT"}); + ASSERT_THAT(resp, "3"); + + resp = Run({"lmove", kKey1, kKey1, "LEFT", "RIGHT"}); + ASSERT_THAT(resp, "4"); + + resp = Run({"lrange", kKey1, "0", "-1"}); + ASSERT_THAT(resp, ArrLen(5)); + ASSERT_THAT(resp.GetVec(), ElementsAre("5", "1", "2", "3", "4")); + + ASSERT_THAT(Run({"lmove", kKey1, kKey1, "LEFT", "R"}), ArgType(RespExpr::ERROR)); +} + } // namespace dfly