chore: measure rss memory and track its peak (#2054)
Up until now we did not have a cached RSS metric in the process. This PR consolidates the caching of all these values inside the EngineShard periodic fiber code. Also, we now expose rss_mem_current, which can be used internally for identifying memory-pressure periods during the process run.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
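As a rough illustration of the caching pattern this commit introduces (a single background task samples RSS from /proc/self/status and publishes it through relaxed atomics that readers merely load), here is a minimal standalone sketch. It parses /proc/self/status directly instead of going through the io::ReadStatusInfo helper used in the diff, and the ReadRssBytes helper and plain std::thread loop are illustrative, not Dragonfly code.

// Minimal sketch, not Dragonfly code: a single writer thread samples RSS
// once per second and publishes it via relaxed atomics; readers just load.
#include <atomic>
#include <chrono>
#include <cstdint>
#include <fstream>
#include <sstream>
#include <string>
#include <thread>

std::atomic<uint64_t> rss_mem_current{0};
std::atomic<uint64_t> rss_mem_peak{0};

// Parses VmRSS and HugetlbPages (reported in kB) from /proc/self/status.
static uint64_t ReadRssBytes() {
  std::ifstream status("/proc/self/status");
  std::string line;
  uint64_t total_kb = 0;
  while (std::getline(status, line)) {
    if (line.rfind("VmRSS:", 0) == 0 || line.rfind("HugetlbPages:", 0) == 0) {
      std::istringstream iss(line.substr(line.find(':') + 1));
      uint64_t kb = 0;
      iss >> kb;
      total_kb += kb;
    }
  }
  return total_kb * 1024;
}

int main() {
  std::thread sampler([] {
    for (int i = 0; i < 3; ++i) {  // bounded so the sketch terminates
      uint64_t rss = ReadRssBytes();
      rss_mem_current.store(rss, std::memory_order_relaxed);
      // Single writer: a plain load/compare/store is race-free here.
      if (rss > rss_mem_peak.load(std::memory_order_relaxed))
        rss_mem_peak.store(rss, std::memory_order_relaxed);
      std::this_thread::sleep_for(std::chrono::seconds(1));
    }
  });
  sampler.join();
  return rss_mem_current.load(std::memory_order_relaxed) > 0 ? 0 : 1;
}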
Parent: d9cb7453fb
Commit: 15f54be762
4 changed files with 45 additions and 31 deletions
@@ -64,6 +64,9 @@ string_view KeyLockArgs::GetLockKey(string_view key) {
atomic_uint64_t used_mem_peak(0);
atomic_uint64_t used_mem_current(0);
atomic_uint64_t rss_mem_current(0);
atomic_uint64_t rss_mem_peak(0);

unsigned kernel_version = 0;
size_t max_memory_limit = 0;
@@ -175,6 +175,9 @@ const char* RdbTypeName(unsigned type);
// Cached values, updated frequently to represent the correct state of the system.
extern std::atomic_uint64_t used_mem_peak;
extern std::atomic_uint64_t used_mem_current;
extern std::atomic_uint64_t rss_mem_current;
extern std::atomic_uint64_t rss_mem_peak;

extern size_t max_memory_limit;

// malloc memory stats.
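The header above only declares the cached counters; any code that includes it can poll rss_mem_current without touching /proc itself, which is what the commit message means by identifying memory-pressure periods. A hypothetical consumer could look like the sketch below (UnderMemoryPressure and the locally defined globals are illustrative stand-ins, not part of this commit):

// Hypothetical consumer of the cached counters; not part of this commit.
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstdio>

// Stand-ins for the globals the header declares; defined locally so that
// the sketch compiles on its own.
std::atomic_uint64_t rss_mem_current{0};  // refreshed by the periodic fiber
size_t max_memory_limit = 0;

// Hypothetical helper: true when cached RSS exceeds a fraction of the limit.
bool UnderMemoryPressure(double threshold = 0.9) {
  uint64_t rss = rss_mem_current.load(std::memory_order_relaxed);
  return max_memory_limit > 0 && rss > max_memory_limit * threshold;
}

int main() {
  max_memory_limit = 1ULL << 30;                                  // 1 GiB limit
  rss_mem_current.store(950ULL << 20, std::memory_order_relaxed); // ~950 MiB RSS
  std::printf("under_pressure=%d\n", UnderMemoryPressure());      // prints 1
  return 0;
}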
@@ -13,6 +13,7 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "io/proc_reader.h"
#include "server/blocking_controller.h"
#include "server/search/doc_index.h"
#include "server/server_state.h"
@@ -621,12 +622,46 @@ void EngineShard::Heartbeat() {
}

void EngineShard::RunPeriodic(std::chrono::milliseconds period_ms) {
  bool runs_global_periodic = (shard_id() == 0);  // Only shard 0 runs global periodic.
  unsigned global_count = 0;
  int64_t last_stats_time = time(nullptr);

  while (true) {
    Heartbeat();
    if (fiber_periodic_done_.WaitFor(period_ms)) {
      VLOG(2) << "finished running engine shard periodic task";
      return;
    }

    if (runs_global_periodic) {
      ++global_count;

      // Every 8 runs, update the global stats.
      if (global_count % 8 == 0) {
        uint64_t sum = 0;
        const auto& stats = EngineShardSet::GetCachedStats();
        for (const auto& s : stats)
          sum += s.used_memory.load(memory_order_relaxed);

        used_mem_current.store(sum, memory_order_relaxed);

        // Single writer, so no races.
        if (sum > used_mem_peak.load(memory_order_relaxed))
          used_mem_peak.store(sum, memory_order_relaxed);

        int64_t cur_time = time(nullptr);
        if (cur_time != last_stats_time) {
          last_stats_time = cur_time;
          io::Result<io::StatusData> sdata_res = io::ReadStatusInfo();
          if (sdata_res) {
            size_t total_rss = sdata_res->vm_rss + sdata_res->hugetlb_pages;
            rss_mem_current.store(total_rss, memory_order_relaxed);
            if (rss_mem_peak.load(memory_order_relaxed) < total_rss)
              rss_mem_peak.store(total_rss, memory_order_relaxed);
          }
        }
      }
    }
  }
}
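The "Single writer, so no races" comment above is load-bearing: because only shard 0's periodic fiber updates used_mem_peak and rss_mem_peak, a plain load-compare-store cannot lose an update. If several threads could publish peaks, a compare-exchange loop (a manual fetch_max) would be needed instead. A small self-contained sketch of that multi-writer variant, with illustrative names:

// Sketch: multi-writer-safe peak update via a CAS loop (manual fetch_max).
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <thread>
#include <vector>

std::atomic<uint64_t> peak{0};

void UpdatePeak(uint64_t value) {
  uint64_t prev = peak.load(std::memory_order_relaxed);
  while (prev < value &&
         !peak.compare_exchange_weak(prev, value, std::memory_order_relaxed)) {
    // compare_exchange_weak reloads `prev` on failure, so we simply retry.
  }
}

int main() {
  std::vector<std::thread> writers;
  for (uint64_t t = 1; t <= 4; ++t)
    writers.emplace_back([t] { UpdatePeak(t * 100); });
  for (auto& w : writers)
    w.join();
  std::printf("peak=%llu\n",
              static_cast<unsigned long long>(peak.load(std::memory_order_relaxed)));
  return 0;  // prints peak=400
}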
@@ -507,27 +507,6 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
    fq_threadpool_.reset(new FiberQueueThreadPool(absl::GetFlag(FLAGS_epoll_file_threads)));
  }

  // Unlike EngineShard::Heartbeat that runs independently in each shard thread,
  // this callback runs in a single thread and it aggregates globally stats from all the shards.
  auto cache_cb = [] {
    uint64_t sum = 0;
    const auto& stats = EngineShardSet::GetCachedStats();
    for (const auto& s : stats)
      sum += s.used_memory.load(memory_order_relaxed);

    used_mem_current.store(sum, memory_order_relaxed);

    // Single writer, so no races.
    if (sum > used_mem_peak.load(memory_order_relaxed))
      used_mem_peak.store(sum, memory_order_relaxed);
  };

  uint32_t cache_hz = max(GetFlag(FLAGS_hz) / 10, 1u);
  uint32_t period_ms = max(1u, 1000 / cache_hz);

  stats_caching_task_ =
      pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(period_ms, cache_cb); });

  string flag_dir = GetFlag(FLAGS_dir);
  if (IsCloudPath(flag_dir)) {
    shard_set->pool()->GetNextProactor()->Await([&] { util::aws::Init(); });
@@ -1438,20 +1417,14 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
  }

  if (should_enter("MEMORY")) {
    io::Result<io::StatusData> sdata_res = io::ReadStatusInfo();

    append("used_memory", m.heap_used_bytes);
    append("used_memory_human", HumanReadableNumBytes(m.heap_used_bytes));
    append("used_memory_peak", used_mem_peak.load(memory_order_relaxed));

    if (sdata_res.has_value()) {
      size_t rss = sdata_res->vm_rss + sdata_res->hugetlb_pages;
      append("used_memory_rss", rss);
      append("used_memory_rss_human", HumanReadableNumBytes(rss));
    } else {
      LOG_FIRST_N(ERROR, 10) << "Error fetching /proc/self/status stats. error "
                             << sdata_res.error().message();
    }
    size_t rss = rss_mem_current.load(memory_order_relaxed);
    append("used_memory_rss", rss);
    append("used_memory_rss_human", HumanReadableNumBytes(rss));
    append("used_memory_peak_rss", rss_mem_peak.load(memory_order_relaxed));

    append("comitted_memory", GetMallocCurrentCommitted());
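After this change the INFO MEMORY path only loads cached atomics instead of reading /proc/self/status synchronously on every INFO call. The following standalone sketch mirrors that shape; BuildMemorySection and the locally defined atomics are illustrative, while the real handler uses the append helper shown in the diff.

// Rough sketch of the post-change INFO MEMORY shape: every value comes from
// a cached atomic, so building the section never touches /proc.
#include <atomic>
#include <cstdint>
#include <iostream>
#include <sstream>
#include <string>

// Local stand-ins for the cached globals; the real values are refreshed by
// the shard-0 periodic fiber shown earlier in the diff.
std::atomic_uint64_t used_mem_peak{0};
std::atomic_uint64_t rss_mem_current{0};
std::atomic_uint64_t rss_mem_peak{0};

// Builds an INFO-style MEMORY section purely from cached values.
std::string BuildMemorySection(uint64_t heap_used_bytes) {
  std::ostringstream out;
  auto append = [&out](const std::string& name, uint64_t val) {
    out << name << ":" << val << "\r\n";  // INFO fields are name:value lines
  };
  append("used_memory", heap_used_bytes);
  append("used_memory_peak", used_mem_peak.load(std::memory_order_relaxed));
  append("used_memory_rss", rss_mem_current.load(std::memory_order_relaxed));
  append("used_memory_peak_rss", rss_mem_peak.load(std::memory_order_relaxed));
  return out.str();
}

int main() {
  used_mem_peak.store(100000000, std::memory_order_relaxed);
  rss_mem_current.store(120000000, std::memory_order_relaxed);
  rss_mem_peak.store(130000000, std::memory_order_relaxed);
  std::cout << BuildMemorySection(90000000);
  return 0;
}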