feat(eviction): Add eviction based on RSS memory

fixes dragonflydb#4011

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>
This commit is contained in:
Stepan Bagritsevich 2025-03-15 21:26:09 +01:00
parent 93bd52ceb8
commit 3f70d04c29
No known key found for this signature in database
GPG key ID: 92DAADC9F15EE26E
5 changed files with 281 additions and 63 deletions

View file

@ -816,7 +816,22 @@ Usage: dragonfly [FLAGS]
// export MIMALLOC_VERBOSE=1 to see the options before the override. // export MIMALLOC_VERBOSE=1 to see the options before the override.
mi_option_enable(mi_option_show_errors); mi_option_enable(mi_option_show_errors);
mi_option_set(mi_option_max_warnings, 0); mi_option_set(mi_option_max_warnings, 0);
mi_option_enable(mi_option_purge_decommits); mi_option_enable(mi_option_purge_decommits);
DCHECK(mi_option_get(mi_option_reset_decommits) == 1);
mi_option_set(mi_option_purge_delay, 0);
DCHECK(!mi_option_get(mi_option_reset_delay));
/*
mi_option_arena_eager_commit
MIMALLOC_PAGE_RESET=1
MIMALLOC_RESET_DECOMMITS=1
MIMALLOC_ABANDONED_PAGE_RESET=1
MIMALLOC_RESET_DELAY=1
MIMALLOC_ARENA_EAGER_COMMIT=0
*/
fb2::SetDefaultStackResource(&fb2::std_malloc_resource, kFiberDefaultStackSize); fb2::SetDefaultStackResource(&fb2::std_malloc_resource, kFiberDefaultStackSize);

View file

