feat(server): Add reply count & latency metrics (#2340)

* feat(server): Add reply count & latency metrics

* fixes
Author: Shahar Mike (committed by GitHub), 2023-12-26 23:02:45 +02:00
Commit: 16a0becea5 (parent: 5b81ccda18)
4 changed files with 86 additions and 0 deletions
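
At a glance, the patch keeps one SendStats entry per send type in a thread_local array inside the reply builder, updates it at the end of every Send() call, and folds the per-thread copies together when ServerFamily::GetMetrics() runs. The snippet below is a condensed, self-contained sketch of that pattern; the type and field names follow the diff, but the Merge() helper and the free-standing layout are illustrative, not the code as it lands in the tree.

    #include <array>
    #include <cstdint>

    // Mirrors the SendStats/StatsType types added in facade/reply_builder.h.
    struct SendStats {
      int64_t count = 0;
      int64_t total_duration = 0;  // microseconds

      SendStats& operator+=(const SendStats& other) {
        count += other.count;
        total_duration += other.total_duration;
        return *this;
      }
    };

    enum SendStatsType { kRegular, kBatch, kNumTypes };
    using StatsType = std::array<SendStats, kNumTypes>;

    // Each I/O thread owns its own counters, so Send() never takes a lock.
    thread_local StatsType tl_stats;

    // GetMetrics()-style aggregation: fold each thread's snapshot into one total.
    void Merge(const StatsType& per_thread, StatsType* result) {
      for (size_t i = 0; i < per_thread.size(); ++i)
        (*result)[i] += per_thread[i];
    }

The per-thread layout keeps the hot Send() path lock-free; synchronization only happens when GetMetrics() collects the per-thread copies under its mutex.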

facade/reply_builder.cc

@@ -3,6 +3,7 @@
//
#include "facade/reply_builder.h"
#include <absl/cleanup/cleanup.h>
#include <absl/container/fixed_array.h>
#include <absl/strings/numbers.h>
#include <absl/strings/str_cat.h>
@@ -39,6 +40,8 @@ const char* NullString(bool resp3) {
return resp3 ? "_\r\n" : "$-1\r\n";
}
static thread_local SinkReplyBuilder::StatsType tl_stats;
} // namespace
SinkReplyBuilder::MGetResponse::~MGetResponse() {
@@ -58,7 +61,20 @@ void SinkReplyBuilder::CloseConnection() {
ec_ = std::make_error_code(std::errc::connection_aborted);
}
SinkReplyBuilder::StatsType SinkReplyBuilder::GetThreadLocalStats() {
return tl_stats;
}
void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
int64_t before = absl::GetCurrentTimeNanos();
SendStatsType stats_type = SendStatsType::kRegular;
auto cleanup = absl::MakeCleanup([&]() {
int64_t after = absl::GetCurrentTimeNanos();
tl_stats[stats_type].count++;
tl_stats[stats_type].total_duration += (after - before) / 1'000;
});
has_replied_ = true;
DCHECK(sink_);
constexpr size_t kMaxBatchSize = 1024;
@@ -70,6 +86,8 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
// Allow batching with up to kMaxBatchSize of data.
if ((should_batch_ || should_aggregate_) && (batch_.size() + bsize < kMaxBatchSize)) {
stats_type = SendStatsType::kBatch;
batch_.reserve(batch_.size() + bsize);
for (unsigned i = 0; i < len; ++i) {
std::string_view src((char*)v[i].iov_base, v[i].iov_len);
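
Two details of the instrumentation above are worth calling out: the timestamps come from absl::GetCurrentTimeNanos() and the difference is divided by 1'000, so total_duration accumulates microseconds; and the absl::MakeCleanup callback is installed before the batching branch, so the counters are bumped on every exit path under whatever value stats_type ends up with (kRegular, or kBatch once the batching branch runs). A minimal sketch of the same idiom, with illustrative names:

    #include <cstdint>

    #include <absl/cleanup/cleanup.h>
    #include <absl/time/clock.h>

    thread_local int64_t send_count = 0;
    thread_local int64_t send_usec = 0;  // accumulated in microseconds

    void TimedSend() {
      int64_t before = absl::GetCurrentTimeNanos();
      // The cleanup runs when the scope exits, on every return path.
      auto cleanup = absl::MakeCleanup([&] {
        send_count++;
        send_usec += (absl::GetCurrentTimeNanos() - before) / 1'000;
      });
      // ... write to the socket or append to a batch buffer; early returns
      // are still counted ...
    }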

facade/reply_builder.h

@@ -155,6 +155,27 @@ class SinkReplyBuilder {
virtual size_t UsedMemory() const;
enum SendStatsType {
kRegular, // Send() operations that are written to sockets
kBatch, // Send() operations that are internally batched to a buffer
kNumTypes, // Number of types, do not use directly
};
struct SendStats {
int64_t count = 0;
int64_t total_duration = 0;
SendStats& operator+=(const SendStats& other) {
count += other.count;
total_duration += other.total_duration;
return *this;
}
};
using StatsType = std::array<SendStats, SendStatsType::kNumTypes>;
static StatsType GetThreadLocalStats();
protected:
void SendRaw(std::string_view str); // Sends raw without any formatting.
void SendRawVec(absl::Span<const std::string_view> msg_vec);
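
GetThreadLocalStats() returns the calling thread's counters by value, so a caller running on an I/O thread can derive per-type averages without extra synchronization. A hypothetical consumer (the helper name and placement are illustrative, not part of the patch):

    #include "facade/reply_builder.h"

    // Average Send() latency in microseconds for one send type, computed from
    // the counters of the thread this function runs on.
    double AvgSendLatencyUsec(facade::SinkReplyBuilder::SendStatsType type) {
      auto stats = facade::SinkReplyBuilder::GetThreadLocalStats();
      const auto& s = stats[type];
      return s.count ? static_cast<double>(s.total_duration) / s.count : 0.0;
    }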

server/server_family.cc

@@ -214,6 +214,7 @@ using namespace util;
using detail::SaveStagesController;
using http::StringResponse;
using strings::HumanReadableNumBytes;
using SendStatsType = facade::SinkReplyBuilder::SendStatsType;
namespace {
@@ -825,6 +826,40 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) {
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("net_output_bytes_total", "", conn_stats.io_write_bytes,
MetricType::COUNTER, &resp->body());
{
string send_latency_metrics;
constexpr string_view kReplyLatency = "reply_latency_seconds_total";
AppendMetricHeader(kReplyLatency, "Reply latency per type", MetricType::COUNTER,
&send_latency_metrics);
string send_count_metrics;
constexpr string_view kReplyCount = "reply_total";
AppendMetricHeader(kReplyCount, "Reply count per type", MetricType::COUNTER,
&send_count_metrics);
for (unsigned i = 0; i < SendStatsType::kNumTypes; ++i) {
auto& stats = m.reply_stats[i];
string_view type;
switch (SendStatsType(i)) {
case SendStatsType::kRegular:
type = "regular";
break;
case SendStatsType::kBatch:
type = "batch";
break;
case SendStatsType::kNumTypes:
type = "other";
break;
}
AppendMetricValue(kReplyLatency, stats.total_duration * 1e-6, {"type"}, {type},
&send_latency_metrics);
AppendMetricValue(kReplyCount, stats.count, {"type"}, {type}, &send_count_metrics);
}
absl::StrAppend(&resp->body(), send_latency_metrics);
absl::StrAppend(&resp->body(), send_count_metrics);
}
// DB stats
AppendMetricWithoutLabels("expired_keys_total", "", m.events.expired_keys, MetricType::COUNTER,
@@ -1498,6 +1533,7 @@ Metrics ServerFamily::GetMetrics() const {
auto cb = [&](unsigned index, ProactorBase* pb) {
EngineShard* shard = EngineShard::tlocal();
ServerState* ss = ServerState::tlocal();
auto reply_stats = SinkReplyBuilder::GetThreadLocalStats();
lock_guard lk(mu);
@@ -1512,6 +1548,10 @@
result.qps += uint64_t(ss->MovingSum6());
result.conn_stats += ss->connection_stats;
for (size_t i = 0; i < reply_stats.size(); ++i) {
result.reply_stats[i] += reply_stats[i];
}
if (shard) {
result.heap_used_bytes += shard->UsedMemory();
MergeDbSliceStats(shard->db_slice().GetStats(), &result);
@@ -1703,6 +1743,10 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("defrag_attempt_total", m.shard_stats.defrag_attempt_total);
append("defrag_realloc_total", m.shard_stats.defrag_realloc_total);
append("defrag_task_invocation_total", m.shard_stats.defrag_task_invocation_total);
append("reply_count", m.reply_stats[SendStatsType::kRegular].count);
append("reply_latency_usec", m.reply_stats[SendStatsType::kRegular].total_duration);
append("reply_batch_count", m.reply_stats[SendStatsType::kBatch].count);
append("reply_batch_latency_usec", m.reply_stats[SendStatsType::kBatch].total_duration);
}
if (should_enter("TIERED", true)) {

server/server_family.h

@@ -10,6 +10,7 @@
#include "facade/conn_context.h"
#include "facade/dragonfly_listener.h"
#include "facade/redis_parser.h"
#include "facade/reply_builder.h"
#include "server/channel_store.h"
#include "server/engine_shard_set.h"
#include "server/replica.h"
@@ -77,6 +78,8 @@ struct Metrics {
SearchStats search_stats;
ServerState::Stats coordinator_stats; // stats on transaction running
facade::SinkReplyBuilder::StatsType reply_stats{}; // Stats for Send*() ops
PeakStats peak_stats;
size_t uptime = 0;