From 0f415acb8138b7f6a08fbbafe40d3934f18cf404 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Mon, 28 Apr 2025 10:55:14 +0300 Subject: [PATCH] chore: StoredCmd to support both owned and external arguments (#5010) Before: StoredCmd always copied the backing buffer of the commands. This is of course sub-optimal if the backing buffer exists during the lifetime of StoredCmd. This is exactly the case in `Service::DispatchManyCommands`. This PR: 1. Adds support for both owned and non-owned arguments. 2. Improves the interfaces around StoredCmd and removes some code duplication. Signed-off-by: Roman Gershman --- src/server/conn_context.cc | 66 +++++++++++++++++----------- src/server/conn_context.h | 38 ++++++++-------- src/server/main_service.cc | 39 +++++++--------- src/server/multi_command_squasher.cc | 18 +++----- src/server/multi_command_squasher.h | 13 +++--- 5 files changed, 90 insertions(+), 84 deletions(-) diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index cc6b27c26..4cd9b0e6e 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -32,51 +32,62 @@ static void SendSubscriptionChangedResponse(string_view action, std::optionalSendLong(count); } -StoredCmd::StoredCmd(const CommandId* cid, ArgSlice args, facade::ReplyMode mode) - : cid_{cid}, buffer_{}, sizes_(args.size()), reply_mode_{mode} { +StoredCmd::StoredCmd(const CommandId* cid, bool own_args, ArgSlice args) + : cid_{cid}, args_{args}, reply_mode_{facade::ReplyMode::FULL} { + if (!own_args) + return; + + auto& own_storage = args_.emplace(args.size()); size_t total_size = 0; for (auto args : args) { total_size += args.size(); } - - buffer_.resize(total_size); - char* next = buffer_.data(); + own_storage.buffer.resize(total_size); + char* next = own_storage.buffer.data(); for (unsigned i = 0; i < args.size(); i++) { if (args[i].size() > 0) memcpy(next, args[i].data(), args[i].size()); next += args[i].size(); - sizes_[i] = args[i].size(); + own_storage.sizes[i] = 
args[i].size(); } } StoredCmd::StoredCmd(string&& buffer, const CommandId* cid, ArgSlice args, facade::ReplyMode mode) - : cid_{cid}, buffer_{std::move(buffer)}, sizes_(args.size()), reply_mode_{mode} { + : cid_{cid}, args_{OwnStorage{args.size()}}, reply_mode_{mode} { + OwnStorage& own_storage = std::get(args_); + own_storage.buffer = std::move(buffer); + for (unsigned i = 0; i < args.size(); i++) { // Assume tightly packed list. DCHECK(i + 1 == args.size() || args[i].data() + args[i].size() == args[i + 1].data()); - sizes_[i] = args[i].size(); + own_storage.sizes[i] = args[i].size(); } } -void StoredCmd::Fill(absl::Span args) { - DCHECK_GE(args.size(), sizes_.size()); - - unsigned offset = 0; - for (unsigned i = 0; i < sizes_.size(); i++) { - args[i] = MutableSlice{buffer_.data() + offset, sizes_[i]}; - offset += sizes_[i]; - } -} - -size_t StoredCmd::NumArgs() const { - return sizes_.size(); +CmdArgList StoredCmd::ArgList(CmdArgVec* scratch) const { + return std::visit( + Overloaded{[&](const OwnStorage& s) { + unsigned offset = 0; + scratch->resize(s.sizes.size()); + for (unsigned i = 0; i < s.sizes.size(); i++) { + (*scratch)[i] = string_view{s.buffer.data() + offset, s.sizes[i]}; + offset += s.sizes[i]; + } + return CmdArgList{*scratch}; + }, + [&](const CmdArgList& s) { return s; }}, + args_); } std::string StoredCmd::FirstArg() const { - if (sizes_.size() == 0) { + if (NumArgs() == 0) { return {}; } - return buffer_.substr(0, sizes_[0]); + return std::visit(Overloaded{[&](const OwnStorage& s) { return s.buffer.substr(0, s.sizes[0]); }, + [&](const ArgSlice& s) { + return std::string{s[0].data(), s[0].size()}; + }}, + args_); } facade::ReplyMode StoredCmd::ReplyMode() const { @@ -91,9 +102,14 @@ template size_t IsStoredInlined(const C& c) { } size_t StoredCmd::UsedMemory() const { - size_t buffer_size = IsStoredInlined(buffer_) ? 0 : buffer_.size(); - size_t sz_size = IsStoredInlined(sizes_) ? 
0 : sizes_.size() * sizeof(uint32_t); - return buffer_size + sz_size; + return std::visit(Overloaded{[&](const OwnStorage& s) { + size_t buffer_size = + IsStoredInlined(s.buffer) ? 0 : s.buffer.capacity(); + size_t sz_size = IsStoredInlined(s.sizes) ? 0 : s.sizes.memsize(); + return buffer_size + sz_size; + }, + [&](const ArgSlice&) -> size_t { return 0U; }}, + args_); } const CommandId* StoredCmd::Cid() const { diff --git a/src/server/conn_context.h b/src/server/conn_context.h index 634511c6f..aabcab5cf 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -8,6 +8,7 @@ #include #include "acl/acl_commands_def.h" +#include "core/overloaded.h" #include "facade/acl_commands_def.h" #include "facade/conn_context.h" #include "facade/reply_capture.h" @@ -27,25 +28,20 @@ struct FlowInfo; // Used for storing MULTI/EXEC commands. class StoredCmd { public: - StoredCmd(const CommandId* cid, ArgSlice args, facade::ReplyMode mode = facade::ReplyMode::FULL); + StoredCmd(const CommandId* cid, bool own_args, CmdArgList args); // Create on top of already filled tightly-packed buffer. - StoredCmd(std::string&& buffer, const CommandId* cid, ArgSlice args, - facade::ReplyMode mode = facade::ReplyMode::FULL); + StoredCmd(std::string&& buffer, const CommandId* cid, CmdArgList args, facade::ReplyMode mode); - size_t NumArgs() const; - - size_t UsedMemory() const; - - // Fill the arg list with stored arguments, it should be at least of size NumArgs(). - // Between filling and invocation, cmd should NOT be moved. 
- void Fill(absl::Span args); - - void Fill(CmdArgVec* dest) { - dest->resize(sizes_.size()); - Fill(absl::MakeSpan(*dest)); + size_t NumArgs() const { + return std::visit(Overloaded{// + [](const OwnStorage& s) { return s.sizes.size(); }, + [](const CmdArgList& s) { return s.size(); }}, + args_); } + size_t UsedMemory() const; + facade::CmdArgList ArgList(CmdArgVec* scratch) const; std::string FirstArg() const; const CommandId* Cid() const; @@ -53,10 +49,16 @@ class StoredCmd { facade::ReplyMode ReplyMode() const; private: - const CommandId* cid_; // underlying command - std::string buffer_; // underlying buffer - absl::FixedArray sizes_; // sizes of arg part - facade::ReplyMode reply_mode_; // reply mode + const CommandId* cid_; // underlying command + struct OwnStorage { + std::string buffer; // underlying buffer + absl::FixedArray sizes; // sizes of arg part + explicit OwnStorage(size_t sz) : sizes(sz) { + } + }; + + std::variant args_; // args storage + facade::ReplyMode reply_mode_; // reply mode }; struct ConnectionState { diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 0a96236c4..20efa1a4a 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -634,9 +634,8 @@ Transaction::MultiMode DeduceExecMode(ExecScriptUse state, // We can only tell if eval is transactional based on they keycount if (absl::StartsWith(scmd.Cid()->name(), "EVAL")) { CmdArgVec arg_vec{}; - StoredCmd cmd = scmd; - cmd.Fill(&arg_vec); - auto keys = DetermineKeys(scmd.Cid(), absl::MakeSpan(arg_vec)); + auto args = scmd.ArgList(&arg_vec); + auto keys = DetermineKeys(scmd.Cid(), args); transactional |= (keys && keys.value().NumArgs() > 0); } else { transactional |= scmd.Cid()->IsTransactional(); @@ -1200,9 +1199,8 @@ void Service::DispatchCommand(ArgSlice args, SinkReplyBuilder* builder, bool is_trans_cmd = CO::IsTransKind(cid->name()); if (dfly_cntx->conn_state.exec_info.IsCollecting() && !is_trans_cmd) { // TODO: protect against aggregating 
huge transactions. - StoredCmd stored_cmd{cid, args_no_cmd}; - dfly_cntx->conn_state.exec_info.body.push_back(std::move(stored_cmd)); - if (stored_cmd.Cid()->IsWriteOnly()) { + dfly_cntx->conn_state.exec_info.body.emplace_back(cid, true, args_no_cmd); + if (cid->IsWriteOnly()) { dfly_cntx->conn_state.exec_info.is_write = true; } return builder->SendSimpleString("QUEUED"); @@ -1412,11 +1410,14 @@ size_t Service::DispatchManyCommands(absl::Span args_list, SinkReply DCHECK(!dfly_cntx->conn_state.exec_info.IsRunning()); DCHECK_EQ(builder->GetProtocol(), Protocol::REDIS); + auto* ss = dfly::ServerState::tlocal(); + // Don't even start when paused. We can only continue if DispatchTracker is aware of us running. + if (ss->IsPaused()) + return 0; + vector stored_cmds; intrusive_ptr dist_trans; - size_t dispatched = 0; - auto* ss = dfly::ServerState::tlocal(); auto perform_squash = [&] { if (stored_cmds.empty()) @@ -1445,10 +1446,6 @@ size_t Service::DispatchManyCommands(absl::Span args_list, SinkReply stored_cmds.clear(); }; - // Don't even start when paused. We can only continue if DispatchTracker is aware of us running. 
- if (ss->IsPaused()) - return 0; - for (auto args : args_list) { string cmd = absl::AsciiStrToUpper(ArgS(args, 0)); const auto [cid, tail_args] = registry_.FindExtended(cmd, args.subspan(1)); @@ -1468,7 +1465,7 @@ size_t Service::DispatchManyCommands(absl::Span args_list, SinkReply if (!is_multi && !is_eval && !is_blocking && cid != nullptr) { stored_cmds.reserve(args_list.size()); - stored_cmds.emplace_back(cid, tail_args); + stored_cmds.emplace_back(cid, false /* do not deep-copy commands*/, tail_args); continue; } @@ -2103,19 +2100,18 @@ bool IsWatchingOtherDbs(DbIndex db_indx, const ConnectionState::ExecInfo& exec_i [db_indx](const auto& pair) { return pair.first != db_indx; }); } -template void IterateAllKeys(ConnectionState::ExecInfo* exec_info, F&& f) { +template void IterateAllKeys(const ConnectionState::ExecInfo* exec_info, F&& f) { for (auto& [dbid, key] : exec_info->watched_keys) f(MutableSlice{key.data(), key.size()}); CmdArgVec arg_vec{}; - for (auto& scmd : exec_info->body) { + for (const auto& scmd : exec_info->body) { if (!scmd.Cid()->IsTransactional()) continue; - scmd.Fill(&arg_vec); - - auto key_res = DetermineKeys(scmd.Cid(), absl::MakeSpan(arg_vec)); + auto args = scmd.ArgList(&arg_vec); + auto key_res = DetermineKeys(scmd.Cid(), args); if (!key_res.ok()) continue; @@ -2217,15 +2213,12 @@ void Service::Exec(CmdArgList args, const CommandContext& cmd_cntx) { MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), rb, cntx, this, opts); } else { CmdArgVec arg_vec; - for (auto& scmd : exec_info.body) { + for (const auto& scmd : exec_info.body) { VLOG(2) << "TX CMD " << scmd.Cid()->name() << " " << scmd.NumArgs(); cntx->SwitchTxCmd(scmd.Cid()); - arg_vec.resize(scmd.NumArgs()); - scmd.Fill(&arg_vec); - - CmdArgList args = absl::MakeSpan(arg_vec); + CmdArgList args = scmd.ArgList(&arg_vec); if (scmd.Cid()->IsTransactional()) { OpStatus st = cmd_cntx.tx->InitByArgs(cntx->ns, cntx->conn_state.db_index, args); diff --git 
a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index 4f4fc9d5a..f6bd2a596 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -92,7 +92,7 @@ MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(Shar return sinfo; } -MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cmd) { +MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(const StoredCmd* cmd) { DCHECK(cmd->Cid()); if (!cmd->Cid()->IsTransactional() || (cmd->Cid()->opt_mask() & CO::BLOCKING) || @@ -103,8 +103,7 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm return SquashResult::NOT_SQUASHED; } - cmd->Fill(&tmp_keylist_); - auto args = absl::MakeSpan(tmp_keylist_); + auto args = cmd->ArgList(&tmp_keylist_); if (args.empty()) return SquashResult::NOT_SQUASHED; @@ -136,11 +135,10 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm return need_flush ? 
SquashResult::SQUASHED_FULL : SquashResult::SQUASHED; } -bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, StoredCmd* cmd) { +bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, const StoredCmd* cmd) { DCHECK(order_.empty()); // check no squashed chain is interrupted - cmd->Fill(&tmp_keylist_); - auto args = absl::MakeSpan(tmp_keylist_); + auto args = cmd->ArgList(&tmp_keylist_); if (opts_.verify_commands) { if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) { @@ -170,13 +168,11 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v if (cntx_->conn()) { local_cntx.skip_acl_validation = cntx_->conn()->IsPrivileged(); } - absl::InlinedVector arg_vec; - for (auto* cmd : sinfo.cmds) { - arg_vec.resize(cmd->NumArgs()); - auto args = absl::MakeSpan(arg_vec); - cmd->Fill(args); + CmdArgVec arg_vec; + for (const auto* cmd : sinfo.cmds) { + auto args = cmd->ArgList(&arg_vec); if (opts_.verify_commands) { // The shared context is used for state verification, the local one is only for replies if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) { diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index ad736b1bd..657ecef1e 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -28,6 +28,7 @@ class MultiCommandSquasher { unsigned max_squash_size = 32; // How many commands to squash at once }; + // Returns number of processed commands. static size_t Execute(absl::Span cmds, facade::RedisReplyBuilder* rb, ConnectionContext* cntx, Service* service, const Opts& opts) { return MultiCommandSquasher{cmds, cntx, service, opts}.Run(rb); @@ -40,10 +41,10 @@ class MultiCommandSquasher { private: // Per-shard execution info. 
struct ShardExecInfo { - ShardExecInfo() : cmds{}, replies{}, local_tx{nullptr} { + ShardExecInfo() : local_tx{nullptr} { } - std::vector cmds; // accumulated commands + std::vector cmds; // accumulated commands std::vector replies; unsigned reply_id = 0; boost::intrusive_ptr local_tx; // stub-mode tx for use inside shard @@ -51,7 +52,6 @@ class MultiCommandSquasher { enum class SquashResult { SQUASHED, SQUASHED_FULL, NOT_SQUASHED, ERROR }; - private: MultiCommandSquasher(absl::Span cmds, ConnectionContext* cntx, Service* Service, const Opts& opts); @@ -59,10 +59,10 @@ class MultiCommandSquasher { ShardExecInfo& PrepareShardInfo(ShardId sid); // Retrun squash flags - SquashResult TrySquash(StoredCmd* cmd); + SquashResult TrySquash(const StoredCmd* cmd); // Execute separate non-squashed cmd. Return false if aborting on error. - bool ExecuteStandalone(facade::RedisReplyBuilder* rb, StoredCmd* cmd); + bool ExecuteStandalone(facade::RedisReplyBuilder* rb, const StoredCmd* cmd); // Callback that runs on shards during squashed hop. facade::OpStatus SquashedHopCb(EngineShard* es, facade::RespVersion resp_v); @@ -70,12 +70,11 @@ class MultiCommandSquasher { // Execute all currently squashed commands. Return false if aborting on error. bool ExecuteSquashed(facade::RedisReplyBuilder* rb); - // Run all commands until completion. Returns number of squashed commands. + // Run all commands until completion. Returns number of processed commands. size_t Run(facade::RedisReplyBuilder* rb); bool IsAtomic() const; - private: absl::Span cmds_; // Input range of stored commands ConnectionContext* cntx_; // Underlying context Service* service_;