From 137bd313ef761ccf9e3e66aad6f3fe7ba19968cd Mon Sep 17 00:00:00 2001 From: Vladislav Date: Thu, 30 May 2024 16:50:12 +0300 Subject: [PATCH] fix(server): Sync FLUSH with tiering (#3098) * fix(server): Sync FLUSH with tiering Signed-off-by: Vladislav Oleshko --- src/server/db_slice.cc | 50 +++++++++++++++++++++++-------- src/server/db_slice.h | 5 ++++ src/server/tiered_storage_test.cc | 29 ++++++++++++++++++ 3 files changed, 71 insertions(+), 13 deletions(-) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index af06032e4..efb18e569 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -704,7 +704,15 @@ void DbSlice::FlushSlots(cluster::SlotRanges slot_ranges) { } void DbSlice::FlushDbIndexes(const std::vector& indexes) { - // TODO: to add preeemptiveness by yielding inside clear. + // Async cleanup can only be performed if no tiered entries exist + bool async_cleanup = true; + for (DbIndex index : indexes) { + async_cleanup &= db_arr_[index]->stats.tiered_entries == 0; + } + + if (!async_cleanup) + ClearEntriesOnFlush(indexes, db_arr_, false); + DbTableArray flush_db_arr(db_arr_.size()); for (DbIndex index : indexes) { auto& db = db_arr_[index]; @@ -715,19 +723,11 @@ void DbSlice::FlushDbIndexes(const std::vector& indexes) { CreateDb(index); std::swap(db_arr_[index]->trans_locks, flush_db_arr[index]->trans_locks); } - CHECK(fetched_items_.empty()); - auto cb = [this, flush_db_arr = std::move(flush_db_arr)]() mutable { - for (auto& db_ptr : flush_db_arr) { - if (db_ptr && db_ptr->stats.tiered_entries > 0) { - for (auto it = db_ptr->prime.begin(); it != db_ptr->prime.end(); ++it) { - if (it->second.IsExternal()) - PerformDeletion(Iterator::FromPrime(it), db_ptr.get()); - } - DCHECK_EQ(0u, db_ptr->stats.tiered_entries); - db_ptr.reset(); - } - } + CHECK(fetched_items_.empty()); + auto cb = [this, async_cleanup, indexes, flush_db_arr = std::move(flush_db_arr)]() mutable { + if (async_cleanup) + ClearEntriesOnFlush(indexes, flush_db_arr, true); flush_db_arr.clear(); ServerState::tlocal()->DecommitMemory(ServerState::kDataHeap | ServerState::kBackingHeap | ServerState::kGlibcmalloc); @@ -1382,6 +1382,30 @@ void DbSlice::InvalidateSlotWatches(const cluster::SlotSet& slot_ids) { } } +void DbSlice::ClearEntriesOnFlush(absl::Span indices, const DbTableArray& db_arr, + bool async) { + for (auto index : indices) { + const auto& db_ptr = db_arr[index]; + if (!db_ptr || db_ptr->stats.tiered_entries == 0) + continue; + + // Delete all tiered entries + PrimeTable::Cursor cursor; + do { + cursor = db_ptr->prime.Traverse(cursor, [&](PrimeIterator it) { + if (it->second.IsExternal()) + PerformDeletion(it, db_ptr.get()); + }); + } while (cursor && db_ptr->stats.tiered_entries > 0); + + // Wait for delete operations to finish in sync + while (!async && db_ptr->stats.tiered_entries > 0) { + LOG_EVERY_T(ERROR, 0.5) << "Long wait for tiered entry delete on flush"; + ThisFiber::SleepFor(1ms); + } + } +} + void DbSlice::SetDocDeletionCallback(DocDeletionCallback ddcb) { doc_del_cb_ = std::move(ddcb); } diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 848e4a2dd..5da9ce571 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -486,6 +486,11 @@ class DbSlice { // Invalidate all watched keys for given slots. Used on FlushSlots. void InvalidateSlotWatches(const cluster::SlotSet& slot_ids); + // Properly clear db_arr before deleting it. If async is set, it's called from a detached fiber + // after swapping the db. + void ClearEntriesOnFlush(absl::Span indices, const DbTableArray& db_arr, + bool async); + void PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* table); // Send invalidation message to the clients that are tracking the change to a key. diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc index c93216599..58c570016 100644 --- a/src/server/tiered_storage_test.cc +++ b/src/server/tiered_storage_test.cc @@ -196,4 +196,33 @@ TEST_F(TieredStorageTest, BackgroundOffloading) { EXPECT_EQ(metrics.tiered_stats.allocated_bytes, kNum * 4096); } +TEST_F(TieredStorageTest, FlushAll) { + absl::FlagSaver saver; + absl::SetFlag(&FLAGS_tiered_offload_threshold, 0.0f); // offload all values + + const int kNum = 500; + for (size_t i = 0; i < kNum; i++) { + Run({"SET", absl::StrCat("k", i), string(3000, 'A')}); + } + ExpectConditionWithinTimeout([&] { return GetMetrics().db_stats[0].tiered_entries == kNum; }); + + // Start reading random entries + atomic_bool done = false; + auto reader = pp_->at(0)->LaunchFiber([&] { + while (!done) { + Run("reader", {"GET", absl::StrCat("k", rand() % kNum)}); + } + }); + + util::ThisFiber::SleepFor(50ms); + Run({"FLUSHALL"}); + + done = true; + reader.Join(); + + auto metrics = GetMetrics(); + EXPECT_EQ(metrics.db_stats.front().tiered_entries, 0u); + EXPECT_GT(metrics.tiered_stats.total_fetches, 2u); +} + } // namespace dfly