mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 02:15:45 +02:00
feat(server): add oom guard (#1650)
1. add flag maxmemory_ratio 2. When current used memory * maxmemory_ratio > maxmemory_limit denyoom commands will return oom error. Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
7c99d2d111
commit
116934b008
5 changed files with 51 additions and 4 deletions
|
@ -22,6 +22,7 @@ extern "C" {
|
||||||
|
|
||||||
ABSL_DECLARE_FLAG(float, mem_defrag_threshold);
|
ABSL_DECLARE_FLAG(float, mem_defrag_threshold);
|
||||||
ABSL_DECLARE_FLAG(std::vector<std::string>, rename_command);
|
ABSL_DECLARE_FLAG(std::vector<std::string>, rename_command);
|
||||||
|
ABSL_DECLARE_FLAG(double, oom_deny_ratio);
|
||||||
|
|
||||||
namespace dfly {
|
namespace dfly {
|
||||||
|
|
||||||
|
@ -99,6 +100,11 @@ class DflyRenameCommandTest : public DflyEngineTest {
|
||||||
absl::SetFlag(&FLAGS_rename_command,
|
absl::SetFlag(&FLAGS_rename_command,
|
||||||
std::vector<std::string>({"flushall=myflushall", "flushdb="}));
|
std::vector<std::string>({"flushall=myflushall", "flushdb="}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void TearDown() {
|
||||||
|
absl::SetFlag(&FLAGS_rename_command, std::vector<std::string>({""}));
|
||||||
|
DflyEngineTest::TearDown();
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
TEST_F(DflyRenameCommandTest, RenameCommand) {
|
TEST_F(DflyRenameCommandTest, RenameCommand) {
|
||||||
|
@ -335,10 +341,10 @@ TEST_F(DflyEngineTest, FlushAll) {
|
||||||
|
|
||||||
TEST_F(DflyEngineTest, OOM) {
|
TEST_F(DflyEngineTest, OOM) {
|
||||||
shard_set->TEST_EnableHeartBeat();
|
shard_set->TEST_EnableHeartBeat();
|
||||||
max_memory_limit = 0;
|
max_memory_limit = 300000;
|
||||||
size_t i = 0;
|
size_t i = 0;
|
||||||
RespExpr resp;
|
RespExpr resp;
|
||||||
for (; i < 5000; i += 3) {
|
for (; i < 10000; i += 3) {
|
||||||
resp = Run({"mset", StrCat("key", i), "bar", StrCat("key", i + 1), "bar", StrCat("key", i + 2),
|
resp = Run({"mset", StrCat("key", i), "bar", StrCat("key", i + 1), "bar", StrCat("key", i + 2),
|
||||||
"bar"});
|
"bar"});
|
||||||
if (resp != "OK")
|
if (resp != "OK")
|
||||||
|
@ -376,25 +382,41 @@ TEST_F(DflyEngineTest, OOM) {
|
||||||
TEST_F(DflyEngineTest, Bug207) {
|
TEST_F(DflyEngineTest, Bug207) {
|
||||||
shard_set->TEST_EnableHeartBeat();
|
shard_set->TEST_EnableHeartBeat();
|
||||||
shard_set->TEST_EnableCacheMode();
|
shard_set->TEST_EnableCacheMode();
|
||||||
|
absl::FlagSaver fs;
|
||||||
|
absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
|
||||||
|
|
||||||
max_memory_limit = 0;
|
max_memory_limit = 300000;
|
||||||
|
|
||||||
ssize_t i = 0;
|
ssize_t i = 0;
|
||||||
RespExpr resp;
|
RespExpr resp;
|
||||||
for (; i < 5000; ++i) {
|
for (; i < 5000; ++i) {
|
||||||
resp = Run({"setex", StrCat("key", i), "30", "bar"});
|
resp = Run({"setex", StrCat("key", i), "30", "bar"});
|
||||||
// we evict some items because 5000 is too much when max_memory_limit is zero.
|
// we evict some items because 5000 is too much when max_memory_limit is 300000.
|
||||||
ASSERT_EQ(resp, "OK");
|
ASSERT_EQ(resp, "OK");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
auto evicted_count = [](const string& str) -> size_t {
|
||||||
|
const string matcher = "evicted_keys:";
|
||||||
|
const auto pos = str.find(matcher) + matcher.size();
|
||||||
|
const auto sub = str.substr(pos, 1);
|
||||||
|
return atoi(sub.c_str());
|
||||||
|
};
|
||||||
|
|
||||||
|
resp = Run({"info", "stats"});
|
||||||
|
EXPECT_GT(evicted_count(resp.GetString()), 0);
|
||||||
|
|
||||||
for (; i > 0; --i) {
|
for (; i > 0; --i) {
|
||||||
resp = Run({"setex", StrCat("key", i), "30", "bar"});
|
resp = Run({"setex", StrCat("key", i), "30", "bar"});
|
||||||
|
ASSERT_EQ(resp, "OK");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DflyEngineTest, StickyEviction) {
|
TEST_F(DflyEngineTest, StickyEviction) {
|
||||||
shard_set->TEST_EnableHeartBeat();
|
shard_set->TEST_EnableHeartBeat();
|
||||||
shard_set->TEST_EnableCacheMode();
|
shard_set->TEST_EnableCacheMode();
|
||||||
|
absl::FlagSaver fs;
|
||||||
|
absl::SetFlag(&FLAGS_oom_deny_ratio, 4);
|
||||||
|
|
||||||
max_memory_limit = 300000;
|
max_memory_limit = 300000;
|
||||||
|
|
||||||
string tmp_val(100, '.');
|
string tmp_val(100, '.');
|
||||||
|
|
|
@ -77,6 +77,9 @@ ABSL_FLAG(MaxMemoryFlag, maxmemory, MaxMemoryFlag{},
|
||||||
"Limit on maximum-memory that is used by the database. "
|
"Limit on maximum-memory that is used by the database. "
|
||||||
"0 - means the program will automatically determine its maximum memory usage. "
|
"0 - means the program will automatically determine its maximum memory usage. "
|
||||||
"default: 0");
|
"default: 0");
|
||||||
|
ABSL_FLAG(double, oom_deny_ratio, 1.1,
|
||||||
|
"commands with flag denyoom will return OOM when the ratio between maxmemory and used "
|
||||||
|
"memory is above this value");
|
||||||
|
|
||||||
bool AbslParseFlag(std::string_view in, MaxMemoryFlag* flag, std::string* err) {
|
bool AbslParseFlag(std::string_view in, MaxMemoryFlag* flag, std::string* err) {
|
||||||
int64_t val;
|
int64_t val;
|
||||||
|
@ -890,6 +893,14 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
|
||||||
|
|
||||||
uint64_t start_ns = ProactorBase::GetMonotonicTimeNs(), end_ns;
|
uint64_t start_ns = ProactorBase::GetMonotonicTimeNs(), end_ns;
|
||||||
|
|
||||||
|
if (cid->opt_mask() & CO::DENYOOM) {
|
||||||
|
int64_t used_memory = etl.GetUsedMemory(start_ns);
|
||||||
|
double oom_deny_ratio = GetFlag(FLAGS_oom_deny_ratio);
|
||||||
|
if (used_memory > (max_memory_limit * oom_deny_ratio)) {
|
||||||
|
return (*cntx)->SendError(kOutOfMemory);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Create command transaction
|
// Create command transaction
|
||||||
intrusive_ptr<Transaction> dist_trans;
|
intrusive_ptr<Transaction> dist_trans;
|
||||||
|
|
||||||
|
|
|
@ -71,6 +71,15 @@ void ServerState::Destroy() {
|
||||||
state_ = nullptr;
|
state_ = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t ServerState::GetUsedMemory(uint64_t now_ns) {
|
||||||
|
static constexpr uint64_t kCacheEveryNs = 1000;
|
||||||
|
if (now_ns > used_mem_last_update_ + kCacheEveryNs) {
|
||||||
|
used_mem_last_update_ = now_ns;
|
||||||
|
used_mem_cached_ = used_mem_current.load(std::memory_order_relaxed);
|
||||||
|
}
|
||||||
|
return used_mem_cached_;
|
||||||
|
}
|
||||||
|
|
||||||
bool ServerState::AllowInlineScheduling() const {
|
bool ServerState::AllowInlineScheduling() const {
|
||||||
// We can't allow inline scheduling during a full sync, because then journaling transactions
|
// We can't allow inline scheduling during a full sync, because then journaling transactions
|
||||||
// will be scheduled before RdbLoader::LoadItemsBuffer is finished. We can't use the regular
|
// will be scheduled before RdbLoader::LoadItemsBuffer is finished. We can't use the regular
|
||||||
|
|
|
@ -130,6 +130,8 @@ class ServerState { // public struct - to allow initialization.
|
||||||
gstate_ = s;
|
gstate_ = s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t GetUsedMemory(uint64_t now_ns);
|
||||||
|
|
||||||
bool AllowInlineScheduling() const;
|
bool AllowInlineScheduling() const;
|
||||||
|
|
||||||
// Borrow interpreter from internal manager. Return int with ReturnInterpreter.
|
// Borrow interpreter from internal manager. Return int with ReturnInterpreter.
|
||||||
|
@ -226,6 +228,8 @@ class ServerState { // public struct - to allow initialization.
|
||||||
|
|
||||||
absl::flat_hash_map<std::string, base::Histogram> call_latency_histos_;
|
absl::flat_hash_map<std::string, base::Histogram> call_latency_histos_;
|
||||||
uint32_t thread_index_ = 0;
|
uint32_t thread_index_ = 0;
|
||||||
|
uint64_t used_mem_cached_ = 0; // thread local cache of used_mem_current
|
||||||
|
uint64_t used_mem_last_update_ = 0;
|
||||||
|
|
||||||
static __thread ServerState* state_;
|
static __thread ServerState* state_;
|
||||||
};
|
};
|
||||||
|
|
|
@ -167,6 +167,7 @@ void BaseFamilyTest::ResetService() {
|
||||||
Service::InitOpts opts;
|
Service::InitOpts opts;
|
||||||
opts.disable_time_update = true;
|
opts.disable_time_update = true;
|
||||||
service_->Init(nullptr, {}, opts);
|
service_->Init(nullptr, {}, opts);
|
||||||
|
used_mem_current = 0;
|
||||||
|
|
||||||
TEST_current_time_ms = absl::GetCurrentTimeNanos() / 1000000;
|
TEST_current_time_ms = absl::GetCurrentTimeNanos() / 1000000;
|
||||||
auto cb = [&](EngineShard* s) { s->db_slice().UpdateExpireBase(TEST_current_time_ms - 1000, 0); };
|
auto cb = [&](EngineShard* s) { s->db_slice().UpdateExpireBase(TEST_current_time_ms - 1000, 0); };
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue