feat: Add eviction based on rss memory

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>
Author: Stepan Bagritsevich
Date: 2025-04-24 14:26:57 +02:00
Commit: 22272c9c60 (parent: b3e0bcfb31)
4 changed files with 189 additions and 63 deletions

@@ -819,7 +819,12 @@ Usage: dragonfly [FLAGS]
   // export MIMALLOC_VERBOSE=1 to see the options before the override.
   mi_option_enable(mi_option_show_errors);
   mi_option_set(mi_option_max_warnings, 0);
   mi_option_enable(mi_option_purge_decommits);
+  DCHECK(mi_option_get(mi_option_reset_decommits) == 1);
+
+  mi_option_set(mi_option_purge_delay, 0);
+  DCHECK(!mi_option_get(mi_option_reset_delay));
+
   fb2::SetDefaultStackResource(&fb2::std_malloc_resource, kFiberDefaultStackSize);
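For context: purge_decommits makes mimalloc return freed pages to the OS by decommitting them, and purge_delay = 0 removes the default purge delay, so RSS shrinks soon after memory is freed. A minimal standalone sketch of the same configuration (the allocation loop is illustrative and not part of the commit):

    #include <mimalloc.h>

    #include <vector>

    int main() {
      // Same options as the commit: purge freed memory by decommitting pages,
      // and purge immediately instead of after the default delay.
      mi_option_enable(mi_option_purge_decommits);
      mi_option_set(mi_option_purge_delay, 0);

      // Allocate and free ~1 GiB; with the options above, the freed pages are
      // decommitted (so RSS drops) once mi_collect() purges them.
      std::vector<void*> blocks;
      for (int i = 0; i < 1024; ++i)
        blocks.push_back(mi_malloc(1 << 20));  // 1 MiB each
      for (void* p : blocks)
        mi_free(p);
      mi_collect(/*force=*/true);
      return 0;
    }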

@ -19,10 +19,12 @@ extern "C" {
#include "server/search/doc_index.h" #include "server/search/doc_index.h"
#include "server/server_state.h" #include "server/server_state.h"
#include "server/tiered_storage.h" #include "server/tiered_storage.h"
#include "server/tiering/common.h"
#include "server/transaction.h" #include "server/transaction.h"
#include "util/fibers/proactor_base.h" #include "util/fibers/proactor_base.h"
using namespace std; using namespace std;
using namespace ::dfly::tiering::literals;
ABSL_FLAG(float, mem_defrag_threshold, 0.7, ABSL_FLAG(float, mem_defrag_threshold, 0.7,
"Minimum percentage of used memory relative to maxmemory cap before running " "Minimum percentage of used memory relative to maxmemory cap before running "
@ -65,6 +67,9 @@ ABSL_FLAG(double, eviction_memory_budget_threshold, 0.1,
"Eviction starts when the free memory (including RSS memory) drops below " "Eviction starts when the free memory (including RSS memory) drops below "
"eviction_memory_budget_threshold * max_memory_limit."); "eviction_memory_budget_threshold * max_memory_limit.");
ABSL_FLAG(uint64_t, force_decommit_threshold, 8_MB,
"The threshold of memory to force decommit when memory is under pressure.");
ABSL_DECLARE_FLAG(uint32_t, max_eviction_per_heartbeat); ABSL_DECLARE_FLAG(uint32_t, max_eviction_per_heartbeat);
namespace dfly { namespace dfly {
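Aside: the new tiering/common.h include and the using-directive above appear to exist so that the 8_MB default in force_decommit_threshold compiles; _MB reads as a user-defined literal from dfly::tiering::literals. A generic sketch of the mechanism (the literal body below is an assumption, not copied from Dragonfly):

    #include <cstddef>

    namespace literals {  // hypothetical stand-in for dfly::tiering::literals
    constexpr size_t operator""_MB(unsigned long long mb) {
      return static_cast<size_t>(mb) << 20;  // mebibytes to bytes
    }
    }  // namespace literals

    using namespace literals;
    static_assert(8_MB == 8ull * 1024 * 1024, "8_MB equals 8 MiB in bytes");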
@@ -216,45 +221,6 @@ size_t CalculateHowManyBytesToEvictOnShard(size_t global_memory_limit, size_t gl
   return shard_budget < shard_memory_threshold ? (shard_memory_threshold - shard_budget) : 0;
 }

-/* Calculates the number of bytes to evict based on memory and rss memory usage. */
-size_t CalculateEvictionBytes() {
-  const size_t shards_count = shard_set->size();
-  const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold);
-  const size_t shard_memory_budget_threshold =
-      size_t(max_memory_limit * eviction_memory_budget_threshold) / shards_count;
-
-  const size_t global_used_memory = used_mem_current.load(memory_order_relaxed);
-
-  // Calculate how many bytes we need to evict on this shard
-  size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory,
-                                                          shard_memory_budget_threshold);
-
-  // TODO: Eviction due to rss usage is not working well, as it causes eviction
-  // of too many keys until we finally see a decrease in rss. We need to improve
-  // this logic before we enable it.
-  /*
-  const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
-  // If rss_oom_deny_ratio is set, we should evict depending on rss memory too
-  if (rss_oom_deny_ratio > 0.0) {
-    const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
-    // We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
-    // memory
-    const size_t shard_rss_memory_budget_threshold =
-        size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count;
-
-    // Calculate how much rss memory is used by all shards
-    const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed);
-
-    // Try to evict more bytes if we are close to the rss memory limit
-    goal_bytes = std::max(
-        goal_bytes, CalculateHowManyBytesToEvictOnShard(max_rss_memory, global_used_rss_memory,
-                                                        shard_rss_memory_budget_threshold));
-  }
-  */
-
-  return goal_bytes;
-}
-
 }  // namespace

 __thread EngineShard* EngineShard::shard_ = nullptr;
@@ -358,6 +324,18 @@ bool EngineShard::DefragTaskState::CheckRequired() {
     return false;
   }

+  std::optional<ShardMemUsage> shard_mem_usage;
+  if (GetFlag(FLAGS_enable_heartbeat_eviction)) {
+    shard_mem_usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold));
+
+    const static double eviction_waste_threshold = 0.05;
+    if (shard_mem_usage->wasted_mem >
+        (uint64_t(shard_mem_usage->commited * eviction_waste_threshold))) {
+      VLOG(1) << "memory issue found for memory " << shard_mem_usage.value();
+      return true;
+    }
+  }
+
   const std::size_t global_threshold = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold);
   if (global_threshold > rss_mem_current.load(memory_order_relaxed)) {
     return false;

@@ -372,11 +350,15 @@ bool EngineShard::DefragTaskState::CheckRequired() {
   }
   last_check_time = now;

-  ShardMemUsage usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold));
+  if (!shard_mem_usage) {
+    shard_mem_usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold));
+  }
+  DCHECK(shard_mem_usage.has_value());

   const double waste_threshold = GetFlag(FLAGS_mem_defrag_waste_threshold);
-  if (usage.wasted_mem > (uint64_t(usage.commited * waste_threshold))) {
-    VLOG(1) << "memory issue found for memory " << usage;
+  if (shard_mem_usage->wasted_mem > (uint64_t(shard_mem_usage->commited * waste_threshold))) {
+    VLOG(1) << "memory issue found for memory " << shard_mem_usage.value();
     return true;
   }
@@ -811,6 +793,7 @@ void EngineShard::RetireExpiredAndEvict() {
   DbContext db_cntx;
   db_cntx.time_now_ms = GetCurrentTimeMs();

+  size_t deleted_bytes = 0;
   size_t eviction_goal = GetFlag(FLAGS_enable_heartbeat_eviction) ? CalculateEvictionBytes() : 0;

   for (unsigned i = 0; i < db_slice.db_array_size(); ++i) {

@@ -822,6 +805,7 @@ void EngineShard::RetireExpiredAndEvict() {
     if (expt->size() > 0) {
       DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target);

+      deleted_bytes += stats.deleted_bytes;
       eviction_goal -= std::min(eviction_goal, size_t(stats.deleted_bytes));
       counter_[TTL_TRAVERSE].IncBy(stats.traversed);
       counter_[TTL_DELETE].IncBy(stats.deleted);
@@ -843,9 +827,75 @@ void EngineShard::RetireExpiredAndEvict() {
               << " bytes. Max eviction per heartbeat: "
               << GetFlag(FLAGS_max_eviction_per_heartbeat);

+      deleted_bytes += evicted_bytes;
       eviction_goal -= std::min(eviction_goal, evicted_bytes);
     }
   }
+
+  auto& deleted_bytes_before_decommit = eviction_state_.deleted_bytes_before_decommit;
+  eviction_state_.deleted_bytes_before_rss_update += deleted_bytes;
+  deleted_bytes_before_decommit += deleted_bytes;
+
+  if (deleted_bytes_before_decommit >= absl::GetFlag(FLAGS_force_decommit_threshold)) {
+    // Decommit with force
+    deleted_bytes_before_decommit = 0;
+    ServerState::tlocal()->DecommitMemory(ServerState::kAllMemory);
+  }
+}
+
+size_t EngineShard::CalculateEvictionBytes() {
+  const size_t shards_count = shard_set->size();
+  const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold);
+
+  const size_t shard_memory_budget_threshold =
+      size_t(max_memory_limit * eviction_memory_budget_threshold) / shards_count;
+
+  const size_t global_used_memory = used_mem_current.load(memory_order_relaxed);
+
+  // Calculate how many bytes we need to evict on this shard
+  size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory,
+                                                          shard_memory_budget_threshold);
+
+  LOG_IF_EVERY_N(INFO, goal_bytes > 0, 10)
+      << "Memory goal bytes: " << goal_bytes << ", used memory: " << global_used_memory
+      << ", memory limit: " << max_memory_limit;
+
+  // If rss_oom_deny_ratio is set, we should evict depending on rss memory too
+  const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
+  if (rss_oom_deny_ratio > 0.0) {
+    const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
+    /* We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
+     * memory */
+    const size_t shard_rss_memory_budget_threshold =
+        size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count;
+
+    // Calculate how much rss memory is used by all shards
+    const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed);
+
+    auto& global_rss_memory_at_prev_eviction = eviction_state_.global_rss_memory_at_prev_eviction;
+    auto& deleted_bytes_before_rss_update = eviction_state_.deleted_bytes_before_rss_update;
+
+    if (global_used_rss_memory < eviction_state_.global_rss_memory_at_prev_eviction) {
+      deleted_bytes_before_rss_update -=
+          std::min(deleted_bytes_before_rss_update,
+                   (global_rss_memory_at_prev_eviction - global_used_rss_memory) / shards_count);
+    }
+    global_rss_memory_at_prev_eviction = global_used_rss_memory;
+
+    // Try to evict more bytes if we are close to the rss memory limit
+    const size_t rss_goal_bytes = CalculateHowManyBytesToEvictOnShard(
+        max_rss_memory, global_used_rss_memory - deleted_bytes_before_rss_update * shards_count,
+        shard_rss_memory_budget_threshold);
+
+    LOG_IF_EVERY_N(INFO, goal_bytes > 0, 10)
+        << "Rss memory goal bytes: " << rss_goal_bytes
+        << ", rss used memory: " << global_used_rss_memory
+        << ", rss memory limit: " << max_rss_memory
+        << ", deleted_bytes_before_rss_update: " << deleted_bytes_before_rss_update;
+
+    goal_bytes = std::max(goal_bytes, rss_goal_bytes);
+  }
+
+  return goal_bytes;
+}

 void EngineShard::CacheStats() {
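To make the goal computation concrete: each shard's eviction goal is the gap between its share of free memory and its share of the budget threshold, computed against both the plain memory limit and the RSS limit (max_memory * rss_oom_deny_ratio), taking the larger. A standalone sketch with sample numbers; the shard-budget derivation inside CalculateHowManyBytesToEvictOnShard is only partly visible in this diff, so the (limit - used) / shards form below is an assumption:

    #include <algorithm>
    #include <cstddef>
    #include <cstdio>

    // Mirrors the helper above: evict the gap between the shard's free-memory
    // budget and the per-shard threshold. Only the return expression appears
    // in the diff; the budget derivation here is assumed.
    size_t BytesToEvictOnShard(size_t limit, size_t used, size_t shard_threshold,
                               size_t shards) {
      size_t shard_budget = (limit > used ? limit - used : 0) / shards;
      return shard_budget < shard_threshold ? shard_threshold - shard_budget : 0;
    }

    int main() {
      const size_t kGiB = 1ULL << 30;
      const size_t shards = 4;
      const double budget_threshold = 0.1;  // eviction_memory_budget_threshold
      const double rss_oom_deny_ratio = 0.8;

      const size_t max_memory = 6 * kGiB;
      const size_t max_rss = size_t(rss_oom_deny_ratio * max_memory);  // 4.8 GiB

      const size_t used = 3 * kGiB;                  // well under maxmemory
      const size_t used_rss = 4600 * (1ULL << 20);   // but RSS is near max_rss

      const size_t mem_goal = BytesToEvictOnShard(
          max_memory, used, size_t(max_memory * budget_threshold) / shards, shards);
      const size_t rss_goal = BytesToEvictOnShard(
          max_rss, used_rss, size_t(max_rss * budget_threshold) / shards, shards);

      // As in CalculateEvictionBytes, the shard evicts toward the larger goal:
      // here mem_goal is 0 but rss_goal is ~44 MiB per shard.
      printf("mem goal: %zu, rss goal: %zu, final: %zu\n", mem_goal, rss_goal,
             std::max(mem_goal, rss_goal));
      return 0;
    }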

@@ -216,6 +216,12 @@ class EngineShard {
     void ResetScanState();
   };

+  struct EvictionTaskState {
+    size_t deleted_bytes_before_decommit = 0;
+    size_t deleted_bytes_before_rss_update = 0;
+    size_t global_rss_memory_at_prev_eviction = 0;
+  };
+
   EngineShard(util::ProactorBase* pb, mi_heap_t* heap);

   // blocks the calling fiber.

@@ -227,6 +233,9 @@ class EngineShard {
   void Heartbeat();
   void RetireExpiredAndEvict();

+  /* Calculates the number of bytes to evict based on memory and rss memory usage. */
+  size_t CalculateEvictionBytes();
+
   void CacheStats();

   // We are running a task that checks whether we need to

@@ -267,6 +276,7 @@ class EngineShard {
   IntentLock shard_lock_;

   uint32_t defrag_task_ = 0;
+  EvictionTaskState eviction_state_;  // Used on eviction fiber
   util::fb2::Fiber fiber_heartbeat_periodic_;
   util::fb2::Done fiber_heartbeat_periodic_done_;
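The three EvictionTaskState counters implement two feedback loops: once deleted bytes exceed force_decommit_threshold the shard forces a decommit, and bytes that were deleted but are not yet reflected in the OS RSS number are subtracted from the observed global RSS until it actually drops. A compact sketch of that bookkeeping under the same names (the decommit callback stands in for ServerState::tlocal()->DecommitMemory):

    #include <algorithm>
    #include <cstddef>

    struct EvictionTaskState {
      size_t deleted_bytes_before_decommit = 0;
      size_t deleted_bytes_before_rss_update = 0;
      size_t global_rss_memory_at_prev_eviction = 0;
    };

    constexpr size_t kForceDecommitThreshold = 8u << 20;  // 8 MiB default

    // Called once per heartbeat with the bytes freed by expiry + eviction,
    // as in RetireExpiredAndEvict.
    template <typename DecommitFn>
    void AccountDeletions(EvictionTaskState& st, size_t deleted_bytes,
                          DecommitFn decommit) {
      st.deleted_bytes_before_rss_update += deleted_bytes;
      st.deleted_bytes_before_decommit += deleted_bytes;
      if (st.deleted_bytes_before_decommit >= kForceDecommitThreshold) {
        st.deleted_bytes_before_decommit = 0;
        decommit();  // DecommitMemory(kAllMemory) in the diff
      }
    }

    // Called from CalculateEvictionBytes: once RSS visibly drops, stop
    // discounting the bytes that the drop already accounts for. Mirrors the
    // diff, including the per-shard scaling by the shard count.
    size_t EffectiveRss(EvictionTaskState& st, size_t used_rss, size_t shards) {
      if (used_rss < st.global_rss_memory_at_prev_eviction) {
        st.deleted_bytes_before_rss_update -=
            std::min(st.deleted_bytes_before_rss_update,
                     (st.global_rss_memory_at_prev_eviction - used_rss) / shards);
      }
      st.global_rss_memory_at_prev_eviction = used_rss;
      return used_rss - st.deleted_bytes_before_rss_update * shards;
    }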

@@ -6,6 +6,15 @@ from . import dfly_args
 from .instance import DflyInstance, DflyInstanceFactory

+def extract_fragmentation_waste(memory_arena):
+    """
+    Extracts the fragmentation waste percentage from the MEMORY ARENA output.
+    """
+    match = re.search(r"fragmentation waste:\s*([0-9.]+)%", memory_arena)
+    assert match is not None
+    return float(match.group(1))
+
+
 @pytest.mark.slow
 @pytest.mark.opt_only
 @pytest.mark.parametrize(
@@ -176,49 +185,101 @@ async def test_eval_with_oom(df_factory: DflyInstanceFactory):
     assert rss_before_eval * 1.01 > info["used_memory_rss"]

-@pytest.mark.skip("rss eviction disabled")
 @pytest.mark.asyncio
-@dfly_args(
-    {
-        "proactor_threads": 1,
-        "cache_mode": "true",
-        "maxmemory": "5gb",
-        "rss_oom_deny_ratio": 0.8,
-        "max_eviction_per_heartbeat": 100,
-    }
+@pytest.mark.parametrize(
+    "proactor_threads_param, maxmemory_param",
+    [(1, 512 * (1024**2)), (1, 6 * (1024**3)), (4, 6 * (1024**3))],
 )
-async def test_cache_eviction_with_rss_deny_oom(
-    async_client: aioredis.Redis,
+async def test_cache_eviction_with_rss_deny_oom_simple_case(
+    df_factory: DflyInstanceFactory,
+    proactor_threads_param,
+    maxmemory_param,
 ):
     """
     Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit
     """
+    df_server = df_factory.create(
+        proactor_threads=proactor_threads_param,
+        cache_mode="true",
+        maxmemory=maxmemory_param,
+        rss_oom_deny_ratio=0.8,
+    )
+    df_server.start()

-    max_memory = 5 * 1024 * 1024 * 1024  # 5G
-    rss_max_memory = int(max_memory * 0.8)
+    async_client = df_server.client()

-    data_fill_size = int(0.9 * rss_max_memory)  # 95% of rss_max_memory
+    max_memory = maxmemory_param
+    rss_oom_deny_ratio = 0.8
+    eviction_memory_budget_threshold = 0.1  # 10% of max_memory
+
+    data_fill_size = int((rss_oom_deny_ratio + 0.05) * max_memory)  # 85% of max_memory

     val_size = 1024 * 5  # 5 kb
     num_keys = data_fill_size // val_size

     await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", val_size)

-    # Test that used memory is less than 90% of max memory
+    # Test that used memory is less than 90% of max memory, so that eviction does
+    # not start based on used_memory
     memory_info = await async_client.info("memory")
     assert (
         memory_info["used_memory"] < max_memory * 0.9
-    ), "Used memory should be less than 90% of max memory."
+    ), "Used memory should be less than 90% of max memory, so that eviction does not start based on used_memory."
     assert (
-        memory_info["used_memory_rss"] > rss_max_memory * 0.9
-    ), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio)."
+        memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio
+    ), "Used RSS memory should be more than 80% of rss max memory (max_memory * rss_oom_deny_ratio), so that eviction starts based on rss memory usage."

-    # Get RSS memory after creating new connections
+    # Track eviction stats
     memory_info = await async_client.info("memory")
-    while memory_info["used_memory_rss"] > rss_max_memory * 0.9:
+    prev_evicted_keys = 0
+    evicted_keys_repeat_count = 0
+    while True:
+        # Wait for some time
         await asyncio.sleep(1)
+
         memory_info = await async_client.info("memory")
         logging.info(
-            f'Current rss: {memory_info["used_memory_rss"]}. rss eviction threshold: {rss_max_memory * 0.9}.'
+            f'Current used memory: {memory_info["used_memory"]}, current used rss: {memory_info["used_memory_rss"]}, rss eviction threshold: {max_memory * (rss_oom_deny_ratio - eviction_memory_budget_threshold)}.'
         )
+
         stats_info = await async_client.info("stats")
         logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')
+
+        # Check if evicted keys are not increasing
+        if prev_evicted_keys == stats_info["evicted_keys"]:
+            evicted_keys_repeat_count += 1
+        else:
+            prev_evicted_keys = stats_info["evicted_keys"]
+            evicted_keys_repeat_count = 1
+
+        if evicted_keys_repeat_count > 2:
+            break
+
+    # Wait for some time
+    await asyncio.sleep(2)
+
+    memory_arena = await async_client.execute_command("MEMORY", "ARENA")
+    fragmentation_waste = extract_fragmentation_waste(memory_arena)
+    logging.info(f"Memory fragmentation waste: {fragmentation_waste}")
+    assert fragmentation_waste < 12.0, "Memory fragmentation waste should be less than 12%."
+
+    # Assert that no more keys are evicted
+    memory_info = await async_client.info("memory")
+    stats_info = await async_client.info("stats")
+
+    assert memory_info["used_memory"] > max_memory * (
+        rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.08
+    ), "We should not evict all items."
+    assert memory_info["used_memory"] < max_memory * (
+        rss_oom_deny_ratio - eviction_memory_budget_threshold
+    ), "Used memory should be smaller than the threshold."
+    assert memory_info["used_memory_rss"] > max_memory * (
+        rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.08
+    ), "We should not evict all items."
+
+    evicted_keys = stats_info["evicted_keys"]
+    # We may evict slightly more than prev_evicted_keys due to lag in RSS memory updates
+    assert (
+        evicted_keys > 0
+        and evicted_keys >= prev_evicted_keys
+        and evicted_keys <= prev_evicted_keys * 1.0015
+    ), "We should not evict more items."