diff --git a/src/facade/facade.cc b/src/facade/facade.cc index b1b560bf2..9af13bf4f 100644 --- a/src/facade/facade.cc +++ b/src/facade/facade.cc @@ -44,8 +44,12 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { return *this; } +ReplyStats::ReplyStats(ReplyStats&& other) noexcept { + *this = other; +} + ReplyStats& ReplyStats::operator+=(const ReplyStats& o) { - static_assert(sizeof(ReplyStats) == 72u + kSanitizerOverhead); + static_assert(sizeof(ReplyStats) == 80u + kSanitizerOverhead); ADD(io_write_cnt); ADD(io_write_bytes); @@ -56,12 +60,30 @@ ReplyStats& ReplyStats::operator+=(const ReplyStats& o) { ADD(script_error_count); send_stats += o.send_stats; - + squashing_current_reply_size.fetch_add(o.squashing_current_reply_size.load(memory_order_relaxed), + memory_order_relaxed); return *this; } #undef ADD +ReplyStats& ReplyStats::operator=(const ReplyStats& o) { + static_assert(sizeof(ReplyStats) == 80u + kSanitizerOverhead); + + if (this == &o) { + return *this; + } + + send_stats = o.send_stats; + io_write_cnt = o.io_write_cnt; + io_write_bytes = o.io_write_bytes; + err_count = o.err_count; + script_error_count = o.script_error_count; + squashing_current_reply_size.store(o.squashing_current_reply_size.load(memory_order_relaxed), + memory_order_relaxed); + return *this; +} + string WrongNumArgsError(string_view cmd) { return absl::StrCat("wrong number of arguments for '", absl::AsciiStrToLower(cmd), "' command"); } diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h index 70f0feea5..0a77f2743 100644 --- a/src/facade/facade_types.h +++ b/src/facade/facade_types.h @@ -140,7 +140,13 @@ struct ReplyStats { absl::flat_hash_map err_count; size_t script_error_count = 0; + // This variable can be updated directly from shard threads when they allocate memory for replies. + std::atomic squashing_current_reply_size{0}; + + ReplyStats() = default; + ReplyStats(ReplyStats&& other) noexcept; ReplyStats& operator+=(const ReplyStats& other); + ReplyStats& operator=(const ReplyStats& other); }; struct FacadeStats { diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index 0a9369f3f..726210e1e 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -30,8 +30,8 @@ void CheckConnStateClean(const ConnectionState& state) { DCHECK(!state.subscribe_info); } -size_t Size(const facade::CapturingReplyBuilder::Payload& payload) { - size_t payload_size = sizeof(facade::CapturingReplyBuilder::Payload); +size_t Size(const CapturingReplyBuilder::Payload& payload) { + size_t payload_size = sizeof(CapturingReplyBuilder::Payload); return visit( Overloaded{ [&](monostate) { return payload_size; }, @@ -62,8 +62,6 @@ size_t Size(const facade::CapturingReplyBuilder::Payload& payload) { } // namespace -atomic_uint64_t MultiCommandSquasher::current_reply_size_ = 0; - MultiCommandSquasher::MultiCommandSquasher(absl::Span cmds, ConnectionContext* cntx, Service* service, const Opts& opts) : cmds_{cmds}, cntx_{cntx}, service_{service}, base_cid_{nullptr}, opts_{opts} { @@ -73,8 +71,12 @@ MultiCommandSquasher::MultiCommandSquasher(absl::Span cmds, Connectio } MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(ShardId sid) { - if (sharded_.empty()) + if (sharded_.empty()) { sharded_.resize(shard_set->size()); + for (size_t i = 0; i < sharded_.size(); i++) { + sharded_[i].reply_size_total_ptr = &tl_facade_stats->reply_stats.squashing_current_reply_size; + } + } auto& sinfo = sharded_[sid]; if (!sinfo.local_tx) { @@ -135,7 +137,7 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(const StoredC return need_flush ? SquashResult::SQUASHED_FULL : SquashResult::SQUASHED; } -bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, const StoredCmd* cmd) { +bool MultiCommandSquasher::ExecuteStandalone(RedisReplyBuilder* rb, const StoredCmd* cmd) { DCHECK(order_.empty()); // check no squashed chain is interrupted auto args = cmd->ArgList(&tmp_keylist_); @@ -163,7 +165,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v DCHECK(!sinfo.dispatched.empty()); auto* local_tx = sinfo.local_tx.get(); - facade::CapturingReplyBuilder crb(ReplyMode::FULL, resp_v); + CapturingReplyBuilder crb(ReplyMode::FULL, resp_v); ConnectionContext local_cntx{cntx_, local_tx}; if (cntx_->conn()) { local_cntx.skip_acl_validation = cntx_->conn()->IsPrivileged(); @@ -171,15 +173,21 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v CmdArgVec arg_vec; + auto move_reply = [&sinfo](CapturingReplyBuilder::Payload&& src, + CapturingReplyBuilder::Payload* dst) { + *dst = std::move(src); + size_t sz = Size(*dst); + sinfo.reply_size_delta += sz; + sinfo.reply_size_total_ptr->fetch_add(sz, std::memory_order_relaxed); + }; + for (auto& dispatched : sinfo.dispatched) { auto args = dispatched.cmd->ArgList(&arg_vec); if (opts_.verify_commands) { // The shared context is used for state verification, the local one is only for replies if (auto err = service_->VerifyCommandState(dispatched.cmd->Cid(), args, *cntx_); err) { crb.SendError(std::move(*err)); - dispatched.reply = crb.Take(); - current_reply_size_.fetch_add(Size(dispatched.reply), std::memory_order_relaxed); - + move_reply(crb.Take(), &dispatched.reply); continue; } } @@ -190,8 +198,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v local_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args); service_->InvokeCmd(dispatched.cmd->Cid(), args, &crb, &local_cntx); - dispatched.reply = crb.Take(); - current_reply_size_.fetch_add(Size(dispatched.reply), std::memory_order_relaxed); + move_reply(crb.Take(), &dispatched.reply); // Assert commands made no persistent state changes to stub context state const auto& local_state = local_cntx.conn_state; @@ -246,6 +253,13 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { uint64_t after_hop = proactor->GetMonotonicTimeNs(); bool aborted = false; + ServerState* fresh_ss = ServerState::SafeTLocal(); + + size_t total_reply_size = 0; + for (auto& sinfo : sharded_) { + total_reply_size += sinfo.reply_size_delta; + } + for (auto idx : order_) { auto& sinfo = sharded_[idx]; DCHECK_LT(sinfo.reply_id, sinfo.dispatched.size()); @@ -253,14 +267,16 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { auto& reply = sinfo.dispatched[sinfo.reply_id++].reply; aborted |= opts_.error_abort && CapturingReplyBuilder::TryExtractError(reply); - current_reply_size_.fetch_sub(Size(reply), std::memory_order_relaxed); CapturingReplyBuilder::Apply(std::move(reply), rb); if (aborted) break; } uint64_t after_reply = proactor->GetMonotonicTimeNs(); - ServerState::SafeTLocal()->stats.multi_squash_exec_hop_usec += (after_hop - start) / 1000; - ServerState::SafeTLocal()->stats.multi_squash_exec_reply_usec += (after_reply - after_hop) / 1000; + fresh_ss->stats.multi_squash_exec_hop_usec += (after_hop - start) / 1000; + fresh_ss->stats.multi_squash_exec_reply_usec += (after_reply - after_hop) / 1000; + + tl_facade_stats->reply_stats.squashing_current_reply_size.fetch_sub(total_reply_size, + std::memory_order_release); for (auto& sinfo : sharded_) { sinfo.dispatched.clear(); diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index 9aea56880..9505bbae5 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -34,10 +34,6 @@ class MultiCommandSquasher { return MultiCommandSquasher{cmds, cntx, service, opts}.Run(rb); } - static size_t GetRepliesMemSize() { - return current_reply_size_.load(std::memory_order_relaxed); - } - private: // Per-shard execution info. struct ShardExecInfo { @@ -50,6 +46,9 @@ class MultiCommandSquasher { }; std::vector dispatched; // Dispatched commands unsigned reply_id = 0; + + std::atomic* reply_size_total_ptr; // Total size of replies on the IO thread + size_t reply_size_delta = 0; // Size of replies for this shard boost::intrusive_ptr local_tx; // stub-mode tx for use inside shard }; @@ -94,9 +93,6 @@ class MultiCommandSquasher { size_t num_shards_ = 0; std::vector tmp_keylist_; - - // we increase size in one thread and decrease in another - static atomic_uint64_t current_reply_size_; }; } // namespace dfly diff --git a/src/server/server_family.cc b/src/server/server_family.cc index bbd9d09b7..bf34ac940 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1328,9 +1328,10 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd m.coordinator_stats.multi_squash_exec_reply_usec * 1e-6, MetricType::COUNTER, &resp->body()); - AppendMetricWithoutLabels("commands_squashing_replies_bytes", "", - MultiCommandSquasher::GetRepliesMemSize(), MetricType::GAUGE, - &resp->body()); + AppendMetricWithoutLabels( + "commands_squashing_replies_bytes", "", + m.facade_stats.reply_stats.squashing_current_reply_size.load(memory_order_relaxed), + MetricType::GAUGE, &resp->body()); string connections_libs; AppendMetricHeader("connections_libs", "Total number of connections by libname:ver", MetricType::GAUGE, &connections_libs); @@ -2481,7 +2482,8 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio append("client_read_buffer_peak_bytes", m.peak_stats.conn_read_buf_capacity); append("tls_bytes", m.tls_bytes); append("snapshot_serialization_bytes", m.serialization_bytes); - append("commands_squashing_replies_bytes", MultiCommandSquasher::GetRepliesMemSize()); + append("commands_squashing_replies_bytes", + m.facade_stats.reply_stats.squashing_current_reply_size.load(memory_order_relaxed)); if (GetFlag(FLAGS_cache_mode)) { append("cache_mode", "cache");