From f27506e678a4fa55be4761af3689d1b894060fae Mon Sep 17 00:00:00 2001
From: Vladislav
Date: Mon, 6 May 2024 22:28:45 +0300
Subject: [PATCH] feat(tiering): simple offload loop (#2987)

Simple offloading for tiering

Signed-off-by: Vladislav Oleshko
---
 src/core/compact_object.h         |  3 ++
 src/core/dash_internal.h          |  8 ++++
 src/server/common.cc              |  3 +-
 src/server/common.h               |  1 +
 src/server/engine_shard_set.cc    |  8 +++-
 src/server/server_family.cc       |  4 ++
 src/server/table.cc               |  2 +-
 src/server/table.h                |  2 +-
 src/server/test_utils.cc          |  3 +-
 src/server/tiered_storage.cc      | 75 +++++++++++++++++++++++++++----
 src/server/tiered_storage.h       | 11 ++++-
 src/server/tiered_storage_test.cc | 43 ++++++++++++++++--
 12 files changed, 144 insertions(+), 19 deletions(-)

diff --git a/src/core/compact_object.h b/src/core/compact_object.h
index 2d010a122..6b2a830d6 100644
--- a/src/core/compact_object.h
+++ b/src/core/compact_object.h
@@ -120,6 +120,9 @@ class CompactObj {
   // while ASCII2_ENC_BIT rounds it up. See DecodedLen implementation for more info.
   ASCII1_ENC_BIT = 8,
   ASCII2_ENC_BIT = 0x10,
+
+  // IO_PENDING is set when the tiered storage has issued an i/o request to save the value. It is
+  // cleared when the io request finishes or is cancelled.
   IO_PENDING = 0x20,
   STICKY = 0x40,
diff --git a/src/core/dash_internal.h b/src/core/dash_internal.h
index c89994d42..c335fedfc 100644
--- a/src/core/dash_internal.h
+++ b/src/core/dash_internal.h
@@ -701,6 +701,14 @@ class DashCursor {
     return val_ != 0;
   }
 
+  bool operator==(const DashCursor& other) const {
+    return val_ == other.val_;
+  }
+
+  bool operator!=(const DashCursor& other) const {
+    return !(val_ == other.val_);
+  }
+
  private:
  uint64_t val_;
 };
diff --git a/src/server/common.cc b/src/server/common.cc
index 761b1de20..44c72c296 100644
--- a/src/server/common.cc
+++ b/src/server/common.cc
@@ -258,11 +258,12 @@ bool ParseDouble(string_view src, double* value) {
 #define ADD(x) (x) += o.x
 
 TieredStats& TieredStats::operator+=(const TieredStats& o) {
-  static_assert(sizeof(TieredStats) == 80);
+  static_assert(sizeof(TieredStats) == 88);
 
   ADD(total_stashes);
   ADD(total_fetches);
   ADD(total_cancels);
+  ADD(total_deletes);
 
   ADD(allocated_bytes);
   ADD(capacity_bytes);
diff --git a/src/server/common.h b/src/server/common.h
index 25a8b1efd..563ed7119 100644
--- a/src/server/common.h
+++ b/src/server/common.h
@@ -63,6 +63,7 @@ struct TieredStats {
   size_t total_stashes = 0;
   size_t total_fetches = 0;
   size_t total_cancels = 0;
+  size_t total_deletes = 0;
 
   size_t allocated_bytes = 0;
   size_t capacity_bytes = 0;
diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc
index 20af4b629..a246a3603 100644
--- a/src/server/engine_shard_set.cc
+++ b/src/server/engine_shard_set.cc
@@ -51,7 +51,7 @@ ABSL_FLAG(uint32_t, hz, 100,
 ABSL_FLAG(bool, cache_mode, false,
           "If true, the backend behaves like a cache, "
           "by evicting entries when getting close to maxmemory limit");
-// memory defragmented related flags
+
 ABSL_FLAG(float, mem_defrag_threshold, 0.7,
           "Minimum percentage of used memory relative to maxmemory cap before running "
           "defragmentation");
@@ -582,6 +582,8 @@ void EngineShard::Heartbeat() {
   }
 
   ssize_t eviction_redline = (max_memory_limit * kRedLimitFactor) / shard_set->size();
+  size_t tiering_redline =
+      (max_memory_limit * GetFlag(FLAGS_tiered_offload_threshold)) / shard_set->size();
 
   DbContext db_cntx;
   db_cntx.time_now_ms = GetCurrentTimeMs();
@@ -603,6 +605,10 @@ void EngineShard::Heartbeat() {
     if (db_slice_.memory_budget() < eviction_redline) {
      db_slice_.FreeMemWithEvictionStep(i, eviction_redline - db_slice_.memory_budget());
     }
+
+    if (tiered_storage_ && UsedMemory() > tiering_redline) {
+      tiered_storage_->RunOffloading(i);
+    }
   }
 
   // Journal entries for expired entries are not writen to socket in the loop above.
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index d29aa35fb..81309f12d 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -2083,9 +2083,13 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
   }
 
   if (should_enter("TIERED", true)) {
+    append("tiered_entries", total.tiered_entries);
+    append("tiered_entries_bytes", total.tiered_used_bytes);
+
     append("tiered_total_stashes", m.tiered_stats.total_stashes);
     append("tiered_total_fetches", m.tiered_stats.total_fetches);
     append("tiered_total_cancels", m.tiered_stats.total_cancels);
+    append("tiered_total_deletes", m.tiered_stats.total_deletes);
 
     append("tiered_allocated_bytes", m.tiered_stats.allocated_bytes);
     append("tiered_capacity_bytes", m.tiered_stats.capacity_bytes);
diff --git a/src/server/table.cc b/src/server/table.cc
index 28a6ffb19..1cd65e212 100644
--- a/src/server/table.cc
+++ b/src/server/table.cc
@@ -39,7 +39,7 @@ DbTableStats& DbTableStats::operator+=(const DbTableStats& o) {
   ADD(listpack_blob_cnt);
   ADD(listpack_bytes);
   ADD(tiered_entries);
-  ADD(tiered_size);
+  ADD(tiered_used_bytes);
 
   for (size_t i = 0; i < o.memory_usage_by_type.size(); ++i) {
     memory_usage_by_type[i] += o.memory_usage_by_type[i];
diff --git a/src/server/table.h b/src/server/table.h
index ad9494e81..97f2e612e 100644
--- a/src/server/table.h
+++ b/src/server/table.h
@@ -68,7 +68,7 @@ struct DbTableStats {
   size_t listpack_blob_cnt = 0;
   size_t listpack_bytes = 0;
   size_t tiered_entries = 0;
-  size_t tiered_size = 0;
+  size_t tiered_used_bytes = 0;
 
   std::array memory_usage_by_type = {};
 
diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc
index 682e4bcf7..81a1e8bbd 100644
--- a/src/server/test_utils.cc
+++ b/src/server/test_utils.cc
@@ -33,7 +33,7 @@ ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to
 namespace dfly {
 
 std::ostream& operator<<(std::ostream& os, const DbStats& stats) {
-  os << "keycount: " << stats.key_count << ", tiered_size: " << stats.tiered_size
+  os << "keycount: " << stats.key_count << ", tiered_size: " << stats.tiered_used_bytes
      << ", tiered_entries: " << stats.tiered_entries << "\n";
 
   return os;
@@ -658,7 +658,6 @@ void BaseFamilyTest::ExpectConditionWithinTimeout(const std::function& c
       break;
     }
     ThisFiber::SleepFor(5ms);
-    // absl::SleepFor(absl::Milliseconds(10)); ??
  }
 
   EXPECT_LE(absl::Now(), deadline)
diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc
index 6a6e7797d..941ada55f 100644
--- a/src/server/tiered_storage.cc
+++ b/src/server/tiered_storage.cc
@@ -22,10 +22,14 @@
 #include "server/tiering/common.h"
 #include "server/tiering/op_manager.h"
 #include "server/tiering/small_bins.h"
+#include "server/tx_base.h"
 
 ABSL_FLAG(bool, tiered_storage_cache_fetched, true,
           "WIP: Load results of offloaded reads to memory");
+
+ABSL_FLAG(size_t, tiered_storage_write_depth, 50,
+          "Maximum number of concurrent stash requests issued by background offload");
+
 namespace dfly {
 
 using namespace std;
@@ -52,11 +56,28 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
     cache_fetched_ = absl::GetFlag(FLAGS_tiered_storage_cache_fetched);
   }
 
-  // Find entry by key in db_slice and store external segment in place of original value
+  // Called before overriding value with segment
+  void RecordAdded(DbTableStats* stats, const PrimeValue& pv, tiering::DiskSegment segment) {
+    stats->AddTypeMemoryUsage(pv.ObjType(), -pv.MallocUsed());
+    stats->tiered_entries++;
+    stats->tiered_used_bytes += segment.length;
+  }
+
+  // Called after setting new value in place of previous segment
+  void RecordDeleted(DbTableStats* stats, const PrimeValue& pv, tiering::DiskSegment segment) {
+    stats->AddTypeMemoryUsage(pv.ObjType(), pv.MallocUsed());
+    stats->tiered_entries--;
+    stats->tiered_used_bytes -= segment.length;
+  }
+
+  // Find entry by key in db_slice and store external segment in place of original value.
+  // Update memory stats
   void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) {
     if (auto pv = Find(key); pv) {
+      RecordAdded(db_slice_->MutableStats(0), *pv, segment);
+
       pv->SetIoPending(false);
-      pv->SetExternal(segment.offset, segment.length);  // TODO: Handle memory stats
+      pv->SetExternal(segment.offset, segment.length);
 
       stats_.total_stashes++;
     }
@@ -82,14 +103,22 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
     ClearIoPending(key);
   }
 
+  // Set value to be an in-memory type again, either empty or with a value. Update memory stats
+  void SetInMemory(PrimeValue* pv, string_view value, tiering::DiskSegment segment) {
+    pv->Reset();
+    if (!value.empty())
+      pv->SetString(value);
+
+    RecordDeleted(db_slice_->MutableStats(0), *pv, segment);
+
+    (value.empty() ? stats_.total_deletes : stats_.total_fetches)++;
+  }
+
   // Find entry by key and store it's up-to-date value in place of external segment.
  // Returns false if the value is outdated, true otherwise
   bool SetInMemory(OpManager::KeyRef key, string_view value, tiering::DiskSegment segment) {
     if (auto pv = Find(key); pv && pv->IsExternal() && segment == pv->GetExternalSlice()) {
-      pv->Reset();  // TODO: account for memory
-      pv->SetString(value);
-
-      stats_.total_fetches++;
+      SetInMemory(pv, value, segment);
       return true;
     }
     return false;
   }
@@ -136,7 +165,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
   bool cache_fetched_ = false;
 
   struct {
-    size_t total_stashes = 0, total_fetches = 0, total_cancels = 0;
+    size_t total_stashes = 0, total_fetches = 0, total_cancels = 0, total_deletes = 0;
   } stats_;
 
   TieredStorage* ts_;
@@ -219,7 +248,7 @@ void TieredStorage::Delete(PrimeValue* value) {
   } else if (auto bin = bins_->Delete(segment); bin) {
     op_manager_->Delete(*bin);
   }
-  value->Reset();
+  op_manager_->SetInMemory(value, "", segment);
 }
 
 void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
@@ -232,7 +261,7 @@ void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue*
   value->SetIoPending(false);
 }
 
-bool TieredStorage::ShouldStash(const PrimeValue& pv) {
+bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
   return !pv.IsExternal() && pv.ObjType() == OBJ_STRING && pv.Size() >= kMinValueSize;
 }
 
@@ -264,4 +293,32 @@ TieredStats TieredStorage::GetStats() const {
   return stats;
 }
 
+void TieredStorage::RunOffloading(DbIndex dbid) {
+  PrimeTable& table = op_manager_->db_slice_->GetDBTable(dbid)->prime;
+  int stash_limit =
+      absl::GetFlag(FLAGS_tiered_storage_write_depth) - op_manager_->GetStats().pending_stash_cnt;
+  if (stash_limit <= 0)
+    return;
+
+  std::string tmp;
+  auto cb = [this, dbid, &tmp, &stash_limit](PrimeIterator it) {
+    if (it->second.HasIoPending() || it->second.IsExternal())
+      return;
+
+    if (ShouldStash(it->second)) {
+      Stash(dbid, it->first.GetSlice(&tmp), &it->second);
+      stash_limit--;
+    }
+  };
+
+  PrimeTable::Cursor start_cursor{};
+
+  // Loop while we haven't traversed all entries or reached our stash io device limit.
+  // Keep the number of iterations below a reasonable limit to keep the datastore responsive.
+  size_t iterations = 0;
+  do {
+    offloading_cursor_ = table.TraverseBySegmentOrder(offloading_cursor_, cb);
+  } while (offloading_cursor_ != start_cursor && stash_limit > 0 && iterations++ < 100);
+}
+
 }  // namespace dfly
diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h
index c3304b365..4543f82c4 100644
--- a/src/server/tiered_storage.h
+++ b/src/server/tiered_storage.h
@@ -7,6 +7,7 @@
 #include 
 #include "server/tiering/common.h"
+#include "server/tx_base.h"
 #include "util/fibers/future.h"
 
 #ifdef __linux__
@@ -60,11 +61,16 @@ class TieredStorage {
   void CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value);
 
   // Returns if a value should be stashed
-  bool ShouldStash(const PrimeValue& pv);
+  bool ShouldStash(const PrimeValue& pv) const;
 
   TieredStats GetStats() const;
 
+  // Run offloading loop until i/o device is loaded or all entries were traversed
+  void RunOffloading(DbIndex dbid);
+
 private:
+  PrimeTable::Cursor offloading_cursor_{};  // where RunOffloading left off
+
   std::unique_ptr op_manager_;
   std::unique_ptr bins_;
 };
@@ -127,6 +133,9 @@ class TieredStorage {
   TieredStats GetStats() const {
     return {};
   }
+
+  void RunOffloading(DbIndex dbid) {
+  }
 };
 
 }  // namespace dfly
diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc
index 7a6b71c91..766c8712e 100644
--- a/src/server/tiered_storage_test.cc
+++ b/src/server/tiered_storage_test.cc
@@ -9,6 +9,7 @@
 #include 
 #include "absl/flags/internal/flag.h"
+#include "absl/flags/reflection.h"
 #include "base/flags.h"
 #include "base/logging.h"
 #include "facade/facade_test.h"
@@ -19,13 +20,12 @@
 using namespace std;
 using namespace testing;
-using absl::SetFlag;
-using absl::StrCat;
 
 ABSL_DECLARE_FLAG(bool, force_epoll);
 ABSL_DECLARE_FLAG(string, tiered_prefix);
 ABSL_DECLARE_FLAG(bool, tiered_storage_cache_fetched);
 ABSL_DECLARE_FLAG(bool, backing_file_direct);
+ABSL_DECLARE_FLAG(float, tiered_offload_threshold);
 
 namespace dfly {
 
@@ -59,13 +59,18 @@ TEST_F(TieredStorageTest, SimpleGetSet) {
     Run({"SET", absl::StrCat("k", i), string(i, 'A')});
   }
 
-  // Make sure all entries were stashed, except the one few not filling a small page
+  // Make sure all entries were stashed, except the one not filling a small page
   size_t stashes = 0;
   ExpectConditionWithinTimeout([this, &stashes] {
     stashes = GetMetrics().tiered_stats.total_stashes;
    return stashes >= kMax - 256 - 1;
   });
 
+  // All entries were accounted for except that one (see comment above)
+  auto metrics = GetMetrics();
+  EXPECT_EQ(metrics.db_stats[0].tiered_entries, kMax - kMin - 1);
+  EXPECT_EQ(metrics.db_stats[0].tiered_used_bytes, (kMax - 1 + kMin) * (kMax - kMin) / 2 - 2047);
+
   // Perform GETSETs
   for (size_t i = kMin; i < kMax; i++) {
     auto resp = Run({"GETSET", absl::StrCat("k", i), string(i, 'B')});
@@ -105,4 +110,36 @@ TEST_F(TieredStorageTest, MultiDb) {
   }
 }
 
+TEST_F(TieredStorageTest, BackgroundOffloading) {
+  absl::FlagSaver saver;
+  absl::SetFlag(&FLAGS_tiered_offload_threshold, 0.0f);  // offload all values
+
+  const int kNum = 500;
+
+  max_memory_limit = kNum * 4096;
+  pp_->at(0)->AwaitBrief([] { EngineShard::tlocal()->TEST_EnableHeartbeat(); });
+
+  // Stash all values
+  for (size_t i = 0; i < kNum; i++) {
+    Run({"SET", absl::StrCat("k", i), string(3000, 'A')});
+  }
+
+  ExpectConditionWithinTimeout([&] { return GetMetrics().db_stats[0].tiered_entries == kNum; });
+  ASSERT_EQ(GetMetrics().tiered_stats.total_stashes, kNum);
+  ASSERT_EQ(GetMetrics().db_stats[0].tiered_entries, kNum);
+
+  // Trigger re-fetch
+  for (size_t i = 0; i < kNum; i++) {
+    Run({"GET", absl::StrCat("k", i)});
+  }
+
+  // Wait for offload to do it all again
+  ExpectConditionWithinTimeout([&] { return GetMetrics().db_stats[0].tiered_entries == kNum; });
+
+  auto metrics = GetMetrics();
+  EXPECT_EQ(metrics.tiered_stats.total_stashes, 2 * kNum);
+  EXPECT_EQ(metrics.tiered_stats.total_fetches, kNum);
+  EXPECT_EQ(metrics.tiered_stats.allocated_bytes, kNum * 4096);
+}
+
 }  // namespace dfly
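
Editor's note on the offload loop added above: TieredStorage::RunOffloading walks the shard's prime table from a cursor kept in offloading_cursor_, issues stash requests for eligible values, and stops once the write-depth budget (tiered_storage_write_depth minus the already pending stashes), a full pass, or the iteration cap is reached. The following is a minimal, self-contained C++ sketch of that traversal pattern only; Table, Entry, kWriteDepth and run_offloading are invented stand-ins for illustration, not Dragonfly's actual types or API.

// Illustrative sketch only (not Dragonfly code): a cursor-based background
// offload loop in the spirit of TieredStorage::RunOffloading.
#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

struct Entry {
  std::string key;
  bool offloaded = false;  // stands in for "already external or io-pending"
};

// Toy stand-in for the shard's prime table with cursor-based traversal.
class Table {
 public:
  using Cursor = std::size_t;  // 0 doubles as "start of a full pass"

  explicit Table(std::vector<Entry> entries) : entries_(std::move(entries)) {}

  // Visit one entry and return the cursor to continue from; wraps to 0 once a
  // full pass completes (only loosely mirrors TraverseBySegmentOrder).
  Cursor Traverse(Cursor c, const std::function<void(Entry&)>& cb) {
    if (entries_.empty())
      return 0;
    cb(entries_[c]);
    return (c + 1) % entries_.size();
  }

 private:
  std::vector<Entry> entries_;
};

int main() {
  Table table({{"k1"}, {"k2"}, {"k3"}, {"k4"}, {"k5"}});
  Table::Cursor cursor = 0;                // persists across heartbeat ticks
  const int kWriteDepth = 2;               // stash budget per tick
  const std::size_t kMaxIterations = 100;  // keep each tick short

  auto run_offloading = [&] {
    int budget = kWriteDepth;  // in the patch: flag value minus pending_stash_cnt
    std::size_t iterations = 0;
    const Table::Cursor start = 0;
    do {
      cursor = table.Traverse(cursor, [&](Entry& e) {
        if (!e.offloaded) {    // skip values already offloaded or pending
          e.offloaded = true;  // pretend a stash request was issued
          --budget;
          std::cout << "stash " << e.key << "\n";
        }
      });
    } while (cursor != start && budget > 0 && iterations++ < kMaxIterations);
  };

  run_offloading();  // offloads k1, k2 and stops: budget exhausted
  run_offloading();  // resumes from the saved cursor: k3, k4
}

Persisting the cursor between calls is what lets the heartbeat amortize a full table pass over many short slices: the shard thread never blocks on a long scan, and the loop simply resumes where it left off once the i/o backlog drains.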