feat(tiering): add background offload step (#2504)

* feat(tiering): add background offload step

Signed-off-by: adi_holden <adi@dragonflydb.io>
adiholden 2024-02-14 14:28:41 +02:00 committed by GitHub
parent b18fe8c0a8
commit 32e8d49123
11 changed files with 261 additions and 56 deletions


@@ -118,6 +118,12 @@ class CompactObj {
ASCII2_ENC_BIT = 0x10,
IO_PENDING = 0x20,
STICKY = 0x40,
// TOUCHED is used to determine which items are hot/cold,
// by checking whether the item was touched since the last time we
// reached it while traversing the database to mark items as cold.
// https://junchengyang.com/publication/nsdi24-SIEVE.pdf
TOUCHED = 0x80,
};
static constexpr uint8_t kEncMask = ASCII1_ENC_BIT | ASCII2_ENC_BIT;
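The TOUCHED bit enables a SIEVE-like second-chance scan (see the NSDI'24 paper linked above): entries accessed since the previous pass survive, entries that were not are treated as cold. A minimal standalone sketch of that scan logic, with hypothetical Entry/ScanStep names that are not part of this commit:

#include <cstddef>
#include <vector>

struct Entry {
  bool touched = false;  // set on access, analogous to CompactObj's TOUCHED bit
  bool cold = false;     // hypothetical marker: candidate for offload/eviction
};

// One step of the scanning "hand": untouched entries become cold, touched
// entries get a second chance by having their bit cleared for the next pass.
void ScanStep(std::vector<Entry>& entries, size_t& hand, size_t steps) {
  for (size_t i = 0; i < steps && !entries.empty(); ++i) {
    Entry& e = entries[hand];
    if (e.touched) {
      e.touched = false;
    } else {
      e.cold = true;
    }
    hand = (hand + 1) % entries.size();
  }
}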
@@ -216,6 +222,18 @@ class CompactObj {
}
}
bool WasTouched() const {
return mask_ & TOUCHED;
}
void SetTouched(bool e) {
if (e) {
mask_ |= TOUCHED;
} else {
mask_ &= ~TOUCHED;
}
}
bool HasIoPending() const {
return mask_ & IO_PENDING;
}


@@ -213,7 +213,7 @@ class DashTable : public detail::DashTableBase {
}
size_t bucket_count() const {
return unique_segments_ * SegmentType::kRegularBucketCnt;
return unique_segments_ * SegmentType::kTotalBuckets;
}
// Overall capacity of the table (including stash buckets).
@@ -240,6 +240,12 @@ class DashTable : public detail::DashTableBase {
// calling cb(iterator) for every non-empty slot. The iteration goes over a physical bucket.
template <typename Cb> void TraverseBucket(const_iterator it, Cb&& cb);
// Traverses a single bucket in the table and calls cb(iterator). The traversal order is
// segment by segment over physical buckets.
// Traversal by segment order does not guarantee coverage if the table grows/shrinks; it is useful
// when formal full coverage is not critically important.
template <typename Cb> Cursor TraverseBySegmentOrder(Cursor curs, Cb&& cb);
// Discards slots information.
static const_bucket_iterator BucketIt(const_iterator it) {
return const_bucket_iterator{it.owner_, it.seg_id_, it.bucket_id_, 0};
@@ -886,6 +892,30 @@ void DashTable<_Key, _Value, Policy>::Split(uint32_t seg_id) {
}
}
template <typename _Key, typename _Value, typename Policy>
template <typename Cb>
auto DashTable<_Key, _Value, Policy>::TraverseBySegmentOrder(Cursor curs, Cb&& cb) -> Cursor {
uint32_t sid = curs.segment_id(global_depth_);
assert(sid < segment_.size());
SegmentType* s = segment_[sid];
assert(s);
uint8_t bid = curs.bucket_id();
auto dt_cb = [&](const SegmentIterator& it) { cb(iterator{this, sid, it.index, it.slot}); };
s->TraverseBucket(bid, std::move(dt_cb));
++bid;
if (bid == kPhysicalBucketNum) {
sid = NextSeg(sid);
bid = 0;
if (sid >= segment_.size()) {
return 0; // "End of traversal" cursor.
}
}
return Cursor{global_depth_, sid, bid};
}
template <typename _Key, typename _Value, typename Policy>
template <typename Cb>
auto DashTable<_Key, _Value, Policy>::Traverse(Cursor curs, Cb&& cb) -> Cursor {


@@ -459,6 +459,10 @@ template <typename _Key, typename _Value, typename Policy = DefaultSegmentPolicy
// Cb accepts (const Iterator&).
template <typename Cb> void TraverseAll(Cb&& cb) const;
// Traverses over Segment's bucket bid and calls cb(Iterator& it)
// for each slot in the bucket. The iteration goes over a physical bucket.
template <typename Cb> void TraverseBucket(uint8_t bid, Cb&& cb);
// Used in test.
unsigned NumProbingBuckets() const {
unsigned res = 0;
@@ -1574,6 +1578,15 @@ std::enable_if_t<UV, unsigned> Segment<Key, Value, Policy>::CVCOnBump(uint64_t v
return result;
}
template <typename Key, typename Value, typename Policy>
template <typename Cb>
void Segment<Key, Value, Policy>::TraverseBucket(uint8_t bid, Cb&& cb) {
assert(bid < kTotalBuckets);
const Bucket& b = bucket_[bid];
b.ForEachSlot([&](auto* bucket, uint8_t slot, bool probe) { cb(Iterator{bid, slot}); });
}
template <typename Key, typename Value, typename Policy>
template <typename Cb, typename HashFn>
bool Segment<Key, Value, Policy>::TraverseLogicalBucket(uint8_t bid, HashFn&& hfun, Cb&& cb) const {


@@ -565,6 +565,30 @@ TEST_F(DashTest, Bucket) {
EXPECT_EQ(s.size(), num_items);
}
TEST_F(DashTest, TraverseSegmentOrder) {
constexpr auto kNumItems = 50;
for (size_t i = 0; i < kNumItems; ++i) {
dt_.Insert(i, i);
}
vector<unsigned> nums;
auto tr_cb = [&](Dash64::iterator it) {
nums.push_back(it->first);
VLOG(1) << it.bucket_id() << " " << it.slot_id() << " " << it->first;
};
Dash64::Cursor cursor;
do {
cursor = dt_.TraverseBySegmentOrder(cursor, tr_cb);
} while (cursor);
sort(nums.begin(), nums.end());
nums.resize(unique(nums.begin(), nums.end()) - nums.begin());
ASSERT_EQ(kNumItems, nums.size());
EXPECT_EQ(0, nums[0]);
EXPECT_EQ(kNumItems - 1, nums.back());
}
struct TestEvictionPolicy {
static constexpr bool can_evict = true;
static constexpr bool can_gc = false;


@@ -18,6 +18,7 @@ extern "C" {
#include "server/journal/journal.h"
#include "server/server_state.h"
#include "server/tiered_storage.h"
#include "strings/human_readable.h"
ABSL_FLAG(bool, enable_heartbeat_eviction, true,
"Enable eviction during heartbeat when memory is under pressure.");
@@ -229,7 +230,7 @@ DbStats& DbStats::operator+=(const DbStats& o) {
}
SliceEvents& SliceEvents::operator+=(const SliceEvents& o) {
static_assert(sizeof(SliceEvents) == 96, "You should update this function with new fields");
static_assert(sizeof(SliceEvents) == 112, "You should update this function with new fields");
ADD(evicted_keys);
ADD(hard_evictions);
@@ -243,6 +244,8 @@ SliceEvents& SliceEvents::operator+=(const SliceEvents& o) {
ADD(mutations);
ADD(insertion_rejections);
ADD(update);
ADD(ram_hits);
ADD(ram_misses);
return *this;
}
@@ -472,16 +475,21 @@ OpResult<DbSlice::ItAndExp> DbSlice::FindInternal(const Context& cntx, std::stri
if (TieredStorage* tiered = shard_owner()->tiered_storage();
tiered && load_mode == LoadExternalMode::kLoad) {
if (res.it->second.HasIoPending()) {
tiered->CancelIo(cntx.db_index, res.it);
} else if (res.it->second.IsExternal()) {
if (res.it->second.IsExternal()) {
// Load reads data from disk, therefore this function may preempt.
// We update the iterator if it changed during the preemption.
res.it = tiered->Load(cntx.db_index, res.it, key);
if (!IsValid(res.it)) {
return OpStatus::KEY_NOTFOUND;
}
events_.ram_misses++;
} else {
if (res.it->second.HasIoPending()) {
tiered->CancelIo(cntx.db_index, res.it);
}
events_.ram_hits++;
}
res.it->first.SetTouched(true);
}
FiberAtomicGuard fg;
@@ -975,7 +983,7 @@ bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
void DbSlice::ReleaseNormalized(IntentLock::Mode mode, DbIndex db_index, std::string_view key) {
DCHECK_EQ(key, KeyLockArgs::GetLockKey(key));
DVLOG(1) << "Release " << IntentLock::ModeName(mode) << " "
DVLOG(2) << "Release " << IntentLock::ModeName(mode) << " "
<< " for " << key;
auto& lt = db_arr_[db_index]->trans_locks;
@@ -1198,6 +1206,37 @@ int32_t DbSlice::GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) c
db_arr_[db_ind]->prime.GetSegmentCount();
}
void DbSlice::ScheduleForOffloadStep(DbIndex db_indx, size_t increase_goal_bytes) {
VLOG(1) << "ScheduleForOffloadStep increase_goal_bytes:"
<< strings::HumanReadableNumBytes(increase_goal_bytes);
DCHECK(shard_owner()->tiered_storage());
FiberAtomicGuard guard;
PrimeTable& pt = db_arr_[db_indx]->prime;
static PrimeTable::Cursor cursor;
size_t offloaded_bytes = 0;
auto cb = [&](PrimeIterator it) {
// TBD: check that we did not lock it for a future transaction.
// If the item is cold (not touched) and can be externalized, schedule it for offload.
if (increase_goal_bytes > offloaded_bytes && !(it->first.WasTouched()) &&
TieredStorage::CanExternalizeEntry(it)) {
shard_owner()->tiered_storage()->ScheduleOffload(db_indx, it);
if (it->second.HasIoPending()) {
offloaded_bytes += it->second.Size();
VLOG(2) << "ScheduleOffload bytes:" << offloaded_bytes;
}
}
it->first.SetTouched(false);
};
// Traverse a single segment every time this function is called.
for (int i = 0; i < 60; ++i) {
cursor = pt.TraverseBySegmentOrder(cursor, cb);
}
}
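A simplified, self-contained sketch of the budget logic in ScheduleForOffloadStep (hypothetical Item/OffloadStep names; the real code walks PrimeTable iterators and calls TieredStorage::ScheduleOffload):

#include <cstddef>
#include <vector>

struct Item {
  bool touched = false;     // CompactObj TOUCHED bit
  bool offloadable = true;  // stands in for TieredStorage::CanExternalizeEntry()
  bool io_pending = false;  // stands in for HasIoPending()
  size_t size = 0;
};

// Visits a window of items: schedules cold, offloadable ones until the byte goal
// is met, and clears every visited item's touched bit for the next pass.
size_t OffloadStep(std::vector<Item>& items, size_t begin, size_t count, size_t goal_bytes) {
  size_t offloaded = 0;
  for (size_t i = begin; i < begin + count && i < items.size(); ++i) {
    Item& item = items[i];
    if (offloaded < goal_bytes && !item.touched && item.offloadable && !item.io_pending) {
      item.io_pending = true;  // pretend the write was scheduled
      offloaded += item.size;
    }
    item.touched = false;  // reset for the next scan pass
  }
  return offloaded;
}

In the real step the visit window is one segment (the 60-iteration TraverseBySegmentOrder loop above), and the static cursor carries the position across heartbeats so successive calls cover the whole table.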
void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes) {
DCHECK(!owner_->IsReplica());
if ((!caching_mode_) || !expire_allowed_ || !GetFlag(FLAGS_enable_heartbeat_eviction))


@@ -51,6 +51,10 @@ struct SliceEvents {
size_t misses = 0;
size_t mutations = 0;
// ram hit/miss when tiering is enabled
size_t ram_hits = 0;
size_t ram_misses = 0;
// how many insertions were rejected due to OOM.
size_t insertion_rejections = 0;
@@ -364,6 +368,7 @@ class DbSlice {
// Deletes some amount of possible expired items.
DeleteExpiredStats DeleteExpiredStep(const Context& cntx, unsigned count);
void FreeMemWithEvictionStep(DbIndex db_indx, size_t increase_goal_bytes);
void ScheduleForOffloadStep(DbIndex db_indx, size_t increase_goal_bytes);
int32_t GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) const;


@@ -38,6 +38,9 @@ ABSL_FLAG(dfly::MemoryBytesFlag, tiered_max_file_size, dfly::MemoryBytesFlag{},
"0 - means the program will automatically determine its maximum file size. "
"default: 0");
ABSL_FLAG(float, tiered_offload_threshold, 0.5,
"The ratio of used/max memory above which we start offloading values to disk");
ABSL_FLAG(uint32_t, hz, 100,
"Base frequency at which the server performs other background tasks. "
"Warning: not advised to decrease in production.");
@@ -602,7 +605,9 @@ void EngineShard::Heartbeat() {
ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10);
}
ssize_t redline = (max_memory_limit * kRedLimitFactor) / shard_set->size();
ssize_t eviction_redline = (max_memory_limit * kRedLimitFactor) / shard_set->size();
size_t tiering_redline =
(max_memory_limit * GetFlag(FLAGS_tiered_offload_threshold)) / shard_set->size();
DbContext db_cntx;
db_cntx.time_now_ms = GetCurrentTimeMs();
@@ -620,8 +625,16 @@ void EngineShard::Heartbeat() {
}
// if our budget is below the limit
if (db_slice_.memory_budget() < redline) {
db_slice_.FreeMemWithEvictionStep(i, redline - db_slice_.memory_budget());
if (db_slice_.memory_budget() < eviction_redline) {
db_slice_.FreeMemWithEvictionStep(i, eviction_redline - db_slice_.memory_budget());
}
if (tiered_storage_) {
size_t offload_bytes = 0;
if (UsedMemory() > tiering_redline) {
offload_bytes = UsedMemory() - tiering_redline;
}
db_slice_.ScheduleForOffloadStep(i, offload_bytes);
}
}
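To make the two per-shard thresholds above concrete, a small self-contained example with illustrative numbers (kRedLimitFactor's value does not appear in this diff, so 0.1 is only an assumption):

#include <cstddef>
#include <cstdio>

int main() {
  const size_t max_memory_limit = 8ull << 30;   // 8 GiB for the whole instance
  const size_t num_shards = 8;
  const double kRedLimitFactor = 0.1;           // assumed value, for illustration only
  const double tiered_offload_threshold = 0.5;  // FLAGS_tiered_offload_threshold default

  size_t eviction_redline = size_t(max_memory_limit * kRedLimitFactor) / num_shards;
  size_t tiering_redline = size_t(max_memory_limit * tiered_offload_threshold) / num_shards;

  size_t used = 600ull << 20;  // this shard currently uses 600 MiB
  size_t offload_bytes = used > tiering_redline ? used - tiering_redline : 0;

  // ~102 MiB eviction line, 512 MiB tiering line, ~88 MiB requested for offload.
  std::printf("eviction=%zu tiering=%zu offload=%zu\n", eviction_redline, tiering_redline,
              offload_bytes);
  return 0;
}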


@@ -1885,11 +1885,14 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("reply_count", reply_stats.send_stats.count);
append("reply_latency_usec", reply_stats.send_stats.total_duration);
append("blocked_on_interpreter", m.coordinator_stats.blocked_on_interpreter);
append("ram_hits", m.events.ram_hits);
append("ram_misses", m.events.ram_misses);
}
if (should_enter("TIERED", true)) {
append("tiered_entries", total.tiered_entries);
append("tiered_bytes", total.tiered_size);
append("tiered_bytes_human", HumanReadableNumBytes(total.tiered_size));
append("tiered_reads", m.disk_stats.read_total);
append("tiered_read_latency_usec", m.disk_stats.read_delay_usec);
append("tiered_writes", m.tiered_stats.tiered_writes);


@@ -610,10 +610,8 @@ OpResult<optional<string>> SetCmd::Set(const SetParams& params, string_view key,
}
if (shard->tiered_storage() &&
TieredStorage::EligibleForOffload(value)) { // external storage enabled.
// TODO: we may have a bug if we block the fiber inside UnloadItem - "it" may be invalid
// afterwards. handle this
shard->tiered_storage()->ScheduleOffload(op_args_.db_cntx.db_index, it, key);
TieredStorage::EligibleForOffload(value.size())) { // external storage enabled.
shard->tiered_storage()->ScheduleOffloadWithThrottle(op_args_.db_cntx.db_index, it, key);
}
if (manual_journal_ && op_args_.shard->journal()) {


@@ -399,7 +399,7 @@ void TieredStorage::FinishIoRequest(int io_res, InflightWriteRequest* req) {
}
delete req;
--num_active_requests_;
if (num_active_requests_ < GetFlag(FLAGS_tiered_storage_max_pending_writes)) {
if (IoDeviceUnderloaded()) {
this->throttle_ec_.notifyAll();
}
VLOG_IF(2, num_active_requests_ == 0) << "Finished active requests";
@@ -440,7 +440,7 @@ PrimeIterator TieredStorage::Load(DbIndex db_index, PrimeIterator it, string_vie
return it;
}
error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it, string_view key) {
bool TieredStorage::PrepareForOffload(DbIndex db_index, PrimeIterator it) {
CHECK_EQ(OBJ_STRING, it->second.ObjType());
DCHECK(!it->second.IsExternal());
DCHECK(!it->second.HasIoPending());
@@ -456,13 +456,7 @@ error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it, st
}
if (blob_len > kMaxSmallBin) {
auto [schedule, res_it] = CanScheduleOffload(db_index, it, key);
if (schedule) {
WriteSingle(db_index, res_it, blob_len);
} else {
VLOG(2) << "Skip WriteSingle for: " << key;
}
return error_code{};
return true;
}
PerDb* db = db_arr_[db_index];
@@ -474,34 +468,79 @@ error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it, st
unsigned max_entries = NumEntriesInSmallBin(kSmallBins[bin_index]);
auto& bin_record = db->bin_map[bin_index];
// TODO: we need to track in stats all the cases where we skip an offload attempt.
CHECK_LT(bin_record.pending_entries.size(), max_entries);
if (bin_record.pending_entries.size() == max_entries) {
// This bin is full and was not offloaded yet; we cannot schedule this entry for offload.
return false;
}
VLOG(2) << "ScheduleOffload:" << key;
VLOG(2) << "ScheduleOffload:" << it->first.ToString();
bin_record.pending_entries.insert(it->first.AsRef());
it->second.SetIoPending(true);
if (bin_record.pending_entries.size() < max_entries)
return error_code{}; // gather more.
if (bin_record.pending_entries.size() == max_entries) {
return true;
}
return false; // Gather more entries for bin before offload.
}
bool flush_succeeded = false;
void TieredStorage::CancelOffload(DbIndex db_index, PrimeIterator it) {
size_t blob_len = it->second.Size();
if (blob_len > kMaxSmallBin) {
return;
}
PerDb* db = db_arr_[db_index];
unsigned bin_index = SmallToBin(blob_len);
auto& bin_record = db->bin_map[bin_index];
bin_record.pending_entries.erase(it->first.AsRef());
it->second.SetIoPending(false);
++stats_.flush_skip_cnt;
}
auto [schedule, res_it] = CanScheduleOffload(db_index, it, key);
error_code TieredStorage::ScheduleOffloadWithThrottle(DbIndex db_index, PrimeIterator it,
string_view key) {
bool schedule_offload = PrepareForOffload(db_index, it);
if (!schedule_offload) {
return error_code{};
}
auto [schedule, res_it] = ThrottleWrites(db_index, it, key);
if (schedule) {
flush_succeeded = FlushPending(db_index, bin_index);
return ScheduleOffloadInternal(db_index, res_it);
} else {
CancelOffload(db_index, res_it);
}
return error_code{};
}
// if we reached high utilization of the file range - try to grow the file.
if (alloc_.allocated_bytes() > size_t(alloc_.capacity() * 0.85)) {
InitiateGrow(1ULL << 28);
}
error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) {
bool schedule_offload = PrepareForOffload(db_index, it);
if (!schedule_offload) {
return error_code{};
}
if (IoDeviceUnderloaded()) {
return ScheduleOffloadInternal(db_index, it);
} else {
CancelOffload(db_index, it);
}
return error_code{};
}
error_code TieredStorage::ScheduleOffloadInternal(DbIndex db_index, PrimeIterator it) {
size_t blob_len = it->second.Size();
if (blob_len > kMaxSmallBin) {
WriteSingle(db_index, it, blob_len);
return error_code{};
}
if (!flush_succeeded) {
VLOG(2) << "flush failed remove entry: " << key;
// we could not flush because I/O is saturated, so lets remove the last item.
bin_record.pending_entries.erase(res_it->first.AsRef());
res_it->second.SetIoPending(false);
++stats_.flush_skip_cnt;
unsigned bin_index = SmallToBin(blob_len);
bool flushed = FlushPending(db_index, bin_index);
if (!flushed) {
CancelOffload(db_index, it);
}
// if we reached high utilization of the file range - try to grow the file.
if (alloc_.allocated_bytes() > size_t(alloc_.capacity() * 0.85)) {
InitiateGrow(1ULL << 28);
}
return error_code{};
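The functions above batch small values into size-class bins and only write a bin once it is full, while values larger than kMaxSmallBin go through WriteSingle individually. A stripped-down sketch of the bin accumulation step (hypothetical Bin/AddPending names, ignoring page and allocator details):

#include <cstddef>
#include <vector>

struct Bin {
  size_t max_entries = 0;    // NumEntriesInSmallBin(...) for this size class
  std::vector<int> pending;  // stands in for bin_record.pending_entries
};

// Returns true when the caller should flush the bin, i.e. the entry was accepted
// and the bin just became full. A bin that is already full (and not yet offloaded)
// rejects the new entry, mirroring PrepareForOffload().
bool AddPending(Bin& bin, int key) {
  if (bin.pending.size() == bin.max_entries)
    return false;  // full and not offloaded yet - skip this entry
  bin.pending.push_back(key);  // entry now counts as IO pending
  return bin.pending.size() == bin.max_entries;
}

On the flush path, FlushPending writes the whole bin in one request; if the device is saturated or the flush fails, CancelOffload removes the entry again and clears its IO-pending bit.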
@@ -588,7 +627,7 @@ void TieredStorage::WriteSingle(DbIndex db_index, PrimeIterator it, size_t blob_
mi_free(req->block_ptr);
delete req;
--num_active_requests_;
if (num_active_requests_ < GetFlag(FLAGS_tiered_storage_max_pending_writes)) {
if (IoDeviceUnderloaded()) {
this->throttle_ec_.notifyAll();
}
};
@@ -626,17 +665,16 @@ void TieredStorage::WriteSingle(DbIndex db_index, PrimeIterator it, size_t blob_
++stats_.tiered_writes;
}
std::pair<bool, PrimeIterator> TieredStorage::CanScheduleOffload(DbIndex db_index, PrimeIterator it,
string_view key) {
unsigned max_pending_writes = GetFlag(FLAGS_tiered_storage_max_pending_writes);
std::pair<bool, PrimeIterator> TieredStorage::ThrottleWrites(DbIndex db_index, PrimeIterator it,
string_view key) {
unsigned throttle_usec = GetFlag(FLAGS_tiered_storage_throttle_us);
PrimeIterator res_it = it;
if (num_active_requests_ >= max_pending_writes && throttle_usec > 0) {
if (!IoDeviceUnderloaded() && throttle_usec > 0) {
chrono::steady_clock::time_point next =
chrono::steady_clock::now() + chrono::microseconds(throttle_usec);
stats_.throttled_write_cnt++;
throttle_ec_.await_until([&]() { return num_active_requests_ < max_pending_writes; }, next);
throttle_ec_.await_until([&]() { return IoDeviceUnderloaded(); }, next);
PrimeTable* pt = db_slice_.GetTables(db_index).first;
if (!it.IsOccupied() || it->first != key) {
@@ -649,7 +687,11 @@ std::pair<bool, PrimeIterator> TieredStorage::CanScheduleOffload(DbIndex db_inde
}
}
return std::make_pair((num_active_requests_ < max_pending_writes), res_it);
return std::make_pair(IoDeviceUnderloaded(), res_it);
}
bool TieredStorage::IoDeviceUnderloaded() const {
return num_active_requests_ < GetFlag(FLAGS_tiered_storage_max_pending_writes);
}
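ThrottleWrites blocks the calling fiber until IoDeviceUnderloaded() holds or the throttle deadline passes. A rough illustration of that pattern with standard primitives (the real code uses a fiber-aware EventCount, throttle_ec_, so only the calling fiber is suspended):

#include <chrono>
#include <condition_variable>
#include <mutex>

struct WriteThrottle {
  std::mutex mu;
  std::condition_variable cv;
  unsigned active_requests = 0;
  unsigned max_pending_writes = 32;  // stand-in for FLAGS_tiered_storage_max_pending_writes

  bool Underloaded() const { return active_requests < max_pending_writes; }

  // Returns true if the device became underloaded before the timeout expired.
  bool WaitUntilUnderloaded(std::chrono::microseconds timeout) {
    std::unique_lock lk(mu);
    return cv.wait_for(lk, timeout, [&] { return Underloaded(); });
  }

  void OnRequestFinished() {
    {
      std::lock_guard lk(mu);
      --active_requests;
    }
    cv.notify_all();  // analogous to throttle_ec_.notifyAll()
  }
};

After waking up, the real code additionally revalidates the iterator (the entry may have moved or been deleted while the fiber slept), which is why ThrottleWrites returns an updated PrimeIterator alongside the underloaded flag.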
bool TieredStorage::FlushPending(DbIndex db_index, unsigned bin_index) {
@@ -727,4 +769,9 @@ void TieredStorage::InitiateGrow(size_t grow_size) {
CHECK(!ec) << "TBD"; // TODO
}
bool TieredStorage::CanExternalizeEntry(PrimeIterator it) {
return it->first.ObjType() == OBJ_STRING && !it->second.HasIoPending() &&
!it->second.IsExternal() && EligibleForOffload(it->second.Size());
}
} // namespace dfly


@@ -28,15 +28,20 @@ class TieredStorage {
PrimeIterator Load(DbIndex db_index, PrimeIterator it, std::string_view key);
// Schedules unloading of the item, pointed by the iterator.
std::error_code ScheduleOffload(DbIndex db_index, PrimeIterator it, std::string_view key);
void CancelIo(DbIndex db_index, PrimeIterator it);
static bool EligibleForOffload(std::string_view val) {
return val.size() >= kMinBlobLen;
static bool EligibleForOffload(size_t size) {
return size >= kMinBlobLen;
}
static bool CanExternalizeEntry(PrimeIterator it);
// Schedules offloading of the item pointed to by the iterator; this function can preempt.
std::error_code ScheduleOffloadWithThrottle(DbIndex db_index, PrimeIterator it,
std::string_view key);
// Schedules offloading of the item pointed to by the iterator.
std::error_code ScheduleOffload(DbIndex db_index, PrimeIterator it);
void Free(PrimeIterator it, DbTableStats* stats);
void Shutdown();
@@ -51,15 +56,25 @@ class TieredStorage {
std::error_code Read(size_t offset, size_t len, char* dest);
bool IoDeviceUnderloaded() const;
private:
class InflightWriteRequest;
void WriteSingle(DbIndex db_index, PrimeIterator it, size_t blob_len);
// Returns a pair consisting of an bool denoting whether we can write to disk, and updated
// iterator as this function can yield. 'it' should not be used after the call to this function.
std::pair<bool, PrimeIterator> CanScheduleOffload(DbIndex db_index, PrimeIterator it,
std::string_view key);
// If the io device is overloaded, this function will yield until the device is underloaded or the
// throttle timeout is reached. Returns a pair consisting of a bool denoting whether the device is
// underloaded and an updated iterator, as this function can yield. 'it' should not be used after
// the call to this function.
std::pair<bool, PrimeIterator> ThrottleWrites(DbIndex db_index, PrimeIterator it,
std::string_view key);
// Schedules offloading of the item pointed to by the iterator.
std::error_code ScheduleOffloadInternal(DbIndex db_index, PrimeIterator it);
bool PrepareForOffload(DbIndex db_index, PrimeIterator it);
void CancelOffload(DbIndex db_index, PrimeIterator it);
bool FlushPending(DbIndex db_index, unsigned bin_index);