diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc index 2acc276b6..56be517fd 100755 --- a/src/facade/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -3,6 +3,7 @@ // #include "facade/reply_builder.h" +#include <absl/cleanup/cleanup.h> #include #include #include @@ -39,6 +40,8 @@ const char* NullString(bool resp3) { return resp3 ? "_\r\n" : "$-1\r\n"; } +static thread_local SinkReplyBuilder::StatsType tl_stats; + } // namespace SinkReplyBuilder::MGetResponse::~MGetResponse() { @@ -58,7 +61,20 @@ void SinkReplyBuilder::CloseConnection() { ec_ = std::make_error_code(std::errc::connection_aborted); } +SinkReplyBuilder::StatsType SinkReplyBuilder::GetThreadLocalStats() { + return tl_stats; +} + void SinkReplyBuilder::Send(const iovec* v, uint32_t len) { + int64_t before = absl::GetCurrentTimeNanos(); + SendStatsType stats_type = SendStatsType::kRegular; + + auto cleanup = absl::MakeCleanup([&]() { + int64_t after = absl::GetCurrentTimeNanos(); + tl_stats[stats_type].count++; + tl_stats[stats_type].total_duration += (after - before) / 1'000; + }); + has_replied_ = true; DCHECK(sink_); constexpr size_t kMaxBatchSize = 1024; @@ -70,6 +86,8 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) { // Allow batching with up to kMaxBatchSize of data. 
if ((should_batch_ || should_aggregate_) && (batch_.size() + bsize < kMaxBatchSize)) { + stats_type = SendStatsType::kBatch; + batch_.reserve(batch_.size() + bsize); for (unsigned i = 0; i < len; ++i) { std::string_view src((char*)v[i].iov_base, v[i].iov_len); diff --git a/src/facade/reply_builder.h b/src/facade/reply_builder.h index c3cc437c8..097b44427 100755 --- a/src/facade/reply_builder.h +++ b/src/facade/reply_builder.h @@ -155,6 +155,27 @@ class SinkReplyBuilder { virtual size_t UsedMemory() const; + enum SendStatsType { + kRegular, // Send() operations that are written to sockets + kBatch, // Send() operations that are internally batched to a buffer + kNumTypes, // Number of types, do not use directly + }; + + struct SendStats { + int64_t count = 0; + int64_t total_duration = 0; + + SendStats& operator+=(const SendStats& other) { + count += other.count; + total_duration += other.total_duration; + return *this; + } + }; + + using StatsType = std::array<SendStats, SendStatsType::kNumTypes>; + + static StatsType GetThreadLocalStats(); + protected: void SendRaw(std::string_view str); // Sends raw without any formatting. 
void SendRawVec(absl::Span<const std::string_view> msg_vec); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 4d38a16fd..6329f2cdf 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -214,6 +214,7 @@ using namespace util; using detail::SaveStagesController; using http::StringResponse; using strings::HumanReadableNumBytes; +using SendStatsType = facade::SinkReplyBuilder::SendStatsType; namespace { @@ -825,6 +826,40 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) { MetricType::COUNTER, &resp->body()); AppendMetricWithoutLabels("net_output_bytes_total", "", conn_stats.io_write_bytes, MetricType::COUNTER, &resp->body()); + { + string send_latency_metrics; + constexpr string_view kReplyLatency = "reply_latency_seconds_total"; + AppendMetricHeader(kReplyLatency, "Reply latency per type", MetricType::COUNTER, + &send_latency_metrics); + + string send_count_metrics; + constexpr string_view kReplyCount = "reply_total"; + AppendMetricHeader(kReplyCount, "Reply count per type", MetricType::COUNTER, + &send_count_metrics); + + for (unsigned i = 0; i < SendStatsType::kNumTypes; ++i) { + auto& stats = m.reply_stats[i]; + string_view type; + switch (SendStatsType(i)) { + case SendStatsType::kRegular: + type = "regular"; + break; + case SendStatsType::kBatch: + type = "batch"; + break; + case SendStatsType::kNumTypes: + type = "other"; + break; + } + + AppendMetricValue(kReplyLatency, stats.total_duration * 1e-6, {"type"}, {type}, + &send_latency_metrics); + AppendMetricValue(kReplyCount, stats.count, {"type"}, {type}, &send_count_metrics); + } + + absl::StrAppend(&resp->body(), send_latency_metrics); + absl::StrAppend(&resp->body(), send_count_metrics); + } // DB stats AppendMetricWithoutLabels("expired_keys_total", "", m.events.expired_keys, MetricType::COUNTER, @@ -1498,6 +1533,7 @@ Metrics ServerFamily::GetMetrics() const { auto cb = [&](unsigned index, ProactorBase* pb) { EngineShard* shard = EngineShard::tlocal(); 
ServerState* ss = ServerState::tlocal(); + auto reply_stats = SinkReplyBuilder::GetThreadLocalStats(); lock_guard lk(mu); @@ -1512,6 +1548,10 @@ Metrics ServerFamily::GetMetrics() const { result.qps += uint64_t(ss->MovingSum6()); result.conn_stats += ss->connection_stats; + for (size_t i = 0; i < reply_stats.size(); ++i) { + result.reply_stats[i] += reply_stats[i]; + } + if (shard) { result.heap_used_bytes += shard->UsedMemory(); MergeDbSliceStats(shard->db_slice().GetStats(), &result); @@ -1703,6 +1743,10 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("defrag_attempt_total", m.shard_stats.defrag_attempt_total); append("defrag_realloc_total", m.shard_stats.defrag_realloc_total); append("defrag_task_invocation_total", m.shard_stats.defrag_task_invocation_total); + append("reply_count", m.reply_stats[SendStatsType::kRegular].count); + append("reply_latency_usec", m.reply_stats[SendStatsType::kRegular].total_duration); + append("reply_batch_count", m.reply_stats[SendStatsType::kBatch].count); + append("reply_batch_latency_usec", m.reply_stats[SendStatsType::kBatch].total_duration); } if (should_enter("TIERED", true)) { diff --git a/src/server/server_family.h b/src/server/server_family.h index 6ef954a13..825758ce7 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -10,6 +10,7 @@ #include "facade/conn_context.h" #include "facade/dragonfly_listener.h" #include "facade/redis_parser.h" +#include "facade/reply_builder.h" #include "server/channel_store.h" #include "server/engine_shard_set.h" #include "server/replica.h" @@ -77,6 +78,8 @@ struct Metrics { SearchStats search_stats; ServerState::Stats coordinator_stats; // stats on transaction running + facade::SinkReplyBuilder::StatsType reply_stats{}; // Stats for Send*() ops + PeakStats peak_stats; size_t uptime = 0;