mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
feat(generic_family): implement RANDOMKEY command (#2639)
Signed-off-by: Leonardo Mello <lsvmello@gmail.com>
This commit is contained in:
parent
cf9f10e94e
commit
8ef92629c5
4 changed files with 76 additions and 1 deletions
|
@ -5,6 +5,7 @@
|
|||
|
||||
#include <vector>
|
||||
|
||||
#include "absl/random/random.h"
|
||||
#include "base/pmr/memory_resource.h"
|
||||
#include "core/dash_internal.h"
|
||||
|
||||
|
@ -225,6 +226,10 @@ class DashTable : public detail::DashTableBase {
|
|||
return double(size()) / (SegmentType::capacity() * unique_segments());
|
||||
}
|
||||
|
||||
// Gets a random cursor based on the available segments and buckets.
|
||||
// Returns: cursor with a random position
|
||||
Cursor GetRandomCursor(absl::BitGen* bitgen);
|
||||
|
||||
// Traverses over a single bucket in table and calls cb(iterator) 0 or more
|
||||
// times. if cursor=0 starts traversing from the beginning, otherwise continues from where it
|
||||
// stopped. returns 0 if the supplied cursor reached end of traversal. Traverse iterates at bucket
|
||||
|
@ -916,6 +921,14 @@ auto DashTable<_Key, _Value, Policy>::TraverseBySegmentOrder(Cursor curs, Cb&& c
|
|||
return Cursor{global_depth_, sid, bid};
|
||||
}
|
||||
|
||||
template <typename _Key, typename _Value, typename Policy>
|
||||
auto DashTable<_Key, _Value, Policy>::GetRandomCursor(absl::BitGen* bitgen) -> Cursor {
|
||||
uint32_t sid = absl::Uniform<uint32_t>(*bitgen, 0, segment_.size());
|
||||
uint8_t bid = absl::Uniform<uint8_t>(*bitgen, 0, kLogicalBucketNum);
|
||||
|
||||
return Cursor{global_depth_, sid, bid};
|
||||
}
|
||||
|
||||
template <typename _Key, typename _Value, typename Policy>
|
||||
template <typename Cb>
|
||||
auto DashTable<_Key, _Value, Policy>::Traverse(Cursor curs, Cb&& cb) -> Cursor {
|
||||
|
|
|
@ -4,6 +4,10 @@
|
|||
|
||||
#include "server/generic_family.h"
|
||||
|
||||
#include <optional>
|
||||
|
||||
#include "facade/reply_builder.h"
|
||||
|
||||
extern "C" {
|
||||
#include "redis/crc64.h"
|
||||
#include "redis/util.h"
|
||||
|
@ -1509,6 +1513,54 @@ OpStatus GenericFamily::OpMove(const OpArgs& op_args, string_view key, DbIndex t
|
|||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
void GenericFamily::RandomKey(CmdArgList args, ConnectionContext* cntx) {
|
||||
const static size_t kMaxAttempts = 3;
|
||||
|
||||
absl::BitGen bitgen;
|
||||
atomic_size_t candidates_counter{0};
|
||||
DbContext db_cntx{.db_index = cntx->conn_state.db_index};
|
||||
ScanOpts scan_opts;
|
||||
scan_opts.limit = 3; // number of entries per shard
|
||||
std::vector<StringVec> candidates_collection(shard_set->size());
|
||||
|
||||
shard_set->RunBriefInParallel(
|
||||
[&](EngineShard* shard) {
|
||||
auto [prime_table, expire_table] = shard->db_slice().GetTables(db_cntx.db_index);
|
||||
if (prime_table->size() == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
StringVec* candidates = &candidates_collection[shard->shard_id()];
|
||||
|
||||
for (size_t i = 0; i <= kMaxAttempts; ++i) {
|
||||
if (!candidates->empty()) {
|
||||
break;
|
||||
}
|
||||
uint64_t cursor = 0; // scans from the start of the shard after reaching kMaxAttemps
|
||||
if (i < kMaxAttempts) {
|
||||
cursor = prime_table->GetRandomCursor(&bitgen).value();
|
||||
}
|
||||
OpScan({shard, 0u, db_cntx}, scan_opts, &cursor, candidates);
|
||||
}
|
||||
|
||||
candidates_counter.fetch_add(candidates->size(), memory_order_relaxed);
|
||||
},
|
||||
[&](ShardId) { return true; });
|
||||
|
||||
auto candidates_count = candidates_counter.load(memory_order_relaxed);
|
||||
std::optional<string> random_key = std::nullopt;
|
||||
auto random_idx = absl::Uniform<size_t>(bitgen, 0, candidates_count);
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
for (const auto& candidate : candidates_collection) {
|
||||
if (random_idx >= candidate.size()) {
|
||||
random_idx -= candidate.size();
|
||||
} else {
|
||||
return rb->SendBulkString(candidate[random_idx]);
|
||||
}
|
||||
}
|
||||
rb->SendNull();
|
||||
}
|
||||
|
||||
using CI = CommandId;
|
||||
|
||||
#define HFUNC(x) SetHandler(&GenericFamily::x)
|
||||
|
@ -1580,7 +1632,8 @@ void GenericFamily::Register(CommandRegistry* registry) {
|
|||
<< CI{"SORT", CO::READONLY, -2, 1, 1, acl::kSort}.HFUNC(Sort)
|
||||
<< CI{"MOVE", CO::WRITE | CO::GLOBAL_TRANS | CO::NO_AUTOJOURNAL, 3, 1, 1, acl::kMove}.HFUNC(
|
||||
Move)
|
||||
<< CI{"RESTORE", CO::WRITE, -4, 1, 1, acl::kRestore}.HFUNC(Restore);
|
||||
<< CI{"RESTORE", CO::WRITE, -4, 1, 1, acl::kRestore}.HFUNC(Restore)
|
||||
<< CI{"RANDOMKEY", CO::READONLY, 1, 0, 0, 0}.HFUNC(RandomKey);
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -68,6 +68,7 @@ class GenericFamily {
|
|||
static void Type(CmdArgList args, ConnectionContext* cntx);
|
||||
static void Dump(CmdArgList args, ConnectionContext* cntx);
|
||||
static void Restore(CmdArgList args, ConnectionContext* cntx);
|
||||
static void RandomKey(CmdArgList args, ConnectionContext* cntx);
|
||||
static void FieldTtl(CmdArgList args, ConnectionContext* cntx);
|
||||
|
||||
static OpResult<void> RenameGeneric(CmdArgList args, bool skip_exist_dest,
|
||||
|
|
|
@ -734,4 +734,12 @@ TEST_F(GenericFamilyTest, FieldTtl) {
|
|||
EXPECT_EQ(-3, CheckedInt({"fieldttl", "k2", "f4"}));
|
||||
}
|
||||
|
||||
TEST_F(GenericFamilyTest, RandomKey) {
|
||||
auto resp = Run({"randomkey"});
|
||||
EXPECT_EQ(resp.type, RespExpr::NIL);
|
||||
|
||||
resp = Run({"set", "k1", "1"});
|
||||
EXPECT_EQ(Run({"randomkey"}), "k1");
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue