chore: track squash reply allocation as early as they are created

Before - we used a global atomic var current_reply_size that was constantly updated by all threads
when reply buffers grew for squashing.

Now, we use per thread atomic variables that track reply buffer size for the I/O thread that issues squashing.
The shard threads contend less because they update multiple atomic variables. Moreover,
now we can adjust IsPipelineBufferOverLimit to take into account squashing_current_reply_size as well.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2025-05-05 14:43:09 +03:00
parent 41508f241a
commit 23ef7b72e5
No known key found for this signature in database
GPG key ID: F25B77EAF8AEBA7A
7 changed files with 64 additions and 22 deletions

View file

@ -44,8 +44,12 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
return *this;
}
ReplyStats::ReplyStats(ReplyStats&& other) noexcept {
*this = other;
}
ReplyStats& ReplyStats::operator+=(const ReplyStats& o) {
static_assert(sizeof(ReplyStats) == 72u + kSanitizerOverhead);
static_assert(sizeof(ReplyStats) == 80u + kSanitizerOverhead);
ADD(io_write_cnt);
ADD(io_write_bytes);
@ -56,12 +60,30 @@ ReplyStats& ReplyStats::operator+=(const ReplyStats& o) {
ADD(script_error_count);
send_stats += o.send_stats;
squashing_current_reply_size.fetch_add(o.squashing_current_reply_size.load(memory_order_relaxed),
memory_order_relaxed);
return *this;
}
#undef ADD
ReplyStats& ReplyStats::operator=(const ReplyStats& o) {
static_assert(sizeof(ReplyStats) == 80u + kSanitizerOverhead);
if (this == &o) {
return *this;
}
send_stats = o.send_stats;
io_write_cnt = o.io_write_cnt;
io_write_bytes = o.io_write_bytes;
err_count = o.err_count;
script_error_count = o.script_error_count;
squashing_current_reply_size.store(o.squashing_current_reply_size.load(memory_order_relaxed),
memory_order_relaxed);
return *this;
}
string WrongNumArgsError(string_view cmd) {
return absl::StrCat("wrong number of arguments for '", absl::AsciiStrToLower(cmd), "' command");
}

View file

@ -140,7 +140,13 @@ struct ReplyStats {
absl::flat_hash_map<std::string, uint64_t> err_count;
size_t script_error_count = 0;
// This variable can be updated directly from shard threads when they allocate memory for replies.
std::atomic<size_t> squashing_current_reply_size{0};
ReplyStats() = default;
ReplyStats(ReplyStats&& other) noexcept;
ReplyStats& operator+=(const ReplyStats& other);
ReplyStats& operator=(const ReplyStats& other);
};
struct FacadeStats {

View file

@ -30,8 +30,8 @@ void CheckConnStateClean(const ConnectionState& state) {
DCHECK(!state.subscribe_info);
}
size_t Size(const facade::CapturingReplyBuilder::Payload& payload) {
size_t payload_size = sizeof(facade::CapturingReplyBuilder::Payload);
size_t Size(const CapturingReplyBuilder::Payload& payload) {
size_t payload_size = sizeof(CapturingReplyBuilder::Payload);
return visit(
Overloaded{
[&](monostate) { return payload_size; },
@ -71,8 +71,12 @@ MultiCommandSquasher::MultiCommandSquasher(absl::Span<StoredCmd> cmds, Connectio
}
MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(ShardId sid) {
if (sharded_.empty())
if (sharded_.empty()) {
sharded_.resize(shard_set->size());
for (size_t i = 0; i < sharded_.size(); i++) {
sharded_[i].reply_size_total_ptr = &tl_facade_stats->reply_stats.squashing_current_reply_size;
}
}
auto& sinfo = sharded_[sid];
if (!sinfo.local_tx) {
@ -133,7 +137,7 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(const StoredC
return need_flush ? SquashResult::SQUASHED_FULL : SquashResult::SQUASHED;
}
bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, const StoredCmd* cmd) {
bool MultiCommandSquasher::ExecuteStandalone(RedisReplyBuilder* rb, const StoredCmd* cmd) {
DCHECK(order_.empty()); // check no squashed chain is interrupted
auto args = cmd->ArgList(&tmp_keylist_);
@ -161,7 +165,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v
DCHECK(!sinfo.dispatched.empty());
auto* local_tx = sinfo.local_tx.get();
facade::CapturingReplyBuilder crb(ReplyMode::FULL, resp_v);
CapturingReplyBuilder crb(ReplyMode::FULL, resp_v);
ConnectionContext local_cntx{cntx_, local_tx};
if (cntx_->conn()) {
local_cntx.skip_acl_validation = cntx_->conn()->IsPrivileged();
@ -169,14 +173,21 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v
CmdArgVec arg_vec;
auto move_reply = [&sinfo](CapturingReplyBuilder::Payload&& src,
CapturingReplyBuilder::Payload* dst) {
*dst = std::move(src);
size_t sz = Size(*dst);
sinfo.reply_size_delta += sz;
sinfo.reply_size_total_ptr->fetch_add(sz, std::memory_order_relaxed);
};
for (auto& dispatched : sinfo.dispatched) {
auto args = dispatched.cmd->ArgList(&arg_vec);
if (opts_.verify_commands) {
// The shared context is used for state verification, the local one is only for replies
if (auto err = service_->VerifyCommandState(dispatched.cmd->Cid(), args, *cntx_); err) {
crb.SendError(std::move(*err));
dispatched.reply = crb.Take();
sinfo.total_reply_size += Size(dispatched.reply);
move_reply(crb.Take(), &dispatched.reply);
continue;
}
}
@ -187,8 +198,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v
local_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args);
service_->InvokeCmd(dispatched.cmd->Cid(), args, &crb, &local_cntx);
dispatched.reply = crb.Take();
sinfo.total_reply_size += Size(dispatched.reply);
move_reply(crb.Take(), &dispatched.reply);
// Assert commands made no persistent state changes to stub context state
const auto& local_state = local_cntx.conn_state;
@ -247,9 +257,9 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
size_t total_reply_size = 0;
for (auto& sinfo : sharded_) {
total_reply_size += sinfo.total_reply_size;
total_reply_size += sinfo.reply_size_delta;
}
fresh_ss->stats.current_reply_size += total_reply_size;
for (auto idx : order_) {
auto& sinfo = sharded_[idx];
DCHECK_LT(sinfo.reply_id, sinfo.dispatched.size());
@ -264,7 +274,9 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
uint64_t after_reply = proactor->GetMonotonicTimeNs();
fresh_ss->stats.multi_squash_exec_hop_usec += (after_hop - start) / 1000;
fresh_ss->stats.multi_squash_exec_reply_usec += (after_reply - after_hop) / 1000;
fresh_ss->stats.current_reply_size -= total_reply_size;
tl_facade_stats->reply_stats.squashing_current_reply_size.fetch_sub(total_reply_size,
std::memory_order_release);
for (auto& sinfo : sharded_) {
sinfo.dispatched.clear();

View file

@ -46,7 +46,9 @@ class MultiCommandSquasher {
};
std::vector<Command> dispatched; // Dispatched commands
unsigned reply_id = 0;
size_t total_reply_size = 0; // Total size of replies
std::atomic<size_t>* reply_size_total_ptr; // Total size of replies on the IO thread
size_t reply_size_delta = 0; // Size of replies for this shard
boost::intrusive_ptr<Transaction> local_tx; // stub-mode tx for use inside shard
};

View file

@ -1328,9 +1328,10 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
m.coordinator_stats.multi_squash_exec_reply_usec * 1e-6,
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("commands_squashing_replies_bytes", "",
m.coordinator_stats.current_reply_size, MetricType::GAUGE,
&resp->body());
AppendMetricWithoutLabels(
"commands_squashing_replies_bytes", "",
m.facade_stats.reply_stats.squashing_current_reply_size.load(memory_order_relaxed),
MetricType::GAUGE, &resp->body());
string connections_libs;
AppendMetricHeader("connections_libs", "Total number of connections by libname:ver",
MetricType::GAUGE, &connections_libs);
@ -2468,7 +2469,8 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
append("client_read_buffer_peak_bytes", m.peak_stats.conn_read_buf_capacity);
append("tls_bytes", m.tls_bytes);
append("snapshot_serialization_bytes", m.serialization_bytes);
append("commands_squashing_replies_bytes", m.coordinator_stats.current_reply_size);
append("commands_squashing_replies_bytes",
m.facade_stats.reply_stats.squashing_current_reply_size.load(memory_order_relaxed));
if (GetFlag(FLAGS_cache_mode)) {
append("cache_mode", "cache");

View file

@ -37,7 +37,7 @@ ServerState::Stats::Stats(unsigned num_shards) : tx_width_freq_arr(num_shards) {
}
ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) {
static_assert(sizeof(Stats) == 21 * 8, "Stats size mismatch");
static_assert(sizeof(Stats) == 20 * 8, "Stats size mismatch");
#define ADD(x) this->x += (other.x)
@ -64,7 +64,6 @@ ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) {
ADD(compressed_blobs);
ADD(oom_error_cmd_cnt);
ADD(current_reply_size);
ADD(conn_timeout_events);
if (this->tx_width_freq_arr.size() > 0) {
DCHECK_EQ(this->tx_width_freq_arr.size(), other.tx_width_freq_arr.size());

View file

@ -132,7 +132,6 @@ class ServerState { // public struct - to allow initialization.
// Number of times we rejected command dispatch due to OOM condition.
uint64_t oom_error_cmd_cnt = 0;
size_t current_reply_size = 0;
uint32_t conn_timeout_events = 0;
std::valarray<uint64_t> tx_width_freq_arr;