chore: optimize info command (#4137)

* chore: optimize info command

    Info command has a large latency when returning all the sections.
    But often a single section is required. Specifically,
    SERVER and REPLICATION sections are often fetched by clients
    or management components.

    This PR:
    1. Removes any hops for "INFO SERVER" command.
    2. Removes some redundant stats.
    3. Prints latency stats around GetMetrics command if it took to much.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

* Update src/server/server_family.cc

Co-authored-by: Shahar Mike <chakaz@users.noreply.github.com>
Signed-off-by: Roman Gershman <romange@gmail.com>

* chore: remove GetMetrics dependency from the REPLICATION section

Also, address comments

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

* fix: clang build

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Signed-off-by: Roman Gershman <romange@gmail.com>
Co-authored-by: Shahar Mike <chakaz@users.noreply.github.com>
This commit is contained in:
Roman Gershman 2024-11-17 13:33:29 +02:00 committed by GitHub
parent 8e3b8ccbe3
commit 8bd2b9ed3e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 69 additions and 62 deletions

View file

@ -630,9 +630,6 @@ void Connection::OnPostMigrateThread() {
stats_ = &tl_facade_stats->conn_stats; stats_ = &tl_facade_stats->conn_stats;
++stats_->num_conns; ++stats_->num_conns;
stats_->read_buf_capacity += io_buf_.Capacity(); stats_->read_buf_capacity += io_buf_.Capacity();
if (cc_->replica_conn) {
++stats_->num_replicas;
}
} }
void Connection::OnConnectionStart() { void Connection::OnConnectionStart() {
@ -1840,10 +1837,6 @@ Connection::MemoryUsage Connection::GetMemoryUsage() const {
void Connection::DecreaseStatsOnClose() { void Connection::DecreaseStatsOnClose() {
stats_->read_buf_capacity -= io_buf_.Capacity(); stats_->read_buf_capacity -= io_buf_.Capacity();
// Update num_replicas if this was a replica connection.
if (cc_->replica_conn) {
--stats_->num_replicas;
}
--stats_->num_conns; --stats_->num_conns;
} }

View file

@ -20,7 +20,7 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats);
ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
// To break this code deliberately if we add/remove a field to this struct. // To break this code deliberately if we add/remove a field to this struct.
static_assert(kSizeConnStats == 120u); static_assert(kSizeConnStats == 112u);
ADD(read_buf_capacity); ADD(read_buf_capacity);
ADD(dispatch_queue_entries); ADD(dispatch_queue_entries);
@ -34,7 +34,6 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
ADD(pipelined_cmd_latency); ADD(pipelined_cmd_latency);
ADD(conn_received_cnt); ADD(conn_received_cnt);
ADD(num_conns); ADD(num_conns);
ADD(num_replicas);
ADD(num_blocked_clients); ADD(num_blocked_clients);
ADD(num_migrations); ADD(num_migrations);
ADD(pipeline_throttle_count); ADD(pipeline_throttle_count);

View file

@ -105,7 +105,6 @@ struct ConnectionStats {
uint64_t conn_received_cnt = 0; uint64_t conn_received_cnt = 0;
uint32_t num_conns = 0; uint32_t num_conns = 0;
uint32_t num_replicas = 0;
uint32_t num_blocked_clients = 0; uint32_t num_blocked_clients = 0;
uint64_t num_migrations = 0; uint64_t num_migrations = 0;

View file

@ -47,7 +47,7 @@ using RdbTypeFreqMap = absl::flat_hash_map<unsigned, size_t>;
class CommandId; class CommandId;
class Transaction; class Transaction;
class EngineShard; class EngineShard;
class ConnectionState; struct ConnectionState;
class Interpreter; class Interpreter;
class Namespaces; class Namespaces;

View file

@ -1275,15 +1275,17 @@ void AppendMetricWithoutLabels(string_view name, string_view help, const absl::A
AppendMetricValue(name, value, {}, {}, dest); AppendMetricValue(name, value, {}, {}, dest);
} }
void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse* resp) { void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd,
StringResponse* resp) {
// Server metrics // Server metrics
AppendMetricHeader("version", "", MetricType::GAUGE, &resp->body()); AppendMetricHeader("version", "", MetricType::GAUGE, &resp->body());
AppendMetricValue("version", 1, {"version"}, {GetVersion()}, &resp->body()); AppendMetricValue("version", 1, {"version"}, {GetVersion()}, &resp->body());
bool is_master = ServerState::tlocal()->is_master; bool is_master = ServerState::tlocal()->is_master;
AppendMetricWithoutLabels("master", "1 if master 0 if replica", is_master ? 1 : 0, AppendMetricWithoutLabels("master", "1 if master 0 if replica", is_master ? 1 : 0,
MetricType::GAUGE, &resp->body()); MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("uptime_in_seconds", "", m.uptime, MetricType::COUNTER, &resp->body()); AppendMetricWithoutLabels("uptime_in_seconds", "", uptime, MetricType::COUNTER, &resp->body());
// Clients metrics // Clients metrics
const auto& conn_stats = m.facade_stats.conn_stats; const auto& conn_stats = m.facade_stats.conn_stats;
@ -1377,15 +1379,6 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse*
if (added) if (added)
absl::StrAppend(&resp->body(), type_used_memory_metric); absl::StrAppend(&resp->body(), type_used_memory_metric);
} }
if (!m.master_side_replicas_info.empty()) {
ReplicationMemoryStats repl_mem;
dfly_cmd->GetReplicationMemoryStats(&repl_mem);
AppendMetricWithoutLabels(
"replication_streaming_bytes", "Stable sync replication memory usage",
repl_mem.streamer_buf_capacity_bytes, MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("replication_full_sync_bytes", "Full sync memory usage",
repl_mem.full_sync_buf_bytes, MetricType::GAUGE, &resp->body());
}
// Stats metrics // Stats metrics
AppendMetricWithoutLabels("connections_received_total", "", conn_stats.conn_received_cnt, AppendMetricWithoutLabels("connections_received_total", "", conn_stats.conn_received_cnt,
@ -1458,11 +1451,25 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse*
absl::StrAppend(&resp->body(), command_metrics); absl::StrAppend(&resp->body(), command_metrics);
} }
if (!m.master_side_replicas_info.empty()) { if (m.replica_side_info) { // slave side
auto& replica_info = *m.replica_side_info;
AppendMetricWithoutLabels("replica_reconnect_count", "Number of replica reconnects",
replica_info.reconnect_count, MetricType::COUNTER, &resp->body());
} else { // Master side
string replication_lag_metrics; string replication_lag_metrics;
vector<ReplicaRoleInfo> replicas_info = dfly_cmd->GetReplicasRoleInfo();
ReplicationMemoryStats repl_mem;
dfly_cmd->GetReplicationMemoryStats(&repl_mem);
AppendMetricWithoutLabels(
"replication_streaming_bytes", "Stable sync replication memory usage",
repl_mem.streamer_buf_capacity_bytes, MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("replication_full_sync_bytes", "Full sync memory usage",
repl_mem.full_sync_buf_bytes, MetricType::GAUGE, &resp->body());
AppendMetricHeader("connected_replica_lag_records", "Lag in records of a connected replica.", AppendMetricHeader("connected_replica_lag_records", "Lag in records of a connected replica.",
MetricType::GAUGE, &replication_lag_metrics); MetricType::GAUGE, &replication_lag_metrics);
for (const auto& replica : m.master_side_replicas_info) { for (const auto& replica : replicas_info) {
AppendMetricValue("connected_replica_lag_records", replica.lsn_lag, AppendMetricValue("connected_replica_lag_records", replica.lsn_lag,
{"replica_ip", "replica_port", "replica_state"}, {"replica_ip", "replica_port", "replica_state"},
{replica.address, absl::StrCat(replica.listening_port), replica.state}, {replica.address, absl::StrCat(replica.listening_port), replica.state},
@ -1471,12 +1478,6 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse*
absl::StrAppend(&resp->body(), replication_lag_metrics); absl::StrAppend(&resp->body(), replication_lag_metrics);
} }
if (m.replica_side_info) {
auto& replica_info = *m.replica_side_info;
AppendMetricWithoutLabels("replica_reconnect_count", "Number of replica reconnects",
replica_info.reconnect_count, MetricType::COUNTER, &resp->body());
}
AppendMetricWithoutLabels("fiber_switch_total", "", m.fiber_switch_cnt, MetricType::COUNTER, AppendMetricWithoutLabels("fiber_switch_total", "", m.fiber_switch_cnt, MetricType::COUNTER,
&resp->body()); &resp->body());
double delay_seconds = m.fiber_switch_delay_usec * 1e-6; double delay_seconds = m.fiber_switch_delay_usec * 1e-6;
@ -1533,7 +1534,8 @@ void ServerFamily::ConfigureMetrics(util::HttpListenerBase* http_base) {
auto cb = [this](const util::http::QueryArgs& args, util::HttpContext* send) { auto cb = [this](const util::http::QueryArgs& args, util::HttpContext* send) {
StringResponse resp = util::http::MakeStringResponse(boost::beast::http::status::ok); StringResponse resp = util::http::MakeStringResponse(boost::beast::http::status::ok);
PrintPrometheusMetrics(this->GetMetrics(&namespaces->GetDefaultNamespace()), uint64_t uptime = time(NULL) - start_time_;
PrintPrometheusMetrics(uptime, this->GetMetrics(&namespaces->GetDefaultNamespace()),
this->dfly_cmd_.get(), &resp); this->dfly_cmd_.get(), &resp);
return send->Invoke(std::move(resp)); return send->Invoke(std::move(resp));
@ -1607,14 +1609,17 @@ void ServerFamily::StatsMC(std::string_view section, SinkReplyBuilder* builder)
double utime = dbl_time(ru.ru_utime); double utime = dbl_time(ru.ru_utime);
double systime = dbl_time(ru.ru_stime); double systime = dbl_time(ru.ru_stime);
auto kind = ProactorBase::me()->GetKind();
const char* multiplex_api = (kind == ProactorBase::IOURING) ? "iouring" : "epoll";
Metrics m = GetMetrics(&namespaces->GetDefaultNamespace()); Metrics m = GetMetrics(&namespaces->GetDefaultNamespace());
uint64_t uptime = time(NULL) - start_time_;
ADD_LINE(pid, getpid()); ADD_LINE(pid, getpid());
ADD_LINE(uptime, m.uptime); ADD_LINE(uptime, uptime);
ADD_LINE(time, now); ADD_LINE(time, now);
ADD_LINE(version, kGitTag); ADD_LINE(version, kGitTag);
ADD_LINE(libevent, "iouring"); ADD_LINE(libevent, multiplex_api);
ADD_LINE(pointer_size, sizeof(void*)); ADD_LINE(pointer_size, sizeof(void*));
ADD_LINE(rusage_user, utime); ADD_LINE(rusage_user, utime);
ADD_LINE(rusage_system, systime); ADD_LINE(rusage_system, systime);
@ -2083,7 +2088,7 @@ static void MergeDbSliceStats(const DbSlice::Stats& src, Metrics* dest) {
void ServerFamily::ResetStat(Namespace* ns) { void ServerFamily::ResetStat(Namespace* ns) {
shard_set->pool()->AwaitBrief( shard_set->pool()->AwaitBrief(
[registry = service_.mutable_registry(), this, ns](unsigned index, auto*) { [registry = service_.mutable_registry(), ns](unsigned index, auto*) {
registry->ResetCallStats(index); registry->ResetCallStats(index);
ns->GetCurrentDbSlice().ResetEvents(); ns->GetCurrentDbSlice().ResetEvents();
facade::ResetStats(); facade::ResetStats();
@ -2095,6 +2100,8 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
Metrics result; Metrics result;
util::fb2::Mutex mu; util::fb2::Mutex mu;
uint64_t start = absl::GetCurrentTimeNanos();
auto cmd_stat_cb = [&dest = result.cmd_stats_map](string_view name, const CmdCallStats& stat) { auto cmd_stat_cb = [&dest = result.cmd_stats_map](string_view name, const CmdCallStats& stat) {
auto& [calls, sum] = dest[absl::AsciiStrToLower(name)]; auto& [calls, sum] = dest[absl::AsciiStrToLower(name)];
calls += stat.first; calls += stat.first;
@ -2117,7 +2124,6 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
result.coordinator_stats.Add(ss->stats); result.coordinator_stats.Add(ss->stats);
result.uptime = time(NULL) - this->start_time_;
result.qps += uint64_t(ss->MovingSum6()); result.qps += uint64_t(ss->MovingSum6());
result.facade_stats += *tl_facade_stats; result.facade_stats += *tl_facade_stats;
result.serialization_bytes += SliceSnapshot::GetThreadLocalMemoryUsage(); result.serialization_bytes += SliceSnapshot::GetThreadLocalMemoryUsage();
@ -2156,15 +2162,16 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
service_.proactor_pool().AwaitFiberOnAll(std::move(cb)); service_.proactor_pool().AwaitFiberOnAll(std::move(cb));
uint64_t after_cb = absl::GetCurrentTimeNanos();
// Normalize moving average stats // Normalize moving average stats
result.qps /= 6; result.qps /= 6;
result.traverse_ttl_per_sec /= 6; result.traverse_ttl_per_sec /= 6;
result.delete_ttl_per_sec /= 6; result.delete_ttl_per_sec /= 6;
bool is_master = ServerState::tlocal() && ServerState::tlocal()->is_master; bool is_master = ServerState::tlocal() && ServerState::tlocal()->is_master;
if (is_master) {
result.master_side_replicas_info = dfly_cmd_->GetReplicasRoleInfo(); if (!is_master) {
} else {
auto info = GetReplicaSummary(); auto info = GetReplicaSummary();
if (info) { if (info) {
result.replica_side_info = { result.replica_side_info = {
@ -2187,6 +2194,12 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
result.peak_stats = peak_stats_; result.peak_stats = peak_stats_;
uint64_t delta_ms = (absl::GetCurrentTimeNanos() - start) / 1'000'000;
if (delta_ms > 30) {
uint64_t cb_dur = (after_cb - start) / 1'000'000;
LOG(INFO) << "GetMetrics took " << delta_ms << " ms, out of which callback took " << cb_dur
<< " ms";
}
return result; return result;
} }
@ -2217,15 +2230,6 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
absl::StrAppend(&info, a1, ":", a2, "\r\n"); absl::StrAppend(&info, a1, ":", a2, "\r\n");
}; };
uint64_t start = absl::GetCurrentTimeNanos();
Metrics m = GetMetrics(cntx->ns);
uint64_t delta_ms = (absl::GetCurrentTimeNanos() - start) / 1000'000;
LOG_IF(INFO, delta_ms > 100) << "GetMetrics took " << delta_ms << " ms";
DbStats total;
for (const auto& db_stats : m.db_stats)
total += db_stats;
if (should_enter("SERVER")) { if (should_enter("SERVER")) {
auto kind = ProactorBase::me()->GetKind(); auto kind = ProactorBase::me()->GetKind();
const char* multiplex_api = (kind == ProactorBase::IOURING) ? "iouring" : "epoll"; const char* multiplex_api = (kind == ProactorBase::IOURING) ? "iouring" : "epoll";
@ -2238,11 +2242,22 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
append("multiplexing_api", multiplex_api); append("multiplexing_api", multiplex_api);
append("tcp_port", GetFlag(FLAGS_port)); append("tcp_port", GetFlag(FLAGS_port));
append("thread_count", service_.proactor_pool().size()); append("thread_count", service_.proactor_pool().size());
size_t uptime = m.uptime;
uint64_t uptime = time(NULL) - start_time_;
append("uptime_in_seconds", uptime); append("uptime_in_seconds", uptime);
append("uptime_in_days", uptime / (3600 * 24)); append("uptime_in_days", uptime / (3600 * 24));
} }
Metrics m;
// Save time by not calculating metrics if we don't need them.
if (!(section == "SERVER" || section == "REPLICATION")) {
m = GetMetrics(cntx->ns);
}
DbStats total;
for (const auto& db_stats : m.db_stats)
total += db_stats;
if (should_enter("CLIENTS")) { if (should_enter("CLIENTS")) {
append("connected_clients", m.facade_stats.conn_stats.num_conns); append("connected_clients", m.facade_stats.conn_stats.num_conns);
append("max_clients", GetFlag(FLAGS_maxclients)); append("max_clients", GetFlag(FLAGS_maxclients));
@ -2310,7 +2325,7 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
append("maxmemory_policy", "noeviction"); append("maxmemory_policy", "noeviction");
} }
if (!m.master_side_replicas_info.empty()) { if (!m.replica_side_info) { // master
ReplicationMemoryStats repl_mem; ReplicationMemoryStats repl_mem;
dfly_cmd_->GetReplicationMemoryStats(&repl_mem); dfly_cmd_->GetReplicationMemoryStats(&repl_mem);
append("replication_streaming_buffer_bytes", repl_mem.streamer_buf_capacity_bytes); append("replication_streaming_buffer_bytes", repl_mem.streamer_buf_capacity_bytes);
@ -2472,17 +2487,23 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
} }
if (should_enter("REPLICATION")) { if (should_enter("REPLICATION")) {
util::fb2::LockGuard lk(replicaof_mu_); bool is_master = true;
// Thread local var is_master is updated under mutex replicaof_mu_ together with replica_, // Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
// ensuring eventual consistency of is_master. When determining if the server is a replica and // ensuring eventual consistency of is_master. When determining if the server is a replica and
// accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is // accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is
// insufficient in this scenario. // insufficient in this scenario.
if (!replica_) { // Please note that we we do not use Metrics object here.
{
fb2::LockGuard lk(replicaof_mu_);
is_master = !replica_;
}
if (is_master) {
vector<ReplicaRoleInfo> replicas_info = dfly_cmd_->GetReplicasRoleInfo();
append("role", "master"); append("role", "master");
append("connected_slaves", m.facade_stats.conn_stats.num_replicas); append("connected_slaves", replicas_info.size());
const auto& replicas = m.master_side_replicas_info;
for (size_t i = 0; i < replicas.size(); i++) { for (size_t i = 0; i < replicas_info.size(); i++) {
auto& r = replicas[i]; auto& r = replicas_info[i];
// e.g. slave0:ip=172.19.0.3,port=6379,state=full_sync // e.g. slave0:ip=172.19.0.3,port=6379,state=full_sync
append(StrCat("slave", i), StrCat("ip=", r.address, ",port=", r.listening_port, append(StrCat("slave", i), StrCat("ip=", r.address, ",port=", r.listening_port,
",state=", r.state, ",lag=", r.lsn_lag)); ",state=", r.state, ",lag=", r.lsn_lag));
@ -2505,6 +2526,8 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
append("slave_priority", GetFlag(FLAGS_replica_priority)); append("slave_priority", GetFlag(FLAGS_replica_priority));
append("slave_read_only", 1); append("slave_read_only", 1);
}; };
fb2::LockGuard lk(replicaof_mu_);
replication_info_cb(replica_->GetSummary()); replication_info_cb(replica_->GetSummary());
// Special case, when multiple masters replicate to a single replica. // Special case, when multiple masters replicate to a single replica.
@ -2895,9 +2918,6 @@ void ServerFamily::ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder*
string sync_id = absl::StrCat("SYNC", sid); string sync_id = absl::StrCat("SYNC", sid);
cntx->conn_state.replication_info.repl_session_id = sid; cntx->conn_state.replication_info.repl_session_id = sid;
if (!cntx->replica_conn) {
ServerState::tl_connection_stats()->num_replicas += 1;
}
cntx->replica_conn = true; cntx->replica_conn = true;
// The response for 'capa dragonfly' is: <masterid> <syncid> <numthreads> <version> // The response for 'capa dragonfly' is: <masterid> <syncid> <numthreads> <version>

View file

@ -89,7 +89,6 @@ struct Metrics {
ServerState::Stats coordinator_stats; // stats on transaction running ServerState::Stats coordinator_stats; // stats on transaction running
PeakStats peak_stats; PeakStats peak_stats;
size_t uptime = 0;
size_t qps = 0; size_t qps = 0;
size_t heap_used_bytes = 0; size_t heap_used_bytes = 0;
@ -119,9 +118,6 @@ struct Metrics {
absl::flat_hash_map<std::string, uint64_t> connections_lib_name_ver_map; absl::flat_hash_map<std::string, uint64_t> connections_lib_name_ver_map;
// Replica info on the master side.
std::vector<ReplicaRoleInfo> master_side_replicas_info;
struct ReplicaInfo { struct ReplicaInfo {
uint32_t reconnect_count; uint32_t reconnect_count;
}; };