chore: reorganize EngineShard::Heartbeat (#3437)

* chore: reorganize EngineShard::Heartbeat

1. Simplify CacheStats by using accessorts directly provided by DbSlice
2. Separate eviction for tiering as tiering can be done on replica.
---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-08-04 15:00:43 +03:00 committed by GitHub
parent cfd2273fb0
commit 8f7c36e4b3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 34 additions and 31 deletions

View file

@ -603,9 +603,34 @@ void EngineShard::Heartbeat() {
CacheStats();
if (IsReplica()) // Never run expiration on replica.
return;
if (!IsReplica()) { // Never run expiry/evictions on replica.
RetireExpiredAndEvict();
}
// Offset CoolMemoryUsage when consider background offloading.
// TODO: Another approach could be is to align the approach similarly to how we do with
// FreeMemWithEvictionStep, i.e. if memory_budget is below the limit.
size_t tiering_offload_threshold =
tiered_storage_ ? tiered_storage_->CoolMemoryUsage() +
size_t(max_memory_limit * GetFlag(FLAGS_tiered_offload_threshold)) /
shard_set->size()
: std::numeric_limits<size_t>::max();
size_t used_memory = UsedMemory();
if (used_memory > tiering_offload_threshold) {
VLOG(1) << "Running Offloading, memory=" << used_memory
<< " tiering_threshold: " << tiering_offload_threshold
<< ", cool memory: " << tiered_storage_->CoolMemoryUsage();
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
for (unsigned i = 0; i < db_slice.db_array_size(); ++i) {
if (!db_slice.IsDbValid(i))
continue;
tiered_storage_->RunOffloading(i);
}
}
}
void EngineShard::RetireExpiredAndEvict() {
constexpr double kTtlDeleteLimit = 200;
constexpr double kRedLimitFactor = 0.1;
@ -616,21 +641,13 @@ void EngineShard::Heartbeat() {
if (deleted > 10) {
// deleted should be <= traversed.
// hence we map our delete/traversed ratio into a range [0, kTtlDeleteLimit).
// The higher t
// The higher ttl_delete_target the more likely we have lots of expired items that need
// to be deleted.
ttl_delete_target = kTtlDeleteLimit * double(deleted) / (double(traversed) + 10);
}
ssize_t eviction_redline = size_t(max_memory_limit * kRedLimitFactor) / shard_set->size();
// Offset CoolMemoryUsage when consider background offloading.
// TODO: Another approach could be is to align the approach similarly to how we do with
// FreeMemWithEvictionStep, i.e. if memory_budget is below the limit.
size_t tiering_offload_threshold =
tiered_storage_ ? tiered_storage_->CoolMemoryUsage() +
size_t(max_memory_limit * GetFlag(FLAGS_tiered_offload_threshold)) /
shard_set->size()
: std::numeric_limits<size_t>::max();
DbContext db_cntx;
db_cntx.time_now_ms = GetCurrentTimeMs();
@ -655,14 +672,6 @@ void EngineShard::Heartbeat() {
db_slice.FreeMemWithEvictionStep(i, starting_segment_id,
eviction_redline - db_slice.memory_budget());
}
size_t used_memory = UsedMemory();
if (used_memory > tiering_offload_threshold) {
VLOG(1) << "Running Offloading, memory=" << used_memory
<< " tiering_threshold: " << tiering_offload_threshold
<< ", cool memory: " << tiered_storage_->CoolMemoryUsage();
tiered_storage_->RunOffloading(i);
}
}
// Journal entries for expired entries are not writen to socket in the loop above.
@ -743,17 +752,9 @@ void EngineShard::CacheStats() {
size_t current = used_mem_current.fetch_add(delta, memory_order_relaxed) + delta;
ssize_t free_mem = max_memory_limit - current;
size_t entries = 0;
size_t table_memory = 0;
for (size_t i = 0; i < db_slice.db_array_size(); ++i) {
DbTable* table = db_slice.GetDBTable(i);
if (table) {
entries += table->prime.size();
table_memory += table->table_memory();
}
}
DCHECK_EQ(table_memory, db_slice.table_memory());
DCHECK_EQ(entries, db_slice.entries_count());
size_t entries = db_slice.entries_count();
size_t table_memory = db_slice.table_memory();
if (tiered_storage_) {
table_memory += tiered_storage_->CoolMemoryUsage();
}

View file

@ -206,6 +206,8 @@ class EngineShard {
void StartPeriodicFiber(util::ProactorBase* pb, std::function<void()> global_handler);
void Heartbeat();
void RetireExpiredAndEvict();
void RunPeriodic(std::chrono::milliseconds period_ms, std::function<void()> global_handler);
void CacheStats();