diff --git a/docs/differences.md b/docs/differences.md index 46fbbdab4..b6b6acd24 100644 --- a/docs/differences.md +++ b/docs/differences.md @@ -6,6 +6,10 @@ String sizes are limited to 256MB. Indices (say in GETRANGE and SETRANGE commands) should be signed 32 bit integers in range [-2147483647, 2147483648]. +### String handling. + +SORT does not take any locale into account. + ## Expiry ranges. Expirations are limited to 4 years. For commands with millisecond precision like PEXPIRE or PSETEX, expirations greater than 2^27ms are quietly rounded to the nearest second loosing precision of less than 0.001%. diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index c774d1b64..cefddf899 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -262,6 +262,16 @@ size_t RobjWrapper::Size() const { case OBJ_STRING: DCHECK_EQ(OBJ_ENCODING_RAW, encoding_); return sz_; + case OBJ_LIST: + return quicklistCount((quicklist*)inner_obj_); + case OBJ_ZSET: { + robj self{.type = type_, + .encoding = encoding_, + .lru = 0, + .refcount = OBJ_STATIC_REFCOUNT, + .ptr = inner_obj_}; + return zsetLength(&self); + } case OBJ_SET: switch (encoding_) { case kEncodingIntSet: { @@ -278,7 +288,7 @@ size_t RobjWrapper::Size() const { } default: LOG(FATAL) << "Unexpected encoding " << encoding_; - } + }; default:; } return 0; diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc index ab54931c6..b031c6aee 100644 --- a/src/facade/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -299,6 +299,10 @@ void RedisReplyBuilder::SendNullArray() { SendRaw("*-1\r\n"); } +void RedisReplyBuilder::SendEmptyArray() { + StartArray(0); +} + void RedisReplyBuilder::SendStringArr(absl::Span arr) { if (arr.empty()) { SendRaw("*0\r\n"); diff --git a/src/facade/reply_builder.h b/src/facade/reply_builder.h index 60fba486d..0bfe2aa93 100644 --- a/src/facade/reply_builder.h +++ b/src/facade/reply_builder.h @@ -128,7 +128,10 @@ class RedisReplyBuilder : public SinkReplyBuilder { void SendError(OpStatus status); virtual void SendSimpleStrArr(const std::string_view* arr, uint32_t count); + // Send *-1 virtual void SendNullArray(); + // Send *0 + virtual void SendEmptyArray(); virtual void SendStringArr(absl::Span arr); virtual void SendStringArr(absl::Span arr); diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 9fd26161a..a5b5cb8ab 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -12,7 +12,7 @@ add_library(dragonfly_lib channel_slice.cc command_registry.cc list_family.cc main_service.cc rdb_load.cc rdb_save.cc replica.cc snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc set_family.cc stream_family.cc string_family.cc - zset_family.cc version.cc bitops_family.cc) + zset_family.cc version.cc bitops_family.cc container_utils.cc) cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib absl::random_random TRDP::jsoncons) diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc new file mode 100644 index 000000000..21febb123 --- /dev/null +++ b/src/server/container_utils.cc @@ -0,0 +1,161 @@ +// Copyright 2022, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// +#include "server/container_utils.h" + +#include "base/logging.h" + +extern "C" { +#include "redis/intset.h" +#include "redis/listpack.h" +#include "redis/object.h" +#include "redis/redis_aux.h" +#include "redis/util.h" +#include "redis/zset.h" +} + +namespace dfly::container_utils { + +quicklistEntry QLEntry() { + quicklistEntry res{.quicklist = NULL, + .node = NULL, + .zi = NULL, + .value = NULL, + .longval = 0, + .sz = 0, + .offset = 0}; + return res; +} + +bool IterateList(const PrimeValue& pv, const IterateFunc& func, long start, long end) { + quicklist* ql = static_cast(pv.RObjPtr()); + long llen = quicklistCount(ql); + if (end < 0 || end >= llen) + end = llen - 1; + + quicklistIter* qiter = quicklistGetIteratorAtIdx(ql, AL_START_HEAD, start); + quicklistEntry entry = QLEntry(); + long lrange = end - start + 1; + + bool success = true; + while (success && quicklistNext(qiter, &entry) && lrange-- > 0) { + if (entry.value) { + success = func(ContainerEntry{reinterpret_cast(entry.value), entry.sz}); + } else { + success = func(ContainerEntry{nullptr, .longval=entry.longval}); + } + } + quicklistReleaseIterator(qiter); + return success; +} + +bool IterateSet(const PrimeValue& pv, const IterateFunc& func) { + bool success = true; + if (pv.Encoding() == kEncodingIntSet) { + intset* is = static_cast(pv.RObjPtr()); + int64_t ival; + int ii = 0; + + while (success && intsetGet(is, ii++, &ival)) { + success = func(ContainerEntry{nullptr, .longval=ival}); + } + } else { + if (pv.Encoding() == kEncodingStrMap2) { + for (sds ptr : *static_cast(pv.RObjPtr())) { + if (!func(ContainerEntry{ptr, sdslen(ptr)})) { + success = false; + break; + } + } + } else { + dict* ds = static_cast(pv.RObjPtr()); + dictIterator* di = dictGetIterator(ds); + dictEntry* de = nullptr; + while (success && (de = dictNext(di))) { + sds ptr = static_cast(de->key); + success = func(ContainerEntry{ptr, sdslen(ptr)}); + } + dictReleaseIterator(di); + } + } + + return success; +} + +bool IterateSortedSet(robj* zobj, const IterateSortedFunc& func, int32_t start, int32_t end, + bool reverse, bool use_score) { + unsigned long llen = zsetLength(zobj); + if (end < 0 || unsigned(end) >= llen) + end = llen - 1; + + unsigned rangelen = unsigned(end - start) + 1; + + if (zobj->encoding == OBJ_ENCODING_LISTPACK) { + uint8_t* zl = static_cast(zobj->ptr); + uint8_t *eptr, *sptr; + uint8_t* vstr; + unsigned int vlen; + long long vlong; + double score = 0.0; + + if (reverse) { + eptr = lpSeek(zl, -2 - long(2 * start)); + } else { + eptr = lpSeek(zl, 2 * start); + } + DCHECK(eptr); + + sptr = lpNext(zl, eptr); + + bool success = true; + while (success && rangelen--) { + DCHECK(eptr != NULL && sptr != NULL); + vstr = lpGetValue(eptr, &vlen, &vlong); + + // don't bother to extract the score if it's gonna be ignored. + if (use_score) + score = zzlGetScore(sptr); + + if (vstr == NULL) { + success = func(ContainerEntry{nullptr, .longval=vlong}, score); + } else { + success = func(ContainerEntry{reinterpret_cast(vstr), vlen}, score); + } + + if (reverse) { + zzlPrev(zl, &eptr, &sptr); + } else { + zzlNext(zl, &eptr, &sptr); + }; + } + return success; + } else { + CHECK_EQ(zobj->encoding, OBJ_ENCODING_SKIPLIST); + zset* zs = static_cast(zobj->ptr); + zskiplist* zsl = zs->zsl; + zskiplistNode* ln; + + /* Check if starting point is trivial, before doing log(N) lookup. */ + if (reverse) { + ln = zsl->tail; + unsigned long llen = zsetLength(zobj); + if (start > 0) + ln = zslGetElementByRank(zsl, llen - start); + } else { + ln = zsl->header->level[0].forward; + if (start > 0) + ln = zslGetElementByRank(zsl, start + 1); + } + + bool success = true; + while (success && rangelen--) { + DCHECK(ln != NULL); + success = func(ContainerEntry{ln->ele, sdslen(ln->ele)}, ln->score); + ln = reverse ? ln->backward : ln->level[0].forward; + } + return success; + } + return false; +} + +} // namespace dfly::container_utils diff --git a/src/server/container_utils.h b/src/server/container_utils.h new file mode 100644 index 000000000..d3409e7c7 --- /dev/null +++ b/src/server/container_utils.h @@ -0,0 +1,69 @@ +// Copyright 2022, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// +#pragma once + +#include "core/compact_object.h" +#include "core/string_set.h" +#include "server/common.h" +#include "server/table.h" + +extern "C" { +#include "redis/object.h" +#include "redis/quicklist.h" +} + +#include + +namespace dfly { + +namespace container_utils { + +// IsContainer returns true if the iterator points to a container type. +inline bool IsContainer(const PrimeValue& pv) { + unsigned type = pv.ObjType(); + return (type == OBJ_LIST || type == OBJ_SET || type == OBJ_ZSET); +} + +// Create empty quicklistEntry +quicklistEntry QLEntry(); + +// Stores either: +// - A single long long value (longval) when value = nullptr +// - A single char* (value) when value != nullptr +struct ContainerEntry { + const char* value; + union { + size_t length; + long long longval; + }; + std::string ToString() { + if (value) + return {value, length}; + else + return absl::StrCat(longval); + } +}; + +using IterateFunc = std::function; +using IterateSortedFunc = std::function; + +// Iterate over all values and call func(val). Iteration stops as soon +// as func return false. Returns true if it successfully processed all elements +// without stopping. +bool IterateList(const PrimeValue& pv, const IterateFunc& func, long start = 0, long end = -1); + +// Iterate over all values and call func(val). Iteration stops as soon +// as func return false. Returns true if it successfully processed all elements +// without stopping. +bool IterateSet(const PrimeValue& pv, const IterateFunc& func); + +// Iterate over all values and call func(val). Iteration stops as soon +// as func return false. Returns true if it successfully processed all elements +// without stopping. +bool IterateSortedSet(robj* zobj, const IterateSortedFunc& func, int32_t start = 0, int32_t end = -1, + bool reverse = false, bool use_score = false); + +}; // namespace container_utils + +} // namespace dfly diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 130ddf4b7..6343d071f 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -14,6 +14,7 @@ extern "C" { #include "server/blocking_controller.h" #include "server/command_registry.h" #include "server/conn_context.h" +#include "server/container_utils.h" #include "server/engine_shard_set.h" #include "server/error.h" #include "server/transaction.h" @@ -208,8 +209,7 @@ bool ScanCb(const OpArgs& op_args, PrimeIterator it, const ScanOpts& opts, Strin return true; } -void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, - StringVec* vec) { +void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, StringVec* vec) { auto& db_slice = op_args.shard->db_slice(); DCHECK(db_slice.IsDbValid(op_args.db_ind)); @@ -463,6 +463,170 @@ void GenericFamily::Stick(CmdArgList args, ConnectionContext* cntx) { (*cntx)->SendLong(match_cnt); } +// Used to conditionally store double score +struct SortEntryScore { + double score; +}; + +// SortEntry stores all data required for sorting +template +struct SortEntry + // Store score only if we need it + : public std::conditional_t, SortEntryScore> { + std::string key; + + bool Parse(std::string&& item) { + if constexpr (!ALPHA) { + if (!absl::SimpleAtod(item, &this->score)) + return false; + } + key = std::move(item); + return true; + } + + bool Parse(int64_t item) { + if constexpr (!ALPHA) { + this->score = item; + } + key = absl::StrCat(item); + return true; + } + + std::conditional_t Cmp() const { + if constexpr (ALPHA) { + return key; + } else { + return this->score; + } + } +}; + +// std::variant of all possible vectors of SortEntries +using SortEntryList = std::variant< + // Used when sorting by double values + std::vector>, + // Used when sorting by string values + std::vector>>; + +// Create SortEntryList based on runtime arguments +SortEntryList MakeSortEntryList(bool alpha) { + if (alpha) + return SortEntryList{std::vector>{}}; + else + return SortEntryList{std::vector>{}}; +} + +// Iterate over container with generic function that accepts strings and ints +template bool Iterate(const PrimeValue& pv, F&& func) { + auto cb = [&func](container_utils::ContainerEntry ce) { + if (ce.value) + return func(ce.ToString()); + else + return func(ce.longval); + }; + + switch (pv.ObjType()) { + case OBJ_LIST: + return container_utils::IterateList(pv, cb); + case OBJ_SET: + return container_utils::IterateSet(pv, cb); + case OBJ_ZSET: + return container_utils::IterateSortedSet( + pv.AsRObj(), [&cb](container_utils::ContainerEntry ce, double) { return cb(ce); }); + default: + return false; + } +} + +// Create a SortEntryList from given key +OpResult OpFetchSortEntries(const OpArgs& op_args, std::string_view key, + bool alpha) { + using namespace container_utils; + + auto [it, _] = op_args.shard->db_slice().FindExt(op_args.db_ind, key); + if (!IsValid(it) || !IsContainer(it->second)) { + return OpStatus::KEY_NOTFOUND; + } + + auto result = MakeSortEntryList(alpha); + bool success = std::visit( + [&pv = it->second](auto& entries) { + entries.reserve(pv.Size()); + return Iterate(pv, [&entries](auto&& val) { + return entries.emplace_back().Parse(std::forward(val)); + }); + }, + result); + return success ? OpResult{std::move(result)} : OpStatus::WRONG_TYPE; +} + +void GenericFamily::Sort(CmdArgList args, ConnectionContext* cntx) { + std::string_view key = ArgS(args, 1); + bool alpha = false; + bool reversed = false; + std::optional> bounds; + + for (size_t i = 2; i < args.size(); i++) { + ToUpper(&args[i]); + + std::string_view arg = ArgS(args, i); + if (arg == "ALPHA") { + alpha = true; + } else if (arg == "DESC") { + reversed = true; + } else if (arg == "LIMIT") { + int offset, limit; + if (i + 2 >= args.size()) { + return (*cntx)->SendError(kSyntaxErr); + } + if (!absl::SimpleAtoi(ArgS(args, i + 1), &offset) || + !absl::SimpleAtoi(ArgS(args, i + 2), &limit)) { + return (*cntx)->SendError(kInvalidIntErr); + } + bounds = {offset, limit}; + i += 2; + } + } + + OpResult entries = + cntx->transaction->ScheduleSingleHopT([&](Transaction* t, EngineShard* shard) { + return OpFetchSortEntries(t->GetOpArgs(shard), key, alpha); + }); + + if (entries.status() == OpStatus::WRONG_TYPE) + return (*cntx)->SendError("One or more scores can't be converted into double"); + + if (!entries.ok()) + return (*cntx)->SendEmptyArray(); + + auto sort_call = [cntx, bounds, reversed, key](auto& entries) { + if (bounds) { + auto sort_it = entries.begin() + std::min(bounds->first + bounds->second, entries.size()); + std::partial_sort(entries.begin(), sort_it, entries.end(), + [reversed](const auto& lhs, const auto& rhs) { + return bool(lhs.Cmp() < rhs.Cmp()) ^ reversed; + }); + } else { + std::sort(entries.begin(), entries.end(), [reversed](const auto& lhs, const auto& rhs) { + return bool(lhs.Cmp() < rhs.Cmp()) ^ reversed; + }); + } + + auto start_it = entries.begin(); + auto end_it = entries.end(); + if (bounds) { + start_it += std::min(bounds->first, entries.size()); + end_it = entries.begin() + std::min(bounds->first + bounds->second, entries.size()); + } + + (*cntx)->StartArray(std::distance(start_it, end_it)); + for (auto it = start_it; it != end_it; ++it) { + (*cntx)->SendBulkString(it->key); + } + }; + std::visit(std::move(sort_call), entries.value()); +} + void GenericFamily::Move(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); int64_t target_db; @@ -868,6 +1032,7 @@ void GenericFamily::Register(CommandRegistry* registry) { << CI{"TYPE", CO::READONLY | CO::FAST | CO::LOADING, 2, 1, 1, 1}.HFUNC(Type) << CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del) << CI{"STICK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Stick) + << CI{"SORT", CO::READONLY, -2, 1, 1, 1}.HFUNC(Sort) << CI{"MOVE", CO::WRITE | CO::GLOBAL_TRANS, 3, 1, 1, 1}.HFUNC(Move); } diff --git a/src/server/generic_family.h b/src/server/generic_family.h index dd4b526f0..346e8a8eb 100644 --- a/src/server/generic_family.h +++ b/src/server/generic_family.h @@ -49,6 +49,7 @@ class GenericFamily { static void Keys(CmdArgList args, ConnectionContext* cntx); static void PexpireAt(CmdArgList args, ConnectionContext* cntx); static void Stick(CmdArgList args, ConnectionContext* cntx); + static void Sort(CmdArgList args, ConnectionContext* cntx); static void Move(CmdArgList args, ConnectionContext* cntx); static void Rename(CmdArgList args, ConnectionContext* cntx); diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index 6ac0f63f5..932be0dae 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -160,7 +160,7 @@ TEST_F(GenericFamilyTest, RenameNx) { Run({"mset", "x", x_val, "b", b_val}); ASSERT_THAT(Run({"renamenx", "z", "b"}), ErrArg("no such key")); - ASSERT_THAT(Run({"renamenx", "x", "b"}), IntArg(0)); // b already exists + ASSERT_THAT(Run({"renamenx", "x", "b"}), IntArg(0)); // b already exists ASSERT_THAT(Run({"renamenx", "x", "y"}), IntArg(1)); ASSERT_EQ(Run({"get", "y"}), x_val); } @@ -169,7 +169,7 @@ TEST_F(GenericFamilyTest, Stick) { // check stick returns zero on non-existent keys ASSERT_THAT(Run({"stick", "a", "b"}), IntArg(0)); - for (auto key: {"a", "b", "c", "d"}) { + for (auto key : {"a", "b", "c", "d"}) { Run({"set", key, "."}); } @@ -276,4 +276,59 @@ TEST_F(GenericFamilyTest, Scan) { EXPECT_THAT(vec, Each(StartsWith("zset"))); } +TEST_F(GenericFamilyTest, Sort) { + // Test list sort with params + Run({"del", "list-1"}); + Run({"lpush", "list-1", "3.5", "1.2", "10.1", "2.20", "200"}); + // numeric + ASSERT_THAT(Run({"sort", "list-1"}).GetVec(), ElementsAre("1.2", "2.20", "3.5", "10.1", "200")); + // string + ASSERT_THAT(Run({"sort", "list-1", "ALPHA"}).GetVec(), ElementsAre("1.2", "10.1", "2.20", "200", "3.5")); + // desc numeric + ASSERT_THAT(Run({"sort", "list-1", "DESC"}).GetVec(), ElementsAre("200", "10.1", "3.5", "2.20", "1.2")); + // desc strig + ASSERT_THAT(Run({"sort", "list-1", "DESC", "ALPHA"}).GetVec(), ElementsAre("3.5", "200", "2.20", "10.1", "1.2")); + // limits + ASSERT_THAT(Run({"sort", "list-1", "LIMIT", "0", "5"}).GetVec(), ElementsAre("1.2", "2.20", "3.5", "10.1", "200")); + ASSERT_THAT(Run({"sort", "list-1", "LIMIT", "0", "10"}).GetVec(), ElementsAre("1.2", "2.20", "3.5", "10.1", "200")); + ASSERT_THAT(Run({"sort", "list-1", "LIMIT", "2", "2"}).GetVec(), ElementsAre("3.5", "10.1")); + ASSERT_THAT(Run({"sort", "list-1", "LIMIT", "1", "1"}), "2.20"); + ASSERT_THAT(Run({"sort", "list-1", "LIMIT", "4", "2"}), "200"); + ASSERT_THAT(Run({"sort", "list-1", "LIMIT", "5", "2"}), ArrLen(0)); + // limits desc + ASSERT_THAT(Run({"sort", "list-1", "DESC", "LIMIT", "0", "5"}).GetVec(), ElementsAre("200", "10.1", "3.5", "2.20", "1.2")); + ASSERT_THAT(Run({"sort", "list-1", "DESC", "LIMIT", "2", "2"}).GetVec(), ElementsAre("3.5", "2.20")); + ASSERT_THAT(Run({"sort", "list-1", "DESC", "LIMIT", "1", "1"}), "10.1"); + ASSERT_THAT(Run({"sort", "list-1", "DESC", "LIMIT", "5", "2"}), ArrLen(0)); + + // Test set sort + Run({"del", "set-1"}); + Run({"sadd", "set-1", "5.3", "4.4", "60", "99.9", "100", "9"}); + ASSERT_THAT(Run({"sort", "set-1"}).GetVec(), ElementsAre("4.4", "5.3", "9", "60", "99.9", "100")); + ASSERT_THAT(Run({"sort", "set-1", "ALPHA"}).GetVec(), ElementsAre("100", "4.4", "5.3", "60", "9", "99.9")); + ASSERT_THAT(Run({"sort", "set-1", "DESC"}).GetVec(), ElementsAre("100", "99.9", "60", "9", "5.3", "4.4")); + ASSERT_THAT(Run({"sort", "set-1", "DESC", "ALPHA"}).GetVec(), ElementsAre("99.9", "9", "60", "5.3", "4.4", "100")); + + // Test intset sort + Run({"del", "intset-1"}); + Run({"sadd", "intset-1", "5", "4", "3", "2", "1"}); + ASSERT_THAT(Run({"sort", "intset-1"}).GetVec(), ElementsAre("1", "2", "3", "4", "5")); + + // Test sorted set sort + Run({"del", "zset-1"}); + Run({"zadd", "zset-1", "0", "3.3", "0", "30.1", "0", "8.2"}); + ASSERT_THAT(Run({"sort", "zset-1"}).GetVec(), ElementsAre("3.3", "8.2", "30.1")); + ASSERT_THAT(Run({"sort", "zset-1", "ALPHA"}).GetVec(), ElementsAre("3.3", "30.1", "8.2")); + ASSERT_THAT(Run({"sort", "zset-1", "DESC"}).GetVec(), ElementsAre("30.1", "8.2", "3.3")); + ASSERT_THAT(Run({"sort", "zset-1", "DESC", "ALPHA"}).GetVec(), ElementsAre("8.2", "30.1", "3.3")); + + // Test sort with non existent key + Run({"del", "list-2"}); + ASSERT_THAT(Run({"sort", "list-2"}), ArrLen(0)); + + // Test not convertible to double + Run({"lpush", "list-2", "NOTADOUBLE"}); + ASSERT_THAT(Run({"sort", "list-2"}), ErrArg("One or more scores can't be converted into double")); +} + } // namespace dfly diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 08d771dd8..33d8c5aba 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -19,6 +19,7 @@ extern "C" { #include "server/error.h" #include "server/server_state.h" #include "server/transaction.h" +#include "server/container_utils.h" /** * The number of entries allowed per internal list node can be specified @@ -65,17 +66,6 @@ using absl::GetFlag; namespace { -quicklistEntry QLEntry() { - quicklistEntry res{.quicklist = NULL, - .node = NULL, - .zi = NULL, - .value = NULL, - .longval = 0, - .sz = 0, - .offset = 0}; - return res; -} - quicklist* GetQL(const PrimeValue& mv) { return (quicklist*)mv.RObjPtr(); } @@ -358,7 +348,7 @@ OpResult RPeek(const OpArgs& op_args, string_view key, bool fetch) { return OpStatus::OK; quicklist* ql = GetQL(it_res.value()->second); - quicklistEntry entry = QLEntry(); + quicklistEntry entry = container_utils::QLEntry(); quicklistIter* iter = quicklistGetIterator(ql, AL_START_TAIL); CHECK(quicklistNext(iter, &entry)); quicklistReleaseIterator(iter); @@ -845,7 +835,7 @@ OpResult ListFamily::OpIndex(const OpArgs& op_args, std::string_view key if (!res) return res.status(); quicklist* ql = GetQL(res.value()->second); - quicklistEntry entry = QLEntry(); + quicklistEntry entry = container_utils::QLEntry(); quicklistIter* iter = quicklistGetIteratorAtIdx(ql, AL_START_TAIL, index); if (!iter) return OpStatus::KEY_NOTFOUND; @@ -871,7 +861,7 @@ OpResult ListFamily::OpInsert(const OpArgs& op_args, string_view key, strin return it_res.status(); quicklist* ql = GetQL(it_res.value()->second); - quicklistEntry entry = QLEntry(); + quicklistEntry entry = container_utils::QLEntry(); quicklistIter* qiter = quicklistGetIterator(ql, AL_START_HEAD); bool found = false; @@ -1030,24 +1020,12 @@ OpResult ListFamily::OpRange(const OpArgs& op_args, std::string_view return StringVec{}; } - if (end >= llen) - end = llen - 1; - - unsigned lrange = end - start + 1; - quicklistIter* qiter = quicklistGetIteratorAtIdx(ql, AL_START_HEAD, start); - quicklistEntry entry = QLEntry(); StringVec str_vec; - - unsigned cnt = 0; - while (cnt < lrange && quicklistNext(qiter, &entry)) { - if (entry.value) - str_vec.emplace_back(reinterpret_cast(entry.value), entry.sz); - else - str_vec.push_back(absl::StrCat(entry.longval)); - ++cnt; - } - quicklistReleaseIterator(qiter); - + container_utils::IterateList(res.value()->second, [&str_vec](container_utils::ContainerEntry ce) { + str_vec.emplace_back(ce.ToString()); + return true; + }, start, end); + return str_vec; } diff --git a/src/server/server_family.cc b/src/server/server_family.cc index acf8226a6..d80e7c3dc 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1439,7 +1439,7 @@ void ServerFamily::Latency(CmdArgList args, ConnectionContext* cntx) { string_view sub_cmd = ArgS(args, 1); if (sub_cmd == "LATEST") { - return (*cntx)->StartArray(0); + return (*cntx)->SendEmptyArray(); } LOG_FIRST_N(ERROR, 10) << "Subcommand " << sub_cmd << " not supported"; diff --git a/src/server/set_family.cc b/src/server/set_family.cc index cd36b6c70..5411fefe4 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -21,6 +21,8 @@ extern "C" { #include "server/error.h" #include "server/transaction.h" +#include "server/container_utils.h" + ABSL_DECLARE_FLAG(bool, use_set2); namespace dfly { @@ -146,27 +148,6 @@ void InitStrSet(CompactObj* set) { } } -// f receives a str object. -template void FillFromStrSet(F&& f, void* ptr) { - string str; - if (GetFlag(FLAGS_use_set2)) { - for (sds ptr : *(StringSet*)ptr) { - str.assign(ptr, sdslen(ptr)); - f(move(str)); - } - } else { - dict* ds = (dict*)ptr; - - dictIterator* di = dictGetIterator(ds); - dictEntry* de = nullptr; - while ((de = dictNext(di))) { - str.assign((sds)de->key, sdslen((sds)de->key)); - f(move(str)); - } - dictReleaseIterator(di); - } -} - // returns (removed, isempty) pair RemoveSet(ArgSlice vals, CompactObj* set) { bool isempty = false; @@ -493,22 +474,6 @@ OpStatus NoOpCb(Transaction* t, EngineShard* shard) { return OpStatus::OK; }; -template void FillSet(const SetType& set, F&& f) { - if (set.second == kEncodingIntSet) { - intset* is = (intset*)set.first; - int64_t ival; - int ii = 0; - char buf[32]; - - while (intsetGet(is, ii++, &ival)) { - char* next = absl::numbers_internal::FastIntToBuffer(ival, buf); - f(string{buf, size_t(next - buf)}); - } - } else { - FillFromStrSet(move(f), set.first); - } -} - // if overwrite is true then OpAdd writes vals into the key and discards its previous value. OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice vals, bool overwrite) { @@ -713,8 +678,10 @@ OpResult OpUnion(const OpArgs& op_args, ArgSlice keys) { for (std::string_view key : keys) { OpResult find_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_SET); if (find_res) { - SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()}; - FillSet(st, [&uniques](string s) { uniques.emplace(move(s)); }); + container_utils::IterateSet(find_res.value()->second, [&uniques](container_utils::ContainerEntry ce){ + uniques.emplace(ce.ToString()); + return true; + }); continue; } @@ -738,9 +705,10 @@ OpResult OpDiff(const OpArgs& op_args, ArgSlice keys) { } absl::flat_hash_set uniques; - SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()}; - - FillSet(st, [&uniques](string s) { uniques.insert(move(s)); }); + container_utils::IterateSet(find_res.value()->second, [&uniques](container_utils::ContainerEntry ce) { + uniques.emplace(ce.ToString()); + return true; + }); DCHECK(!uniques.empty()); // otherwise the key would not exist. @@ -786,9 +754,10 @@ OpResult OpInter(const Transaction* t, EngineShard* es, bool remove_f if (!find_res) return find_res.status(); - SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()}; - - FillSet(st, [&result](string s) { result.push_back(move(s)); }); + container_utils::IterateSet(find_res.value()->second, [&result](container_utils::ContainerEntry ce) { + result.push_back(ce.ToString()); + return true; + }); return result; } @@ -1261,16 +1230,20 @@ OpResult SetFamily::OpPop(const OpArgs& op_args, std::string_view key PrimeIterator it = find_res.value(); size_t slen = it->second.Size(); - SetType st{it->second.RObjPtr(), it->second.Encoding()}; /* CASE 1: * The number of requested elements is greater than or equal to * the number of elements inside the set: simply return the whole set. */ if (count >= slen) { - FillSet(st, [&result](string s) { result.push_back(move(s)); }); + container_utils::IterateSet(it->second, [&result](container_utils::ContainerEntry ce) { + result.push_back(ce.ToString()); + return true; + }); + /* Delete the set as it is now empty */ CHECK(db_slice.Del(op_args.db_ind, it)); } else { + SetType st{it->second.RObjPtr(), it->second.Encoding()}; db_slice.PreUpdate(op_args.db_ind, it); if (st.second == kEncodingIntSet) { intset* is = (intset*)st.first; diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index be9d6aafe..49c65ea6d 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -847,7 +847,7 @@ void StreamFamily::XRangeGeneric(CmdArgList args, bool is_rev, ConnectionContext } if (result.status() == OpStatus::KEY_NOTFOUND) { - return (*cntx)->StartArray(0); + return (*cntx)->SendEmptyArray(); } return (*cntx)->SendError(result.status()); } diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index d57b4de35..4a4c1235f 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -18,6 +18,7 @@ extern "C" { #include "server/conn_context.h" #include "server/engine_shard_set.h" #include "server/transaction.h" +#include "server/container_utils.h" namespace dfly { @@ -244,60 +245,10 @@ void IntervalVisitor::operator()(const ZSetFamily::LexInterval& li) { } void IntervalVisitor::ActionRange(unsigned start, unsigned end) { - unsigned rangelen = (end - start) + 1; - - if (zobj_->encoding == OBJ_ENCODING_LISTPACK) { - uint8_t* zl = (uint8_t*)zobj_->ptr; - uint8_t *eptr, *sptr; - uint8_t* vstr; - unsigned int vlen; - long long vlong; - double score = 0.0; - - if (params_.reverse) - eptr = lpSeek(zl, -2 - long(2 * start)); - else - eptr = lpSeek(zl, 2 * start); - DCHECK(eptr); - - sptr = lpNext(zl, eptr); - - while (rangelen--) { - DCHECK(eptr != NULL && sptr != NULL); - vstr = lpGetValue(eptr, &vlen, &vlong); - - if (params_.with_scores) /* don't bother to extract the score if it's gonna be ignored. */ - score = zzlGetScore(sptr); - - AddResult(vstr, vlen, vlong, score); - - Next(zl, &eptr, &sptr); - } - } else { - CHECK_EQ(zobj_->encoding, OBJ_ENCODING_SKIPLIST); - zset* zs = (zset*)zobj_->ptr; - zskiplist* zsl = zs->zsl; - zskiplistNode* ln; - - /* Check if starting point is trivial, before doing log(N) lookup. */ - if (params_.reverse) { - ln = zsl->tail; - unsigned long llen = zsetLength(zobj_); - if (start > 0) - ln = zslGetElementByRank(zsl, llen - start); - } else { - ln = zsl->header->level[0].forward; - if (start > 0) - ln = zslGetElementByRank(zsl, start + 1); - } - - while (rangelen--) { - DCHECK(ln != NULL); - sds ele = ln->ele; - result_.emplace_back(string(ele, sdslen(ele)), ln->score); - ln = Next(ln); - } - } + container_utils::IterateSortedSet(zobj_, [this](container_utils::ContainerEntry ce, double score){ + result_.emplace_back(ce.ToString(), score); + return true; + }, start, end, params_.reverse, params_.with_scores); } void IntervalVisitor::ActionRange(const zrangespec& range) {