This commit is contained in:
Stepan Bagritsevich 2025-05-08 13:04:32 +02:00 committed by GitHub
commit db0a6e8dfa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 281 additions and 63 deletions

View file

@ -819,7 +819,22 @@ Usage: dragonfly [FLAGS]
// export MIMALLOC_VERBOSE=1 to see the options before the override.
mi_option_enable(mi_option_show_errors);
mi_option_set(mi_option_max_warnings, 0);
mi_option_enable(mi_option_purge_decommits);
DCHECK(mi_option_get(mi_option_reset_decommits) == 1);
mi_option_set(mi_option_purge_delay, 0);
DCHECK(!mi_option_get(mi_option_reset_delay));
/*
mi_option_arena_eager_commit
MIMALLOC_PAGE_RESET=1
MIMALLOC_RESET_DECOMMITS=1
MIMALLOC_ABANDONED_PAGE_RESET=1
MIMALLOC_RESET_DELAY=1
MIMALLOC_ARENA_EAGER_COMMIT=0
*/
fb2::SetDefaultStackResource(&fb2::std_malloc_resource, kFiberDefaultStackSize);

View file

@ -19,10 +19,12 @@ extern "C" {
#include "server/search/doc_index.h"
#include "server/server_state.h"
#include "server/tiered_storage.h"
#include "server/tiering/common.h"
#include "server/transaction.h"
#include "util/fibers/proactor_base.h"
using namespace std;
using namespace ::dfly::tiering::literals;
ABSL_FLAG(float, mem_defrag_threshold, 0.7,
"Minimum percentage of used memory relative to maxmemory cap before running "
@ -65,6 +67,9 @@ ABSL_FLAG(double, eviction_memory_budget_threshold, 0.1,
"Eviction starts when the free memory (including RSS memory) drops below "
"eviction_memory_budget_threshold * max_memory_limit.");
ABSL_FLAG(uint64_t, force_decommit_threshold, 8_MB,
"The threshold of memory to force decommit when memory is under pressure.");
ABSL_DECLARE_FLAG(uint32_t, max_eviction_per_heartbeat);
namespace dfly {
@ -216,45 +221,6 @@ size_t CalculateHowManyBytesToEvictOnShard(size_t global_memory_limit, size_t gl
return shard_budget < shard_memory_threshold ? (shard_memory_threshold - shard_budget) : 0;
}
/* Calculates the number of bytes to evict based on memory and rss memory usage. */
size_t CalculateEvictionBytes() {
const size_t shards_count = shard_set->size();
const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold);
const size_t shard_memory_budget_threshold =
size_t(max_memory_limit * eviction_memory_budget_threshold) / shards_count;
const size_t global_used_memory = used_mem_current.load(memory_order_relaxed);
// Calculate how many bytes we need to evict on this shard
size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory,
shard_memory_budget_threshold);
// TODO: Eviction due to rss usage is not working well as it causes eviction
// of to many keys untill we finally see decrease in rss. We need to improve
// this logic before we enable it.
/*
const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
// If rss_oom_deny_ratio is set, we should evict depending on rss memory too
if (rss_oom_deny_ratio > 0.0) {
const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
// We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
memory const size_t shard_rss_memory_budget_threshold =
size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count;
// Calculate how much rss memory is used by all shards
const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed);
// Try to evict more bytes if we are close to the rss memory limit
goal_bytes = std::max(
goal_bytes, CalculateHowManyBytesToEvictOnShard(max_rss_memory, global_used_rss_memory,
shard_rss_memory_budget_threshold));
}
*/
return goal_bytes;
}
} // namespace
__thread EngineShard* EngineShard::shard_ = nullptr;
@ -812,6 +778,7 @@ void EngineShard::RetireExpiredAndEvict() {
DbContext db_cntx;
db_cntx.time_now_ms = GetCurrentTimeMs();
size_t deleted_bytes = 0;
size_t eviction_goal = GetFlag(FLAGS_enable_heartbeat_eviction) ? CalculateEvictionBytes() : 0;
for (unsigned i = 0; i < db_slice.db_array_size(); ++i) {
@ -823,6 +790,7 @@ void EngineShard::RetireExpiredAndEvict() {
if (expt->size() > 0) {
DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target);
deleted_bytes += stats.deleted_bytes;
eviction_goal -= std::min(eviction_goal, size_t(stats.deleted_bytes));
counter_[TTL_TRAVERSE].IncBy(stats.traversed);
counter_[TTL_DELETE].IncBy(stats.deleted);
@ -844,9 +812,87 @@ void EngineShard::RetireExpiredAndEvict() {
<< " bytes. Max eviction per heartbeat: "
<< GetFlag(FLAGS_max_eviction_per_heartbeat);
deleted_bytes += evicted_bytes;
eviction_goal -= std::min(eviction_goal, evicted_bytes);
}
}
if (deleted_bytes > 0) {
// Fast decommit without force
// No force to not preemt during decommit
mi_heap_t* heap = ServerState::tlocal()->data_heap();
mi_heap_collect(heap, false);
}
auto& deleted_bytes_before_decommit = eviction_state_.deleted_bytes_before_decommit;
eviction_state_.deleted_bytes_before_rss_update += deleted_bytes;
deleted_bytes_before_decommit += deleted_bytes;
if (deleted_bytes_before_decommit >= absl::GetFlag(FLAGS_force_decommit_threshold)) {
// Decommit with force
deleted_bytes_before_decommit = 0;
ServerState::tlocal()->DecommitMemory(ServerState::kAllMemory);
io::Result<io::StatusData> sdata_res = io::ReadStatusInfo();
if (sdata_res) {
const size_t total_rss = FetchRssMemory(sdata_res.value());
rss_mem_current.store(total_rss, memory_order_relaxed);
}
}
}
size_t EngineShard::CalculateEvictionBytes() {
const size_t shards_count = shard_set->size();
const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold);
const size_t shard_memory_budget_threshold =
size_t(max_memory_limit * eviction_memory_budget_threshold) / shards_count;
const size_t global_used_memory = used_mem_current.load(memory_order_relaxed);
// Calculate how many bytes we need to evict on this shard
size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory,
shard_memory_budget_threshold);
DVLOG(2) << "Memory goal bytes: " << goal_bytes << ", used memory: " << global_used_memory
<< ", memory limit: " << max_memory_limit;
// If rss_oom_deny_ratio is set, we should evict depending on rss memory too
const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio;
if (rss_oom_deny_ratio > 0.0) {
const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit);
/* We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss
* memory */
const size_t shard_rss_memory_budget_threshold =
size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count;
// Calculate how much rss memory is used by all shards
const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed);
auto& global_rss_memory_at_prev_eviction = eviction_state_.global_rss_memory_at_prev_eviction;
auto& deleted_bytes_before_rss_update = eviction_state_.deleted_bytes_before_rss_update;
if (global_used_rss_memory < eviction_state_.global_rss_memory_at_prev_eviction) {
deleted_bytes_before_rss_update -=
std::min(deleted_bytes_before_rss_update,
(global_rss_memory_at_prev_eviction - global_used_rss_memory) / shards_count);
}
global_rss_memory_at_prev_eviction = global_used_rss_memory;
// Try to evict more bytes if we are close to the rss memory limit
const size_t rss_goal_bytes = CalculateHowManyBytesToEvictOnShard(
max_rss_memory, global_used_rss_memory - deleted_bytes_before_rss_update * shards_count,
shard_rss_memory_budget_threshold);
LOG_IF(INFO, rss_goal_bytes > 0)
<< "Rss memory goal bytes: " << rss_goal_bytes
<< ", rss used memory: " << global_used_rss_memory
<< ", rss memory limit: " << max_rss_memory
<< ", deleted_bytes_before_rss_update: " << deleted_bytes_before_rss_update;
goal_bytes = std::max(goal_bytes, rss_goal_bytes);
}
return goal_bytes;
}
void EngineShard::CacheStats() {

View file

@ -219,6 +219,12 @@ class EngineShard {
void ResetScanState();
};
struct EvictionTaskState {
size_t deleted_bytes_before_decommit = 0;
size_t deleted_bytes_before_rss_update = 0;
size_t global_rss_memory_at_prev_eviction = 0;
};
EngineShard(util::ProactorBase* pb, mi_heap_t* heap);
// blocks the calling fiber.
@ -230,6 +236,9 @@ class EngineShard {
void Heartbeat();
void RetireExpiredAndEvict();
/* Calculates the number of bytes to evict based on memory and rss memory usage. */
size_t CalculateEvictionBytes();
void CacheStats();
// We are running a task that checks whether we need to
@ -270,6 +279,7 @@ class EngineShard {
IntentLock shard_lock_;
uint32_t defrag_task_ = 0;
EvictionTaskState eviction_state_; // Used on eviction fiber
util::fb2::Fiber fiber_heartbeat_periodic_;
util::fb2::Done fiber_heartbeat_periodic_done_;

View file

@ -1007,9 +1007,7 @@ bool ShouldDenyOnOOM(const CommandId* cid) {
uint64_t start_ns = absl::GetCurrentTimeNanos();
auto memory_stats = etl.GetMemoryUsage(start_ns);
if (memory_stats.used_mem > max_memory_limit ||
(etl.rss_oom_deny_ratio > 0 &&
memory_stats.rss_mem > (max_memory_limit * etl.rss_oom_deny_ratio))) {
if (memory_stats.used_mem > max_memory_limit) {
DLOG(WARNING) << "Out of memory, used " << memory_stats.used_mem << " ,rss "
<< memory_stats.rss_mem << " ,limit " << max_memory_limit;
etl.stats.oom_error_cmd_cnt++;

View file

@ -176,49 +176,198 @@ async def test_eval_with_oom(df_factory: DflyInstanceFactory):
assert rss_before_eval * 1.01 > info["used_memory_rss"]
@pytest.mark.skip("rss eviction disabled")
@pytest.mark.asyncio
@dfly_args(
{
"proactor_threads": 1,
"cache_mode": "true",
"maxmemory": "5gb",
"rss_oom_deny_ratio": 0.8,
"max_eviction_per_heartbeat": 100,
}
@pytest.mark.parametrize(
"proactor_threads_param, maxmemory_param",
[(1, 512 * (1024**2)), (1, 6 * (1024**3)), (4, 6 * (1024**3))],
)
async def test_cache_eviction_with_rss_deny_oom(
async_client: aioredis.Redis,
async def test_cache_eviction_with_rss_deny_oom_simple_case(
df_factory: DflyInstanceFactory, proactor_threads_param, maxmemory_param
):
"""
Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit
"""
df_server = df_factory.create(
proactor_threads=proactor_threads_param,
cache_mode="true",
maxmemory=maxmemory_param,
rss_oom_deny_ratio=0.8,
)
df_server.start()
max_memory = 5 * 1024 * 1024 * 1024 # 5G
rss_max_memory = int(max_memory * 0.8)
async_client = df_server.client()
data_fill_size = int(0.9 * rss_max_memory) # 95% of rss_max_memory
max_memory = maxmemory_param
rss_oom_deny_ratio = 0.8
eviction_memory_budget_threshold = 0.1 # 10% of max_memory
data_fill_size = int((rss_oom_deny_ratio + 0.05) * max_memory) # 85% of max_memory
val_size = 1024 * 5 # 5 kb
num_keys = data_fill_size // val_size
await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", val_size)
# Test that used memory is less than 90% of max memory
# Test that used memory is less than 90% of max memory to not to start eviction based on used_memory
memory_info = await async_client.info("memory")
assert (
memory_info["used_memory"] < max_memory * 0.9
), "Used memory should be less than 90% of max memory."
), "Used memory should be less than 90% of max memory to not to start eviction based on used_memory."
assert (
memory_info["used_memory_rss"] > rss_max_memory * 0.9
), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio)."
memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio
), "Used RSS memory should be more than 80% of rss max memory (max_memory * rss_oom_deny_ratio) to start eviction based on rss memory usage."
# Get RSS memory after creating new connections
# Track eviction stats
memory_info = await async_client.info("memory")
while memory_info["used_memory_rss"] > rss_max_memory * 0.9:
prev_evicted_keys = 0
evicted_keys_repeat_count = 0
while memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio:
# Wait for some time
await asyncio.sleep(1)
memory_info = await async_client.info("memory")
logging.info(
f'Current rss: {memory_info["used_memory_rss"]}. rss eviction threshold: {rss_max_memory * 0.9}.'
f'Current used memory: {memory_info["used_memory"]}, current used rss: {memory_info["used_memory_rss"]}, rss eviction threshold: {max_memory * (rss_oom_deny_ratio - eviction_memory_budget_threshold)}.'
)
stats_info = await async_client.info("stats")
logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')
# Check if evicted keys are not increasing
if prev_evicted_keys == stats_info["evicted_keys"]:
evicted_keys_repeat_count += 1
else:
prev_evicted_keys = stats_info["evicted_keys"]
evicted_keys_repeat_count = 1
if evicted_keys_repeat_count > 2:
break
stats_info = await async_client.info("stats")
prev_evicted_keys = stats_info["evicted_keys"]
# Wait for some time
await asyncio.sleep(1)
# Assert that no more keys are evicted
memory_info = await async_client.info("memory")
stats_info = await async_client.info("stats")
assert memory_info["used_memory"] > max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.08
), "We should not evict all items."
assert memory_info["used_memory"] < max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold
), "Used memory should be smaller than threshold."
assert memory_info["used_memory_rss"] > max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.08
), "We should not evict all items."
assert stats_info["evicted_keys"] == prev_evicted_keys, "We should not evict more items."
assert stats_info["evicted_keys"] > 0, "We should evict some items."
@pytest.mark.asyncio
@pytest.mark.parametrize("proactor_threads_param, maxmemory_param", [(1, 6 * (1024**3))])
async def test_cache_eviction_with_rss_deny_oom_two_waves(
df_factory: DflyInstanceFactory, proactor_threads_param, maxmemory_param
):
"""
Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit
"""
df_server = df_factory.create(
proactor_threads=proactor_threads_param,
cache_mode="true",
maxmemory=maxmemory_param,
rss_oom_deny_ratio=0.8,
)
df_server.start()
async_client = df_server.client()
max_memory = maxmemory_param
rss_oom_deny_ratio = 0.8
eviction_memory_budget_threshold = 0.1 # 10% of max_memory
rss_eviction_threshold = max_memory * (rss_oom_deny_ratio - eviction_memory_budget_threshold)
data_fill_size = [
int((rss_oom_deny_ratio + 0.05) * max_memory),
int((1 - rss_oom_deny_ratio) * max_memory),
] # 85% of max_memory
val_size = 1024 * 5 # 5 kb
for i in range(2):
if i > 0:
await asyncio.sleep(5)
num_keys = data_fill_size[i] // val_size
logging.info(
f"Populating data for wave {i}. Data fill size: {data_fill_size[i]}. Number of keys: {num_keys}."
)
await async_client.execute_command("DEBUG", "POPULATE", num_keys, f"key{i}", val_size)
# Test that used memory is less than 90% of max memory to not to start eviction based on used_memory
memory_info = await async_client.info("memory")
assert (
memory_info["used_memory"] < max_memory * 0.9
), "Used memory should be less than 90% of max memory to not to start eviction based on used_memory."
assert (
memory_info["used_memory"] > rss_eviction_threshold
), "Used memory should be more then rss eviction threshold."
assert (
memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio
), "Used RSS memory should be more than 80% of rss max memory (max_memory * rss_oom_deny_ratio) to start eviction based on rss memory usage."
logging.info(
f'Current used memory: {memory_info["used_memory"]}, current used rss: {memory_info["used_memory_rss"]}, rss eviction threshold: {rss_eviction_threshold}.'
)
# Track eviction stats
memory_info = await async_client.info("memory")
prev_evicted_keys = 0
evicted_keys_repeat_count = 0
while memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio:
# Wait for some time
await asyncio.sleep(1)
memory_info = await async_client.info("memory")
logging.info(
f'Current used memory: {memory_info["used_memory"]}, current used rss: {memory_info["used_memory_rss"]}, rss eviction threshold: {rss_eviction_threshold}.'
)
stats_info = await async_client.info("stats")
logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.')
# Check if evicted keys are not increasing
if prev_evicted_keys == stats_info["evicted_keys"]:
evicted_keys_repeat_count += 1
else:
prev_evicted_keys = stats_info["evicted_keys"]
evicted_keys_repeat_count = 1
if i == 0 and evicted_keys_repeat_count > 2:
break
assert evicted_keys_repeat_count < 6
stats_info = await async_client.info("stats")
prev_evicted_keys = stats_info["evicted_keys"]
# Wait for some time
await asyncio.sleep(1)
# Assert that no more keys are evicted
memory_info = await async_client.info("memory")
stats_info = await async_client.info("stats")
assert memory_info["used_memory"] > max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.08
), "We should not evict all items."
assert memory_info["used_memory"] < max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold
), "Used memory should be smaller than threshold."
assert memory_info["used_memory_rss"] > max_memory * (
rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.08
), "We should not evict all items."
assert stats_info["evicted_keys"] == prev_evicted_keys, "We should not evict more items."
assert stats_info["evicted_keys"] > 0, "We should evict some items."