diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index eb4ca8c5e..5dcfed328 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -630,9 +630,6 @@ void Connection::OnPostMigrateThread() { stats_ = &tl_facade_stats->conn_stats; ++stats_->num_conns; stats_->read_buf_capacity += io_buf_.Capacity(); - if (cc_->replica_conn) { - ++stats_->num_replicas; - } } void Connection::OnConnectionStart() { @@ -1840,10 +1837,6 @@ Connection::MemoryUsage Connection::GetMemoryUsage() const { void Connection::DecreaseStatsOnClose() { stats_->read_buf_capacity -= io_buf_.Capacity(); - // Update num_replicas if this was a replica connection. - if (cc_->replica_conn) { - --stats_->num_replicas; - } --stats_->num_conns; } diff --git a/src/facade/facade.cc b/src/facade/facade.cc index bf27324b6..1d3501d56 100644 --- a/src/facade/facade.cc +++ b/src/facade/facade.cc @@ -20,7 +20,7 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats); ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { // To break this code deliberately if we add/remove a field to this struct. - static_assert(kSizeConnStats == 120u); + static_assert(kSizeConnStats == 112u); ADD(read_buf_capacity); ADD(dispatch_queue_entries); @@ -34,7 +34,6 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { ADD(pipelined_cmd_latency); ADD(conn_received_cnt); ADD(num_conns); - ADD(num_replicas); ADD(num_blocked_clients); ADD(num_migrations); ADD(pipeline_throttle_count); diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h index 6d7ac4328..7f8b1b6e2 100644 --- a/src/facade/facade_types.h +++ b/src/facade/facade_types.h @@ -105,7 +105,6 @@ struct ConnectionStats { uint64_t conn_received_cnt = 0; uint32_t num_conns = 0; - uint32_t num_replicas = 0; uint32_t num_blocked_clients = 0; uint64_t num_migrations = 0; diff --git a/src/server/common.h b/src/server/common.h index 13e6db336..1772f4065 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -47,7 +47,7 @@ using RdbTypeFreqMap = absl::flat_hash_map; class CommandId; class Transaction; class EngineShard; -class ConnectionState; +struct ConnectionState; class Interpreter; class Namespaces; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 34e8ff150..b5493cdac 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1275,15 +1275,17 @@ void AppendMetricWithoutLabels(string_view name, string_view help, const absl::A AppendMetricValue(name, value, {}, {}, dest); } -void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse* resp) { +void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd, + StringResponse* resp) { // Server metrics AppendMetricHeader("version", "", MetricType::GAUGE, &resp->body()); AppendMetricValue("version", 1, {"version"}, {GetVersion()}, &resp->body()); bool is_master = ServerState::tlocal()->is_master; + AppendMetricWithoutLabels("master", "1 if master 0 if replica", is_master ? 1 : 0, MetricType::GAUGE, &resp->body()); - AppendMetricWithoutLabels("uptime_in_seconds", "", m.uptime, MetricType::COUNTER, &resp->body()); + AppendMetricWithoutLabels("uptime_in_seconds", "", uptime, MetricType::COUNTER, &resp->body()); // Clients metrics const auto& conn_stats = m.facade_stats.conn_stats; @@ -1377,15 +1379,6 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse* if (added) absl::StrAppend(&resp->body(), type_used_memory_metric); } - if (!m.master_side_replicas_info.empty()) { - ReplicationMemoryStats repl_mem; - dfly_cmd->GetReplicationMemoryStats(&repl_mem); - AppendMetricWithoutLabels( - "replication_streaming_bytes", "Stable sync replication memory usage", - repl_mem.streamer_buf_capacity_bytes, MetricType::GAUGE, &resp->body()); - AppendMetricWithoutLabels("replication_full_sync_bytes", "Full sync memory usage", - repl_mem.full_sync_buf_bytes, MetricType::GAUGE, &resp->body()); - } // Stats metrics AppendMetricWithoutLabels("connections_received_total", "", conn_stats.conn_received_cnt, @@ -1458,11 +1451,25 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse* absl::StrAppend(&resp->body(), command_metrics); } - if (!m.master_side_replicas_info.empty()) { + if (m.replica_side_info) { // slave side + auto& replica_info = *m.replica_side_info; + AppendMetricWithoutLabels("replica_reconnect_count", "Number of replica reconnects", + replica_info.reconnect_count, MetricType::COUNTER, &resp->body()); + } else { // Master side string replication_lag_metrics; + vector replicas_info = dfly_cmd->GetReplicasRoleInfo(); + + ReplicationMemoryStats repl_mem; + dfly_cmd->GetReplicationMemoryStats(&repl_mem); + AppendMetricWithoutLabels( + "replication_streaming_bytes", "Stable sync replication memory usage", + repl_mem.streamer_buf_capacity_bytes, MetricType::GAUGE, &resp->body()); + AppendMetricWithoutLabels("replication_full_sync_bytes", "Full sync memory usage", + repl_mem.full_sync_buf_bytes, MetricType::GAUGE, &resp->body()); + AppendMetricHeader("connected_replica_lag_records", "Lag in records of a connected replica.", MetricType::GAUGE, &replication_lag_metrics); - for (const auto& replica : m.master_side_replicas_info) { + for (const auto& replica : replicas_info) { AppendMetricValue("connected_replica_lag_records", replica.lsn_lag, {"replica_ip", "replica_port", "replica_state"}, {replica.address, absl::StrCat(replica.listening_port), replica.state}, @@ -1471,12 +1478,6 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse* absl::StrAppend(&resp->body(), replication_lag_metrics); } - if (m.replica_side_info) { - auto& replica_info = *m.replica_side_info; - AppendMetricWithoutLabels("replica_reconnect_count", "Number of replica reconnects", - replica_info.reconnect_count, MetricType::COUNTER, &resp->body()); - } - AppendMetricWithoutLabels("fiber_switch_total", "", m.fiber_switch_cnt, MetricType::COUNTER, &resp->body()); double delay_seconds = m.fiber_switch_delay_usec * 1e-6; @@ -1533,7 +1534,8 @@ void ServerFamily::ConfigureMetrics(util::HttpListenerBase* http_base) { auto cb = [this](const util::http::QueryArgs& args, util::HttpContext* send) { StringResponse resp = util::http::MakeStringResponse(boost::beast::http::status::ok); - PrintPrometheusMetrics(this->GetMetrics(&namespaces->GetDefaultNamespace()), + uint64_t uptime = time(NULL) - start_time_; + PrintPrometheusMetrics(uptime, this->GetMetrics(&namespaces->GetDefaultNamespace()), this->dfly_cmd_.get(), &resp); return send->Invoke(std::move(resp)); @@ -1607,14 +1609,17 @@ void ServerFamily::StatsMC(std::string_view section, SinkReplyBuilder* builder) double utime = dbl_time(ru.ru_utime); double systime = dbl_time(ru.ru_stime); + auto kind = ProactorBase::me()->GetKind(); + const char* multiplex_api = (kind == ProactorBase::IOURING) ? "iouring" : "epoll"; Metrics m = GetMetrics(&namespaces->GetDefaultNamespace()); + uint64_t uptime = time(NULL) - start_time_; ADD_LINE(pid, getpid()); - ADD_LINE(uptime, m.uptime); + ADD_LINE(uptime, uptime); ADD_LINE(time, now); ADD_LINE(version, kGitTag); - ADD_LINE(libevent, "iouring"); + ADD_LINE(libevent, multiplex_api); ADD_LINE(pointer_size, sizeof(void*)); ADD_LINE(rusage_user, utime); ADD_LINE(rusage_system, systime); @@ -2083,7 +2088,7 @@ static void MergeDbSliceStats(const DbSlice::Stats& src, Metrics* dest) { void ServerFamily::ResetStat(Namespace* ns) { shard_set->pool()->AwaitBrief( - [registry = service_.mutable_registry(), this, ns](unsigned index, auto*) { + [registry = service_.mutable_registry(), ns](unsigned index, auto*) { registry->ResetCallStats(index); ns->GetCurrentDbSlice().ResetEvents(); facade::ResetStats(); @@ -2095,6 +2100,8 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const { Metrics result; util::fb2::Mutex mu; + uint64_t start = absl::GetCurrentTimeNanos(); + auto cmd_stat_cb = [&dest = result.cmd_stats_map](string_view name, const CmdCallStats& stat) { auto& [calls, sum] = dest[absl::AsciiStrToLower(name)]; calls += stat.first; @@ -2117,7 +2124,6 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const { result.coordinator_stats.Add(ss->stats); - result.uptime = time(NULL) - this->start_time_; result.qps += uint64_t(ss->MovingSum6()); result.facade_stats += *tl_facade_stats; result.serialization_bytes += SliceSnapshot::GetThreadLocalMemoryUsage(); @@ -2156,15 +2162,16 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const { service_.proactor_pool().AwaitFiberOnAll(std::move(cb)); + uint64_t after_cb = absl::GetCurrentTimeNanos(); + // Normalize moving average stats result.qps /= 6; result.traverse_ttl_per_sec /= 6; result.delete_ttl_per_sec /= 6; bool is_master = ServerState::tlocal() && ServerState::tlocal()->is_master; - if (is_master) { - result.master_side_replicas_info = dfly_cmd_->GetReplicasRoleInfo(); - } else { + + if (!is_master) { auto info = GetReplicaSummary(); if (info) { result.replica_side_info = { @@ -2187,6 +2194,12 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const { result.peak_stats = peak_stats_; + uint64_t delta_ms = (absl::GetCurrentTimeNanos() - start) / 1'000'000; + if (delta_ms > 30) { + uint64_t cb_dur = (after_cb - start) / 1'000'000; + LOG(INFO) << "GetMetrics took " << delta_ms << " ms, out of which callback took " << cb_dur + << " ms"; + } return result; } @@ -2217,15 +2230,6 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil absl::StrAppend(&info, a1, ":", a2, "\r\n"); }; - uint64_t start = absl::GetCurrentTimeNanos(); - Metrics m = GetMetrics(cntx->ns); - uint64_t delta_ms = (absl::GetCurrentTimeNanos() - start) / 1000'000; - LOG_IF(INFO, delta_ms > 100) << "GetMetrics took " << delta_ms << " ms"; - - DbStats total; - for (const auto& db_stats : m.db_stats) - total += db_stats; - if (should_enter("SERVER")) { auto kind = ProactorBase::me()->GetKind(); const char* multiplex_api = (kind == ProactorBase::IOURING) ? "iouring" : "epoll"; @@ -2238,11 +2242,22 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil append("multiplexing_api", multiplex_api); append("tcp_port", GetFlag(FLAGS_port)); append("thread_count", service_.proactor_pool().size()); - size_t uptime = m.uptime; + + uint64_t uptime = time(NULL) - start_time_; append("uptime_in_seconds", uptime); append("uptime_in_days", uptime / (3600 * 24)); } + Metrics m; + // Save time by not calculating metrics if we don't need them. + if (!(section == "SERVER" || section == "REPLICATION")) { + m = GetMetrics(cntx->ns); + } + + DbStats total; + for (const auto& db_stats : m.db_stats) + total += db_stats; + if (should_enter("CLIENTS")) { append("connected_clients", m.facade_stats.conn_stats.num_conns); append("max_clients", GetFlag(FLAGS_maxclients)); @@ -2310,7 +2325,7 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil append("maxmemory_policy", "noeviction"); } - if (!m.master_side_replicas_info.empty()) { + if (!m.replica_side_info) { // master ReplicationMemoryStats repl_mem; dfly_cmd_->GetReplicationMemoryStats(&repl_mem); append("replication_streaming_buffer_bytes", repl_mem.streamer_buf_capacity_bytes); @@ -2472,17 +2487,23 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil } if (should_enter("REPLICATION")) { - util::fb2::LockGuard lk(replicaof_mu_); + bool is_master = true; // Thread local var is_master is updated under mutex replicaof_mu_ together with replica_, // ensuring eventual consistency of is_master. When determining if the server is a replica and // accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is // insufficient in this scenario. - if (!replica_) { + // Please note that we we do not use Metrics object here. + { + fb2::LockGuard lk(replicaof_mu_); + is_master = !replica_; + } + if (is_master) { + vector replicas_info = dfly_cmd_->GetReplicasRoleInfo(); append("role", "master"); - append("connected_slaves", m.facade_stats.conn_stats.num_replicas); - const auto& replicas = m.master_side_replicas_info; - for (size_t i = 0; i < replicas.size(); i++) { - auto& r = replicas[i]; + append("connected_slaves", replicas_info.size()); + + for (size_t i = 0; i < replicas_info.size(); i++) { + auto& r = replicas_info[i]; // e.g. slave0:ip=172.19.0.3,port=6379,state=full_sync append(StrCat("slave", i), StrCat("ip=", r.address, ",port=", r.listening_port, ",state=", r.state, ",lag=", r.lsn_lag)); @@ -2505,6 +2526,8 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil append("slave_priority", GetFlag(FLAGS_replica_priority)); append("slave_read_only", 1); }; + fb2::LockGuard lk(replicaof_mu_); + replication_info_cb(replica_->GetSummary()); // Special case, when multiple masters replicate to a single replica. @@ -2895,9 +2918,6 @@ void ServerFamily::ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder* string sync_id = absl::StrCat("SYNC", sid); cntx->conn_state.replication_info.repl_session_id = sid; - if (!cntx->replica_conn) { - ServerState::tl_connection_stats()->num_replicas += 1; - } cntx->replica_conn = true; // The response for 'capa dragonfly' is: diff --git a/src/server/server_family.h b/src/server/server_family.h index dd34d5261..92f47d286 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -89,7 +89,6 @@ struct Metrics { ServerState::Stats coordinator_stats; // stats on transaction running PeakStats peak_stats; - size_t uptime = 0; size_t qps = 0; size_t heap_used_bytes = 0; @@ -119,9 +118,6 @@ struct Metrics { absl::flat_hash_map connections_lib_name_ver_map; - // Replica info on the master side. - std::vector master_side_replicas_info; - struct ReplicaInfo { uint32_t reconnect_count; };