feat(transaction): simplify calc multi trans unique shard count (#672)

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2023-01-18 14:37:29 +02:00 committed by GitHub
parent a130b71cd9
commit dc4306890c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 32 additions and 45 deletions

View file

@ -46,6 +46,7 @@ Transaction::Transaction(const CommandId* cid) : cid_(cid) {
if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") {
multi_.reset(new MultiData);
multi_->multi_opts = cid->opt_mask();
multi_->shard_journal_write.resize(shard_set->size(), false);
if (cmd_name == "EVAL" || cmd_name == "EVALSHA") {
multi_->is_expanding = false; // we lock all the keys at once.
@ -625,15 +626,17 @@ void Transaction::UnlockMulti() {
sharded_keys[sid].push_back(k_v);
}
uint32_t shard_journals_cnt = 0;
if (ServerState::tlocal()->journal()) {
CalculateUnqiueShardCntForMulti();
shard_journals_cnt = CalcMultiNumOfShardJournals();
}
uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed);
DCHECK_EQ(prev, 0u);
for (ShardId i = 0; i < shard_data_.size(); ++i) {
shard_set->Add(i, [&] { UnlockMultiShardCb(sharded_keys, EngineShard::tlocal()); });
shard_set->Add(
i, [&] { UnlockMultiShardCb(sharded_keys, EngineShard::tlocal(), shard_journals_cnt); });
}
WaitForShardCallbacks();
DCHECK_GE(GetUseCount(), 1u);
@ -641,31 +644,14 @@ void Transaction::UnlockMulti() {
VLOG(1) << "UnlockMultiEnd " << DebugId();
}
void Transaction::CalculateUnqiueShardCntForMulti() {
uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed);
DCHECK_EQ(prev, 0u);
std::atomic<uint32_t> unique_shard_cnt = 0;
auto update_shard_cnd = [&] {
EngineShard* shard = EngineShard::tlocal();
auto journal = shard->journal();
if (journal != nullptr) {
TxId last_tx = journal->GetLastTxId();
if (last_tx == txid_) {
unique_shard_cnt.fetch_add(1, std::memory_order_relaxed);
}
uint32_t Transaction::CalcMultiNumOfShardJournals() const {
uint32_t shard_journals_cnt = 0;
for (bool was_shard_write : multi_->shard_journal_write) {
if (was_shard_write) {
++shard_journals_cnt;
}
this->DecreaseRunCnt();
};
for (ShardId i = 0; i < shard_data_.size(); ++i) {
shard_set->Add(i, std::move(update_shard_cnd));
}
WaitForShardCallbacks();
unique_shard_cnt_ = unique_shard_cnt.load(std::memory_order_release);
return shard_journals_cnt;
}
void Transaction::Schedule() {
@ -1089,10 +1075,12 @@ void Transaction::UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard
CHECK_GE(DecreaseRunCnt(), 1u);
}
void Transaction::UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, EngineShard* shard) {
void Transaction::UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, EngineShard* shard,
uint32_t shard_journals_cnt) {
auto journal = shard->journal();
if (journal != nullptr && journal->GetLastTxId() == txid_) {
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, unique_shard_cnt_, {});
if (journal != nullptr && multi_->shard_journal_write[shard->shard_id()] == true) {
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_journals_cnt, {});
}
if (multi_->multi_opts & CO::GLOBAL_TRANS) {
@ -1237,7 +1225,6 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) {
auto cmd = facade::ToSV(cmd_with_full_args_.front());
entry_payload = make_pair(cmd, GetShardArgs(shard->shard_id()));
}
LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false);
}
@ -1245,6 +1232,9 @@ void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&
uint32_t shard_cnt, bool multi_commands) const {
auto journal = shard->journal();
CHECK(journal);
if (multi_) {
multi_->shard_journal_write[shard->shard_id()] = true;
}
auto opcode = (multi_ || multi_commands) ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND;
journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, std::move(payload));
}