From 116934b0088bf6525c7dde6152a84b4e076795e5 Mon Sep 17 00:00:00 2001 From: adiholden Date: Tue, 8 Aug 2023 23:26:35 +0300 Subject: [PATCH] feat(server): add oom guard (#1650) 1. add flag maxmemory_ratio 2. When current used memory * maxmemory_ratio > maxmemory_limit denyoom commands will return oom error. Signed-off-by: adi_holden --- src/server/dragonfly_test.cc | 30 ++++++++++++++++++++++++++---- src/server/main_service.cc | 11 +++++++++++ src/server/server_state.cc | 9 +++++++++ src/server/server_state.h | 4 ++++ src/server/test_utils.cc | 1 + 5 files changed, 51 insertions(+), 4 deletions(-) diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 5ce45f92c..2f3d9af37 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -22,6 +22,7 @@ extern "C" { ABSL_DECLARE_FLAG(float, mem_defrag_threshold); ABSL_DECLARE_FLAG(std::vector, rename_command); +ABSL_DECLARE_FLAG(double, oom_deny_ratio); namespace dfly { @@ -99,6 +100,11 @@ class DflyRenameCommandTest : public DflyEngineTest { absl::SetFlag(&FLAGS_rename_command, std::vector({"flushall=myflushall", "flushdb="})); } + + void TearDown() { + absl::SetFlag(&FLAGS_rename_command, std::vector({""})); + DflyEngineTest::TearDown(); + } }; TEST_F(DflyRenameCommandTest, RenameCommand) { @@ -335,10 +341,10 @@ TEST_F(DflyEngineTest, FlushAll) { TEST_F(DflyEngineTest, OOM) { shard_set->TEST_EnableHeartBeat(); - max_memory_limit = 0; + max_memory_limit = 300000; size_t i = 0; RespExpr resp; - for (; i < 5000; i += 3) { + for (; i < 10000; i += 3) { resp = Run({"mset", StrCat("key", i), "bar", StrCat("key", i + 1), "bar", StrCat("key", i + 2), "bar"}); if (resp != "OK") @@ -376,25 +382,41 @@ TEST_F(DflyEngineTest, OOM) { TEST_F(DflyEngineTest, Bug207) { shard_set->TEST_EnableHeartBeat(); shard_set->TEST_EnableCacheMode(); + absl::FlagSaver fs; + absl::SetFlag(&FLAGS_oom_deny_ratio, 4); - max_memory_limit = 0; + max_memory_limit = 300000; ssize_t i = 0; RespExpr resp; for (; i < 5000; ++i) { resp = Run({"setex", StrCat("key", i), "30", "bar"}); - // we evict some items because 5000 is too much when max_memory_limit is zero. + // we evict some items because 5000 is too much when max_memory_limit is 300000. ASSERT_EQ(resp, "OK"); } + auto evicted_count = [](const string& str) -> size_t { + const string matcher = "evicted_keys:"; + const auto pos = str.find(matcher) + matcher.size(); + const auto sub = str.substr(pos, 1); + return atoi(sub.c_str()); + }; + + resp = Run({"info", "stats"}); + EXPECT_GT(evicted_count(resp.GetString()), 0); + for (; i > 0; --i) { resp = Run({"setex", StrCat("key", i), "30", "bar"}); + ASSERT_EQ(resp, "OK"); } } TEST_F(DflyEngineTest, StickyEviction) { shard_set->TEST_EnableHeartBeat(); shard_set->TEST_EnableCacheMode(); + absl::FlagSaver fs; + absl::SetFlag(&FLAGS_oom_deny_ratio, 4); + max_memory_limit = 300000; string tmp_val(100, '.'); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index abc36b54c..758450bda 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -77,6 +77,9 @@ ABSL_FLAG(MaxMemoryFlag, maxmemory, MaxMemoryFlag{}, "Limit on maximum-memory that is used by the database. " "0 - means the program will automatically determine its maximum memory usage. " "default: 0"); +ABSL_FLAG(double, oom_deny_ratio, 1.1, + "commands with flag denyoom will return OOM when the ratio between maxmemory and used " + "memory is above this value"); bool AbslParseFlag(std::string_view in, MaxMemoryFlag* flag, std::string* err) { int64_t val; @@ -890,6 +893,14 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) uint64_t start_ns = ProactorBase::GetMonotonicTimeNs(), end_ns; + if (cid->opt_mask() & CO::DENYOOM) { + int64_t used_memory = etl.GetUsedMemory(start_ns); + double oom_deny_ratio = GetFlag(FLAGS_oom_deny_ratio); + if (used_memory > (max_memory_limit * oom_deny_ratio)) { + return (*cntx)->SendError(kOutOfMemory); + } + } + // Create command transaction intrusive_ptr dist_trans; diff --git a/src/server/server_state.cc b/src/server/server_state.cc index f32c0c07a..ded063fb0 100644 --- a/src/server/server_state.cc +++ b/src/server/server_state.cc @@ -71,6 +71,15 @@ void ServerState::Destroy() { state_ = nullptr; } +uint64_t ServerState::GetUsedMemory(uint64_t now_ns) { + static constexpr uint64_t kCacheEveryNs = 1000; + if (now_ns > used_mem_last_update_ + kCacheEveryNs) { + used_mem_last_update_ = now_ns; + used_mem_cached_ = used_mem_current.load(std::memory_order_relaxed); + } + return used_mem_cached_; +} + bool ServerState::AllowInlineScheduling() const { // We can't allow inline scheduling during a full sync, because then journaling transactions // will be scheduled before RdbLoader::LoadItemsBuffer is finished. We can't use the regular diff --git a/src/server/server_state.h b/src/server/server_state.h index dc9d0040b..3a378e099 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -130,6 +130,8 @@ class ServerState { // public struct - to allow initialization. gstate_ = s; } + uint64_t GetUsedMemory(uint64_t now_ns); + bool AllowInlineScheduling() const; // Borrow interpreter from internal manager. Return int with ReturnInterpreter. @@ -226,6 +228,8 @@ class ServerState { // public struct - to allow initialization. absl::flat_hash_map call_latency_histos_; uint32_t thread_index_ = 0; + uint64_t used_mem_cached_ = 0; // thread local cache of used_mem_current + uint64_t used_mem_last_update_ = 0; static __thread ServerState* state_; }; diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index 74f1d1fbd..13feb72c0 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -167,6 +167,7 @@ void BaseFamilyTest::ResetService() { Service::InitOpts opts; opts.disable_time_update = true; service_->Init(nullptr, {}, opts); + used_mem_current = 0; TEST_current_time_ms = absl::GetCurrentTimeNanos() / 1000000; auto cb = [&](EngineShard* s) { s->db_slice().UpdateExpireBase(TEST_current_time_ms - 1000, 0); };