From 129ff0b0f735482bb89e05c5fe2720cad2e9f25f Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Sun, 6 Oct 2024 08:19:24 +0300 Subject: [PATCH] chore: run memory decommit after snapshot load/save (#3828) Sometimes for large values during snapshot loading/saving we allocate a lot of extra memory. For that, we might need to manually run memory decommit for mimalloc to release memory pages back to the OS. This PR addresses that by manually running memory decommit after each shard finishes loading or saving a snapshot. --------- Signed-off-by: kostas --- src/server/memory_cmd.cc | 6 ++--- src/server/rdb_load.cc | 6 +++++ src/server/rdb_save.cc | 4 ++++ src/server/rdb_test.cc | 48 ++++++++++++--------------------------- src/server/replica.cc | 3 +++ src/server/server_state.h | 7 +++++- 6 files changed, 36 insertions(+), 38 deletions(-) diff --git a/src/server/memory_cmd.cc b/src/server/memory_cmd.cc index b3b70143c..7e590267b 100644 --- a/src/server/memory_cmd.cc +++ b/src/server/memory_cmd.cc @@ -144,10 +144,8 @@ void MemoryCmd::Run(CmdArgList args) { } if (sub_cmd == "DECOMMIT") { - shard_set->pool()->AwaitBrief([](unsigned, auto* pb) { - ServerState::tlocal()->DecommitMemory(ServerState::kDataHeap | ServerState::kBackingHeap | - ServerState::kGlibcmalloc); - }); + shard_set->pool()->AwaitBrief( + [](unsigned, auto* pb) { ServerState::tlocal()->DecommitMemory(ServerState::kAllMemory); }); return cntx_->SendSimpleString("OK"); } diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 97b760b3b..1e182374e 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -2105,6 +2105,12 @@ RdbLoader::~RdbLoader() { break; delete item; } + + // Decommit local memory. + // We create an RdbLoader for each thread, so each one will Decommit for itself after + // full sync ends (since we explicitly reset the RdbLoader). + auto* tlocal = ServerState::tlocal(); + tlocal->DecommitMemory(ServerState::kAllMemory); } error_code RdbLoader::Load(io::Source* src) { diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 267ca4328..5885cd4e8 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -1406,6 +1406,10 @@ RdbSaver::RdbSaver(::io::Sink* sink, SaveMode save_mode, bool align_writes) { } RdbSaver::~RdbSaver() { + // Decommit local memory. + // We create an RdbSaver for each thread, so each one will Decommit for itself. + auto* tlocal = ServerState::tlocal(); + tlocal->DecommitMemory(ServerState::kAllMemory); } void RdbSaver::StartSnapshotInShard(bool stream_journal, const Cancellation* cll, diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index 948fda693..cd45662c0 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -41,6 +41,15 @@ class RdbTest : public BaseFamilyTest { void SetUp(); io::FileSource GetSource(string name); + + std::error_code LoadRdb(const string& filename) { + return pp_->at(0)->Await([&] { + io::FileSource fs = GetSource(filename); + + RdbLoader loader(service_.get()); + return loader.Load(&fs); + }); + } }; void RdbTest::SetUp() { @@ -84,19 +93,12 @@ TEST_F(RdbTest, Crc) { } TEST_F(RdbTest, LoadEmpty) { - io::FileSource fs = GetSource("empty.rdb"); - RdbLoader loader(NULL); - auto ec = loader.Load(&fs); + auto ec = LoadRdb("empty.rdb"); CHECK(!ec); } TEST_F(RdbTest, LoadSmall6) { - io::FileSource fs = GetSource("redis6_small.rdb"); - RdbLoader loader{service_.get()}; - - // must run in proactor thread in order to avoid polluting the serverstate - // in the main, testing thread. - auto ec = pp_->at(0)->Await([&] { return loader.Load(&fs); }); + auto ec = LoadRdb("redis6_small.rdb"); ASSERT_FALSE(ec) << ec.message(); @@ -128,12 +130,7 @@ TEST_F(RdbTest, LoadSmall6) { } TEST_F(RdbTest, Stream) { - io::FileSource fs = GetSource("redis6_stream.rdb"); - RdbLoader loader{service_.get()}; - - // must run in proactor thread in order to avoid polluting the serverstate - // in the main, testing thread. - auto ec = pp_->at(0)->Await([&] { return loader.Load(&fs); }); + auto ec = LoadRdb("redis6_stream.rdb"); ASSERT_FALSE(ec) << ec.message(); @@ -447,12 +444,7 @@ TEST_F(RdbTest, JsonTest) { class HllRdbTest : public RdbTest, public testing::WithParamInterface {}; TEST_P(HllRdbTest, Hll) { - io::FileSource fs = GetSource("hll.rdb"); - RdbLoader loader{service_.get()}; - - // must run in proactor thread in order to avoid polluting the serverstate - // in the main, testing thread. - auto ec = pp_->at(0)->Await([&] { return loader.Load(&fs); }); + auto ec = LoadRdb("hll.rdb"); ASSERT_FALSE(ec) << ec.message(); @@ -478,12 +470,7 @@ TEST_F(RdbTest, LoadSmall7) { // 2. A hashtable called my-hset encoded as RDB_TYPE_HASH_LISTPACK // 3. A set called my-set encoded as RDB_TYPE_SET_LISTPACK // 4. A zset called my-zset encoded as RDB_TYPE_ZSET_LISTPACK - io::FileSource fs = GetSource("redis7_small.rdb"); - RdbLoader loader{service_.get()}; - - // must run in proactor thread in order to avoid polluting the serverstate - // in the main, testing thread. - auto ec = pp_->at(0)->Await([&] { return loader.Load(&fs); }); + auto ec = LoadRdb("redis7_small.rdb"); ASSERT_FALSE(ec) << ec.message(); @@ -520,12 +507,7 @@ TEST_F(RdbTest, RedisJson) { // JSON.SET json-obj $ // '{"company":"DragonflyDB","product":"Dragonfly","website":"https://dragondlydb.io","years-active":[2021,2022,2023,2024,"and // more!"]}' - io::FileSource fs = GetSource("redis_json.rdb"); - RdbLoader loader{service_.get()}; - - // must run in proactor thread in order to avoid polluting the serverstate - // in the main, testing thread. - auto ec = pp_->at(0)->Await([&] { return loader.Load(&fs); }); + auto ec = LoadRdb("redis_json.rdb"); ASSERT_FALSE(ec) << ec.message(); diff --git a/src/server/replica.cc b/src/server/replica.cc index 106b9fcbf..f618164dd 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -157,6 +157,9 @@ void Replica::Stop() { // so we can freely release resources (connections). sync_fb_.JoinIfNeeded(); acks_fb_.JoinIfNeeded(); + for (auto& flow : shard_flows_) { + flow.reset(); + } } void Replica::Pause(bool pause) { diff --git a/src/server/server_state.h b/src/server/server_state.h index 1c678c162..f4b0cbac6 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -282,7 +282,12 @@ class ServerState { // public struct - to allow initialization. // Decommits 3 possible heaps according to the flags. // For decommit_glibcmalloc the heap is global for the process, for others it's specific only // for this thread. - enum { kDataHeap = 1, kBackingHeap = 2, kGlibcmalloc = 4 }; + enum { + kDataHeap = 1, + kBackingHeap = 2, + kGlibcmalloc = 4, + kAllMemory = kDataHeap | kBackingHeap | kGlibcmalloc + }; void DecommitMemory(uint8_t flags); // Exec descriptor frequency count for this thread.