mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 02:15:45 +02:00
fix(cluster): Don't miss updates in FLUSHSLOTS (#2783)
* fix(flushslots): Don't miss updates in `FLUSHSLOTS` This PR registers for PreUpdate() from inside the `FLUSHSLOTS` fiber so that any attempt to update a to-be-deleted key will work as expected (first delete, then apply the change). This fixes several issues: * Any attempt to touch bucket B (like insert a key), where another key in B should be removed, caused us to _not_ remove the latter key * Commands which use an existing value but not completely override then, like `APPEND` and `LPUSH` did not treat the key as removed but instead used the original value Fixes #2771 * fix flushslots syntax in test * EXPECT_EQ(key:0, xxxx) * dbsize
This commit is contained in:
parent
7d093460f0
commit
1d04683c48
2 changed files with 116 additions and 157 deletions
|
@ -409,12 +409,17 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutableInternal(const Context& cntx
|
|||
}
|
||||
|
||||
PreUpdate(cntx.db_index, res->it);
|
||||
return {{res->it, res->exp_it,
|
||||
AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
|
||||
.db_slice = this,
|
||||
.db_ind = cntx.db_index,
|
||||
.it = res->it,
|
||||
.key = key})}};
|
||||
// PreUpdate() might have caused a deletion of `it`
|
||||
if (res->it.IsOccupied()) {
|
||||
return {{res->it, res->exp_it,
|
||||
AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
|
||||
.db_slice = this,
|
||||
.db_ind = cntx.db_index,
|
||||
.it = res->it,
|
||||
.key = key})}};
|
||||
} else {
|
||||
return OpStatus::KEY_NOTFOUND;
|
||||
}
|
||||
}
|
||||
|
||||
DbSlice::ItAndExpConst DbSlice::FindReadOnly(const Context& cntx, std::string_view key) {
|
||||
|
@ -577,15 +582,20 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
|
|||
|
||||
if (res.ok()) {
|
||||
PreUpdate(cntx.db_index, res->it);
|
||||
return DbSlice::AddOrFindResult{
|
||||
.it = res->it,
|
||||
.exp_it = res->exp_it,
|
||||
.is_new = false,
|
||||
.post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
|
||||
.db_slice = this,
|
||||
.db_ind = cntx.db_index,
|
||||
.it = res->it,
|
||||
.key = key})};
|
||||
// PreUpdate() might have caused a deletion of `it`
|
||||
if (res->it.IsOccupied()) {
|
||||
return DbSlice::AddOrFindResult{
|
||||
.it = res->it,
|
||||
.exp_it = res->exp_it,
|
||||
.is_new = false,
|
||||
.post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
|
||||
.db_slice = this,
|
||||
.db_ind = cntx.db_index,
|
||||
.it = res->it,
|
||||
.key = key})};
|
||||
} else {
|
||||
res = OpStatus::KEY_NOTFOUND;
|
||||
}
|
||||
}
|
||||
auto status = res.status();
|
||||
CHECK(status == OpStatus::KEY_NOTFOUND || status == OpStatus::OUT_OF_MEMORY) << status;
|
||||
|
@ -706,7 +716,8 @@ void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) {
|
|||
// Slot deletion can take time as it traverses all the database, hence it runs in fiber.
|
||||
// We want to flush all the data of a slot that was added till the time the call to FlushSlotsFb
|
||||
// was made. Therefore we delete slots entries with version < next_version
|
||||
uint64_t next_version = NextVersion();
|
||||
uint64_t next_version = 0;
|
||||
|
||||
std::string tmp;
|
||||
auto del_entry_cb = [&](PrimeTable::iterator it) {
|
||||
std::string_view key = it->first.GetSlice(&tmp);
|
||||
|
@ -717,6 +728,33 @@ void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) {
|
|||
return true;
|
||||
};
|
||||
|
||||
auto on_change = [&](DbIndex db_index, const ChangeReq& req) {
|
||||
FiberAtomicGuard fg;
|
||||
PrimeTable* table = GetTables(db_index).first;
|
||||
|
||||
auto iterate_bucket = [&](DbIndex db_index, PrimeTable::bucket_iterator it) {
|
||||
while (!it.is_done()) {
|
||||
del_entry_cb(it);
|
||||
++it;
|
||||
}
|
||||
};
|
||||
|
||||
if (const PrimeTable::bucket_iterator* bit = req.update()) {
|
||||
if (bit->GetVersion() < next_version) {
|
||||
iterate_bucket(db_index, *bit);
|
||||
}
|
||||
} else {
|
||||
string_view key = get<string_view>(req.change);
|
||||
table->CVCUponInsert(
|
||||
next_version, key,
|
||||
[this, db_index, next_version, iterate_bucket](PrimeTable::bucket_iterator it) {
|
||||
DCHECK_LT(it.GetVersion(), next_version);
|
||||
iterate_bucket(db_index, it);
|
||||
});
|
||||
}
|
||||
};
|
||||
next_version = RegisterOnChange(std::move(on_change));
|
||||
|
||||
ServerState& etl = *ServerState::tlocal();
|
||||
PrimeTable* pt = &db_arr_[0]->prime;
|
||||
PrimeTable::Cursor cursor;
|
||||
|
@ -730,6 +768,9 @@ void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) {
|
|||
}
|
||||
|
||||
} while (cursor && etl.gstate() != GlobalState::SHUTTING_DOWN);
|
||||
|
||||
UnregisterOnChange(next_version);
|
||||
|
||||
etl.DecommitMemory(ServerState::kDataHeap);
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue