diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc
index 210daf66f..8477198f3 100644
--- a/src/server/debugcmd.cc
+++ b/src/server/debugcmd.cc
@@ -220,6 +220,11 @@ void DebugCmd::Run(CmdArgList args) {
   if (subcmd == "HELP") {
     string_view help_arr[] = {
         "DEBUG <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
+        "EXEC",
+        "    Show the descriptors of the MULTI/EXEC transactions that were processed by ",
+        "    the server. For each EXEC/i descriptor, 'i' is the number of shards it touches. ",
+        "    Each descriptor details the commands it contained followed by number of their ",
+        "    arguments. Each descriptor is prefixed by its frequency count",
         "OBJECT <key>",
         "    Show low-level info about `key` and associated value.",
         "LOAD <filename>",
@@ -300,6 +305,9 @@ void DebugCmd::Run(CmdArgList args) {
     return Shards();
   }
+  if (subcmd == "EXEC") {
+    return Exec();
+  }
 
   string reply = UnknownSubCmd(subcmd, "DEBUG");
   return cntx_->SendError(reply, kSyntaxErrType);
 }
@@ -562,6 +570,26 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t num_of_keys,
   });
 }
 
+void DebugCmd::Exec() {
+  EngineShardSet& ess = *shard_set;
+  fb2::Mutex mu;
+  std::map<string, unsigned> freq_cnt;
+
+  ess.pool()->Await([&](auto*) {
+    for (const auto& k_v : ServerState::tlocal()->exec_freq_count) {
+      unique_lock lk(mu);
+      freq_cnt[k_v.first] += k_v.second;
+    }
+  });
+
+  string res;
+  for (const auto& k_v : freq_cnt) {
+    StrAppend(&res, k_v.second, ":", k_v.first, "\n");
+  }
+  auto* rb = static_cast<RedisReplyBuilder*>(cntx_->reply_builder());
+  rb->SendVerbatimString(res);
+}
+
 void DebugCmd::Inspect(string_view key) {
   EngineShardSet& ess = *shard_set;
   ShardId sid = Shard(key, ess.size());
diff --git a/src/server/debugcmd.h b/src/server/debugcmd.h
index e7981d85c..d79377837 100644
--- a/src/server/debugcmd.h
+++ b/src/server/debugcmd.h
@@ -36,6 +36,7 @@ class DebugCmd {
   void Reload(CmdArgList args);
   void Replica(CmdArgList args);
   void Load(std::string_view filename);
+  void Exec();
   void Inspect(std::string_view key);
   void Watched();
   void TxAnalysis();
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index a33da1a09..aeecfcdca 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -79,6 +79,8 @@ ABSL_FLAG(uint32_t, multi_exec_mode, 2,
 ABSL_FLAG(bool, multi_exec_squash, true,
           "Whether multi exec will squash single shard commands to optimize performance");
 
+ABSL_FLAG(bool, track_exec_frequencies, true, "Whether to track exec frequencies for multi exec");
+
 ABSL_FLAG(uint32_t, multi_eval_squash_buffer, 4_KB, "Max buffer for squashed commands per script");
 
 ABSL_DECLARE_FLAG(bool, primary_port_http_enabled);
@@ -700,6 +702,16 @@ optional<Transaction::MultiMode> DeduceExecMode(ExecEvalState state,
   return multi_mode;
 }
 
+string CreateExecDescriptor(const std::vector<StoredCmd>& stored_cmds, unsigned num_uniq_shards) {
+  string result;
+  result.reserve(stored_cmds.size() * 10);
+  absl::StrAppend(&result, "EXEC/", num_uniq_shards, "\n");
+  for (const auto& scmd : stored_cmds) {
+    absl::StrAppend(&result, "  ", scmd.Cid()->name(), " ", scmd.NumArgs(), "\n");
+  }
+  return result;
+}
+
 // Either take the interpreter from the preborrowed multi exec transaction or borrow one.
 struct BorrowedInterpreter {
   explicit BorrowedInterpreter(ConnectionContext* cntx) {
@@ -2057,13 +2069,21 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
   exec_info.state = ConnectionState::ExecInfo::EXEC_RUNNING;
 
   VLOG(1) << "StartExec " << exec_info.body.size();
+
+  // Make sure we flush whatever responses we aggregated in the reply builder.
   SinkReplyBuilder::ReplyAggregator agg(rb);
   rb->StartArray(exec_info.body.size());
 
+  ServerState* ss = ServerState::tlocal();
   if (state != ExecEvalState::NONE)
     exec_info.preborrowed_interpreter = ServerState::tlocal()->BorrowInterpreter();
 
   if (!exec_info.body.empty()) {
+    if (GetFlag(FLAGS_track_exec_frequencies)) {
+      string descr = CreateExecDescriptor(exec_info.body, cntx->transaction->GetUniqueShardCnt());
+      ss->exec_freq_count[descr]++;
+    }
+
     if (absl::GetFlag(FLAGS_multi_exec_squash) && state == ExecEvalState::NONE) {
       MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), cntx, this);
     } else {
diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc
index 395b7d3e9..71c4bb8e6 100644
--- a/src/server/multi_command_squasher.cc
+++ b/src/server/multi_command_squasher.cc
@@ -12,6 +12,7 @@ namespace dfly {
 
 using namespace std;
 using namespace facade;
+using namespace util;
 
 namespace {
 
@@ -193,6 +194,10 @@ bool MultiCommandSquasher::ExecuteSquashed() {
     sd.replies.reserve(sd.cmds.size());
 
   Transaction* tx = cntx_->transaction;
+  ServerState* ss = ServerState::tlocal();
+  ss->stats.multi_squash_executions++;
+  ProactorBase* proactor = ProactorBase::me();
+  uint64_t start = proactor->GetMonotonicTimeNs();
 
   // Atomic transactions (that have all keys locked) perform hops and run squashed commands via
   // stubs, non-atomic ones just run the commands in parallel.
@@ -206,6 +211,7 @@
                          [this](auto sid) { return !sharded_[sid].cmds.empty(); });
   }
 
+  uint64_t after_hop = proactor->GetMonotonicTimeNs();
   bool aborted = false;
   RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx_->reply_builder());
 
@@ -221,6 +227,9 @@
     if (aborted)
       break;
   }
+  uint64_t after_reply = proactor->GetMonotonicTimeNs();
+  ss->stats.multi_squash_exec_hop_usec += (after_hop - start) / 1000;
+  ss->stats.multi_squash_exec_reply_usec += (after_reply - after_hop) / 1000;
 
   for (auto& sinfo : sharded_)
     sinfo.cmds.clear();
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index 5435cbfdc..9c2241636 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -1732,6 +1732,9 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
     append("eval_squashed_flushes", m.coordinator_stats.eval_squashed_flushes);
     append("tx_schedule_cancel_total", m.coordinator_stats.tx_schedule_cancel_cnt);
     append("tx_queue_len", m.tx_queue_len);
+    append("multi_squash_execution_total", m.coordinator_stats.multi_squash_executions);
+    append("multi_squash_execution_hop_usec", m.coordinator_stats.multi_squash_exec_hop_usec);
+    append("multi_squash_execution_reply_usec", m.coordinator_stats.multi_squash_exec_reply_usec);
   }
 
   if (should_enter("REPLICATION")) {
diff --git a/src/server/server_state.cc b/src/server/server_state.cc
index 127520516..5b92ce911 100644
--- a/src/server/server_state.cc
+++ b/src/server/server_state.cc
@@ -48,7 +48,7 @@ auto ServerState::Stats::operator=(Stats&& other) -> Stats& {
 }
 
 ServerState::Stats& ServerState::Stats::Add(unsigned num_shards, const ServerState::Stats& other) {
-  static_assert(sizeof(Stats) == 10 * 8, "Stats size mismatch");
+  static_assert(sizeof(Stats) == 13 * 8, "Stats size mismatch");
 
   for (int i = 0; i < NUM_TX_TYPES; ++i) {
     this->tx_type_cnt[i] += other.tx_type_cnt[i];
@@ -59,6 +59,10 @@ ServerState::Stats& ServerState::Stats::Add(unsigned num_shards, const ServerSta
   this->eval_squashed_flushes += other.eval_squashed_flushes;
   this->tx_schedule_cancel_cnt += other.tx_schedule_cancel_cnt;
 
+  this->multi_squash_executions += other.multi_squash_executions;
+  this->multi_squash_exec_hop_usec += other.multi_squash_exec_hop_usec;
+  this->multi_squash_exec_reply_usec += other.multi_squash_exec_reply_usec;
+
   if (this->tx_width_freq_arr == nullptr) {
     this->tx_width_freq_arr = new uint64_t[num_shards];
     std::copy_n(other.tx_width_freq_arr, num_shards, this->tx_width_freq_arr);
diff --git a/src/server/server_state.h b/src/server/server_state.h
index 02714bc71..8cf639043 100644
--- a/src/server/server_state.h
+++ b/src/server/server_state.h
@@ -102,6 +102,9 @@ class ServerState {  // public struct - to allow initialization.
     uint64_t eval_squashed_flushes = 0;
     uint64_t tx_schedule_cancel_cnt = 0;
+    uint64_t multi_squash_executions = 0;
+    uint64_t multi_squash_exec_hop_usec = 0;
+    uint64_t multi_squash_exec_reply_usec = 0;
 
     // Array of size of number of shards.
     // Each entry is how many transactions we had with this width (unique_shard_cnt).
     uint64_t* tx_width_freq_arr = nullptr;
@@ -254,6 +257,9 @@ class ServerState {  // public struct - to allow initialization.
     return slow_log_shard_;
   };
 
+  // Exec descriptor frequency count for this thread.
+  absl::flat_hash_map<std::string, uint64_t> exec_freq_count;
+
  private:
   int64_t live_transactions_ = 0;
   SlowLogShard slow_log_shard_;
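
For illustration (not part of the patch; the frequency and argument counts below are hypothetical, and assume StoredCmd::NumArgs() reports the number of stored arguments per command): DebugCmd::Exec() replies with one "frequency:descriptor" entry per distinct transaction shape, each descriptor being the multi-line string built by CreateExecDescriptor(), so entries are separated by a blank line. A MULTI/EXEC block of SET k v and GET k that touched a single shard and was executed 42 times would make DEBUG EXEC return a verbatim string along the lines of:

    42:EXEC/1
      SET 2
      GET 1
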