Memory accounting

Bring back application-level used-memory tracking.
Use the internal mimalloc API for extracting committed memory stats.
This is much better performance-wise, because calling mi_heap_visit_blocks
becomes very slow for larger database sizes.
Roman Gershman 2022-03-11 01:25:01 +02:00
parent 92475dd47a
commit 770fe0fe47
14 changed files with 113 additions and 39 deletions
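
For context, here is a minimal sketch of the old and new accounting paths (illustrative only, not part of the diff; it assumes mimalloc's public mi_heap_visit_blocks API and the internal _mi_stats_main object that this commit declares in server_family.cc):

#include <mimalloc.h>
#include <mimalloc-types.h>  // internal header: defines mi_stats_t

extern "C" mi_stats_t _mi_stats_main;  // internal mimalloc symbol, not public API

// Old path: walk every heap area. The cost grows with heap size, which is
// why CacheStats became slow on large databases.
static size_t CommittedViaVisit(mi_heap_t* heap) {
  size_t committed = 0;
  auto visit_cb = [](const mi_heap_t*, const mi_heap_area_t* area, void* /*block*/,
                     size_t /*block_size*/, void* arg) -> bool {
    *static_cast<size_t*>(arg) += area->committed;  // committed bytes of this area
    return true;  // continue iteration
  };
  mi_heap_visit_blocks(heap, false /* areas only, not every block */, visit_cb, &committed);
  return committed;
}

// New path: fold the thread-local counters into the global stats and read a
// single field. Constant time regardless of heap size.
static size_t CommittedViaStats() {
  mi_stats_merge();
  return static_cast<size_t>(_mi_stats_main.committed.current);
}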

View file

@ -51,7 +51,6 @@ int main(int argc, char* argv[]) {
mi_option_enable(mi_option_large_os_pages);
mi_option_enable(mi_option_show_errors);
mi_option_set(mi_option_max_warnings, 0);
_mi_options_init();
uring::UringPool pp{1024};

View file

@ -20,6 +20,12 @@ using namespace util;
namespace this_fiber = ::boost::this_fiber;
namespace fibers = ::boost::fibers;
namespace {
vector<EngineShardSet::CachedStats> cached_stats; // initialized in EngineShardSet::Init
} // namespace
thread_local EngineShard* EngineShard::shard_ = nullptr;
constexpr size_t kQueueLen = 64;
@ -107,10 +113,12 @@ void EngineShard::DestroyThreadLocal() {
return;
uint32_t index = shard_->db_slice_.shard_id();
mi_heap_t* tlh = shard_->mi_resource_.heap();
shard_->~EngineShard();
mi_free(shard_);
shard_ = nullptr;
CompactObj::InitThreadLocal(nullptr);
mi_heap_delete(tlh);
VLOG(1) << "Shard reset " << index;
}
@ -476,6 +484,7 @@ bool EngineShard::HasResultConverged(TxId notifyid) const {
}
void EngineShard::CacheStats() {
#if 0
mi_heap_t* tlh = mi_resource_.heap();
struct Sum {
size_t used = 0;
@ -493,19 +502,22 @@ void EngineShard::CacheStats() {
DVLOG(1) << "block_size " << block_size << "/" << area->block_size << ", reserved "
<< area->reserved << " comitted " << area->committed << " used: " << area->used;
return true; // continue iteration
};
mi_heap_visit_blocks(tlh, false /* visit all blocks*/, visit_cb, &sum);
stats_.heap_used_bytes = sum.used;
stats_.heap_comitted_bytes = sum.comitted;
#endif
// mi_heap_visit_blocks(tlh, false /* visit all blocks*/, visit_cb, &sum);
mi_stats_merge();
// stats_.heap_used_bytes = sum.used;
stats_.heap_used_bytes =
mi_resource_.used() + zmalloc_used_memory_tl + SmallString::UsedThreadLocal();
cached_stats[db_slice_.shard_id()].used_memory.store(stats_.heap_used_bytes,
memory_order_relaxed);
// stats_.heap_comitted_bytes = sum.comitted;
}
void EngineShardSet::Init(uint32_t sz) {
CHECK_EQ(0u, size());
cached_stats.resize(sz);
shard_queue_.resize(sz);
}
@ -515,4 +527,8 @@ void EngineShardSet::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
shard_queue_[es->shard_id()] = es->GetFiberQueue();
}
const vector<EngineShardSet::CachedStats>& EngineShardSet::GetCachedStats() {
return cached_stats;
}
} // namespace dfly
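
The new heap_used_bytes is a sum of three application-level counters instead of a heap walk. mi_resource_.used() presumably comes from a byte-counting std::pmr::memory_resource wrapper around the shard's mi_heap (an assumption; the real class lives in core/mi_memory_resource.h). An illustrative sketch of that counting pattern, with hypothetical names:

#include <cstddef>
#include <memory_resource>

// Hypothetical: counts live application bytes by intercepting
// allocate/deallocate, the kind of counter used() would expose.
class TrackingResource : public std::pmr::memory_resource {
 public:
  size_t used() const {
    return used_;
  }

 private:
  void* do_allocate(size_t bytes, size_t align) override {
    used_ += bytes;  // account before delegating to the upstream resource
    return std::pmr::get_default_resource()->allocate(bytes, align);
  }
  void do_deallocate(void* p, size_t bytes, size_t align) override {
    used_ -= bytes;
    std::pmr::get_default_resource()->deallocate(p, bytes, align);
  }
  bool do_is_equal(const std::pmr::memory_resource& o) const noexcept override {
    return this == &o;
  }

  size_t used_ = 0;
};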

View file

@ -13,8 +13,8 @@ extern "C" {
#include <xxhash.h>
#include "base/string_view_sso.h"
#include "core/tx_queue.h"
#include "core/mi_memory_resource.h"
#include "core/tx_queue.h"
#include "server/db_slice.h"
#include "util/fibers/fiberqueue_threadpool.h"
#include "util/fibers/fibers_ext.h"
@ -25,7 +25,7 @@ namespace dfly {
class EngineShard {
public:
struct Stats {
uint64_t ooo_runs = 0;   // how many times transactions run as OOO.
uint64_t quick_runs = 0; // how many times single shard "RunQuickie" transaction run.
// number of bytes that were allocated by the application (with mi_mallocxxx methods).
@ -35,7 +35,7 @@ class EngineShard {
// number of bytes committed by the allocator library (i.e. mmapped into physical memory).
//
size_t heap_comitted_bytes = 0;
// size_t heap_comitted_bytes = 0;
};
// EngineShard() is private down below.
@ -184,6 +184,16 @@ class EngineShard {
class EngineShardSet {
public:
struct CachedStats {
std::atomic_uint64_t used_memory;
CachedStats() : used_memory(0) {
}
CachedStats(const CachedStats& o) : used_memory(o.used_memory.load()) {
}
};
explicit EngineShardSet(util::ProactorPool* pp) : pp_(pp) {
}
@ -198,6 +208,8 @@ class EngineShardSet {
void Init(uint32_t size);
void InitThreadLocal(util::ProactorBase* pb, bool update_db_time);
static const std::vector<CachedStats>& GetCachedStats();
// Uses a shard queue to dispatch. Callback runs in a dedicated fiber.
template <typename F> auto Await(ShardId sid, F&& f) {
return shard_queue_[sid]->Await(std::forward<F>(f));
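
CachedStats defines its own copy constructor because std::atomic is neither copyable nor movable, yet EngineShardSet::Init resizes a std::vector of these. A self-contained illustration of why the copy is needed (the shard count of 4 is arbitrary):

#include <atomic>
#include <vector>

struct CachedStats {
  std::atomic_uint64_t used_memory;

  CachedStats() : used_memory(0) {
  }

  // std::atomic deletes its copy constructor; without this user-defined
  // copy, std::vector<CachedStats>::resize would not compile.
  CachedStats(const CachedStats& o) : used_memory(o.used_memory.load()) {
  }
};

int main() {
  std::vector<CachedStats> cached_stats;
  cached_stats.resize(4);  // one slot per shard, as in EngineShardSet::Init
  cached_stats[0].used_memory.store(1024, std::memory_order_relaxed);
  return 0;
}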

View file

@ -308,6 +308,7 @@ void Service::Init(util::AcceptServer* acceptor, const InitOpts& opts) {
request_latency_usec.Init(&pp_);
StringFamily::Init(&pp_);
GenericFamily::Init(&pp_);
server_family_.Init(acceptor);
cmd_req.Init(&pp_, {"type"});
}
@ -322,6 +323,7 @@ void Service::Shutdown() {
engine_varz.reset();
request_latency_usec.Shutdown();
// We mark that we are shutting down.
pp_.AwaitFiberOnAll([](ProactorBase* pb) { ServerState::tlocal()->Shutdown(); });
// to shutdown all the runtime components that depend on EngineShard.
@ -331,6 +333,9 @@ void Service::Shutdown() {
cmd_req.Shutdown();
shard_set_.RunBlockingInParallel([&](EngineShard*) { EngineShard::DestroyThreadLocal(); });
// wait for all the pending callbacks to stop.
boost::this_fiber::sleep_for(10ms);
}
void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) {

View file

@ -7,7 +7,7 @@
#include <absl/cleanup/cleanup.h>
#include <absl/random/random.h> // for master_id_ generation.
#include <absl/strings/match.h>
#include <malloc.h>
#include <mimalloc-types.h>
#include <sys/resource.h>
#include <filesystem>
@ -39,6 +39,8 @@ DEFINE_string(requirepass, "", "password for AUTH authentication");
DECLARE_uint32(port);
extern "C" mi_stats_t _mi_stats_main;
namespace dfly {
using namespace std;
@ -70,6 +72,8 @@ error_code CreateDirs(fs::path dir_path) {
return ec;
}
atomic_uint64_t used_mem_peak(0);
} // namespace
ServerFamily::ServerFamily(Service* service) : service_(*service), ess_(service->shard_set()) {
@ -84,12 +88,29 @@ ServerFamily::~ServerFamily() {
void ServerFamily::Init(util::AcceptServer* acceptor) {
CHECK(acceptor_ == nullptr);
acceptor_ = acceptor;
pb_task_ = ess_.pool()->GetNextProactor();
auto cache_cb = [] {
uint64_t sum = 0;
const auto& stats = EngineShardSet::GetCachedStats();
for (const auto& s : stats)
sum += s.used_memory.load(memory_order_relaxed);
// Single writer, so no races.
if (sum > used_mem_peak.load(memory_order_relaxed))
used_mem_peak.store(sum, memory_order_relaxed);
};
task_10ms_ = pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(10, cache_cb); });
}
void ServerFamily::Shutdown() {
VLOG(1) << "ServerFamily::Shutdown";
service_.proactor_pool().GetNextProactor()->Await([this] {
pb_task_->Await([this] {
pb_task_->CancelPeriodic(task_10ms_);
task_10ms_ = 0;
unique_lock lk(replica_of_mu_);
if (replica_) {
replica_->Stop();
@ -321,7 +342,7 @@ Metrics ServerFamily::GetMetrics() const {
result.events += db_stats.events;
EngineShard::Stats shard_stats = shard->stats();
result.heap_comitted_bytes += shard_stats.heap_comitted_bytes;
// result.heap_comitted_bytes += shard_stats.heap_comitted_bytes;
result.heap_used_bytes += shard_stats.heap_used_bytes;
}
};
@ -385,7 +406,7 @@ tcp_port:)";
absl::StrAppend(&info, "used_memory:", m.heap_used_bytes, "\n");
absl::StrAppend(&info, "used_memory_human:", HumanReadableNumBytes(m.heap_used_bytes), "\n");
absl::StrAppend(&info, "comitted_memory:", m.heap_comitted_bytes, "\n");
absl::StrAppend(&info, "comitted_memory:", _mi_stats_main.committed.current, "\n");
if (sdata_res.has_value()) {
absl::StrAppend(&info, "used_memory_rss:", sdata_res->vm_rss, "\n");
@ -395,13 +416,12 @@ tcp_port:)";
LOG(ERROR) << "Error fetching /proc/self/status stats";
}
// TBD: should be the max of all seen used_memory values.
absl::StrAppend(&info, "used_memory_peak:", -1, "\n");
absl::StrAppend(&info, "used_memory_peak:", used_mem_peak.load(memory_order_relaxed), "\n");
// Blob - all these cases where the key/objects are represented by a single blob allocated on
// heap. For example, strings or intsets. Members of lists, sets, zsets, etc.
// are not accounted for to avoid complex computations. In some cases, when the number of members
// is known, we approximate their allocations by taking 16 bytes per member.
absl::StrAppend(&info, "blob_used_memory:", m.db.obj_memory_usage, "\n");
absl::StrAppend(&info, "table_used_memory:", m.db.table_mem_usage, "\n");
absl::StrAppend(&info, "num_buckets:", m.db.bucket_count, "\n");
@ -440,9 +460,9 @@ tcp_port:)";
} else {
absl::StrAppend(&info, "role:slave\n");
// it's safe to access replica_ because replica_ is created before etl.is_master is set to
// false and cleared after etl.is_master is set to true. And since the code here that checks
// for is_master and copies the shared_ptr is atomic, it should be correct.
auto replica_ptr = replica_;
Replica::Info rinfo = replica_ptr->GetInfo();
absl::StrAppend(&info, "master_host:", rinfo.host, "\n");

View file

@ -77,10 +77,12 @@ class ServerFamily {
void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx);
uint32_t task_10ms_ = 0;
Service& service_;
EngineShardSet& ess_;
util::AcceptServer* acceptor_ = nullptr;
util::ProactorBase* pb_task_ = nullptr;
::boost::fibers::mutex replica_of_mu_;
std::shared_ptr<Replica> replica_; // protected by replica_of_mu_
@ -89,6 +91,7 @@ class ServerFamily {
std::atomic<int64_t> last_save_; // in seconds.
GlobalState global_state_;
time_t start_time_ = 0; // in seconds, epoch time.
};
} // namespace dfly