feat(metrics): Add label for main and other listeners (#4739)

* feat(metrics): Add label for main and other listeners

The stats collected per connection are divided according to main or
other listener.

Metrics are decorated with labels listener= main or other.

The memcached listener is also labelled as main.

Signed-off-by: Abhijat Malviya <abhijat@dragonflydb.io>
This commit is contained in:
Abhijat Malviya 2025-03-12 18:33:04 +05:30 committed by GitHub
parent 0e35f788ec
commit ac33cd871b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 143 additions and 24 deletions

View file

@ -698,7 +698,7 @@ void Connection::OnPostMigrateThread() {
}
stats_ = &tl_facade_stats->conn_stats;
++stats_->num_conns;
IncrNumConns();
stats_->read_buf_capacity += io_buf_.Capacity();
}
@ -706,6 +706,10 @@ void Connection::OnConnectionStart() {
ThisFiber::SetName("DflyConnection");
stats_ = &tl_facade_stats->conn_stats;
if (const Listener* lsnr = static_cast<Listener*>(listener()); lsnr) {
is_main_ = lsnr->IsMainInterface();
}
}
void Connection::HandleRequests() {
@ -916,7 +920,16 @@ bool Connection::IsPrivileged() const {
}
bool Connection::IsMain() const {
return static_cast<Listener*>(listener())->IsMainInterface();
return is_main_;
}
bool Connection::IsMainOrMemcache() const {
if (is_main_) {
return true;
}
const Listener* lsnr = static_cast<Listener*>(listener());
return lsnr && lsnr->protocol() == Protocol::MEMCACHE;
}
void Connection::SetName(string name) {
@ -990,7 +1003,7 @@ void Connection::ConnectionFlow() {
ConfigureProvidedBuffer();
++stats_->num_conns;
IncrNumConns();
++stats_->conn_received_cnt;
stats_->read_buf_capacity += io_buf_.Capacity();
@ -1920,8 +1933,7 @@ Connection::MemoryUsage Connection::GetMemoryUsage() const {
void Connection::DecreaseStatsOnClose() {
stats_->read_buf_capacity -= io_buf_.Capacity();
--stats_->num_conns;
DecrNumConns();
}
void Connection::BreakOnce(uint32_t ev_mask) {
@ -1993,6 +2005,20 @@ void Connection::MarkReadBufferConsumed() {
}
}
void Connection::IncrNumConns() {
if (IsMainOrMemcache())
++stats_->num_conns_main;
else
++stats_->num_conns_other;
}
void Connection::DecrNumConns() {
if (IsMainOrMemcache())
--stats_->num_conns_main;
else
--stats_->num_conns_other;
}
void Connection::SetMaxQueueLenThreadLocal(unsigned tid, uint32_t val) {
thread_queue_backpressure[tid].pipeline_queue_max_len = val;
thread_queue_backpressure[tid].pipeline_cnd.notify_all();
@ -2059,11 +2085,11 @@ bool Connection::WeakRef::operator==(const WeakRef& other) const {
void ResetStats() {
auto& cstats = tl_facade_stats->conn_stats;
cstats.command_cnt = 0;
cstats.pipelined_cmd_cnt = 0;
cstats.conn_received_cnt = 0;
cstats.pipelined_cmd_cnt = 0;
cstats.command_cnt = 0;
cstats.command_cnt_main = 0;
cstats.command_cnt_other = 0;
cstats.io_read_cnt = 0;
cstats.io_read_bytes = 0;

View file

@ -261,6 +261,10 @@ class Connection : public util::Connection {
bool IsMain() const;
// In addition to the listener role being main, also returns true if the protocol is Memcached.
// This method returns true for customer facing listeners.
bool IsMainOrMemcache() const;
void SetName(std::string name);
void SetLibName(std::string name);
@ -409,6 +413,9 @@ class Connection : public util::Connection {
io::Bytes NextBundleBuffer(size_t total_len);
void MarkReadBufferConsumed();
void IncrNumConns();
void DecrNumConns();
std::deque<MessageHandle> dispatch_q_; // dispatch queue
util::fb2::CondVarAny cnd_; // dispatch queue waker
util::fb2::Fiber async_fb_; // async fiber (if started)
@ -474,6 +481,7 @@ class Connection : public util::Connection {
bool is_http_ : 1;
bool is_tls_ : 1;
bool recv_provided_ : 1;
bool is_main_ : 1;
};
};
};

View file

@ -20,7 +20,7 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats);
ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
// To break this code deliberately if we add/remove a field to this struct.
static_assert(kSizeConnStats == 120u);
static_assert(kSizeConnStats == 136u);
ADD(read_buf_capacity);
ADD(dispatch_queue_entries);
@ -29,11 +29,13 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
ADD(pipeline_cmd_cache_bytes);
ADD(io_read_cnt);
ADD(io_read_bytes);
ADD(command_cnt);
ADD(command_cnt_main);
ADD(command_cnt_other);
ADD(pipelined_cmd_cnt);
ADD(pipelined_cmd_latency);
ADD(conn_received_cnt);
ADD(num_conns);
ADD(num_conns_main);
ADD(num_conns_other);
ADD(num_blocked_clients);
ADD(num_migrations);
ADD(num_recv_provided_calls);

View file

@ -99,12 +99,14 @@ struct ConnectionStats {
uint64_t io_read_cnt = 0;
size_t io_read_bytes = 0;
uint64_t command_cnt = 0;
uint64_t command_cnt_main = 0;
uint64_t command_cnt_other = 0;
uint64_t pipelined_cmd_cnt = 0;
uint64_t pipelined_cmd_latency = 0; // in microseconds
uint64_t conn_received_cnt = 0;
uint32_t num_conns = 0;
uint32_t num_conns_main = 0;
uint32_t num_conns_other = 0;
uint32_t num_blocked_clients = 0;
uint64_t num_migrations = 0;
uint64_t num_recv_provided_calls = 0;

View file

@ -104,6 +104,7 @@ ConnectionContext::ConnectionContext(facade::Connection* owner, acl::UserCredent
: facade::ConnectionContext(owner) {
if (owner) {
skip_acl_validation = owner->IsPrivileged();
has_main_or_memcache_listener = owner->IsMainOrMemcache();
}
keys = std::move(cred.keys);
@ -125,6 +126,9 @@ ConnectionContext::ConnectionContext(const ConnectionContext* owner, Transaction
skip_acl_validation = owner->skip_acl_validation;
acl_db_idx = owner->acl_db_idx;
ns = owner->ns;
if (owner->conn()) {
has_main_or_memcache_listener = owner->conn()->IsMainOrMemcache();
}
} else {
acl_commands = std::vector<uint64_t>(acl::NumberOfFamilies(), acl::NONE_COMMANDS);
}

View file

@ -317,6 +317,9 @@ class ConnectionContext : public facade::ConnectionContext {
// Reference to a FlowInfo for this connection if from a master to a replica.
FlowInfo* replication_flow = nullptr;
// The related connection is bound to main listener or serves the memcached protocol
bool has_main_or_memcache_listener = false;
private:
void EnableMonitoring(bool enable) {
subscriptions++; // required to support the monitoring

View file

@ -834,4 +834,16 @@ TEST_F(DflyEngineTest, ReplicaofRejectOnLoad) {
// To consider having a parameter in dragonfly engine controlling number of shards
// unconditionally from number of cpus. TO TEST BLPOP under multi for single/multi argument case.
TEST_F(DflyEngineTest, CommandMetricLabels) {
EXPECT_EQ(Run({"SET", "foo", "bar"}), "OK");
EXPECT_EQ(Run({"GET", "foo"}), "bar");
const Metrics metrics = GetMetrics();
// The test connection counts as other
EXPECT_EQ(metrics.facade_stats.conn_stats.command_cnt_other, 2);
EXPECT_EQ(metrics.facade_stats.conn_stats.command_cnt_main, 0);
EXPECT_EQ(metrics.facade_stats.conn_stats.num_conns_main, 0);
EXPECT_EQ(metrics.facade_stats.conn_stats.num_conns_other, 0);
}
} // namespace dfly

View file

@ -1318,7 +1318,7 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, SinkReplyBui
DispatchMonitor(cntx, cid, tail_args);
}
ServerState::tlocal()->RecordCmd();
ServerState::tlocal()->RecordCmd(cntx->has_main_or_memcache_listener);
Transaction* tx = cntx->transaction;
auto& info = cntx->conn_state.tracking_info_;
const bool is_read_only = cid->opt_mask() & CO::READONLY;

View file

@ -1283,8 +1283,11 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
const auto& conn_stats = m.facade_stats.conn_stats;
AppendMetricWithoutLabels("max_clients", "Maximal number of clients", GetFlag(FLAGS_maxclients),
MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("connected_clients", "", conn_stats.num_conns, MetricType::GAUGE,
&resp->body());
AppendMetricHeader("connected_clients", "", MetricType::GAUGE, &resp->body());
AppendMetricValue("connected_clients", conn_stats.num_conns_main, {"listener"}, {"main"},
&resp->body());
AppendMetricValue("connected_clients", conn_stats.num_conns_other, {"listener"}, {"other"},
&resp->body());
AppendMetricWithoutLabels("client_read_buffer_bytes", "", conn_stats.read_buf_capacity,
MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("blocked_clients", "", conn_stats.num_blocked_clients,
@ -1381,8 +1384,11 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
AppendMetricWithoutLabels("connections_received_total", "", conn_stats.conn_received_cnt,
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("commands_processed_total", "", conn_stats.command_cnt,
MetricType::COUNTER, &resp->body());
AppendMetricHeader("commands_processed_total", "", MetricType::COUNTER, &resp->body());
AppendMetricValue("commands_processed_total", conn_stats.command_cnt_main, {"listener"}, {"main"},
&resp->body());
AppendMetricValue("commands_processed_total", conn_stats.command_cnt_other, {"listener"},
{"other"}, &resp->body());
AppendMetricWithoutLabels("keyspace_hits_total", "", m.events.hits, MetricType::COUNTER,
&resp->body());
AppendMetricWithoutLabels("keyspace_misses_total", "", m.events.misses, MetricType::COUNTER,
@ -1628,6 +1634,8 @@ void ServerFamily::StatsMC(std::string_view section, SinkReplyBuilder* builder)
Metrics m = GetMetrics(&namespaces->GetDefaultNamespace());
uint64_t uptime = time(NULL) - start_time_;
const uint32_t total_conns =
m.facade_stats.conn_stats.num_conns_main + m.facade_stats.conn_stats.num_conns_other;
ADD_LINE(pid, getpid());
ADD_LINE(uptime, uptime);
ADD_LINE(time, now);
@ -1637,7 +1645,7 @@ void ServerFamily::StatsMC(std::string_view section, SinkReplyBuilder* builder)
ADD_LINE(rusage_user, utime);
ADD_LINE(rusage_system, systime);
ADD_LINE(max_connections, -1);
ADD_LINE(curr_connections, m.facade_stats.conn_stats.num_conns);
ADD_LINE(curr_connections, total_conns);
ADD_LINE(total_connections, -1);
ADD_LINE(rejected_connections, -1);
ADD_LINE(bytes_read, m.facade_stats.conn_stats.io_read_bytes);
@ -2357,7 +2365,8 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
};
auto add_clients_info = [&] {
append("connected_clients", m.facade_stats.conn_stats.num_conns);
append("connected_clients",
m.facade_stats.conn_stats.num_conns_main + m.facade_stats.conn_stats.num_conns_other);
append("max_clients", GetFlag(FLAGS_maxclients));
append("client_read_buffer_bytes", m.facade_stats.conn_stats.read_buf_capacity);
append("blocked_clients", m.facade_stats.conn_stats.num_blocked_clients);
@ -2445,7 +2454,7 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
auto& reply_stats = m.facade_stats.reply_stats;
append("total_connections_received", conn_stats.conn_received_cnt);
append("total_commands_processed", conn_stats.command_cnt);
append("total_commands_processed", conn_stats.command_cnt_main + conn_stats.command_cnt_other);
append("instantaneous_ops_per_sec", m.qps);
append("total_pipelined_commands", conn_stats.pipelined_cmd_cnt);
append("total_pipelined_squashed_commands", m.coordinator_stats.squashed_commands);

View file

@ -210,8 +210,12 @@ class ServerState { // public struct - to allow initialization.
return qps_.SumTail();
}
void RecordCmd() {
++tl_connection_stats()->command_cnt;
void RecordCmd(const bool is_main_conn) {
if (is_main_conn) {
++tl_connection_stats()->command_cnt_main;
} else {
++tl_connection_stats()->command_cnt_other;
}
qps_.Inc();
}

View file

@ -1,7 +1,8 @@
import pytest
import redis
from prometheus_client.samples import Sample
from pymemcache import Client
from . import dfly_args
from .instance import DflyInstance
from .utility import *
@ -172,3 +173,51 @@ async def test_blocking_commands_should_not_show_up_in_slow_log(
# blpop does not show up, only the previous reset
assert reply[0]["command"] == "SLOWLOG RESET"
@dfly_args({"memcached_port": 11211, "admin_port": 1112})
async def test_metric_labels(
df_server: DflyInstance, async_client: aioredis.Redis, memcached_client: Client
):
result = await async_client.set("foo", "bar")
assert result, "Failed to set key"
result = await async_client.get("foo")
assert result == "bar", "Failed to read value"
def match_label_value(s: Sample, name, func):
assert "listener" in s.labels
if s.labels["listener"] == name:
assert func(s.value)
metrics = await df_server.metrics()
for sample in metrics["dragonfly_commands_processed"].samples:
match_label_value(sample, "main", lambda v: v > 0)
match_label_value(sample, "other", lambda v: v == 0)
for sample in metrics["dragonfly_connected_clients"].samples:
match_label_value(sample, "main", lambda v: v == 1)
match_label_value(sample, "other", lambda v: v == 0)
# Memcached client also counts as main
memcached_client.set("foo", "bar")
metrics = await df_server.metrics()
for sample in metrics["dragonfly_commands_processed"].samples:
match_label_value(sample, "main", lambda v: v > 0)
match_label_value(sample, "other", lambda v: v == 0)
for sample in metrics["dragonfly_connected_clients"].samples:
match_label_value(sample, "main", lambda v: v == 2)
match_label_value(sample, "other", lambda v: v == 0)
# admin client counts as other
async with aioredis.Redis(port=1112) as admin:
await admin.ping()
metrics = await df_server.metrics()
for sample in metrics["dragonfly_commands_processed"].samples:
match_label_value(sample, "main", lambda v: v > 0)
# memcached listener processes command as other
match_label_value(sample, "other", lambda v: v > 0)
for sample in metrics["dragonfly_connected_clients"].samples:
match_label_value(sample, "main", lambda v: v == 2)
match_label_value(sample, "other", lambda v: v == 1)