@ -19,10 +19,12 @@ extern "C" {
#include "server/search/doc_index.h" #include "server/search/doc_index.h"
#include "server/server_state.h" #include "server/server_state.h"
#include "server/tiered_storage.h" #include "server/tiered_storage.h"
#include "server/tiering/common.h"
#include "server/transaction.h" #include "server/transaction.h"
#include "util/fibers/proactor_base.h" #include "util/fibers/proactor_base.h"
using namespace std; using namespace std;
using namespace ::dfly::tiering::literals;
ABSL_FLAG(float, mem_defrag_threshold, 0.7, ABSL_FLAG(float, mem_defrag_threshold, 0.7,
"Minimum percentage of used memory relative to maxmemory cap before running " "Minimum percentage of used memory relative to maxmemory cap before running "
@ -65,6 +67,9 @@ ABSL_FLAG(double, eviction_memory_budget_threshold, 0.1,
"Eviction starts when the free memory (including RSS memory) drops below " "Eviction starts when the free memory (including RSS memory) drops below "
"eviction_memory_budget_threshold * max_memory_limit."); "eviction_memory_budget_threshold * max_memory_limit.");
ABSL_FLAG(uint64_t, force_decommit_threshold, 8_MB,
"The threshold of memory to force decommit when memory is under pressure.");
ABSL_DECLARE_FLAG(uint32_t, max_eviction_per_heartbeat); ABSL_DECLARE_FLAG(uint32_t, max_eviction_per_heartbeat);
namespace dfly { namespace dfly {
@ -216,45 +221,6 @@ size_t CalculateHowManyBytesToEvictOnShard(size_t global_memory_limit, size_t gl
return shard_budget < shard_memory_threshold ? (shard_memory_threshold - shard_budget) : 0; return shard_budget < shard_memory_threshold ? (shard_memory_threshold - shard_budget) : 0;
} }
/* Calculates the number of bytes to evict based on memory and rss memory usage. */
size_t CalculateEvictionBytes() {
const size_t shards_count = shard_set->size();
const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold);
const size_t shard_memory_budget_threshold =
size_t(max_memory_limit * eviction_memory_budget_threshold) / shards_count;
const size_t global_used_memory = used_mem_current.load(memory_order_relaxed);
// Calculate how many bytes we need to evict on this shard
size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory,
shard_memory_budget_threshold);
// TODO: Eviction due to rss usage is not working well as it causes eviction
// of to many keys untill we finally see decrease in rss. We need to improve
// this logic before we enable it.
/*
const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
// If rss_oom_deny_ratio is set, we should evict depending on rss memory too
if (rss_oom_deny_ratio > 0.0) {
const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
// We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
memory const size_t shard_rss_memory_budget_threshold =
size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count;
// Calculate how much rss memory is used by all shards
const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed);
// Try to evict more bytes if we are close to the rss memory limit
goal_bytes = std::max(
goal_bytes, CalculateHowManyBytesToEvictOnShard(max_rss_memory, global_used_rss_memory,
shard_rss_memory_budget_threshold));
}
*/
return goal_bytes;
}
} // namespace } // namespace
__thread EngineShard* EngineShard::shard_ = nullptr; __thread EngineShard* EngineShard::shard_ = nullptr;
@ -794,6 +760,7 @@ void EngineShard::RetireExpiredAndEvict() {
DbContext db_cntx; DbContext db_cntx;
db_cntx.time_now_ms = GetCurrentTimeMs(); db_cntx.time_now_ms = GetCurrentTimeMs();
size_t deleted_bytes = 0;
size_t eviction_goal = GetFlag(FLAGS_enable_heartbeat_eviction) ? CalculateEvictionBytes() : 0; size_t eviction_goal = GetFlag(FLAGS_enable_heartbeat_eviction) ? CalculateEvictionBytes() : 0;
for (unsigned i = 0; i < db_slice.db_array_size(); ++i) { for (unsigned i = 0; i < db_slice.db_array_size(); ++i) {
@ -805,6 +772,7 @@ void EngineShard::RetireExpiredAndEvict() {
if (expt->size() > pt->size() / 4) { if (expt->size() > pt->size() / 4) {
DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target); DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target);
deleted_bytes += stats.deleted_bytes;
eviction_goal -= std::min(eviction_goal, size_t(stats.deleted_bytes)); eviction_goal -= std::min(eviction_goal, size_t(stats.deleted_bytes));
counter_[TTL_TRAVERSE].IncBy(stats.traversed); counter_[TTL_TRAVERSE].IncBy(stats.traversed);
counter_[TTL_DELETE].IncBy(stats.deleted); counter_[TTL_DELETE].IncBy(stats.deleted);
@ -820,9 +788,87 @@ void EngineShard::RetireExpiredAndEvict() {
<< " bytes. Max eviction per heartbeat: " << " bytes. Max eviction per heartbeat: "
<< GetFlag(FLAGS_max_eviction_per_heartbeat); << GetFlag(FLAGS_max_eviction_per_heartbeat);
deleted_bytes += evicted_bytes;
eviction_goal -= std::min(eviction_goal, evicted_bytes); eviction_goal -= std::min(eviction_goal, evicted_bytes);
} }
} }
if (deleted_bytes > 0) {
// Fast decommit without force
// No force to not preemt during decommit
mi_heap_t* heap = ServerState::tlocal()->data_heap();
mi_heap_collect(heap, false);
}
auto& deleted_bytes_before_decommit = eviction_state_.deleted_bytes_before_decommit;
eviction_state_.deleted_bytes_before_rss_update += deleted_bytes;
deleted_bytes_before_decommit += deleted_bytes;
if (deleted_bytes_before_decommit >= absl::GetFlag(FLAGS_force_decommit_threshold)) {
// Decommit with force
deleted_bytes_before_decommit = 0;
ServerState::tlocal()->DecommitMemory(ServerState::kAllMemory);
io::Result<io::StatusData> sdata_res = io::ReadStatusInfo();
if (sdata_res) {
const size_t total_rss = FetchRssMemory(sdata_res.value());
rss_mem_current.store(total_rss, memory_order_relaxed);
}
}
}
size_t EngineShard::CalculateEvictionBytes() {
const size_t shards_count = shard_set->size();
const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold);
const size_t shard_memory_budget_threshold =
size_t(max_memory_limit * eviction_memory_budget_threshold) / shards_count;
const size_t global_used_memory = used_mem_current.load(memory_order_relaxed);
// Calculate how many bytes we need to evict on this shard
size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory,
shard_memory_budget_threshold);
DVLOG(2) << "Memory goal bytes: " << goal_bytes << ", used memory: " << global_used_memory
<< ", memory limit: " << max_memory_limit;
// If rss_oom_deny_ratio is set, we should evict depending on rss memory too
const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
if (rss_oom_deny_ratio > 0.0) {
const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
/* We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
* memory */
const size_t shard_rss_memory_budget_threshold =
size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count;
// Calculate how much rss memory is used by all shards
const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed);
auto& global_rss_memory_at_prev_eviction = eviction_state_.global_rss_memory_at_prev_eviction;
auto& deleted_bytes_before_rss_update = eviction_state_.deleted_bytes_before_rss_update;
if (global_used_rss_memory < eviction_state_.global_rss_memory_at_prev_eviction) {
deleted_bytes_before_rss_update -=
std::min(deleted_bytes_before_rss_update,
(global_rss_memory_at_prev_eviction - global_used_rss_memory) / shards_count);
}
global_rss_memory_at_prev_eviction = global_used_rss_memory;
// Try to evict more bytes if we are close to the rss memory limit
const size_t rss_goal_bytes = CalculateHowManyBytesToEvictOnShard(
max_rss_memory, global_used_rss_memory - deleted_bytes_before_rss_update * shards_count,
shard_rss_memory_budget_threshold);
LOG_IF(INFO, rss_goal_bytes > 0)
<< "Rss memory goal bytes: " << rss_goal_bytes
<< ", rss used memory: " << global_used_rss_memory
<< ", rss memory limit: " << max_rss_memory
<< ", deleted_bytes_before_rss_update: " << deleted_bytes_before_rss_update;
goal_bytes = std::max(goal_bytes, rss_goal_bytes);
}
return goal_bytes;
} }
void EngineShard::CacheStats() { void EngineShard::CacheStats() {

View file

@ -212,6 +212,12 @@ class EngineShard {
void ResetScanState(); void ResetScanState();
}; };
struct EvictionTaskState {
size_t deleted_bytes_before_decommit = 0;
size_t deleted_bytes_before_rss_update = 0;
size_t global_rss_memory_at_prev_eviction = 0;
};
EngineShard(util::ProactorBase* pb, mi_heap_t* heap); EngineShard(util::ProactorBase* pb, mi_heap_t* heap);
// blocks the calling fiber. // blocks the calling fiber.
@ -223,6 +229,9 @@ class EngineShard {
void Heartbeat(); void Heartbeat();
void RetireExpiredAndEvict(); void RetireExpiredAndEvict();
/* Calculates the number of bytes to evict based on memory and rss memory usage. */
size_t CalculateEvictionBytes();
void CacheStats(); void CacheStats();
// We are running a task that checks whether we need to // We are running a task that checks whether we need to
@ -263,6 +272,7 @@ class EngineShard {
IntentLock shard_lock_; IntentLock shard_lock_;
uint32_t defrag_task_ = 0; uint32_t defrag_task_ = 0;
EvictionTaskState eviction_state_; // Used on eviction fiber
util::fb2::Fiber fiber_heartbeat_periodic_; util::fb2::Fiber fiber_heartbeat_periodic_;
util::fb2::Done fiber_heartbeat_periodic_done_; util::fb2::Done fiber_heartbeat_periodic_done_;

View file

@ -998,9 +998,7 @@ bool ShouldDenyOnOOM(const CommandId* cid) {
uint64_t start_ns = absl::GetCurrentTimeNanos(); uint64_t start_ns = absl::GetCurrentTimeNanos();
auto memory_stats = etl.GetMemoryUsage(start_ns); auto memory_stats = etl.GetMemoryUsage(start_ns);
if (memory_stats.used_mem > max_memory_limit || if (memory_stats.used_mem > max_memory_limit) {
(etl.rss_oom_deny_ratio > 0 &&
memory_stats.rss_mem > (max_memory_limit * etl.rss_oom_deny_ratio))) {
DLOG(WARNING) << "Out of memory, used " << memory_stats.used_mem << " ,rss " DLOG(WARNING) << "Out of memory, used " << memory_stats.used_mem << " ,rss "
<< memory_stats.rss_mem << " ,limit " << max_memory_limit; << memory_stats.rss_mem << " ,limit " << max_memory_limit;
etl.stats.oom_error_cmd_cnt++; etl.stats.oom_error_cmd_cnt++;

View file

@ -176,49 +176,198 @@ async def test_eval_with_oom(df_factory: DflyInstanceFactory):
assert rss_before_eval * 1.01 > info["used_memory_rss"] assert rss_before_eval * 1.01 > info["used_memory_rss"]
@pytest.mark.skip("rss eviction disabled")
@pytest.mark.asyncio @pytest.mark.asyncio
@dfly_args( @pytest.mark.parametrize(
{ "proactor_threads_param, maxmemory_param",
"proactor_threads": 1, [(1, 512 * (1024**2)), (1, 6 * (1024**3)), (4, 6 * (1024**3))],
"cache_mode": "true",
"maxmemory": "5gb",
"rss_oom_deny_ratio": 0.8,
"max_eviction_per_heartbeat": 100,
}
) )
async def test_cache_eviction_with_rss_deny_oom( async def test_cache_eviction_with_rss_deny_oom_simple_case(
async_client: aioredis.Redis, df_factory: DflyInstanceFactory, proactor_threads_param, maxmemory_param
): ):
""" """
Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit
""" """
df_server = df_factory.create(
proactor_threads=proactor_threads_param,
cache_mode="true",
maxmemory=maxmemory_param,
rss_oom_deny_ratio=0.8,
)
df_server.start()
max_memory = 5 * 1024 * 1024 * 1024 # 5G async_client = df_server.client()
rss_max_memory = int(max_memory * 0.8)
data_fill_size = int(0.9 * rss_max_memory) # 95% of rss_max_memory max_memory = maxmemory_param
rss_oom_deny_ratio = 0.8
eviction_memory_budget_threshold = 0.1 # 10% of max_memory
data_fill_size = int((rss_oom_deny_ratio + 0.05) * max_memory) # 85% of max_memory
val_size = 1024 * 5 # 5 kb val_size = 1024 * 5 # 5 kb
num_keys = data_fill_size // val_size num_keys = data_fill_size // val_size
await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", val_size) await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", val_size)
# Test that used memory is less than 90% of max memory
# Test that used memory is less than 90% of max memory to not to start eviction based on used_memory
memory_info = await async_client.info("memory") memory_info = await async_client.info("memory")
assert ( assert (
memory_info["used_memory"] < max_memory * 0.9 memory_info["used_memory"] < max_memory * 0.9
), "Used memory should be less than 90% of max memory." ), "Used memory should be less than 90% of max memory to not to start eviction based on used_memory."
assert ( assert (
memory_info["used_memory_rss"] > rss_max_memory * 0.9 memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio
), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio)." ), "Used RSS memory should be more than 80% of rss max memory (max_memory * rss_oom_deny_ratio) to start eviction based on rss memory usage."
# Get RSS memory after creating new connections # Track eviction stats
memory_info = await async_client.info("memory") memory_info = await async_client.info("memory")
while memory_info["used_memory_rss"] > rss_max_memory * 0.9: prev_evicted_keys = 0
evicted_keys_repeat_count = 0
while memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio:
# Wait for some time
await asyncio.sleep(1) await asyncio.sleep(1)
memory_info = await async_client.info("memory") memory_info = await async_client.info("memory")
logging.info( logging.info(
f'Current rss: {memory_info["used_memory_rss"]}. rss eviction threshold: {rss_max_memory * 0.9}.' f'Current used memory: {memory_info["used_memory"]}, current used rss: {memory_info["used_memory_rss"]}, rss eviction threshold: {max_memory * (rss_oom_deny_ratio - eviction_memory_budget_threshold)}.'
) )
stats_info = await async_client.info("stats") stats_info = await async_client.info("stats")
logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.') logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')
# Check if evicted keys are not increasing
if prev_evicted_keys == stats_info["evicted_keys"]:
evicted_keys_repeat_count += 1
else:
prev_evicted_keys = stats_info["evicted_keys"]
evicted_keys_repeat_count = 1
if evicted_keys_repeat_count > 2:
break
stats_info = await async_client.info("stats")
prev_evicted_keys = stats_info["evicted_keys"]
# Wait for some time
await asyncio.sleep(1)
# Assert that no more keys are evicted
memory_info = await async_client.info("memory")
stats_info = await async_client.info("stats")
assert memory_info["used_memory"] > max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.08
), "We should not evict all items."
assert memory_info["used_memory"] < max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold
), "Used memory should be smaller than threshold."
assert memory_info["used_memory_rss"] > max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.08
), "We should not evict all items."
assert stats_info["evicted_keys"] == prev_evicted_keys, "We should not evict more items."
assert stats_info["evicted_keys"] > 0, "We should evict some items."
@pytest.mark.asyncio
@pytest.mark.parametrize("proactor_threads_param, maxmemory_param", [(1, 6 * (1024**3))])
async def test_cache_eviction_with_rss_deny_oom_two_waves(
df_factory: DflyInstanceFactory, proactor_threads_param, maxmemory_param
):
"""
Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit
"""
df_server = df_factory.create(
proactor_threads=proactor_threads_param,
cache_mode="true",
maxmemory=maxmemory_param,
rss_oom_deny_ratio=0.8,
)
df_server.start()
async_client = df_server.client()
max_memory = maxmemory_param
rss_oom_deny_ratio = 0.8
eviction_memory_budget_threshold = 0.1 # 10% of max_memory
rss_eviction_threshold = max_memory * (rss_oom_deny_ratio - eviction_memory_budget_threshold)
data_fill_size = [
int((rss_oom_deny_ratio + 0.05) * max_memory),
int((1 - rss_oom_deny_ratio) * max_memory),
] # 85% of max_memory
val_size = 1024 * 5 # 5 kb
for i in range(2):
if i > 0:
await asyncio.sleep(5)
num_keys = data_fill_size[i] // val_size
logging.info(
f"Populating data for wave {i}. Data fill size: {data_fill_size[i]}. Number of keys: {num_keys}."
)
await async_client.execute_command("DEBUG", "POPULATE", num_keys, f"key{i}", val_size)
# Test that used memory is less than 90% of max memory to not to start eviction based on used_memory
memory_info = await async_client.info("memory")
assert (
memory_info["used_memory"] < max_memory * 0.9
), "Used memory should be less than 90% of max memory to not to start eviction based on used_memory."
assert (
memory_info["used_memory"] > rss_eviction_threshold
), "Used memory should be more then rss eviction threshold."
assert (
memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio
), "Used RSS memory should be more than 80% of rss max memory (max_memory * rss_oom_deny_ratio) to start eviction based on rss memory usage."
logging.info(
f'Current used memory: {memory_info["used_memory"]}, current used rss: {memory_info["used_memory_rss"]}, rss eviction threshold: {rss_eviction_threshold}.'
)
# Track eviction stats
memory_info = await async_client.info("memory")
prev_evicted_keys = 0
evicted_keys_repeat_count = 0
while memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio:
# Wait for some time
await asyncio.sleep(1)
memory_info = await async_client.info("memory")
logging.info(
f'Current used memory: {memory_info["used_memory"]}, current used rss: {memory_info["used_memory_rss"]}, rss eviction threshold: {rss_eviction_threshold}.'
)
stats_info = await async_client.info("stats")
logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')
# Check if evicted keys are not increasing
if prev_evicted_keys == stats_info["evicted_keys"]:
evicted_keys_repeat_count += 1
else:
prev_evicted_keys = stats_info["evicted_keys"]
evicted_keys_repeat_count = 1
if i == 0 and evicted_keys_repeat_count > 2:
break
assert evicted_keys_repeat_count < 6
stats_info = await async_client.info("stats")
prev_evicted_keys = stats_info["evicted_keys"]
# Wait for some time
await asyncio.sleep(1)
# Assert that no more keys are evicted
memory_info = await async_client.info("memory")
stats_info = await async_client.info("stats")
assert memory_info["used_memory"] > max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.08
), "We should not evict all items."
assert memory_info["used_memory"] < max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold
), "Used memory should be smaller than threshold."
assert memory_info["used_memory_rss"] > max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.08
), "We should not evict all items."
assert stats_info["evicted_keys"] == prev_evicted_keys, "We should not evict more items."
assert stats_info["evicted_keys"] > 0, "We should evict some items."