fix(server): mget crash on same key get (#2474)

* fix(server): mget crash on same key get

fix: #2465
The bug: in cache mode, MGET bumps up the items it fetches. When MGET is executed with the same key several times, e.g. "mget key key", bumping the item up a second time invalidates the iterator that the first lookup returned from the dash table.
The fix: bump an item up/down at most once per command by tracking already-bumped items in the bumped_items set.
This PR also reverts c225113,
and it updates the bump-up stats and the bumped_items set only when the item was actually bumped.

Signed-off-by: adi_holden <adi@dragonflydb.io>
adiholden 2024-01-28 11:45:35 +02:00 committed by GitHub
parent 3ebb32df3f
commit 9f4c4353b5
6 changed files with 62 additions and 37 deletions
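
To make the failure mode and the fix concrete, here is a minimal, self-contained sketch (not Dragonfly code; ToyTable and FetchedOncePolicy are hypothetical stand-ins for the dash table and PrimeBumpPolicy): the first lookup of a key within a command bumps it and records it in a set, and any further lookup of the same key is refused a bump, so the position returned by the first lookup stays valid.

// Minimal, self-contained sketch (hypothetical ToyTable/FetchedOncePolicy,
// not Dragonfly code) of the dedup idea: bump a key at most once per command.
#include <cassert>
#include <string>
#include <unordered_set>
#include <utility>
#include <vector>

// Toy "table": an index into `slots` stands in for a dash-table iterator.
struct ToyTable {
  std::vector<std::string> slots;

  // Moves the entry one slot forward unless the policy forbids it; returns
  // the (possibly unchanged) position, mirroring what Segment::BumpUp does.
  template <typename Policy> size_t BumpUp(size_t idx, const Policy& bp) {
    if (idx == 0 || !bp.CanBump(slots[idx]))
      return idx;
    std::swap(slots[idx - 1], slots[idx]);
    return idx - 1;
  }
};

// Stand-in for PrimeBumpPolicy: refuse to move keys already fetched by the
// current command.
struct FetchedOncePolicy {
  const std::unordered_set<std::string>& fetched;
  bool CanBump(const std::string& key) const {
    return fetched.count(key) == 0;
  }
};

int main() {
  ToyTable table{{"a", "b", "c"}};
  std::unordered_set<std::string> fetched;  // plays the role of fetched_items_

  // Simulate "mget c c": the first lookup bumps "c" and records it; the
  // second lookup finds it in `fetched` and leaves the layout alone, so the
  // position handed out by the first lookup is not invalidated.
  size_t first = table.BumpUp(2, FetchedOncePolicy{fetched});
  fetched.insert(table.slots[first]);
  size_t second = table.BumpUp(first, FetchedOncePolicy{fetched});

  assert(first == 1 && second == first && table.slots[first] == "c");
  return 0;
}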


@@ -1660,9 +1660,12 @@ auto Segment<Key, Value, Policy>::BumpUp(uint8_t bid, SlotId slot, Hash_t key_ha
   uint8_t fp_hash = key_hash & kFpMask;
   assert(fp_hash == from.Fp(slot));
 
+  if (!bp.CanBump(from.key[slot])) {
+    return Iterator{bid, slot};
+  }
   if (bid < kRegularBucketCnt) {
     // non stash case.
-    if (slot > 0 && bp.CanBumpDown(from.key[slot - 1])) {
+    if (slot > 0 && bp.CanBump(from.key[slot - 1])) {
       from.Swap(slot - 1, slot);
       return Iterator{bid, uint8_t(slot - 1)};
     }
@@ -1697,7 +1700,7 @@ auto Segment<Key, Value, Policy>::BumpUp(uint8_t bid, SlotId slot, Hash_t key_ha
   // Don't move sticky items back to the stash because they're not evictable
   // TODO: search for first swappable item
-  if (!bp.CanBumpDown(swapb.key[kLastSlot])) {
+  if (!bp.CanBump(swapb.key[kLastSlot])) {
     target.SetStashPtr(stash_pos, fp_hash, &next);
     return Iterator{bid, slot};
   }


@@ -78,7 +78,7 @@ struct UInt64Policy : public BasicDashPolicy {
 };
 
 struct RelaxedBumpPolicy {
-  bool CanBumpDown(uint64_t key) const {
+  bool CanBump(uint64_t key) const {
     return true;
   }
 };
@@ -396,7 +396,7 @@ TEST_F(DashTest, BumpUp) {
 TEST_F(DashTest, BumpPolicy) {
   struct RestrictedBumpPolicy {
-    bool CanBumpDown(uint64_t key) const {
+    bool CanBump(uint64_t key) const {
      return false;
    }
  };


@@ -117,16 +117,16 @@ class PrimeEvictionPolicy {
 class PrimeBumpPolicy {
  public:
-  PrimeBumpPolicy(const absl::flat_hash_set<CompactObjectView>& bumped_items)
-      : bumped_items_(bumped_items) {
+  PrimeBumpPolicy(const absl::flat_hash_set<CompactObjectView>& fetched_items)
+      : fetched_items_(fetched_items) {
   }
-  // returns true if key can be made less important for eviction (opposite of bump up)
-  bool CanBumpDown(const CompactObj& obj) const {
-    return !obj.IsSticky() && !bumped_items_.contains(obj);
+  // returns true if we can change the object location in dash table.
+  bool CanBump(const CompactObj& obj) const {
+    return !obj.IsSticky() && !fetched_items_.contains(obj);
   }
 
  private:
-  const absl::flat_hash_set<CompactObjectView>& bumped_items_;
+  const absl::flat_hash_set<CompactObjectView>& fetched_items_;
 };
 
 bool PrimeEvictionPolicy::CanGrow(const PrimeTable& tbl) const {
@@ -497,9 +497,12 @@ OpResult<DbSlice::ItAndExp> DbSlice::FindInternal(const Context& cntx, std::stri
       };
       db.prime.CVCUponBump(change_cb_.back().first, res.it, bump_cb);
     }
 
-    res.it = db.prime.BumpUp(res.it, PrimeBumpPolicy{bumped_items_});
-    ++events_.bumpups;
-    bumped_items_.insert(res.it->first.AsRef());
+    auto bump_it = db.prime.BumpUp(res.it, PrimeBumpPolicy{fetched_items_});
+    if (bump_it != res.it) {  // the item was bumped
+      res.it = bump_it;
+      ++events_.bumpups;
+    }
+    fetched_items_.insert(res.it->first.AsRef());
   }
 
   db.top_keys.Touch(key);
@@ -673,7 +676,7 @@ bool DbSlice::Del(DbIndex db_ind, PrimeIterator it) {
     DbContext cntx{db_ind, GetCurrentTimeMs()};
     doc_del_cb_(key, cntx, it->second);
   }
 
-  bumped_items_.erase(it->first.AsRef());
+  fetched_items_.erase(it->first.AsRef());
   PerformDeletion(it, db.get());
   deletion_count_++;
@@ -734,7 +737,7 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
       tiered->CancelAllIos(index);
     }
   }
-  CHECK(bumped_items_.empty());
+  CHECK(fetched_items_.empty());
   auto cb = [this, flush_db_arr = std::move(flush_db_arr)]() mutable {
     for (auto& db_ptr : flush_db_arr) {
       if (db_ptr && db_ptr->stats.tiered_entries > 0) {
@@ -1536,7 +1539,7 @@ void DbSlice::PerformDeletion(PrimeIterator del_it, DbTable* table) {
 void DbSlice::OnCbFinish() {
   // TBD update bumpups logic we can not clear now after cb finish as cb can preempt
   // btw what do we do with inline?
-  bumped_items_.clear();
+  fetched_items_.clear();
 }
 
 }  // namespace dfly
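
As a side note on the FindInternal change above, a small self-contained sketch (hypothetical code, not the real API) of the counting rule: BumpUp may hand back the input position unchanged when the policy refuses the move, and only a real move should increment the bump-up counter.

// Sketch of the counting rule: count a bump only when the entry really moved.
#include <cassert>
#include <cstddef>

// Pretend BumpUp: refuses the move when the policy says no, otherwise moves
// the entry one position forward.
size_t BumpUp(size_t pos, bool can_bump) {
  if (pos == 0 || !can_bump)
    return pos;
  return pos - 1;
}

int main() {
  size_t bumpups = 0;  // plays the role of events_.bumpups

  size_t moved = BumpUp(3, /*can_bump=*/true);
  if (moved != 3)
    ++bumpups;  // counted: the entry really moved

  size_t refused = BumpUp(moved, /*can_bump=*/false);
  if (refused != moved)
    ++bumpups;  // not counted: nothing moved, the position is unchanged

  assert(bumpups == 1 && moved == 2 && refused == 2);
  return 0;
}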


@@ -482,7 +482,7 @@ class DbSlice {
   std::vector<std::pair<uint64_t, ChangeCallback>> change_cb_;
 
   // Used in temporary computations in Find item and CbFinish
-  mutable absl::flat_hash_set<CompactObjectView> bumped_items_;
+  mutable absl::flat_hash_set<CompactObjectView> fetched_items_;
 
   // Registered by shard indices on when first document index is created.
   DocDeletionCallback doc_del_cb_;


@@ -520,24 +520,6 @@ SinkReplyBuilder::MGetResponse OpMGet(bool fetch_mcflag, bool fetch_mcver, const
     if (it.is_done())
       continue;
 
-    // If keys contain the same key several time,
-    // then with cache_mode=true we may have a "data race":
-    // The first Find(key) will return the iterator after it bumped it up,
-    // the second Find(key) above will also return the iterator but it will
-    // bump up the key again, and the first iterator will be invalidated.
-    // TODO: to understand better the dynamics of this scenario and to fix it.
-    if (it->first != keys[i]) {
-      LOG(WARNING) << "Inconcistent key(" << i << "), expected " << keys[i] << " but found "
-                   << it->first.ToString();
-      string key_arr;
-      for (unsigned j = 0; j < keys.size(); ++j) {
-        absl::StrAppend(&key_arr, keys[j], ",");
-      }
-      key_arr.pop_back();
-      LOG(WARNING) << "The keys requested are: [" << key_arr << "]";
-      it = db_slice.GetDBTable(t->GetDbContext().db_index)->prime.Find(keys[i]);
-      CHECK(!it.is_done());
-    }
-
     auto& resp = response.resp_arr[i].emplace();
     size_t size = CopyValueToBuffer(it->second, next);


@@ -241,7 +241,7 @@ TEST_F(StringFamilyTest, MGetCachingModeBug2276) {
   absl::FlagSaver fs;
   SetTestFlag("cache_mode", "true");
   ResetService();
-  Run({"debug", "populate", "100000", "key", "32", "RAND"});
+  Run({"debug", "populate", "18000", "key", "32", "RAND"});
 
   // Scan starts traversing the database, because we populated the database with lots of items we
   // assume that scan will return items from the same bucket that reside next to each other.
@@ -253,7 +253,9 @@
   auto get_bump_ups = [](const string& str) -> size_t {
     const string matcher = "bump_ups:";
     const auto pos = str.find(matcher) + matcher.size();
-    const auto sub = str.substr(pos, 1);
+    const auto next_new_line =
+        str.find("\r\n", pos);  // Find the position of the next "\r\n" after the initial position
+    const auto sub = str.substr(pos, next_new_line - pos);
     return atoi(sub.c_str());
   };
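
The test-helper change above fixes the parsing of the bump_ups counter; a quick standalone illustration (made-up INFO payload, not taken from the test) of why taking a single character after "bump_ups:" goes wrong once the counter exceeds 9:

// Standalone illustration of the parsing fix above on a made-up INFO snippet.
#include <cstdlib>
#include <iostream>
#include <string>

int main() {
  const std::string info = "keyspace_hits:3\r\nbump_ups:12\r\nkeyspace_misses:0\r\n";
  const std::string matcher = "bump_ups:";
  const auto pos = info.find(matcher) + matcher.size();

  const auto old_sub = info.substr(pos, 1);                             // "1": truncated
  const auto new_sub = info.substr(pos, info.find("\r\n", pos) - pos);  // "12": full value

  std::cout << std::atoi(old_sub.c_str()) << " vs " << std::atoi(new_sub.c_str()) << "\n";
  return 0;  // prints "1 vs 12"
}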
@@ -279,6 +281,41 @@
 
   EXPECT_GT(bumps2, bumps1);
 }
 
+TEST_F(StringFamilyTest, MGetCachingModeBug2465) {
+  absl::FlagSaver fs;
+  SetTestFlag("cache_mode", "true");
+  ResetService();
+  Run({"debug", "populate", "18000", "key", "32", "RAND"});
+
+  // Scan starts traversing the database, because we populated the database with lots of items we
+  // assume that scan will return items from the same bucket that reside next to each other.
+  auto resp = Run({"scan", "0"});
+  ASSERT_THAT(resp, ArrLen(2));
+  StringVec vec = StrArray(resp.GetVec()[1]);
+  ASSERT_GE(vec.size(), 10);
+
+  auto get_bump_ups = [](const string& str) -> size_t {
+    const string matcher = "bump_ups:";
+    const auto pos = str.find(matcher) + matcher.size();
+    const auto next_new_line =
+        str.find("\r\n", pos);  // Find the position of the next "\r\n" after the initial position
+    const auto sub = str.substr(pos, next_new_line - pos);
+    return atoi(sub.c_str());
+  };
+
+  resp = Run({"info", "stats"});
+  EXPECT_EQ(get_bump_ups(resp.GetString()), 0);
+
+  Run({"del", vec[1]});
+  Run({"lpush", vec[1], "a"});
+  auto mget_resp = StrArray(Run({"mget", vec[2], vec[2], vec[2]}));
+
+  resp = Run({"info", "stats"});
+  size_t bumps = get_bump_ups(resp.GetString());
+  EXPECT_EQ(bumps, 2);  // one bump for del and one for the mget key
+}
+
 TEST_F(StringFamilyTest, MSetGet) {
   Run({"mset", "x", "0", "y", "0", "a", "0", "b", "0"});
   ASSERT_EQ(2, GetDebugInfo().shards_count);