mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
* feat(server): Implement STICK command #219 Signed-off-by: Vladislav Oleshko <vladislav.oleshko@gmail.com>
This commit is contained in:
parent
77bb34fab5
commit
25e700f39f
9 changed files with 191 additions and 12 deletions
|
@ -108,6 +108,7 @@ class CompactObj {
|
|||
ASCII1_ENC_BIT = 8,
|
||||
ASCII2_ENC_BIT = 0x10,
|
||||
IO_PENDING = 0x20,
|
||||
STICKY = 0x40,
|
||||
};
|
||||
|
||||
static constexpr uint8_t kEncMask = ASCII1_ENC_BIT | ASCII2_ENC_BIT;
|
||||
|
@ -213,6 +214,18 @@ class CompactObj {
|
|||
}
|
||||
}
|
||||
|
||||
bool IsSticky() const {
|
||||
return mask_ & STICKY;
|
||||
}
|
||||
|
||||
void SetSticky(bool s) {
|
||||
if (s) {
|
||||
mask_ |= STICKY;
|
||||
} else {
|
||||
mask_ &= ~STICKY;
|
||||
}
|
||||
}
|
||||
|
||||
unsigned Encoding() const;
|
||||
unsigned ObjType() const;
|
||||
|
||||
|
|
|
@ -240,9 +240,10 @@ class DashTable : public detail::DashTableBase {
|
|||
// Returns true if an element was deleted i.e the rightmost slot was busy.
|
||||
bool ShiftRight(bucket_iterator it);
|
||||
|
||||
iterator BumpUp(iterator it) {
|
||||
template<typename BumpPolicy>
|
||||
iterator BumpUp(iterator it, const BumpPolicy& bp) {
|
||||
SegmentIterator seg_it =
|
||||
segment_[it.seg_id_]->BumpUp(it.bucket_id_, it.slot_id_, DoHash(it->first));
|
||||
segment_[it.seg_id_]->BumpUp(it.bucket_id_, it.slot_id_, DoHash(it->first), bp);
|
||||
|
||||
return iterator{this, it.seg_id_, seg_it.index, seg_it.slot};
|
||||
}
|
||||
|
|
|
@ -522,7 +522,7 @@ template <typename _Key, typename _Value, typename Policy = DefaultSegmentPolicy
|
|||
}
|
||||
|
||||
// Bumps up this entry making it more "important" for the eviction policy.
|
||||
Iterator BumpUp(uint8_t bid, SlotId slot, Hash_t key_hash);
|
||||
template<typename BumpPolicy> Iterator BumpUp(uint8_t bid, SlotId slot, Hash_t key_hash, const BumpPolicy& ev);
|
||||
|
||||
// Tries to move stash entries back to their normal buckets (exact or neighour).
|
||||
// Returns number of entries that succeeded to unload.
|
||||
|
@ -1544,7 +1544,8 @@ auto Segment<Key, Value, Policy>::FindValidStartingFrom(unsigned bid, unsigned s
|
|||
}
|
||||
|
||||
template <typename Key, typename Value, typename Policy>
|
||||
auto Segment<Key, Value, Policy>::BumpUp(uint8_t bid, SlotId slot, Hash_t key_hash) -> Iterator {
|
||||
template <typename BumpPolicy>
|
||||
auto Segment<Key, Value, Policy>::BumpUp(uint8_t bid, SlotId slot, Hash_t key_hash, const BumpPolicy& bp) -> Iterator {
|
||||
auto& from = bucket_[bid];
|
||||
|
||||
uint8_t target_bid = BucketIndex(key_hash);
|
||||
|
@ -1554,12 +1555,12 @@ auto Segment<Key, Value, Policy>::BumpUp(uint8_t bid, SlotId slot, Hash_t key_ha
|
|||
|
||||
if (bid < kNumBuckets) {
|
||||
// non stash case.
|
||||
if (slot > 0) {
|
||||
if (slot > 0 && bp.CanBumpDown(from.key[slot - 1])) {
|
||||
from.Swap(slot - 1, slot);
|
||||
return Iterator{bid, uint8_t(slot - 1)};
|
||||
}
|
||||
// TODO: We could promote further, by swapping probing bucket with its previous one.
|
||||
return Iterator{bid, slot};
|
||||
return Iterator{bid, slot};
|
||||
}
|
||||
|
||||
// stash bucket
|
||||
|
@ -1587,6 +1588,13 @@ auto Segment<Key, Value, Policy>::BumpUp(uint8_t bid, SlotId slot, Hash_t key_ha
|
|||
constexpr unsigned kLastSlot = kNumSlots - 1;
|
||||
assert(swapb.GetBusy() & (1 << kLastSlot));
|
||||
|
||||
// Don't move sticky items back to the stash because they're not evictable
|
||||
// TODO: search for first swappable item
|
||||
if (!bp.CanBumpDown(swapb.key[kLastSlot])) {
|
||||
target.SetStashPtr(stash_pos, fp_hash, &next);
|
||||
return Iterator{bid, slot};
|
||||
}
|
||||
|
||||
uint8_t swap_fp = swapb.Fp(kLastSlot);
|
||||
|
||||
// is_probing for the existing entry in swapb. It's unrelated to bucket_offs,
|
||||
|
|
|
@ -75,6 +75,12 @@ struct UInt64Policy : public BasicDashPolicy {
|
|||
}
|
||||
};
|
||||
|
||||
struct RelaxedBumpPolicy {
|
||||
bool CanBumpDown(uint64_t key) const {
|
||||
return true;
|
||||
}
|
||||
};
|
||||
|
||||
class CappedResource final : public std::pmr::memory_resource {
|
||||
public:
|
||||
explicit CappedResource(size_t cap) : cap_(cap) {
|
||||
|
@ -350,7 +356,7 @@ TEST_F(DashTest, BumpUp) {
|
|||
EXPECT_EQ(touched_bid[0], 1);
|
||||
|
||||
// Bump up
|
||||
segment_.BumpUp(kFirstStashId, 5, hash);
|
||||
segment_.BumpUp(kFirstStashId, 5, hash, RelaxedBumpPolicy{});
|
||||
|
||||
// expect the key to move
|
||||
EXPECT_TRUE(segment_.GetBucket(1).IsFull());
|
||||
|
@ -365,13 +371,40 @@ TEST_F(DashTest, BumpUp) {
|
|||
EXPECT_EQ(1, segment_.CVCOnBump(2, kSecondStashId, 9, hash, touched_bid));
|
||||
EXPECT_EQ(touched_bid[0], kSecondStashId);
|
||||
|
||||
segment_.BumpUp(kSecondStashId, 9, hash);
|
||||
segment_.BumpUp(kSecondStashId, 9, hash, RelaxedBumpPolicy{});
|
||||
ASSERT_TRUE(key == segment_.Key(0, kNumSlots - 1) || key == segment_.Key(1, kNumSlots - 1));
|
||||
EXPECT_TRUE(segment_.GetBucket(kSecondStashId).IsFull());
|
||||
EXPECT_TRUE(Contains(key));
|
||||
EXPECT_TRUE(segment_.Key(kSecondStashId, 9));
|
||||
}
|
||||
|
||||
TEST_F(DashTest, BumpPolicy) {
|
||||
struct RestrictedBumpPolicy {
|
||||
bool CanBumpDown(uint64_t key) const {
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
set<Segment::Key_t> keys = FillSegment(0);
|
||||
constexpr unsigned kFirstStashId = Segment::kNumBuckets;
|
||||
|
||||
EXPECT_TRUE(segment_.GetBucket(0).IsFull());
|
||||
EXPECT_TRUE(segment_.GetBucket(1).IsFull());
|
||||
EXPECT_TRUE(segment_.GetBucket(kFirstStashId).IsFull());
|
||||
|
||||
// check items are immovable in bucket
|
||||
Segment::Key_t key = segment_.Key(1, 2);
|
||||
uint64_t hash = dt_.DoHash(key);
|
||||
segment_.BumpUp(1, 2, hash, RestrictedBumpPolicy{});
|
||||
EXPECT_EQ(key, segment_.Key(1, 2));
|
||||
|
||||
// check items don't swap from stash
|
||||
key = segment_.Key(kFirstStashId, 2);
|
||||
hash = dt_.DoHash(key);
|
||||
segment_.BumpUp(kFirstStashId, 2, hash, RestrictedBumpPolicy{});
|
||||
EXPECT_EQ(key, segment_.Key(kFirstStashId, 2));
|
||||
}
|
||||
|
||||
TEST_F(DashTest, Insert2) {
|
||||
uint64_t k = 1191;
|
||||
ASSERT_EQ(2019837007031366716, UInt64Policy::HashFn(k));
|
||||
|
@ -954,7 +987,7 @@ TEST_P(EvictionPolicyTest, HitRateZipf) {
|
|||
<< it.slot_id();
|
||||
} else {
|
||||
if (use_bumps)
|
||||
dt_.BumpUp(it);
|
||||
dt_.BumpUp(it, RelaxedBumpPolicy{});
|
||||
++hits;
|
||||
}
|
||||
}
|
||||
|
@ -984,7 +1017,7 @@ TEST_P(EvictionPolicyTest, HitRateZipfShr) {
|
|||
}
|
||||
} else {
|
||||
if (use_bumps) {
|
||||
dt_.BumpUp(it);
|
||||
dt_.BumpUp(it, RelaxedBumpPolicy{});
|
||||
DVLOG(1) << "Bump up key " << key << " " << it.bucket_id() << " slot " << it.slot_id();
|
||||
} else {
|
||||
DVLOG(1) << "Hit on key " << key;
|
||||
|
|
|
@ -89,6 +89,14 @@ class PrimeEvictionPolicy {
|
|||
const bool can_evict_;
|
||||
};
|
||||
|
||||
class PrimeBumpPolicy {
|
||||
public:
|
||||
// returns true if key can be made less important for eviction (opposite of bump up)
|
||||
bool CanBumpDown(const CompactObj& key) const {
|
||||
return !key.IsSticky();
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& eb, PrimeTable* me) {
|
||||
unsigned res = 0;
|
||||
|
@ -124,6 +132,10 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
|
|||
auto last_slot_it = bucket_it;
|
||||
last_slot_it += (PrimeTable::kBucketWidth - 1);
|
||||
if (!last_slot_it.is_done()) {
|
||||
// don't evict sticky items
|
||||
if (last_slot_it->first.IsSticky()) {
|
||||
return 0;
|
||||
}
|
||||
if (last_slot_it->second.HasExpire()) {
|
||||
ExpireTable* expire_tbl = db_slice_->GetTables(db_indx_).second;
|
||||
CHECK_EQ(1u, expire_tbl->Erase(last_slot_it->first));
|
||||
|
@ -259,7 +271,7 @@ pair<PrimeIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, string_view
|
|||
db.prime.CVCUponBump(change_cb_.front().first, res.first, bump_cb);
|
||||
}
|
||||
|
||||
res.first = db.prime.BumpUp(res.first);
|
||||
res.first = db.prime.BumpUp(res.first, PrimeBumpPolicy{});
|
||||
++events_.bumpups;
|
||||
}
|
||||
|
||||
|
|
|
@ -499,6 +499,32 @@ TEST_F(DflyEngineTest, Bug207) {
|
|||
}
|
||||
}
|
||||
|
||||
TEST_F(DflyEngineTest, StickyEviction) {
|
||||
shard_set->TEST_EnableHeartBeat();
|
||||
shard_set->TEST_EnableCacheMode();
|
||||
max_memory_limit = 0;
|
||||
|
||||
string tmp_val(100, '.');
|
||||
|
||||
ssize_t failed = -1;
|
||||
for (ssize_t i = 0; i < 5000; ++i) {
|
||||
auto set_resp = Run({"set", StrCat("key", i), tmp_val});
|
||||
auto stick_resp = Run({"stick", StrCat("key", i)});
|
||||
|
||||
if (set_resp != "OK") {
|
||||
failed = i;
|
||||
break;
|
||||
}
|
||||
ASSERT_THAT(stick_resp, IntArg(1));
|
||||
}
|
||||
|
||||
ASSERT_GE(failed, 0);
|
||||
// Make sure neither of the sticky values was evicted
|
||||
for (ssize_t i = 0; i < failed; ++i) {
|
||||
ASSERT_THAT(Run({"exists", StrCat("key", i)}), IntArg(1));
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(DflyEngineTest, PSubscribe) {
|
||||
single_response_ = false;
|
||||
auto resp = pp_->at(1)->Await([&] { return Run({"psubscribe", "a*", "b*"}); });
|
||||
|
|
|
@ -52,6 +52,7 @@ class Renamer {
|
|||
string_view key;
|
||||
PrimeValue ref_val;
|
||||
uint64_t expire_ts;
|
||||
bool sticky;
|
||||
bool found = false;
|
||||
};
|
||||
|
||||
|
@ -77,6 +78,7 @@ void Renamer::Find(Transaction* t) {
|
|||
if (IsValid(it)) {
|
||||
res->ref_val = it->second.AsRef();
|
||||
res->expire_ts = db_slice.ExpireTime(exp_it);
|
||||
res->sticky = it->first.IsSticky();
|
||||
}
|
||||
return OpStatus::OK;
|
||||
};
|
||||
|
@ -157,6 +159,8 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) {
|
|||
dest_it = db_slice.AddNew(db_indx_, dest_key, std::move(pv_), src_res_.expire_ts);
|
||||
}
|
||||
|
||||
dest_it->first.SetSticky(src_res_.sticky);
|
||||
|
||||
if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) {
|
||||
es->blocking_controller()->AwakeWatched(db_indx_, dest_key);
|
||||
}
|
||||
|
@ -436,6 +440,29 @@ void GenericFamily::PexpireAt(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
void GenericFamily::Stick(CmdArgList args, ConnectionContext* cntx) {
|
||||
Transaction* transaction = cntx->transaction;
|
||||
VLOG(1) << "Stick " << ArgS(args, 1);
|
||||
|
||||
atomic_uint32_t result{0};
|
||||
|
||||
auto cb = [&result](const Transaction* t, EngineShard* shard) {
|
||||
ArgSlice args = t->ShardArgsInShard(shard->shard_id());
|
||||
auto res = OpStick(t->GetOpArgs(shard), args);
|
||||
result.fetch_add(res.value_or(0), memory_order_relaxed);
|
||||
|
||||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
OpStatus status = transaction->ScheduleSingleHop(std::move(cb));
|
||||
CHECK_EQ(OpStatus::OK, status);
|
||||
|
||||
DVLOG(2) << "Stick ts " << transaction->txid();
|
||||
|
||||
uint32_t match_cnt = result.load(memory_order_relaxed);
|
||||
(*cntx)->SendLong(match_cnt);
|
||||
}
|
||||
|
||||
void GenericFamily::Rename(CmdArgList args, ConnectionContext* cntx) {
|
||||
OpResult<void> st = RenameGeneric(args, false, cntx);
|
||||
(*cntx)->SendError(st.status());
|
||||
|
@ -693,6 +720,7 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from_key,
|
|||
is_prior_list = (to_it->second.ObjType() == OBJ_LIST);
|
||||
}
|
||||
|
||||
bool sticky = from_it->first.IsSticky();
|
||||
uint64_t exp_ts = db_slice.ExpireTime(from_expire);
|
||||
|
||||
// we keep the value we want to move.
|
||||
|
@ -718,12 +746,31 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from_key,
|
|||
to_it = db_slice.AddNew(op_args.db_ind, to_key, std::move(from_obj), exp_ts);
|
||||
}
|
||||
|
||||
to_it->first.SetSticky(sticky);
|
||||
|
||||
if (!is_prior_list && to_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) {
|
||||
es->blocking_controller()->AwakeWatched(op_args.db_ind, to_key);
|
||||
}
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
OpResult<uint32_t> GenericFamily::OpStick(const OpArgs& op_args, ArgSlice keys) {
|
||||
DVLOG(1) << "Stick: " << keys[0];
|
||||
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
|
||||
uint32_t res = 0;
|
||||
for (uint32_t i = 0; i < keys.size(); ++i) {
|
||||
auto [it, _] = db_slice.FindExt(op_args.db_ind, keys[i]);
|
||||
if (IsValid(it) && !it->first.IsSticky()) {
|
||||
it->first.SetSticky(true);
|
||||
++res;
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
using CI = CommandId;
|
||||
|
||||
#define HFUNC(x) SetHandler(&GenericFamily::x)
|
||||
|
@ -750,7 +797,8 @@ void GenericFamily::Register(CommandRegistry* registry) {
|
|||
<< CI{"TTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Ttl)
|
||||
<< CI{"PTTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Pttl)
|
||||
<< CI{"TYPE", CO::READONLY | CO::FAST | CO::LOADING, 2, 1, 1, 1}.HFUNC(Type)
|
||||
<< CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del);
|
||||
<< CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del)
|
||||
<< CI{"STICK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Stick);
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -45,6 +45,7 @@ class GenericFamily {
|
|||
static void ExpireAt(CmdArgList args, ConnectionContext* cntx);
|
||||
static void Keys(CmdArgList args, ConnectionContext* cntx);
|
||||
static void PexpireAt(CmdArgList args, ConnectionContext* cntx);
|
||||
static void Stick(CmdArgList args, ConnectionContext* cntx);
|
||||
|
||||
static void Rename(CmdArgList args, ConnectionContext* cntx);
|
||||
static void RenameNx(CmdArgList args, ConnectionContext* cntx);
|
||||
|
@ -67,6 +68,7 @@ class GenericFamily {
|
|||
static OpResult<uint32_t> OpExists(const OpArgs& op_args, ArgSlice keys);
|
||||
static OpResult<void> OpRen(const OpArgs& op_args, std::string_view from, std::string_view to,
|
||||
bool skip_exists);
|
||||
static OpResult<uint32_t> OpStick(const OpArgs& op_args, ArgSlice keys);
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -165,6 +165,42 @@ TEST_F(GenericFamilyTest, RenameNx) {
|
|||
ASSERT_EQ(Run({"get", "y"}), x_val);
|
||||
}
|
||||
|
||||
TEST_F(GenericFamilyTest, Stick) {
|
||||
// check stick returns zero on non-existent keys
|
||||
ASSERT_THAT(Run({"stick", "a", "b"}), IntArg(0));
|
||||
|
||||
for (auto key: {"a", "b", "c", "d"}) {
|
||||
Run({"set", key, "."});
|
||||
}
|
||||
|
||||
// check stick is applied only once
|
||||
ASSERT_THAT(Run({"stick", "a", "b"}), IntArg(2));
|
||||
ASSERT_THAT(Run({"stick", "a", "b"}), IntArg(0));
|
||||
ASSERT_THAT(Run({"stick", "a", "c"}), IntArg(1));
|
||||
ASSERT_THAT(Run({"stick", "b", "d"}), IntArg(1));
|
||||
ASSERT_THAT(Run({"stick", "c", "d"}), IntArg(0));
|
||||
|
||||
// check stickyness presists during writes
|
||||
Run({"set", "a", "new"});
|
||||
ASSERT_THAT(Run({"stick", "a"}), IntArg(0));
|
||||
Run({"append", "a", "-value"});
|
||||
ASSERT_THAT(Run({"stick", "a"}), IntArg(0));
|
||||
|
||||
// check rename persists stickyness
|
||||
Run({"rename", "a", "k"});
|
||||
ASSERT_THAT(Run({"stick", "k"}), IntArg(0));
|
||||
|
||||
// check rename perists stickyness on multiple shards
|
||||
Run({"del", "b"});
|
||||
string b_val(32, 'b');
|
||||
string x_val(32, 'x');
|
||||
Run({"mset", "b", b_val, "x", x_val});
|
||||
ASSERT_EQ(2, last_cmd_dbg_info_.shards_count);
|
||||
Run({"stick", "x"});
|
||||
Run({"rename", "x", "b"});
|
||||
ASSERT_THAT(Run({"stick", "b"}), IntArg(0));
|
||||
}
|
||||
|
||||
|
||||
using testing::AnyOf;
|
||||
using testing::Each;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue