From 1a8c12225b6a6798f8a7e3f89b88224044f35f69 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Sun, 28 Jul 2024 17:30:41 +0300 Subject: [PATCH] chore(tiering): Move cool entry warmup to DbSlice (#3397) Signed-off-by: Vladislav Oleshko --- src/core/compact_object.h | 20 ++----- src/server/db_slice.cc | 35 ++++++----- src/server/snapshot.cc | 3 + src/server/tiered_storage.cc | 113 ++++++++++------------------------- src/server/tiered_storage.h | 11 +++- 5 files changed, 70 insertions(+), 112 deletions(-) diff --git a/src/core/compact_object.h b/src/core/compact_object.h index a9bbd3d14..1dcb0aa72 100644 --- a/src/core/compact_object.h +++ b/src/core/compact_object.h @@ -377,22 +377,12 @@ class CompactObj { static void InitThreadLocal(MemoryResource* mr); static MemoryResource* memory_resource(); // thread-local. - template - inline static constexpr bool IsConstructibleFromMR = - std::is_constructible_v; - - template static std::enable_if_t, T*> AllocateMR() { + template static T* AllocateMR(Args&&... args) { void* ptr = memory_resource()->allocate(sizeof(T), alignof(T)); - return new (ptr) T{memory_resource()}; - } - - template - inline static constexpr bool IsConstructibleFromArgs = std::is_constructible_v; - - template - static std::enable_if_t, T*> AllocateMR(Args&&... args) { - void* ptr = memory_resource()->allocate(sizeof(T), alignof(T)); - return new (ptr) T{std::forward(args)...}; + if constexpr (std::is_constructible_v && sizeof...(args) == 0) + return new (ptr) T{memory_resource()}; + else + return new (ptr) T{std::forward(args)...}; } template static void DeleteMR(void* ptr) { diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index b2c36d93f..a447fe880 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -494,20 +494,6 @@ OpResult DbSlice::FindInternal(const Context& cntx, std: fetched_items_.insert(res.it->first.AsRef()); } - // If the value has a pending stash, cancel it before any modification are applied. - // Rationale: we either look it up for reads - and then it's hot, or alternatively, - // we follow up with modifications during mutation operations, and in that case storing on disk - // does not make much sense. - if (res.it->second.HasStashPending()) { - owner_->tiered_storage()->CancelStash(cntx.db_index, key, &res.it->second); - } - - // Mark this entry as being looked up. We use key (first) deliberately to preserve the hotness - // attribute of the entry in case of value overrides. - res.it->first.SetTouched(true); - - db.top_keys.Touch(key); - std::move(update_stats_on_miss).Cancel(); switch (stats_mode) { case UpdateStatsMode::kMutableStats: @@ -528,6 +514,27 @@ OpResult DbSlice::FindInternal(const Context& cntx, std: } break; } + + auto& pv = res.it->second; + + // Cancel any pending stashes of looked up values + // Rationale: we either look it up for reads - and then it's hot, or alternatively, + // we follow up with modifications, so the pending stash becomes outdated. + if (pv.HasStashPending()) { + owner_->tiered_storage()->CancelStash(cntx.db_index, key, &pv); + } + + // Fetch back cool items + if (pv.IsExternal() && pv.IsCool()) { + pv = owner_->tiered_storage()->Warmup(cntx.db_index, pv.GetCool()); + } + + // Mark this entry as being looked up. We use key (first) deliberately to preserve the hotness + // attribute of the entry in case of value overrides. + res.it->first.SetTouched(true); + + db.top_keys.Touch(key); + return res; } diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 62782e79c..d6bedf56f 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -301,6 +301,9 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv, optional expire, RdbSerializer* serializer) { + if (pv.IsExternal() && pv.IsCool()) + return SerializeEntry(db_indx, pk, pv.GetCool().record->value, expire, serializer); + time_t expire_time = expire.value_or(0); if (!expire && pv.HasExpire()) { auto eit = db_array_[db_indx]->expire.Find(pk); diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index effad5ea1..4c87ca37d 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -75,10 +75,7 @@ string DecodeString(bool is_raw, string_view str, PrimeValue decoder) { } tiering::DiskSegment FromCoolItem(const PrimeValue::CoolItem& item) { - tiering::DiskSegment res; - res.length = item.serialized_size; - res.offset = size_t(item.record->page_index) * 4096 + item.page_offset; - return res; + return {item.record->page_index * tiering::kPageSize + item.page_offset, item.serialized_size}; } } // anonymous namespace @@ -152,8 +149,6 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { // Find entry by key in db_slice and store external segment in place of original value. // Update memory stats void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) { - // TODO: to rename it to CoolEntry or something. - if (auto* pv = Find(key); pv) { auto* stats = GetDbTableStats(key.first); @@ -346,53 +341,25 @@ void TieredStorage::SetMemoryLowLimit(size_t mem_limit) { util::fb2::Future TieredStorage::Read(DbIndex dbid, string_view key, const PrimeValue& value) { - DCHECK(value.IsExternal()); - util::fb2::Future future; - if (value.IsCool()) { - // If we read a cool record - bring it back to primary table. - PrimeValue hot = Warmup(dbid, value.GetCool()); - string tmp; - - DCHECK_EQ(value.Size(), hot.Size()); - hot.GetString(&tmp); - future.Resolve(tmp); - - // TODO: An awful hack - to fix later. - const_cast(value) = std::move(hot); - } else { - // The raw_val passed to cb might need decoding based on the encoding mask of the "value" - // object. We save the mask in decoder and use it to decode the final string that Read should - // resolve. - PrimeValue decoder; - decoder.ImportExternal(value); - - auto cb = [future, decoder = std::move(decoder)](bool is_raw, const string* raw_val) mutable { - future.Resolve(DecodeString(is_raw, *raw_val, std::move(decoder))); - return false; // was not modified - }; - - op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb)); - } - return future; + util::fb2::Future fut; + Read(dbid, key, value, [fut](const std::string& value) mutable { fut.Resolve(value); }); + return fut; } void TieredStorage::Read(DbIndex dbid, std::string_view key, const PrimeValue& value, std::function readf) { DCHECK(value.IsExternal()); - if (value.IsCool()) { - util::fb2::Future res = Read(dbid, key, value); - readf(res.Get()); - } else { - PrimeValue decoder; - decoder.ImportExternal(value); + DCHECK(!value.IsCool()); - auto cb = [readf = std::move(readf), decoder = std::move(decoder)]( - bool is_raw, const string* raw_val) mutable { - readf(DecodeString(is_raw, *raw_val, std::move(decoder))); - return false; - }; - op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb)); - } + PrimeValue decoder; + decoder.ImportExternal(value); + + auto cb = [readf = std::move(readf), decoder = std::move(decoder)]( + bool is_raw, const string* raw_val) mutable { + readf(DecodeString(is_raw, *raw_val, std::move(decoder))); + return false; + }; + op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb)); } template @@ -402,31 +369,19 @@ util::fb2::Future TieredStorage::Modify(DbIndex dbid, std::string_view key, DCHECK(value.IsExternal()); util::fb2::Future future; - if (value.IsCool()) { - PrimeValue hot = Warmup(dbid, value.GetCool()); - string tmp; + PrimeValue decoder; + decoder.ImportExternal(value); - DCHECK_EQ(value.Size(), hot.Size()); - hot.GetString(&tmp); - future.Resolve(modf(&tmp)); - - // TODO: An awful hack - to fix later. - const_cast(value).Materialize(tmp, false); - } else { - PrimeValue decoder; - decoder.ImportExternal(value); - - auto cb = [future, modf = std::move(modf), decoder = std::move(decoder)]( - bool is_raw, std::string* raw_val) mutable { - if (is_raw) { - decoder.Materialize(*raw_val, true); - decoder.GetString(raw_val); - } - future.Resolve(modf(raw_val)); - return true; - }; - op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb)); - } + auto cb = [future, modf = std::move(modf), decoder = std::move(decoder)]( + bool is_raw, std::string* raw_val) mutable { + if (is_raw) { + decoder.Materialize(*raw_val, true); + decoder.GetString(raw_val); + } + future.Resolve(modf(raw_val)); + return true; + }; + op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb)); return future; } @@ -477,15 +432,12 @@ void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) { tiering::DiskSegment segment = value->GetExternalSlice(); if (value->IsCool()) { - // Delete the cool item. - PrimeValue::CoolItem item = value->GetCool(); - PrimeValue hot = DeleteCool(item.record); - DCHECK_EQ(OBJ_STRING, hot.ObjType()); + auto hot = DeleteCool(value->GetCool().record); + DCHECK_EQ(hot.ObjType(), OBJ_STRING); } // In any case we delete the offloaded segment and reset the value. value->Reset(); - stats_.total_deletes++; op_manager_->DeleteOffloaded(dbid, segment); } @@ -616,14 +568,15 @@ bool TieredStorage::ShouldStash(const PrimeValue& pv) const { void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str, const tiering::DiskSegment& segment, PrimeValue* pv) { - cool_memory_used_ += (sizeof(detail::TieredColdRecord) + pv->MallocUsed()); detail::TieredColdRecord* record = CompactObj::AllocateMR(); + cool_queue_.push_front(*record); + cool_memory_used_ += (sizeof(detail::TieredColdRecord) + pv->MallocUsed()); + record->key_hash = CompactObj::HashCode(str); record->db_index = db_ind; - record->page_index = segment.offset / 4096; + record->page_index = segment.offset / tiering::kPageSize; record->value = std::move(*pv); - DCHECK(record); - cool_queue_.push_front(*record); + pv->SetCool(segment.offset, segment.length, record); DCHECK_EQ(pv->Size(), record->value.Size()); } diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h index 8a33846fc..3089fdb10 100644 --- a/src/server/tiered_storage.h +++ b/src/server/tiered_storage.h @@ -78,8 +78,12 @@ class TieredStorage { // Run offloading loop until i/o device is loaded or all entries were traversed void RunOffloading(DbIndex dbid); + // Prune cool entries to reach the set memory goal with freed memory size_t ReclaimMemory(size_t goal); + // Returns the primary value, and deletes the cool item as well as its offloaded storage. + PrimeValue Warmup(DbIndex dbid, PrimeValue::CoolItem item); + size_t CoolMemoryUsage() const { return cool_memory_used_; } @@ -92,9 +96,6 @@ class TieredStorage { void CoolDown(DbIndex db_ind, std::string_view str, const tiering::DiskSegment& segment, PrimeValue* pv); - // Returns the primary value, and deletes the cool item as well as its offloaded storage. - PrimeValue Warmup(DbIndex dbid, PrimeValue::CoolItem item); - PrimeValue DeleteCool(detail::TieredColdRecord* record); detail::TieredColdRecord* PopCool(); @@ -174,6 +175,10 @@ class TieredStorage { void RunOffloading(DbIndex dbid) { } + + PrimeValue Warmup(DbIndex dbid, PrimeValue::CoolItem item) { + return PrimeValue{}; + } }; } // namespace dfly