diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index 9d40d5912..de2632d2f 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -7,6 +7,7 @@ #include #include "base/logging.h" +#include "core/overloaded.h" #include "facade/dragonfly_connection.h" #include "server/cluster/cluster_utility.h" #include "server/command_registry.h" @@ -30,8 +31,40 @@ void CheckConnStateClean(const ConnectionState& state) { DCHECK(!state.subscribe_info); } +size_t Size(const facade::CapturingReplyBuilder::Payload& payload) { + size_t payload_size = sizeof(facade::CapturingReplyBuilder::Payload); + return visit( + Overloaded{ + [&](monostate) { return payload_size; }, + [&](long) { return payload_size; }, + [&](double) { return payload_size; }, + [&](OpStatus) { return payload_size; }, + [&](CapturingReplyBuilder::Null) { return payload_size; }, + // ignore SSO because it's insignificant + [&](const CapturingReplyBuilder::SimpleString& data) { + return payload_size + data.size(); + }, + [&](const CapturingReplyBuilder::BulkString& data) { return payload_size + data.size(); }, + [&](const CapturingReplyBuilder::Error& data) { + return payload_size + data.first.size() + data.second.size(); + }, + [&](const unique_ptr& data) { + if (!data || (data->len == 0 && data->type == RedisReplyBuilder::ARRAY)) { + return payload_size; + } + for (const auto& pl : data->arr) { + payload_size += Size(pl); + } + return payload_size; + }, + }, + payload); +} + } // namespace +atomic_uint64_t MultiCommandSquasher::current_reply_size_ = 0; + MultiCommandSquasher::MultiCommandSquasher(absl::Span cmds, ConnectionContext* cntx, Service* service, bool verify_commands, bool error_abort) : cmds_{cmds}, @@ -159,6 +192,8 @@ OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) { crb.SendError(std::move(*err)); sinfo.replies.emplace_back(crb.Take()); + current_reply_size_.fetch_add(Size(sinfo.replies.back()), std::memory_order_relaxed); + continue; } } @@ -171,6 +206,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard service_->InvokeCmd(cmd->Cid(), args, &crb, &local_cntx); sinfo.replies.emplace_back(crb.Take()); + current_reply_size_.fetch_add(Size(sinfo.replies.back()), std::memory_order_relaxed); // Assert commands made no persistent state changes to stub context state const auto& local_state = local_cntx.conn_state; @@ -238,6 +274,7 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { aborted |= error_abort_ && CapturingReplyBuilder::TryExtractError(replies.back()); CapturingReplyBuilder::Apply(std::move(replies.back()), rb); + current_reply_size_.fetch_sub(Size(replies.back()), std::memory_order_relaxed); replies.pop_back(); if (aborted) diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index 889248818..ccea7f7a2 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -28,6 +28,10 @@ class MultiCommandSquasher { return MultiCommandSquasher{cmds, cntx, service, verify_commands, error_abort}.Run(rb); } + static size_t GetRepliesMemSize() { + return current_reply_size_.load(std::memory_order_relaxed); + } + private: // Per-shard execution info. struct ShardExecInfo { @@ -85,6 +89,9 @@ class MultiCommandSquasher { size_t num_shards_ = 0; std::vector tmp_keylist_; + + // we increase size in one thread and decrease in another + static atomic_uint64_t current_reply_size_; }; } // namespace dfly diff --git a/src/server/server_family.cc b/src/server/server_family.cc index b5493cdac..2ffcf978f 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -50,6 +50,7 @@ extern "C" { #include "server/journal/journal.h" #include "server/main_service.h" #include "server/memory_cmd.h" +#include "server/multi_command_squasher.h" #include "server/protocol_client.h" #include "server/rdb_load.h" #include "server/rdb_save.h" @@ -2314,6 +2315,7 @@ void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil append("client_read_buffer_peak_bytes", m.peak_stats.conn_read_buf_capacity); append("tls_bytes", m.tls_bytes); append("snapshot_serialization_bytes", m.serialization_bytes); + append("commands_squashing_replies_bytes", MultiCommandSquasher::GetRepliesMemSize()); if (GetFlag(FLAGS_cache_mode)) { append("cache_mode", "cache");