mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
feat: Remove batch locks from non-atomic squashing (#1613)
feat: Remove batch locks from non-atomic squashing Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
3b0bd212f4
commit
844fe57dec
4 changed files with 52 additions and 38 deletions
|
@ -26,18 +26,25 @@ template <typename F> void IterateKeys(CmdArgList args, KeyIndex keys, F&& f) {
|
||||||
|
|
||||||
MultiCommandSquasher::MultiCommandSquasher(absl::Span<StoredCmd> cmds, ConnectionContext* cntx,
|
MultiCommandSquasher::MultiCommandSquasher(absl::Span<StoredCmd> cmds, ConnectionContext* cntx,
|
||||||
bool error_abort)
|
bool error_abort)
|
||||||
: cmds_{cmds}, cntx_{cntx}, base_cid_{cntx->transaction->GetCId()}, error_abort_{error_abort} {
|
: cmds_{cmds}, cntx_{cntx}, base_cid_{nullptr}, error_abort_{error_abort} {
|
||||||
auto mode = cntx->transaction->GetMultiMode();
|
auto mode = cntx->transaction->GetMultiMode();
|
||||||
track_keys_ = mode == Transaction::NON_ATOMIC;
|
base_cid_ = mode == Transaction::NON_ATOMIC ? nullptr : cntx->transaction->GetCId();
|
||||||
}
|
}
|
||||||
|
|
||||||
MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(ShardId sid) {
|
MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(ShardId sid) {
|
||||||
if (sharded_.empty())
|
if (sharded_.empty())
|
||||||
sharded_.resize(shard_set->size());
|
sharded_.resize(shard_set->size());
|
||||||
|
|
||||||
|
// See header top for atomic/non-atomic difference
|
||||||
auto& sinfo = sharded_[sid];
|
auto& sinfo = sharded_[sid];
|
||||||
if (!sinfo.local_tx)
|
if (!sinfo.local_tx) {
|
||||||
|
if (IsAtomic()) {
|
||||||
sinfo.local_tx = new Transaction{cntx_->transaction};
|
sinfo.local_tx = new Transaction{cntx_->transaction};
|
||||||
|
} else {
|
||||||
|
sinfo.local_tx = new Transaction{cntx_->transaction->GetCId(), sid};
|
||||||
|
sinfo.local_tx->StartMultiNonAtomic();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return sinfo;
|
return sinfo;
|
||||||
}
|
}
|
||||||
|
@ -71,9 +78,6 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm
|
||||||
if (found_more || last_sid == kInvalidSid)
|
if (found_more || last_sid == kInvalidSid)
|
||||||
return SquashResult::NOT_SQUASHED;
|
return SquashResult::NOT_SQUASHED;
|
||||||
|
|
||||||
if (track_keys_)
|
|
||||||
IterateKeys(args, *keys, [this](MutableSlice key) { collected_keys_.insert(key); });
|
|
||||||
|
|
||||||
auto& sinfo = PrepareShardInfo(last_sid);
|
auto& sinfo = PrepareShardInfo(last_sid);
|
||||||
|
|
||||||
sinfo.had_writes |= (cmd->Cid()->opt_mask() & CO::WRITE);
|
sinfo.had_writes |= (cmd->Cid()->opt_mask() & CO::WRITE);
|
||||||
|
@ -138,21 +142,24 @@ bool MultiCommandSquasher::ExecuteSquashed() {
|
||||||
if (order_.empty())
|
if (order_.empty())
|
||||||
return false;
|
return false;
|
||||||
|
|
||||||
Transaction* tx = cntx_->transaction;
|
|
||||||
|
|
||||||
if (track_keys_) {
|
|
||||||
tmp_keylist_.assign(collected_keys_.begin(), collected_keys_.end());
|
|
||||||
tx->PrepareSquashedMultiHop(base_cid_, CmdArgList{tmp_keylist_});
|
|
||||||
} else {
|
|
||||||
auto cb = [this](ShardId sid) { return !sharded_[sid].cmds.empty(); };
|
|
||||||
tx->PrepareSquashedMultiHop(base_cid_, cb);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (auto& sd : sharded_)
|
for (auto& sd : sharded_)
|
||||||
sd.replies.reserve(sd.cmds.size());
|
sd.replies.reserve(sd.cmds.size());
|
||||||
|
|
||||||
|
Transaction* tx = cntx_->transaction;
|
||||||
|
|
||||||
|
// Atomic transactions (that have all keys locked) perform hops and run squashed commands via
|
||||||
|
// stubs, non-atomic ones just run the commands in parallel.
|
||||||
|
if (IsAtomic()) {
|
||||||
cntx_->cid = base_cid_;
|
cntx_->cid = base_cid_;
|
||||||
|
auto cb = [this](ShardId sid) { return !sharded_[sid].cmds.empty(); };
|
||||||
|
tx->PrepareSquashedMultiHop(base_cid_, cb);
|
||||||
tx->ScheduleSingleHop([this](auto* tx, auto* es) { return SquashedHopCb(tx, es); });
|
tx->ScheduleSingleHop([this](auto* tx, auto* es) { return SquashedHopCb(tx, es); });
|
||||||
|
} else {
|
||||||
|
shard_set->RunBlockingInParallel([this, tx](auto* es) {
|
||||||
|
if (!sharded_[es->shard_id()].cmds.empty())
|
||||||
|
SquashedHopCb(tx, es);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
bool aborted = false;
|
bool aborted = false;
|
||||||
|
|
||||||
|
@ -174,7 +181,6 @@ bool MultiCommandSquasher::ExecuteSquashed() {
|
||||||
sinfo.cmds.clear();
|
sinfo.cmds.clear();
|
||||||
|
|
||||||
order_.clear();
|
order_.clear();
|
||||||
collected_keys_.clear();
|
|
||||||
return aborted;
|
return aborted;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -202,6 +208,19 @@ void MultiCommandSquasher::Run() {
|
||||||
if (!sharded_.empty())
|
if (!sharded_.empty())
|
||||||
cntx_->transaction->ReportWritesSquashedMulti(
|
cntx_->transaction->ReportWritesSquashedMulti(
|
||||||
[this](ShardId sid) { return sharded_[sid].had_writes; });
|
[this](ShardId sid) { return sharded_[sid].had_writes; });
|
||||||
|
|
||||||
|
// UnlockMulti is a no-op for non-atomic multi transactions,
|
||||||
|
// still called for correctness and future changes
|
||||||
|
if (!IsAtomic()) {
|
||||||
|
for (auto& sd : sharded_) {
|
||||||
|
if (sd.local_tx)
|
||||||
|
sd.local_tx->UnlockMulti();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
bool MultiCommandSquasher::IsAtomic() const {
|
||||||
|
return base_cid_ != nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace dfly
|
} // namespace dfly
|
||||||
|
|
|
@ -13,7 +13,14 @@ namespace dfly {
|
||||||
|
|
||||||
// MultiCommandSquasher allows executing a series of commands under a multi transaction
|
// MultiCommandSquasher allows executing a series of commands under a multi transaction
|
||||||
// and squashing multiple consecutive single-shard commands into one hop whenever it's possible,
|
// and squashing multiple consecutive single-shard commands into one hop whenever it's possible,
|
||||||
// thus greatly decreasing the dispatch overhead for them.
|
// thus parallelizing command execution and greatly decreasing the dispatch overhead for them.
|
||||||
|
//
|
||||||
|
// Single shard commands are executed in small batches over multiple shards.
|
||||||
|
// For atomic multi transactions (global & locking ahead), the batch is executed with a regular hop
|
||||||
|
// of the multi transaction. Each shard contains a "stub" transaction to mimic the regular
|
||||||
|
// transactional api for commands. Non atomic multi transactions use regular shard_set dispatches
|
||||||
|
// instead of hops for executing batches. This allows avoiding locking many keys at once. Each shard
|
||||||
|
// contains a non-atomic multi transaction to execute squashed commands.
|
||||||
class MultiCommandSquasher {
|
class MultiCommandSquasher {
|
||||||
public:
|
public:
|
||||||
static void Execute(absl::Span<StoredCmd> cmds, ConnectionContext* cntx,
|
static void Execute(absl::Span<StoredCmd> cmds, ConnectionContext* cntx,
|
||||||
|
@ -58,21 +65,20 @@ class MultiCommandSquasher {
|
||||||
// Run all commands until completion.
|
// Run all commands until completion.
|
||||||
void Run();
|
void Run();
|
||||||
|
|
||||||
|
bool IsAtomic() const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
absl::Span<StoredCmd> cmds_; // Input range of stored commands
|
absl::Span<StoredCmd> cmds_; // Input range of stored commands
|
||||||
ConnectionContext* cntx_; // Underlying context
|
ConnectionContext* cntx_; // Underlying context
|
||||||
const CommandId* base_cid_; // either EVAL or EXEC, used for squashed hops
|
|
||||||
|
// underlying cid (exec or eval) for executing batch hops, nullptr for non-atomic
|
||||||
|
const CommandId* base_cid_;
|
||||||
|
|
||||||
bool error_abort_ = false; // Abort upon receiving error
|
bool error_abort_ = false; // Abort upon receiving error
|
||||||
|
|
||||||
std::vector<ShardExecInfo> sharded_;
|
std::vector<ShardExecInfo> sharded_;
|
||||||
std::vector<ShardId> order_; // reply order for squashed cmds
|
std::vector<ShardId> order_; // reply order for squashed cmds
|
||||||
|
|
||||||
// multi modes that lock on hops (non-atomic, incremental) need keys for squashed hops.
|
|
||||||
// track_keys_ stores whether to populate collected_keys_
|
|
||||||
bool track_keys_;
|
|
||||||
absl::flat_hash_set<MutableSlice> collected_keys_;
|
|
||||||
|
|
||||||
std::vector<MutableSlice> tmp_keylist_;
|
std::vector<MutableSlice> tmp_keylist_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -332,14 +332,6 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
|
||||||
return OpStatus::OK;
|
return OpStatus::OK;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Transaction::PrepareSquashedMultiHop(const CommandId* cid, CmdArgList keys) {
|
|
||||||
MultiSwitchCmd(cid);
|
|
||||||
|
|
||||||
multi_->role = SQUASHER;
|
|
||||||
InitBase(db_index_, keys);
|
|
||||||
InitByKeys(KeyIndex::Range(0, keys.size()));
|
|
||||||
}
|
|
||||||
|
|
||||||
void Transaction::PrepareSquashedMultiHop(const CommandId* cid,
|
void Transaction::PrepareSquashedMultiHop(const CommandId* cid,
|
||||||
absl::FunctionRef<bool(ShardId)> enabled) {
|
absl::FunctionRef<bool(ShardId)> enabled) {
|
||||||
CHECK(multi_->mode == GLOBAL || multi_->mode == LOCK_AHEAD);
|
CHECK(multi_->mode == GLOBAL || multi_->mode == LOCK_AHEAD);
|
||||||
|
|
|
@ -195,9 +195,6 @@ class Transaction {
|
||||||
renabled_auto_journal_.store(true, std::memory_order_relaxed);
|
renabled_auto_journal_.store(true, std::memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Prepare a squashed hop on given keys.
|
|
||||||
void PrepareSquashedMultiHop(const CommandId* cid, CmdArgList keys);
|
|
||||||
|
|
||||||
// Prepare a squashed hop on given shards.
|
// Prepare a squashed hop on given shards.
|
||||||
// Only compatible with multi modes that acquire all locks ahead - global and lock_ahead.
|
// Only compatible with multi modes that acquire all locks ahead - global and lock_ahead.
|
||||||
void PrepareSquashedMultiHop(const CommandId* cid, absl::FunctionRef<bool(ShardId)> enabled);
|
void PrepareSquashedMultiHop(const CommandId* cid, absl::FunctionRef<bool(ShardId)> enabled);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue