From 41508f241af17da2e95b99edb4f6b459c4a968be Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Mon, 5 May 2025 12:03:49 +0300 Subject: [PATCH 1/2] chore: remove global current_reply_size Before, we used a global atomic variable current_reply_size that was constantly updated by all threads when reply buffers grew for squashing. Now, we use per-thread variables. The disadvantage: the metric is less precise because we first allocate buffers in a shard thread but update the thread-local metrics in the coordinator threads. I think the tradeoff is fair, and the fact that the metric is updated with a delay is not crucial. Signed-off-by: Roman Gershman --- src/server/multi_command_squasher.cc | 20 ++++++++++++-------- src/server/multi_command_squasher.h | 8 +------- src/server/server_family.cc | 4 ++-- src/server/server_state.cc | 3 ++- src/server/server_state.h | 1 + 5 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index 0a9369f3f..2bead3b3e 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -62,8 +62,6 @@ size_t Size(const facade::CapturingReplyBuilder::Payload& payload) { } // namespace -atomic_uint64_t MultiCommandSquasher::current_reply_size_ = 0; - MultiCommandSquasher::MultiCommandSquasher(absl::Span cmds, ConnectionContext* cntx, Service* service, const Opts& opts) : cmds_{cmds}, cntx_{cntx}, service_{service}, base_cid_{nullptr}, opts_{opts} { @@ -178,8 +176,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v if (auto err = service_->VerifyCommandState(dispatched.cmd->Cid(), args, *cntx_); err) { crb.SendError(std::move(*err)); dispatched.reply = crb.Take(); - current_reply_size_.fetch_add(Size(dispatched.reply), std::memory_order_relaxed); - + sinfo.total_reply_size += Size(dispatched.reply); continue; } } @@ -191,7 +188,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion 
resp_v service_->InvokeCmd(dispatched.cmd->Cid(), args, &crb, &local_cntx); dispatched.reply = crb.Take(); - current_reply_size_.fetch_add(Size(dispatched.reply), std::memory_order_relaxed); + sinfo.total_reply_size += Size(dispatched.reply); // Assert commands made no persistent state changes to stub context state const auto& local_state = local_cntx.conn_state; @@ -246,6 +243,13 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { uint64_t after_hop = proactor->GetMonotonicTimeNs(); bool aborted = false; + ServerState* fresh_ss = ServerState::SafeTLocal(); + + size_t total_reply_size = 0; + for (auto& sinfo : sharded_) { + total_reply_size += sinfo.total_reply_size; + } + fresh_ss->stats.current_reply_size += total_reply_size; for (auto idx : order_) { auto& sinfo = sharded_[idx]; DCHECK_LT(sinfo.reply_id, sinfo.dispatched.size()); @@ -253,14 +257,14 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { auto& reply = sinfo.dispatched[sinfo.reply_id++].reply; aborted |= opts_.error_abort && CapturingReplyBuilder::TryExtractError(reply); - current_reply_size_.fetch_sub(Size(reply), std::memory_order_relaxed); CapturingReplyBuilder::Apply(std::move(reply), rb); if (aborted) break; } uint64_t after_reply = proactor->GetMonotonicTimeNs(); - ServerState::SafeTLocal()->stats.multi_squash_exec_hop_usec += (after_hop - start) / 1000; - ServerState::SafeTLocal()->stats.multi_squash_exec_reply_usec += (after_reply - after_hop) / 1000; + fresh_ss->stats.multi_squash_exec_hop_usec += (after_hop - start) / 1000; + fresh_ss->stats.multi_squash_exec_reply_usec += (after_reply - after_hop) / 1000; + fresh_ss->stats.current_reply_size -= total_reply_size; for (auto& sinfo : sharded_) { sinfo.dispatched.clear(); diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index 9aea56880..1d5afa437 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -34,10 +34,6 @@ 
class MultiCommandSquasher { return MultiCommandSquasher{cmds, cntx, service, opts}.Run(rb); } - static size_t GetRepliesMemSize() { - return current_reply_size_.load(std::memory_order_relaxed); - } - private: // Per-shard execution info. struct ShardExecInfo { @@ -50,6 +46,7 @@ class MultiCommandSquasher { }; std::vector dispatched; // Dispatched commands unsigned reply_id = 0; + size_t total_reply_size = 0; // Total size of replies boost::intrusive_ptr local_tx; // stub-mode tx for use inside shard }; @@ -94,9 +91,6 @@ class MultiCommandSquasher { size_t num_shards_ = 0; std::vector tmp_keylist_; - - // we increase size in one thread and decrease in another - static atomic_uint64_t current_reply_size_; }; } // namespace dfly diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 9c7129cdf..f81e6f799 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1329,7 +1329,7 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd MetricType::COUNTER, &resp->body()); AppendMetricWithoutLabels("commands_squashing_replies_bytes", "", - MultiCommandSquasher::GetRepliesMemSize(), MetricType::GAUGE, + m.coordinator_stats.current_reply_size, MetricType::GAUGE, &resp->body()); string connections_libs; AppendMetricHeader("connections_libs", "Total number of connections by libname:ver", @@ -2468,7 +2468,7 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio append("client_read_buffer_peak_bytes", m.peak_stats.conn_read_buf_capacity); append("tls_bytes", m.tls_bytes); append("snapshot_serialization_bytes", m.serialization_bytes); - append("commands_squashing_replies_bytes", MultiCommandSquasher::GetRepliesMemSize()); + append("commands_squashing_replies_bytes", m.coordinator_stats.current_reply_size); if (GetFlag(FLAGS_cache_mode)) { append("cache_mode", "cache"); diff --git a/src/server/server_state.cc b/src/server/server_state.cc index 34344ab77..6bf9435c9 100644 --- 
a/src/server/server_state.cc +++ b/src/server/server_state.cc @@ -37,7 +37,7 @@ ServerState::Stats::Stats(unsigned num_shards) : tx_width_freq_arr(num_shards) { } ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) { - static_assert(sizeof(Stats) == 20 * 8, "Stats size mismatch"); + static_assert(sizeof(Stats) == 21 * 8, "Stats size mismatch"); #define ADD(x) this->x += (other.x) @@ -64,6 +64,7 @@ ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) { ADD(compressed_blobs); ADD(oom_error_cmd_cnt); + ADD(current_reply_size); ADD(conn_timeout_events); if (this->tx_width_freq_arr.size() > 0) { DCHECK_EQ(this->tx_width_freq_arr.size(), other.tx_width_freq_arr.size()); diff --git a/src/server/server_state.h b/src/server/server_state.h index 79a8ec885..071c5607f 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -132,6 +132,7 @@ class ServerState { // public struct - to allow initialization. // Number of times we rejected command dispatch due to OOM condition. uint64_t oom_error_cmd_cnt = 0; + size_t current_reply_size = 0; uint32_t conn_timeout_events = 0; std::valarray tx_width_freq_arr; From 23ef7b72e5a9a044130b1327313b80d6e58eaaf1 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Mon, 5 May 2025 14:43:09 +0300 Subject: [PATCH 2/2] chore: track squash reply allocations as soon as they are created Before, we used a global atomic variable current_reply_size that was constantly updated by all threads when reply buffers grew for squashing. Now, we use per-thread atomic variables, each tracking the reply buffer size for the I/O thread that issues the squashing. The shard threads contend less because their updates are spread across multiple atomic variables instead of a single global one. Moreover, we can now adjust IsPipelineBufferOverLimit to take squashing_current_reply_size into account as well. 
Signed-off-by: Roman Gershman --- src/facade/facade.cc | 26 ++++++++++++++++++-- src/facade/facade_types.h | 6 +++++ src/server/multi_command_squasher.cc | 36 ++++++++++++++++++---------- src/server/multi_command_squasher.h | 4 +++- src/server/server_family.cc | 10 ++++---- src/server/server_state.cc | 3 +-- src/server/server_state.h | 1 - 7 files changed, 64 insertions(+), 22 deletions(-) diff --git a/src/facade/facade.cc b/src/facade/facade.cc index b1b560bf2..9af13bf4f 100644 --- a/src/facade/facade.cc +++ b/src/facade/facade.cc @@ -44,8 +44,12 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { return *this; } +ReplyStats::ReplyStats(ReplyStats&& other) noexcept { + *this = other; +} + ReplyStats& ReplyStats::operator+=(const ReplyStats& o) { - static_assert(sizeof(ReplyStats) == 72u + kSanitizerOverhead); + static_assert(sizeof(ReplyStats) == 80u + kSanitizerOverhead); ADD(io_write_cnt); ADD(io_write_bytes); @@ -56,12 +60,30 @@ ReplyStats& ReplyStats::operator+=(const ReplyStats& o) { ADD(script_error_count); send_stats += o.send_stats; - + squashing_current_reply_size.fetch_add(o.squashing_current_reply_size.load(memory_order_relaxed), + memory_order_relaxed); return *this; } #undef ADD +ReplyStats& ReplyStats::operator=(const ReplyStats& o) { + static_assert(sizeof(ReplyStats) == 80u + kSanitizerOverhead); + + if (this == &o) { + return *this; + } + + send_stats = o.send_stats; + io_write_cnt = o.io_write_cnt; + io_write_bytes = o.io_write_bytes; + err_count = o.err_count; + script_error_count = o.script_error_count; + squashing_current_reply_size.store(o.squashing_current_reply_size.load(memory_order_relaxed), + memory_order_relaxed); + return *this; +} + string WrongNumArgsError(string_view cmd) { return absl::StrCat("wrong number of arguments for '", absl::AsciiStrToLower(cmd), "' command"); } diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h index 70f0feea5..0a77f2743 100644 --- a/src/facade/facade_types.h +++ 
b/src/facade/facade_types.h @@ -140,7 +140,13 @@ struct ReplyStats { absl::flat_hash_map err_count; size_t script_error_count = 0; + // This variable can be updated directly from shard threads when they allocate memory for replies. + std::atomic squashing_current_reply_size{0}; + + ReplyStats() = default; + ReplyStats(ReplyStats&& other) noexcept; ReplyStats& operator+=(const ReplyStats& other); + ReplyStats& operator=(const ReplyStats& other); }; struct FacadeStats { diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index 2bead3b3e..726210e1e 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -30,8 +30,8 @@ void CheckConnStateClean(const ConnectionState& state) { DCHECK(!state.subscribe_info); } -size_t Size(const facade::CapturingReplyBuilder::Payload& payload) { - size_t payload_size = sizeof(facade::CapturingReplyBuilder::Payload); +size_t Size(const CapturingReplyBuilder::Payload& payload) { + size_t payload_size = sizeof(CapturingReplyBuilder::Payload); return visit( Overloaded{ [&](monostate) { return payload_size; }, @@ -71,8 +71,12 @@ MultiCommandSquasher::MultiCommandSquasher(absl::Span cmds, Connectio } MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(ShardId sid) { - if (sharded_.empty()) + if (sharded_.empty()) { sharded_.resize(shard_set->size()); + for (size_t i = 0; i < sharded_.size(); i++) { + sharded_[i].reply_size_total_ptr = &tl_facade_stats->reply_stats.squashing_current_reply_size; + } + } auto& sinfo = sharded_[sid]; if (!sinfo.local_tx) { @@ -133,7 +137,7 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(const StoredC return need_flush ? 
SquashResult::SQUASHED_FULL : SquashResult::SQUASHED; } -bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, const StoredCmd* cmd) { +bool MultiCommandSquasher::ExecuteStandalone(RedisReplyBuilder* rb, const StoredCmd* cmd) { DCHECK(order_.empty()); // check no squashed chain is interrupted auto args = cmd->ArgList(&tmp_keylist_); @@ -161,7 +165,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v DCHECK(!sinfo.dispatched.empty()); auto* local_tx = sinfo.local_tx.get(); - facade::CapturingReplyBuilder crb(ReplyMode::FULL, resp_v); + CapturingReplyBuilder crb(ReplyMode::FULL, resp_v); ConnectionContext local_cntx{cntx_, local_tx}; if (cntx_->conn()) { local_cntx.skip_acl_validation = cntx_->conn()->IsPrivileged(); @@ -169,14 +173,21 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v CmdArgVec arg_vec; + auto move_reply = [&sinfo](CapturingReplyBuilder::Payload&& src, + CapturingReplyBuilder::Payload* dst) { + *dst = std::move(src); + size_t sz = Size(*dst); + sinfo.reply_size_delta += sz; + sinfo.reply_size_total_ptr->fetch_add(sz, std::memory_order_relaxed); + }; + for (auto& dispatched : sinfo.dispatched) { auto args = dispatched.cmd->ArgList(&arg_vec); if (opts_.verify_commands) { // The shared context is used for state verification, the local one is only for replies if (auto err = service_->VerifyCommandState(dispatched.cmd->Cid(), args, *cntx_); err) { crb.SendError(std::move(*err)); - dispatched.reply = crb.Take(); - sinfo.total_reply_size += Size(dispatched.reply); + move_reply(crb.Take(), &dispatched.reply); continue; } } @@ -187,8 +198,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v local_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args); service_->InvokeCmd(dispatched.cmd->Cid(), args, &crb, &local_cntx); - dispatched.reply = crb.Take(); - sinfo.total_reply_size += Size(dispatched.reply); + move_reply(crb.Take(), 
&dispatched.reply); // Assert commands made no persistent state changes to stub context state const auto& local_state = local_cntx.conn_state; @@ -247,9 +257,9 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { size_t total_reply_size = 0; for (auto& sinfo : sharded_) { - total_reply_size += sinfo.total_reply_size; + total_reply_size += sinfo.reply_size_delta; } - fresh_ss->stats.current_reply_size += total_reply_size; + for (auto idx : order_) { auto& sinfo = sharded_[idx]; DCHECK_LT(sinfo.reply_id, sinfo.dispatched.size()); @@ -264,7 +274,9 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { uint64_t after_reply = proactor->GetMonotonicTimeNs(); fresh_ss->stats.multi_squash_exec_hop_usec += (after_hop - start) / 1000; fresh_ss->stats.multi_squash_exec_reply_usec += (after_reply - after_hop) / 1000; - fresh_ss->stats.current_reply_size -= total_reply_size; + + tl_facade_stats->reply_stats.squashing_current_reply_size.fetch_sub(total_reply_size, + std::memory_order_release); for (auto& sinfo : sharded_) { sinfo.dispatched.clear(); diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index 1d5afa437..9505bbae5 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -46,7 +46,9 @@ class MultiCommandSquasher { }; std::vector dispatched; // Dispatched commands unsigned reply_id = 0; - size_t total_reply_size = 0; // Total size of replies + + std::atomic* reply_size_total_ptr; // Total size of replies on the IO thread + size_t reply_size_delta = 0; // Size of replies for this shard boost::intrusive_ptr local_tx; // stub-mode tx for use inside shard }; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index f81e6f799..f29e93361 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1328,9 +1328,10 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd 
m.coordinator_stats.multi_squash_exec_reply_usec * 1e-6, MetricType::COUNTER, &resp->body()); - AppendMetricWithoutLabels("commands_squashing_replies_bytes", "", - m.coordinator_stats.current_reply_size, MetricType::GAUGE, - &resp->body()); + AppendMetricWithoutLabels( + "commands_squashing_replies_bytes", "", + m.facade_stats.reply_stats.squashing_current_reply_size.load(memory_order_relaxed), + MetricType::GAUGE, &resp->body()); string connections_libs; AppendMetricHeader("connections_libs", "Total number of connections by libname:ver", MetricType::GAUGE, &connections_libs); @@ -2468,7 +2469,8 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio append("client_read_buffer_peak_bytes", m.peak_stats.conn_read_buf_capacity); append("tls_bytes", m.tls_bytes); append("snapshot_serialization_bytes", m.serialization_bytes); - append("commands_squashing_replies_bytes", m.coordinator_stats.current_reply_size); + append("commands_squashing_replies_bytes", + m.facade_stats.reply_stats.squashing_current_reply_size.load(memory_order_relaxed)); if (GetFlag(FLAGS_cache_mode)) { append("cache_mode", "cache"); diff --git a/src/server/server_state.cc b/src/server/server_state.cc index 6bf9435c9..34344ab77 100644 --- a/src/server/server_state.cc +++ b/src/server/server_state.cc @@ -37,7 +37,7 @@ ServerState::Stats::Stats(unsigned num_shards) : tx_width_freq_arr(num_shards) { } ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) { - static_assert(sizeof(Stats) == 21 * 8, "Stats size mismatch"); + static_assert(sizeof(Stats) == 20 * 8, "Stats size mismatch"); #define ADD(x) this->x += (other.x) @@ -64,7 +64,6 @@ ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) { ADD(compressed_blobs); ADD(oom_error_cmd_cnt); - ADD(current_reply_size); ADD(conn_timeout_events); if (this->tx_width_freq_arr.size() > 0) { DCHECK_EQ(this->tx_width_freq_arr.size(), other.tx_width_freq_arr.size()); diff --git 
a/src/server/server_state.h b/src/server/server_state.h index 071c5607f..79a8ec885 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -132,7 +132,6 @@ class ServerState { // public struct - to allow initialization. // Number of times we rejected command dispatch due to OOM condition. uint64_t oom_error_cmd_cnt = 0; - size_t current_reply_size = 0; uint32_t conn_timeout_events = 0; std::valarray tx_width_freq_arr;