Mirror of https://github.com/dragonflydb/dragonfly.git, synced 2025-05-11 18:35:46 +02:00
fix(server): Prevent preemption inside SerializeBucket (#1111)
* fix(server): Prevent preemption inside SerializeBucket
* Modifications after speaking to Adi
parent 246f6093db
commit 6632261a2d

3 changed files with 19 additions and 20 deletions
src/server/db_slice.cc

@@ -857,7 +857,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, PrimeIterator it,
                                             uint64_t upper_bound) {
   FiberAtomicGuard fg;
   uint64_t bucket_version = it.GetVersion();
-  // change_cb_ is ordered by vesion.
+  // change_cb_ is ordered by version.
   for (const auto& ccb : change_cb_) {
     uint64_t cb_vesrion = ccb.first;
     DCHECK_LE(cb_vesrion, upper_bound);
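The FiberAtomicGuard seen in this hunk's context (and added to SerializeBucket below) is the mechanism behind the commit title: while the guard is alive, the current fiber must not yield, so the guarded block runs atomically with respect to every other fiber on the same thread. A minimal sketch of the idea, assuming a hypothetical scheduler hook — not the actual implementation in Dragonfly's fiber layer:

#include <cassert>

// Hypothetical stand-in for FiberAtomicGuard: while an instance is alive,
// any preemption of the current fiber is treated as a bug.
thread_local int atomic_section_depth = 0;

class FiberAtomicGuardSketch {
 public:
  FiberAtomicGuardSketch() { ++atomic_section_depth; }
  ~FiberAtomicGuardSketch() { --atomic_section_depth; }
  FiberAtomicGuardSketch(const FiberAtomicGuardSketch&) = delete;
  FiberAtomicGuardSketch& operator=(const FiberAtomicGuardSketch&) = delete;
};

// A fiber scheduler would call this at every potential preemption point
// (blocking I/O, explicit yields, channel pushes, and so on).
void OnPreemptionPoint() {
  assert(atomic_section_depth == 0 && "fiber preempted inside an atomic section");
}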
src/server/snapshot.cc

@@ -26,7 +26,6 @@ using namespace util;
 using namespace chrono_literals;
 
 SliceSnapshot::SliceSnapshot(DbSlice* slice, RecordChannel* dest, CompressionMode compression_mode)
-
     : db_slice_(slice), dest_(dest), compression_mode_(compression_mode) {
   db_array_ = slice->databases();
 }
@@ -53,12 +52,12 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) {
 
   snapshot_fb_ = MakeFiber([this, stream_journal, cll] {
     IterateBucketsFb(cll);
+    db_slice_->UnregisterOnChange(snapshot_version_);
     if (cll->IsCancelled()) {
       Cancel();
     } else if (!stream_journal) {
       CloseRecordChannel();
     }
-    db_slice_->UnregisterOnChange(snapshot_version_);
   });
 }
 
@@ -77,7 +76,6 @@ void SliceSnapshot::Stop() {
 
 void SliceSnapshot::Cancel() {
   VLOG(1) << "SliceSnapshot::Cancel";
 
-  CloseRecordChannel();
   // Cancel() might be called multiple times from different fibers of the same thread, but we
   // should unregister the callback only once.
   uint32_t cb_id = journal_cb_id_;
@@ -85,6 +83,8 @@ void SliceSnapshot::Cancel() {
     journal_cb_id_ = 0;
     db_slice_->shard_owner()->journal()->UnregisterOnChange(cb_id);
   }
+
+  CloseRecordChannel();
 }
 
 void SliceSnapshot::Join() {
@@ -121,10 +121,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
 
     uint64_t last_yield = 0;
     PrimeTable* pt = &db_array_[db_indx]->prime;
-    {
-      lock_guard lk(mu_);
-      current_db_ = db_indx;
-    }
+    current_db_ = db_indx;
 
     VLOG(1) << "Start traversing " << pt->size() << " items for index " << db_indx;
     do {
@@ -152,10 +149,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
     PushSerializedToChannel(true);
   }  // for (dbindex)
 
-  // Wait for SerializePhysicalBucket to finish.
-  mu_.lock();
-  mu_.unlock();
-
+  CHECK(!serialize_bucket_running_);
   CHECK(!serializer_->SendFullSyncCut());
   PushSerializedToChannel(true);
 
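The deleted lock()/unlock() pair with an empty critical section is a barrier idiom: it protects nothing, it merely blocks until whoever currently holds mu_ lets go, which is how the old code waited out an in-flight serialization. Once SerializeBucket cannot be preempted, a plain sanity CHECK is enough. A generic illustration of the idiom, using std::thread in place of fibers:

#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>

std::mutex mu;

void worker() {
  std::lock_guard<std::mutex> lk(mu);  // holds mu for the whole "serialization"
  std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

int main() {
  std::thread t(worker);
  // Crude handoff so the worker grabs mu first (good enough for a demo).
  std::this_thread::sleep_for(std::chrono::milliseconds(10));

  // Barrier: acquiring and immediately releasing the mutex cannot complete
  // until the worker has left its critical section.
  mu.lock();
  mu.unlock();

  std::cout << "worker's critical section has finished\n";
  t.join();
}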
@@ -182,18 +176,25 @@ bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
 }
 
 unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_iterator it) {
+  // Must be atomic because after after we call it.snapshot_version_ we're starting
+  // to send incremental updates instead of serializing the whole bucket: We must not
+  // send the update until the initial SerializeBucket is called.
+  // Relying on the atomicity of SerializeBucket is Ok here because only one thread may handle this
+  // bucket.
+  FiberAtomicGuard fg;
   DCHECK_LT(it.GetVersion(), snapshot_version_);
 
   // traverse physical bucket and write it into string file.
   serialize_bucket_running_ = true;
   it.SetVersion(snapshot_version_);
   unsigned result = 0;
 
   lock_guard lk(mu_);
   while (!it.is_done()) {
     ++result;
     SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
     ++it;
   }
   serialize_bucket_running_ = false;
   return result;
 }
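The DCHECK_LT / SetVersion pair above is the core snapshot invariant: a bucket is eligible for serialization only while its version is below snapshot_version_, and serializing it bumps the version, so each bucket is written at most once no matter whether the traversal fiber or a change callback reaches it first. A simplified, self-contained sketch of that invariant (illustrative names, not Dragonfly's API):

#include <cstdint>
#include <vector>

// Illustrative bucket carrying the version that PrimeTable iterators expose
// through GetVersion()/SetVersion() in the real code.
struct Bucket {
  uint64_t version = 0;
  std::vector<int> entries;
};

// Serialize a bucket at most once per snapshot: the version bump makes any
// second visit (traversal or on-change callback) a no-op.
unsigned SerializeBucketOnce(Bucket& b, uint64_t snapshot_version) {
  if (b.version >= snapshot_version)
    return 0;  // already captured by this snapshot
  b.version = snapshot_version;
  unsigned written = 0;
  for (int entry : b.entries) {
    (void)entry;  // the real code writes the entry to the serializer here
    ++written;
  }
  return written;
}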
@@ -235,6 +236,7 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
 }
 
 void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
+  FiberAtomicGuard fg;
   PrimeTable* table = db_slice_->GetTables(db_index).first;
 
   if (const PrimeTable::bucket_iterator* bit = req.update()) {
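OnDbChange is the second producer feeding the serializer: DbSlice invokes it just before a write mutates a bucket, so the snapshot can capture the pre-write contents, and the added FiberAtomicGuard guarantees that this capture itself cannot be interrupted. A rough sketch of a version-keyed change-callback registry of this shape — a hypothetical reduction, not the real DbSlice interface:

#include <cstdint>
#include <functional>
#include <map>

// Callbacks are keyed by the version at which they registered; before a
// bucket changes, every callback registered above the bucket's version gets
// a chance to serialize the old contents first.
struct ChangeRegistry {
  using Callback = std::function<void(uint64_t bucket_version)>;

  std::map<uint64_t, Callback> change_cbs;  // ordered by registration version
  uint64_t next_version = 1;

  uint64_t Register(Callback cb) {
    uint64_t v = next_version++;
    change_cbs.emplace(v, std::move(cb));
    return v;  // the caller later passes this to Unregister()
  }

  void Unregister(uint64_t v) { change_cbs.erase(v); }

  void NotifyBeforeWrite(uint64_t bucket_version) {
    for (auto& [version, cb] : change_cbs)
      if (bucket_version < version)
        cb(bucket_version);  // snapshot captures the pre-write bucket
  }
};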
@@ -271,11 +273,7 @@ void SliceSnapshot::OnJournalEntry(const journal::Entry& entry, bool unused_awai
 }
 
 void SliceSnapshot::CloseRecordChannel() {
-  // stupid barrier to make sure that SerializePhysicalBucket finished.
-  // Can not think of anything more elegant.
-  mu_.lock();
-  mu_.unlock();
-
+  CHECK(!serialize_bucket_running_);
   // Make sure we close the channel only once with a CAS check.
   bool expected = false;
   if (closed_chan_.compare_exchange_strong(expected, true)) {
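The CAS check that survives this hunk gives CloseRecordChannel close-once semantics: compare_exchange_strong flips the flag from false to true exactly once, so overlapping Cancel()/Stop()/normal-completion paths cannot double-close the channel. A self-contained illustration of the pattern:

#include <atomic>
#include <iostream>

std::atomic<bool> closed_chan{false};

// Returns true for the one caller that performs the close, false for the rest.
bool CloseOnce() {
  bool expected = false;
  if (closed_chan.compare_exchange_strong(expected, true)) {
    std::cout << "closing channel\n";  // the real code closes dest_ here
    return true;
  }
  return false;  // somebody else already closed it
}

int main() {
  CloseOnce();  // performs the close
  CloseOnce();  // no-op
}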
src/server/snapshot.h

@@ -70,7 +70,7 @@ class SliceSnapshot {
   void Join();
 
   // Force stop. Needs to be called together with cancelling the context.
-  // Snapshot can't always react to cancellation in streaming mode becuase the
+  // Snapshot can't always react to cancellation in streaming mode because the
   // iteration fiber might have finished running by then.
   void Cancel();
 
@@ -128,7 +128,8 @@ class SliceSnapshot {
   std::unique_ptr<RdbSerializer> serializer_;
 
   Mutex mu_;
+  // Used for sanity checks.
   bool serialize_bucket_running_ = false;
   Fiber snapshot_fb_;  // IterateEntriesFb
 
   CompressionMode compression_mode_;
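With the barriers gone, serialize_bucket_running_ survives purely as the sanity flag the new comment describes: a plain bool is sufficient because each shard runs single-threaded, and the flag brackets the non-preemptible section so the remaining CHECKs can assert that no serialization is mid-flight. In outline (a hypothetical reduction of the pattern):

// A debug-only reentrancy flag: set around the atomic section, checked at
// points where an in-flight serialization would indicate a preemption bug.
struct SnapshotLike {
  bool serialize_bucket_running = false;

  void SerializeBucket() {
    serialize_bucket_running = true;
    // ... serialize entries; the fiber must not yield in here ...
    serialize_bucket_running = false;
  }

  void AfterTraversal() {
    // Mirrors CHECK(!serialize_bucket_running_) in the real code: if
    // SerializeBucket can never be preempted, this can only fire on a bug.
  }
};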