fix(bug): access invalid prime table iterator (#2300)

The bug:
When running dragonfly in cache mode, we bump up items in the dash table when we find them. If the callback accesses a few items that reside next to each other in the same bucket, bumping a later one invalidates the iterator of the first item found.
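
A toy illustration of the hazard (a plain std::vector stands in for a dash-table bucket; the names and layout are invented for the example, not dragonfly code):

#include <algorithm>
#include <cassert>
#include <string>
#include <vector>

// Toy model: a "bucket" is a small array and a bump-up swaps an entry one
// slot toward the front. A pointer taken to a slot before a neighboring
// entry is bumped ends up referring to a different entry afterwards.
int main() {
  std::vector<std::string> bucket = {"a", "b", "c", "d"};

  // The callback "finds" entry "c" and keeps a pointer to its slot.
  std::string* first_found = &bucket[2];
  assert(*first_found == "c");

  // Later in the same callback the neighboring entry "d" is found and
  // bumped up, displacing "c" from its slot.
  std::swap(bucket[2], bucket[3]);

  // The earlier pointer now refers to "d": the first lookup's result is stale.
  assert(*first_found == "d");
  return 0;
}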

The fix:
After we bump up an entry, we insert its prime table reference into a bumped-items set. When checking whether an item can be bumped down, we verify it is not in this set. Once the transaction callback finishes running, we clear the set.
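
A minimal sketch of the mechanism under simplified types (std::string keys and std::unordered_set stand in for CompactObjectView and absl::flat_hash_set; BumpPolicy, Slice and OnFind are invented names, not the real dragonfly API):

#include <string>
#include <unordered_set>

// Entries bumped during a transaction callback are recorded in a set, the
// bump policy refuses to move them again, and the set is cleared once the
// callback finishes.
struct BumpPolicy {
  const std::unordered_set<std::string>& bumped_items;

  // An entry may be displaced (bumped down) by another entry's bump-up only
  // if it was not itself bumped earlier in the current callback; otherwise an
  // iterator already handed out for it could be invalidated.
  bool CanBumpDown(const std::string& key) const {
    return bumped_items.find(key) == bumped_items.end();
  }
};

struct Slice {
  std::unordered_set<std::string> bumped_items;

  void OnFind(const std::string& key) {
    BumpPolicy policy{bumped_items};
    // The real BumpUp would consult policy.CanBumpDown() for the neighbors it
    // wants to displace; here we only record that this entry was bumped.
    (void)policy;
    bumped_items.insert(key);
  }

  // Called by the transaction once its callback completes.
  void OnCbFinish() { bumped_items.clear(); }
};

int main() {
  Slice slice;
  slice.OnFind("k1");
  slice.OnFind("k2");  // k1 stays put: it is in the bumped-items set
  slice.OnCbFinish();  // both keys become bumpable again
  return 0;
}

In the diff below this same bookkeeping shows up as bumped_items_: DbSlice::Del erases deleted keys from it, FlushDbIndexes expects it to be empty, and the transaction paths call OnCbFinish() to clear it.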

Signed-off-by: adi_holden <adi@dragonflydb.io>
adiholden committed via GitHub on 2023-12-20 13:05:29 +02:00
commit 6398a73942 (parent 6c32c8004d)
6 changed files with 74 additions and 16 deletions


@@ -166,10 +166,16 @@ class PrimeEvictionPolicy {
class PrimeBumpPolicy {
public:
// returns true if key can be made less important for eviction (opposite of bump up)
bool CanBumpDown(const CompactObj& key) const {
return !key.IsSticky();
PrimeBumpPolicy(const absl::flat_hash_set<CompactObjectView, PrimeHasher>& bumped_items)
: bumped_items_(bumped_items) {
}
// returns true if key can be made less important for eviction (opposite of bump up)
bool CanBumpDown(const CompactObj& obj) const {
return !obj.IsSticky() && !bumped_items_.contains(obj);
}
private:
const absl::flat_hash_set<CompactObjectView, PrimeHasher>& bumped_items_;
};
bool PrimeEvictionPolicy::CanGrow(const PrimeTable& tbl) const {
@@ -468,12 +474,11 @@ DbSlice::ItAndExp DbSlice::FindInternal(const Context& cntx, std::string_view ke
ccb.second(cntx.db_index, bit);
}
};
db.prime.CVCUponBump(change_cb_.back().first, res.it, bump_cb);
}
res.it = db.prime.BumpUp(res.it, PrimeBumpPolicy{});
res.it = db.prime.BumpUp(res.it, PrimeBumpPolicy{bumped_items_});
++events_.bumpups;
bumped_items_.insert(res.it->first.AsRef());
}
db.top_keys.Touch(key);
@@ -625,7 +630,7 @@ bool DbSlice::Del(DbIndex db_ind, PrimeIterator it) {
DbContext cntx{db_ind, GetCurrentTimeMs()};
doc_del_cb_(key, cntx, it->second);
}
bumped_items_.erase(it->first.AsRef());
PerformDeletion(it, shard_owner(), db.get());
deletion_count_++;
@@ -686,7 +691,7 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
tiered->CancelAllIos(index);
}
}
CHECK(bumped_items_.empty());
auto cb = [this, flush_db_arr = std::move(flush_db_arr)]() mutable {
for (auto& db_ptr : flush_db_arr) {
if (db_ptr && db_ptr->stats.tiered_entries > 0) {
@@ -1410,4 +1415,8 @@ void DbSlice::TrackKeys(const facade::Connection::WeakRef& conn, const ArgSlice&
}
}
void DbSlice::OnCbFinish() {
bumped_items_.clear();
}
} // namespace dfly


@@ -276,6 +276,8 @@ class DbSlice {
return shard_id_;
}
void OnCbFinish();
bool Acquire(IntentLock::Mode m, const KeyLockArgs& lock_args);
void Release(IntentLock::Mode m, const KeyLockArgs& lock_args);
@@ -445,6 +447,9 @@ class DbSlice {
// ordered from the smallest to largest version.
std::vector<std::pair<uint64_t, ChangeCallback>> change_cb_;
// Used in temporary computations in Find item and CbFinish
mutable absl::flat_hash_set<CompactObjectView, PrimeHasher> bumped_items_;
// Registered by shard indices on when first document index is created.
DocDeletionCallback doc_del_cb_;


@@ -228,6 +228,48 @@ TEST_F(StringFamilyTest, MGetSet) {
set_fb.Join();
}
TEST_F(StringFamilyTest, MGetCachingModeBug2276) {
absl::FlagSaver fs;
SetTestFlag("cache_mode", "true");
ResetService();
Run({"debug", "populate", "100000", "key", "32", "RAND"});
// Scan starts traversing the database, because we populated the database with lots of items we
// assume that scan will return items from the same bucket that reside next to each other.
auto resp = Run({"scan", "0"});
ASSERT_THAT(resp, ArrLen(2));
StringVec vec = StrArray(resp.GetVec()[1]);
ASSERT_GE(vec.size(), 10);
auto get_bump_ups = [](const string& str) -> size_t {
const string matcher = "bump_ups:";
const auto pos = str.find(matcher) + matcher.size();
const auto sub = str.substr(pos, 1);
return atoi(sub.c_str());
};
resp = Run({"info", "stats"});
EXPECT_EQ(get_bump_ups(resp.GetString()), 0);
auto mget_resp = StrArray(Run(
{"mget", vec[0], vec[1], vec[2], vec[3], vec[4], vec[5], vec[6], vec[7], vec[8], vec[9]}));
resp = Run({"info", "stats"});
size_t bumps1 = get_bump_ups(resp.GetString());
EXPECT_GT(bumps1, 0);
EXPECT_LT(bumps1, 10); // we assume that some bumps are blocked because items reside next to each
// other in the slot.
for (int i = 0; i < 10; ++i) {
auto get_resp = Run({"get", vec[i]});
EXPECT_EQ(get_resp, mget_resp[i]);
}
resp = Run({"info", "stats"});
size_t bumps2 = get_bump_ups(resp.GetString());
EXPECT_GT(bumps2, bumps1);
}
TEST_F(StringFamilyTest, MSetGet) {
Run({"mset", "x", "0", "y", "0", "a", "0", "b", "0"});
ASSERT_EQ(2, GetDebugInfo().shards_count);


@@ -47,6 +47,12 @@ inline bool IsValid(ExpireConstIterator it) {
return !it.is_done();
}
struct PrimeHasher {
size_t operator()(const PrimeKey& o) const {
return o.HashCode();
}
};
struct SlotStats {
uint64_t key_count = 0;
uint64_t total_reads = 0;


@@ -100,12 +100,6 @@ static size_t ExternalizeEntry(size_t item_offset, DbTableStats* stats, PrimeVal
return item_size;
}
struct PrimeHasher {
size_t operator()(const PrimeKey& o) const {
return o.HashCode();
}
};
struct SingleRequest {
SingleRequest(size_t blob_len, int64 offset, string key)
: blob_len(blob_len), offset(offset), key(std::move(key)) {
@@ -422,7 +416,7 @@ error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) {
CHECK_LT(bin_record.pending_entries.size(), max_entries);
VLOG(2) << "ScheduleOffload:" << it->first.ToString();
bin_record.pending_entries.insert(it->first);
bin_record.pending_entries.insert(it->first.AsRef());
it->second.SetIoPending(true);
if (bin_record.pending_entries.size() < max_entries)


@@ -497,6 +497,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
if (is_concluding) // Check last hop
LogAutoJournalOnShard(shard);
shard->db_slice().OnCbFinish();
// at least the coordinator thread owns the reference.
DCHECK_GE(GetUseCount(), 1u);
@@ -950,7 +951,7 @@ void Transaction::RunQuickie(EngineShard* shard) {
} catch (std::exception& e) {
LOG(FATAL) << "Unexpected exception " << e.what();
}
shard->db_slice().OnCbFinish();
LogAutoJournalOnShard(shard);
sd.is_armed.store(false, memory_order_relaxed);
@@ -1238,6 +1239,7 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
DCHECK_EQ(unique_shard_cnt_, 1u);
auto* shard = EngineShard::tlocal();
auto status = cb(this, shard);
shard->db_slice().OnCbFinish();
LogAutoJournalOnShard(shard);
return status;
}