From 25e700f39f1e617549bc9fdc1a635a1ea236ec3b Mon Sep 17 00:00:00 2001 From: Vladislav Date: Sat, 20 Aug 2022 16:50:43 +0300 Subject: [PATCH] feat(server): Implement STICK command #219 (#245) * feat(server): Implement STICK command #219 Signed-off-by: Vladislav Oleshko --- src/core/compact_object.h | 13 ++++++++ src/core/dash.h | 5 ++-- src/core/dash_internal.h | 16 +++++++--- src/core/dash_test.cc | 41 ++++++++++++++++++++++--- src/server/db_slice.cc | 14 ++++++++- src/server/dragonfly_test.cc | 26 ++++++++++++++++ src/server/generic_family.cc | 50 ++++++++++++++++++++++++++++++- src/server/generic_family.h | 2 ++ src/server/generic_family_test.cc | 36 ++++++++++++++++++++++ 9 files changed, 191 insertions(+), 12 deletions(-) diff --git a/src/core/compact_object.h b/src/core/compact_object.h index 00665b6a1..1af5d1ca0 100644 --- a/src/core/compact_object.h +++ b/src/core/compact_object.h @@ -108,6 +108,7 @@ class CompactObj { ASCII1_ENC_BIT = 8, ASCII2_ENC_BIT = 0x10, IO_PENDING = 0x20, + STICKY = 0x40, }; static constexpr uint8_t kEncMask = ASCII1_ENC_BIT | ASCII2_ENC_BIT; @@ -213,6 +214,18 @@ class CompactObj { } } + bool IsSticky() const { + return mask_ & STICKY; + } + + void SetSticky(bool s) { + if (s) { + mask_ |= STICKY; + } else { + mask_ &= ~STICKY; + } + } + unsigned Encoding() const; unsigned ObjType() const; diff --git a/src/core/dash.h b/src/core/dash.h index d023d88d8..0106509f4 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -240,9 +240,10 @@ class DashTable : public detail::DashTableBase { // Returns true if an element was deleted i.e the rightmost slot was busy. 
bool ShiftRight(bucket_iterator it); - iterator BumpUp(iterator it) { + template + iterator BumpUp(iterator it, const BumpPolicy& bp) { SegmentIterator seg_it = - segment_[it.seg_id_]->BumpUp(it.bucket_id_, it.slot_id_, DoHash(it->first)); + segment_[it.seg_id_]->BumpUp(it.bucket_id_, it.slot_id_, DoHash(it->first), bp); return iterator{this, it.seg_id_, seg_it.index, seg_it.slot}; } diff --git a/src/core/dash_internal.h b/src/core/dash_internal.h index 7eb178fb9..a8bd045a1 100644 --- a/src/core/dash_internal.h +++ b/src/core/dash_internal.h @@ -522,7 +522,7 @@ template Iterator BumpUp(uint8_t bid, SlotId slot, Hash_t key_hash, const BumpPolicy& ev); // Tries to move stash entries back to their normal buckets (exact or neighour). // Returns number of entries that succeeded to unload. @@ -1544,7 +1544,8 @@ auto Segment::FindValidStartingFrom(unsigned bid, unsigned s } template -auto Segment::BumpUp(uint8_t bid, SlotId slot, Hash_t key_hash) -> Iterator { +template +auto Segment::BumpUp(uint8_t bid, SlotId slot, Hash_t key_hash, const BumpPolicy& bp) -> Iterator { auto& from = bucket_[bid]; uint8_t target_bid = BucketIndex(key_hash); @@ -1554,12 +1555,12 @@ auto Segment::BumpUp(uint8_t bid, SlotId slot, Hash_t key_ha if (bid < kNumBuckets) { // non stash case. - if (slot > 0) { + if (slot > 0 && bp.CanBumpDown(from.key[slot - 1])) { from.Swap(slot - 1, slot); return Iterator{bid, uint8_t(slot - 1)}; } // TODO: We could promote further, by swapping probing bucket with its previous one. 
- return Iterator{bid, slot}; + return Iterator{bid, slot}; } // stash bucket @@ -1587,6 +1588,13 @@ auto Segment::BumpUp(uint8_t bid, SlotId slot, Hash_t key_ha constexpr unsigned kLastSlot = kNumSlots - 1; assert(swapb.GetBusy() & (1 << kLastSlot)); + // Don't move sticky items back to the stash because they're not evictable + // TODO: search for first swappable item + if (!bp.CanBumpDown(swapb.key[kLastSlot])) { + target.SetStashPtr(stash_pos, fp_hash, &next); + return Iterator{bid, slot}; + } + uint8_t swap_fp = swapb.Fp(kLastSlot); // is_probing for the existing entry in swapb. It's unrelated to bucket_offs, diff --git a/src/core/dash_test.cc b/src/core/dash_test.cc index 6cd1177b5..8435fb434 100644 --- a/src/core/dash_test.cc +++ b/src/core/dash_test.cc @@ -75,6 +75,12 @@ struct UInt64Policy : public BasicDashPolicy { } }; +struct RelaxedBumpPolicy { + bool CanBumpDown(uint64_t key) const { + return true; + } +}; + class CappedResource final : public std::pmr::memory_resource { public: explicit CappedResource(size_t cap) : cap_(cap) { @@ -350,7 +356,7 @@ TEST_F(DashTest, BumpUp) { EXPECT_EQ(touched_bid[0], 1); // Bump up - segment_.BumpUp(kFirstStashId, 5, hash); + segment_.BumpUp(kFirstStashId, 5, hash, RelaxedBumpPolicy{}); // expect the key to move EXPECT_TRUE(segment_.GetBucket(1).IsFull()); @@ -365,13 +371,40 @@ TEST_F(DashTest, BumpUp) { EXPECT_EQ(1, segment_.CVCOnBump(2, kSecondStashId, 9, hash, touched_bid)); EXPECT_EQ(touched_bid[0], kSecondStashId); - segment_.BumpUp(kSecondStashId, 9, hash); + segment_.BumpUp(kSecondStashId, 9, hash, RelaxedBumpPolicy{}); ASSERT_TRUE(key == segment_.Key(0, kNumSlots - 1) || key == segment_.Key(1, kNumSlots - 1)); EXPECT_TRUE(segment_.GetBucket(kSecondStashId).IsFull()); EXPECT_TRUE(Contains(key)); EXPECT_TRUE(segment_.Key(kSecondStashId, 9)); } +TEST_F(DashTest, BumpPolicy) { + struct RestrictedBumpPolicy { + bool CanBumpDown(uint64_t key) const { + return false; + } + }; + + set keys = FillSegment(0); + constexpr 
unsigned kFirstStashId = Segment::kNumBuckets; + + EXPECT_TRUE(segment_.GetBucket(0).IsFull()); + EXPECT_TRUE(segment_.GetBucket(1).IsFull()); + EXPECT_TRUE(segment_.GetBucket(kFirstStashId).IsFull()); + + // check items are immovable in bucket + Segment::Key_t key = segment_.Key(1, 2); + uint64_t hash = dt_.DoHash(key); + segment_.BumpUp(1, 2, hash, RestrictedBumpPolicy{}); + EXPECT_EQ(key, segment_.Key(1, 2)); + + // check items don't swap from stash + key = segment_.Key(kFirstStashId, 2); + hash = dt_.DoHash(key); + segment_.BumpUp(kFirstStashId, 2, hash, RestrictedBumpPolicy{}); + EXPECT_EQ(key, segment_.Key(kFirstStashId, 2)); +} + TEST_F(DashTest, Insert2) { uint64_t k = 1191; ASSERT_EQ(2019837007031366716, UInt64Policy::HashFn(k)); @@ -954,7 +987,7 @@ TEST_P(EvictionPolicyTest, HitRateZipf) { << it.slot_id(); } else { if (use_bumps) - dt_.BumpUp(it); + dt_.BumpUp(it, RelaxedBumpPolicy{}); ++hits; } } @@ -984,7 +1017,7 @@ TEST_P(EvictionPolicyTest, HitRateZipfShr) { } } else { if (use_bumps) { - dt_.BumpUp(it); + dt_.BumpUp(it, RelaxedBumpPolicy{}); DVLOG(1) << "Bump up key " << key << " " << it.bucket_id() << " slot " << it.slot_id(); } else { DVLOG(1) << "Hit on key " << key; diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index ca10452e6..f972b45a9 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -89,6 +89,14 @@ class PrimeEvictionPolicy { const bool can_evict_; }; +class PrimeBumpPolicy { +public: + // returns true if key can be made less important for eviction (opposite of bump up) + bool CanBumpDown(const CompactObj& key) const { + return !key.IsSticky(); + } +}; + unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& eb, PrimeTable* me) { unsigned res = 0; @@ -124,6 +132,10 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT auto last_slot_it = bucket_it; last_slot_it += (PrimeTable::kBucketWidth - 1); if (!last_slot_it.is_done()) { + // don't evict sticky items + 
if (last_slot_it->first.IsSticky()) { + return 0; + } if (last_slot_it->second.HasExpire()) { ExpireTable* expire_tbl = db_slice_->GetTables(db_indx_).second; CHECK_EQ(1u, expire_tbl->Erase(last_slot_it->first)); @@ -259,7 +271,7 @@ pair DbSlice::FindExt(DbIndex db_ind, string_view db.prime.CVCUponBump(change_cb_.front().first, res.first, bump_cb); } - res.first = db.prime.BumpUp(res.first); + res.first = db.prime.BumpUp(res.first, PrimeBumpPolicy{}); ++events_.bumpups; } diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index c4148efb2..0dd6e978c 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -499,6 +499,32 @@ TEST_F(DflyEngineTest, Bug207) { } } +TEST_F(DflyEngineTest, StickyEviction) { + shard_set->TEST_EnableHeartBeat(); + shard_set->TEST_EnableCacheMode(); + max_memory_limit = 0; + + string tmp_val(100, '.'); + + ssize_t failed = -1; + for (ssize_t i = 0; i < 5000; ++i) { + auto set_resp = Run({"set", StrCat("key", i), tmp_val}); + auto stick_resp = Run({"stick", StrCat("key", i)}); + + if (set_resp != "OK") { + failed = i; + break; + } + ASSERT_THAT(stick_resp, IntArg(1)); + } + + ASSERT_GE(failed, 0); + // Make sure neither of the sticky values was evicted + for (ssize_t i = 0; i < failed; ++i) { + ASSERT_THAT(Run({"exists", StrCat("key", i)}), IntArg(1)); + } +} + TEST_F(DflyEngineTest, PSubscribe) { single_response_ = false; auto resp = pp_->at(1)->Await([&] { return Run({"psubscribe", "a*", "b*"}); }); diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index ad370d084..7b99a30d8 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -52,6 +52,7 @@ class Renamer { string_view key; PrimeValue ref_val; uint64_t expire_ts; + bool sticky; bool found = false; }; @@ -77,6 +78,7 @@ void Renamer::Find(Transaction* t) { if (IsValid(it)) { res->ref_val = it->second.AsRef(); res->expire_ts = db_slice.ExpireTime(exp_it); + res->sticky = it->first.IsSticky(); } return 
OpStatus::OK; }; @@ -157,6 +159,8 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) { dest_it = db_slice.AddNew(db_indx_, dest_key, std::move(pv_), src_res_.expire_ts); } + dest_it->first.SetSticky(src_res_.sticky); + if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) { es->blocking_controller()->AwakeWatched(db_indx_, dest_key); } @@ -436,6 +440,29 @@ void GenericFamily::PexpireAt(CmdArgList args, ConnectionContext* cntx) { } } +void GenericFamily::Stick(CmdArgList args, ConnectionContext* cntx) { + Transaction* transaction = cntx->transaction; + VLOG(1) << "Stick " << ArgS(args, 1); + + atomic_uint32_t result{0}; + + auto cb = [&result](const Transaction* t, EngineShard* shard) { + ArgSlice args = t->ShardArgsInShard(shard->shard_id()); + auto res = OpStick(t->GetOpArgs(shard), args); + result.fetch_add(res.value_or(0), memory_order_relaxed); + + return OpStatus::OK; + }; + + OpStatus status = transaction->ScheduleSingleHop(std::move(cb)); + CHECK_EQ(OpStatus::OK, status); + + DVLOG(2) << "Stick ts " << transaction->txid(); + + uint32_t match_cnt = result.load(memory_order_relaxed); + (*cntx)->SendLong(match_cnt); +} + void GenericFamily::Rename(CmdArgList args, ConnectionContext* cntx) { OpResult st = RenameGeneric(args, false, cntx); (*cntx)->SendError(st.status()); @@ -693,6 +720,7 @@ OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, is_prior_list = (to_it->second.ObjType() == OBJ_LIST); } + bool sticky = from_it->first.IsSticky(); uint64_t exp_ts = db_slice.ExpireTime(from_expire); // we keep the value we want to move. 
@@ -718,12 +746,31 @@ OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, to_it = db_slice.AddNew(op_args.db_ind, to_key, std::move(from_obj), exp_ts); } + to_it->first.SetSticky(sticky); + if (!is_prior_list && to_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) { es->blocking_controller()->AwakeWatched(op_args.db_ind, to_key); } return OpStatus::OK; } +OpResult GenericFamily::OpStick(const OpArgs& op_args, ArgSlice keys) { + DVLOG(1) << "Stick: " << keys[0]; + + auto& db_slice = op_args.shard->db_slice(); + + uint32_t res = 0; + for (uint32_t i = 0; i < keys.size(); ++i) { + auto [it, _] = db_slice.FindExt(op_args.db_ind, keys[i]); + if (IsValid(it) && !it->first.IsSticky()) { + it->first.SetSticky(true); + ++res; + } + } + + return res; +} + using CI = CommandId; #define HFUNC(x) SetHandler(&GenericFamily::x) @@ -750,7 +797,8 @@ void GenericFamily::Register(CommandRegistry* registry) { << CI{"TTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Ttl) << CI{"PTTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Pttl) << CI{"TYPE", CO::READONLY | CO::FAST | CO::LOADING, 2, 1, 1, 1}.HFUNC(Type) - << CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del); + << CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del) + << CI{"STICK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Stick); } } // namespace dfly diff --git a/src/server/generic_family.h b/src/server/generic_family.h index 6e1073a2c..1ace67908 100644 --- a/src/server/generic_family.h +++ b/src/server/generic_family.h @@ -45,6 +45,7 @@ class GenericFamily { static void ExpireAt(CmdArgList args, ConnectionContext* cntx); static void Keys(CmdArgList args, ConnectionContext* cntx); static void PexpireAt(CmdArgList args, ConnectionContext* cntx); + static void Stick(CmdArgList args, ConnectionContext* cntx); static void Rename(CmdArgList args, ConnectionContext* cntx); static void RenameNx(CmdArgList args, ConnectionContext* cntx); @@ -67,6 +68,7 @@ class GenericFamily { static OpResult OpExists(const OpArgs& 
op_args, ArgSlice keys); static OpResult OpRen(const OpArgs& op_args, std::string_view from, std::string_view to, bool skip_exists); + static OpResult OpStick(const OpArgs& op_args, ArgSlice keys); }; } // namespace dfly diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index 9f5bcea64..b9d5343e6 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -165,6 +165,42 @@ TEST_F(GenericFamilyTest, RenameNx) { ASSERT_EQ(Run({"get", "y"}), x_val); } +TEST_F(GenericFamilyTest, Stick) { + // check stick returns zero on non-existent keys + ASSERT_THAT(Run({"stick", "a", "b"}), IntArg(0)); + + for (auto key: {"a", "b", "c", "d"}) { + Run({"set", key, "."}); + } + + // check stick is applied only once + ASSERT_THAT(Run({"stick", "a", "b"}), IntArg(2)); + ASSERT_THAT(Run({"stick", "a", "b"}), IntArg(0)); + ASSERT_THAT(Run({"stick", "a", "c"}), IntArg(1)); + ASSERT_THAT(Run({"stick", "b", "d"}), IntArg(1)); + ASSERT_THAT(Run({"stick", "c", "d"}), IntArg(0)); + + // check stickiness persists during writes + Run({"set", "a", "new"}); + ASSERT_THAT(Run({"stick", "a"}), IntArg(0)); + Run({"append", "a", "-value"}); + ASSERT_THAT(Run({"stick", "a"}), IntArg(0)); + + // check rename persists stickiness + Run({"rename", "a", "k"}); + ASSERT_THAT(Run({"stick", "k"}), IntArg(0)); + + // check rename persists stickiness on multiple shards + Run({"del", "b"}); + string b_val(32, 'b'); + string x_val(32, 'x'); + Run({"mset", "b", b_val, "x", x_val}); + ASSERT_EQ(2, last_cmd_dbg_info_.shards_count); + Run({"stick", "x"}); + Run({"rename", "x", "b"}); + ASSERT_THAT(Run({"stick", "b"}), IntArg(0)); +} + using testing::AnyOf; using testing::Each;