feat(server): Better connection memory tracking (#2205)

Shahar Mike 2023-11-26 14:51:52 +02:00 committed by GitHub
parent d6044edbab
commit d6292ba6fd
14 changed files with 230 additions and 31 deletions

src/core/heap_size.h Normal file
View file

@ -0,0 +1,125 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
// This file provides utilities to *estimate* the heap memory usage of classes.
// The main function exposed here is HeapSize() (with various overloads).
// It supports trivial types (returns 0), std::string (returns its capacity if it exceeds the
// SSO buffer), and common containers such as std::vector, std::deque, absl::flat_hash_map and
// std::unique_ptr.
//
// Example usage:
// absl::flat_hash_map<std::string, std::vector<std::unique_ptr<int>>> m;
// ...
// size_t size = HeapSize(m);
#pragma once
#include <absl/container/flat_hash_map.h>
#include <absl/container/inlined_vector.h>
#include <absl/types/span.h>
#include <deque>
#include <string>
#include <string_view>
#include <type_traits>
#include <vector>
namespace dfly {
namespace heap_size_detail {
template <typename T> constexpr bool StackOnlyType() {
return std::is_trivial_v<T> || std::is_same_v<T, std::string_view>;
}
template <typename T, typename = void> struct has_used_mem : std::false_type {};
template <typename T>
struct has_used_mem<T, std::void_t<decltype(&T::UsedMemory)>> : std::true_type {};
template <typename Container> size_t AccumulateContainer(const Container& c);
} // namespace heap_size_detail
inline size_t HeapSize(const std::string& s) {
constexpr size_t kSmallStringOptSize = 15;
return s.capacity() > kSmallStringOptSize ? s.capacity() : 0UL;
}
template <typename T, std::enable_if_t<heap_size_detail::has_used_mem<T>::value, bool> = true>
size_t HeapSize(const T& t) {
return t.UsedMemory();
}
template <typename T, std::enable_if_t<heap_size_detail::StackOnlyType<T>(), bool> = true>
size_t HeapSize(const T& t) {
return 0;
}
template <typename T> size_t HeapSize(absl::Span<T>) {
return 0;
}
// Declare first, so that we can use these "recursively"
template <typename T> size_t HeapSize(const std::vector<T>& v);
template <typename T> size_t HeapSize(const std::unique_ptr<T>& t);
template <typename T> size_t HeapSize(const std::deque<T>& d);
template <typename T1, typename T2> size_t HeapSize(const std::pair<T1, T2>& p);
template <typename T, size_t N> size_t HeapSize(const absl::InlinedVector<T, N>& v);
template <typename K, typename V> size_t HeapSize(const absl::flat_hash_map<K, V>& m);
template <typename T> size_t HeapSize(const std::unique_ptr<T>& t) {
if (t == nullptr) {
return 0;
} else {
return sizeof(T) + HeapSize(*t);
}
}
template <typename T> size_t HeapSize(const std::vector<T>& v) {
return (v.capacity() * sizeof(T)) + heap_size_detail::AccumulateContainer(v);
}
template <typename T> size_t HeapSize(const std::deque<T>& d) {
return (d.size() * sizeof(T)) + heap_size_detail::AccumulateContainer(d);
}
template <typename T1, typename T2> size_t HeapSize(const std::pair<T1, T2>& p) {
return HeapSize(p.first) + HeapSize(p.second);
}
template <typename T, size_t N> size_t HeapSize(const absl::InlinedVector<T, N>& v) {
size_t size = 0;
if (v.capacity() > N) {
size += v.capacity() * sizeof(T);
}
size += heap_size_detail::AccumulateContainer(v);
return size;
}
template <typename K, typename V> size_t HeapSize(const absl::flat_hash_map<K, V>& m) {
size_t size = m.capacity() * sizeof(typename absl::flat_hash_map<K, V>::value_type);
if constexpr (!heap_size_detail::StackOnlyType<K>() || !heap_size_detail::StackOnlyType<V>()) {
for (const auto& kv : m) {
size += HeapSize(kv);
}
}
return size;
}
namespace heap_size_detail {
template <typename Container> size_t AccumulateContainer(const Container& c) {
size_t size = 0;
if constexpr (!heap_size_detail::StackOnlyType<typename Container::value_type>()) {
for (const auto& e : c) {
size += HeapSize(e);
}
}
return size;
}
} // namespace heap_size_detail
} // namespace dfly
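For illustration, a minimal usage sketch (not part of this commit; the function name is hypothetical) showing how the overloads above compose over a nested container:

// Hypothetical usage sketch: estimates the heap footprint of a nested container.
#include <string>
#include <vector>

#include "core/heap_size.h"

size_t EstimateExample() {
  std::vector<std::string> names;
  names.reserve(8);
  names.push_back(std::string(100, 'x'));  // 100 chars exceeds the SSO buffer
  // Counts the vector's backing array (capacity() * sizeof(std::string)) plus the
  // heap capacity of every stored string that spilled out of its SSO buffer.
  return dfly::HeapSize(names);
}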

View file

@ -8,6 +8,7 @@
#include <memory>
#include "core/heap_size.h"
#include "facade/acl_commands_def.h"
#include "facade/facade_types.h"
#include "facade/reply_builder.h"
@ -20,8 +21,6 @@ class ConnectionContext {
public:
ConnectionContext(::io::Sink* stream, Connection* owner);
// We won't have any virtual methods, probably. However, since we allocate a derived class,
// we need to declare a virtual d-tor, so we could properly delete it from Connection code.
virtual ~ConnectionContext() {
}
@ -69,6 +68,10 @@ class ConnectionContext {
rbuilder_->SendOk();
}
virtual size_t UsedMemory() const {
return dfly::HeapSize(rbuilder_);
}
// connection state / properties.
bool conn_closing : 1;
bool req_auth : 1;
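The intent of the new virtual UsedMemory() is that derived contexts override it and add their own heap-owning members on top of the base estimate. A minimal sketch of that pattern, with a hypothetical subclass and member (the real override added by this commit is dfly::ConnectionContext::UsedMemory(), shown further down):

#include <string>
#include <vector>

#include "core/heap_size.h"
#include "facade/conn_context.h"

// Hypothetical derived context, sketching the override-and-accumulate pattern.
class MyConnectionContext : public facade::ConnectionContext {
 public:
  using facade::ConnectionContext::ConnectionContext;

  size_t UsedMemory() const override {
    // The base estimate covers the reply builder; add this subclass's own heap members.
    return facade::ConnectionContext::UsedMemory() + dfly::HeapSize(pending_names_);
  }

 private:
  std::vector<std::string> pending_names_;  // hypothetical member
};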

View file

@ -13,6 +13,7 @@
#include "absl/strings/str_cat.h"
#include "base/flags.h"
#include "base/logging.h"
#include "core/heap_size.h"
#include "facade/conn_context.h"
#include "facade/dragonfly_listener.h"
#include "facade/memcache_parser.h"
@ -1315,4 +1316,21 @@ void Connection::RequestAsyncMigration(util::fb2::ProactorBase* dest) {
migration_request_ = dest;
}
Connection::MemoryUsage Connection::GetMemoryUsage() const {
size_t mem = sizeof(*this) + dfly::HeapSize(dispatch_q_) + dfly::HeapSize(name_) +
dfly::HeapSize(tmp_parse_args_) + dfly::HeapSize(tmp_cmd_vec_) +
dfly::HeapSize(memcache_parser_) + dfly::HeapSize(redis_parser_) +
dfly::HeapSize(cc_);
// We add a hardcoded 9k value to account for the part of the Fiber stack that is in use.
// The allocated stack is actually larger (~130k), but only a small fraction of it (9k,
// according to our measurements) is actually part of the RSS.
mem += 9'000;
return {
.mem = mem,
.buf_mem = io_buf_.GetMemoryUsage(),
};
}
} // namespace facade
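A caller-side sketch (hypothetical helper, not part of the commit) of how the two halves of the new MemoryUsage struct are combined; the MEMORY STATS changes below aggregate them per connection class in essentially this way:

#include "facade/dragonfly_connection.h"

// Hypothetical helper: total footprint of one connection, direct memory plus I/O buffer.
size_t TotalConnectionBytes(const facade::Connection& conn) {
  facade::Connection::MemoryUsage usage = conn.GetMemoryUsage();
  // usage.mem covers the connection object, dispatch queue, parsers, context and the
  // hardcoded ~9k resident fiber-stack estimate; usage.buf_mem covers the io_buf_ capacity.
  return usage.mem + usage.buf_mem.GetTotalSize();
}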

View file

@ -208,9 +208,11 @@ class Connection : public util::Connection {
return name_;
}
base::IoBuf::MemoryUsage GetMemoryUsage() const {
return io_buf_.GetMemoryUsage();
}
struct MemoryUsage {
size_t mem = 0;
base::IoBuf::MemoryUsage buf_mem;
};
MemoryUsage GetMemoryUsage() const;
ConnectionContext* cntx();

View file

@ -6,6 +6,7 @@
#include <absl/strings/numbers.h>
#include "base/logging.h"
#include "core/heap_size.h"
namespace facade {
@ -133,10 +134,10 @@ void RedisParser::StashState(RespExpr::Vec* res) {
if (ebuf.empty() && last_stashed_index_ + 1 == cur.size())
break;
if (!ebuf.empty() && !e.has_support) {
BlobPtr ptr(new uint8_t[ebuf.size()]);
memcpy(ptr.get(), ebuf.data(), ebuf.size());
ebuf = Buffer{ptr.get(), ebuf.size()};
buf_stash_.push_back(std::move(ptr));
Blob blob(ebuf.size());
memcpy(blob.data(), ebuf.data(), ebuf.size());
ebuf = Buffer{blob.data(), blob.size()};
buf_stash_.push_back(std::move(blob));
e.has_support = true;
}
}
@ -421,9 +422,9 @@ auto RedisParser::ConsumeBulk(Buffer str) -> Result {
DVLOG(1) << "Extending bulk stash to size " << bulk_str.size();
} else {
DVLOG(1) << "New bulk stash size " << bulk_len_;
std::unique_ptr<uint8_t[]> nb(new uint8_t[bulk_len_]);
memcpy(nb.get(), str.data(), len);
bulk_str = Buffer{nb.get(), len};
vector<uint8_t> nb(bulk_len_);
memcpy(nb.data(), str.data(), len);
bulk_str = Buffer{nb.data(), len};
buf_stash_.emplace_back(move(nb));
is_broken_token_ = true;
cached_expr_->back().has_support = true;
@ -461,13 +462,17 @@ void RedisParser::ExtendLastString(Buffer str) {
Buffer& last_str = get<Buffer>(cached_expr_->back().u);
DCHECK(last_str.data() == buf_stash_.back().get());
DCHECK(last_str.data() == buf_stash_.back().data());
std::unique_ptr<uint8_t[]> nb(new uint8_t[last_str.size() + str.size()]);
memcpy(nb.get(), last_str.data(), last_str.size());
memcpy(nb.get() + last_str.size(), str.data(), str.size());
last_str = RespExpr::Buffer{nb.get(), last_str.size() + str.size()};
vector<uint8_t> nb(last_str.size() + str.size());
memcpy(nb.data(), last_str.data(), last_str.size());
memcpy(nb.data() + last_str.size(), str.data(), str.size());
last_str = RespExpr::Buffer{nb.data(), last_str.size() + str.size()};
buf_stash_.back() = std::move(nb);
}
size_t RedisParser::UsedMemory() const {
return dfly::HeapSize(parse_stack_) + dfly::HeapSize(stash_) + dfly::HeapSize(buf_stash_);
}
} // namespace facade
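The Blob change (std::unique_ptr<uint8_t[]> to std::vector<uint8_t>) is what makes UsedMemory() above work: the HeapSize(std::unique_ptr<T>) overload in heap_size.h assumes a single object (sizeof(T) + HeapSize(*t)) and an array blob carries no length, whereas a vector knows its own capacity. A minimal sketch (not part of the commit, names hypothetical):

#include <cstdint>
#include <vector>

#include "core/heap_size.h"

// Hypothetical sketch: a stash of owned byte buffers is now directly measurable.
size_t StashExample() {
  std::vector<std::vector<uint8_t>> buf_stash;      // mirrors RedisParser::buf_stash_
  buf_stash.push_back(std::vector<uint8_t>(4096));  // one stashed 4 KiB blob
  // The outer vector contributes capacity() * sizeof(std::vector<uint8_t>); each inner
  // vector adds its own capacity() bytes (uint8_t is trivial, so nothing deeper to count).
  return dfly::HeapSize(buf_stash);
}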

View file

@ -61,6 +61,8 @@ class RedisParser {
return stash_;
}
size_t UsedMemory() const;
private:
void InitStart(uint8_t prefix_b, RespVec* res);
void StashState(RespVec* res);
@ -98,8 +100,8 @@ class RedisParser {
absl::InlinedVector<std::pair<uint32_t, RespVec*>, 4> parse_stack_;
std::vector<std::unique_ptr<RespVec>> stash_;
using BlobPtr = std::unique_ptr<uint8_t[]>;
std::vector<BlobPtr> buf_stash_;
using Blob = std::vector<uint8_t>;
std::vector<Blob> buf_stash_;
RespVec* cached_expr_ = nullptr;
uint32_t max_arr_len_;

View file

@ -10,6 +10,7 @@
#include "absl/strings/escaping.h"
#include "base/logging.h"
#include "core/heap_size.h"
#include "facade/error.h"
using namespace std;
@ -173,6 +174,10 @@ void SinkReplyBuilder::FlushBatch() {
}
}
size_t SinkReplyBuilder::UsedMemory() const {
return dfly::HeapSize(batch_) + dfly::HeapSize(err_count_);
}
MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink), noreply_(false) {
}

View file

@ -153,6 +153,8 @@ class SinkReplyBuilder {
void ExpectReply();
bool HasReplied() const;
virtual size_t UsedMemory() const;
protected:
void SendRaw(std::string_view str); // Sends raw without any formatting.
void SendRawVec(absl::Span<const std::string_view> msg_vec);

View file

@ -53,6 +53,10 @@ class RespExpr {
: std::nullopt;
}
size_t UsedMemory() const {
return 0;
}
static const char* TypeName(Type t);
static void VecToArgList(const Vec& src, CmdArgVec* dest);

View file

@ -5,6 +5,7 @@
#include "server/conn_context.h"
#include "base/logging.h"
#include "core/heap_size.h"
#include "server/acl/acl_commands_def.h"
#include "server/command_registry.h"
#include "server/engine_shard_set.h"
@ -232,6 +233,11 @@ void ConnectionContext::SendSubscriptionChangedResponse(string_view action,
(*this)->SendLong(count);
}
size_t ConnectionContext::UsedMemory() const {
return facade::ConnectionContext::UsedMemory() + dfly::HeapSize(authed_username) +
dfly::HeapSize(acl_commands);
}
void ConnectionState::ExecInfo::Clear() {
state = EXEC_INACTIVE;
body.clear();

View file

@ -190,6 +190,8 @@ class ConnectionContext : public facade::ConnectionContext {
void ChangeMonitor(bool start); // either start or stop monitor on a given connection
void CancelBlocking(); // Cancel an ongoing blocking transaction if there is one.
size_t UsedMemory() const override;
// Whether this connection is a connection from a replica to its master.
// This flag is true only on replica side, where we need to setup a special ConnectionContext
// instance that helps applying commands coming from master.

View file

@ -11,6 +11,7 @@
#include "facade/dragonfly_connection.h"
#include "facade/error.h"
#include "server/engine_shard_set.h"
#include "server/main_service.h"
#include "server/server_family.h"
#include "server/server_state.h"
#include "server/snapshot.h"
@ -157,10 +158,12 @@ namespace {
struct ConnectionMemoryUsage {
size_t connection_count = 0;
size_t connection_size = 0;
size_t pipelined_bytes = 0;
base::IoBuf::MemoryUsage connections_memory;
size_t replication_connection_count = 0;
size_t replication_connection_size = 0;
base::IoBuf::MemoryUsage replication_memory;
};
@ -169,15 +172,22 @@ ConnectionMemoryUsage GetConnectionMemoryUsage(ServerFamily* server) {
for (auto* listener : server->GetListeners()) {
listener->TraverseConnections([&](unsigned thread_index, util::Connection* conn) {
if (conn == nullptr) {
return;
}
auto* dfly_conn = static_cast<facade::Connection*>(conn);
auto* cntx = static_cast<ConnectionContext*>(dfly_conn->cntx());
if (cntx->replication_flow == nullptr) {
auto usage = dfly_conn->GetMemoryUsage();
if (cntx == nullptr || cntx->replication_flow == nullptr) {
mems[thread_index].connection_count++;
mems[thread_index].connections_memory += dfly_conn->GetMemoryUsage();
mems[thread_index].connection_size += usage.mem;
mems[thread_index].connections_memory += usage.buf_mem;
} else {
mems[thread_index].replication_connection_count++;
mems[thread_index].replication_memory += dfly_conn->GetMemoryUsage();
mems[thread_index].replication_connection_size += usage.mem;
mems[thread_index].replication_memory += usage.buf_mem;
}
if (cntx != nullptr) {
@ -190,12 +200,21 @@ ConnectionMemoryUsage GetConnectionMemoryUsage(ServerFamily* server) {
});
}
shard_set->pool()->Await([&](unsigned index, auto*) {
mems[index].pipelined_bytes +=
server->service().GetThreadLocalConnectionStats()->pipeline_cmd_cache_bytes;
mems[index].pipelined_bytes +=
server->service().GetThreadLocalConnectionStats()->dispatch_queue_bytes;
});
ConnectionMemoryUsage mem;
for (const auto& m : mems) {
mem.connection_count += m.connection_count;
mem.pipelined_bytes += m.pipelined_bytes;
mem.connection_size += m.connection_size;
mem.connections_memory += m.connections_memory;
mem.replication_connection_count += m.replication_connection_count;
mem.replication_connection_size += m.replication_connection_size;
mem.replication_memory += m.replication_memory;
}
return mem;
@ -228,17 +247,21 @@ void MemoryCmd::Stats() {
// Connection stats, excluding replication connections
stats.push_back({"connections.count", connection_memory.connection_count});
PushMemoryUsageStats(
connection_memory.connections_memory, "connections",
connection_memory.connections_memory.GetTotalSize() + connection_memory.pipelined_bytes,
&stats);
stats.push_back({"connections.direct_bytes", connection_memory.connection_size});
PushMemoryUsageStats(connection_memory.connections_memory, "connections",
connection_memory.connections_memory.GetTotalSize() +
connection_memory.pipelined_bytes + connection_memory.connection_size,
&stats);
stats.push_back({"connections.pipeline_bytes", connection_memory.pipelined_bytes});
// Replication connection stats
stats.push_back(
{"replication.connections_count", connection_memory.replication_connection_count});
stats.push_back({"replication.direct_bytes", connection_memory.replication_connection_size});
PushMemoryUsageStats(connection_memory.replication_memory, "replication",
connection_memory.replication_memory.GetTotalSize(), &stats);
connection_memory.replication_memory.GetTotalSize() +
connection_memory.replication_connection_size,
&stats);
atomic<size_t> serialization_memory = 0;
shard_set->pool()->AwaitFiberOnAll(

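For orientation, the connection-related fields MEMORY STATS reports after this change (field names taken from the code above; the descriptions on the right are mine, and actual byte values will vary):

connections.count               number of non-replication client connections
connections.direct_bytes        sum of Connection::GetMemoryUsage().mem for those connections
connections.pipeline_bytes      pipeline_cmd_cache_bytes + dispatch_queue_bytes across threads
replication.connections_count   number of replication connections
replication.direct_bytes        sum of Connection::GetMemoryUsage().mem for replication connections

In addition, PushMemoryUsageStats() still emits the per-io-buf breakdown for both the "connections" and "replication" groups.
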
View file

@ -13,6 +13,7 @@ extern "C" {
#include <absl/strings/str_cat.h>
#include "base/logging.h"
#include "core/heap_size.h"
#include "server/db_slice.h"
#include "server/engine_shard_set.h"
#include "server/journal/journal.h"
@ -29,6 +30,10 @@ namespace {
thread_local absl::flat_hash_set<SliceSnapshot*> tl_slice_snapshots;
} // namespace
size_t SliceSnapshot::DbRecord::size() const {
return HeapSize(value);
}
SliceSnapshot::SliceSnapshot(DbSlice* slice, RecordChannel* dest, CompressionMode compression_mode)
: db_slice_(slice), dest_(dest), compression_mode_(compression_mode) {
db_array_ = slice->databases();

View file

@ -52,10 +52,7 @@ class SliceSnapshot {
uint64_t id;
std::string value;
size_t size() const {
constexpr size_t kSmallStringOptSize = 15;
return value.capacity() > kSmallStringOptSize ? value.capacity() : 0UL;
}
size_t size() const;
};
using RecordChannel = SizeTrackingChannel<DbRecord, base::mpmc_bounded_queue<DbRecord>>;