diff --git a/src/server/common.cc b/src/server/common.cc index d9fe428bb..e33ade8dc 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -64,6 +64,9 @@ string_view KeyLockArgs::GetLockKey(string_view key) { atomic_uint64_t used_mem_peak(0); atomic_uint64_t used_mem_current(0); +atomic_uint64_t rss_mem_current(0); +atomic_uint64_t rss_mem_peak(0); + unsigned kernel_version = 0; size_t max_memory_limit = 0; diff --git a/src/server/common.h b/src/server/common.h index 575780f30..5a95f99b4 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -175,6 +175,9 @@ const char* RdbTypeName(unsigned type); // Cached values, updated frequently to represent the correct state of the system. extern std::atomic_uint64_t used_mem_peak; extern std::atomic_uint64_t used_mem_current; +extern std::atomic_uint64_t rss_mem_current; +extern std::atomic_uint64_t rss_mem_peak; + extern size_t max_memory_limit; // malloc memory stats. diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 481d11981..ef506e3a2 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -13,6 +13,7 @@ extern "C" { #include "base/flags.h" #include "base/logging.h" +#include "io/proc_reader.h" #include "server/blocking_controller.h" #include "server/search/doc_index.h" #include "server/server_state.h" @@ -621,12 +622,46 @@ void EngineShard::Heartbeat() { } void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms) { + bool runs_global_periodic = (shard_id() == 0); // Only shard 0 runs global periodic. + unsigned global_count = 0; + int64_t last_stats_time = time(nullptr); + while (true) { Heartbeat(); if (fiber_periodic_done_.WaitFor(period_ms)) { VLOG(2) << "finished running engine shard periodic task"; return; } + + if (runs_global_periodic) { + ++global_count; + + // Every 8 runs, update the global stats. + if (global_count % 8 == 0) { + uint64_t sum = 0; + const auto& stats = EngineShardSet::GetCachedStats(); + for (const auto& s : stats) + sum += s.used_memory.load(memory_order_relaxed); + + used_mem_current.store(sum, memory_order_relaxed); + + // Single writer, so no races. + if (sum > used_mem_peak.load(memory_order_relaxed)) + used_mem_peak.store(sum, memory_order_relaxed); + + int64_t cur_time = time(nullptr); + if (cur_time != last_stats_time) { + last_stats_time = cur_time; + io::Result sdata_res = io::ReadStatusInfo(); + if (sdata_res) { + size_t total_rss = sdata_res->vm_rss + sdata_res->hugetlb_pages; + rss_mem_current.store(total_rss, memory_order_relaxed); + if (rss_mem_peak.load(memory_order_relaxed) < total_rss) + rss_mem_peak.store(total_rss, memory_order_relaxed); + } + } + } + } } } diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 980311c4a..1d3e58528 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -507,27 +507,6 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector used_mem_peak.load(memory_order_relaxed)) - used_mem_peak.store(sum, memory_order_relaxed); - }; - - uint32_t cache_hz = max(GetFlag(FLAGS_hz) / 10, 1u); - uint32_t period_ms = max(1u, 1000 / cache_hz); - - stats_caching_task_ = - pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(period_ms, cache_cb); }); - string flag_dir = GetFlag(FLAGS_dir); if (IsCloudPath(flag_dir)) { shard_set->pool()->GetNextProactor()->Await([&] { util::aws::Init(); }); @@ -1438,20 +1417,14 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { } if (should_enter("MEMORY")) { - io::Result sdata_res = io::ReadStatusInfo(); - append("used_memory", m.heap_used_bytes); append("used_memory_human", HumanReadableNumBytes(m.heap_used_bytes)); append("used_memory_peak", used_mem_peak.load(memory_order_relaxed)); - if (sdata_res.has_value()) { - size_t rss = sdata_res->vm_rss + sdata_res->hugetlb_pages; - append("used_memory_rss", rss); - append("used_memory_rss_human", HumanReadableNumBytes(rss)); - } else { - LOG_FIRST_N(ERROR, 10) << "Error fetching /proc/self/status stats. error " - << sdata_res.error().message(); - } + size_t rss = rss_mem_current.load(memory_order_relaxed); + append("used_memory_rss", rss); + append("used_memory_rss_human", HumanReadableNumBytes(rss)); + append("used_memory_peak_rss", rss_mem_peak.load(memory_order_relaxed)); append("comitted_memory", GetMallocCurrentCommitted());