diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index b267329e6..60509d6fc 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -1431,7 +1431,7 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list, SinkReply
     dfly_cntx->transaction = dist_trans.get();
     size_t squashed_num = MultiCommandSquasher::Execute(absl::MakeSpan(stored_cmds),
                                                         static_cast<RedisReplyBuilder*>(builder),
-                                                        dfly_cntx, this, true, false);
+                                                        dfly_cntx, this, {.verify_commands = true});
     dfly_cntx->transaction = nullptr;
     dispatched += stored_cmds.size();
 
@@ -1732,7 +1732,10 @@ optional<ErrorReply> Service::FlushEvalAsyncCmds(ConnectionC
   tx->MultiSwitchCmd(eval_cid);
   CapturingReplyBuilder crb{ReplyMode::ONLY_ERR};
 
-  MultiCommandSquasher::Execute(absl::MakeSpan(info->async_cmds), &crb, cntx, this, true, true);
+  MultiCommandSquasher::Opts opts;
+  opts.verify_commands = true;
+  opts.error_abort = true;
+  MultiCommandSquasher::Execute(absl::MakeSpan(info->async_cmds), &crb, cntx, this, opts);
 
   info->async_cmds_heap_mem = 0;
   info->async_cmds.clear();
 
@@ -2206,7 +2209,7 @@ void Service::Exec(CmdArgList args, const CommandContext& cmd_cntx) {
 
   if (absl::GetFlag(FLAGS_multi_exec_squash) && state != ExecScriptUse::SCRIPT_RUN &&
       !cntx->conn_state.tracking_info_.IsTrackingOn()) {
-    MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), rb, cntx, this);
+    MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), rb, cntx, this, {});
   } else {
     CmdArgVec arg_vec;
     for (auto& scmd : exec_info.body) {
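Note on the call sites above: replacing the trailing pair of bools with an options struct means each flag is named where it is set, and callers that want defaults simply pass {}. A minimal standalone sketch of the pattern follows; the Run function is a hypothetical stand-in for MultiCommandSquasher::Execute, and the designated-initializer syntax ({.verify_commands = true}, also used in the hunk above) requires C++20.

#include <cstdio>

// Mirrors the two flags of MultiCommandSquasher::Opts; the harness is hypothetical.
struct Opts {
  bool verify_commands = false;
  bool error_abort = false;
};

// Hypothetical stand-in for MultiCommandSquasher::Execute.
void Run(const Opts& opts) {
  std::printf("verify=%d abort=%d\n", opts.verify_commands, opts.error_abort);
}

int main() {
  Run({});                         // all defaults, as in Service::Exec
  Run({.verify_commands = true});  // named flag, as in DispatchManyCommands
  Opts opts;                       // field by field, as in FlushEvalAsyncCmds
  opts.verify_commands = true;
  opts.error_abort = true;
  Run(opts);
  return 0;
}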
diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc
index 5e4a013c7..4279793ab 100644
--- a/src/server/multi_command_squasher.cc
+++ b/src/server/multi_command_squasher.cc
@@ -65,13 +65,8 @@ size_t Size(const facade::CapturingReplyBuilder::Payload& payload) {
 atomic_uint64_t MultiCommandSquasher::current_reply_size_ = 0;
 
 MultiCommandSquasher::MultiCommandSquasher(absl::Span<StoredCmd> cmds, ConnectionContext* cntx,
-                                           Service* service, bool verify_commands, bool error_abort)
-    : cmds_{cmds},
-      cntx_{cntx},
-      service_{service},
-      base_cid_{nullptr},
-      verify_commands_{verify_commands},
-      error_abort_{error_abort} {
+                                           Service* service, const Opts& opts)
+    : cmds_{cmds}, cntx_{cntx}, service_{service}, base_cid_{nullptr}, opts_{opts} {
   auto mode = cntx->transaction->GetMultiMode();
   base_cid_ = cntx->transaction->GetCId();
   atomic_ = mode != Transaction::NON_ATOMIC;
@@ -137,9 +132,7 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm
 
   num_squashed_++;
 
-  // Because the squashed hop is currently blocking, we cannot add more than the max channel size,
-  // otherwise a deadlock occurs.
-  bool need_flush = sinfo.cmds.size() >= kMaxSquashing - 1;
+  bool need_flush = sinfo.cmds.size() >= opts_.max_squash_size;
   return need_flush ? SquashResult::SQUASHED_FULL : SquashResult::SQUASHED;
 }
 
@@ -149,11 +142,11 @@ bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, Stor
   cmd->Fill(&tmp_keylist_);
   auto args = absl::MakeSpan(tmp_keylist_);
 
-  if (verify_commands_) {
+  if (opts_.verify_commands) {
     if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) {
       rb->SendError(std::move(*err));
       rb->ConsumeLastError();
-      return !error_abort_;
+      return !opts_.error_abort;
     }
   }
 
@@ -185,7 +178,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v
   auto args = absl::MakeSpan(arg_vec);
   cmd->Fill(args);
 
-  if (verify_commands_) {
+  if (opts_.verify_commands) {
     // The shared context is used for state verification, the local one is only for replies
     if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) {
       crb.SendError(std::move(*err));
@@ -265,7 +258,7 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
     auto& replies = sharded_[idx].replies;
     CHECK(!replies.empty());
 
-    aborted |= error_abort_ && CapturingReplyBuilder::TryExtractError(replies.back());
+    aborted |= opts_.error_abort && CapturingReplyBuilder::TryExtractError(replies.back());
 
     current_reply_size_.fetch_sub(Size(replies.back()), std::memory_order_relaxed);
     CapturingReplyBuilder::Apply(std::move(replies.back()), rb);
diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h
index dc4158dd0..b54c7b2f5 100644
--- a/src/server/multi_command_squasher.h
+++ b/src/server/multi_command_squasher.h
@@ -22,10 +22,15 @@ namespace dfly {
 // contains a non-atomic multi transaction to execute squashed commands.
 class MultiCommandSquasher {
  public:
+  struct Opts {
+    bool verify_commands = false;   // Whether commands need to be verified before execution
+    bool error_abort = false;       // Abort upon receiving error
+    unsigned max_squash_size = 32;  // How many commands to squash at once
+  };
+
   static size_t Execute(absl::Span<StoredCmd> cmds, facade::RedisReplyBuilder* rb,
-                        ConnectionContext* cntx, Service* service, bool verify_commands = false,
-                        bool error_abort = false) {
-    return MultiCommandSquasher{cmds, cntx, service, verify_commands, error_abort}.Run(rb);
+                        ConnectionContext* cntx, Service* service, const Opts& opts) {
+    return MultiCommandSquasher{cmds, cntx, service, opts}.Run(rb);
   }
 
   static size_t GetRepliesMemSize() {
@@ -45,11 +50,9 @@ class MultiCommandSquasher {
 
   enum class SquashResult { SQUASHED, SQUASHED_FULL, NOT_SQUASHED, ERROR };
 
-  static constexpr int kMaxSquashing = 32;
-
  private:
   MultiCommandSquasher(absl::Span<StoredCmd> cmds, ConnectionContext* cntx, Service* Service,
-                       const Opts& opts);
+                       const Opts& opts);
 
   // Lazy initialize shard info.
   ShardExecInfo& PrepareShardInfo(ShardId sid);
@@ -79,8 +82,7 @@ class MultiCommandSquasher {
   bool atomic_;                // Whether working in any of the atomic modes
   const CommandId* base_cid_;  // underlying cid (exec or eval) for executing batch hops
 
-  bool verify_commands_ = false;  // Whether commands need to be verified before execution
-  bool error_abort_ = false;      // Abort upon receiving error
+  Opts opts_;
 
   std::vector<ShardExecInfo> sharded_;
   std::vector<ShardId> order_;  // reply order for squashed cmds
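One behavioral detail worth flagging in the TrySquash hunk above: the removed condition flushed once a shard queue reached kMaxSquashing - 1 (31 commands), while the new one flushes at opts_.max_squash_size (32 by default), so the default batch grows by one and the cap becomes configurable per call. A self-contained sketch of that boundary; the NeedFlush harness is hypothetical, only the condition mirrors the diff.

#include <cassert>

struct Opts {
  unsigned max_squash_size = 32;  // default from MultiCommandSquasher::Opts
};

// Hypothetical harness mirroring the new TrySquash condition;
// the removed code used: queued >= kMaxSquashing - 1, i.e. >= 31.
bool NeedFlush(unsigned queued, const Opts& opts) {
  return queued >= opts.max_squash_size;
}

int main() {
  Opts defaults;
  assert(!NeedFlush(31, defaults));  // the old code already flushed here
  assert(NeedFlush(32, defaults));
  Opts small{.max_squash_size = 8};  // the cap is now tunable per call
  assert(NeedFlush(8, small));
  return 0;
}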
diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index 0e4438a7d..7b9c53602 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -343,7 +343,7 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
   // we counter-balance CPU over-usage by forcing sleep.
   // We measure running_cycles before the preemption points, because they reset the counter.
   uint64_t sleep_usec = (running_cycles * 1000'000 / base::CycleClock::Frequency()) / 2;
-  ThisFiber::SleepFor(chrono::microseconds(std::min(sleep_usec, 2000ul)));
+  ThisFiber::SleepFor(chrono::microseconds(std::min<uint64_t>(sleep_usec, 2000ul)));
 
   return serialized;
 }
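On the snapshot.cc hunk: the added line is assumed here to introduce an explicit template argument, std::min<uint64_t>(...), the usual fix when the two arguments of std::min deduce to different types; sleep_usec is uint64_t while 2000ul is unsigned long, which are distinct types on some ABIs. A short sketch of why the unqualified call can fail to compile (the platform notes in the comments are assumptions, not claims about this repo's CI):

#include <algorithm>
#include <cstdint>

int main() {
  uint64_t sleep_usec = 5000;
  // Where uint64_t is unsigned long long (e.g. 64-bit Windows or macOS),
  // std::min(sleep_usec, 2000ul) mixes unsigned long long with unsigned long
  // and template deduction fails; the explicit argument forces a common type.
  uint64_t capped = std::min<uint64_t>(sleep_usec, 2000ul);
  return capped == 2000 ? 0 : 1;  // exits 0: 5000 usec is capped to 2000
}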