feat(server): Memory tracker (#2501)

* feat(server): Memory tracker

* PR comments
This commit is contained in:
Shahar Mike 2024-01-30 12:44:07 +02:00 committed by GitHub
parent ad90602bc2
commit b2bdb0f683
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 388 additions and 37 deletions

View file

@ -8,6 +8,9 @@
#include <mimalloc.h>
#include "base/io_buf.h"
#include "base/logging.h"
#include "core/allocation_tracker.h"
#include "facade/cmd_arg_parser.h"
#include "facade/dragonfly_connection.h"
#include "facade/error.h"
#include "server/engine_shard_set.h"
@ -41,7 +44,7 @@ bool MiArenaVisit(const mi_heap_t* heap, const mi_heap_area_t* area, void* block
return true;
};
std::string MallocStats(bool backing, unsigned tid) {
std::string MallocStatsCb(bool backing, unsigned tid) {
string str;
uint64_t start = absl::GetCurrentTimeNanos();
@ -121,35 +124,12 @@ void MemoryCmd::Run(CmdArgList args) {
}
if (sub_cmd == "MALLOC-STATS") {
uint32_t tid = 0;
bool backing = false;
if (args.size() >= 2) {
ToUpper(&args[1]);
return MallocStats(args);
}
unsigned tid_indx = 1;
if (ArgS(args, tid_indx) == "BACKING") {
++tid_indx;
backing = true;
}
if (args.size() > tid_indx && !absl::SimpleAtoi(ArgS(args, tid_indx), &tid)) {
return cntx_->SendError(kInvalidIntErr);
}
}
if (backing && tid >= shard_set->pool()->size()) {
return cntx_->SendError(
absl::StrCat("Thread id must be less than ", shard_set->pool()->size()));
}
if (!backing && tid >= shard_set->size()) {
return cntx_->SendError(absl::StrCat("Thread id must be less than ", shard_set->size()));
}
string res = shard_set->pool()->at(tid)->AwaitBrief([=] { return MallocStats(backing, tid); });
auto* rb = static_cast<RedisReplyBuilder*>(cntx_->reply_builder());
return rb->SendVerbatimString(res);
if (sub_cmd == "TRACK") {
args.remove_prefix(1);
return Track(args);
}
string err = UnknownSubCmd(sub_cmd, "MEMORY");
@ -270,6 +250,38 @@ void MemoryCmd::Stats() {
}
}
void MemoryCmd::MallocStats(CmdArgList args) {
uint32_t tid = 0;
bool backing = false;
if (args.size() >= 2) {
ToUpper(&args[1]);
unsigned tid_indx = 1;
if (ArgS(args, tid_indx) == "BACKING") {
++tid_indx;
backing = true;
}
if (args.size() > tid_indx && !absl::SimpleAtoi(ArgS(args, tid_indx), &tid)) {
return cntx_->SendError(kInvalidIntErr);
}
}
if (backing && tid >= shard_set->pool()->size()) {
return cntx_->SendError(
absl::StrCat("Thread id must be less than ", shard_set->pool()->size()));
}
if (!backing && tid >= shard_set->size()) {
return cntx_->SendError(absl::StrCat("Thread id must be less than ", shard_set->size()));
}
string res = shard_set->pool()->at(tid)->AwaitBrief([=] { return MallocStatsCb(backing, tid); });
auto* rb = static_cast<RedisReplyBuilder*>(cntx_->reply_builder());
return rb->SendVerbatimString(res);
}
void MemoryCmd::Usage(std::string_view key) {
ShardId sid = Shard(key, shard_set->size());
ssize_t memory_usage = shard_set->pool()->at(sid)->AwaitBrief([key, this]() -> ssize_t {
@ -289,4 +301,95 @@ void MemoryCmd::Usage(std::string_view key) {
rb->SendLong(memory_usage);
}
// Allow tracking of memory allocation via `new` and `delete` based on input criteria.
//
// MEMORY TRACK ADD <lower-bound> <upper-bound> <sample-odds>
// - Sets up tracking memory allocations in the (inclusive) range [lower, upper]
// - sample-odds indicates how many of the allocations will be logged, there 0 means none, 1 means
// all, and everything in between is linear
// - There could be at most 4 tracking placed in parallel
//
// MEMORY TRACK REMOVE <lower-bound> <upper-bound>
// - Removes all memory tracking added which match bounds
// - Could remove 0, 1 or more
//
// MEMORY TRACK CLEAR
// - Removes all memory tracking
//
// MEMORY TRACK GET
// - Returns an array with all active tracking
//
// This command is not documented in `MEMORY HELP` because it's meant to be used internally.
void MemoryCmd::Track(CmdArgList args) {
#ifndef DFLY_ENABLE_MEMORY_TRACKING
return cntx_->SendError("MEMORY TRACK must be enabled at build time.");
#endif
CmdArgParser parser(args);
string_view sub_cmd = parser.ToUpper().Next();
if (parser.HasError()) {
return cntx_->SendError(parser.Error()->MakeReply());
}
if (sub_cmd == "ADD") {
auto [lower_bound, upper_bound, odds] = parser.Next<size_t, size_t, double>();
if (parser.HasError()) {
return cntx_->SendError(parser.Error()->MakeReply());
}
atomic_bool error;
shard_set->pool()->Await([&](unsigned index, auto*) {
if (!AllocationTracker::Get().Add(
{.lower_bound = lower_bound, .upper_bound = upper_bound, .sample_odds = odds})) {
error.store(true);
}
});
if (error.load()) {
return cntx_->SendError("Unable to add tracker");
} else {
return cntx_->SendOk();
}
}
if (sub_cmd == "REMOVE") {
auto [lower_bound, upper_bound] = parser.Next<size_t, size_t>();
if (parser.HasError()) {
return cntx_->SendError(parser.Error()->MakeReply());
}
atomic_bool error;
shard_set->pool()->Await([&](unsigned index, auto*) {
if (!AllocationTracker::Get().Remove(lower_bound, upper_bound)) {
error.store(true);
}
});
if (error.load()) {
return cntx_->SendError("Unable to remove tracker");
} else {
return cntx_->SendOk();
}
}
if (sub_cmd == "CLEAR") {
shard_set->pool()->Await([&](unsigned index, auto*) { AllocationTracker::Get().Clear(); });
return cntx_->SendOk();
}
if (sub_cmd == "GET") {
auto ranges = AllocationTracker::Get().GetRanges();
auto* rb = static_cast<facade::RedisReplyBuilder*>(cntx_->reply_builder());
rb->StartArray(ranges.size());
for (const auto& range : ranges) {
rb->SendSimpleString(
absl::StrCat(range.lower_bound, ",", range.upper_bound, ",", range.sample_odds));
}
return;
}
return cntx_->SendError(kSyntaxErrType);
}
} // namespace dfly