mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
chore: do not preempt on db_slice::RegisterOnChange (#3388)
For big value serialization it is required to support preemption when db_slice::RegisterOnChange is called to avoid UB when a code path is iterating over the change_cb_ and preempts because it serializes a big value. As this is problematic and can lead to data inconsistencies I replace the std::vector with std::list and bound the iteration of change_cb_ on paths that preempt. * replace std::vector with std::list for change_cb_ * bound iteration of change_cb_ on paths that preempt --------- Signed-off-by: kostas <kostas@dragonflydb.io>
This commit is contained in:
parent
4b851be57a
commit
a95cf2e857
2 changed files with 13 additions and 8 deletions
|
@ -1126,7 +1126,6 @@ void DbSlice::ExpireAllIfNeeded() {
|
|||
}
|
||||
|
||||
uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
|
||||
block_counter_.Wait();
|
||||
return change_cb_.emplace_back(NextVersion(), std::move(cb)).first;
|
||||
}
|
||||
|
||||
|
@ -1139,15 +1138,18 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_
|
|||
DVLOG(2) << "Running callbacks in dbid " << db_ind << " with bucket_version=" << bucket_version
|
||||
<< ", upper_bound=" << upper_bound;
|
||||
|
||||
for (const auto& ccb : change_cb_) {
|
||||
uint64_t cb_version = ccb.first;
|
||||
const size_t limit = change_cb_.size();
|
||||
auto ccb = change_cb_.begin();
|
||||
for (size_t i = 0; i < limit; ++i) {
|
||||
uint64_t cb_version = ccb->first;
|
||||
DCHECK_LE(cb_version, upper_bound);
|
||||
if (cb_version == upper_bound) {
|
||||
return;
|
||||
}
|
||||
if (bucket_version < cb_version) {
|
||||
ccb.second(db_ind, ChangeReq{it.GetInnerIt()});
|
||||
ccb->second(db_ind, ChangeReq{it.GetInnerIt()});
|
||||
}
|
||||
++ccb;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1477,9 +1479,12 @@ void DbSlice::CallChangeCallbacks(DbIndex id, std::string_view key, const Change
|
|||
FetchedItemsRestorer fetched_restorer(&fetched_items_);
|
||||
std::unique_lock<LocalBlockingCounter> lk(block_counter_);
|
||||
|
||||
for (const auto& ccb : change_cb_) {
|
||||
CHECK(ccb.second);
|
||||
ccb.second(id, cr);
|
||||
const size_t limit = change_cb_.size();
|
||||
auto ccb = change_cb_.begin();
|
||||
for (size_t i = 0; i < limit; ++i) {
|
||||
CHECK(ccb->second);
|
||||
ccb->second(id, cr);
|
||||
++ccb;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -571,7 +571,7 @@ class DbSlice {
|
|||
mutable absl::flat_hash_set<uint64_t> uniq_fps_;
|
||||
|
||||
// ordered from the smallest to largest version.
|
||||
std::vector<std::pair<uint64_t, ChangeCallback>> change_cb_;
|
||||
std::list<std::pair<uint64_t, ChangeCallback>> change_cb_;
|
||||
|
||||
// Used in temporary computations in Find item and CbFinish
|
||||
mutable absl::flat_hash_set<CompactObjectView> fetched_items_;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue