From beb2ec2b2950f27e41f6bdfcacba0f4eba27680d Mon Sep 17 00:00:00 2001 From: Vladislav Date: Tue, 1 Oct 2024 15:29:22 +0300 Subject: [PATCH] chore: WrapSds from family_utils.h (#3818) * chore: WrapSds from family_utils.h --------- Signed-off-by: Roman Gershman Signed-off-by: Kostas Kyrimis Co-authored-by: Roman Gershman Co-authored-by: Kostas Kyrimis --- src/server/CMakeLists.txt | 2 +- src/server/common.cc | 31 ------- src/server/common.h | 44 ---------- src/server/engine_shard.cc | 6 -- src/server/engine_shard.h | 6 -- src/server/family_utils.cc | 43 ++++++++++ src/server/family_utils.h | 63 ++++++++++++++ src/server/list_family.cc | 6 +- src/server/set_family.cc | 2 + src/server/stream_family.cc | 158 ++++++++++++------------------------ src/server/zset_family.cc | 31 +++---- 11 files changed, 176 insertions(+), 216 deletions(-) create mode 100644 src/server/family_utils.cc create mode 100644 src/server/family_utils.h diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 0efa495b2..433b76724 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -45,7 +45,7 @@ endif() add_library(dragonfly_lib bloom_family.cc config_registry.cc conn_context.cc debugcmd.cc dflycmd.cc engine_shard.cc - engine_shard_set.cc + engine_shard_set.cc family_utils.cc generic_family.cc hset_family.cc http_api.cc json_family.cc list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc protocol_client.cc diff --git a/src/server/common.cc b/src/server/common.cc index 137872f7e..b9fe3bfc5 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -423,37 +423,6 @@ std::ostream& operator<<(std::ostream& os, const GlobalState& state) { return os << GlobalStateName(state); } -NonUniquePicksGenerator::NonUniquePicksGenerator(RandomPick max_range) : max_range_(max_range) { - CHECK_GT(max_range, RandomPick(0)); -} - -RandomPick NonUniquePicksGenerator::Generate() { - return absl::Uniform(bitgen_, 0u, max_range_); -} - -UniquePicksGenerator::UniquePicksGenerator(std::uint32_t picks_count, RandomPick max_range) - : remaining_picks_count_(picks_count), picked_indexes_(picks_count) { - CHECK_GE(max_range, picks_count); - current_random_limit_ = max_range - picks_count; -} - -RandomPick UniquePicksGenerator::Generate() { - DCHECK_GT(remaining_picks_count_, 0u); - - remaining_picks_count_--; - - const RandomPick max_index = current_random_limit_++; - const RandomPick random_index = absl::Uniform(bitgen_, 0u, max_index + 1u); - - const bool random_index_is_picked = picked_indexes_.emplace(random_index).second; - if (random_index_is_picked) { - return random_index; - } - - picked_indexes_.insert(max_index); - return max_index; -} - ThreadLocalMutex::ThreadLocalMutex() { shard_ = EngineShard::tlocal(); } diff --git a/src/server/common.h b/src/server/common.h index a509b57b9..957d52994 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -4,7 +4,6 @@ #pragma once -#include #include #include #include @@ -318,49 +317,6 @@ struct MemoryBytesFlag { bool AbslParseFlag(std::string_view in, dfly::MemoryBytesFlag* flag, std::string* err); std::string AbslUnparseFlag(const dfly::MemoryBytesFlag& flag); -using RandomPick = std::uint32_t; - -class PicksGenerator { - public: - virtual RandomPick Generate() = 0; - virtual ~PicksGenerator() = default; -}; - -class NonUniquePicksGenerator : public PicksGenerator { - public: - /* The generated value will be within the closed-open interval [0, max_range) */ - NonUniquePicksGenerator(RandomPick max_range); - - RandomPick Generate() override; - - private: - const RandomPick max_range_; - absl::BitGen bitgen_{}; -}; - -/* - * Generates unique index in O(1). - * - * picks_count specifies the number of random indexes to be generated. - * In other words, this is the number of times the Generate() function is called. - * - * The class uses Robert Floyd's sampling algorithm - * https://dl.acm.org/doi/pdf/10.1145/30401.315746 - * */ -class UniquePicksGenerator : public PicksGenerator { - public: - /* The generated value will be within the closed-open interval [0, max_range) */ - UniquePicksGenerator(std::uint32_t picks_count, RandomPick max_range); - - RandomPick Generate() override; - - private: - RandomPick current_random_limit_; - std::uint32_t remaining_picks_count_; - std::unordered_set picked_indexes_; - absl::BitGen bitgen_{}; -}; - // Helper class used to guarantee atomicity between serialization of buckets class ABSL_LOCKABLE ThreadLocalMutex { public: diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index a10292828..7353cacd9 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -376,17 +376,11 @@ EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap) txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap), shard_id_(pb->GetPoolIndex()) { - tmp_str1 = sdsempty(); - defrag_task_ = pb->AddOnIdleTask([this]() { return DefragTask(); }); queue_.Start(absl::StrCat("shard_queue_", shard_id())); queue2_.Start(absl::StrCat("l2_queue_", shard_id())); } -EngineShard::~EngineShard() { - sdsfree(tmp_str1); -} - void EngineShard::Shutdown() { DVLOG(1) << "EngineShard::Shutdown"; diff --git a/src/server/engine_shard.h b/src/server/engine_shard.h index 18af18792..262914b2c 100644 --- a/src/server/engine_shard.h +++ b/src/server/engine_shard.h @@ -41,9 +41,6 @@ class EngineShard { Stats& operator+=(const Stats&); }; - // EngineShard() is private down below. - ~EngineShard(); - // Sets up a new EngineShard in the thread. // If update_db_time is true, initializes periodic time update for its db_slice. static void InitThreadLocal(util::ProactorBase* pb); @@ -122,9 +119,6 @@ class EngineShard { return shard_search_indices_.get(); } - // for everyone to use for string transformations during atomic cpu sequences. - sds tmp_str1; - // Moving average counters. enum MovingCnt { TTL_TRAVERSE, TTL_DELETE, COUNTER_TOTAL }; diff --git a/src/server/family_utils.cc b/src/server/family_utils.cc new file mode 100644 index 000000000..066447334 --- /dev/null +++ b/src/server/family_utils.cc @@ -0,0 +1,43 @@ +#include "server/family_utils.h" + +#include "base/logging.h" + +namespace dfly { + +sds WrapSds(std::string_view s) { + static thread_local sds tmp_sds = sdsempty(); + return tmp_sds = sdscpylen(tmp_sds, s.data(), s.length()); +} + +NonUniquePicksGenerator::NonUniquePicksGenerator(RandomPick max_range) : max_range_(max_range) { + CHECK_GT(max_range, RandomPick(0)); +} + +RandomPick NonUniquePicksGenerator::Generate() { + return absl::Uniform(bitgen_, 0u, max_range_); +} + +UniquePicksGenerator::UniquePicksGenerator(std::uint32_t picks_count, RandomPick max_range) + : remaining_picks_count_(picks_count), picked_indexes_(picks_count) { + CHECK_GE(max_range, picks_count); + current_random_limit_ = max_range - picks_count; +} + +RandomPick UniquePicksGenerator::Generate() { + DCHECK_GT(remaining_picks_count_, 0u); + + remaining_picks_count_--; + + const RandomPick max_index = current_random_limit_++; + const RandomPick random_index = absl::Uniform(bitgen_, 0u, max_index + 1u); + + const bool random_index_is_picked = picked_indexes_.emplace(random_index).second; + if (random_index_is_picked) { + return random_index; + } + + picked_indexes_.insert(max_index); + return max_index; +} + +} // namespace dfly diff --git a/src/server/family_utils.h b/src/server/family_utils.h new file mode 100644 index 000000000..2fdd9920a --- /dev/null +++ b/src/server/family_utils.h @@ -0,0 +1,63 @@ +// Copyright 2024, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include +#include + +#include + +extern "C" { +#include "redis/sds.h" +} +namespace dfly { + +// Copy str to thread local sds instance. Valid until next WrapSds call on thread +sds WrapSds(std::string_view str); + +using RandomPick = uint32_t; + +class PicksGenerator { + public: + virtual RandomPick Generate() = 0; + virtual ~PicksGenerator() = default; +}; + +class NonUniquePicksGenerator : public PicksGenerator { + public: + /* The generated value will be within the closed-open interval [0, max_range) */ + NonUniquePicksGenerator(RandomPick max_range); + + RandomPick Generate() override; + + private: + const RandomPick max_range_; + absl::BitGen bitgen_{}; +}; + +/* + * Generates unique index in O(1). + * + * picks_count specifies the number of random indexes to be generated. + * In other words, this is the number of times the Generate() function is called. + * + * The class uses Robert Floyd's sampling algorithm + * https://dl.acm.org/doi/pdf/10.1145/30401.315746 + * */ +class UniquePicksGenerator : public PicksGenerator { + public: + /* The generated value will be within the closed-open interval [0, max_range) */ + UniquePicksGenerator(uint32_t picks_count, RandomPick max_range); + + RandomPick Generate() override; + + private: + RandomPick current_random_limit_; + uint32_t remaining_picks_count_; + absl::flat_hash_set picked_indexes_; + absl::BitGen bitgen_{}; +}; + +} // namespace dfly diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 2f2ffb216..99b3e75aa 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -20,7 +20,7 @@ extern "C" { #include "server/container_utils.h" #include "server/engine_shard_set.h" #include "server/error.h" -#include "server/server_state.h" +#include "server/family_utils.h" #include "server/transaction.h" /** @@ -283,8 +283,8 @@ OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir d int pos = (dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL; for (string_view v : vals) { - es->tmp_str1 = sdscpylen(es->tmp_str1, v.data(), v.size()); - quicklistPush(ql, es->tmp_str1, sdslen(es->tmp_str1), pos); + auto vsds = WrapSds(v); + quicklistPush(ql, vsds, sdslen(vsds), pos); } if (res.is_new) { diff --git a/src/server/set_family.cc b/src/server/set_family.cc index 3ac04d0dd..77db27654 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -4,6 +4,8 @@ #include "server/set_family.h" +#include "server/family_utils.h" + extern "C" { #include "redis/intset.h" #include "redis/redis_aux.h" diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 0ea4c02bd..a0503f8b1 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -19,7 +19,7 @@ extern "C" { #include "server/conn_context.h" #include "server/engine_shard_set.h" #include "server/error.h" -#include "server/server_state.h" +#include "server/family_utils.h" #include "server/transaction.h" namespace dfly { @@ -1055,8 +1055,7 @@ OpResult> OpConsumers(const DbContext& db_cntx, EngineShard vector result; const CompactObj& cobj = (*res_it)->second; stream* s = GetReadOnlyStream(cobj); - shard->tmp_str1 = sdscpylen(shard->tmp_str1, group_name.data(), group_name.length()); - streamCG* cg = streamLookupCG(s, shard->tmp_str1); + streamCG* cg = streamLookupCG(s, WrapSds(group_name)); if (cg == NULL) { return OpStatus::INVALID_VALUE; } @@ -1136,21 +1135,19 @@ struct FindGroupResult { DbSlice::AutoUpdater post_updater; }; -OpResult FindGroup(const OpArgs& op_args, string_view key, string_view gname) { - auto* shard = op_args.shard; +OpResult FindGroup(const OpArgs& op_args, string_view key, string_view gname, + bool skip_group = true) { auto& db_slice = op_args.GetDbSlice(); auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM); - if (!res_it) - return res_it.status(); + RETURN_ON_BAD_STATUS(res_it); CompactObj& cobj = res_it->it->second; - FindGroupResult res; - res.s = (stream*)cobj.RObjPtr(); - shard->tmp_str1 = sdscpylen(shard->tmp_str1, gname.data(), gname.size()); - res.cg = streamLookupCG(res.s, shard->tmp_str1); - res.post_updater = std::move(res_it->post_updater); + auto* s = static_cast(cobj.RObjPtr()); + auto* cg = streamLookupCG(s, WrapSds(gname)); + if (skip_group && !cg) + return OpStatus::SKIPPED; - return res; + return FindGroupResult{s, cg, std::move(res_it->post_updater)}; } constexpr uint8_t kClaimForce = 1 << 0; @@ -1210,11 +1207,8 @@ void AppendClaimResultItem(ClaimInfo& result, stream* s, streamID id) { OpResult OpClaim(const OpArgs& op_args, string_view key, const ClaimOpts& opts, absl::Span ids) { auto cgr_res = FindGroup(op_args, key, opts.group); - if (!cgr_res) - return cgr_res.status(); - if (!cgr_res->cg) { - return OpStatus::SKIPPED; - } + RETURN_ON_BAD_STATUS(cgr_res); + streamConsumer* consumer = nullptr; auto now = GetCurrentTimeMs(); ClaimInfo result; @@ -1262,12 +1256,10 @@ OpResult OpClaim(const OpArgs& op_args, string_view key, const ClaimO } // Try to get the consumer. If not found, create a new one. - op_args.shard->tmp_str1 = - sdscpylen(op_args.shard->tmp_str1, opts.consumer.data(), opts.consumer.size()); - if ((consumer = streamLookupConsumer(cgr_res->cg, op_args.shard->tmp_str1, SLC_NO_REFRESH)) == - nullptr) { - consumer = streamCreateConsumer(cgr_res->cg, op_args.shard->tmp_str1, nullptr, 0, - SCC_NO_NOTIFY | SCC_NO_DIRTIFY); + auto cname = WrapSds(opts.consumer); + if ((consumer = streamLookupConsumer(cgr_res->cg, cname, SLC_NO_REFRESH)) == nullptr) { + consumer = + streamCreateConsumer(cgr_res->cg, cname, nullptr, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY); } // If the entry belongs to the same consumer, we don't have to @@ -1306,23 +1298,18 @@ OpResult OpClaim(const OpArgs& op_args, string_view key, const ClaimO // XGROUP DESTROY key groupname OpStatus OpDestroyGroup(const OpArgs& op_args, string_view key, string_view gname) { auto cgr_res = FindGroup(op_args, key, gname); - if (!cgr_res) - return cgr_res.status(); + RETURN_ON_BAD_STATUS(cgr_res); - if (cgr_res->cg) { - raxRemove(cgr_res->s->cgroups, (uint8_t*)(gname.data()), gname.size(), NULL); - streamFreeCG(cgr_res->cg); + raxRemove(cgr_res->s->cgroups, (uint8_t*)(gname.data()), gname.size(), NULL); + streamFreeCG(cgr_res->cg); - // Awake readers blocked on this group - auto blocking_controller = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id()); - if (blocking_controller) { - blocking_controller->AwakeWatched(op_args.db_cntx.db_index, key); - } - - return OpStatus::OK; + // Awake readers blocked on this group + auto blocking_controller = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id()); + if (blocking_controller) { + blocking_controller->AwakeWatched(op_args.db_cntx.db_index, key); } - return OpStatus::SKIPPED; + return OpStatus::OK; } struct GroupConsumerPair { @@ -1339,16 +1326,10 @@ struct GroupConsumerPairOpts { OpResult OpCreateConsumer(const OpArgs& op_args, string_view key, string_view gname, string_view consumer_name) { auto cgroup_res = FindGroup(op_args, key, gname); - if (!cgroup_res) - return cgroup_res.status(); - streamCG* cg = cgroup_res->cg; - if (cg == nullptr) - return OpStatus::SKIPPED; + RETURN_ON_BAD_STATUS(cgroup_res); - auto* shard = op_args.shard; - shard->tmp_str1 = sdscpylen(shard->tmp_str1, consumer_name.data(), consumer_name.size()); - streamConsumer* consumer = - streamCreateConsumer(cg, shard->tmp_str1, NULL, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY); + streamConsumer* consumer = streamCreateConsumer(cgroup_res->cg, WrapSds(consumer_name), NULL, 0, + SCC_NO_NOTIFY | SCC_NO_DIRTIFY); if (consumer) return OpStatus::OK; @@ -1359,21 +1340,14 @@ OpResult OpCreateConsumer(const OpArgs& op_args, string_view key, stri OpResult OpDelConsumer(const OpArgs& op_args, string_view key, string_view gname, string_view consumer_name) { auto cgroup_res = FindGroup(op_args, key, gname); - if (!cgroup_res) - return cgroup_res.status(); - - streamCG* cg = cgroup_res->cg; - if (cg == nullptr) - return OpStatus::SKIPPED; + RETURN_ON_BAD_STATUS(cgroup_res); long long pending = 0; - auto* shard = op_args.shard; - - shard->tmp_str1 = sdscpylen(shard->tmp_str1, consumer_name.data(), consumer_name.size()); - streamConsumer* consumer = streamLookupConsumer(cg, shard->tmp_str1, SLC_NO_REFRESH); + streamConsumer* consumer = + streamLookupConsumer(cgroup_res->cg, WrapSds(consumer_name), SLC_NO_REFRESH); if (consumer) { pending = raxSize(consumer->pel); - streamDelConsumer(cg, consumer); + streamDelConsumer(cgroup_res->cg, consumer); } return pending; @@ -1381,12 +1355,7 @@ OpResult OpDelConsumer(const OpArgs& op_args, string_view key, string_ OpStatus OpSetId(const OpArgs& op_args, string_view key, string_view gname, string_view id) { auto cgr_res = FindGroup(op_args, key, gname); - if (!cgr_res) - return cgr_res.status(); - - streamCG* cg = cgr_res->cg; - if (cg == nullptr) - return OpStatus::SKIPPED; + RETURN_ON_BAD_STATUS(cgr_res); streamID sid; ParsedStreamId parsed_id; @@ -1399,7 +1368,7 @@ OpStatus OpSetId(const OpArgs& op_args, string_view key, string_view gname, stri return OpStatus::SYNTAX_ERR; } } - cg->last_id = sid; + cgr_res->cg->last_id = sid; return OpStatus::OK; } @@ -1487,9 +1456,8 @@ OpResult OpDel(const OpArgs& op_args, string_view key, absl::Span OpAck(const OpArgs& op_args, string_view key, string_view gname, absl::Span ids) { - auto res = FindGroup(op_args, key, gname); - if (!res) - return res.status(); + auto res = FindGroup(op_args, key, gname, false); + RETURN_ON_BAD_STATUS(res); if (res->cg == nullptr || res->s == nullptr) { return 0; @@ -1516,9 +1484,9 @@ OpResult OpAck(const OpArgs& op_args, string_view key, string_view gna } OpResult OpAutoClaim(const OpArgs& op_args, string_view key, const ClaimOpts& opts) { - auto cgr_res = FindGroup(op_args, key, opts.group); - if (!cgr_res) - return cgr_res.status(); + auto cgr_res = FindGroup(op_args, key, opts.group, false); + RETURN_ON_BAD_STATUS(cgr_res); + stream* stream = cgr_res->s; streamCG* group = cgr_res->cg; @@ -1566,12 +1534,11 @@ OpResult OpAutoClaim(const OpArgs& op_args, string_view key, const Cl continue; } - op_args.shard->tmp_str1 = - sdscpylen(op_args.shard->tmp_str1, opts.consumer.data(), opts.consumer.size()); + auto cname = WrapSds(opts.consumer); if (consumer == nullptr) { - consumer = streamLookupConsumer(group, op_args.shard->tmp_str1, SLC_DEFAULT); + consumer = streamLookupConsumer(group, cname, SLC_DEFAULT); if (consumer == nullptr) { - consumer = streamCreateConsumer(group, op_args.shard->tmp_str1, nullptr, 0, SCC_DEFAULT); + consumer = streamCreateConsumer(group, cname, nullptr, 0, SCC_DEFAULT); } } @@ -1722,29 +1689,19 @@ PendingExtendedResultList GetPendingExtendedResult(streamCG* cg, streamConsumer* OpResult OpPending(const OpArgs& op_args, string_view key, const PendingOpts& opts) { auto cgroup_res = FindGroup(op_args, key, opts.group_name); - if (!cgroup_res) { - return cgroup_res.status(); - } + RETURN_ON_BAD_STATUS(cgroup_res); - streamCG* cg = cgroup_res->cg; - if (cg == nullptr) { - return OpStatus::SKIPPED; - } - - auto* shard = op_args.shard; streamConsumer* consumer = nullptr; if (!opts.consumer_name.empty()) { - shard->tmp_str1 = - sdscpylen(shard->tmp_str1, opts.consumer_name.data(), opts.consumer_name.size()); - consumer = streamLookupConsumer(cg, shard->tmp_str1, SLC_NO_REFRESH); + consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(opts.consumer_name), SLC_NO_REFRESH); } PendingResult result; if (opts.count == -1) { - result = GetPendingReducedResult(cg); + result = GetPendingReducedResult(cgroup_res->cg); } else { - result = GetPendingExtendedResult(cg, consumer, opts); + result = GetPendingExtendedResult(cgroup_res->cg, consumer, opts); } return result; } @@ -2784,9 +2741,7 @@ void XReadBlock(ReadOpts* opts, ConnectionContext* cntx) { // Update group pointer and check it's validity if (opts->read_group) { - owner->tmp_str1 = - sdscpylen(owner->tmp_str1, opts->group_name.data(), opts->group_name.length()); - sitem.group = streamLookupCG(s, owner->tmp_str1); + sitem.group = streamLookupCG(s, WrapSds(opts->group_name)); if (!sitem.group) return true; // abort } @@ -2829,12 +2784,11 @@ void XReadBlock(ReadOpts* opts, ConnectionContext* cntx) { // Update consumer if (sitem.group) { - shard->tmp_str1 = - sdscpylen(shard->tmp_str1, opts->consumer_name.data(), opts->consumer_name.length()); - range_opts.consumer = streamLookupConsumer(sitem.group, shard->tmp_str1, SLC_NO_REFRESH); + auto cname = WrapSds(opts->consumer_name); + range_opts.consumer = streamLookupConsumer(sitem.group, cname, SLC_NO_REFRESH); if (!range_opts.consumer) { - range_opts.consumer = streamCreateConsumer(sitem.group, shard->tmp_str1, NULL, 0, - SCC_NO_NOTIFY | SCC_NO_DIRTIFY); + range_opts.consumer = + streamCreateConsumer(sitem.group, cname, NULL, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY); } } @@ -2897,19 +2851,15 @@ variant HasEntries2(const OpArgs& op_args, string_view streamCG* group = nullptr; streamConsumer* consumer = nullptr; if (opts->read_group) { - auto& tmp_str = op_args.shard->tmp_str1; - tmp_str = sdscpylen(tmp_str, opts->group_name.data(), opts->group_name.size()); - group = streamLookupCG(s, tmp_str); - + group = streamLookupCG(s, WrapSds(opts->group_name)); if (!group) return facade::ErrorReply{ NoGroupOrKey(skey, opts->group_name, " in XREADGROUP with GROUP option")}; - tmp_str = sdscpylen(tmp_str, opts->consumer_name.data(), opts->consumer_name.size()); - consumer = streamLookupConsumer(group, tmp_str, SLC_NO_REFRESH); + auto cname = WrapSds(opts->consumer_name); + consumer = streamLookupConsumer(group, cname, SLC_NO_REFRESH); if (!consumer) { - consumer = streamCreateConsumer(group, op_args.shard->tmp_str1, NULL, 0, - SCC_NO_NOTIFY | SCC_NO_DIRTIFY); + consumer = streamCreateConsumer(group, cname, NULL, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY); } requested_sitem.group = group; diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 65f74eb72..bdf4a8885 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -28,6 +28,7 @@ extern "C" { #include "server/container_utils.h" #include "server/engine_shard_set.h" #include "server/error.h" +#include "server/family_utils.h" #include "server/transaction.h" namespace dfly { @@ -979,7 +980,6 @@ OpResult OpAdd(const OpArgs& op_args, const ZParams& zparams, string_ unsigned added = 0; unsigned updated = 0; - sds& tmp_str = op_args.shard->tmp_str1; double new_score = 0; int retflags = 0; @@ -1006,9 +1006,8 @@ OpResult OpAdd(const OpArgs& op_args, const ZParams& zparams, string_ for (size_t j = 0; j < members.size(); j++) { const auto& m = members[j]; - tmp_str = sdscpylen(tmp_str, m.second.data(), m.second.size()); - - int retval = robj_wrapper->ZsetAdd(m.first, tmp_str, zparams.flags, &retflags, &new_score); + int retval = + robj_wrapper->ZsetAdd(m.first, WrapSds(m.second), zparams.flags, &retflags, &new_score); if (zparams.flags & ZADD_IN_INCR) { if (retval == 0) { @@ -1430,9 +1429,7 @@ OpResult OpRank(const OpArgs& op_args, string_view key, string_view me } DCHECK_EQ(robj_wrapper->encoding(), OBJ_ENCODING_SKIPLIST); detail::SortedMap* ss = (detail::SortedMap*)robj_wrapper->inner_obj(); - op_args.shard->tmp_str1 = sdscpylen(op_args.shard->tmp_str1, member.data(), member.size()); - - std::optional rank = ss->GetRank(op_args.shard->tmp_str1, reverse); + std::optional rank = ss->GetRank(WrapSds(member), reverse); if (!rank) return OpStatus::KEY_NOTFOUND; @@ -1539,12 +1536,10 @@ OpResult OpRem(const OpArgs& op_args, string_view key, facade::ArgRang return res_it.status(); detail::RobjWrapper* robj_wrapper = res_it->it->second.GetRobjWrapper(); - sds& tmp_str = op_args.shard->tmp_str1; unsigned deleted = 0; - for (string_view member : members) { - tmp_str = sdscpylen(tmp_str, member.data(), member.size()); - deleted += ZsetDel(robj_wrapper, tmp_str); - } + for (string_view member : members) + deleted += ZsetDel(robj_wrapper, WrapSds(member)); + auto zlen = robj_wrapper->Size(); res_it->post_updater.Run(); @@ -1566,11 +1561,8 @@ OpResult OpScore(const OpArgs& op_args, string_view key, string_view mem return res_it.status(); const PrimeValue& pv = res_it.value()->second; - sds& tmp_str = op_args.shard->tmp_str1; - tmp_str = sdscpylen(tmp_str, member.data(), member.size()); - const detail::RobjWrapper* robj_wrapper = pv.GetRobjWrapper(); - auto res = GetZsetScore(robj_wrapper, tmp_str); + auto res = GetZsetScore(robj_wrapper, WrapSds(member)); if (!res) { return OpStatus::MEMBER_NOTFOUND; } @@ -1586,13 +1578,10 @@ OpResult OpMScore(const OpArgs& op_args, string_view key, MScoreResponse scores(members.Size()); const detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper(); - sds& tmp_str = op_args.shard->tmp_str1; size_t i = 0; - for (string_view member : members.Range()) { - tmp_str = sdscpylen(tmp_str, member.data(), member.size()); - scores[i++] = GetZsetScore(robj_wrapper, tmp_str); - } + for (string_view member : members.Range()) + scores[i++] = GetZsetScore(robj_wrapper, WrapSds(member)); return scores; }