From 15f54be762c6216a86d4d82d5815099eec43f051 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Sun, 22 Oct 2023 22:01:40 +0300 Subject: [PATCH] chore: measure rss memory and track its peak (#2054) Up until now we did not have a cached rss metric in the process. This PR consolidates caching of all values together inside the EngineShard periodic fiber code. Also, we now expose rss_mem_current that can be used internally for identifying memory-pressure periods during the process run. Signed-off-by: Roman Gershman --- src/server/common.cc | 3 +++ src/server/common.h | 3 +++ src/server/engine_shard_set.cc | 35 ++++++++++++++++++++++++++++++++++ src/server/server_family.cc | 35 ++++------------------------------ 4 files changed, 45 insertions(+), 31 deletions(-) diff --git a/src/server/common.cc b/src/server/common.cc index d9fe428bb..e33ade8dc 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -64,6 +64,9 @@ string_view KeyLockArgs::GetLockKey(string_view key) { atomic_uint64_t used_mem_peak(0); atomic_uint64_t used_mem_current(0); +atomic_uint64_t rss_mem_current(0); +atomic_uint64_t rss_mem_peak(0); + unsigned kernel_version = 0; size_t max_memory_limit = 0; diff --git a/src/server/common.h b/src/server/common.h index 575780f30..5a95f99b4 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -175,6 +175,9 @@ const char* RdbTypeName(unsigned type); // Cached values, updated frequently to represent the correct state of the system. extern std::atomic_uint64_t used_mem_peak; extern std::atomic_uint64_t used_mem_current; +extern std::atomic_uint64_t rss_mem_current; +extern std::atomic_uint64_t rss_mem_peak; + extern size_t max_memory_limit; // malloc memory stats. 
diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 481d11981..ef506e3a2 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -13,6 +13,7 @@ extern "C" { #include "base/flags.h" #include "base/logging.h" +#include "io/proc_reader.h" #include "server/blocking_controller.h" #include "server/search/doc_index.h" #include "server/server_state.h" @@ -621,12 +622,46 @@ void EngineShard::Heartbeat() { } void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms) { + bool runs_global_periodic = (shard_id() == 0); // Only shard 0 runs global periodic. + unsigned global_count = 0; + int64_t last_stats_time = time(nullptr); + while (true) { Heartbeat(); if (fiber_periodic_done_.WaitFor(period_ms)) { VLOG(2) << "finished running engine shard periodic task"; return; } + + if (runs_global_periodic) { + ++global_count; + + // Every 8 runs, update the global stats. + if (global_count % 8 == 0) { + uint64_t sum = 0; + const auto& stats = EngineShardSet::GetCachedStats(); + for (const auto& s : stats) + sum += s.used_memory.load(memory_order_relaxed); + + used_mem_current.store(sum, memory_order_relaxed); + + // Single writer, so no races. 
+ if (sum > used_mem_peak.load(memory_order_relaxed)) + used_mem_peak.store(sum, memory_order_relaxed); + + int64_t cur_time = time(nullptr); + if (cur_time != last_stats_time) { + last_stats_time = cur_time; + io::Result sdata_res = io::ReadStatusInfo(); + if (sdata_res) { + size_t total_rss = sdata_res->vm_rss + sdata_res->hugetlb_pages; + rss_mem_current.store(total_rss, memory_order_relaxed); + if (rss_mem_peak.load(memory_order_relaxed) < total_rss) + rss_mem_peak.store(total_rss, memory_order_relaxed); + } + } + } + } } } diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 980311c4a..1d3e58528 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -507,27 +507,6 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector used_mem_peak.load(memory_order_relaxed)) - used_mem_peak.store(sum, memory_order_relaxed); - }; - - uint32_t cache_hz = max(GetFlag(FLAGS_hz) / 10, 1u); - uint32_t period_ms = max(1u, 1000 / cache_hz); - - stats_caching_task_ = - pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(period_ms, cache_cb); }); - string flag_dir = GetFlag(FLAGS_dir); if (IsCloudPath(flag_dir)) { shard_set->pool()->GetNextProactor()->Await([&] { util::aws::Init(); }); @@ -1438,20 +1417,14 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { } if (should_enter("MEMORY")) { - io::Result sdata_res = io::ReadStatusInfo(); - append("used_memory", m.heap_used_bytes); append("used_memory_human", HumanReadableNumBytes(m.heap_used_bytes)); append("used_memory_peak", used_mem_peak.load(memory_order_relaxed)); - if (sdata_res.has_value()) { - size_t rss = sdata_res->vm_rss + sdata_res->hugetlb_pages; - append("used_memory_rss", rss); - append("used_memory_rss_human", HumanReadableNumBytes(rss)); - } else { - LOG_FIRST_N(ERROR, 10) << "Error fetching /proc/self/status stats. 
error " - << sdata_res.error().message(); - } + size_t rss = rss_mem_current.load(memory_order_relaxed); + append("used_memory_rss", rss); + append("used_memory_rss_human", HumanReadableNumBytes(rss)); + append("used_memory_peak_rss", rss_mem_peak.load(memory_order_relaxed)); append("comitted_memory", GetMallocCurrentCommitted());