mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 02:15:45 +02:00
fix: fix RegisterOnChange methods for journal and db_slice (#3171)
* fix: fix RegisterOnChange methods for journal and db_slice. Call db_slice and journal callbacks atomically. Made a hack to avoid deadlock during SAVE
This commit is contained in:
parent
f66ee5f47d
commit
d75c79ce5c
6 changed files with 50 additions and 13 deletions
|
@ -458,9 +458,7 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
|
||||||
if (!change_cb_.empty()) {
|
if (!change_cb_.empty()) {
|
||||||
auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
|
auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
|
||||||
DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index;
|
DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index;
|
||||||
for (const auto& ccb : change_cb_) {
|
CallChangeCallbacks(cntx.db_index, bit);
|
||||||
ccb.second(cntx.db_index, bit);
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
db.prime.CVCUponBump(change_cb_.back().first, res.it, bump_cb);
|
db.prime.CVCUponBump(change_cb_.back().first, res.it, bump_cb);
|
||||||
}
|
}
|
||||||
|
@ -524,9 +522,7 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
|
||||||
|
|
||||||
// It's a new entry.
|
// It's a new entry.
|
||||||
DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index;
|
DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index;
|
||||||
for (const auto& ccb : change_cb_) {
|
CallChangeCallbacks(cntx.db_index, key);
|
||||||
ccb.second(cntx.db_index, key);
|
|
||||||
}
|
|
||||||
|
|
||||||
// In case we are loading from rdb file or replicating we want to disable conservative memory
|
// In case we are loading from rdb file or replicating we want to disable conservative memory
|
||||||
// checks (inside PrimeEvictionPolicy::CanGrow) and reject insertions only after we pass max
|
// checks (inside PrimeEvictionPolicy::CanGrow) and reject insertions only after we pass max
|
||||||
|
@ -975,9 +971,7 @@ void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) {
|
||||||
FiberAtomicGuard fg;
|
FiberAtomicGuard fg;
|
||||||
|
|
||||||
DVLOG(2) << "Running callbacks in dbid " << db_ind;
|
DVLOG(2) << "Running callbacks in dbid " << db_ind;
|
||||||
for (const auto& ccb : change_cb_) {
|
CallChangeCallbacks(db_ind, ChangeReq{it.GetInnerIt()});
|
||||||
ccb.second(db_ind, ChangeReq{it.GetInnerIt()});
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the value has a pending stash, cancel it before any modification are applied.
|
// If the value has a pending stash, cancel it before any modification are applied.
|
||||||
// Note: we don't delete offloaded values before updates, because a read-modify operation (like
|
// Note: we don't delete offloaded values before updates, because a read-modify operation (like
|
||||||
|
@ -1089,6 +1083,13 @@ void DbSlice::ExpireAllIfNeeded() {
|
||||||
|
|
||||||
uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
|
uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
|
||||||
uint64_t ver = NextVersion();
|
uint64_t ver = NextVersion();
|
||||||
|
|
||||||
|
// TODO rewrite this logic to be more clear
|
||||||
|
// this mutex lock is needed to check that this method is not called simultaneously with
|
||||||
|
// change_cb_ calls and journal_slice::change_cb_arr_ calls.
|
||||||
|
// It can be unlocked anytime because DbSlice::RegisterOnChange
|
||||||
|
// and journal_slice::RegisterOnChange calls without preemption
|
||||||
|
std::lock_guard lk(cb_mu_);
|
||||||
change_cb_.emplace_back(ver, std::move(cb));
|
change_cb_.emplace_back(ver, std::move(cb));
|
||||||
return ver;
|
return ver;
|
||||||
}
|
}
|
||||||
|
@ -1099,6 +1100,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_
|
||||||
// change_cb_ is ordered by version.
|
// change_cb_ is ordered by version.
|
||||||
DVLOG(2) << "Running callbacks in dbid " << db_ind << " with bucket_version=" << bucket_version
|
DVLOG(2) << "Running callbacks in dbid " << db_ind << " with bucket_version=" << bucket_version
|
||||||
<< ", upper_bound=" << upper_bound;
|
<< ", upper_bound=" << upper_bound;
|
||||||
|
|
||||||
for (const auto& ccb : change_cb_) {
|
for (const auto& ccb : change_cb_) {
|
||||||
uint64_t cb_version = ccb.first;
|
uint64_t cb_version = ccb.first;
|
||||||
DCHECK_LE(cb_version, upper_bound);
|
DCHECK_LE(cb_version, upper_bound);
|
||||||
|
@ -1113,6 +1115,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_
|
||||||
|
|
||||||
//! Unregisters the callback.
|
//! Unregisters the callback.
|
||||||
void DbSlice::UnregisterOnChange(uint64_t id) {
|
void DbSlice::UnregisterOnChange(uint64_t id) {
|
||||||
|
lock_guard lk(cb_mu_); // we need to wait until callback is finished before remove it
|
||||||
for (auto it = change_cb_.begin(); it != change_cb_.end(); ++it) {
|
for (auto it = change_cb_.begin(); it != change_cb_.end(); ++it) {
|
||||||
if (it->first == id) {
|
if (it->first == id) {
|
||||||
change_cb_.erase(it);
|
change_cb_.erase(it);
|
||||||
|
@ -1506,4 +1509,10 @@ void DbSlice::OnCbFinish() {
|
||||||
fetched_items_.clear();
|
fetched_items_.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DbSlice::CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const {
|
||||||
|
for (const auto& ccb : change_cb_) {
|
||||||
|
ccb.second(id, cr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace dfly
|
} // namespace dfly
|
||||||
|
|
|
@ -469,6 +469,14 @@ class DbSlice {
|
||||||
void PerformDeletion(Iterator del_it, DbTable* table);
|
void PerformDeletion(Iterator del_it, DbTable* table);
|
||||||
void PerformDeletion(PrimeIterator del_it, DbTable* table);
|
void PerformDeletion(PrimeIterator del_it, DbTable* table);
|
||||||
|
|
||||||
|
void LockChangeCb() const {
|
||||||
|
return cb_mu_.lock_shared();
|
||||||
|
}
|
||||||
|
|
||||||
|
void UnlockChangeCb() const {
|
||||||
|
return cb_mu_.unlock_shared();
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key);
|
void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key);
|
||||||
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);
|
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);
|
||||||
|
@ -523,6 +531,8 @@ class DbSlice {
|
||||||
return version_++;
|
return version_++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
ShardId shard_id_;
|
ShardId shard_id_;
|
||||||
uint8_t caching_mode_ : 1;
|
uint8_t caching_mode_ : 1;
|
||||||
|
@ -544,6 +554,12 @@ class DbSlice {
|
||||||
// Used in temporary computations in Acquire/Release.
|
// Used in temporary computations in Acquire/Release.
|
||||||
mutable absl::flat_hash_set<uint64_t> uniq_fps_;
|
mutable absl::flat_hash_set<uint64_t> uniq_fps_;
|
||||||
|
|
||||||
|
// To ensure correct data replication, we must serialize the buckets that each running command
|
||||||
|
// will modify, followed by serializing the command to the journal. We use a mutex to prevent
|
||||||
|
// interleaving between bucket and journal registrations, and the command execution with its
|
||||||
|
// journaling. LockChangeCb is called before the callback, and UnlockChangeCb is called after
|
||||||
|
// journaling is completed. Register to bucket and journal changes is also does without preemption
|
||||||
|
mutable util::fb2::SharedMutex cb_mu_;
|
||||||
// ordered from the smallest to largest version.
|
// ordered from the smallest to largest version.
|
||||||
std::vector<std::pair<uint64_t, ChangeCallback>> change_cb_;
|
std::vector<std::pair<uint64_t, ChangeCallback>> change_cb_;
|
||||||
|
|
||||||
|
|
|
@ -254,7 +254,10 @@ void SaveStagesController::SaveDfs() {
|
||||||
|
|
||||||
// Save shard files.
|
// Save shard files.
|
||||||
auto cb = [this](Transaction* t, EngineShard* shard) {
|
auto cb = [this](Transaction* t, EngineShard* shard) {
|
||||||
|
// a hack to avoid deadlock in Transaction::RunCallback(...)
|
||||||
|
shard->db_slice().UnlockChangeCb();
|
||||||
SaveDfsSingle(shard);
|
SaveDfsSingle(shard);
|
||||||
|
shard->db_slice().LockChangeCb();
|
||||||
return OpStatus::OK;
|
return OpStatus::OK;
|
||||||
};
|
};
|
||||||
trans_->ScheduleSingleHop(std::move(cb));
|
trans_->ScheduleSingleHop(std::move(cb));
|
||||||
|
@ -294,7 +297,10 @@ void SaveStagesController::SaveRdb() {
|
||||||
}
|
}
|
||||||
|
|
||||||
auto cb = [snapshot = snapshot.get()](Transaction* t, EngineShard* shard) {
|
auto cb = [snapshot = snapshot.get()](Transaction* t, EngineShard* shard) {
|
||||||
|
// a hack to avoid deadlock in Transaction::RunCallback(...)
|
||||||
|
shard->db_slice().UnlockChangeCb();
|
||||||
snapshot->StartInShard(shard);
|
snapshot->StartInShard(shard);
|
||||||
|
shard->db_slice().LockChangeCb();
|
||||||
return OpStatus::OK;
|
return OpStatus::OK;
|
||||||
};
|
};
|
||||||
trans_->ScheduleSingleHop(std::move(cb));
|
trans_->ScheduleSingleHop(std::move(cb));
|
||||||
|
|
|
@ -199,13 +199,14 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
|
||||||
}
|
}
|
||||||
|
|
||||||
uint32_t JournalSlice::RegisterOnChange(ChangeCallback cb) {
|
uint32_t JournalSlice::RegisterOnChange(ChangeCallback cb) {
|
||||||
lock_guard lk(cb_mu_);
|
// mutex lock isn't needed due to iterators are not invalidated
|
||||||
uint32_t id = next_cb_id_++;
|
uint32_t id = next_cb_id_++;
|
||||||
change_cb_arr_.emplace_back(id, std::move(cb));
|
change_cb_arr_.emplace_back(id, std::move(cb));
|
||||||
return id;
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
void JournalSlice::UnregisterOnChange(uint32_t id) {
|
void JournalSlice::UnregisterOnChange(uint32_t id) {
|
||||||
|
// we need to wait until callback is finished before remove it
|
||||||
lock_guard lk(cb_mu_);
|
lock_guard lk(cb_mu_);
|
||||||
auto it = find_if(change_cb_arr_.begin(), change_cb_arr_.end(),
|
auto it = find_if(change_cb_arr_.begin(), change_cb_arr_.end(),
|
||||||
[id](const auto& e) { return e.first == id; });
|
[id](const auto& e) { return e.first == id; });
|
||||||
|
|
|
@ -47,7 +47,6 @@ class JournalSlice {
|
||||||
void UnregisterOnChange(uint32_t);
|
void UnregisterOnChange(uint32_t);
|
||||||
|
|
||||||
bool HasRegisteredCallbacks() const {
|
bool HasRegisteredCallbacks() const {
|
||||||
std::shared_lock lk(cb_mu_);
|
|
||||||
return !change_cb_arr_.empty();
|
return !change_cb_arr_.empty();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,8 +61,8 @@ class JournalSlice {
|
||||||
std::optional<base::RingBuffer<JournalItem>> ring_buffer_;
|
std::optional<base::RingBuffer<JournalItem>> ring_buffer_;
|
||||||
base::IoBuf ring_serialize_buf_;
|
base::IoBuf ring_serialize_buf_;
|
||||||
|
|
||||||
mutable util::fb2::SharedMutex cb_mu_;
|
mutable util::fb2::SharedMutex cb_mu_; // to prevent removing callback during call
|
||||||
std::vector<std::pair<uint32_t, ChangeCallback>> change_cb_arr_ ABSL_GUARDED_BY(cb_mu_);
|
std::list<std::pair<uint32_t, ChangeCallback>> change_cb_arr_;
|
||||||
|
|
||||||
LSN lsn_ = 1;
|
LSN lsn_ = 1;
|
||||||
|
|
||||||
|
|
|
@ -627,6 +627,7 @@ void Transaction::RunCallback(EngineShard* shard) {
|
||||||
DCHECK_EQ(shard, EngineShard::tlocal());
|
DCHECK_EQ(shard, EngineShard::tlocal());
|
||||||
|
|
||||||
RunnableResult result;
|
RunnableResult result;
|
||||||
|
shard->db_slice().LockChangeCb();
|
||||||
try {
|
try {
|
||||||
result = (*cb_ptr_)(this, shard);
|
result = (*cb_ptr_)(this, shard);
|
||||||
|
|
||||||
|
@ -664,7 +665,10 @@ void Transaction::RunCallback(EngineShard* shard) {
|
||||||
// Log to journal only once the command finished running
|
// Log to journal only once the command finished running
|
||||||
if ((coordinator_state_ & COORD_CONCLUDING) || (multi_ && multi_->concluding)) {
|
if ((coordinator_state_ & COORD_CONCLUDING) || (multi_ && multi_->concluding)) {
|
||||||
LogAutoJournalOnShard(shard, result);
|
LogAutoJournalOnShard(shard, result);
|
||||||
|
shard->db_slice().UnlockChangeCb();
|
||||||
MaybeInvokeTrackingCb();
|
MaybeInvokeTrackingCb();
|
||||||
|
} else {
|
||||||
|
shard->db_slice().UnlockChangeCb();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1247,9 +1251,11 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
|
||||||
DCHECK_EQ(unique_shard_cnt_, 1u);
|
DCHECK_EQ(unique_shard_cnt_, 1u);
|
||||||
|
|
||||||
auto* shard = EngineShard::tlocal();
|
auto* shard = EngineShard::tlocal();
|
||||||
|
shard->db_slice().LockChangeCb();
|
||||||
auto result = cb(this, shard);
|
auto result = cb(this, shard);
|
||||||
shard->db_slice().OnCbFinish();
|
shard->db_slice().OnCbFinish();
|
||||||
LogAutoJournalOnShard(shard, result);
|
LogAutoJournalOnShard(shard, result);
|
||||||
|
shard->db_slice().UnlockChangeCb();
|
||||||
MaybeInvokeTrackingCb();
|
MaybeInvokeTrackingCb();
|
||||||
|
|
||||||
DCHECK_EQ(result.flags, 0); // if it's sophisticated, we shouldn't squash it
|
DCHECK_EQ(result.flags, 0); // if it's sophisticated, we shouldn't squash it
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue