fix(server): update post updater iterator in tiering (#2497)

* fix(server): update post updater iterator in tiering

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2024-01-30 13:46:00 +02:00 committed by GitHub
parent b2bdb0f683
commit 503891b1fa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 29 additions and 18 deletions

View file

@ -328,7 +328,6 @@ void DbSlice::AutoUpdater::Run() {
if (fields_.action == DestructorAction::kDoNothing) {
return;
}
// TBD add logic to update iterator if needed as we can now preempt in cb
// Check that AutoUpdater does not run after a key was removed.
// If this CHECK() failed for you, it probably means that you deleted a key while having an auto
@ -336,17 +335,23 @@ void DbSlice::AutoUpdater::Run() {
DCHECK(IsValid(fields_.db_slice->db_arr_[fields_.db_ind]->prime.Find(fields_.key)))
<< "Key was removed before PostUpdate() - this is a bug!";
// Make sure that the DB has not changed in size since this object was created.
// Adding or removing elements from the DB may invalidate iterators.
CHECK_EQ(fields_.db_size, fields_.db_slice->DbSize(fields_.db_ind))
<< "Attempting to run post-update after DB was modified";
CHECK_EQ(fields_.deletion_count, fields_.db_slice->deletion_count_)
<< "Attempting to run post-update after a deletion was issued";
DCHECK(fields_.action == DestructorAction::kRun);
CHECK_NE(fields_.db_slice, nullptr);
if (shard_set->IsTieringEnabled()) {
// When triering is enabled we can preempt on write to disk, therefore it can be invalidated
// until we run the post updated.
fields_.it = fields_.db_slice->db_arr_[fields_.db_ind]->Launder(fields_.it, fields_.key);
} else {
// Make sure that the DB has not changed in size since this object was created.
// Adding or removing elements from the DB may invalidate iterators.
CHECK_EQ(fields_.db_size, fields_.db_slice->DbSize(fields_.db_ind))
<< "Attempting to run post-update after DB was modified";
CHECK_EQ(fields_.deletion_count, fields_.db_slice->deletion_count_)
<< "Attempting to run post-update after a deletion was issued";
}
fields_.db_slice->PostUpdate(fields_.db_ind, fields_.it, fields_.key, fields_.orig_heap_size);
Cancel(); // Reset to not run again
}

View file

@ -116,4 +116,11 @@ void DbTable::Clear() {
stats = DbTableStats{};
}
PrimeIterator DbTable::Launder(PrimeIterator it, std::string_view key) {
if (!it.IsOccupied() || it->first != key) {
it = prime.Find(key);
}
return it;
}
} // namespace dfly

View file

@ -144,6 +144,7 @@ struct DbTable : boost::intrusive_ref_counter<DbTable, boost::thread_unsafe_coun
~DbTable();
void Clear();
PrimeIterator Launder(PrimeIterator it, std::string_view key);
};
// We use reference counting semantics of DbTable when doing snapshotting.

View file

@ -409,16 +409,14 @@ PrimeIterator TieredStorage::Load(DbIndex db_index, PrimeIterator it, string_vie
auto ec = Read(offset, size, res.data());
CHECK(!ec) << "TBD";
// Read will preempt, check if iterator still points to our entry
PrimeTable* pt = db_slice_.GetTables(db_index).first;
if (!it.IsOccupied() || it->first != key) {
it = pt->Find(key);
if (it.is_done()) {
// Entry was remove from db while reading from disk. (background expire task)
return it;
}
entry = &it->second;
// Read will preempt, update iterator if needed.
DbTable* table = db_slice_.GetDBTable(db_index);
it = table->Launder(it, key);
if (it.is_done()) {
// Entry was remove from db while reading from disk. (background expire task)
return it;
}
entry = &it->second;
if (!entry->IsExternal()) {
// Because 2 reads can happen at the same time, then if the other read