mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
fix server: fix stats of pipeline squashed commands (#4132)
Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
a887d822a9
commit
db67b35f8e
3 changed files with 12 additions and 11 deletions
|
@ -1407,13 +1407,13 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list, SinkReply
|
||||||
}
|
}
|
||||||
|
|
||||||
dfly_cntx->transaction = dist_trans.get();
|
dfly_cntx->transaction = dist_trans.get();
|
||||||
MultiCommandSquasher::Execute(absl::MakeSpan(stored_cmds),
|
size_t squashed_num = MultiCommandSquasher::Execute(absl::MakeSpan(stored_cmds),
|
||||||
static_cast<RedisReplyBuilder*>(builder), dfly_cntx, this, true,
|
static_cast<RedisReplyBuilder*>(builder),
|
||||||
false);
|
dfly_cntx, this, true, false);
|
||||||
dfly_cntx->transaction = nullptr;
|
dfly_cntx->transaction = nullptr;
|
||||||
|
|
||||||
dispatched += stored_cmds.size();
|
dispatched += stored_cmds.size();
|
||||||
ss->stats.squashed_commands += stored_cmds.size();
|
ss->stats.squashed_commands += squashed_num;
|
||||||
stored_cmds.clear();
|
stored_cmds.clear();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -254,7 +254,7 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
|
||||||
return !aborted;
|
return !aborted;
|
||||||
}
|
}
|
||||||
|
|
||||||
void MultiCommandSquasher::Run(RedisReplyBuilder* rb) {
|
size_t MultiCommandSquasher::Run(RedisReplyBuilder* rb) {
|
||||||
DVLOG(1) << "Trying to squash " << cmds_.size() << " commands for transaction "
|
DVLOG(1) << "Trying to squash " << cmds_.size() << " commands for transaction "
|
||||||
<< cntx_->transaction->DebugId();
|
<< cntx_->transaction->DebugId();
|
||||||
|
|
||||||
|
@ -291,6 +291,7 @@ void MultiCommandSquasher::Run(RedisReplyBuilder* rb) {
|
||||||
|
|
||||||
VLOG(1) << "Squashed " << num_squashed_ << " of " << cmds_.size()
|
VLOG(1) << "Squashed " << num_squashed_ << " of " << cmds_.size()
|
||||||
<< " commands, max fanout: " << num_shards_ << ", atomic: " << atomic_;
|
<< " commands, max fanout: " << num_shards_ << ", atomic: " << atomic_;
|
||||||
|
return num_squashed_;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool MultiCommandSquasher::IsAtomic() const {
|
bool MultiCommandSquasher::IsAtomic() const {
|
||||||
|
|
|
@ -22,10 +22,10 @@ namespace dfly {
|
||||||
// contains a non-atomic multi transaction to execute squashed commands.
|
// contains a non-atomic multi transaction to execute squashed commands.
|
||||||
class MultiCommandSquasher {
|
class MultiCommandSquasher {
|
||||||
public:
|
public:
|
||||||
static void Execute(absl::Span<StoredCmd> cmds, facade::RedisReplyBuilder* rb,
|
static size_t Execute(absl::Span<StoredCmd> cmds, facade::RedisReplyBuilder* rb,
|
||||||
ConnectionContext* cntx, Service* service, bool verify_commands = false,
|
ConnectionContext* cntx, Service* service, bool verify_commands = false,
|
||||||
bool error_abort = false) {
|
bool error_abort = false) {
|
||||||
MultiCommandSquasher{cmds, cntx, service, verify_commands, error_abort}.Run(rb);
|
return MultiCommandSquasher{cmds, cntx, service, verify_commands, error_abort}.Run(rb);
|
||||||
}
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -62,8 +62,8 @@ class MultiCommandSquasher {
|
||||||
// Execute all currently squashed commands. Return false if aborting on error.
|
// Execute all currently squashed commands. Return false if aborting on error.
|
||||||
bool ExecuteSquashed(facade::RedisReplyBuilder* rb);
|
bool ExecuteSquashed(facade::RedisReplyBuilder* rb);
|
||||||
|
|
||||||
// Run all commands until completion.
|
// Run all commands until completion. Returns number of squashed commands.
|
||||||
void Run(facade::RedisReplyBuilder* rb);
|
size_t Run(facade::RedisReplyBuilder* rb);
|
||||||
|
|
||||||
bool IsAtomic() const;
|
bool IsAtomic() const;
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue