diff --git a/src/server/dfly_main.cc b/src/server/dfly_main.cc index ad58c6a19..1008c2c45 100644 --- a/src/server/dfly_main.cc +++ b/src/server/dfly_main.cc @@ -816,7 +816,22 @@ Usage: dragonfly [FLAGS] // export MIMALLOC_VERBOSE=1 to see the options before the override. mi_option_enable(mi_option_show_errors); mi_option_set(mi_option_max_warnings, 0); + mi_option_enable(mi_option_purge_decommits); + DCHECK(mi_option_get(mi_option_reset_decommits) == 1); + + mi_option_set(mi_option_purge_delay, 0); + DCHECK(!mi_option_get(mi_option_reset_delay)); + + /* + mi_option_arena_eager_commit + MIMALLOC_PAGE_RESET=1 + MIMALLOC_RESET_DECOMMITS=1 + MIMALLOC_ABANDONED_PAGE_RESET=1 + MIMALLOC_RESET_DELAY=1 + + MIMALLOC_ARENA_EAGER_COMMIT=0 + */ fb2::SetDefaultStackResource(&fb2::std_malloc_resource, kFiberDefaultStackSize); diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index c7c544185..7822f3804 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -19,10 +19,12 @@ extern "C" { #include "server/search/doc_index.h" #include "server/server_state.h" #include "server/tiered_storage.h" +#include "server/tiering/common.h" #include "server/transaction.h" #include "util/fibers/proactor_base.h" using namespace std; +using namespace ::dfly::tiering::literals; ABSL_FLAG(float, mem_defrag_threshold, 0.7, "Minimum percentage of used memory relative to maxmemory cap before running " @@ -65,6 +67,9 @@ ABSL_FLAG(double, eviction_memory_budget_threshold, 0.1, "Eviction starts when the free memory (including RSS memory) drops below " "eviction_memory_budget_threshold * max_memory_limit."); +ABSL_FLAG(uint64_t, force_decommit_threshold, 8_MB, + "The threshold of memory to force decommit when memory is under pressure."); + ABSL_DECLARE_FLAG(uint32_t, max_eviction_per_heartbeat); namespace dfly { @@ -216,45 +221,6 @@ size_t CalculateHowManyBytesToEvictOnShard(size_t global_memory_limit, size_t gl return shard_budget < shard_memory_threshold ? (shard_memory_threshold - shard_budget) : 0; } -/* Calculates the number of bytes to evict based on memory and rss memory usage. */ -size_t CalculateEvictionBytes() { - const size_t shards_count = shard_set->size(); - const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold); - - const size_t shard_memory_budget_threshold = - size_t(max_memory_limit * eviction_memory_budget_threshold) / shards_count; - - const size_t global_used_memory = used_mem_current.load(memory_order_relaxed); - - // Calculate how many bytes we need to evict on this shard - size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory, - shard_memory_budget_threshold); - - // TODO: Eviction due to rss usage is not working well as it causes eviction - // of to many keys untill we finally see decrease in rss. We need to improve - // this logic before we enable it. - /* - const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio; - // If rss_oom_deny_ratio is set, we should evict depending on rss memory too - if (rss_oom_deny_ratio > 0.0) { - const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit); - // We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss - memory const size_t shard_rss_memory_budget_threshold = - size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count; - - // Calculate how much rss memory is used by all shards - const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed); - - // Try to evict more bytes if we are close to the rss memory limit - goal_bytes = std::max( - goal_bytes, CalculateHowManyBytesToEvictOnShard(max_rss_memory, global_used_rss_memory, - shard_rss_memory_budget_threshold)); - } - */ - - return goal_bytes; -} - } // namespace __thread EngineShard* EngineShard::shard_ = nullptr; @@ -794,6 +760,7 @@ void EngineShard::RetireExpiredAndEvict() { DbContext db_cntx; db_cntx.time_now_ms = GetCurrentTimeMs(); + size_t deleted_bytes = 0; size_t eviction_goal = GetFlag(FLAGS_enable_heartbeat_eviction) ? CalculateEvictionBytes() : 0; for (unsigned i = 0; i < db_slice.db_array_size(); ++i) { @@ -805,6 +772,7 @@ void EngineShard::RetireExpiredAndEvict() { if (expt->size() > pt->size() / 4) { DbSlice::DeleteExpiredStats stats = db_slice.DeleteExpiredStep(db_cntx, ttl_delete_target); + deleted_bytes += stats.deleted_bytes; eviction_goal -= std::min(eviction_goal, size_t(stats.deleted_bytes)); counter_[TTL_TRAVERSE].IncBy(stats.traversed); counter_[TTL_DELETE].IncBy(stats.deleted); @@ -820,9 +788,87 @@ void EngineShard::RetireExpiredAndEvict() { << " bytes. Max eviction per heartbeat: " << GetFlag(FLAGS_max_eviction_per_heartbeat); + deleted_bytes += evicted_bytes; eviction_goal -= std::min(eviction_goal, evicted_bytes); } } + + if (deleted_bytes > 0) { + // Fast decommit without force + // No force to not preemt during decommit + mi_heap_t* heap = ServerState::tlocal()->data_heap(); + mi_heap_collect(heap, false); + } + + auto& deleted_bytes_before_decommit = eviction_state_.deleted_bytes_before_decommit; + eviction_state_.deleted_bytes_before_rss_update += deleted_bytes; + deleted_bytes_before_decommit += deleted_bytes; + + if (deleted_bytes_before_decommit >= absl::GetFlag(FLAGS_force_decommit_threshold)) { + // Decommit with force + deleted_bytes_before_decommit = 0; + ServerState::tlocal()->DecommitMemory(ServerState::kAllMemory); + + io::Result sdata_res = io::ReadStatusInfo(); + if (sdata_res) { + const size_t total_rss = FetchRssMemory(sdata_res.value()); + rss_mem_current.store(total_rss, memory_order_relaxed); + } + } +} + +size_t EngineShard::CalculateEvictionBytes() { + const size_t shards_count = shard_set->size(); + const double eviction_memory_budget_threshold = GetFlag(FLAGS_eviction_memory_budget_threshold); + + const size_t shard_memory_budget_threshold = + size_t(max_memory_limit * eviction_memory_budget_threshold) / shards_count; + + const size_t global_used_memory = used_mem_current.load(memory_order_relaxed); + + // Calculate how many bytes we need to evict on this shard + size_t goal_bytes = CalculateHowManyBytesToEvictOnShard(max_memory_limit, global_used_memory, + shard_memory_budget_threshold); + + DVLOG(2) << "Memory goal bytes: " << goal_bytes << ", used memory: " << global_used_memory + << ", memory limit: " << max_memory_limit; + + // If rss_oom_deny_ratio is set, we should evict depending on rss memory too + const double rss_oom_deny_ratio = ServerState::tlocal()->rss_oom_deny_ratio; + if (rss_oom_deny_ratio > 0.0) { + const size_t max_rss_memory = size_t(rss_oom_deny_ratio * max_memory_limit); + /* We start eviction when we have less than eviction_memory_budget_threshold * 100% of free rss + * memory */ + const size_t shard_rss_memory_budget_threshold = + size_t(max_rss_memory * eviction_memory_budget_threshold) / shards_count; + + // Calculate how much rss memory is used by all shards + const size_t global_used_rss_memory = rss_mem_current.load(memory_order_relaxed); + + auto& global_rss_memory_at_prev_eviction = eviction_state_.global_rss_memory_at_prev_eviction; + auto& deleted_bytes_before_rss_update = eviction_state_.deleted_bytes_before_rss_update; + if (global_used_rss_memory < eviction_state_.global_rss_memory_at_prev_eviction) { + deleted_bytes_before_rss_update -= + std::min(deleted_bytes_before_rss_update, + (global_rss_memory_at_prev_eviction - global_used_rss_memory) / shards_count); + } + + global_rss_memory_at_prev_eviction = global_used_rss_memory; + + // Try to evict more bytes if we are close to the rss memory limit + const size_t rss_goal_bytes = CalculateHowManyBytesToEvictOnShard( + max_rss_memory, global_used_rss_memory - deleted_bytes_before_rss_update * shards_count, + shard_rss_memory_budget_threshold); + + LOG_IF(INFO, rss_goal_bytes > 0) + << "Rss memory goal bytes: " << rss_goal_bytes + << ", rss used memory: " << global_used_rss_memory + << ", rss memory limit: " << max_rss_memory + << ", deleted_bytes_before_rss_update: " << deleted_bytes_before_rss_update; + + goal_bytes = std::max(goal_bytes, rss_goal_bytes); + } + return goal_bytes; } void EngineShard::CacheStats() { diff --git a/src/server/engine_shard.h b/src/server/engine_shard.h index 05c918a93..146d7f90f 100644 --- a/src/server/engine_shard.h +++ b/src/server/engine_shard.h @@ -212,6 +212,12 @@ class EngineShard { void ResetScanState(); }; + struct EvictionTaskState { + size_t deleted_bytes_before_decommit = 0; + size_t deleted_bytes_before_rss_update = 0; + size_t global_rss_memory_at_prev_eviction = 0; + }; + EngineShard(util::ProactorBase* pb, mi_heap_t* heap); // blocks the calling fiber. @@ -223,6 +229,9 @@ class EngineShard { void Heartbeat(); void RetireExpiredAndEvict(); + /* Calculates the number of bytes to evict based on memory and rss memory usage. */ + size_t CalculateEvictionBytes(); + void CacheStats(); // We are running a task that checks whether we need to @@ -263,6 +272,7 @@ class EngineShard { IntentLock shard_lock_; uint32_t defrag_task_ = 0; + EvictionTaskState eviction_state_; // Used on eviction fiber util::fb2::Fiber fiber_heartbeat_periodic_; util::fb2::Done fiber_heartbeat_periodic_done_; diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 09f7ac827..afe41ab8d 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -998,9 +998,7 @@ bool ShouldDenyOnOOM(const CommandId* cid) { uint64_t start_ns = absl::GetCurrentTimeNanos(); auto memory_stats = etl.GetMemoryUsage(start_ns); - if (memory_stats.used_mem > max_memory_limit || - (etl.rss_oom_deny_ratio > 0 && - memory_stats.rss_mem > (max_memory_limit * etl.rss_oom_deny_ratio))) { + if (memory_stats.used_mem > max_memory_limit) { DLOG(WARNING) << "Out of memory, used " << memory_stats.used_mem << " ,rss " << memory_stats.rss_mem << " ,limit " << max_memory_limit; etl.stats.oom_error_cmd_cnt++; diff --git a/tests/dragonfly/memory_test.py b/tests/dragonfly/memory_test.py index d871371df..5b15bc1e4 100644 --- a/tests/dragonfly/memory_test.py +++ b/tests/dragonfly/memory_test.py @@ -176,49 +176,198 @@ async def test_eval_with_oom(df_factory: DflyInstanceFactory): assert rss_before_eval * 1.01 > info["used_memory_rss"] -@pytest.mark.skip("rss eviction disabled") @pytest.mark.asyncio -@dfly_args( - { - "proactor_threads": 1, - "cache_mode": "true", - "maxmemory": "5gb", - "rss_oom_deny_ratio": 0.8, - "max_eviction_per_heartbeat": 100, - } +@pytest.mark.parametrize( + "proactor_threads_param, maxmemory_param", + [(1, 512 * (1024**2)), (1, 6 * (1024**3)), (4, 6 * (1024**3))], ) -async def test_cache_eviction_with_rss_deny_oom( - async_client: aioredis.Redis, +async def test_cache_eviction_with_rss_deny_oom_simple_case( + df_factory: DflyInstanceFactory, proactor_threads_param, maxmemory_param ): """ Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit """ + df_server = df_factory.create( + proactor_threads=proactor_threads_param, + cache_mode="true", + maxmemory=maxmemory_param, + rss_oom_deny_ratio=0.8, + ) + df_server.start() - max_memory = 5 * 1024 * 1024 * 1024 # 5G - rss_max_memory = int(max_memory * 0.8) + async_client = df_server.client() - data_fill_size = int(0.9 * rss_max_memory) # 95% of rss_max_memory + max_memory = maxmemory_param + rss_oom_deny_ratio = 0.8 + eviction_memory_budget_threshold = 0.1 # 10% of max_memory + + data_fill_size = int((rss_oom_deny_ratio + 0.05) * max_memory) # 85% of max_memory val_size = 1024 * 5 # 5 kb num_keys = data_fill_size // val_size await async_client.execute_command("DEBUG", "POPULATE", num_keys, "key", val_size) - # Test that used memory is less than 90% of max memory + + # Test that used memory is less than 90% of max memory to not to start eviction based on used_memory memory_info = await async_client.info("memory") assert ( memory_info["used_memory"] < max_memory * 0.9 - ), "Used memory should be less than 90% of max memory." + ), "Used memory should be less than 90% of max memory to not to start eviction based on used_memory." assert ( - memory_info["used_memory_rss"] > rss_max_memory * 0.9 - ), "RSS memory should be less than 90% of rss max memory (max_memory * rss_oom_deny_ratio)." + memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio + ), "Used RSS memory should be more than 80% of rss max memory (max_memory * rss_oom_deny_ratio) to start eviction based on rss memory usage." - # Get RSS memory after creating new connections + # Track eviction stats memory_info = await async_client.info("memory") - while memory_info["used_memory_rss"] > rss_max_memory * 0.9: + prev_evicted_keys = 0 + evicted_keys_repeat_count = 0 + while memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio: + # Wait for some time await asyncio.sleep(1) + memory_info = await async_client.info("memory") logging.info( - f'Current rss: {memory_info["used_memory_rss"]}. rss eviction threshold: {rss_max_memory * 0.9}.' + f'Current used memory: {memory_info["used_memory"]}, current used rss: {memory_info["used_memory_rss"]}, rss eviction threshold: {max_memory * (rss_oom_deny_ratio - eviction_memory_budget_threshold)}.' ) + stats_info = await async_client.info("stats") logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.') + + # Check if evicted keys are not increasing + if prev_evicted_keys == stats_info["evicted_keys"]: + evicted_keys_repeat_count += 1 + else: + prev_evicted_keys = stats_info["evicted_keys"] + evicted_keys_repeat_count = 1 + + if evicted_keys_repeat_count > 2: + break + + stats_info = await async_client.info("stats") + prev_evicted_keys = stats_info["evicted_keys"] + + # Wait for some time + await asyncio.sleep(1) + + # Assert that no more keys are evicted + memory_info = await async_client.info("memory") + stats_info = await async_client.info("stats") + + assert memory_info["used_memory"] > max_memory * ( + rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.08 + ), "We should not evict all items." + assert memory_info["used_memory"] < max_memory * ( + rss_oom_deny_ratio - eviction_memory_budget_threshold + ), "Used memory should be smaller than threshold." + assert memory_info["used_memory_rss"] > max_memory * ( + rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.08 + ), "We should not evict all items." + assert stats_info["evicted_keys"] == prev_evicted_keys, "We should not evict more items." + assert stats_info["evicted_keys"] > 0, "We should evict some items." + + +@pytest.mark.asyncio +@pytest.mark.parametrize("proactor_threads_param, maxmemory_param", [(1, 6 * (1024**3))]) +async def test_cache_eviction_with_rss_deny_oom_two_waves( + df_factory: DflyInstanceFactory, proactor_threads_param, maxmemory_param +): + """ + Test to verify that cache eviction is triggered even if used memory is small but rss memory is above limit + """ + df_server = df_factory.create( + proactor_threads=proactor_threads_param, + cache_mode="true", + maxmemory=maxmemory_param, + rss_oom_deny_ratio=0.8, + ) + df_server.start() + + async_client = df_server.client() + + max_memory = maxmemory_param + rss_oom_deny_ratio = 0.8 + eviction_memory_budget_threshold = 0.1 # 10% of max_memory + + rss_eviction_threshold = max_memory * (rss_oom_deny_ratio - eviction_memory_budget_threshold) + + data_fill_size = [ + int((rss_oom_deny_ratio + 0.05) * max_memory), + int((1 - rss_oom_deny_ratio) * max_memory), + ] # 85% of max_memory + + val_size = 1024 * 5 # 5 kb + for i in range(2): + if i > 0: + await asyncio.sleep(5) + + num_keys = data_fill_size[i] // val_size + logging.info( + f"Populating data for wave {i}. Data fill size: {data_fill_size[i]}. Number of keys: {num_keys}." + ) + await async_client.execute_command("DEBUG", "POPULATE", num_keys, f"key{i}", val_size) + + # Test that used memory is less than 90% of max memory to not to start eviction based on used_memory + memory_info = await async_client.info("memory") + assert ( + memory_info["used_memory"] < max_memory * 0.9 + ), "Used memory should be less than 90% of max memory to not to start eviction based on used_memory." + assert ( + memory_info["used_memory"] > rss_eviction_threshold + ), "Used memory should be more then rss eviction threshold." + assert ( + memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio + ), "Used RSS memory should be more than 80% of rss max memory (max_memory * rss_oom_deny_ratio) to start eviction based on rss memory usage." + + logging.info( + f'Current used memory: {memory_info["used_memory"]}, current used rss: {memory_info["used_memory_rss"]}, rss eviction threshold: {rss_eviction_threshold}.' + ) + + # Track eviction stats + memory_info = await async_client.info("memory") + prev_evicted_keys = 0 + evicted_keys_repeat_count = 0 + while memory_info["used_memory_rss"] > max_memory * rss_oom_deny_ratio: + # Wait for some time + await asyncio.sleep(1) + + memory_info = await async_client.info("memory") + logging.info( + f'Current used memory: {memory_info["used_memory"]}, current used rss: {memory_info["used_memory_rss"]}, rss eviction threshold: {rss_eviction_threshold}.' + ) + + stats_info = await async_client.info("stats") + logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.') + + # Check if evicted keys are not increasing + if prev_evicted_keys == stats_info["evicted_keys"]: + evicted_keys_repeat_count += 1 + else: + prev_evicted_keys = stats_info["evicted_keys"] + evicted_keys_repeat_count = 1 + + if i == 0 and evicted_keys_repeat_count > 2: + break + + assert evicted_keys_repeat_count < 6 + + stats_info = await async_client.info("stats") + prev_evicted_keys = stats_info["evicted_keys"] + + # Wait for some time + await asyncio.sleep(1) + + # Assert that no more keys are evicted + memory_info = await async_client.info("memory") + stats_info = await async_client.info("stats") + + assert memory_info["used_memory"] > max_memory * ( + rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.08 + ), "We should not evict all items." + assert memory_info["used_memory"] < max_memory * ( + rss_oom_deny_ratio - eviction_memory_budget_threshold + ), "Used memory should be smaller than threshold." + assert memory_info["used_memory_rss"] > max_memory * ( + rss_oom_deny_ratio - eviction_memory_budget_threshold - 0.08 + ), "We should not evict all items." + assert stats_info["evicted_keys"] == prev_evicted_keys, "We should not evict more items." + assert stats_info["evicted_keys"] > 0, "We should evict some items."