chore: Implement LMOVE over QList (#4104)

* chore: Implement LMOVE over QList

All tests in list_family_test besides LTrim pass.

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-11-10 15:47:01 +02:00 committed by GitHub
parent 1eef773d0a
commit f745f3133d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 84 additions and 35 deletions

View file

@ -84,6 +84,10 @@ void* listPopSaver(unsigned char* data, size_t sz) {
return new string((char*)data, sz);
}
QList::Where ToWhere(ListDir dir) {
return dir == ListDir::LEFT ? QList::HEAD : QList::TAIL;
}
enum InsertParam { INSERT_BEFORE, INSERT_AFTER };
string ListPop(ListDir dir, quicklist* ql) {
@ -166,7 +170,7 @@ std::string OpBPop(Transaction* t, EngineShard* shard, std::string_view key, Lis
len = quicklistCount(ql);
} else {
QList* ql = GetQLV2(it->second);
QList::Where where = (dir == ListDir::LEFT) ? QList::HEAD : QList::TAIL;
QList::Where where = ToWhere(dir);
value = ql->Pop(where);
len = ql->Size();
}
@ -195,19 +199,37 @@ OpResult<string> OpMoveSingleShard(const OpArgs& op_args, string_view src, strin
return src_res.status();
auto src_it = src_res->it;
quicklist* src_ql = GetQL(src_it->second);
quicklist* src_ql = nullptr;
QList* srcql_v2 = nullptr;
quicklist* dest_ql = nullptr;
QList* destql_v2 = nullptr;
string val;
size_t prev_len = 0;
if (src_it->second.Encoding() == OBJ_ENCODING_QUICKLIST) {
src_ql = GetQL(src_it->second);
prev_len = quicklistCount(src_ql);
} else {
DCHECK_EQ(src_it->second.Encoding(), kEncodingQL2);
srcql_v2 = GetQLV2(src_it->second);
prev_len = srcql_v2->Size();
}
if (src == dest) { // simple case.
string val = ListPop(src_dir, src_ql);
int pos = (dest_dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
quicklistPush(src_ql, val.data(), val.size(), pos);
if (src_ql) {
val = ListPop(src_dir, src_ql);
int pos = (dest_dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
quicklistPush(src_ql, val.data(), val.size(), pos);
} else {
val = srcql_v2->Pop(ToWhere(src_dir));
srcql_v2->Push(val, ToWhere(dest_dir));
}
return val;
}
quicklist* dest_ql = nullptr;
src_res->post_updater.Run();
auto op_res = db_slice.AddOrFind(op_args.db_cntx, dest);
RETURN_ON_BAD_STATUS(op_res);
auto& dest_res = *op_res;
@ -217,26 +239,43 @@ OpResult<string> OpMoveSingleShard(const OpArgs& op_args, string_view src, strin
src_it = src_res->it;
if (dest_res.is_new) {
dest_ql = quicklistCreate();
quicklistSetOptions(dest_ql, GetFlag(FLAGS_list_max_listpack_size),
GetFlag(FLAGS_list_compress_depth));
dest_res.it->second.InitRobj(OBJ_LIST, OBJ_ENCODING_QUICKLIST, dest_ql);
DCHECK(IsValid(src_it));
if (absl::GetFlag(FLAGS_list_experimental_v2)) {
destql_v2 = CompactObj::AllocateMR<QList>(GetFlag(FLAGS_list_max_listpack_size),
GetFlag(FLAGS_list_compress_depth));
dest_res.it->second.InitRobj(OBJ_LIST, kEncodingQL2, destql_v2);
} else {
dest_ql = quicklistCreate();
quicklistSetOptions(dest_ql, GetFlag(FLAGS_list_max_listpack_size),
GetFlag(FLAGS_list_compress_depth));
dest_res.it->second.InitRobj(OBJ_LIST, OBJ_ENCODING_QUICKLIST, dest_ql);
}
} else {
if (dest_res.it->second.ObjType() != OBJ_LIST)
return OpStatus::WRONG_TYPE;
dest_ql = GetQL(dest_res.it->second);
if (dest_res.it->second.Encoding() == kEncodingQL2) {
destql_v2 = GetQLV2(dest_res.it->second);
} else {
DCHECK_EQ(dest_res.it->second.Encoding(), OBJ_ENCODING_QUICKLIST);
dest_ql = GetQL(dest_res.it->second);
}
}
string val = ListPop(src_dir, src_ql);
int pos = (dest_dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
quicklistPush(dest_ql, val.data(), val.size(), pos);
if (src_ql) {
DCHECK(dest_ql);
val = ListPop(src_dir, src_ql);
int pos = (dest_dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
quicklistPush(dest_ql, val.data(), val.size(), pos);
} else {
DCHECK(srcql_v2);
DCHECK(destql_v2);
val = srcql_v2->Pop(ToWhere(src_dir));
destql_v2->Push(val, ToWhere(dest_dir));
}
src_res->post_updater.Run();
dest_res.post_updater.Run();
if (quicklistCount(src_ql) == 0) {
if (prev_len == 1) {
CHECK(db_slice.Del(op_args.db_cntx, src_it));
}
@ -254,17 +293,28 @@ OpResult<string> Peek(const OpArgs& op_args, string_view key, ListDir dir, bool
if (!fetch)
return OpStatus::OK;
quicklist* ql = GetQL(it_res.value()->second);
quicklistEntry entry = container_utils::QLEntry();
quicklistIter* iter = (dir == ListDir::LEFT) ? quicklistGetIterator(ql, AL_START_HEAD)
: quicklistGetIterator(ql, AL_START_TAIL);
CHECK(quicklistNext(iter, &entry));
quicklistReleaseIterator(iter);
const PrimeValue& pv = it_res.value()->second;
DCHECK_GT(pv.Size(), 0u); // should be not-empty.
if (entry.value)
return string(reinterpret_cast<char*>(entry.value), entry.sz);
else
return absl::StrCat(entry.longval);
if (pv.Encoding() == OBJ_ENCODING_QUICKLIST) {
quicklist* ql = GetQL(it_res.value()->second);
quicklistEntry entry = container_utils::QLEntry();
quicklistIter* iter =
quicklistGetIterator(ql, (dir == ListDir::LEFT) ? AL_START_HEAD : AL_START_TAIL);
CHECK(quicklistNext(iter, &entry));
quicklistReleaseIterator(iter);
return (entry.value) ? string(reinterpret_cast<char*>(entry.value), entry.sz)
: absl::StrCat(entry.longval);
}
DCHECK_EQ(pv.Encoding(), kEncodingQL2);
QList* ql = GetQLV2(pv);
auto it = ql->GetIterator(ToWhere(dir));
CHECK(it.Next());
return it.Get().to_string();
}
OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
@ -319,7 +369,7 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
}
len = quicklistCount(ql);
} else {
QList::Where where = (dir == ListDir::LEFT) ? QList::HEAD : QList::TAIL;
QList::Where where = ToWhere(dir);
for (string_view v : vals) {
ql_v2->Push(v, where);
}
@ -373,10 +423,11 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, ListDir dir, u
res.reserve(count);
}
QList::Where where = (dir == ListDir::LEFT) ? QList::HEAD : QList::TAIL;
QList::Where where = ToWhere(dir);
for (unsigned i = 0; i < count; ++i) {
string val = ql->Pop(where);
if (return_results) {
res.push_back(ql->Pop(where));
res.push_back(std::move(val));
}
}
} else {
@ -625,10 +676,7 @@ OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, string_view ele
auto it = ql->GetIterator(where);
auto is_match = [&](const QList::Entry& entry) {
if (is_int) {
return entry.is_int() && entry.ival() == ival;
}
return entry == elem;
return is_int ? entry.is_int() && entry.ival() == ival : entry == elem;
};
while (it.Next()) {

View file

@ -533,6 +533,7 @@ TEST_F(ListFamilyTest, LMove) {
resp = Run({"lmove", kKey1, kKey2, "LEFT", "RIGHT"});
ASSERT_THAT(resp, "1");
ASSERT_THAT(Run({"llen", kKey1}), IntArg(4));
resp = Run({"lmove", kKey1, kKey2, "LEFT", "LEFT"});
ASSERT_THAT(resp, "2");