From 844fe57dec10f0d7a95a48d406bab0e7f595a5d2 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Wed, 2 Aug 2023 09:16:47 +0300 Subject: [PATCH] feat: Remove batch locks from non-atomic squashing (#1613) feat: Remove batch locks from non-atomic squashing Signed-off-by: Vladislav Oleshko --- src/server/multi_command_squasher.cc | 59 ++++++++++++++++++---------- src/server/multi_command_squasher.h | 20 ++++++---- src/server/transaction.cc | 8 ---- src/server/transaction.h | 3 -- 4 files changed, 52 insertions(+), 38 deletions(-) diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index 3de19b0bd..9b8609731 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -26,18 +26,25 @@ template void IterateKeys(CmdArgList args, KeyIndex keys, F&& f) { MultiCommandSquasher::MultiCommandSquasher(absl::Span cmds, ConnectionContext* cntx, bool error_abort) - : cmds_{cmds}, cntx_{cntx}, base_cid_{cntx->transaction->GetCId()}, error_abort_{error_abort} { + : cmds_{cmds}, cntx_{cntx}, base_cid_{nullptr}, error_abort_{error_abort} { auto mode = cntx->transaction->GetMultiMode(); - track_keys_ = mode == Transaction::NON_ATOMIC; + base_cid_ = mode == Transaction::NON_ATOMIC ? nullptr : cntx->transaction->GetCId(); } MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(ShardId sid) { if (sharded_.empty()) sharded_.resize(shard_set->size()); + // See header top for atomic/non-atomic difference auto& sinfo = sharded_[sid]; - if (!sinfo.local_tx) - sinfo.local_tx = new Transaction{cntx_->transaction}; + if (!sinfo.local_tx) { + if (IsAtomic()) { + sinfo.local_tx = new Transaction{cntx_->transaction}; + } else { + sinfo.local_tx = new Transaction{cntx_->transaction->GetCId(), sid}; + sinfo.local_tx->StartMultiNonAtomic(); + } + } return sinfo; } @@ -71,9 +78,6 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm if (found_more || last_sid == kInvalidSid) return SquashResult::NOT_SQUASHED; - if (track_keys_) - IterateKeys(args, *keys, [this](MutableSlice key) { collected_keys_.insert(key); }); - auto& sinfo = PrepareShardInfo(last_sid); sinfo.had_writes |= (cmd->Cid()->opt_mask() & CO::WRITE); @@ -138,21 +142,24 @@ bool MultiCommandSquasher::ExecuteSquashed() { if (order_.empty()) return false; - Transaction* tx = cntx_->transaction; - - if (track_keys_) { - tmp_keylist_.assign(collected_keys_.begin(), collected_keys_.end()); - tx->PrepareSquashedMultiHop(base_cid_, CmdArgList{tmp_keylist_}); - } else { - auto cb = [this](ShardId sid) { return !sharded_[sid].cmds.empty(); }; - tx->PrepareSquashedMultiHop(base_cid_, cb); - } - for (auto& sd : sharded_) sd.replies.reserve(sd.cmds.size()); - cntx_->cid = base_cid_; - tx->ScheduleSingleHop([this](auto* tx, auto* es) { return SquashedHopCb(tx, es); }); + Transaction* tx = cntx_->transaction; + + // Atomic transactions (that have all keys locked) perform hops and run squashed commands via + // stubs, non-atomic ones just run the commands in parallel. + if (IsAtomic()) { + cntx_->cid = base_cid_; + auto cb = [this](ShardId sid) { return !sharded_[sid].cmds.empty(); }; + tx->PrepareSquashedMultiHop(base_cid_, cb); + tx->ScheduleSingleHop([this](auto* tx, auto* es) { return SquashedHopCb(tx, es); }); + } else { + shard_set->RunBlockingInParallel([this, tx](auto* es) { + if (!sharded_[es->shard_id()].cmds.empty()) + SquashedHopCb(tx, es); + }); + } bool aborted = false; @@ -174,7 +181,6 @@ bool MultiCommandSquasher::ExecuteSquashed() { sinfo.cmds.clear(); order_.clear(); - collected_keys_.clear(); return aborted; } @@ -202,6 +208,19 @@ void MultiCommandSquasher::Run() { if (!sharded_.empty()) cntx_->transaction->ReportWritesSquashedMulti( [this](ShardId sid) { return sharded_[sid].had_writes; }); + + // UnlockMulti is a no-op for non-atomic multi transactions, + // still called for correctness and future changes + if (!IsAtomic()) { + for (auto& sd : sharded_) { + if (sd.local_tx) + sd.local_tx->UnlockMulti(); + } + } +} + +bool MultiCommandSquasher::IsAtomic() const { + return base_cid_ != nullptr; } } // namespace dfly diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index 08257f060..0fc57fdb8 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -13,7 +13,14 @@ namespace dfly { // MultiCommandSquasher allows executing a series of commands under a multi transaction // and squashing multiple consecutive single-shard commands into one hop whenever it's possible, -// thus greatly decreasing the dispatch overhead for them. +// thus parallelizing command execution and greatly decreasing the dispatch overhead for them. +// +// Single shard commands are executed in small batches over multiple shards. +// For atomic multi transactions (global & locking ahead), the batch is executed with a regular hop +// of the multi transaction. Each shard contains a "stub" transaction to mimic the regular +// transactional api for commands. Non atomic multi transactions use regular shard_set dispatches +// instead of hops for executing batches. This allows avoiding locking many keys at once. Each shard +// contains a non-atomic multi transaction to execute squashed commands. class MultiCommandSquasher { public: static void Execute(absl::Span cmds, ConnectionContext* cntx, @@ -58,21 +65,20 @@ class MultiCommandSquasher { // Run all commands until completion. void Run(); + bool IsAtomic() const; + private: absl::Span cmds_; // Input range of stored commands ConnectionContext* cntx_; // Underlying context - const CommandId* base_cid_; // either EVAL or EXEC, used for squashed hops + + // underlying cid (exec or eval) for executing batch hops, nullptr for non-atomic + const CommandId* base_cid_; bool error_abort_ = false; // Abort upon receiving error std::vector sharded_; std::vector order_; // reply order for squashed cmds - // multi modes that lock on hops (non-atomic, incremental) need keys for squashed hops. - // track_keys_ stores whether to populate collected_keys_ - bool track_keys_; - absl::flat_hash_set collected_keys_; - std::vector tmp_keylist_; }; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 0661c251f..8b59671dd 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -332,14 +332,6 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) { return OpStatus::OK; } -void Transaction::PrepareSquashedMultiHop(const CommandId* cid, CmdArgList keys) { - MultiSwitchCmd(cid); - - multi_->role = SQUASHER; - InitBase(db_index_, keys); - InitByKeys(KeyIndex::Range(0, keys.size())); -} - void Transaction::PrepareSquashedMultiHop(const CommandId* cid, absl::FunctionRef enabled) { CHECK(multi_->mode == GLOBAL || multi_->mode == LOCK_AHEAD); diff --git a/src/server/transaction.h b/src/server/transaction.h index b6c9e8795..99e65d508 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -195,9 +195,6 @@ class Transaction { renabled_auto_journal_.store(true, std::memory_order_relaxed); } - // Prepare a squashed hop on given keys. - void PrepareSquashedMultiHop(const CommandId* cid, CmdArgList keys); - // Prepare a squashed hop on given shards. // Only compatible with multi modes that acquire all locks ahead - global and lock_ahead. void PrepareSquashedMultiHop(const CommandId* cid, absl::FunctionRef enabled);