feat(server): Move bumpup logic out of FindInternal (#4877)

Bump-up logic is moved out of FindInternal and into OnCbFinishBlocking
(previously OnCbFinish). Previously, keys that were about to be deleted were
bumped up as well; with this change, a key that no longer exists when the
callback finishes is simply skipped.
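
In rough sketch form, the change splits reads into two phases: the read path
only records what was touched, and a deferred pass bumps a key only if it
still exists. The model below is illustrative only, not Dragonfly code; all
names are hypothetical stand-ins for the real DbSlice machinery shown in the
diff.

#include <cstdint>
#include <optional>
#include <string>
#include <unordered_map>
#include <unordered_set>

// Minimal model of the new two-phase flow.
struct MiniCache {
  std::unordered_map<std::string, int> table;
  std::unordered_set<std::string> fetched;
  int bumpups = 0;

  std::optional<int> Find(const std::string& key) {
    auto it = table.find(key);
    if (it == table.end())
      return std::nullopt;
    fetched.insert(key);  // record only; bumping no longer happens here
    return it->second;
  }

  void OnCbFinishBlocking() {
    auto pending = std::move(fetched);
    fetched.clear();
    for (const auto& key : pending) {
      if (table.find(key) == table.end())
        continue;  // key was deleted by the callback: skip the bump
      ++bumpups;   // stand-in for the real dash-table BumpUp
    }
  }
};

A callback that runs GET k followed by DEL k therefore no longer inflates the
bumpups counter, which is what the updated tests below assert.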

Closes #4775

Signed-off-by: mkaruza <mario@dragonflydb.io>
mkaruza committed 2025-04-07 14:15:13 +02:00 (via GitHub)
commit 378bcda8a2 · parent b9ff1be7d8
6 changed files with 111 additions and 43 deletions


@@ -355,20 +355,9 @@ SliceEvents& SliceEvents::operator+=(const SliceEvents& o) {
 
 class DbSlice::PrimeBumpPolicy {
  public:
-  PrimeBumpPolicy(absl::flat_hash_set<uint64_t, FpHasher>* items) : fetched_items_(items) {
-  }
   // returns true if we can change the object location in dash table.
   bool CanBump(const CompactObj& obj) const {
-    if (obj.IsSticky()) {
-      return false;
-    }
-    auto hc = obj.HashCode();
-    return fetched_items_->insert(hc).second;
+    return !obj.IsSticky();
   }
-
- private:
-  mutable absl::flat_hash_set<uint64_t, FpHasher>* fetched_items_;
 };
 
 DbSlice::DbSlice(uint32_t index, bool cache_mode, EngineShard* owner)
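
With deduplication now handled when reads are recorded into fetched_items_,
the policy becomes stateless: the only remaining veto is for sticky entries,
whose position in the bucket must not change.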
@@ -565,21 +554,9 @@ auto DbSlice::FindInternal(const Context& cntx, string_view key, optional<unsigned>
   }
 
   DCHECK(IsValid(res.it));
 
-  if (IsCacheMode()) {
-    if (!change_cb_.empty()) {
-      auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
-        CallChangeCallbacks(cntx.db_index, key, bit);
-      };
-      db.prime.CVCUponBump(change_cb_.back().first, res.it, bump_cb);
-    }
-
-    // We must not change the bucket's internal order during serialization
-    serialization_latch_.Wait();
-    auto bump_it = db.prime.BumpUp(res.it, PrimeBumpPolicy{&fetched_items_});
-    if (bump_it != res.it) {  // the item was bumped
-      res.it = bump_it;
-      ++events_.bumpups;
-    }
+  if (IsCacheMode()) {
+    fetched_items_.insert({res.it->first.HashCode(), cntx.db_index});
   }
 
   switch (stats_mode) {
@@ -1714,10 +1691,40 @@ void DbSlice::PerformDeletion(Iterator del_it, DbTable* table) {
   PerformDeletionAtomic(del_it, exp_it, table);
 }
 
-void DbSlice::OnCbFinish() {
-  // TBD update bumpups logic we can not clear now after cb finish as cb can preempt
-  // btw what do we do with inline?
-  fetched_items_.clear();
+void DbSlice::OnCbFinishBlocking() {
+  if (IsCacheMode()) {
+    // move fetched items to local variable
+    auto fetched_items = std::move(fetched_items_);
+
+    for (const auto& [key_hash, db_index] : fetched_items) {
+      auto& db = *db_arr_[db_index];
+
+      // We intentionally don't do extra key checking on this callback to speedup
+      // fetching. Probability of having hash collision is quite low and for bumpup
+      // purposes it should be fine if different key (with same hash) is returned.
+      auto predicate = [](const PrimeKey&) { return true; };
+
+      PrimeIterator it = db.prime.FindFirst(key_hash, predicate);
+      if (!IsValid(it)) {
+        continue;
+      }
+
+      if (!change_cb_.empty()) {
+        auto key = it->first.ToString();
+        auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
+          CallChangeCallbacks(db_index, key, bit);
+        };
+        db.prime.CVCUponBump(change_cb_.back().first, it, bump_cb);
+      }
+
+      // We must not change the bucket's internal order during serialization
+      serialization_latch_.Wait();
+      auto bump_it = db.prime.BumpUp(it, PrimeBumpPolicy{});
+      if (bump_it != it) {  // the item was bumped
+        ++events_.bumpups;
+      }
+    }
+  }
 
   if (!pending_send_map_.empty()) {
     SendQueuedInvalidationMessages();
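
The always-true predicate is a deliberate trade-off: FindFirst may land on a
different key that shares the 64-bit fingerprint, and that key gets bumped
instead, which is harmless for cache bookkeeping. For contrast, a hypothetical
exact-match variant (not in this PR) would have to store the key itself in
fetched_items_ and verify it in the predicate, paying a string copy per cached
read:

// Hypothetical exact variant, assuming fetched_items_ carried the key string
// (and its fingerprint) instead of the bare hash:
auto predicate = [&](const PrimeKey& pk) { return pk.ToString() == key; };
PrimeIterator it = db.prime.FindFirst(key_hash, predicate);
// A fingerprint collision now yields an invalid iterator, which the
// IsValid() check above already skips.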


@@ -357,7 +357,7 @@ class DbSlice {
     return shard_id_;
   }
 
-  void OnCbFinish();
+  void OnCbFinishBlocking();
 
   bool Acquire(IntentLock::Mode m, const KeyLockArgs& lock_args);
   void Release(IntentLock::Mode m, const KeyLockArgs& lock_args);
@@ -615,10 +615,16 @@ class DbSlice {
   DbTableArray db_arr_;
 
+  // key for bump up items pair contains <key hash, db_index>
+  using FetchedItemKey = std::pair<uint64_t, DbIndex>;
+
   struct FpHasher {
     size_t operator()(uint64_t val) const {
       return val;
     }
+    size_t operator()(const FetchedItemKey& val) const {
+      return val.first;
+    }
   };
 
   // Used in temporary computations in Acquire/Release.
@@ -635,7 +641,7 @@ class DbSlice {
   // for operations that preempt in the middle we have another mechanism -
   // auto laundering iterators, so in case of preemption we do not mind that fetched_items are
   // cleared or changed.
-  mutable absl::flat_hash_set<uint64_t, FpHasher> fetched_items_;
+  mutable absl::flat_hash_set<FetchedItemKey, FpHasher> fetched_items_;
 
   // Registered by shard indices on when first document index is created.
   DocDeletionCallback doc_del_cb_;
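
The hasher is worth a second look: pairs are hashed by the key fingerprint
alone, so the same key fetched from two logical databases collides in hash yet
stays a distinct entry, while repeat fetches of one key deduplicate via pair
equality. A self-contained sketch (DbIndex width is an assumption here; this
is not Dragonfly code):

#include <cstdint>
#include <utility>

#include "absl/container/flat_hash_set.h"

using DbIndex = uint16_t;  // assumed stand-in for Dragonfly's db index type
using FetchedItemKey = std::pair<uint64_t, DbIndex>;

struct FpHasher {
  size_t operator()(uint64_t val) const {
    return val;
  }
  size_t operator()(const FetchedItemKey& val) const {
    return val.first;  // hash by key fingerprint only, ignore the db index
  }
};

int main() {
  absl::flat_hash_set<FetchedItemKey, FpHasher> fetched;
  fetched.insert({42u, 0});  // same fingerprint, db 0
  fetched.insert({42u, 1});  // same fingerprint, db 1: kept, pairs differ
  fetched.insert({42u, 0});  // repeat fetch: deduplicated
  return fetched.size() == 2 ? 0 : 1;
}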


@@ -907,11 +907,11 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t num_of_keys,
     ess.AwaitRunningOnShardQueue([&](EngineShard* shard) {
       DoPopulateBatch(options, ps[shard->shard_id()]);
 
-      // Debug populate does not use transaction framework therefore we call OnCbFinish manually
-      // after running the callback
-      // Note that running debug populate while running flushall/db can cause dcheck fail because the
-      // finish cb is executed just when we finish populating the database.
-      cntx_->ns->GetDbSlice(shard->shard_id()).OnCbFinish();
+      // Debug populate does not use transaction framework therefore we call OnCbFinishBlocking
+      // manually after running the callback Note that running debug populate while running
+      // flushall/db can cause dcheck fail because the finish cb is executed just when we finish
+      // populating the database.
+      cntx_->ns->GetDbSlice(shard->shard_id()).OnCbFinishBlocking();
     });
   }


@@ -283,9 +283,9 @@ TEST_F(StringFamilyTest, MGetCachingModeBug2276) {
   resp = Run({"info", "stats"});
 
   size_t bumps1 = get_bump_ups(resp.GetString());
-  EXPECT_GT(bumps1, 0);
-  EXPECT_LT(bumps1, 10);  // we assume that some bumps are blocked because items reside next to each
-                          // other in the slot.
+  EXPECT_GE(bumps1, 0);
+  EXPECT_LE(bumps1, 10);
 
   for (int i = 0; i < 10; ++i) {
     auto get_resp = Run({"get", vec[i]});

@@ -332,7 +332,7 @@ TEST_F(StringFamilyTest, MGetCachingModeBug2465) {
   resp = Run({"info", "stats"});
 
   size_t bumps = get_bump_ups(resp.GetString());
-  EXPECT_EQ(bumps, 3);  // one bump for del and one for get and one for mget
+  EXPECT_EQ(bumps, 2);  // one bump for get and one for mget
 }
 
 TEST_F(StringFamilyTest, MSetGet) {


@@ -695,7 +695,7 @@ void Transaction::RunCallback(EngineShard* shard) {
   }
 
   auto& db_slice = GetDbSlice(shard->shard_id());
-  db_slice.OnCbFinish();
+  db_slice.OnCbFinishBlocking();
 
   // Handle result flags to alter behaviour.
   if (result.flags & RunnableResult::AVOID_CONCLUDING) {

@@ -1364,7 +1364,7 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
   auto& db_slice = GetDbSlice(shard->shard_id());
   auto result = cb(this, shard);
-  db_slice.OnCbFinish();
+  db_slice.OnCbFinishBlocking();
 
   LogAutoJournalOnShard(shard, result);
   MaybeInvokeTrackingCb();


@@ -287,3 +287,58 @@ async def test_rename_huge_values(df_factory, type):
     target_data = await DebugPopulateSeeder.capture(client)
     assert source_data == target_data
+
+
+@pytest.mark.asyncio
+async def test_key_bump_ups(df_factory):
+    master = df_factory.create(
+        proactor_threads=2,
+        cache_mode="true",
+    )
+
+    df_factory.start_all([master])
+    c_master = master.client()
+
+    await c_master.execute_command("DEBUG POPULATE 18000 KEY 32 RAND")
+
+    info = await c_master.info("stats")
+    assert info["bump_ups"] == 0
+
+    keys = await c_master.execute_command("SCAN 0")
+    keys = keys[1][0:10]
+
+    # Bump keys
+    for key in keys:
+        await c_master.execute_command("GET " + key)
+    info = await c_master.info("stats")
+    assert info["bump_ups"] <= 10
+
+    # Multi get bump
+    await c_master.execute_command("MGET " + " ".join(keys))
+    info = await c_master.info("stats")
+    assert info["bump_ups"] >= 10 and info["bump_ups"] <= 20
+
+    last_bump_ups = info["bump_ups"]
+    for key in keys:
+        await c_master.execute_command("DEL " + key)
+
+    # DEL should not bump up any key
+    info = await c_master.info("stats")
+    assert last_bump_ups == info["bump_ups"]
+
+    # Find key that has slot > 0 and bump it
+    while True:
+        keys = await c_master.execute_command("SCAN 0")
+        key = keys[1][0]
+
+        debug_key_info = await c_master.execute_command("DEBUG OBJECT " + key)
+        slot_id = int(dict(map(lambda s: s.split(":"), debug_key_info.split()))["slot"])
+        if slot_id == 0:
+            # delete the key and continue
+            await c_master.execute_command("DEL " + key)
+            continue
+
+        await c_master.execute_command("GET " + key)
+        debug_key_info = await c_master.execute_command("DEBUG OBJECT " + key)
+        new_slot_id = int(dict(map(lambda s: s.split(":"), debug_key_info.split()))["slot"])
+        assert new_slot_id + 1 == slot_id
+        break
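
The closing loop encodes the bump mechanics: a successful bump moves an entry
one slot toward the front of its dash-table bucket, so after a single GET the
slot reported by DEBUG OBJECT must drop by exactly one, which is what
new_slot_id + 1 == slot_id asserts. Keys already at slot 0 are deleted and
skipped, since they cannot move any further.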