diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 0f3084dc7..20ad8b224 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -76,6 +76,7 @@ void RestoreStreamer::Start(io::Sink* dest) { return; cursor = pt->Traverse(cursor, absl::bind_front(&RestoreStreamer::WriteBucket, this)); + ++last_yield; if (last_yield >= 100) { ThisFiber::Yield(); @@ -111,22 +112,28 @@ void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) { DCHECK_LT(it.GetVersion(), snapshot_version_); it.SetVersion(snapshot_version_); - while (!it.is_done()) { - const auto& pv = it->second; + { + FiberAtomicGuard fg; // Can't switch fibers because that could invalidate iterator - string key_buffer; - string_view key = it->first.GetSlice(&key_buffer); + while (!it.is_done()) { + const auto& pv = it->second; - uint64_t expire = 0; - if (pv.HasExpire()) { - auto eit = db_slice_->databases()[0]->expire.Find(it->first); - expire = db_slice_->ExpireTime(eit); + string key_buffer; + string_view key = it->first.GetSlice(&key_buffer); + + uint64_t expire = 0; + if (pv.HasExpire()) { + auto eit = db_slice_->databases()[0]->expire.Find(it->first); + expire = db_slice_->ExpireTime(eit); + } + + WriteEntry(key, pv, expire); + + ++it; } - - WriteEntry(key, pv, expire); - - ++it; } + + NotifyWritten(true); } void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {