mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
chore(tiering): Move cool entry warmup to DbSlice (#3397)
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
20bda84317
commit
1a8c12225b
5 changed files with 70 additions and 112 deletions
|
@ -377,22 +377,12 @@ class CompactObj {
|
|||
static void InitThreadLocal(MemoryResource* mr);
|
||||
static MemoryResource* memory_resource(); // thread-local.
|
||||
|
||||
template <typename T>
|
||||
inline static constexpr bool IsConstructibleFromMR =
|
||||
std::is_constructible_v<T, decltype(memory_resource())>;
|
||||
|
||||
template <typename T> static std::enable_if_t<IsConstructibleFromMR<T>, T*> AllocateMR() {
|
||||
template <typename T, typename... Args> static T* AllocateMR(Args&&... args) {
|
||||
void* ptr = memory_resource()->allocate(sizeof(T), alignof(T));
|
||||
return new (ptr) T{memory_resource()};
|
||||
}
|
||||
|
||||
template <typename T, typename... Args>
|
||||
inline static constexpr bool IsConstructibleFromArgs = std::is_constructible_v<T, Args...>;
|
||||
|
||||
template <typename T, typename... Args>
|
||||
static std::enable_if_t<IsConstructibleFromArgs<T, Args...>, T*> AllocateMR(Args&&... args) {
|
||||
void* ptr = memory_resource()->allocate(sizeof(T), alignof(T));
|
||||
return new (ptr) T{std::forward<Args&&>(args)...};
|
||||
if constexpr (std::is_constructible_v<T, decltype(memory_resource())> && sizeof...(args) == 0)
|
||||
return new (ptr) T{memory_resource()};
|
||||
else
|
||||
return new (ptr) T{std::forward<Args>(args)...};
|
||||
}
|
||||
|
||||
template <typename T> static void DeleteMR(void* ptr) {
|
||||
|
|
|
@ -494,20 +494,6 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
|
|||
fetched_items_.insert(res.it->first.AsRef());
|
||||
}
|
||||
|
||||
// If the value has a pending stash, cancel it before any modification are applied.
|
||||
// Rationale: we either look it up for reads - and then it's hot, or alternatively,
|
||||
// we follow up with modifications during mutation operations, and in that case storing on disk
|
||||
// does not make much sense.
|
||||
if (res.it->second.HasStashPending()) {
|
||||
owner_->tiered_storage()->CancelStash(cntx.db_index, key, &res.it->second);
|
||||
}
|
||||
|
||||
// Mark this entry as being looked up. We use key (first) deliberately to preserve the hotness
|
||||
// attribute of the entry in case of value overrides.
|
||||
res.it->first.SetTouched(true);
|
||||
|
||||
db.top_keys.Touch(key);
|
||||
|
||||
std::move(update_stats_on_miss).Cancel();
|
||||
switch (stats_mode) {
|
||||
case UpdateStatsMode::kMutableStats:
|
||||
|
@ -528,6 +514,27 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
|
|||
}
|
||||
break;
|
||||
}
|
||||
|
||||
auto& pv = res.it->second;
|
||||
|
||||
// Cancel any pending stashes of looked up values
|
||||
// Rationale: we either look it up for reads - and then it's hot, or alternatively,
|
||||
// we follow up with modifications, so the pending stash becomes outdated.
|
||||
if (pv.HasStashPending()) {
|
||||
owner_->tiered_storage()->CancelStash(cntx.db_index, key, &pv);
|
||||
}
|
||||
|
||||
// Fetch back cool items
|
||||
if (pv.IsExternal() && pv.IsCool()) {
|
||||
pv = owner_->tiered_storage()->Warmup(cntx.db_index, pv.GetCool());
|
||||
}
|
||||
|
||||
// Mark this entry as being looked up. We use key (first) deliberately to preserve the hotness
|
||||
// attribute of the entry in case of value overrides.
|
||||
res.it->first.SetTouched(true);
|
||||
|
||||
db.top_keys.Touch(key);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
|
|
@ -301,6 +301,9 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
|
|||
|
||||
void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv,
|
||||
optional<uint64_t> expire, RdbSerializer* serializer) {
|
||||
if (pv.IsExternal() && pv.IsCool())
|
||||
return SerializeEntry(db_indx, pk, pv.GetCool().record->value, expire, serializer);
|
||||
|
||||
time_t expire_time = expire.value_or(0);
|
||||
if (!expire && pv.HasExpire()) {
|
||||
auto eit = db_array_[db_indx]->expire.Find(pk);
|
||||
|
|
|
@ -75,10 +75,7 @@ string DecodeString(bool is_raw, string_view str, PrimeValue decoder) {
|
|||
}
|
||||
|
||||
tiering::DiskSegment FromCoolItem(const PrimeValue::CoolItem& item) {
|
||||
tiering::DiskSegment res;
|
||||
res.length = item.serialized_size;
|
||||
res.offset = size_t(item.record->page_index) * 4096 + item.page_offset;
|
||||
return res;
|
||||
return {item.record->page_index * tiering::kPageSize + item.page_offset, item.serialized_size};
|
||||
}
|
||||
|
||||
} // anonymous namespace
|
||||
|
@ -152,8 +149,6 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
|
|||
// Find entry by key in db_slice and store external segment in place of original value.
|
||||
// Update memory stats
|
||||
void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) {
|
||||
// TODO: to rename it to CoolEntry or something.
|
||||
|
||||
if (auto* pv = Find(key); pv) {
|
||||
auto* stats = GetDbTableStats(key.first);
|
||||
|
||||
|
@ -346,53 +341,25 @@ void TieredStorage::SetMemoryLowLimit(size_t mem_limit) {
|
|||
|
||||
util::fb2::Future<string> TieredStorage::Read(DbIndex dbid, string_view key,
|
||||
const PrimeValue& value) {
|
||||
DCHECK(value.IsExternal());
|
||||
util::fb2::Future<string> future;
|
||||
if (value.IsCool()) {
|
||||
// If we read a cool record - bring it back to primary table.
|
||||
PrimeValue hot = Warmup(dbid, value.GetCool());
|
||||
string tmp;
|
||||
|
||||
DCHECK_EQ(value.Size(), hot.Size());
|
||||
hot.GetString(&tmp);
|
||||
future.Resolve(tmp);
|
||||
|
||||
// TODO: An awful hack - to fix later.
|
||||
const_cast<PrimeValue&>(value) = std::move(hot);
|
||||
} else {
|
||||
// The raw_val passed to cb might need decoding based on the encoding mask of the "value"
|
||||
// object. We save the mask in decoder and use it to decode the final string that Read should
|
||||
// resolve.
|
||||
PrimeValue decoder;
|
||||
decoder.ImportExternal(value);
|
||||
|
||||
auto cb = [future, decoder = std::move(decoder)](bool is_raw, const string* raw_val) mutable {
|
||||
future.Resolve(DecodeString(is_raw, *raw_val, std::move(decoder)));
|
||||
return false; // was not modified
|
||||
};
|
||||
|
||||
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
|
||||
}
|
||||
return future;
|
||||
util::fb2::Future<std::string> fut;
|
||||
Read(dbid, key, value, [fut](const std::string& value) mutable { fut.Resolve(value); });
|
||||
return fut;
|
||||
}
|
||||
|
||||
void TieredStorage::Read(DbIndex dbid, std::string_view key, const PrimeValue& value,
|
||||
std::function<void(const std::string&)> readf) {
|
||||
DCHECK(value.IsExternal());
|
||||
if (value.IsCool()) {
|
||||
util::fb2::Future<string> res = Read(dbid, key, value);
|
||||
readf(res.Get());
|
||||
} else {
|
||||
PrimeValue decoder;
|
||||
decoder.ImportExternal(value);
|
||||
DCHECK(!value.IsCool());
|
||||
|
||||
auto cb = [readf = std::move(readf), decoder = std::move(decoder)](
|
||||
bool is_raw, const string* raw_val) mutable {
|
||||
readf(DecodeString(is_raw, *raw_val, std::move(decoder)));
|
||||
return false;
|
||||
};
|
||||
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
|
||||
}
|
||||
PrimeValue decoder;
|
||||
decoder.ImportExternal(value);
|
||||
|
||||
auto cb = [readf = std::move(readf), decoder = std::move(decoder)](
|
||||
bool is_raw, const string* raw_val) mutable {
|
||||
readf(DecodeString(is_raw, *raw_val, std::move(decoder)));
|
||||
return false;
|
||||
};
|
||||
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
|
@ -402,31 +369,19 @@ util::fb2::Future<T> TieredStorage::Modify(DbIndex dbid, std::string_view key,
|
|||
DCHECK(value.IsExternal());
|
||||
|
||||
util::fb2::Future<T> future;
|
||||
if (value.IsCool()) {
|
||||
PrimeValue hot = Warmup(dbid, value.GetCool());
|
||||
string tmp;
|
||||
PrimeValue decoder;
|
||||
decoder.ImportExternal(value);
|
||||
|
||||
DCHECK_EQ(value.Size(), hot.Size());
|
||||
hot.GetString(&tmp);
|
||||
future.Resolve(modf(&tmp));
|
||||
|
||||
// TODO: An awful hack - to fix later.
|
||||
const_cast<PrimeValue&>(value).Materialize(tmp, false);
|
||||
} else {
|
||||
PrimeValue decoder;
|
||||
decoder.ImportExternal(value);
|
||||
|
||||
auto cb = [future, modf = std::move(modf), decoder = std::move(decoder)](
|
||||
bool is_raw, std::string* raw_val) mutable {
|
||||
if (is_raw) {
|
||||
decoder.Materialize(*raw_val, true);
|
||||
decoder.GetString(raw_val);
|
||||
}
|
||||
future.Resolve(modf(raw_val));
|
||||
return true;
|
||||
};
|
||||
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
|
||||
}
|
||||
auto cb = [future, modf = std::move(modf), decoder = std::move(decoder)](
|
||||
bool is_raw, std::string* raw_val) mutable {
|
||||
if (is_raw) {
|
||||
decoder.Materialize(*raw_val, true);
|
||||
decoder.GetString(raw_val);
|
||||
}
|
||||
future.Resolve(modf(raw_val));
|
||||
return true;
|
||||
};
|
||||
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
|
||||
return future;
|
||||
}
|
||||
|
||||
|
@ -477,15 +432,12 @@ void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {
|
|||
|
||||
tiering::DiskSegment segment = value->GetExternalSlice();
|
||||
if (value->IsCool()) {
|
||||
// Delete the cool item.
|
||||
PrimeValue::CoolItem item = value->GetCool();
|
||||
PrimeValue hot = DeleteCool(item.record);
|
||||
DCHECK_EQ(OBJ_STRING, hot.ObjType());
|
||||
auto hot = DeleteCool(value->GetCool().record);
|
||||
DCHECK_EQ(hot.ObjType(), OBJ_STRING);
|
||||
}
|
||||
|
||||
// In any case we delete the offloaded segment and reset the value.
|
||||
value->Reset();
|
||||
stats_.total_deletes++;
|
||||
op_manager_->DeleteOffloaded(dbid, segment);
|
||||
}
|
||||
|
||||
|
@ -616,14 +568,15 @@ bool TieredStorage::ShouldStash(const PrimeValue& pv) const {
|
|||
|
||||
void TieredStorage::CoolDown(DbIndex db_ind, std::string_view str,
|
||||
const tiering::DiskSegment& segment, PrimeValue* pv) {
|
||||
cool_memory_used_ += (sizeof(detail::TieredColdRecord) + pv->MallocUsed());
|
||||
detail::TieredColdRecord* record = CompactObj::AllocateMR<detail::TieredColdRecord>();
|
||||
cool_queue_.push_front(*record);
|
||||
cool_memory_used_ += (sizeof(detail::TieredColdRecord) + pv->MallocUsed());
|
||||
|
||||
record->key_hash = CompactObj::HashCode(str);
|
||||
record->db_index = db_ind;
|
||||
record->page_index = segment.offset / 4096;
|
||||
record->page_index = segment.offset / tiering::kPageSize;
|
||||
record->value = std::move(*pv);
|
||||
DCHECK(record);
|
||||
cool_queue_.push_front(*record);
|
||||
|
||||
pv->SetCool(segment.offset, segment.length, record);
|
||||
DCHECK_EQ(pv->Size(), record->value.Size());
|
||||
}
|
||||
|
|
|
@ -78,8 +78,12 @@ class TieredStorage {
|
|||
// Run offloading loop until i/o device is loaded or all entries were traversed
|
||||
void RunOffloading(DbIndex dbid);
|
||||
|
||||
// Prune cool entries to reach the set memory goal with freed memory
|
||||
size_t ReclaimMemory(size_t goal);
|
||||
|
||||
// Returns the primary value, and deletes the cool item as well as its offloaded storage.
|
||||
PrimeValue Warmup(DbIndex dbid, PrimeValue::CoolItem item);
|
||||
|
||||
size_t CoolMemoryUsage() const {
|
||||
return cool_memory_used_;
|
||||
}
|
||||
|
@ -92,9 +96,6 @@ class TieredStorage {
|
|||
void CoolDown(DbIndex db_ind, std::string_view str, const tiering::DiskSegment& segment,
|
||||
PrimeValue* pv);
|
||||
|
||||
// Returns the primary value, and deletes the cool item as well as its offloaded storage.
|
||||
PrimeValue Warmup(DbIndex dbid, PrimeValue::CoolItem item);
|
||||
|
||||
PrimeValue DeleteCool(detail::TieredColdRecord* record);
|
||||
detail::TieredColdRecord* PopCool();
|
||||
|
||||
|
@ -174,6 +175,10 @@ class TieredStorage {
|
|||
|
||||
void RunOffloading(DbIndex dbid) {
|
||||
}
|
||||
|
||||
PrimeValue Warmup(DbIndex dbid, PrimeValue::CoolItem item) {
|
||||
return PrimeValue{};
|
||||
}
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue