fix(server): Add additional metrics (#1975)

* fix(server): Clean up metrics collection
* feat(server): Replication memory metrics
* fix(server): Limit dispatch queue size

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Vladislav, 2023-10-06 14:16:22 +03:00, committed by GitHub
parent 2d28b48481
commit e84d9a65d8
22 changed files with 307 additions and 172 deletions


@@ -43,6 +43,9 @@ ABSL_FLAG(string, admin_bind, "",
ABSL_FLAG(uint64_t, request_cache_limit, 1ULL << 26, // 64MB
"Amount of memory to use for request cache in bytes - per IO thread.");
ABSL_FLAG(uint64_t, pipeline_queue_limit, 1ULL << 27, // 128MB
"Amount of memory to use for storing pipelined commands in bytes - per IO thread");
ABSL_FLAG(bool, no_tls_on_admin_port, false, "Allow non-tls connections on admin port");
ABSL_FLAG(uint64_t, pipeline_squash, 0,
@@ -91,13 +94,17 @@ bool MatchHttp11Line(string_view line) {
}
constexpr size_t kMinReadSize = 256;
constexpr size_t kMaxDispatchQMemory = 5_MB;
thread_local uint32_t free_req_release_weight = 0;
} // namespace
thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_pool_;
thread_local Connection::QueueBackpressure Connection::queue_backpressure_;
void Connection::QueueBackpressure::EnsureBelowLimit() {
ec.await([this] { return bytes.load(memory_order_relaxed) <= limit; });
}
struct Connection::Shutdown {
absl::flat_hash_map<ShutdownHandle, ShutdownCb> map;
@@ -226,7 +233,6 @@ void Connection::DispatchOperations::operator()(const AclUpdateMessage& msg) {
void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) {
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
++stats->async_writes_cnt;
unsigned i = 0;
array<string_view, 4> arr;
if (pub_msg.pattern.empty()) {
@@ -272,6 +278,9 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
creation_time_ = time(nullptr);
last_interaction_ = creation_time_;
id_ = next_id.fetch_add(1, memory_order_relaxed);
if (queue_backpressure_.limit == 0)
queue_backpressure_.limit = absl::GetFlag(FLAGS_pipeline_queue_limit);
}
Connection::~Connection() {
@@ -847,8 +856,8 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
SinkReplyBuilder* builder = cc_->reply_builder();
DispatchOperations dispatch_op{builder, this};
uint64_t request_cache_limit = absl::GetFlag(FLAGS_request_cache_limit);
size_t squashing_threshold = absl::GetFlag(FLAGS_pipeline_squash);
uint64_t prev_epoch = fb2::FiberSwitchEpoch();
@@ -874,13 +883,17 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
builder->SetBatchMode(dispatch_q_.size() > 1);
auto recycle = [this, request_cache_limit](MessageHandle msg) {
dispatch_q_bytes_.fetch_sub(msg.UsedMemory(), memory_order_relaxed);
size_t used_mem = msg.UsedMemory();
queue_backpressure_.bytes.fetch_sub(used_mem, memory_order_relaxed);
stats_->dispatch_queue_bytes -= used_mem;
stats_->dispatch_queue_entries--;
// Retain pipeline message in pool.
if (auto* pipe = get_if<PipelineMessagePtr>(&msg.handle); pipe) {
dispatch_q_cmds_count_--;
if (stats_->pipeline_cache_capacity < request_cache_limit) {
stats_->pipeline_cache_capacity += (*pipe)->StorageCapacity();
if (stats_->pipeline_cmd_cache_bytes < request_cache_limit) {
stats_->pipeline_cmd_cache_bytes += (*pipe)->StorageCapacity();
pipeline_req_pool_.push_back(move(*pipe));
}
}
@@ -930,7 +943,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
recycle(move(msg));
}
evc_bp_.notify();
queue_backpressure_.ec.notify();
}
cc_->conn_closing = true;
@@ -977,7 +990,7 @@ void Connection::ShrinkPipelinePool() {
if (free_req_release_weight > stats_->num_conns) {
free_req_release_weight = 0;
stats_->pipeline_cache_capacity -= pipeline_req_pool_.back()->StorageCapacity();
stats_->pipeline_cmd_cache_bytes -= pipeline_req_pool_.back()->StorageCapacity();
pipeline_req_pool_.pop_back();
}
}
@@ -988,7 +1001,7 @@ Connection::PipelineMessagePtr Connection::GetFromPipelinePool() {
free_req_release_weight = 0; // Reset the release weight.
auto ptr = move(pipeline_req_pool_.back());
stats_->pipeline_cache_capacity -= ptr->StorageCapacity();
stats_->pipeline_cmd_cache_bytes -= ptr->StorageCapacity();
pipeline_req_pool_.pop_back();
return ptr;
}
@@ -1053,12 +1066,16 @@ void Connection::SendAsync(MessageHandle msg) {
dispatch_q_.insert(it, std::move(msg));
};
dispatch_q_bytes_.fetch_add(msg.UsedMemory(), memory_order_relaxed);
size_t used_mem = msg.UsedMemory();
queue_backpressure_.bytes.fetch_add(used_mem, memory_order_relaxed);
stats_->dispatch_queue_entries++;
stats_->dispatch_queue_bytes += used_mem;
if (std::holds_alternative<AclUpdateMessage>(msg.handle)) {
// We need to reorder the queue, since multiple updates might happen before we
// pop the message, invalidating the correct order since we always push at the front
place_in_dispatch_q(std::move(msg));
} else {
dispatch_q_.push_back(std::move(msg));
}
@@ -1073,8 +1090,7 @@ void Connection::SendAsync(MessageHandle msg) {
}
void Connection::EnsureAsyncMemoryBudget() {
evc_bp_.await(
[this] { return dispatch_q_bytes_.load(memory_order_relaxed) <= kMaxDispatchQMemory; });
queue_backpressure_.EnsureBelowLimit();
}
std::string Connection::RemoteEndpointStr() const {


@@ -133,7 +133,7 @@ class Connection : public util::Connection {
// Add acl update to dispatch queue.
void SendAclUpdateAsync(AclUpdateMessage msg);
// Must be called before Send_Async to ensure the connection dispatch queue is not overfilled.
// Must be called before SendAsync to ensure the connection dispatch queue is not overfilled.
// Blocks until free space is available.
void EnsureAsyncMemoryBudget();
@@ -236,9 +236,7 @@ class Connection : public util::Connection {
dfly::EventCount evc_; // dispatch queue waker
util::fb2::Fiber dispatch_fb_; // dispatch fiber (if started)
std::atomic_uint64_t dispatch_q_bytes_ = 0; // memory usage of all entries
dfly::EventCount evc_bp_; // backpressure for memory limit
size_t dispatch_q_cmds_count_; // how many queued async commands
base::IoBuf io_buf_; // used in io loop and parsers
std::unique_ptr<RedisParser> redis_parser_;
@@ -275,6 +273,18 @@ class Connection : public util::Connection {
// Aggregated while handling pipelines,
// gradually released while handling regular commands.
static thread_local std::vector<PipelineMessagePtr> pipeline_req_pool_;
// Keeps track of total per-thread sizes of dispatch queues to
// limit memory taken up by pipelined / pubsub commands and slow down clients
// producing them too quickly via EnsureAsyncMemoryBudget.
struct QueueBackpressure {
void EnsureBelowLimit(); // block until memory usage is below the limit
dfly::EventCount ec;
std::atomic_size_t bytes = 0;
size_t limit = 0;
};
static thread_local QueueBackpressure queue_backpressure_;
};
} // namespace facade

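The new QueueBackpressure struct gives each IO thread a shared budget for dispatch-queue memory: SendAsync adds each message's size to the atomic counter, EnsureAsyncMemoryBudget blocks while the counter exceeds the pipeline_queue_limit flag, and the dispatch fiber subtracts sizes on recycle and notifies waiters. Below is a minimal standalone sketch of the same scheme, with std::condition_variable standing in for dfly::EventCount; names and the 128MB default are illustrative, not the exact Dragonfly API.

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Per-thread backpressure sketch: producers block while queued bytes exceed
// the limit; consumers release bytes and wake blocked producers.
struct BackpressureSketch {
  std::atomic<size_t> bytes{0};
  size_t limit = 1ULL << 27;  // mirrors the --pipeline_queue_limit default (128MB)
  std::mutex mu;
  std::condition_variable cv;

  // Producer side (cf. EnsureAsyncMemoryBudget + the SendAsync accounting).
  void AcquireBudget(size_t msg_bytes) {
    std::unique_lock lk(mu);
    cv.wait(lk, [this] { return bytes.load(std::memory_order_relaxed) <= limit; });
    bytes.fetch_add(msg_bytes, std::memory_order_relaxed);
  }

  // Consumer side (cf. the recycle lambda in DispatchFiber).
  void ReleaseBudget(size_t msg_bytes) {
    bytes.fetch_sub(msg_bytes, std::memory_order_relaxed);
    cv.notify_all();
  }
};

Note the budget is checked before accounting, so a producer may overshoot the limit by one message; the commit's EnsureBelowLimit has the same semantics, since its wait condition is bytes <= limit.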

@@ -27,17 +27,18 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats);
ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
// To break this code deliberately if we add/remove a field in this struct.
static_assert(kSizeConnStats == 136u);
static_assert(kSizeConnStats == 144u);
ADD(read_buf_capacity);
ADD(pipeline_cache_capacity);
ADD(dispatch_queue_entries);
ADD(dispatch_queue_bytes);
ADD(pipeline_cmd_cache_bytes);
ADD(io_read_cnt);
ADD(io_read_bytes);
ADD(io_write_cnt);
ADD(io_write_bytes);
ADD(command_cnt);
ADD(pipelined_cmd_cnt);
ADD(async_writes_cnt);
ADD(conn_received_cnt);
ADD(num_conns);
ADD(num_replicas);
@@ -143,8 +144,13 @@ RedisReplyBuilder* ConnectionContext::operator->() {
CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first_key,
int8_t last_key, int8_t step, uint32_t acl_categories)
: name_(name), opt_mask_(mask), arity_(arity), first_key_(first_key), last_key_(last_key),
step_key_(step), acl_categories_(acl_categories) {
: name_(name),
opt_mask_(mask),
arity_(arity),
first_key_(first_key),
last_key_(last_key),
step_key_(step),
acl_categories_(acl_categories) {
}
uint32_t CommandId::OptCount(uint32_t mask) {


@@ -39,18 +39,20 @@ struct CmdArgListFormatter {
struct ConnectionStats {
absl::flat_hash_map<std::string, uint64_t> err_count_map;
size_t read_buf_capacity = 0;
size_t pipeline_cache_capacity = 0;
size_t read_buf_capacity = 0; // total capacity of input buffers
size_t dispatch_queue_entries = 0; // total number of dispatch queue entries
size_t dispatch_queue_bytes = 0; // total size of all dispatch queue entries
size_t pipeline_cmd_cache_bytes = 0;
size_t io_read_cnt = 0;
size_t io_read_bytes = 0;
size_t io_write_cnt = 0;
size_t io_write_bytes = 0;
uint64_t command_cnt = 0;
uint64_t pipelined_cmd_cnt = 0;
// Writes count that happened via DispatchOperations call.
uint64_t async_writes_cnt = 0;
uint64_t conn_received_cnt = 0;
uint32_t num_conns = 0;


@@ -680,7 +680,7 @@ shared_ptr<DflyCmd::ReplicaInfo> DflyCmd::GetReplicaInfo(uint32_t sync_id) {
return {};
}
std::vector<ReplicaRoleInfo> DflyCmd::GetReplicasRoleInfo() {
std::vector<ReplicaRoleInfo> DflyCmd::GetReplicasRoleInfo() const {
std::vector<ReplicaRoleInfo> vec;
unique_lock lk(mu_);
@@ -694,6 +694,29 @@ std::vector<ReplicaRoleInfo> DflyCmd::GetReplicasRoleInfo() {
return vec;
}
void DflyCmd::GetReplicationMemoryStats(ReplicationMemoryStats* stats) const {
Mutex stats_mu;
lock_guard lk_main{mu_}; // prevent state changes
auto cb = [this, &stats, &stats_mu](EngineShard* shard) {
lock_guard lk{stats_mu};
for (const auto& [_, info] : replica_infos_) {
lock_guard repl_lk{info->mu};
if (info->flows.empty())
continue;
const auto& flow = info->flows[shard->shard_id()];
if (flow.streamer)
stats->streamer_buf_capacity_bytes_ += flow.streamer->GetTotalBufferCapacities();
if (flow.saver)
stats->full_sync_buf_bytes_ += flow.saver->GetTotalBuffersSize();
}
};
shard_set->RunBlockingInParallel(cb);
}
pair<uint32_t, shared_ptr<DflyCmd::ReplicaInfo>> DflyCmd::GetReplicaInfoOrReply(
std::string_view id_str, RedisReplyBuilder* rb) {
unique_lock lk(mu_);
@@ -714,6 +737,7 @@ pair<uint32_t, shared_ptr<DflyCmd::ReplicaInfo>> DflyCmd::GetReplicaInfoOrReply(
}
std::map<uint32_t, LSN> DflyCmd::ReplicationLags() const {
DCHECK(!mu_.try_lock()); // expects to be under global lock
if (replica_infos_.empty())
return {};

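GetReplicationMemoryStats runs a callback on every shard via shard_set->RunBlockingInParallel and merges each flow's streamer and saver buffer sizes into the output struct under a local mutex, with mu_ held to freeze replica state. A simplified sketch of the fan-out/merge pattern, using plain std::thread in place of the shard set (types illustrative):

#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

struct MemStats {
  size_t streamer_bytes = 0;
  size_t full_sync_bytes = 0;
};

// Run one worker per shard and merge each shard's contribution into a shared
// result under a mutex, as RunBlockingInParallel does with engine shards.
MemStats CollectReplicationStats(const std::vector<MemStats>& per_shard) {
  MemStats out;
  std::mutex stats_mu;
  std::vector<std::thread> workers;
  workers.reserve(per_shard.size());
  for (const MemStats& shard : per_shard) {
    workers.emplace_back([&out, &stats_mu, &shard] {
      std::lock_guard lk(stats_mu);  // serialize merges into the shared result
      out.streamer_bytes += shard.streamer_bytes;
      out.full_sync_bytes += shard.full_sync_bytes;
    });
  }
  for (std::thread& w : workers) w.join();
  return out;
}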

@@ -26,11 +26,13 @@ class ServerFamily;
class RdbSaver;
class JournalStreamer;
struct ReplicaRoleInfo;
struct ReplicationMemoryStats;
// Stores information related to a single flow.
struct FlowInfo {
FlowInfo();
~FlowInfo();
// Shutdown associated socket if it's still open.
void TryShutdownSocket();
@@ -40,6 +42,7 @@ struct FlowInfo {
std::unique_ptr<RdbSaver> saver; // Saver used by the full sync phase.
std::unique_ptr<JournalStreamer> streamer;
std::string eof_token;
DflyVersion version;
std::optional<LSN> start_partial_sync_at;
@@ -134,7 +137,9 @@ class DflyCmd {
// Create new sync session.
std::pair<uint32_t, std::shared_ptr<ReplicaInfo>> CreateSyncSession(ConnectionContext* cntx);
std::vector<ReplicaRoleInfo> GetReplicasRoleInfo();
std::vector<ReplicaRoleInfo> GetReplicasRoleInfo() const;
void GetReplicationMemoryStats(ReplicationMemoryStats* out) const;
// Sets metadata.
void SetDflyClientVersion(ConnectionContext* cntx, DflyVersion version);
@@ -218,7 +223,7 @@ class DflyCmd {
using ReplicaInfoMap = absl::btree_map<uint32_t, std::shared_ptr<ReplicaInfo>>;
ReplicaInfoMap replica_infos_;
Mutex mu_; // Guard global operations. See header top for locking levels.
mutable Mutex mu_; // Guard global operations. See header top for locking levels.
};
} // namespace dfly


@@ -75,4 +75,9 @@ bool BufferedStreamerBase::IsStopped() {
bool BufferedStreamerBase::IsStalled() {
return buffered_ > max_buffered_cnt_ || producer_buf_.InputLen() > max_buffered_mem_;
}
size_t BufferedStreamerBase::GetTotalBufferCapacities() const {
return consumer_buf_.Capacity() + producer_buf_.Capacity();
}
} // namespace dfly


@@ -25,6 +25,10 @@ class BufferedStreamerBase : public io::Sink {
: cll_{cll}, max_buffered_cnt_{max_buffered_cnt}, max_buffered_mem_{max_buffered_mem} {
}
public:
size_t GetTotalBufferCapacities() const;
protected:
// Write some data into the internal buffer.
//
// Consumer needs to be woken up manually with NotifyWritten to avoid waking it up for small


@@ -29,6 +29,8 @@ class JournalStreamer : protected BufferedStreamerBase {
// and manual cleanup.
void Cancel();
using BufferedStreamerBase::GetTotalBufferCapacities;
private:
// Writer fiber that steals buffer contents and writes them to dest.
void WriterFb(io::Sink* dest);


@@ -2043,7 +2043,7 @@ VarzValue::Map Service::GetVarzStats() {
Metrics m = server_family_.GetMetrics();
DbStats db_stats;
for (const auto& s : m.db) {
for (const auto& s : m.db_stats) {
db_stats += s;
}


@@ -511,9 +511,9 @@ TEST_F(MultiTest, MultiOOO) {
// OOO works in LOCK_AHEAD mode.
int mode = absl::GetFlag(FLAGS_multi_exec_mode);
if (mode == Transaction::LOCK_AHEAD || mode == Transaction::NON_ATOMIC)
EXPECT_EQ(200, metrics.ooo_tx_transaction_cnt);
EXPECT_EQ(200, metrics.coordinator_stats.ooo_tx_cnt);
else
EXPECT_EQ(0, metrics.ooo_tx_transaction_cnt);
EXPECT_EQ(0, metrics.coordinator_stats.ooo_tx_cnt);
}
// Lua scripts lock their keys ahead and thus can run out of order.
@@ -548,8 +548,9 @@ TEST_F(MultiTest, EvalOOO) {
}
auto metrics = GetMetrics();
EXPECT_EQ(1 + 2 * kTimes,
metrics.eval_io_coordination_cnt + metrics.eval_shardlocal_coordination_cnt);
auto sum = metrics.coordinator_stats.eval_io_coordination_cnt +
metrics.coordinator_stats.eval_shardlocal_coordination_cnt;
EXPECT_EQ(1 + 2 * kTimes, sum);
}
// Run MULTI/EXEC commands in parallel, where each command is:
@@ -619,7 +620,7 @@ TEST_F(MultiTest, ExecGlobalFallback) {
Run({"set", "a", "1"}); // won't run ooo, because it became part of global
Run({"move", "a", "1"});
Run({"exec"});
EXPECT_EQ(0, GetMetrics().ooo_tx_transaction_cnt);
EXPECT_EQ(0, GetMetrics().coordinator_stats.ooo_tx_cnt);
// Check non atomic mode does not fall back to global.
absl::SetFlag(&FLAGS_multi_exec_mode, Transaction::NON_ATOMIC);


@@ -692,6 +692,10 @@ error_code RdbSerializer::SendFullSyncCut() {
return WriteRaw(buf);
}
size_t RdbSerializer::GetTotalBufferCapacity() const {
return mem_buf_.Capacity();
}
error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {
mem_buf_.Reserve(mem_buf_.InputLen() + buf.size());
IoBuf::Bytes dest = mem_buf_.AppendBuffer();
@@ -902,17 +906,18 @@ class RdbSaver::Impl {
error_code ConsumeChannel(const Cancellation* cll);
error_code Flush() {
if (aligned_buf_)
return aligned_buf_->Flush();
return error_code{};
}
void FillFreqMap(RdbTypeFreqMap* dest) const;
error_code SaveAuxFieldStrStr(string_view key, string_view val);
void Cancel();
size_t GetTotalBuffersSize() const;
error_code Flush() {
return aligned_buf_ ? aligned_buf_->Flush() : error_code{};
}
size_t Size() const {
return shard_snapshots_.size();
}
@@ -920,12 +925,11 @@ class RdbSaver::Impl {
RdbSerializer* serializer() {
return &meta_serializer_;
}
io::Sink* sink() {
return sink_;
}
void Cancel();
private:
unique_ptr<SliceSnapshot>& GetSnapshot(EngineShard* shard);
@@ -936,10 +940,15 @@ class RdbSaver::Impl {
SliceSnapshot::RecordChannel channel_;
bool push_to_sink_with_order_ = false;
std::optional<AlignedBuffer> aligned_buf_;
CompressionMode
compression_mode_; // Single entry compression is compatible with redis rdb snapshot
// Multi entry compression is available only on df snapshot, this will
// make snapshot size smaller and operation faster.
// Single entry compression is compatible with redis rdb snapshot
// Multi entry compression is available only on df snapshot, this will
// make snapshot size smaller and operation faster.
CompressionMode compression_mode_;
struct Stats {
size_t pulled_bytes = 0;
} stats_;
};
// We pass K=sz to say how many producers are pushing data in order to maintain
@@ -1014,15 +1023,12 @@ std::optional<SliceSnapshot::DbRecord> RdbSaver::Impl::RecordsPopper::InternalPo
error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
error_code io_error;
size_t channel_bytes = 0;
std::optional<SliceSnapshot::DbRecord> record;
RecordsPopper records_popper(push_to_sink_with_order_, &channel_);
// we can not exit on io-error since we spawn fibers that push data.
// TODO: we may signal them to stop processing and exit asap in case of the error.
while ((record = records_popper.Pop())) {
if (io_error || cll->IsCancelled())
continue;
@@ -1032,7 +1038,7 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
continue;
DVLOG(2) << "Pulled " << record->id;
channel_bytes += record->value.size();
stats_.pulled_bytes += record->value.size();
io_error = sink_->Write(io::Buffer(record->value));
if (io_error) {
@@ -1044,12 +1050,12 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
size_t pushed_bytes = 0;
for (auto& ptr : shard_snapshots_) {
ptr->Join();
pushed_bytes += ptr->channel_bytes();
pushed_bytes += ptr->pushed_bytes();
}
DCHECK(!record.has_value() || !channel_.TryPop(*record));
VLOG(1) << "Channel pulled bytes: " << channel_bytes << " pushed bytes: " << pushed_bytes;
VLOG(1) << "Channel pulled bytes: " << stats_.pulled_bytes << " pushed bytes: " << pushed_bytes;
return io_error;
}
@@ -1090,6 +1096,17 @@ void RdbSaver::Impl::Cancel() {
snapshot->Join();
}
size_t RdbSaver::Impl::GetTotalBuffersSize() const {
DCHECK_EQ(shard_snapshots_.size(), 1u) << "Only supported for dragonfly replication";
auto& snapshot = shard_snapshots_.front();
// Calculate number of enqueued bytes as difference between pushed and pulled
size_t enqueued_bytes = snapshot->pushed_bytes() - stats_.pulled_bytes;
size_t serializer_bytes = snapshot->GetTotalBufferCapacity();
return enqueued_bytes + serializer_bytes;
}
void RdbSaver::Impl::FillFreqMap(RdbTypeFreqMap* dest) const {
for (auto& ptr : shard_snapshots_) {
const RdbTypeFreqMap& src_map = ptr->freq_map();
@@ -1266,6 +1283,10 @@ void RdbSaver::Cancel() {
impl_->Cancel();
}
size_t RdbSaver::GetTotalBuffersSize() const {
return impl_->GetTotalBuffersSize();
}
void RdbSerializer::AllocateCompressorOnce() {
if (compressor_impl_) {
return;

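RdbSaver::Impl::GetTotalBuffersSize estimates a full-sync flow's memory as the bytes still enqueued in the channel, i.e. the difference between what the snapshot fiber pushed and what ConsumeChannel pulled so far, plus the serializer's buffer capacity; the DCHECK restricts this to the single-shard case used by dragonfly replication. The arithmetic in isolation (hypothetical helper):

#include <cstddef>

// Hypothetical helper mirroring RdbSaver::Impl::GetTotalBuffersSize:
// e.g. pushed=10MB, pulled=7MB, capacity=1MB -> 3MB enqueued + 1MB buffered.
size_t EstimateReplicationBuffers(size_t pushed_bytes, size_t pulled_bytes,
                                  size_t serializer_capacity) {
  return (pushed_bytes - pulled_bytes) + serializer_capacity;
}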

@@ -108,6 +108,10 @@ class RdbSaver {
return save_mode_;
}
// Can only be called for dragonfly replication.
// Get total size of all rdb serializer buffers and items currently placed in channel
size_t GetTotalBuffersSize() const;
private:
class Impl;
@@ -168,6 +172,8 @@ class RdbSerializer {
// Send FULL_SYNC_CUT opcode to notify that all static data was sent.
std::error_code SendFullSyncCut();
size_t GetTotalBufferCapacity() const;
private:
// Prepare internal buffer for flush. Compress it.
io::Bytes PrepareFlush();


@@ -285,9 +285,9 @@ TEST_F(RdbTest, SaveManyDbs) {
});
auto metrics = GetMetrics();
ASSERT_EQ(2, metrics.db.size());
EXPECT_EQ(50000, metrics.db[0].key_count);
EXPECT_EQ(10000, metrics.db[1].key_count);
ASSERT_EQ(2, metrics.db_stats.size());
EXPECT_EQ(50000, metrics.db_stats[0].key_count);
EXPECT_EQ(10000, metrics.db_stats[1].key_count);
auto save_fb = pp_->at(0)->LaunchFiber([&] {
RespExpr resp = Run({"save"});
@@ -317,10 +317,10 @@ TEST_F(RdbTest, SaveManyDbs) {
EXPECT_EQ(resp, "OK");
metrics = GetMetrics();
ASSERT_EQ(2, metrics.db.size());
EXPECT_EQ(50000, metrics.db[0].key_count);
EXPECT_EQ(10000, metrics.db[1].key_count);
if (metrics.db[1].key_count != 10000) {
ASSERT_EQ(2, metrics.db_stats.size());
EXPECT_EQ(50000, metrics.db_stats[0].key_count);
EXPECT_EQ(10000, metrics.db_stats[1].key_count);
if (metrics.db_stats[1].key_count != 10000) {
Run({"select", "1"});
resp = Run({"scan", "0", "match", "ab*"});
StringVec vec = StrArray(resp.GetVec()[1]);


@@ -271,6 +271,10 @@ void RebuildAllSearchIndices(Service* service) {
});
}
template <typename T> void UpdateMax(T* maxv, T current) {
*maxv = std::max(*maxv, current);
}
} // namespace
std::optional<SnapshotSpec> ParseSaveSchedule(string_view time) {
@@ -753,9 +757,10 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) {
AppendMetricHeader("db_keys_expiring", "Total number of expiring keys by DB", MetricType::GAUGE,
&db_key_expire_metrics);
for (size_t i = 0; i < m.db.size(); ++i) {
AppendMetricValue("db_keys", m.db[i].key_count, {"db"}, {StrCat("db", i)}, &db_key_metrics);
AppendMetricValue("db_keys_expiring", m.db[i].expire_count, {"db"}, {StrCat("db", i)},
for (size_t i = 0; i < m.db_stats.size(); ++i) {
AppendMetricValue("db_keys", m.db_stats[i].key_count, {"db"}, {StrCat("db", i)},
&db_key_metrics);
AppendMetricValue("db_keys_expiring", m.db_stats[i].expire_count, {"db"}, {StrCat("db", i)},
&db_key_expire_metrics);
}
@@ -1151,7 +1156,7 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
auto& stats = sstate.connection_stats;
stats.err_count_map.clear();
stats.command_cnt = 0;
stats.async_writes_cnt = 0;
stats.pipelined_cmd_cnt = 0;
});
return (*cntx)->SendOk();
@@ -1208,12 +1213,12 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
}
}
static void MergeInto(const DbSlice::Stats& src, Metrics* dest) {
if (src.db_stats.size() > dest->db.size())
dest->db.resize(src.db_stats.size());
for (size_t i = 0; i < src.db_stats.size(); ++i) {
dest->db[i] += src.db_stats[i];
}
static void MergeDbSliceStats(const DbSlice::Stats& src, Metrics* dest) {
if (src.db_stats.size() > dest->db_stats.size())
dest->db_stats.resize(src.db_stats.size());
for (size_t i = 0; i < src.db_stats.size(); ++i)
dest->db_stats[i] += src.db_stats[i];
dest->events += src.events;
dest->small_string_bytes += src.small_string_bytes;
@@ -1221,57 +1226,60 @@ static void MergeInto(const DbSlice::Stats& src, Metrics* dest) {
Metrics ServerFamily::GetMetrics() const {
Metrics result;
Mutex mu;
auto cmd_stat_cb = [&dest = result.cmd_stats_map](string_view name, const CmdCallStats& stat) {
auto& [calls, sum] = dest[string{name}];
calls += stat.first;
sum += stat.second;
};
auto cb = [&](unsigned index, ProactorBase* pb) {
EngineShard* shard = EngineShard::tlocal();
ServerState* ss = ServerState::tlocal();
lock_guard lk(mu);
result.uptime = time(NULL) - this->start_time_;
result.coordinator_stats += ss->stats;
result.conn_stats += ss->connection_stats;
result.qps += uint64_t(ss->MovingSum6());
result.ooo_tx_transaction_cnt += ss->stats.ooo_tx_cnt;
result.eval_io_coordination_cnt += ss->stats.eval_io_coordination_cnt;
result.eval_shardlocal_coordination_cnt += ss->stats.eval_shardlocal_coordination_cnt;
result.eval_squashed_flushes += ss->stats.eval_squashed_flushes;
result.tx_schedule_cancel_cnt += ss->stats.tx_schedule_cancel_cnt;
service_.mutable_registry()->MergeCallStats(
index, [&dest_map = result.cmd_stats_map](string_view name, const CmdCallStats& src) {
auto& ent = dest_map[string{name}];
ent.first += src.first;
ent.second += src.second;
});
result.uptime = time(NULL) - this->start_time_;
result.qps += uint64_t(ss->MovingSum6());
if (shard) {
MergeInto(shard->db_slice().GetStats(), &result);
result.heap_used_bytes += shard->UsedMemory();
if (auto ts = shard->tiered_storage(); ts) {
result.tiered_stats += ts->GetStats();
}
if (auto si = shard->search_indices(); si) {
result.search_stats += si->GetStats();
}
MergeDbSliceStats(shard->db_slice().GetStats(), &result);
result.shard_stats += shard->stats();
if (shard->tiered_storage())
result.tiered_stats += shard->tiered_storage()->GetStats();
if (shard->search_indices())
result.search_stats += shard->search_indices()->GetStats();
result.traverse_ttl_per_sec += shard->GetMovingSum6(EngineShard::TTL_TRAVERSE);
result.delete_ttl_per_sec += shard->GetMovingSum6(EngineShard::TTL_DELETE);
}
service_.mutable_registry()->MergeCallStats(index, cmd_stat_cb);
};
service_.proactor_pool().AwaitFiberOnAll(std::move(cb));
result.qps /= 6; // normalize moving average stats
// Normalize moving average stats
result.qps /= 6;
result.traverse_ttl_per_sec /= 6;
result.delete_ttl_per_sec /= 6;
result.is_master = false;
if (ServerState::tlocal() && ServerState::tlocal()->is_master) {
result.is_master = true;
result.is_master = ServerState::tlocal() && ServerState::tlocal()->is_master;
if (result.is_master)
result.replication_metrics = dfly_cmd_->GetReplicasRoleInfo();
}
// Update peak stats
lock_guard lk{peak_stats_mu_};
UpdateMax(&peak_stats_.conn_dispatch_queue_bytes, result.conn_stats.dispatch_queue_bytes);
UpdateMax(&peak_stats_.conn_read_buf_capacity, result.conn_stats.read_buf_capacity);
result.peak_stats = peak_stats_;
return result;
}
@@ -1291,26 +1299,27 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
string info;
auto should_enter = [&](string_view name, bool hidden = false) {
bool res = (!hidden && section.empty()) || section == "ALL" || section == name;
if (res && !info.empty())
info.append("\r\n");
return res;
if ((!hidden && section.empty()) || section == "ALL" || section == name) {
auto normalized_name = string{name.substr(0, 1)} + absl::AsciiStrToLower(name.substr(1));
absl::StrAppend(&info, info.empty() ? "" : "\r\n", "# ", normalized_name, "\r\n");
return true;
}
return false;
};
auto append = [&info](absl::AlphaNum a1, absl::AlphaNum a2) {
absl::StrAppend(&info, a1, ":", a2, "\r\n");
};
#define ADD_HEADER(x) absl::StrAppend(&info, x "\r\n")
Metrics m = GetMetrics();
DbStats total;
for (const auto& db_stats : m.db_stats)
total += db_stats;
if (should_enter("SERVER")) {
auto kind = ProactorBase::me()->GetKind();
const char* multiplex_api = (kind == ProactorBase::IOURING) ? "iouring" : "epoll";
ADD_HEADER("# Server");
append("redis_version", kRedisVersion);
append("dragonfly_version", GetVersion());
append("redis_mode", "standalone");
@@ -1323,29 +1332,20 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("uptime_in_days", uptime / (3600 * 24));
}
io::Result<io::StatusData> sdata_res = io::ReadStatusInfo();
DbStats total;
for (const auto& db_stats : m.db) {
total += db_stats;
}
if (should_enter("CLIENTS")) {
ADD_HEADER("# Clients");
append("connected_clients", m.conn_stats.num_conns);
append("client_read_buffer_bytes", m.conn_stats.read_buf_capacity);
append("blocked_clients", m.conn_stats.num_blocked_clients);
append("dispatch_queue_entries", m.conn_stats.dispatch_queue_entries);
}
if (should_enter("MEMORY")) {
ADD_HEADER("# Memory");
io::Result<io::StatusData> sdata_res = io::ReadStatusInfo();
append("used_memory", m.heap_used_bytes);
append("used_memory_human", HumanReadableNumBytes(m.heap_used_bytes));
append("used_memory_peak", used_mem_peak.load(memory_order_relaxed));
append("comitted_memory", GetMallocCurrentCommitted());
if (sdata_res.has_value()) {
size_t rss = sdata_res->vm_rss + sdata_res->hugetlb_pages;
append("used_memory_rss", rss);
@@ -1355,6 +1355,11 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
<< sdata_res.error().message();
}
append("comitted_memory", GetMallocCurrentCommitted());
append("maxmemory", max_memory_limit);
append("maxmemory_human", HumanReadableNumBytes(max_memory_limit));
// Blob - all these cases where the key/objects are represented by a single blob allocated on
// heap. For example, strings or intsets. Members of lists, sets, zsets, etc.
// are not accounted for to avoid complex computations. In some cases, when number of members
@@ -1369,12 +1374,14 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("listpack_blobs", total.listpack_blob_cnt);
append("listpack_bytes", total.listpack_bytes);
append("small_string_bytes", m.small_string_bytes);
append("pipeline_cache_bytes", m.conn_stats.pipeline_cache_capacity);
append("maxmemory", max_memory_limit);
append("maxmemory_human", HumanReadableNumBytes(max_memory_limit));
append("pipeline_cache_bytes", m.conn_stats.pipeline_cmd_cache_bytes);
append("dispatch_queue_bytes", m.conn_stats.dispatch_queue_bytes);
append("dispatch_queue_peak_bytes", m.peak_stats.conn_dispatch_queue_bytes);
append("client_read_buffer_peak_bytes", m.peak_stats.conn_read_buf_capacity);
if (GetFlag(FLAGS_cache_mode)) {
append("cache_mode", "cache");
// PHP Symfony needs this field to work.
append("maxmemory_policy", "eviction");
} else {
@@ -1382,11 +1389,16 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
// Compatible with redis based frameworks.
append("maxmemory_policy", "noeviction");
}
if (m.is_master && !m.replication_metrics.empty()) {
ReplicationMemoryStats repl_mem;
dfly_cmd_->GetReplicationMemoryStats(&repl_mem);
append("replication_streaming_buffer_bytes", repl_mem.streamer_buf_capacity_bytes_);
append("replication_full_sync_buffer_bytes", repl_mem.full_sync_buf_bytes_);
}
}
if (should_enter("STATS")) {
ADD_HEADER("# Stats");
append("total_connections_received", m.conn_stats.conn_received_cnt);
append("total_commands_processed", m.conn_stats.command_cnt);
append("instantaneous_ops_per_sec", m.qps);
@@ -1410,18 +1422,17 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("keyspace_misses", m.events.misses);
append("total_reads_processed", m.conn_stats.io_read_cnt);
append("total_writes_processed", m.conn_stats.io_write_cnt);
append("async_writes_count", m.conn_stats.async_writes_cnt);
append("defrag_attempt_total", m.shard_stats.defrag_attempt_total);
append("defrag_realloc_total", m.shard_stats.defrag_realloc_total);
append("defrag_task_invocation_total", m.shard_stats.defrag_task_invocation_total);
append("eval_io_coordination_total", m.eval_io_coordination_cnt);
append("eval_shardlocal_coordination_total", m.eval_shardlocal_coordination_cnt);
append("eval_squashed_flushes", m.eval_squashed_flushes);
append("tx_schedule_cancel_total", m.tx_schedule_cancel_cnt);
append("eval_io_coordination_total", m.coordinator_stats.eval_io_coordination_cnt);
append("eval_shardlocal_coordination_total",
m.coordinator_stats.eval_shardlocal_coordination_cnt);
append("eval_squashed_flushes", m.coordinator_stats.eval_squashed_flushes);
append("tx_schedule_cancel_total", m.coordinator_stats.tx_schedule_cancel_cnt);
}
if (should_enter("TIERED", true)) {
ADD_HEADER("# TIERED");
append("tiered_entries", total.tiered_entries);
append("tiered_bytes", total.tiered_size);
append("tiered_reads", m.tiered_stats.tiered_reads);
@@ -1433,7 +1444,6 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
}
if (should_enter("PERSISTENCE", true)) {
ADD_HEADER("# PERSISTENCE");
decltype(last_save_info_) save_info;
{
lock_guard lk(save_mu_);
@@ -1453,8 +1463,6 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
}
if (should_enter("REPLICATION")) {
ADD_HEADER("# Replication");
ServerState& etl = *ServerState::tlocal();
if (etl.is_master) {
@@ -1487,20 +1495,14 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
}
if (should_enter("COMMANDSTATS", true)) {
ADD_HEADER("# Commandstats");
auto unknown_cmd = service_.UknownCmdMap();
auto append_sorted = [&append](string_view prefix, auto display) {
sort(display.begin(), display.end());
for (const auto& k_v : display) {
append(StrCat(prefix, k_v.first), k_v.second);
}
};
vector<pair<string_view, string>> commands;
for (const auto& [name, stats] : m.cmd_stats_map) {
const auto calls = stats.first, sum = stats.second;
commands.push_back(
@@ -1509,29 +1511,28 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
",")});
}
auto unknown_cmd = service_.UknownCmdMap();
append_sorted("cmdstat_", move(commands));
append_sorted("unknown_",
vector<pair<string_view, uint64_t>>(unknown_cmd.cbegin(), unknown_cmd.cend()));
}
if (should_enter("SEARCH", true)) {
ADD_HEADER("# Search");
append("search_memory", m.search_stats.used_memory);
append("search_num_indices", m.search_stats.num_indices);
append("search_num_entries", m.search_stats.num_entries);
}
if (should_enter("ERRORSTATS", true)) {
ADD_HEADER("# Errorstats");
for (const auto& k_v : m.conn_stats.err_count_map) {
append(k_v.first, k_v.second);
}
}
if (should_enter("KEYSPACE")) {
ADD_HEADER("# Keyspace");
for (size_t i = 0; i < m.db.size(); ++i) {
const auto& stats = m.db[i];
for (size_t i = 0; i < m.db_stats.size(); ++i) {
const auto& stats = m.db_stats[i];
bool show = (i == 0) || (stats.key_count > 0);
if (show) {
string val = StrCat("keys=", stats.key_count, ",expires=", stats.expire_count,
@@ -1543,7 +1544,6 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
#ifndef __APPLE__
if (should_enter("CPU")) {
ADD_HEADER("# CPU");
struct rusage ru, cu, tu;
getrusage(RUSAGE_SELF, &ru);
getrusage(RUSAGE_CHILDREN, &cu);
@@ -1558,7 +1558,6 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
#endif
if (should_enter("CLUSTER")) {
ADD_HEADER("# Cluster");
append("cluster_enabled", ClusterConfig::IsEnabledOrEmulated());
}

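The reworked should_enter lambda now appends the section header itself, deriving "# Clients" from "CLIENTS" instead of relying on the per-section ADD_HEADER macro calls that this commit deletes. A sketch of that normalization, assuming the Abseil string helpers used elsewhere in the file:

#include <string>
#include <string_view>

#include "absl/strings/ascii.h"
#include "absl/strings/str_cat.h"

// "CLIENTS" -> "# Clients", "COMMANDSTATS" -> "# Commandstats",
// matching the normalized_name logic inside should_enter.
std::string SectionHeader(std::string_view name) {
  return absl::StrCat("# ", std::string{name.substr(0, 1)},
                      absl::AsciiStrToLower(name.substr(1)));
}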

@@ -13,6 +13,7 @@
#include "server/channel_store.h"
#include "server/engine_shard_set.h"
#include "server/replica.h"
#include "server/server_state.h"
namespace util {
@@ -50,32 +51,42 @@ struct ReplicaRoleInfo {
uint64_t lsn_lag;
};
struct ReplicationMemoryStats {
size_t streamer_buf_capacity_bytes_ = 0; // total capacities of streamer buffers
size_t full_sync_buf_bytes_ = 0; // total bytes used for full sync buffers
};
// Global peak stats recorded after aggregating metrics over all shards.
// Note that those values are only updated during GetMetrics calls.
struct PeakStats {
size_t conn_dispatch_queue_bytes = 0; // peak value of conn_stats.dispatch_queue_bytes
size_t conn_read_buf_capacity = 0; // peak of total read buf capacities
};
// Aggregated metrics over multiple sources on all shards
struct Metrics {
std::vector<DbStats> db;
SliceEvents events;
TieredStats tiered_stats;
SliceEvents events; // general keyspace stats
std::vector<DbStats> db_stats; // dbsize stats
EngineShard::Stats shard_stats; // per-shard stats
facade::ConnectionStats conn_stats; // client stats and buffer sizes
TieredStats tiered_stats; // stats for tiered storage
SearchStats search_stats;
EngineShard::Stats shard_stats;
ServerState::Stats coordinator_stats; // stats on running transactions
PeakStats peak_stats;
size_t uptime = 0;
size_t qps = 0;
size_t heap_used_bytes = 0;
size_t heap_comitted_bytes = 0;
size_t small_string_bytes = 0;
uint64_t ooo_tx_transaction_cnt = 0;
uint64_t eval_io_coordination_cnt = 0;
uint64_t eval_shardlocal_coordination_cnt = 0;
uint64_t eval_squashed_flushes = 0;
uint64_t tx_schedule_cancel_cnt = 0;
uint32_t traverse_ttl_per_sec = 0;
uint32_t delete_ttl_per_sec = 0;
std::map<std::string, std::pair<uint64_t, uint64_t>> cmd_stats_map; // command call frequencies
bool is_master = true;
facade::ConnectionStats conn_stats;
// command statistics; see CommandId.
std::map<std::string, std::pair<uint64_t, uint64_t>> cmd_stats_map;
std::vector<ReplicaRoleInfo> replication_metrics;
};
@@ -250,6 +261,9 @@ class ServerFamily {
Done schedule_done_;
std::unique_ptr<FiberQueueThreadPool> fq_threadpool_;
std::shared_ptr<detail::SnapshotStorage> snapshot_storage_;
mutable Mutex peak_stats_mu_;
mutable PeakStats peak_stats_;
};
} // namespace dfly


@@ -23,6 +23,17 @@ namespace dfly {
__thread ServerState* ServerState::state_ = nullptr;
ServerState::Stats& ServerState::Stats::operator+=(const ServerState::Stats& other) {
this->ooo_tx_cnt += other.ooo_tx_cnt;
this->eval_io_coordination_cnt += other.eval_io_coordination_cnt;
this->eval_shardlocal_coordination_cnt += other.eval_shardlocal_coordination_cnt;
this->eval_squashed_flushes += other.eval_squashed_flushes;
this->tx_schedule_cancel_cnt += other.tx_schedule_cancel_cnt;
static_assert(sizeof(Stats) == 5 * 8);
return *this;
}
void MonitorsRepo::Add(facade::Connection* connection) {
VLOG(1) << "register connection "
<< " at address 0x" << std::hex << (const void*)connection << " for thread "

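The static_assert(sizeof(Stats) == 5 * 8) inside ServerState::Stats::operator+= is a guard rail: adding or removing a uint64_t counter changes the struct size and breaks the build until the aggregation (and the assert) are updated, the same trick ConnectionStats uses with kSizeConnStats. The pattern in isolation (illustrative struct):

#include <cstdint>

struct CounterStats {
  uint64_t ooo_tx_cnt = 0;
  uint64_t tx_schedule_cancel_cnt = 0;

  CounterStats& operator+=(const CounterStats& o) {
    ooo_tx_cnt += o.ooo_tx_cnt;
    tx_schedule_cancel_cnt += o.tx_schedule_cancel_cnt;
    // Two 8-byte fields; this fails to compile if a field is added
    // without updating the sums above.
    static_assert(sizeof(CounterStats) == 2 * 8);
    return *this;
  }
};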

@@ -98,6 +98,8 @@ class ServerState { // public struct - to allow initialization.
uint64_t eval_squashed_flushes = 0;
uint64_t tx_schedule_cancel_cnt = 0;
Stats& operator+=(const Stats& other);
};
static ServerState* tlocal() {


@@ -273,7 +273,8 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
size_t serialized = sfile.val.size();
if (serialized == 0)
return 0;
stats_.channel_bytes += serialized;
stats_.pushed_bytes += serialized;
auto id = rec_id_++;
DVLOG(2) << "Pushed " << id;
@@ -330,4 +331,8 @@ void SliceSnapshot::CloseRecordChannel() {
}
}
size_t SliceSnapshot::GetTotalBufferCapacity() const {
return serializer_->GetTotalBufferCapacity();
}
} // namespace dfly


@@ -114,14 +114,16 @@ class SliceSnapshot {
return snapshot_version_;
}
size_t channel_bytes() const {
return stats_.channel_bytes;
size_t pushed_bytes() const {
return stats_.pushed_bytes;
}
const RdbTypeFreqMap& freq_map() const {
return type_freq_map_;
}
size_t GetTotalBufferCapacity() const;
private:
DbSlice* db_slice_;
DbTableArray db_array_;
@@ -146,7 +148,7 @@
uint64_t rec_id_ = 0;
struct Stats {
size_t channel_bytes = 0;
size_t pushed_bytes = 0;
size_t loop_serialized = 0, skipped = 0, side_saved = 0;
size_t savecb_calls = 0;
} stats_;


@@ -48,7 +48,7 @@ TEST_F(StringFamilyTest, SetGet) {
EXPECT_EQ(Run({"get", "key"}), "2");
auto metrics = GetMetrics();
EXPECT_EQ(6, metrics.ooo_tx_transaction_cnt);
EXPECT_EQ(6, metrics.coordinator_stats.ooo_tx_cnt);
}
TEST_F(StringFamilyTest, Incr) {


@@ -67,26 +67,26 @@ TEST_F(TieredStorageTest, Basic) {
EXPECT_EQ(5000, CheckedInt({"dbsize"}));
Metrics m = GetMetrics();
EXPECT_GT(m.db[0].tiered_entries, 0u);
EXPECT_GT(m.db_stats[0].tiered_entries, 0u);
FillExternalKeys(5000);
usleep(20000); // 0.02 seconds
m = GetMetrics();
DbStats stats = m.db[0];
DbStats stats = m.db_stats[0];
LOG(INFO) << stats;
unsigned tiered_entries = m.db[0].tiered_entries;
unsigned tiered_entries = m.db_stats[0].tiered_entries;
EXPECT_GT(tiered_entries, 0u);
string resp = CheckedString({"debug", "object", "k1"});
EXPECT_THAT(resp, HasSubstr("spill_len"));
m = GetMetrics();
LOG(INFO) << m.db[0];
ASSERT_EQ(tiered_entries, m.db[0].tiered_entries);
LOG(INFO) << m.db_stats[0];
ASSERT_EQ(tiered_entries, m.db_stats[0].tiered_entries);
Run({"del", "k1"});
m = GetMetrics();
EXPECT_EQ(m.db[0].tiered_entries, tiered_entries - 1);
EXPECT_EQ(m.db_stats[0].tiered_entries, tiered_entries - 1);
}
} // namespace dfly