diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index a8c5b9aae..cc6b27c26 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -157,6 +157,11 @@ void ConnectionContext::ChangeMonitor(bool start) { EnableMonitoring(start); } +void ConnectionContext::SwitchTxCmd(const CommandId* cid) { + transaction->MultiSwitchCmd(cid); + this->cid = cid; +} + void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgList args, facade::RedisReplyBuilder* rb) { vector result = ChangeSubscriptions(args, false, to_add, to_reply); diff --git a/src/server/conn_context.h b/src/server/conn_context.h index 92542ff06..634511c6f 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -301,6 +301,7 @@ class ConnectionContext : public facade::ConnectionContext { void UnsubscribeAll(bool to_reply, facade::RedisReplyBuilder* rb); void PUnsubscribeAll(bool to_reply, facade::RedisReplyBuilder* rb); void ChangeMonitor(bool start); // either start or stop monitor on a given connection + void SwitchTxCmd(const CommandId* cid); size_t UsedMemory() const override; diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 38d3d8e81..9b4a457c8 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -1309,9 +1309,7 @@ void DebugCmd::DoPopulateBatch(const PopulateOptions& options, const PopulateBat args_view.push_back(arg); } auto args_span = absl::MakeSpan(args_view); - - stub_tx->MultiSwitchCmd(cid); - local_cntx.cid = cid; + local_cntx.SwitchTxCmd(cid); crb.SetReplyMode(ReplyMode::NONE); stub_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args_span); @@ -1332,8 +1330,7 @@ void DebugCmd::DoPopulateBatch(const PopulateOptions& options, const PopulateBat args_view.push_back(arg); } auto args_span = absl::MakeSpan(args_view); - stub_tx->MultiSwitchCmd(cid); - local_cntx.cid = cid; + local_cntx.SwitchTxCmd(cid); crb.SetReplyMode(ReplyMode::NONE); stub_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args_span); sf_.service().InvokeCmd(cid, args_span, &crb, &local_cntx); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 1581bf775..0a96236c4 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -2220,8 +2220,7 @@ void Service::Exec(CmdArgList args, const CommandContext& cmd_cntx) { for (auto& scmd : exec_info.body) { VLOG(2) << "TX CMD " << scmd.Cid()->name() << " " << scmd.NumArgs(); - cmd_cntx.tx->MultiSwitchCmd(scmd.Cid()); - cntx->cid = scmd.Cid(); + cntx->SwitchTxCmd(scmd.Cid()); arg_vec.resize(scmd.NumArgs()); scmd.Fill(&arg_vec); diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index e52423e39..4f4fc9d5a 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -151,8 +151,7 @@ bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, Stor } auto* tx = cntx_->transaction; - tx->MultiSwitchCmd(cmd->Cid()); - cntx_->cid = cmd->Cid(); + cntx_->SwitchTxCmd(cmd->Cid()); if (cmd->Cid()->IsTransactional()) tx->InitByArgs(cntx_->ns, cntx_->conn_state.db_index, args); @@ -189,8 +188,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v } } - local_tx->MultiSwitchCmd(cmd->Cid()); - local_cntx.cid = cmd->Cid(); + local_cntx.SwitchTxCmd(cmd->Cid()); crb.SetReplyMode(cmd->ReplyMode()); local_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args); @@ -205,7 +203,6 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v CheckConnStateClean(local_state); } - reverse(sinfo.replies.begin(), sinfo.replies.end()); return OpStatus::OK; } @@ -255,15 +252,15 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { bool aborted = false; for (auto idx : order_) { - auto& replies = sharded_[idx].replies; - CHECK(!replies.empty()); + auto& sinfo = sharded_[idx]; + auto& replies = sinfo.replies; + DCHECK_LT(sinfo.reply_id, replies.size()); - aborted |= opts_.error_abort && CapturingReplyBuilder::TryExtractError(replies.back()); - - current_reply_size_.fetch_sub(Size(replies.back()), std::memory_order_relaxed); - CapturingReplyBuilder::Apply(std::move(replies.back()), rb); - replies.pop_back(); + auto& reply = replies[sinfo.reply_id++]; + aborted |= opts_.error_abort && CapturingReplyBuilder::TryExtractError(reply); + current_reply_size_.fetch_sub(Size(reply), std::memory_order_relaxed); + CapturingReplyBuilder::Apply(std::move(reply), rb); if (aborted) break; } @@ -271,8 +268,11 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { ServerState::SafeTLocal()->stats.multi_squash_exec_hop_usec += (after_hop - start) / 1000; ServerState::SafeTLocal()->stats.multi_squash_exec_reply_usec += (after_reply - after_hop) / 1000; - for (auto& sinfo : sharded_) + for (auto& sinfo : sharded_) { sinfo.cmds.clear(); + sinfo.replies.clear(); + sinfo.reply_id = 0; + } order_.clear(); return !aborted; diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index b54c7b2f5..ad736b1bd 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -45,6 +45,7 @@ class MultiCommandSquasher { std::vector cmds; // accumulated commands std::vector replies; + unsigned reply_id = 0; boost::intrusive_ptr local_tx; // stub-mode tx for use inside shard };