chore: clean ups around command squasher (#5011)

1. Eliminate replies reverse call - will allow to unite replies and cmd vectors into one.
2. Introduce SwitchTxCmd function to avoid code duplication.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2025-04-28 10:44:42 +03:00 committed by GitHub
parent d37318711c
commit d7a7591a46
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 23 additions and 20 deletions

View file

@ -157,6 +157,11 @@ void ConnectionContext::ChangeMonitor(bool start) {
EnableMonitoring(start);
}
void ConnectionContext::SwitchTxCmd(const CommandId* cid) {
transaction->MultiSwitchCmd(cid);
this->cid = cid;
}
void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgList args,
facade::RedisReplyBuilder* rb) {
vector<unsigned> result = ChangeSubscriptions(args, false, to_add, to_reply);

View file

@ -301,6 +301,7 @@ class ConnectionContext : public facade::ConnectionContext {
void UnsubscribeAll(bool to_reply, facade::RedisReplyBuilder* rb);
void PUnsubscribeAll(bool to_reply, facade::RedisReplyBuilder* rb);
void ChangeMonitor(bool start); // either start or stop monitor on a given connection
void SwitchTxCmd(const CommandId* cid);
size_t UsedMemory() const override;

View file

@ -1309,9 +1309,7 @@ void DebugCmd::DoPopulateBatch(const PopulateOptions& options, const PopulateBat
args_view.push_back(arg);
}
auto args_span = absl::MakeSpan(args_view);
stub_tx->MultiSwitchCmd(cid);
local_cntx.cid = cid;
local_cntx.SwitchTxCmd(cid);
crb.SetReplyMode(ReplyMode::NONE);
stub_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args_span);
@ -1332,8 +1330,7 @@ void DebugCmd::DoPopulateBatch(const PopulateOptions& options, const PopulateBat
args_view.push_back(arg);
}
auto args_span = absl::MakeSpan(args_view);
stub_tx->MultiSwitchCmd(cid);
local_cntx.cid = cid;
local_cntx.SwitchTxCmd(cid);
crb.SetReplyMode(ReplyMode::NONE);
stub_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args_span);
sf_.service().InvokeCmd(cid, args_span, &crb, &local_cntx);

View file

@ -2220,8 +2220,7 @@ void Service::Exec(CmdArgList args, const CommandContext& cmd_cntx) {
for (auto& scmd : exec_info.body) {
VLOG(2) << "TX CMD " << scmd.Cid()->name() << " " << scmd.NumArgs();
cmd_cntx.tx->MultiSwitchCmd(scmd.Cid());
cntx->cid = scmd.Cid();
cntx->SwitchTxCmd(scmd.Cid());
arg_vec.resize(scmd.NumArgs());
scmd.Fill(&arg_vec);

View file

@ -151,8 +151,7 @@ bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, Stor
}
auto* tx = cntx_->transaction;
tx->MultiSwitchCmd(cmd->Cid());
cntx_->cid = cmd->Cid();
cntx_->SwitchTxCmd(cmd->Cid());
if (cmd->Cid()->IsTransactional())
tx->InitByArgs(cntx_->ns, cntx_->conn_state.db_index, args);
@ -189,8 +188,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v
}
}
local_tx->MultiSwitchCmd(cmd->Cid());
local_cntx.cid = cmd->Cid();
local_cntx.SwitchTxCmd(cmd->Cid());
crb.SetReplyMode(cmd->ReplyMode());
local_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args);
@ -205,7 +203,6 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v
CheckConnStateClean(local_state);
}
reverse(sinfo.replies.begin(), sinfo.replies.end());
return OpStatus::OK;
}
@ -255,15 +252,15 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
bool aborted = false;
for (auto idx : order_) {
auto& replies = sharded_[idx].replies;
CHECK(!replies.empty());
auto& sinfo = sharded_[idx];
auto& replies = sinfo.replies;
DCHECK_LT(sinfo.reply_id, replies.size());
aborted |= opts_.error_abort && CapturingReplyBuilder::TryExtractError(replies.back());
current_reply_size_.fetch_sub(Size(replies.back()), std::memory_order_relaxed);
CapturingReplyBuilder::Apply(std::move(replies.back()), rb);
replies.pop_back();
auto& reply = replies[sinfo.reply_id++];
aborted |= opts_.error_abort && CapturingReplyBuilder::TryExtractError(reply);
current_reply_size_.fetch_sub(Size(reply), std::memory_order_relaxed);
CapturingReplyBuilder::Apply(std::move(reply), rb);
if (aborted)
break;
}
@ -271,8 +268,11 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
ServerState::SafeTLocal()->stats.multi_squash_exec_hop_usec += (after_hop - start) / 1000;
ServerState::SafeTLocal()->stats.multi_squash_exec_reply_usec += (after_reply - after_hop) / 1000;
for (auto& sinfo : sharded_)
for (auto& sinfo : sharded_) {
sinfo.cmds.clear();
sinfo.replies.clear();
sinfo.reply_id = 0;
}
order_.clear();
return !aborted;

View file

@ -45,6 +45,7 @@ class MultiCommandSquasher {
std::vector<StoredCmd*> cmds; // accumulated commands
std::vector<facade::CapturingReplyBuilder::Payload> replies;
unsigned reply_id = 0;
boost::intrusive_ptr<Transaction> local_tx; // stub-mode tx for use inside shard
};