chore: remove global current_reply_size

Before, we used a global atomic variable current_reply_size that was constantly updated by all
threads whenever reply buffers grew during squashing.

Now we use per-thread variables. The disadvantage is that the metric is less precise, because we
allocate the buffers in a shard thread but update the thread-local metrics in the coordinator
threads. I think the tradeoff is fair, and the fact that the metric is updated with a delay is not
critical.
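
To illustrate the change, here is a minimal sketch of the before/after pattern. The names
(ShardReplyInfo, CoordinatorStats, AfterHop) are simplified, hypothetical stand-ins for the actual
Dragonfly classes, not the real API:

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <vector>

// Before: a single global atomic, touched by every shard thread whenever a
// captured reply grows.
std::atomic<uint64_t> current_reply_size{0};

// After: each shard accumulates its own total, and the coordinator folds the
// sum into its thread-local stats once per squashed hop.
struct ShardReplyInfo {
  size_t total_reply_size = 0;  // grown inside the shard callback
};

struct CoordinatorStats {
  size_t current_reply_size = 0;  // touched only by the owning coordinator thread
};

thread_local CoordinatorStats tl_stats;

void AfterHop(std::vector<ShardReplyInfo>& sharded) {
  size_t total = 0;
  for (const auto& s : sharded)
    total += s.total_reply_size;
  tl_stats.current_reply_size += total;  // charge while captured replies are alive
  // ... stream the captured replies to the client ...
  tl_stats.current_reply_size -= total;  // release once the replies are flushed
}

The metric remains process-wide once the per-thread copies are summed during metrics collection;
it simply stops being a contended atomic on the hot path.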

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Roman Gershman 2025-05-05 12:03:49 +03:00
parent f7a40f66d6
commit 41508f241a
5 changed files with 18 additions and 18 deletions


@@ -62,8 +62,6 @@ size_t Size(const facade::CapturingReplyBuilder::Payload& payload) {
} // namespace
-atomic_uint64_t MultiCommandSquasher::current_reply_size_ = 0;
MultiCommandSquasher::MultiCommandSquasher(absl::Span<StoredCmd> cmds, ConnectionContext* cntx,
Service* service, const Opts& opts)
: cmds_{cmds}, cntx_{cntx}, service_{service}, base_cid_{nullptr}, opts_{opts} {
@@ -178,8 +176,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v
if (auto err = service_->VerifyCommandState(dispatched.cmd->Cid(), args, *cntx_); err) {
crb.SendError(std::move(*err));
dispatched.reply = crb.Take();
-current_reply_size_.fetch_add(Size(dispatched.reply), std::memory_order_relaxed);
+sinfo.total_reply_size += Size(dispatched.reply);
continue;
}
}
@@ -191,7 +188,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v
service_->InvokeCmd(dispatched.cmd->Cid(), args, &crb, &local_cntx);
dispatched.reply = crb.Take();
-current_reply_size_.fetch_add(Size(dispatched.reply), std::memory_order_relaxed);
+sinfo.total_reply_size += Size(dispatched.reply);
// Assert commands made no persistent state changes to stub context state
const auto& local_state = local_cntx.conn_state;
@@ -246,6 +243,13 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
uint64_t after_hop = proactor->GetMonotonicTimeNs();
bool aborted = false;
+ServerState* fresh_ss = ServerState::SafeTLocal();
+size_t total_reply_size = 0;
+for (auto& sinfo : sharded_) {
+total_reply_size += sinfo.total_reply_size;
+}
+fresh_ss->stats.current_reply_size += total_reply_size;
for (auto idx : order_) {
auto& sinfo = sharded_[idx];
DCHECK_LT(sinfo.reply_id, sinfo.dispatched.size());
@@ -253,14 +257,14 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
auto& reply = sinfo.dispatched[sinfo.reply_id++].reply;
aborted |= opts_.error_abort && CapturingReplyBuilder::TryExtractError(reply);
-current_reply_size_.fetch_sub(Size(reply), std::memory_order_relaxed);
CapturingReplyBuilder::Apply(std::move(reply), rb);
if (aborted)
break;
}
uint64_t after_reply = proactor->GetMonotonicTimeNs();
-ServerState::SafeTLocal()->stats.multi_squash_exec_hop_usec += (after_hop - start) / 1000;
-ServerState::SafeTLocal()->stats.multi_squash_exec_reply_usec += (after_reply - after_hop) / 1000;
+fresh_ss->stats.multi_squash_exec_hop_usec += (after_hop - start) / 1000;
+fresh_ss->stats.multi_squash_exec_reply_usec += (after_reply - after_hop) / 1000;
+fresh_ss->stats.current_reply_size -= total_reply_size;
for (auto& sinfo : sharded_) {
sinfo.dispatched.clear();


@@ -34,10 +34,6 @@ class MultiCommandSquasher {
return MultiCommandSquasher{cmds, cntx, service, opts}.Run(rb);
}
-static size_t GetRepliesMemSize() {
-return current_reply_size_.load(std::memory_order_relaxed);
-}
private:
// Per-shard execution info.
struct ShardExecInfo {
@@ -50,6 +46,7 @@ class MultiCommandSquasher {
};
std::vector<Command> dispatched; // Dispatched commands
unsigned reply_id = 0;
+size_t total_reply_size = 0; // Total size of replies
boost::intrusive_ptr<Transaction> local_tx; // stub-mode tx for use inside shard
};
@@ -94,9 +91,6 @@ class MultiCommandSquasher {
size_t num_shards_ = 0;
std::vector<MutableSlice> tmp_keylist_;
-// we increase size in one thread and decrease in another
-static atomic_uint64_t current_reply_size_;
};
} // namespace dfly


@@ -1329,7 +1329,7 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("commands_squashing_replies_bytes", "",
-MultiCommandSquasher::GetRepliesMemSize(), MetricType::GAUGE,
+m.coordinator_stats.current_reply_size, MetricType::GAUGE,
&resp->body());
string connections_libs;
AppendMetricHeader("connections_libs", "Total number of connections by libname:ver",
@@ -2468,7 +2468,7 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
append("client_read_buffer_peak_bytes", m.peak_stats.conn_read_buf_capacity);
append("tls_bytes", m.tls_bytes);
append("snapshot_serialization_bytes", m.serialization_bytes);
append("commands_squashing_replies_bytes", MultiCommandSquasher::GetRepliesMemSize());
append("commands_squashing_replies_bytes", m.coordinator_stats.current_reply_size);
if (GetFlag(FLAGS_cache_mode)) {
append("cache_mode", "cache");


@@ -37,7 +37,7 @@ ServerState::Stats::Stats(unsigned num_shards) : tx_width_freq_arr(num_shards) {
}
ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) {
-static_assert(sizeof(Stats) == 20 * 8, "Stats size mismatch");
+static_assert(sizeof(Stats) == 21 * 8, "Stats size mismatch");
#define ADD(x) this->x += (other.x)
@@ -64,6 +64,7 @@ ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) {
ADD(compressed_blobs);
ADD(oom_error_cmd_cnt);
+ADD(current_reply_size);
ADD(conn_timeout_events);
if (this->tx_width_freq_arr.size() > 0) {
DCHECK_EQ(this->tx_width_freq_arr.size(), other.tx_width_freq_arr.size());


@@ -132,6 +132,7 @@ class ServerState { // public struct - to allow initialization.
// Number of times we rejected command dispatch due to OOM condition.
uint64_t oom_error_cmd_cnt = 0;
+size_t current_reply_size = 0;
uint32_t conn_timeout_events = 0;
std::valarray<uint64_t> tx_width_freq_arr;