chore: pass max_squash_size for MultiCommandSquasher via option (#4960)

No functionality was changed. Also fix a build error on macos.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2025-04-20 12:14:23 +03:00 committed by GitHub
parent a42a17f868
commit 2ca5bf1192
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 24 additions and 26 deletions

View file

@ -1431,7 +1431,7 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list, SinkReply
dfly_cntx->transaction = dist_trans.get();
size_t squashed_num = MultiCommandSquasher::Execute(absl::MakeSpan(stored_cmds),
static_cast<RedisReplyBuilder*>(builder),
dfly_cntx, this, true, false);
dfly_cntx, this, {.verify_commands = true});
dfly_cntx->transaction = nullptr;
dispatched += stored_cmds.size();
@ -1732,7 +1732,10 @@ optional<CapturingReplyBuilder::Payload> Service::FlushEvalAsyncCmds(ConnectionC
tx->MultiSwitchCmd(eval_cid);
CapturingReplyBuilder crb{ReplyMode::ONLY_ERR};
MultiCommandSquasher::Execute(absl::MakeSpan(info->async_cmds), &crb, cntx, this, true, true);
MultiCommandSquasher::Opts opts;
opts.verify_commands = true;
opts.error_abort = true;
MultiCommandSquasher::Execute(absl::MakeSpan(info->async_cmds), &crb, cntx, this, opts);
info->async_cmds_heap_mem = 0;
info->async_cmds.clear();
@ -2206,7 +2209,7 @@ void Service::Exec(CmdArgList args, const CommandContext& cmd_cntx) {
if (absl::GetFlag(FLAGS_multi_exec_squash) && state != ExecScriptUse::SCRIPT_RUN &&
!cntx->conn_state.tracking_info_.IsTrackingOn()) {
MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), rb, cntx, this);
MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), rb, cntx, this, {});
} else {
CmdArgVec arg_vec;
for (auto& scmd : exec_info.body) {

View file

@ -65,13 +65,8 @@ size_t Size(const facade::CapturingReplyBuilder::Payload& payload) {
atomic_uint64_t MultiCommandSquasher::current_reply_size_ = 0;
MultiCommandSquasher::MultiCommandSquasher(absl::Span<StoredCmd> cmds, ConnectionContext* cntx,
Service* service, bool verify_commands, bool error_abort)
: cmds_{cmds},
cntx_{cntx},
service_{service},
base_cid_{nullptr},
verify_commands_{verify_commands},
error_abort_{error_abort} {
Service* service, const Opts& opts)
: cmds_{cmds}, cntx_{cntx}, service_{service}, base_cid_{nullptr}, opts_{opts} {
auto mode = cntx->transaction->GetMultiMode();
base_cid_ = cntx->transaction->GetCId();
atomic_ = mode != Transaction::NON_ATOMIC;
@ -137,9 +132,7 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm
num_squashed_++;
// Because the squashed hop is currently blocking, we cannot add more than the max channel size,
// otherwise a deadlock occurs.
bool need_flush = sinfo.cmds.size() >= kMaxSquashing - 1;
bool need_flush = sinfo.cmds.size() >= opts_.max_squash_size;
return need_flush ? SquashResult::SQUASHED_FULL : SquashResult::SQUASHED;
}
@ -149,11 +142,11 @@ bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, Stor
cmd->Fill(&tmp_keylist_);
auto args = absl::MakeSpan(tmp_keylist_);
if (verify_commands_) {
if (opts_.verify_commands) {
if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) {
rb->SendError(std::move(*err));
rb->ConsumeLastError();
return !error_abort_;
return !opts_.error_abort;
}
}
@ -185,7 +178,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v
auto args = absl::MakeSpan(arg_vec);
cmd->Fill(args);
if (verify_commands_) {
if (opts_.verify_commands) {
// The shared context is used for state verification, the local one is only for replies
if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) {
crb.SendError(std::move(*err));
@ -265,7 +258,7 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
auto& replies = sharded_[idx].replies;
CHECK(!replies.empty());
aborted |= error_abort_ && CapturingReplyBuilder::TryExtractError(replies.back());
aborted |= opts_.error_abort && CapturingReplyBuilder::TryExtractError(replies.back());
current_reply_size_.fetch_sub(Size(replies.back()), std::memory_order_relaxed);
CapturingReplyBuilder::Apply(std::move(replies.back()), rb);

View file

@ -22,10 +22,15 @@ namespace dfly {
// contains a non-atomic multi transaction to execute squashed commands.
class MultiCommandSquasher {
public:
struct Opts {
bool verify_commands = false; // Whether commands need to be verified before execution
bool error_abort = false; // Abort upon receiving error
unsigned max_squash_size = 32; // How many commands to squash at once
};
static size_t Execute(absl::Span<StoredCmd> cmds, facade::RedisReplyBuilder* rb,
ConnectionContext* cntx, Service* service, bool verify_commands = false,
bool error_abort = false) {
return MultiCommandSquasher{cmds, cntx, service, verify_commands, error_abort}.Run(rb);
ConnectionContext* cntx, Service* service, const Opts& opts) {
return MultiCommandSquasher{cmds, cntx, service, opts}.Run(rb);
}
static size_t GetRepliesMemSize() {
@ -45,11 +50,9 @@ class MultiCommandSquasher {
enum class SquashResult { SQUASHED, SQUASHED_FULL, NOT_SQUASHED, ERROR };
static constexpr int kMaxSquashing = 32;
private:
MultiCommandSquasher(absl::Span<StoredCmd> cmds, ConnectionContext* cntx, Service* Service,
bool verify_commands, bool error_abort);
const Opts& opts);
// Lazy initialize shard info.
ShardExecInfo& PrepareShardInfo(ShardId sid);
@ -79,8 +82,7 @@ class MultiCommandSquasher {
bool atomic_; // Whether working in any of the atomic modes
const CommandId* base_cid_; // underlying cid (exec or eval) for executing batch hops
bool verify_commands_ = false; // Whether commands need to be verified before execution
bool error_abort_ = false; // Abort upon receiving error
Opts opts_;
std::vector<ShardExecInfo> sharded_;
std::vector<ShardId> order_; // reply order for squashed cmds

View file

@ -343,7 +343,7 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
// we counter-balance CPU over-usage by forcing sleep.
// We measure running_cycles before the preemption points, because they reset the counter.
uint64_t sleep_usec = (running_cycles * 1000'000 / base::CycleClock::Frequency()) / 2;
ThisFiber::SleepFor(chrono::microseconds(std::min(sleep_usec, 2000ul)));
ThisFiber::SleepFor(chrono::microseconds(std::min<uint64_t>(sleep_usec, 2000ul)));
return serialized;
}