mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
chore: WrapSds from family_utils.h (#3818)
* chore: WrapSds from family_utils.h --------- Signed-off-by: Roman Gershman <romange@gmail.com> Signed-off-by: Kostas Kyrimis <kostaskyrim@gmail.com> Co-authored-by: Roman Gershman <romange@gmail.com> Co-authored-by: Kostas Kyrimis <kostaskyrim@gmail.com>
This commit is contained in:
parent
fa288c19b2
commit
beb2ec2b29
11 changed files with 176 additions and 216 deletions
|
@ -45,7 +45,7 @@ endif()
|
||||||
|
|
||||||
add_library(dragonfly_lib bloom_family.cc
|
add_library(dragonfly_lib bloom_family.cc
|
||||||
config_registry.cc conn_context.cc debugcmd.cc dflycmd.cc engine_shard.cc
|
config_registry.cc conn_context.cc debugcmd.cc dflycmd.cc engine_shard.cc
|
||||||
engine_shard_set.cc
|
engine_shard_set.cc family_utils.cc
|
||||||
generic_family.cc hset_family.cc http_api.cc json_family.cc
|
generic_family.cc hset_family.cc http_api.cc json_family.cc
|
||||||
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
|
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
|
||||||
protocol_client.cc
|
protocol_client.cc
|
||||||
|
|
|
@ -423,37 +423,6 @@ std::ostream& operator<<(std::ostream& os, const GlobalState& state) {
|
||||||
return os << GlobalStateName(state);
|
return os << GlobalStateName(state);
|
||||||
}
|
}
|
||||||
|
|
||||||
NonUniquePicksGenerator::NonUniquePicksGenerator(RandomPick max_range) : max_range_(max_range) {
|
|
||||||
CHECK_GT(max_range, RandomPick(0));
|
|
||||||
}
|
|
||||||
|
|
||||||
RandomPick NonUniquePicksGenerator::Generate() {
|
|
||||||
return absl::Uniform(bitgen_, 0u, max_range_);
|
|
||||||
}
|
|
||||||
|
|
||||||
UniquePicksGenerator::UniquePicksGenerator(std::uint32_t picks_count, RandomPick max_range)
|
|
||||||
: remaining_picks_count_(picks_count), picked_indexes_(picks_count) {
|
|
||||||
CHECK_GE(max_range, picks_count);
|
|
||||||
current_random_limit_ = max_range - picks_count;
|
|
||||||
}
|
|
||||||
|
|
||||||
RandomPick UniquePicksGenerator::Generate() {
|
|
||||||
DCHECK_GT(remaining_picks_count_, 0u);
|
|
||||||
|
|
||||||
remaining_picks_count_--;
|
|
||||||
|
|
||||||
const RandomPick max_index = current_random_limit_++;
|
|
||||||
const RandomPick random_index = absl::Uniform(bitgen_, 0u, max_index + 1u);
|
|
||||||
|
|
||||||
const bool random_index_is_picked = picked_indexes_.emplace(random_index).second;
|
|
||||||
if (random_index_is_picked) {
|
|
||||||
return random_index;
|
|
||||||
}
|
|
||||||
|
|
||||||
picked_indexes_.insert(max_index);
|
|
||||||
return max_index;
|
|
||||||
}
|
|
||||||
|
|
||||||
ThreadLocalMutex::ThreadLocalMutex() {
|
ThreadLocalMutex::ThreadLocalMutex() {
|
||||||
shard_ = EngineShard::tlocal();
|
shard_ = EngineShard::tlocal();
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,6 @@
|
||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <absl/random/random.h>
|
|
||||||
#include <absl/strings/ascii.h>
|
#include <absl/strings/ascii.h>
|
||||||
#include <absl/strings/str_cat.h>
|
#include <absl/strings/str_cat.h>
|
||||||
#include <absl/types/span.h>
|
#include <absl/types/span.h>
|
||||||
|
@ -318,49 +317,6 @@ struct MemoryBytesFlag {
|
||||||
bool AbslParseFlag(std::string_view in, dfly::MemoryBytesFlag* flag, std::string* err);
|
bool AbslParseFlag(std::string_view in, dfly::MemoryBytesFlag* flag, std::string* err);
|
||||||
std::string AbslUnparseFlag(const dfly::MemoryBytesFlag& flag);
|
std::string AbslUnparseFlag(const dfly::MemoryBytesFlag& flag);
|
||||||
|
|
||||||
using RandomPick = std::uint32_t;
|
|
||||||
|
|
||||||
class PicksGenerator {
|
|
||||||
public:
|
|
||||||
virtual RandomPick Generate() = 0;
|
|
||||||
virtual ~PicksGenerator() = default;
|
|
||||||
};
|
|
||||||
|
|
||||||
class NonUniquePicksGenerator : public PicksGenerator {
|
|
||||||
public:
|
|
||||||
/* The generated value will be within the closed-open interval [0, max_range) */
|
|
||||||
NonUniquePicksGenerator(RandomPick max_range);
|
|
||||||
|
|
||||||
RandomPick Generate() override;
|
|
||||||
|
|
||||||
private:
|
|
||||||
const RandomPick max_range_;
|
|
||||||
absl::BitGen bitgen_{};
|
|
||||||
};
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Generates unique index in O(1).
|
|
||||||
*
|
|
||||||
* picks_count specifies the number of random indexes to be generated.
|
|
||||||
* In other words, this is the number of times the Generate() function is called.
|
|
||||||
*
|
|
||||||
* The class uses Robert Floyd's sampling algorithm
|
|
||||||
* https://dl.acm.org/doi/pdf/10.1145/30401.315746
|
|
||||||
* */
|
|
||||||
class UniquePicksGenerator : public PicksGenerator {
|
|
||||||
public:
|
|
||||||
/* The generated value will be within the closed-open interval [0, max_range) */
|
|
||||||
UniquePicksGenerator(std::uint32_t picks_count, RandomPick max_range);
|
|
||||||
|
|
||||||
RandomPick Generate() override;
|
|
||||||
|
|
||||||
private:
|
|
||||||
RandomPick current_random_limit_;
|
|
||||||
std::uint32_t remaining_picks_count_;
|
|
||||||
std::unordered_set<RandomPick> picked_indexes_;
|
|
||||||
absl::BitGen bitgen_{};
|
|
||||||
};
|
|
||||||
|
|
||||||
// Helper class used to guarantee atomicity between serialization of buckets
|
// Helper class used to guarantee atomicity between serialization of buckets
|
||||||
class ABSL_LOCKABLE ThreadLocalMutex {
|
class ABSL_LOCKABLE ThreadLocalMutex {
|
||||||
public:
|
public:
|
||||||
|
|
|
@ -376,17 +376,11 @@ EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap)
|
||||||
txq_([](const Transaction* t) { return t->txid(); }),
|
txq_([](const Transaction* t) { return t->txid(); }),
|
||||||
mi_resource_(heap),
|
mi_resource_(heap),
|
||||||
shard_id_(pb->GetPoolIndex()) {
|
shard_id_(pb->GetPoolIndex()) {
|
||||||
tmp_str1 = sdsempty();
|
|
||||||
|
|
||||||
defrag_task_ = pb->AddOnIdleTask([this]() { return DefragTask(); });
|
defrag_task_ = pb->AddOnIdleTask([this]() { return DefragTask(); });
|
||||||
queue_.Start(absl::StrCat("shard_queue_", shard_id()));
|
queue_.Start(absl::StrCat("shard_queue_", shard_id()));
|
||||||
queue2_.Start(absl::StrCat("l2_queue_", shard_id()));
|
queue2_.Start(absl::StrCat("l2_queue_", shard_id()));
|
||||||
}
|
}
|
||||||
|
|
||||||
EngineShard::~EngineShard() {
|
|
||||||
sdsfree(tmp_str1);
|
|
||||||
}
|
|
||||||
|
|
||||||
void EngineShard::Shutdown() {
|
void EngineShard::Shutdown() {
|
||||||
DVLOG(1) << "EngineShard::Shutdown";
|
DVLOG(1) << "EngineShard::Shutdown";
|
||||||
|
|
||||||
|
|
|
@ -41,9 +41,6 @@ class EngineShard {
|
||||||
Stats& operator+=(const Stats&);
|
Stats& operator+=(const Stats&);
|
||||||
};
|
};
|
||||||
|
|
||||||
// EngineShard() is private down below.
|
|
||||||
~EngineShard();
|
|
||||||
|
|
||||||
// Sets up a new EngineShard in the thread.
|
// Sets up a new EngineShard in the thread.
|
||||||
// If update_db_time is true, initializes periodic time update for its db_slice.
|
// If update_db_time is true, initializes periodic time update for its db_slice.
|
||||||
static void InitThreadLocal(util::ProactorBase* pb);
|
static void InitThreadLocal(util::ProactorBase* pb);
|
||||||
|
@ -122,9 +119,6 @@ class EngineShard {
|
||||||
return shard_search_indices_.get();
|
return shard_search_indices_.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
// for everyone to use for string transformations during atomic cpu sequences.
|
|
||||||
sds tmp_str1;
|
|
||||||
|
|
||||||
// Moving average counters.
|
// Moving average counters.
|
||||||
enum MovingCnt { TTL_TRAVERSE, TTL_DELETE, COUNTER_TOTAL };
|
enum MovingCnt { TTL_TRAVERSE, TTL_DELETE, COUNTER_TOTAL };
|
||||||
|
|
||||||
|
|
43
src/server/family_utils.cc
Normal file
43
src/server/family_utils.cc
Normal file
|
@ -0,0 +1,43 @@
|
||||||
|
#include "server/family_utils.h"
|
||||||
|
|
||||||
|
#include "base/logging.h"
|
||||||
|
|
||||||
|
namespace dfly {
|
||||||
|
|
||||||
|
sds WrapSds(std::string_view s) {
|
||||||
|
static thread_local sds tmp_sds = sdsempty();
|
||||||
|
return tmp_sds = sdscpylen(tmp_sds, s.data(), s.length());
|
||||||
|
}
|
||||||
|
|
||||||
|
NonUniquePicksGenerator::NonUniquePicksGenerator(RandomPick max_range) : max_range_(max_range) {
|
||||||
|
CHECK_GT(max_range, RandomPick(0));
|
||||||
|
}
|
||||||
|
|
||||||
|
RandomPick NonUniquePicksGenerator::Generate() {
|
||||||
|
return absl::Uniform(bitgen_, 0u, max_range_);
|
||||||
|
}
|
||||||
|
|
||||||
|
UniquePicksGenerator::UniquePicksGenerator(std::uint32_t picks_count, RandomPick max_range)
|
||||||
|
: remaining_picks_count_(picks_count), picked_indexes_(picks_count) {
|
||||||
|
CHECK_GE(max_range, picks_count);
|
||||||
|
current_random_limit_ = max_range - picks_count;
|
||||||
|
}
|
||||||
|
|
||||||
|
RandomPick UniquePicksGenerator::Generate() {
|
||||||
|
DCHECK_GT(remaining_picks_count_, 0u);
|
||||||
|
|
||||||
|
remaining_picks_count_--;
|
||||||
|
|
||||||
|
const RandomPick max_index = current_random_limit_++;
|
||||||
|
const RandomPick random_index = absl::Uniform(bitgen_, 0u, max_index + 1u);
|
||||||
|
|
||||||
|
const bool random_index_is_picked = picked_indexes_.emplace(random_index).second;
|
||||||
|
if (random_index_is_picked) {
|
||||||
|
return random_index;
|
||||||
|
}
|
||||||
|
|
||||||
|
picked_indexes_.insert(max_index);
|
||||||
|
return max_index;
|
||||||
|
}
|
||||||
|
|
||||||
|
} // namespace dfly
|
63
src/server/family_utils.h
Normal file
63
src/server/family_utils.h
Normal file
|
@ -0,0 +1,63 @@
|
||||||
|
// Copyright 2024, DragonflyDB authors. All rights reserved.
|
||||||
|
// See LICENSE for licensing terms.
|
||||||
|
//
|
||||||
|
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <absl/container/flat_hash_set.h>
|
||||||
|
#include <absl/random/random.h>
|
||||||
|
|
||||||
|
#include <cstdint>
|
||||||
|
|
||||||
|
extern "C" {
|
||||||
|
#include "redis/sds.h"
|
||||||
|
}
|
||||||
|
namespace dfly {
|
||||||
|
|
||||||
|
// Copy str to thread local sds instance. Valid until next WrapSds call on thread
|
||||||
|
sds WrapSds(std::string_view str);
|
||||||
|
|
||||||
|
using RandomPick = uint32_t;
|
||||||
|
|
||||||
|
class PicksGenerator {
|
||||||
|
public:
|
||||||
|
virtual RandomPick Generate() = 0;
|
||||||
|
virtual ~PicksGenerator() = default;
|
||||||
|
};
|
||||||
|
|
||||||
|
class NonUniquePicksGenerator : public PicksGenerator {
|
||||||
|
public:
|
||||||
|
/* The generated value will be within the closed-open interval [0, max_range) */
|
||||||
|
NonUniquePicksGenerator(RandomPick max_range);
|
||||||
|
|
||||||
|
RandomPick Generate() override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
const RandomPick max_range_;
|
||||||
|
absl::BitGen bitgen_{};
|
||||||
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Generates unique index in O(1).
|
||||||
|
*
|
||||||
|
* picks_count specifies the number of random indexes to be generated.
|
||||||
|
* In other words, this is the number of times the Generate() function is called.
|
||||||
|
*
|
||||||
|
* The class uses Robert Floyd's sampling algorithm
|
||||||
|
* https://dl.acm.org/doi/pdf/10.1145/30401.315746
|
||||||
|
* */
|
||||||
|
class UniquePicksGenerator : public PicksGenerator {
|
||||||
|
public:
|
||||||
|
/* The generated value will be within the closed-open interval [0, max_range) */
|
||||||
|
UniquePicksGenerator(uint32_t picks_count, RandomPick max_range);
|
||||||
|
|
||||||
|
RandomPick Generate() override;
|
||||||
|
|
||||||
|
private:
|
||||||
|
RandomPick current_random_limit_;
|
||||||
|
uint32_t remaining_picks_count_;
|
||||||
|
absl::flat_hash_set<RandomPick> picked_indexes_;
|
||||||
|
absl::BitGen bitgen_{};
|
||||||
|
};
|
||||||
|
|
||||||
|
} // namespace dfly
|
|
@ -20,7 +20,7 @@ extern "C" {
|
||||||
#include "server/container_utils.h"
|
#include "server/container_utils.h"
|
||||||
#include "server/engine_shard_set.h"
|
#include "server/engine_shard_set.h"
|
||||||
#include "server/error.h"
|
#include "server/error.h"
|
||||||
#include "server/server_state.h"
|
#include "server/family_utils.h"
|
||||||
#include "server/transaction.h"
|
#include "server/transaction.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -283,8 +283,8 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
|
||||||
int pos = (dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
|
int pos = (dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
|
||||||
|
|
||||||
for (string_view v : vals) {
|
for (string_view v : vals) {
|
||||||
es->tmp_str1 = sdscpylen(es->tmp_str1, v.data(), v.size());
|
auto vsds = WrapSds(v);
|
||||||
quicklistPush(ql, es->tmp_str1, sdslen(es->tmp_str1), pos);
|
quicklistPush(ql, vsds, sdslen(vsds), pos);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (res.is_new) {
|
if (res.is_new) {
|
||||||
|
|
|
@ -4,6 +4,8 @@
|
||||||
|
|
||||||
#include "server/set_family.h"
|
#include "server/set_family.h"
|
||||||
|
|
||||||
|
#include "server/family_utils.h"
|
||||||
|
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#include "redis/intset.h"
|
#include "redis/intset.h"
|
||||||
#include "redis/redis_aux.h"
|
#include "redis/redis_aux.h"
|
||||||
|
|
|
@ -19,7 +19,7 @@ extern "C" {
|
||||||
#include "server/conn_context.h"
|
#include "server/conn_context.h"
|
||||||
#include "server/engine_shard_set.h"
|
#include "server/engine_shard_set.h"
|
||||||
#include "server/error.h"
|
#include "server/error.h"
|
||||||
#include "server/server_state.h"
|
#include "server/family_utils.h"
|
||||||
#include "server/transaction.h"
|
#include "server/transaction.h"
|
||||||
|
|
||||||
namespace dfly {
|
namespace dfly {
|
||||||
|
@ -1055,8 +1055,7 @@ OpResult<vector<ConsumerInfo>> OpConsumers(const DbContext& db_cntx, EngineShard
|
||||||
vector<ConsumerInfo> result;
|
vector<ConsumerInfo> result;
|
||||||
const CompactObj& cobj = (*res_it)->second;
|
const CompactObj& cobj = (*res_it)->second;
|
||||||
stream* s = GetReadOnlyStream(cobj);
|
stream* s = GetReadOnlyStream(cobj);
|
||||||
shard->tmp_str1 = sdscpylen(shard->tmp_str1, group_name.data(), group_name.length());
|
streamCG* cg = streamLookupCG(s, WrapSds(group_name));
|
||||||
streamCG* cg = streamLookupCG(s, shard->tmp_str1);
|
|
||||||
if (cg == NULL) {
|
if (cg == NULL) {
|
||||||
return OpStatus::INVALID_VALUE;
|
return OpStatus::INVALID_VALUE;
|
||||||
}
|
}
|
||||||
|
@ -1136,21 +1135,19 @@ struct FindGroupResult {
|
||||||
DbSlice::AutoUpdater post_updater;
|
DbSlice::AutoUpdater post_updater;
|
||||||
};
|
};
|
||||||
|
|
||||||
OpResult<FindGroupResult> FindGroup(const OpArgs& op_args, string_view key, string_view gname) {
|
OpResult<FindGroupResult> FindGroup(const OpArgs& op_args, string_view key, string_view gname,
|
||||||
auto* shard = op_args.shard;
|
bool skip_group = true) {
|
||||||
auto& db_slice = op_args.GetDbSlice();
|
auto& db_slice = op_args.GetDbSlice();
|
||||||
auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM);
|
auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM);
|
||||||
if (!res_it)
|
RETURN_ON_BAD_STATUS(res_it);
|
||||||
return res_it.status();
|
|
||||||
|
|
||||||
CompactObj& cobj = res_it->it->second;
|
CompactObj& cobj = res_it->it->second;
|
||||||
FindGroupResult res;
|
auto* s = static_cast<stream*>(cobj.RObjPtr());
|
||||||
res.s = (stream*)cobj.RObjPtr();
|
auto* cg = streamLookupCG(s, WrapSds(gname));
|
||||||
shard->tmp_str1 = sdscpylen(shard->tmp_str1, gname.data(), gname.size());
|
if (skip_group && !cg)
|
||||||
res.cg = streamLookupCG(res.s, shard->tmp_str1);
|
return OpStatus::SKIPPED;
|
||||||
res.post_updater = std::move(res_it->post_updater);
|
|
||||||
|
|
||||||
return res;
|
return FindGroupResult{s, cg, std::move(res_it->post_updater)};
|
||||||
}
|
}
|
||||||
|
|
||||||
constexpr uint8_t kClaimForce = 1 << 0;
|
constexpr uint8_t kClaimForce = 1 << 0;
|
||||||
|
@ -1210,11 +1207,8 @@ void AppendClaimResultItem(ClaimInfo& result, stream* s, streamID id) {
|
||||||
OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimOpts& opts,
|
OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimOpts& opts,
|
||||||
absl::Span<streamID> ids) {
|
absl::Span<streamID> ids) {
|
||||||
auto cgr_res = FindGroup(op_args, key, opts.group);
|
auto cgr_res = FindGroup(op_args, key, opts.group);
|
||||||
if (!cgr_res)
|
RETURN_ON_BAD_STATUS(cgr_res);
|
||||||
return cgr_res.status();
|
|
||||||
if (!cgr_res->cg) {
|
|
||||||
return OpStatus::SKIPPED;
|
|
||||||
}
|
|
||||||
streamConsumer* consumer = nullptr;
|
streamConsumer* consumer = nullptr;
|
||||||
auto now = GetCurrentTimeMs();
|
auto now = GetCurrentTimeMs();
|
||||||
ClaimInfo result;
|
ClaimInfo result;
|
||||||
|
@ -1262,12 +1256,10 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
|
||||||
}
|
}
|
||||||
|
|
||||||
// Try to get the consumer. If not found, create a new one.
|
// Try to get the consumer. If not found, create a new one.
|
||||||
op_args.shard->tmp_str1 =
|
auto cname = WrapSds(opts.consumer);
|
||||||
sdscpylen(op_args.shard->tmp_str1, opts.consumer.data(), opts.consumer.size());
|
if ((consumer = streamLookupConsumer(cgr_res->cg, cname, SLC_NO_REFRESH)) == nullptr) {
|
||||||
if ((consumer = streamLookupConsumer(cgr_res->cg, op_args.shard->tmp_str1, SLC_NO_REFRESH)) ==
|
consumer =
|
||||||
nullptr) {
|
streamCreateConsumer(cgr_res->cg, cname, nullptr, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
|
||||||
consumer = streamCreateConsumer(cgr_res->cg, op_args.shard->tmp_str1, nullptr, 0,
|
|
||||||
SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If the entry belongs to the same consumer, we don't have to
|
// If the entry belongs to the same consumer, we don't have to
|
||||||
|
@ -1306,10 +1298,8 @@ OpResult<ClaimInfo> OpClaim(const OpArgs& op_args, string_view key, const ClaimO
|
||||||
// XGROUP DESTROY key groupname
|
// XGROUP DESTROY key groupname
|
||||||
OpStatus OpDestroyGroup(const OpArgs& op_args, string_view key, string_view gname) {
|
OpStatus OpDestroyGroup(const OpArgs& op_args, string_view key, string_view gname) {
|
||||||
auto cgr_res = FindGroup(op_args, key, gname);
|
auto cgr_res = FindGroup(op_args, key, gname);
|
||||||
if (!cgr_res)
|
RETURN_ON_BAD_STATUS(cgr_res);
|
||||||
return cgr_res.status();
|
|
||||||
|
|
||||||
if (cgr_res->cg) {
|
|
||||||
raxRemove(cgr_res->s->cgroups, (uint8_t*)(gname.data()), gname.size(), NULL);
|
raxRemove(cgr_res->s->cgroups, (uint8_t*)(gname.data()), gname.size(), NULL);
|
||||||
streamFreeCG(cgr_res->cg);
|
streamFreeCG(cgr_res->cg);
|
||||||
|
|
||||||
|
@ -1320,9 +1310,6 @@ OpStatus OpDestroyGroup(const OpArgs& op_args, string_view key, string_view gnam
|
||||||
}
|
}
|
||||||
|
|
||||||
return OpStatus::OK;
|
return OpStatus::OK;
|
||||||
}
|
|
||||||
|
|
||||||
return OpStatus::SKIPPED;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
struct GroupConsumerPair {
|
struct GroupConsumerPair {
|
||||||
|
@ -1339,16 +1326,10 @@ struct GroupConsumerPairOpts {
|
||||||
OpResult<uint32_t> OpCreateConsumer(const OpArgs& op_args, string_view key, string_view gname,
|
OpResult<uint32_t> OpCreateConsumer(const OpArgs& op_args, string_view key, string_view gname,
|
||||||
string_view consumer_name) {
|
string_view consumer_name) {
|
||||||
auto cgroup_res = FindGroup(op_args, key, gname);
|
auto cgroup_res = FindGroup(op_args, key, gname);
|
||||||
if (!cgroup_res)
|
RETURN_ON_BAD_STATUS(cgroup_res);
|
||||||
return cgroup_res.status();
|
|
||||||
streamCG* cg = cgroup_res->cg;
|
|
||||||
if (cg == nullptr)
|
|
||||||
return OpStatus::SKIPPED;
|
|
||||||
|
|
||||||
auto* shard = op_args.shard;
|
streamConsumer* consumer = streamCreateConsumer(cgroup_res->cg, WrapSds(consumer_name), NULL, 0,
|
||||||
shard->tmp_str1 = sdscpylen(shard->tmp_str1, consumer_name.data(), consumer_name.size());
|
SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
|
||||||
streamConsumer* consumer =
|
|
||||||
streamCreateConsumer(cg, shard->tmp_str1, NULL, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
|
|
||||||
|
|
||||||
if (consumer)
|
if (consumer)
|
||||||
return OpStatus::OK;
|
return OpStatus::OK;
|
||||||
|
@ -1359,21 +1340,14 @@ OpResult<uint32_t> OpCreateConsumer(const OpArgs& op_args, string_view key, stri
|
||||||
OpResult<uint32_t> OpDelConsumer(const OpArgs& op_args, string_view key, string_view gname,
|
OpResult<uint32_t> OpDelConsumer(const OpArgs& op_args, string_view key, string_view gname,
|
||||||
string_view consumer_name) {
|
string_view consumer_name) {
|
||||||
auto cgroup_res = FindGroup(op_args, key, gname);
|
auto cgroup_res = FindGroup(op_args, key, gname);
|
||||||
if (!cgroup_res)
|
RETURN_ON_BAD_STATUS(cgroup_res);
|
||||||
return cgroup_res.status();
|
|
||||||
|
|
||||||
streamCG* cg = cgroup_res->cg;
|
|
||||||
if (cg == nullptr)
|
|
||||||
return OpStatus::SKIPPED;
|
|
||||||
|
|
||||||
long long pending = 0;
|
long long pending = 0;
|
||||||
auto* shard = op_args.shard;
|
streamConsumer* consumer =
|
||||||
|
streamLookupConsumer(cgroup_res->cg, WrapSds(consumer_name), SLC_NO_REFRESH);
|
||||||
shard->tmp_str1 = sdscpylen(shard->tmp_str1, consumer_name.data(), consumer_name.size());
|
|
||||||
streamConsumer* consumer = streamLookupConsumer(cg, shard->tmp_str1, SLC_NO_REFRESH);
|
|
||||||
if (consumer) {
|
if (consumer) {
|
||||||
pending = raxSize(consumer->pel);
|
pending = raxSize(consumer->pel);
|
||||||
streamDelConsumer(cg, consumer);
|
streamDelConsumer(cgroup_res->cg, consumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pending;
|
return pending;
|
||||||
|
@ -1381,12 +1355,7 @@ OpResult<uint32_t> OpDelConsumer(const OpArgs& op_args, string_view key, string_
|
||||||
|
|
||||||
OpStatus OpSetId(const OpArgs& op_args, string_view key, string_view gname, string_view id) {
|
OpStatus OpSetId(const OpArgs& op_args, string_view key, string_view gname, string_view id) {
|
||||||
auto cgr_res = FindGroup(op_args, key, gname);
|
auto cgr_res = FindGroup(op_args, key, gname);
|
||||||
if (!cgr_res)
|
RETURN_ON_BAD_STATUS(cgr_res);
|
||||||
return cgr_res.status();
|
|
||||||
|
|
||||||
streamCG* cg = cgr_res->cg;
|
|
||||||
if (cg == nullptr)
|
|
||||||
return OpStatus::SKIPPED;
|
|
||||||
|
|
||||||
streamID sid;
|
streamID sid;
|
||||||
ParsedStreamId parsed_id;
|
ParsedStreamId parsed_id;
|
||||||
|
@ -1399,7 +1368,7 @@ OpStatus OpSetId(const OpArgs& op_args, string_view key, string_view gname, stri
|
||||||
return OpStatus::SYNTAX_ERR;
|
return OpStatus::SYNTAX_ERR;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cg->last_id = sid;
|
cgr_res->cg->last_id = sid;
|
||||||
|
|
||||||
return OpStatus::OK;
|
return OpStatus::OK;
|
||||||
}
|
}
|
||||||
|
@ -1487,9 +1456,8 @@ OpResult<uint32_t> OpDel(const OpArgs& op_args, string_view key, absl::Span<stre
|
||||||
// XACK key groupname id [id ...]
|
// XACK key groupname id [id ...]
|
||||||
OpResult<uint32_t> OpAck(const OpArgs& op_args, string_view key, string_view gname,
|
OpResult<uint32_t> OpAck(const OpArgs& op_args, string_view key, string_view gname,
|
||||||
absl::Span<streamID> ids) {
|
absl::Span<streamID> ids) {
|
||||||
auto res = FindGroup(op_args, key, gname);
|
auto res = FindGroup(op_args, key, gname, false);
|
||||||
if (!res)
|
RETURN_ON_BAD_STATUS(res);
|
||||||
return res.status();
|
|
||||||
|
|
||||||
if (res->cg == nullptr || res->s == nullptr) {
|
if (res->cg == nullptr || res->s == nullptr) {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1516,9 +1484,9 @@ OpResult<uint32_t> OpAck(const OpArgs& op_args, string_view key, string_view gna
|
||||||
}
|
}
|
||||||
|
|
||||||
OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const ClaimOpts& opts) {
|
OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const ClaimOpts& opts) {
|
||||||
auto cgr_res = FindGroup(op_args, key, opts.group);
|
auto cgr_res = FindGroup(op_args, key, opts.group, false);
|
||||||
if (!cgr_res)
|
RETURN_ON_BAD_STATUS(cgr_res);
|
||||||
return cgr_res.status();
|
|
||||||
stream* stream = cgr_res->s;
|
stream* stream = cgr_res->s;
|
||||||
streamCG* group = cgr_res->cg;
|
streamCG* group = cgr_res->cg;
|
||||||
|
|
||||||
|
@ -1566,12 +1534,11 @@ OpResult<ClaimInfo> OpAutoClaim(const OpArgs& op_args, string_view key, const Cl
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
op_args.shard->tmp_str1 =
|
auto cname = WrapSds(opts.consumer);
|
||||||
sdscpylen(op_args.shard->tmp_str1, opts.consumer.data(), opts.consumer.size());
|
|
||||||
if (consumer == nullptr) {
|
if (consumer == nullptr) {
|
||||||
consumer = streamLookupConsumer(group, op_args.shard->tmp_str1, SLC_DEFAULT);
|
consumer = streamLookupConsumer(group, cname, SLC_DEFAULT);
|
||||||
if (consumer == nullptr) {
|
if (consumer == nullptr) {
|
||||||
consumer = streamCreateConsumer(group, op_args.shard->tmp_str1, nullptr, 0, SCC_DEFAULT);
|
consumer = streamCreateConsumer(group, cname, nullptr, 0, SCC_DEFAULT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1722,29 +1689,19 @@ PendingExtendedResultList GetPendingExtendedResult(streamCG* cg, streamConsumer*
|
||||||
|
|
||||||
OpResult<PendingResult> OpPending(const OpArgs& op_args, string_view key, const PendingOpts& opts) {
|
OpResult<PendingResult> OpPending(const OpArgs& op_args, string_view key, const PendingOpts& opts) {
|
||||||
auto cgroup_res = FindGroup(op_args, key, opts.group_name);
|
auto cgroup_res = FindGroup(op_args, key, opts.group_name);
|
||||||
if (!cgroup_res) {
|
RETURN_ON_BAD_STATUS(cgroup_res);
|
||||||
return cgroup_res.status();
|
|
||||||
}
|
|
||||||
|
|
||||||
streamCG* cg = cgroup_res->cg;
|
|
||||||
if (cg == nullptr) {
|
|
||||||
return OpStatus::SKIPPED;
|
|
||||||
}
|
|
||||||
|
|
||||||
auto* shard = op_args.shard;
|
|
||||||
streamConsumer* consumer = nullptr;
|
streamConsumer* consumer = nullptr;
|
||||||
if (!opts.consumer_name.empty()) {
|
if (!opts.consumer_name.empty()) {
|
||||||
shard->tmp_str1 =
|
consumer = streamLookupConsumer(cgroup_res->cg, WrapSds(opts.consumer_name), SLC_NO_REFRESH);
|
||||||
sdscpylen(shard->tmp_str1, opts.consumer_name.data(), opts.consumer_name.size());
|
|
||||||
consumer = streamLookupConsumer(cg, shard->tmp_str1, SLC_NO_REFRESH);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
PendingResult result;
|
PendingResult result;
|
||||||
|
|
||||||
if (opts.count == -1) {
|
if (opts.count == -1) {
|
||||||
result = GetPendingReducedResult(cg);
|
result = GetPendingReducedResult(cgroup_res->cg);
|
||||||
} else {
|
} else {
|
||||||
result = GetPendingExtendedResult(cg, consumer, opts);
|
result = GetPendingExtendedResult(cgroup_res->cg, consumer, opts);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -2784,9 +2741,7 @@ void XReadBlock(ReadOpts* opts, ConnectionContext* cntx) {
|
||||||
|
|
||||||
// Update group pointer and check it's validity
|
// Update group pointer and check it's validity
|
||||||
if (opts->read_group) {
|
if (opts->read_group) {
|
||||||
owner->tmp_str1 =
|
sitem.group = streamLookupCG(s, WrapSds(opts->group_name));
|
||||||
sdscpylen(owner->tmp_str1, opts->group_name.data(), opts->group_name.length());
|
|
||||||
sitem.group = streamLookupCG(s, owner->tmp_str1);
|
|
||||||
if (!sitem.group)
|
if (!sitem.group)
|
||||||
return true; // abort
|
return true; // abort
|
||||||
}
|
}
|
||||||
|
@ -2829,12 +2784,11 @@ void XReadBlock(ReadOpts* opts, ConnectionContext* cntx) {
|
||||||
|
|
||||||
// Update consumer
|
// Update consumer
|
||||||
if (sitem.group) {
|
if (sitem.group) {
|
||||||
shard->tmp_str1 =
|
auto cname = WrapSds(opts->consumer_name);
|
||||||
sdscpylen(shard->tmp_str1, opts->consumer_name.data(), opts->consumer_name.length());
|
range_opts.consumer = streamLookupConsumer(sitem.group, cname, SLC_NO_REFRESH);
|
||||||
range_opts.consumer = streamLookupConsumer(sitem.group, shard->tmp_str1, SLC_NO_REFRESH);
|
|
||||||
if (!range_opts.consumer) {
|
if (!range_opts.consumer) {
|
||||||
range_opts.consumer = streamCreateConsumer(sitem.group, shard->tmp_str1, NULL, 0,
|
range_opts.consumer =
|
||||||
SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
|
streamCreateConsumer(sitem.group, cname, NULL, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2897,19 +2851,15 @@ variant<bool, facade::ErrorReply> HasEntries2(const OpArgs& op_args, string_view
|
||||||
streamCG* group = nullptr;
|
streamCG* group = nullptr;
|
||||||
streamConsumer* consumer = nullptr;
|
streamConsumer* consumer = nullptr;
|
||||||
if (opts->read_group) {
|
if (opts->read_group) {
|
||||||
auto& tmp_str = op_args.shard->tmp_str1;
|
group = streamLookupCG(s, WrapSds(opts->group_name));
|
||||||
tmp_str = sdscpylen(tmp_str, opts->group_name.data(), opts->group_name.size());
|
|
||||||
group = streamLookupCG(s, tmp_str);
|
|
||||||
|
|
||||||
if (!group)
|
if (!group)
|
||||||
return facade::ErrorReply{
|
return facade::ErrorReply{
|
||||||
NoGroupOrKey(skey, opts->group_name, " in XREADGROUP with GROUP option")};
|
NoGroupOrKey(skey, opts->group_name, " in XREADGROUP with GROUP option")};
|
||||||
|
|
||||||
tmp_str = sdscpylen(tmp_str, opts->consumer_name.data(), opts->consumer_name.size());
|
auto cname = WrapSds(opts->consumer_name);
|
||||||
consumer = streamLookupConsumer(group, tmp_str, SLC_NO_REFRESH);
|
consumer = streamLookupConsumer(group, cname, SLC_NO_REFRESH);
|
||||||
if (!consumer) {
|
if (!consumer) {
|
||||||
consumer = streamCreateConsumer(group, op_args.shard->tmp_str1, NULL, 0,
|
consumer = streamCreateConsumer(group, cname, NULL, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
|
||||||
SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
requested_sitem.group = group;
|
requested_sitem.group = group;
|
||||||
|
|
|
@ -28,6 +28,7 @@ extern "C" {
|
||||||
#include "server/container_utils.h"
|
#include "server/container_utils.h"
|
||||||
#include "server/engine_shard_set.h"
|
#include "server/engine_shard_set.h"
|
||||||
#include "server/error.h"
|
#include "server/error.h"
|
||||||
|
#include "server/family_utils.h"
|
||||||
#include "server/transaction.h"
|
#include "server/transaction.h"
|
||||||
|
|
||||||
namespace dfly {
|
namespace dfly {
|
||||||
|
@ -979,7 +980,6 @@ OpResult<AddResult> OpAdd(const OpArgs& op_args, const ZParams& zparams, string_
|
||||||
unsigned added = 0;
|
unsigned added = 0;
|
||||||
unsigned updated = 0;
|
unsigned updated = 0;
|
||||||
|
|
||||||
sds& tmp_str = op_args.shard->tmp_str1;
|
|
||||||
double new_score = 0;
|
double new_score = 0;
|
||||||
int retflags = 0;
|
int retflags = 0;
|
||||||
|
|
||||||
|
@ -1006,9 +1006,8 @@ OpResult<AddResult> OpAdd(const OpArgs& op_args, const ZParams& zparams, string_
|
||||||
|
|
||||||
for (size_t j = 0; j < members.size(); j++) {
|
for (size_t j = 0; j < members.size(); j++) {
|
||||||
const auto& m = members[j];
|
const auto& m = members[j];
|
||||||
tmp_str = sdscpylen(tmp_str, m.second.data(), m.second.size());
|
int retval =
|
||||||
|
robj_wrapper->ZsetAdd(m.first, WrapSds(m.second), zparams.flags, &retflags, &new_score);
|
||||||
int retval = robj_wrapper->ZsetAdd(m.first, tmp_str, zparams.flags, &retflags, &new_score);
|
|
||||||
|
|
||||||
if (zparams.flags & ZADD_IN_INCR) {
|
if (zparams.flags & ZADD_IN_INCR) {
|
||||||
if (retval == 0) {
|
if (retval == 0) {
|
||||||
|
@ -1430,9 +1429,7 @@ OpResult<unsigned> OpRank(const OpArgs& op_args, string_view key, string_view me
|
||||||
}
|
}
|
||||||
DCHECK_EQ(robj_wrapper->encoding(), OBJ_ENCODING_SKIPLIST);
|
DCHECK_EQ(robj_wrapper->encoding(), OBJ_ENCODING_SKIPLIST);
|
||||||
detail::SortedMap* ss = (detail::SortedMap*)robj_wrapper->inner_obj();
|
detail::SortedMap* ss = (detail::SortedMap*)robj_wrapper->inner_obj();
|
||||||
op_args.shard->tmp_str1 = sdscpylen(op_args.shard->tmp_str1, member.data(), member.size());
|
std::optional<unsigned> rank = ss->GetRank(WrapSds(member), reverse);
|
||||||
|
|
||||||
std::optional<unsigned> rank = ss->GetRank(op_args.shard->tmp_str1, reverse);
|
|
||||||
if (!rank)
|
if (!rank)
|
||||||
return OpStatus::KEY_NOTFOUND;
|
return OpStatus::KEY_NOTFOUND;
|
||||||
|
|
||||||
|
@ -1539,12 +1536,10 @@ OpResult<unsigned> OpRem(const OpArgs& op_args, string_view key, facade::ArgRang
|
||||||
return res_it.status();
|
return res_it.status();
|
||||||
|
|
||||||
detail::RobjWrapper* robj_wrapper = res_it->it->second.GetRobjWrapper();
|
detail::RobjWrapper* robj_wrapper = res_it->it->second.GetRobjWrapper();
|
||||||
sds& tmp_str = op_args.shard->tmp_str1;
|
|
||||||
unsigned deleted = 0;
|
unsigned deleted = 0;
|
||||||
for (string_view member : members) {
|
for (string_view member : members)
|
||||||
tmp_str = sdscpylen(tmp_str, member.data(), member.size());
|
deleted += ZsetDel(robj_wrapper, WrapSds(member));
|
||||||
deleted += ZsetDel(robj_wrapper, tmp_str);
|
|
||||||
}
|
|
||||||
auto zlen = robj_wrapper->Size();
|
auto zlen = robj_wrapper->Size();
|
||||||
res_it->post_updater.Run();
|
res_it->post_updater.Run();
|
||||||
|
|
||||||
|
@ -1566,11 +1561,8 @@ OpResult<double> OpScore(const OpArgs& op_args, string_view key, string_view mem
|
||||||
return res_it.status();
|
return res_it.status();
|
||||||
|
|
||||||
const PrimeValue& pv = res_it.value()->second;
|
const PrimeValue& pv = res_it.value()->second;
|
||||||
sds& tmp_str = op_args.shard->tmp_str1;
|
|
||||||
tmp_str = sdscpylen(tmp_str, member.data(), member.size());
|
|
||||||
|
|
||||||
const detail::RobjWrapper* robj_wrapper = pv.GetRobjWrapper();
|
const detail::RobjWrapper* robj_wrapper = pv.GetRobjWrapper();
|
||||||
auto res = GetZsetScore(robj_wrapper, tmp_str);
|
auto res = GetZsetScore(robj_wrapper, WrapSds(member));
|
||||||
if (!res) {
|
if (!res) {
|
||||||
return OpStatus::MEMBER_NOTFOUND;
|
return OpStatus::MEMBER_NOTFOUND;
|
||||||
}
|
}
|
||||||
|
@ -1586,13 +1578,10 @@ OpResult<MScoreResponse> OpMScore(const OpArgs& op_args, string_view key,
|
||||||
MScoreResponse scores(members.Size());
|
MScoreResponse scores(members.Size());
|
||||||
|
|
||||||
const detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper();
|
const detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper();
|
||||||
sds& tmp_str = op_args.shard->tmp_str1;
|
|
||||||
|
|
||||||
size_t i = 0;
|
size_t i = 0;
|
||||||
for (string_view member : members.Range()) {
|
for (string_view member : members.Range())
|
||||||
tmp_str = sdscpylen(tmp_str, member.data(), member.size());
|
scores[i++] = GetZsetScore(robj_wrapper, WrapSds(member));
|
||||||
scores[i++] = GetZsetScore(robj_wrapper, tmp_str);
|
|
||||||
}
|
|
||||||
|
|
||||||
return scores;
|
return scores;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue