fix(streamer): Do not yield from the Traverse callback. (#2638)

* fix(streamer): Do not yield from the Traverse callback.

Yielding inside the callback can move entries within the bucket, which
is unsupported.

* fix
This commit is contained in:
Shahar Mike 2024-02-21 20:26:16 +02:00 committed by GitHub
parent 9e66ec5833
commit 9baf7c2645
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 26 additions and 12 deletions

View file

@ -75,8 +75,15 @@ void RestoreStreamer::Start(io::Sink* dest) {
if (fiber_cancellation_.IsCancelled())
return;
cursor =
pt->Traverse(cursor, [this](PrimeTable::bucket_iterator it) { WriteBucket(it, true); });
bool written = false;
cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) {
if (WriteBucket(it)) {
written = true;
}
});
if (written) {
NotifyWritten(true);
}
++last_yield;
if (last_yield >= 100) {
@ -129,18 +136,21 @@ bool RestoreStreamer::ShouldWrite(SlotId slot_id) const {
return my_slots_.contains(slot_id);
}
void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it, bool allow_await) {
bool is_data_present = false;
bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
// Can't switch fibers because that could invalidate iterator or cause bucket splits which may
// move keys between buckets.
FiberAtomicGuard fg;
bool written = false;
if (it.GetVersion() < snapshot_version_) {
it.SetVersion(snapshot_version_);
FiberAtomicGuard fg; // Can't switch fibers because that could invalidate iterator
string key_buffer; // we can reuse it
string key_buffer; // we can reuse it
for (; !it.is_done(); ++it) {
const auto& pv = it->second;
string_view key = it->first.GetSlice(&key_buffer);
if (ShouldWrite(key)) {
is_data_present = true;
written = true;
uint64_t expire = 0;
if (pv.HasExpire()) {
@ -153,8 +163,7 @@ void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it, bool allow_awa
}
}
if (is_data_present)
NotifyWritten(allow_await);
return written;
}
void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
@ -164,12 +173,16 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req
PrimeTable* table = db_slice_->GetTables(0).first;
if (const PrimeTable::bucket_iterator* bit = req.update()) {
WriteBucket(*bit, false);
if (WriteBucket(*bit)) {
NotifyWritten(false);
}
} else {
string_view key = get<string_view>(req.change);
table->CVCUponInsert(snapshot_version_, key, [this](PrimeTable::bucket_iterator it) {
DCHECK_LT(it.GetVersion(), snapshot_version_);
WriteBucket(it, false);
if (WriteBucket(it)) {
NotifyWritten(false);
}
});
}
}