fix: LMPOP implementation was added (#4504)

This commit is contained in:
Volodymyr Yavdoshenko 2025-01-28 10:52:38 +02:00 committed by GitHub
parent 44506ad2a9
commit efb7861cee
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 376 additions and 0 deletions

View file

@ -69,6 +69,7 @@ using namespace std;
using namespace facade;
using absl::GetFlag;
using time_point = Transaction::time_point;
using absl::SimpleAtoi;
namespace {
@ -1173,8 +1174,159 @@ void BPopGeneric(ListDir dir, CmdArgList args, Transaction* tx, SinkReplyBuilder
return rb->SendNullArray();
}
struct LMPopParams {
uint32_t num_keys;
ListDir dir;
int pop_count;
};
facade::ErrorReply ParseLMPop(CmdArgList args, LMPopParams* params) {
CmdArgParser parser{args};
if (!SimpleAtoi(parser.Next(), &params->num_keys)) {
return facade::ErrorReply(kUintErr);
}
if (params->num_keys <= 0 || !parser.HasAtLeast(params->num_keys + 1)) {
return facade::ErrorReply(kSyntaxErr);
}
parser.Skip(params->num_keys);
if (parser.Check("LEFT")) {
params->dir = ListDir::LEFT;
} else if (parser.Check("RIGHT")) {
params->dir = ListDir::RIGHT;
} else {
return facade::ErrorReply(kSyntaxErr);
}
params->pop_count = 1;
if (parser.HasNext()) {
if (!parser.Check("COUNT", &params->pop_count)) {
return facade::ErrorReply(kSyntaxErr);
}
}
if (!parser.Finalize()) {
return facade::ErrorReply(parser.Error()->MakeReply());
}
return facade::ErrorReply(OpStatus::OK);
}
// Returns the first non-empty key found in the shard arguments along with its type validity.
// Returns a pair of (key, is_valid_type) where is_valid_type is true if the key exists
// and has the correct type (LIST). If a wrong type is found, returns that key with false.
// Returns nullopt if no suitable key is found.
optional<pair<string_view, bool>> GetFirstNonEmptyKeyFound(EngineShard* shard, Transaction* t) {
ShardArgs keys = t->GetShardArgs(shard->shard_id());
DCHECK(!keys.Empty());
auto& db_slice = t->GetDbSlice(shard->shard_id());
optional<pair<string_view, bool>> result;
for (string_view key : keys) {
auto res = db_slice.FindReadOnly(t->GetDbContext(), key, OBJ_LIST);
if (res) {
result = {key, true};
break;
}
// If the key is not found, check if it's a wrong type error
if (res.status() == OpStatus::WRONG_TYPE) {
result = {key, false};
break;
}
}
return result;
}
} // namespace
void ListFamily::LMPop(CmdArgList args, const CommandContext& cmd_cntx) {
auto* response_builder = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
LMPopParams params;
facade::ErrorReply parse_result = ParseLMPop(args, &params);
if (parse_result.status != OpStatus::OK) {
response_builder->SendError(parse_result);
return;
}
// Create a vector to store first found key for each shard
vector<optional<pair<string_view, bool>>> found_keys_per_shard(shard_set->size());
auto cb = [&](Transaction* t, EngineShard* shard) {
// Each shard writes results to its own space
found_keys_per_shard[shard->shard_id()] = GetFirstNonEmptyKeyFound(shard, t);
return OpStatus::OK;
};
cmd_cntx.tx->Execute(std::move(cb), false /* followed by another hop */);
// Find the first existing key from command arguments
optional<string_view> key_to_pop;
bool found_wrong_type = false;
size_t min_index = numeric_limits<size_t>::max();
// Iterate over each shard to find the key with the smallest index
for (ShardId sid = 0; sid < found_keys_per_shard.size(); ++sid) {
if (!found_keys_per_shard[sid])
continue;
const auto& [found_key, is_valid_type] = *found_keys_per_shard[sid];
ShardArgs shard_args = cmd_cntx.tx->GetShardArgs(sid);
for (auto it = shard_args.begin(); it != shard_args.end(); ++it) {
if (found_key == *it && it.index() < min_index) {
min_index = it.index();
key_to_pop = found_key;
found_wrong_type = !is_valid_type;
break;
}
}
}
// Handle errors and empty cases first
if (!key_to_pop || found_wrong_type) {
cmd_cntx.tx->Conclude();
if (found_wrong_type) {
response_builder->SendError(kWrongTypeErr);
} else {
response_builder->SendNull();
}
return;
}
// Pop values from the found key
optional<ShardId> key_shard = Shard(*key_to_pop, shard_set->size());
OpResult<StringVec> result;
auto cb_pop = [&params, key_shard, &result, key = *key_to_pop](Transaction* t,
EngineShard* shard) {
if (*key_shard == shard->shard_id()) {
result = OpPop(t->GetOpArgs(shard), key, params.dir, params.pop_count, true, false);
}
return OpStatus::OK;
};
cmd_cntx.tx->Execute(std::move(cb_pop), true);
if (result) {
response_builder->StartArray(2);
response_builder->SendBulkString(*key_to_pop);
response_builder->StartArray(result->size());
for (const auto& val : *result) {
response_builder->SendBulkString(val);
}
} else {
response_builder->SendNull();
}
}
void ListFamily::LPush(CmdArgList args, const CommandContext& cmd_cntx) {
return PushGeneric(ListDir::LEFT, false, std::move(args), cmd_cntx.tx, cmd_cntx.rb);
}
@ -1442,6 +1594,7 @@ namespace acl {
constexpr uint32_t kLPush = WRITE | LIST | FAST;
constexpr uint32_t kLPushX = WRITE | LIST | FAST;
constexpr uint32_t kLPop = WRITE | LIST | FAST;
constexpr uint32_t kLMPop = WRITE | LIST | FAST;
constexpr uint32_t kRPush = WRITE | LIST | FAST;
constexpr uint32_t kRPushX = WRITE | LIST | FAST;
constexpr uint32_t kRPop = WRITE | LIST | FAST;
@ -1467,6 +1620,7 @@ void ListFamily::Register(CommandRegistry* registry) {
<< CI{"LPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, acl::kLPush}.HFUNC(LPush)
<< CI{"LPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, acl::kLPushX}.HFUNC(LPushX)
<< CI{"LPOP", CO::WRITE | CO::FAST, -2, 1, 1, acl::kLPop}.HFUNC(LPop)
<< CI{"LMPOP", CO::WRITE | CO::SLOW | CO::VARIADIC_KEYS, -4, 2, 2, acl::kLMPop}.HFUNC(LMPop)
<< CI{"RPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, acl::kRPush}.HFUNC(RPush)
<< CI{"RPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, acl::kRPushX}.HFUNC(RPushX)
<< CI{"RPOP", CO::WRITE | CO::FAST, -2, 1, 1, acl::kRPop}.HFUNC(RPop)

View file

@ -33,6 +33,7 @@ class ListFamily {
static void RPop(CmdArgList args, const CommandContext& cmd_cntx);
static void BLPop(CmdArgList args, const CommandContext& cmd_cntx);
static void BRPop(CmdArgList args, const CommandContext& cmd_cntx);
static void LMPop(CmdArgList args, const CommandContext& cmd_cntx);
static void LLen(CmdArgList args, const CommandContext& cmd_cntx);
static void LPos(CmdArgList args, const CommandContext& cmd_cntx);
static void LIndex(CmdArgList args, const CommandContext& cmd_cntx);

View file

@ -1086,5 +1086,226 @@ TEST_F(ListFamilyTest, ContendExpire) {
}
}
TEST_F(ListFamilyTest, LMPopInvalidSyntax) {
// Not enough arguments
auto resp = Run({"lmpop", "1", "a"});
EXPECT_THAT(resp, ErrArg("wrong number of arguments"));
// Zero keys
resp = Run({"lmpop", "0", "LEFT", "COUNT", "1"});
EXPECT_THAT(resp, ErrArg("syntax error"));
// Number of keys is not uint
resp = Run({"lmpop", "aa", "a", "LEFT"});
EXPECT_THAT(resp, ErrArg("value is not an integer or out of range"));
// Missing LEFT/RIGHT
resp = Run({"lmpop", "1", "a", "COUNT", "1"});
EXPECT_THAT(resp, ErrArg("syntax error"));
// Wrong number of keys
resp = Run({"lmpop", "1", "a", "b", "LEFT"});
EXPECT_THAT(resp, ErrArg("syntax error"));
// COUNT without number
resp = Run({"lmpop", "1", "a", "LEFT", "COUNT"});
EXPECT_THAT(resp, ErrArg("syntax error"));
// COUNT is not uint
resp = Run({"lmpop", "1", "a", "LEFT", "COUNT", "boo"});
EXPECT_THAT(resp, ErrArg("value is not an integer or out of range"));
// Too many arguments
resp = Run({"lmpop", "1", "c", "LEFT", "COUNT", "2", "foo"});
EXPECT_THAT(resp, ErrArg("syntax error"));
}
TEST_F(ListFamilyTest, LMPop) {
// All lists are empty
auto resp = Run({"lmpop", "1", "e", "LEFT"});
EXPECT_THAT(resp, ArgType(RespExpr::NIL));
// LEFT operation
resp = Run({"lpush", "a", "a1", "a2"});
EXPECT_THAT(resp, IntArg(2));
resp = Run({"lmpop", "1", "a", "LEFT"});
EXPECT_THAT(resp, RespArray(ElementsAre("a", RespArray(ElementsAre("a2")))));
// RIGHT operation
resp = Run({"lpush", "b", "b1", "b2"});
EXPECT_THAT(resp, IntArg(2));
resp = Run({"lmpop", "1", "b", "RIGHT"});
EXPECT_THAT(resp, RespArray(ElementsAre("b", RespArray(ElementsAre("b1")))));
// COUNT > 1
resp = Run({"lpush", "c", "c1", "c2"});
EXPECT_THAT(resp, IntArg(2));
resp = Run({"lmpop", "1", "c", "RIGHT", "COUNT", "2"});
EXPECT_THAT(resp, RespArray(ElementsAre("c", RespArray(ElementsAre("c1", "c2")))));
resp = Run({"llen", "c"});
EXPECT_THAT(resp, IntArg(0));
// COUNT > number of elements in list
resp = Run({"lpush", "d", "d1", "d2"});
EXPECT_THAT(resp, IntArg(2));
resp = Run({"lmpop", "1", "d", "RIGHT", "COUNT", "3"});
EXPECT_THAT(resp, RespArray(ElementsAre("d", RespArray(ElementsAre("d1", "d2")))));
resp = Run({"llen", "d"});
EXPECT_THAT(resp, IntArg(0));
// First non-empty list is not the first list
resp = Run({"lpush", "x", "x1"});
EXPECT_THAT(resp, IntArg(1));
resp = Run({"lpush", "y", "y1"});
EXPECT_THAT(resp, IntArg(1));
resp = Run({"lmpop", "3", "empty", "x", "y", "RIGHT"});
EXPECT_THAT(resp, RespArray(ElementsAre("x", RespArray(ElementsAre("x1")))));
resp = Run({"llen", "x"});
EXPECT_THAT(resp, IntArg(0));
}
TEST_F(ListFamilyTest, LMPopMultipleElements) {
// Test removing multiple elements from left end
Run({"rpush", "list1", "a", "b", "c", "d", "e"});
auto resp = Run({"lmpop", "1", "list1", "LEFT", "COUNT", "3"});
EXPECT_THAT(resp, RespArray(ElementsAre("list1", RespArray(ElementsAre("a", "b", "c")))));
resp = Run({"lrange", "list1", "0", "-1"});
EXPECT_THAT(resp.GetVec(), ElementsAre("d", "e"));
// Test removing multiple elements from right end
Run({"rpush", "list2", "v", "w", "x", "y", "z"});
resp = Run({"lmpop", "1", "list2", "RIGHT", "COUNT", "2"});
EXPECT_THAT(resp, RespArray(ElementsAre("list2", RespArray(ElementsAre("z", "y")))));
resp = Run({"lrange", "list2", "0", "-1"});
EXPECT_THAT(resp.GetVec(), ElementsAre("v", "w", "x"));
}
TEST_F(ListFamilyTest, LMPopMultipleLists) {
// Test finding first non-empty list
Run({"rpush", "list1", "a", "b"});
Run({"rpush", "list2", "c", "d"});
Run({"rpush", "list3", "e", "f"});
// Pop from first non-empty list
auto resp = Run({"lmpop", "3", "list1", "list2", "list3", "LEFT"});
EXPECT_THAT(resp, RespArray(ElementsAre("list1", RespArray(ElementsAre("a")))));
// Pop from second list after first becomes empty
Run({"lmpop", "1", "list1", "LEFT"}); // Empty list1
resp = Run({"lmpop", "3", "list1", "list2", "list3", "RIGHT", "COUNT", "2"});
EXPECT_THAT(resp, RespArray(ElementsAre("list2", RespArray(ElementsAre("d", "c")))));
// Verify third list remains untouched
resp = Run({"lrange", "list3", "0", "-1"});
EXPECT_THAT(resp.GetVec(), ElementsAre("e", "f"));
}
TEST_F(ListFamilyTest, LMPopEdgeCases) {
// Test with empty list
Run({"rpush", "empty_list", "a"});
Run({"lpop", "empty_list"});
auto resp = Run({"lmpop", "1", "empty_list", "LEFT"});
EXPECT_THAT(resp, ArgType(RespExpr::NIL));
// Test with non-existent list
resp = Run({"lmpop", "1", "nonexistent", "LEFT"});
EXPECT_THAT(resp, ArgType(RespExpr::NIL));
// Test with wrong type key
Run({"set", "string_key", "value"});
resp = Run({"lmpop", "1", "string_key", "LEFT"});
EXPECT_THAT(resp, ErrArg("WRONGTYPE Operation against a key holding the wrong kind of value"));
// Test without COUNT parameter - should return 1 element by default
Run({"rpush", "list", "a", "b"});
resp = Run({"lmpop", "1", "list", "LEFT"});
EXPECT_THAT(resp,
RespArray(ElementsAre(
"list", RespArray(ElementsAre("a"))))); // Should return 1 element by default
// Test with COUNT = 0 - should return error
resp = Run({"lmpop", "1", "list", "LEFT", "COUNT", "0"});
EXPECT_THAT(resp, RespArray(ElementsAre("list", RespArray(ElementsAre()))));
// Test with negative COUNT - should return error
resp = Run({"lmpop", "1", "list", "LEFT", "COUNT", "-1"});
EXPECT_THAT(resp, RespArray(ElementsAre("list", RespArray(ElementsAre("b")))));
}
TEST_F(ListFamilyTest, LMPopDocExample) {
// Try to pop from non-existing lists
auto resp = Run({"LMPOP", "2", "non1", "non2", "LEFT", "COUNT", "10"});
EXPECT_THAT(resp, ArgType(RespExpr::NIL));
// Create first list and test basic pop
resp = Run({"LPUSH", "mylist", "one", "two", "three", "four", "five"});
EXPECT_THAT(resp, IntArg(5));
resp = Run({"LMPOP", "1", "mylist", "LEFT"});
EXPECT_THAT(resp, RespArray(ElementsAre("mylist", RespArray(ElementsAre("five")))));
resp = Run({"LRANGE", "mylist", "0", "-1"});
EXPECT_THAT(resp.GetVec(), ElementsAre("four", "three", "two", "one"));
// Test RIGHT pop with COUNT
resp = Run({"LMPOP", "1", "mylist", "RIGHT", "COUNT", "10"});
EXPECT_THAT(resp, RespArray(ElementsAre("mylist",
RespArray(ElementsAre("one", "two", "three", "four")))));
// Create two lists and test multi-key pop
resp = Run({"LPUSH", "mylist", "one", "two", "three", "four", "five"});
EXPECT_THAT(resp, IntArg(5));
resp = Run({"LPUSH", "mylist2", "a", "b", "c", "d", "e"});
EXPECT_THAT(resp, IntArg(5));
resp = Run({"LMPOP", "2", "mylist", "mylist2", "RIGHT", "COUNT", "3"});
EXPECT_THAT(resp,
RespArray(ElementsAre("mylist", RespArray(ElementsAre("one", "two", "three")))));
resp = Run({"LRANGE", "mylist", "0", "-1"});
EXPECT_THAT(resp.GetVec(), ElementsAre("five", "four"));
resp = Run({"LMPOP", "2", "mylist", "mylist2", "RIGHT", "COUNT", "5"});
EXPECT_THAT(resp, RespArray(ElementsAre("mylist", RespArray(ElementsAre("four", "five")))));
resp = Run({"LMPOP", "2", "mylist", "mylist2", "RIGHT", "COUNT", "10"});
EXPECT_THAT(resp,
RespArray(ElementsAre("mylist2", RespArray(ElementsAre("a", "b", "c", "d", "e")))));
// Verify both lists are now empty
resp = Run({"EXISTS", "mylist", "mylist2"});
EXPECT_THAT(resp, IntArg(0));
}
TEST_F(ListFamilyTest, LMPopWrongType) {
// Setup: create a list and a hash
Run({"lpush", "l1", "e1"});
Run({"hset", "foo", "k1", "v1"});
// Test: first key is wrong type
auto resp = Run({"lmpop", "2", "foo", "l1", "left"});
EXPECT_THAT(resp, ErrArg("WRONGTYPE Operation against a key holding the wrong kind of value"));
// Test: second key is wrong type but first doesn't exist
resp = Run({"lmpop", "2", "nonexistent", "foo", "left"});
EXPECT_THAT(resp, ErrArg("WRONGTYPE Operation against a key holding the wrong kind of value"));
// Test: second key is wrong type but first is a valid list
resp = Run({"lmpop", "2", "l1", "foo", "left"});
EXPECT_THAT(resp, RespArray(ElementsAre("l1", RespArray(ElementsAre("e1")))));
}
#pragma GCC diagnostic pop
} // namespace dfly