Mirror of https://github.com/dragonflydb/dragonfly.git (synced 2025-05-11 10:25:47 +02:00)
chore: reenable evictions upon insertion to avoid OOM rejections (#3387)
* chore: reenable evictions upon insertion to avoid OOM rejections

Before: when running dragonfly with --cache_mode we could get OOM rejections even though the eviction policy allowed evicting items to free memory. Ideally, dragonfly in cache mode should not respond with the OOM error at all.

This PR reuses the same eviction step we have in the Heartbeat and conditionally applies it during insertion. In my test the OOM errors went from 500K to 0 and the server still respected the memory limit. It also removes the old eviction heuristic that has never been used.

Test:
./dfly_bench --key_prefix=bar: -d 1024 --ratio=1:0 --qps=200 -n 3000
./dragonfly --dbfilename= --proactor_threads=2 --maxmemory=600M --cache_mode

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Parent: fb4222d01e
Commit: e2d65a0900
7 changed files with 67 additions and 141 deletions
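The change is easiest to see as a control-flow difference on the write path. Below is a minimal, self-contained toy sketch (standard C++ only, not Dragonfly code; the types and the eviction policy are invented for illustration) contrasting the old behavior, rejecting the write with OOM, with the new one, evicting and accepting the write:

    #include <cstdint>
    #include <iostream>
    #include <map>
    #include <string>

    // Toy contrast of the behavior change: in cache mode, an insertion that would
    // exceed the memory limit evicts old entries instead of failing with OOM.
    int main() {
      constexpr int64_t kMaxMemory = 1024;   // pretend per-shard limit, in bytes
      constexpr bool kCacheMode = true;      // stands in for --cache_mode
      std::map<int, std::string> cache;      // stands in for the prime table
      int64_t used = 0;
      int64_t oom_errors = 0;

      for (int key = 0; key < 2000; ++key) {
        std::string value(16, 'x');
        int64_t need = static_cast<int64_t>(value.size());
        if (used + need > kMaxMemory) {
          if (!kCacheMode) {  // old behavior: reject the write with OOM
            ++oom_errors;
            continue;
          }
          // New behavior: evict until the insertion fits again.
          while (!cache.empty() && used + need > kMaxMemory) {
            auto victim = cache.begin();  // toy policy: evict the smallest key
            used -= static_cast<int64_t>(victim->second.size());
            cache.erase(victim);
          }
        }
        used += need;
        cache[key] = std::move(value);
      }
      std::cout << "entries=" << cache.size() << " used=" << used
                << " oom_errors=" << oom_errors << "\n";
      return 0;
    }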
@@ -403,6 +403,8 @@ class CompactObj {
   // Precondition: the object is a non-inline string.
   StringOrView GetRawString() const;
 
+  bool HasAllocated() const;
+
  private:
   void EncodeString(std::string_view str);
   size_t DecodedLen(size_t sz) const;
@@ -412,8 +414,6 @@ class CompactObj {
   // Requires: HasAllocated() - true.
   void Free();
 
-  bool HasAllocated() const;
-
   bool CmpEncoded(std::string_view sv) const;
 
   void SetMeta(uint8_t taglen, uint8_t mask = 0) {
@@ -12,7 +12,7 @@
 
 namespace dfly::acl {
 
-class AclKeys;
+struct AclKeys;
 
 std::pair<bool, AclLog::Reason> IsUserAllowedToInvokeCommandGeneric(
     const std::vector<uint64_t>& acl_commands, const AclKeys& keys, facade::CmdArgList tail_args,
@@ -20,9 +20,6 @@
 #include "util/fibers/fibers.h"
 #include "util/fibers/stacktrace.h"
 
-ABSL_FLAG(bool, enable_heartbeat_eviction, true,
-          "Enable eviction during heartbeat when memory is under pressure.");
-
 ABSL_FLAG(uint32_t, max_eviction_per_heartbeat, 100,
           "The maximum number of key-value pairs that will be deleted in each eviction "
           "when heartbeat based eviction is triggered under memory pressure.");
@@ -484,8 +481,7 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
   if (caching_mode_ && IsValid(res.it)) {
     if (!change_cb_.empty()) {
       auto bump_cb = [&](PrimeTable::bucket_iterator bit) {
-        DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index;
-        CallChangeCallbacks(cntx.db_index, bit);
+        CallChangeCallbacks(cntx.db_index, key, bit);
       };
       db.prime.CVCUponBump(change_cb_.back().first, res.it, bump_cb);
     }
@@ -565,8 +561,7 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
   CHECK(status == OpStatus::KEY_NOTFOUND || status == OpStatus::OUT_OF_MEMORY) << status;
 
   // It's a new entry.
-  DVLOG(2) << "Running callbacks for key " << key << " in dbid " << cntx.db_index;
-  CallChangeCallbacks(cntx.db_index, key);
+  CallChangeCallbacks(cntx.db_index, key, {key});
 
   // In case we are loading from rdb file or replicating we want to disable conservative memory
   // checks (inside PrimeEvictionPolicy::CanGrow) and reject insertions only after we pass max
@@ -598,8 +593,8 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
   CompactObj co_key{key};
   PrimeIterator it;
 
   // I try/catch just for sake of having a convenient place to set a breakpoint.
-  size_t table_before = db.prime.mem_usage();
+  size_t table_before = db.table_memory();
 
   try {
     it = db.prime.InsertNew(std::move(co_key), PrimeValue{}, evp);
   } catch (bad_alloc& e) {
@@ -608,19 +603,20 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
     return OpStatus::OUT_OF_MEMORY;
   }
 
-  table_memory_ += (db.prime.mem_usage() - table_before);
   size_t evicted_obj_bytes = 0;
 
-  // We may still reach the state when our memory usage is above the limit even if we
-  // do not add new segments. For example, we have half full segments
-  // and we add new objects or update the existing ones and our memory usage grows.
-  if (evp.mem_budget() < 0) {
-    // TODO(roman): EvictObjects is too aggressive and it's messing with cache hit-rate.
-    // The regular eviction policy does a decent job though it may cross the passed limit
-    // a little bit. I do not consider it as a serious bug.
-    // evicted_obj_bytes = EvictObjects(-evp.mem_budget(), it, &db);
+  if (evp.mem_budget() < 0 && apply_memory_limit) {
+    // We may reach the state when our memory usage is below the limit even if we
+    // do not add new segments. For example, we have half full segments
+    // and we add new objects or update the existing ones and our memory usage grows.
+    // We do not require for a single operation to unload the whole negative debt.
+    // Instead, we create a positive, converging force that should help with freeing enough memory.
+    // Free at least 256 bytes or 3% of the total debt.
+    size_t evict_goal = std::max<size_t>(256, (-evp.mem_budget()) / 32);
+    evicted_obj_bytes = FreeMemWithEvictionStep(cntx.db_index, it.segment_id(), evict_goal);
   }
 
+  table_memory_ += (db.table_memory() - table_before);
+
   db.stats.inline_keys += it->first.IsInline();
   AccountObjectMemory(key, it->first.ObjType(), it->first.MallocUsed(), &db);  // Account for key
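The "converging force" comment above amounts to evict_goal = max(256, debt / 32): each over-budget insertion frees roughly 3% of the current memory debt, with a 256-byte floor so that small debts are cleared in one step. A stand-alone sketch (assuming, for simplicity, that every insertion frees exactly its goal and nothing else changes) of how quickly the debt converges to zero:

    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    int main() {
      // Assume a 1 MiB memory debt (a negative mem_budget) and that every
      // over-budget insertion frees exactly its goal: max(256, debt / 32).
      int64_t debt = 1 << 20;
      int steps = 0;
      while (debt > 0) {
        int64_t goal = std::max<int64_t>(256, debt / 32);
        debt -= goal;
        ++steps;
      }
      std::cout << "debt cleared after " << steps << " over-budget insertions\n";
      return 0;
    }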
@@ -709,7 +705,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
       string_view key = get<string_view>(req.change);
       table->CVCUponInsert(
           next_version, key,
-          [this, db_index, next_version, iterate_bucket](PrimeTable::bucket_iterator it) {
+          [db_index, next_version, iterate_bucket](PrimeTable::bucket_iterator it) {
             DCHECK_LT(it.GetVersion(), next_version);
             iterate_bucket(db_index, it);
           });
@@ -762,7 +758,7 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
   }
 
   CHECK(fetched_items_.empty());
-  auto cb = [this, indexes, flush_db_arr = std::move(flush_db_arr)]() mutable {
+  auto cb = [indexes, flush_db_arr = std::move(flush_db_arr)]() mutable {
     flush_db_arr.clear();
     ServerState::tlocal()->DecommitMemory(ServerState::kDataHeap | ServerState::kBackingHeap |
                                           ServerState::kGlibcmalloc);
@@ -1023,9 +1019,7 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, uint64_t fp) const
 }
 
 void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) {
-  DVLOG(2) << "Running callbacks in dbid " << db_ind;
-  CallChangeCallbacks(db_ind, ChangeReq{it.GetInnerIt()});
-
+  CallChangeCallbacks(db_ind, key, ChangeReq{it.GetInnerIt()});
   it.GetInnerIt().SetVersion(NextVersion());
 }
 
@@ -1217,25 +1211,22 @@ int32_t DbSlice::GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) c
              db_arr_[db_ind]->prime.GetSegmentCount();
 }
 
-void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes) {
+size_t DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t starting_segment_id,
+                                        size_t increase_goal_bytes) {
   DCHECK(!owner_->IsReplica());
-  if ((!caching_mode_) || !expire_allowed_ || !GetFlag(FLAGS_enable_heartbeat_eviction))
-    return;
+  if ((!caching_mode_) || !expire_allowed_)
+    return 0;
 
   auto max_eviction_per_hb = GetFlag(FLAGS_max_eviction_per_heartbeat);
   auto max_segment_to_consider = GetFlag(FLAGS_max_segment_to_consider);
 
   auto time_start = absl::GetCurrentTimeNanos();
   auto& db_table = db_arr_[db_ind];
-  int32_t num_segments = db_table->prime.GetSegmentCount();
-  int32_t num_buckets = PrimeTable::Segment_t::kTotalBuckets;
-  int32_t num_slots = PrimeTable::Segment_t::kSlotNum;
+  constexpr int32_t num_buckets = PrimeTable::Segment_t::kTotalBuckets;
+  constexpr int32_t num_slots = PrimeTable::Segment_t::kSlotNum;
 
-  size_t used_memory_after;
-  size_t evicted = 0;
+  size_t evicted_items = 0, evicted_bytes = 0;
   string tmp;
-  int32_t starting_segment_id = rand() % num_segments;
-  size_t used_memory_before = owner_->UsedMemory();
 
   bool record_keys = owner_->journal() != nullptr || expired_keys_events_recording_;
   vector<string> keys_to_journal;
@@ -1257,7 +1248,7 @@ void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes
         continue;
 
       auto evict_it = db_table->prime.GetIterator(segment_id, bucket_id, slot_id);
-      if (evict_it->first.IsSticky())
+      if (evict_it->first.IsSticky() || !evict_it->second.HasAllocated())
         continue;
 
       // check if the key is locked by looking up transaction table.
@@ -1269,13 +1260,12 @@ void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes
       if (record_keys)
         keys_to_journal.emplace_back(key);
 
+      evicted_bytes += evict_it->second.MallocUsed();
+      ++evicted_items;
       PerformDeletion(Iterator(evict_it, StringOrView::FromView(key)), db_table.get());
-      ++evicted;
 
-      used_memory_after = owner_->UsedMemory();
       // returns when whichever condition is met first
-      if ((evicted == max_eviction_per_hb) ||
-          (used_memory_before - used_memory_after >= increase_goal_bytes))
+      if ((evicted_items == max_eviction_per_hb) || (evicted_bytes >= increase_goal_bytes))
         goto finish;
     }
   }
@@ -1294,12 +1284,12 @@ finish:
   }
 
   auto time_finish = absl::GetCurrentTimeNanos();
-  events_.evicted_keys += evicted;
-  DVLOG(2) << "Memory usage before eviction: " << used_memory_before;
-  DVLOG(2) << "Memory usage after eviction: " << used_memory_after;
-  DVLOG(2) << "Number of keys evicted / max eviction per hb: " << evicted << "/"
+  events_.evicted_keys += evicted_items;
+  DVLOG(2) << "Evicted: " << evicted_bytes;
+  DVLOG(2) << "Number of keys evicted / max eviction per hb: " << evicted_items << "/"
            << max_eviction_per_hb;
   DVLOG(2) << "Eviction time (us): " << (time_finish - time_start) / 1000;
+  return evicted_bytes;
 }
 
 void DbSlice::CreateDb(DbIndex db_ind) {
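The reworked FreeMemWithEvictionStep above starts from a caller-supplied segment, skips sticky entries and values without allocated payloads, counts both evicted items and bytes, and stops at whichever limit is hit first, returning the bytes it freed. A simplified stand-alone sketch of that stop-at-first-condition loop over a flat toy table (names and data are invented; the real code walks segments, buckets and slots of the prime table):

    #include <cstddef>
    #include <iostream>
    #include <vector>

    struct Entry {
      bool sticky = false;   // sticky entries are never evicted
      std::size_t bytes = 0;
      bool live = true;
    };

    // Evict live, non-sticky entries starting at `start`, until either `max_items`
    // entries were evicted or at least `goal_bytes` were freed, whichever comes
    // first. Returns the number of bytes freed, as the new step does.
    std::size_t EvictionStep(std::vector<Entry>& table, std::size_t start,
                             std::size_t max_items, std::size_t goal_bytes) {
      std::size_t evicted_items = 0, evicted_bytes = 0;
      for (std::size_t i = 0; i < table.size(); ++i) {
        Entry& e = table[(start + i) % table.size()];
        if (!e.live || e.sticky)
          continue;
        e.live = false;
        evicted_bytes += e.bytes;
        ++evicted_items;
        if (evicted_items == max_items || evicted_bytes >= goal_bytes)
          break;  // stop at whichever condition is met first
      }
      return evicted_bytes;
    }

    int main() {
      std::vector<Entry> table(100);
      for (std::size_t i = 0; i < table.size(); ++i)
        table[i] = Entry{/*sticky=*/i % 10 == 0, /*bytes=*/64, /*live=*/true};
      std::size_t freed = EvictionStep(table, /*start=*/42, /*max_items=*/100,
                                       /*goal_bytes=*/1024);
      std::cout << "freed " << freed << " bytes\n";
      return 0;
    }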
@@ -1310,93 +1300,6 @@ void DbSlice::CreateDb(DbIndex db_ind) {
   }
 }
 
-// "it" is the iterator that we just added/updated and it should not be deleted.
-// "table" is the instance where we should delete the objects from.
-size_t DbSlice::EvictObjects(size_t memory_to_free, Iterator it, DbTable* table) {
-  if (owner_->IsReplica()) {
-    return 0;
-  }
-  PrimeTable::Segment_t* segment = table->prime.GetSegment(it.GetInnerIt().segment_id());
-  DCHECK(segment);
-
-  constexpr unsigned kNumStashBuckets = PrimeTable::Segment_t::kStashBucketNum;
-  constexpr unsigned kNumRegularBuckets = PrimeTable::Segment_t::kBucketNum;
-
-  PrimeTable::bucket_iterator it2(it.GetInnerIt());
-  unsigned evicted = 0;
-  bool evict_succeeded = false;
-
-  EngineShard* shard = owner_;
-  size_t used_memory_start = shard->UsedMemory();
-
-  auto freed_memory_fun = [&] {
-    size_t current = shard->UsedMemory();
-    return current < used_memory_start ? used_memory_start - current : 0;
-  };
-
-  for (unsigned i = 0; !evict_succeeded && i < kNumStashBuckets; ++i) {
-    unsigned stash_bid = i + kNumRegularBuckets;
-    const auto& bucket = segment->GetBucket(stash_bid);
-    if (bucket.IsEmpty())
-      continue;
-
-    for (int slot_id = PrimeTable::Segment_t::kSlotNum - 1; slot_id >= 0; --slot_id) {
-      if (!bucket.IsBusy(slot_id))
-        continue;
-
-      auto evict_it = table->prime.GetIterator(it.GetInnerIt().segment_id(), stash_bid, slot_id);
-      // skip the iterator that we must keep or the sticky items.
-      if (evict_it == it.GetInnerIt() || evict_it->first.IsSticky())
-        continue;
-
-      PerformDeletion(evict_it, table);
-      ++evicted;
-
-      if (freed_memory_fun() > memory_to_free) {
-        evict_succeeded = true;
-        break;
-      }
-    }
-  }
-
-  if (evicted) {
-    DVLOG(1) << "Evicted " << evicted << " stashed items, freed " << freed_memory_fun() << " bytes";
-  }
-
-  // Try normal buckets now. We iterate from largest slot to smallest across the whole segment.
-  for (int slot_id = PrimeTable::Segment_t::kSlotNum - 1; !evict_succeeded && slot_id >= 0;
-       --slot_id) {
-    for (unsigned i = 0; i < kNumRegularBuckets; ++i) {
-      unsigned bid = (it.GetInnerIt().bucket_id() + i) % kNumRegularBuckets;
-      const auto& bucket = segment->GetBucket(bid);
-      if (!bucket.IsBusy(slot_id))
-        continue;
-
-      auto evict_it = table->prime.GetIterator(it.GetInnerIt().segment_id(), bid, slot_id);
-      if (evict_it == it.GetInnerIt() || evict_it->first.IsSticky())
-        continue;
-
-      PerformDeletion(evict_it, table);
-      ++evicted;
-
-      if (freed_memory_fun() > memory_to_free) {
-        evict_succeeded = true;
-        break;
-      }
-    }
-  }
-
-  if (evicted) {
-    DVLOG(1) << "Evicted total: " << evicted << " items, freed " << freed_memory_fun() << " bytes "
-             << "success: " << evict_succeeded;
-
-    events_.evicted_keys += evicted;
-    events_.hard_evictions += evicted;
-  }
-
-  return freed_memory_fun();
-};
-
 void DbSlice::RegisterWatchedKey(DbIndex db_indx, std::string_view key,
                                  ConnectionState::ExecInfo* exec_info) {
   db_arr_[db_indx]->watched_keys[key].push_back(exec_info);
@@ -1566,7 +1469,11 @@ void DbSlice::OnCbFinish() {
   fetched_items_.clear();
 }
 
-void DbSlice::CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const {
+void DbSlice::CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const {
   if (change_cb_.empty())
     return;
 
+  DVLOG(2) << "Running callbacks for key " << key << " in dbid " << id;
   FetchedItemsRestorer fetched_restorer(&fetched_items_);
   std::unique_lock<LocalBlockingCounter> lk(block_counter_);
 
@@ -437,7 +437,12 @@ class DbSlice {
 
   // Deletes some amount of possible expired items.
   DeleteExpiredStats DeleteExpiredStep(const Context& cntx, unsigned count);
-  void FreeMemWithEvictionStep(DbIndex db_indx, size_t increase_goal_bytes);
+
+  // Evicts items with dynamically allocated data from the primary table.
+  // Does not shrink tables.
+  // Returnes number of bytes freed due to evictions.
+  size_t FreeMemWithEvictionStep(DbIndex db_indx, size_t starting_segment_id,
+                                 size_t increase_goal_bytes);
   void ScheduleForOffloadStep(DbIndex db_indx, size_t increase_goal_bytes);
 
   int32_t GetNextSegmentForEviction(int32_t segment_id, DbIndex db_ind) const;
@@ -469,6 +474,8 @@ class DbSlice {
   // Resets events_ member. Used by CONFIG RESETSTAT
   void ResetEvents();
 
+  // Controls the expiry/eviction state. The server may enter states where
+  // Both evictions and expiries will be stopped for a short period of time.
   void SetExpireAllowed(bool is_allowed) {
     expire_allowed_ = is_allowed;
   }
|
|||
void SendInvalidationTrackingMessage(std::string_view key);
|
||||
|
||||
void CreateDb(DbIndex index);
|
||||
size_t EvictObjects(size_t memory_to_free, Iterator it, DbTable* table);
|
||||
|
||||
enum class UpdateStatsMode {
|
||||
kReadStats,
|
||||
|
@@ -534,7 +540,7 @@ class DbSlice {
     return version_++;
   }
 
-  void CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const;
+  void CallChangeCallbacks(DbIndex id, std::string_view key, const ChangeReq& cr) const;
 
   class LocalBlockingCounter {
    public:
@@ -74,6 +74,9 @@ ABSL_FLAG(string, shard_round_robin_prefix, "",
 ABSL_FLAG(uint32_t, mem_defrag_check_sec_interval, 10,
           "Number of seconds between every defragmentation necessity check");
 
+ABSL_FLAG(bool, enable_heartbeat_eviction, true,
+          "Enable eviction during heartbeat when memory is under pressure.");
+
 namespace dfly {
 
 using namespace tiering::literals;
@@ -641,8 +644,10 @@ void EngineShard::Heartbeat() {
   }
 
   // if our budget is below the limit
-  if (db_slice.memory_budget() < eviction_redline) {
-    db_slice.FreeMemWithEvictionStep(i, eviction_redline - db_slice.memory_budget());
+  if (db_slice.memory_budget() < eviction_redline && GetFlag(FLAGS_enable_heartbeat_eviction)) {
+    uint32_t starting_segment_id = rand() % pt->GetSegmentCount();
+    db_slice.FreeMemWithEvictionStep(i, starting_segment_id,
+                                     eviction_redline - db_slice.memory_budget());
   }
 
   if (UsedMemory() > tiering_offload_threshold) {
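In the heartbeat path above, eviction now additionally checks the enable_heartbeat_eviction flag and hands FreeMemWithEvictionStep a random starting segment; the requested amount is still the gap between the eviction redline and the current budget. A tiny stand-alone sketch of that trigger arithmetic (the limit, redline fraction and segment count are made-up numbers, not Dragonfly defaults):

    #include <cstdint>
    #include <cstdlib>
    #include <iostream>

    int main() {
      // Made-up numbers for illustration only.
      const int64_t max_memory = 600LL << 20;            // e.g. --maxmemory=600M
      const int64_t eviction_redline = max_memory / 10;  // assumed redline fraction
      const int64_t memory_budget = 20LL << 20;          // what the shard has left
      const bool enable_heartbeat_eviction = true;       // the (relocated) flag
      const uint32_t segment_count = 4096;

      if (memory_budget < eviction_redline && enable_heartbeat_eviction) {
        uint32_t starting_segment_id = std::rand() % segment_count;
        int64_t goal_bytes = eviction_redline - memory_budget;
        std::cout << "evict starting at segment " << starting_segment_id
                  << ", goal " << goal_bytes << " bytes\n";
      }
      return 0;
    }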
@@ -658,9 +663,12 @@ void EngineShard::Heartbeat() {
 }
 
 void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms) {
+  VLOG(1) << "RunPeriodic with period " << period_ms.count() << "ms";
+
   bool runs_global_periodic = (shard_id() == 0);  // Only shard 0 runs global periodic.
   unsigned global_count = 0;
   int64_t last_stats_time = time(nullptr);
+  int64_t last_heartbeat_ms = INT64_MAX;
 
   while (true) {
     if (fiber_periodic_done_.WaitFor(period_ms)) {
@@ -668,7 +676,12 @@ void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms) {
       return;
     }
 
+    int64_t now_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
+    if (now_ms - 5 * period_ms.count() > last_heartbeat_ms) {
+      VLOG(1) << "This heartbeat took " << now_ms - last_heartbeat_ms << "ms";
+    }
     Heartbeat();
+    last_heartbeat_ms = fb2::ProactorBase::GetMonotonicTimeNs() / 1000000;
 
     if (runs_global_periodic) {
       ++global_count;
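The RunPeriodic change above remembers when the previous heartbeat finished and logs a warning when the next iteration starts more than five periods later. A self-contained sketch of the same late-tick detection using std::chrono (the period, threshold factor and simulated delay are arbitrary):

    #include <chrono>
    #include <iostream>
    #include <thread>

    int main() {
      using namespace std::chrono;
      const milliseconds period(40);
      auto last_heartbeat = steady_clock::time_point::max();  // sentinel, like INT64_MAX

      for (int i = 0; i < 3; ++i) {
        // WaitFor(period_ms) would block here; simulate one slow wakeup.
        std::this_thread::sleep_for(i == 1 ? milliseconds(250) : period);

        auto now = steady_clock::now();
        if (last_heartbeat != steady_clock::time_point::max() &&
            now - last_heartbeat > 5 * period) {
          std::cout << "heartbeat took "
                    << duration_cast<milliseconds>(now - last_heartbeat).count()
                    << "ms\n";
        }
        // Heartbeat() would run here.
        last_heartbeat = steady_clock::now();
      }
      return 0;
    }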
@@ -165,7 +165,7 @@ void MemoryCmd::Run(CmdArgList args) {
   }
 
   if (sub_cmd == "DEFRAGMENT") {
-    shard_set->pool()->DispatchOnAll([this](util::ProactorBase*) {
+    shard_set->pool()->DispatchOnAll([](util::ProactorBase*) {
       if (auto* shard = EngineShard::tlocal(); shard)
         shard->ForceDefrag();
     });
@@ -701,7 +701,7 @@ std::optional<fb2::Fiber> Pause(std::vector<facade::Listener*> listeners, Namesp
     return std::nullopt;
  }
 
-  // We should not expire/evict keys while clients are puased.
+  // We should not expire/evict keys while clients are paused.
   shard_set->RunBriefInParallel(
       [ns](EngineShard* shard) { ns->GetDbSlice(shard->shard_id()).SetExpireAllowed(false); });
 