chore: transaction simplification (#2347)

chore: simplify transaction multi-locking

Also, add the ananlysis routine that determines whether the schewduled transaction is contended with other transaction in a
shard thread.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-12-31 17:02:12 +02:00 committed by GitHub
parent ddbdf63470
commit 1fb0a486ac
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 87 additions and 131 deletions

View file

@ -90,7 +90,7 @@ void Transaction::InitGlobal() {
EnableAllShards();
}
void Transaction::BuildShardIndex(KeyIndex key_index, bool rev_mapping,
void Transaction::BuildShardIndex(const KeyIndex& key_index, bool rev_mapping,
std::vector<PerShardCache>* out) {
auto args = full_args_;
@ -157,38 +157,23 @@ void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, siz
CHECK_EQ(args_.size(), num_args);
}
void Transaction::InitMultiData(KeyIndex key_index) {
void Transaction::RecordMultiLocks(const KeyIndex& key_index) {
DCHECK(multi_);
DCHECK(!multi_->lock_mode);
if (multi_->mode == NON_ATOMIC)
return;
IntentLock::Mode mode = Mode();
auto lock_key = [this](string_view key) { multi_->locks.emplace(KeyLockArgs::GetLockKey(key)); };
auto& tmp_uniques = tmp_space.uniq_keys;
auto lock_key = [this, mode, &tmp_uniques](string_view key) {
if (auto [_, inserted] = tmp_uniques.insert(KeyLockArgs::GetLockKey(key)); !inserted)
return;
multi_->lock_counts[key][mode]++;
};
// With EVAL, we call this function for EVAL itself as well as for each command
// for eval. currently, we lock everything only during the eval call.
if (!multi_->locks_recorded) {
tmp_uniques.clear();
for (size_t i = key_index.start; i < key_index.end; i += key_index.step)
lock_key(ArgS(full_args_, i));
if (key_index.bonus)
lock_key(ArgS(full_args_, *key_index.bonus));
multi_->locks_recorded = true;
}
multi_->lock_mode.emplace(Mode());
for (size_t i = key_index.start; i < key_index.end; i += key_index.step)
lock_key(ArgS(full_args_, i));
if (key_index.bonus)
lock_key(ArgS(full_args_, *key_index.bonus));
DCHECK(IsAtomicMulti());
DCHECK(multi_->mode == GLOBAL || !multi_->lock_counts.empty());
DCHECK(multi_->mode == GLOBAL || !multi_->locks.empty());
}
void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) {
@ -230,15 +215,13 @@ void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) {
*
**/
void Transaction::InitByKeys(KeyIndex key_index) {
auto args = full_args_;
if (key_index.start == args.size()) { // eval with 0 keys.
void Transaction::InitByKeys(const KeyIndex& key_index) {
if (key_index.start == full_args_.size()) { // eval with 0 keys.
CHECK(absl::StartsWith(cid_->name(), "EVAL")) << cid_->name();
return;
}
DCHECK_LT(key_index.start, args.size());
DCHECK_LT(key_index.start, full_args_.size());
bool needs_reverse_mapping = cid_->opt_mask() & CO::REVERSE_MAPPING;
@ -265,7 +248,7 @@ void Transaction::InitByKeys(KeyIndex key_index) {
shard_data_.resize(shard_set->size()); // shard_data isn't sparse, so we must allocate for all :(
DCHECK(key_index.step == 1 || key_index.step == 2);
DCHECK(key_index.step != 2 || (args.size() % 2) == 0);
DCHECK(key_index.step != 2 || (full_args_.size() % 2) == 0);
// Safe, because flow below is not preemptive.
auto& shard_index = tmp_space.GetShardIndex(shard_data_.size());
@ -276,8 +259,8 @@ void Transaction::InitByKeys(KeyIndex key_index) {
// Initialize shard data based on distributed arguments.
InitShardData(shard_index, key_index.num_args(), needs_reverse_mapping);
if (multi_)
InitMultiData(key_index);
if (multi_ && !multi_->lock_mode)
RecordMultiLocks(key_index);
DVLOG(1) << "InitByArgs " << DebugId() << " " << args_.front();
@ -298,7 +281,7 @@ void Transaction::InitByKeys(KeyIndex key_index) {
// Validation. Check reverse mapping was built correctly.
if (needs_reverse_mapping) {
for (size_t i = 0; i < args_.size(); ++i) {
DCHECK_EQ(args_[i], ArgS(args, reverse_index_[i])) << args;
DCHECK_EQ(args_[i], ArgS(full_args_, reverse_index_[i])) << full_args_;
}
}
@ -373,7 +356,7 @@ void Transaction::StartMultiGlobal(DbIndex dbid) {
multi_->mode = GLOBAL;
InitBase(dbid, {});
InitGlobal();
multi_->locks_recorded = true;
multi_->lock_mode = IntentLock::EXCLUSIVE;
ScheduleInternal();
}
@ -519,7 +502,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
// is concluding.
bool remove_txq = tx_stop_runnig || !txq_ooo;
if (remove_txq && sd.pq_pos != TxQueue::kEnd) {
VLOG(2) << "Remove from txq" << this->DebugId();
VLOG(2) << "Remove from txq " << this->DebugId();
shard->txq()->Remove(sd.pq_pos);
sd.pq_pos = TxQueue::kEnd;
}
@ -782,10 +765,10 @@ void Transaction::UnlockMulti() {
return;
auto sharded_keys = make_shared<vector<KeyList>>(shard_set->size());
while (!multi_->lock_counts.empty()) {
auto entry = multi_->lock_counts.extract(multi_->lock_counts.begin());
ShardId sid = Shard(entry.key(), sharded_keys->size());
(*sharded_keys)[sid].emplace_back(std::move(entry.key()), entry.mapped());
while (!multi_->locks.empty()) {
auto entry = multi_->locks.extract(multi_->locks.begin());
ShardId sid = Shard(entry.value(), sharded_keys->size());
(*sharded_keys)[sid].emplace_back(std::move(entry.value()));
}
unsigned shard_journals_cnt =
@ -796,8 +779,8 @@ void Transaction::UnlockMulti() {
use_count_.fetch_add(shard_data_.size(), std::memory_order_relaxed);
for (ShardId i = 0; i < shard_data_.size(); ++i) {
shard_set->Add(i, [this, sharded_keys, shard_journals_cnt]() {
this->UnlockMultiShardCb(*sharded_keys, EngineShard::tlocal(), shard_journals_cnt);
shard_set->Add(i, [this, sharded_keys, i, shard_journals_cnt]() {
this->UnlockMultiShardCb((*sharded_keys)[i], EngineShard::tlocal(), shard_journals_cnt);
intrusive_ptr_release(this);
});
}
@ -857,7 +840,7 @@ void Transaction::ExecuteAsync() {
DCHECK_GT(unique_shard_cnt_, 0u);
DCHECK_GT(use_count_.load(memory_order_relaxed), 0u);
DCHECK(!IsAtomicMulti() || multi_->locks_recorded);
DCHECK(!IsAtomicMulti() || multi_->lock_mode.has_value());
// We do not necessarily Execute this transaction in 'cb' below. It well may be that it will be
// executed by the engine shard once it has been armed and coordinator thread will finish the
@ -941,6 +924,16 @@ void Transaction::Refurbish() {
cb_ptr_ = nullptr;
}
void Transaction::IterateMultiLocks(ShardId sid, std::function<void(const std::string&)> cb) const {
unsigned shard_num = shard_set->size();
for (const auto& key : multi_->locks) {
ShardId key_sid = Shard(key, shard_num);
if (key_sid == sid) {
cb(key);
}
}
}
void Transaction::EnableShard(ShardId sid) {
unique_shard_cnt_ = 1;
unique_shard_id_ = sid;
@ -1290,8 +1283,10 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
return status;
}
void Transaction::UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, EngineShard* shard,
void Transaction::UnlockMultiShardCb(const KeyList& sharded_keys, EngineShard* shard,
uint32_t shard_journals_cnt) {
DCHECK(multi_ && multi_->lock_mode);
auto journal = shard->journal();
if (journal != nullptr && multi_->shard_journal_write[shard->shard_id()]) {
@ -1301,20 +1296,13 @@ void Transaction::UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, E
if (multi_->mode == GLOBAL) {
shard->shard_lock()->Release(IntentLock::EXCLUSIVE);
} else {
ShardId sid = shard->shard_id();
for (const auto& k_v : sharded_keys[sid]) {
auto release = [&](IntentLock::Mode mode) {
if (k_v.second[mode]) {
shard->db_slice().Release(mode, db_index_, k_v.first, k_v.second[mode]);
}
};
release(IntentLock::SHARED);
release(IntentLock::EXCLUSIVE);
for (const auto& key : sharded_keys) {
shard->db_slice().ReleaseNormalized(*multi_->lock_mode, db_index_, key, 1);
}
}
auto& sd = shard_data_[SidToId(shard->shard_id())];
ShardId sid = shard->shard_id();
auto& sd = shard_data_[SidToId(sid)];
sd.local_mask |= UNLOCK_MULTI;
// It does not have to be that all shards in multi transaction execute this tx.