From dc4306890c8903439894a2e8068f66b95164792f Mon Sep 17 00:00:00 2001 From: adiholden Date: Wed, 18 Jan 2023 14:37:29 +0200 Subject: [PATCH] feat(transaction): simplify calc multi trans unique shard count (#672) Signed-off-by: adi_holden --- src/server/journal/journal.cc | 4 --- src/server/journal/journal.h | 2 -- src/server/journal/journal_slice.cc | 1 - src/server/journal/journal_slice.h | 5 --- src/server/transaction.cc | 48 ++++++++++++----------------- src/server/transaction.h | 17 +++++++--- 6 files changed, 32 insertions(+), 45 deletions(-) diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index c63926cc5..0acf23b9b 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -116,10 +116,6 @@ void Journal::RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, std::move(payload)}); } -TxId Journal::GetLastTxId() { - return journal_slice.GetLastTxId(); -} - /* void Journal::OpArgs(TxId txid, Op opcode, Span keys) { DCHECK(journal_slice.IsOpen()); diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index f86eaea32..9d75a56ab 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -55,8 +55,6 @@ class Journal { void RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, Entry::Payload payload); - TxId GetLastTxId(); - private: mutable boost::fibers::mutex state_mu_; diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index b982b054c..f8e42aa20 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -117,7 +117,6 @@ error_code JournalSlice::Close() { void JournalSlice::AddLogRecord(const Entry& entry) { DCHECK(ring_buffer_); - last_txid_ = entry.txid; iterating_cb_arr_ = true; for (const auto& k_v : change_cb_arr_) { k_v.second(entry); diff --git a/src/server/journal/journal_slice.h b/src/server/journal/journal_slice.h index 8971fc2c5..ef63e59de 100644 --- a/src/server/journal/journal_slice.h +++ b/src/server/journal/journal_slice.h @@ -47,10 +47,6 @@ class JournalSlice { uint32_t RegisterOnChange(ChangeCallback cb); void UnregisterOnChange(uint32_t); - TxId GetLastTxId() { - return last_txid_; - } - private: struct RingItem; @@ -66,7 +62,6 @@ class JournalSlice { uint32_t slice_index_ = UINT32_MAX; uint32_t next_cb_id_ = 1; - TxId last_txid_ = 0; std::error_code status_ec_; bool lameduck_ = false; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 5b5b56c38..9f21a4544 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -46,6 +46,7 @@ Transaction::Transaction(const CommandId* cid) : cid_(cid) { if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") { multi_.reset(new MultiData); multi_->multi_opts = cid->opt_mask(); + multi_->shard_journal_write.resize(shard_set->size(), false); if (cmd_name == "EVAL" || cmd_name == "EVALSHA") { multi_->is_expanding = false; // we lock all the keys at once. @@ -625,15 +626,17 @@ void Transaction::UnlockMulti() { sharded_keys[sid].push_back(k_v); } + uint32_t shard_journals_cnt = 0; if (ServerState::tlocal()->journal()) { - CalculateUnqiueShardCntForMulti(); + shard_journals_cnt = CalcMultiNumOfShardJournals(); } uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed); DCHECK_EQ(prev, 0u); for (ShardId i = 0; i < shard_data_.size(); ++i) { - shard_set->Add(i, [&] { UnlockMultiShardCb(sharded_keys, EngineShard::tlocal()); }); + shard_set->Add( + i, [&] { UnlockMultiShardCb(sharded_keys, EngineShard::tlocal(), shard_journals_cnt); }); } WaitForShardCallbacks(); DCHECK_GE(GetUseCount(), 1u); @@ -641,31 +644,14 @@ void Transaction::UnlockMulti() { VLOG(1) << "UnlockMultiEnd " << DebugId(); } -void Transaction::CalculateUnqiueShardCntForMulti() { - uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed); - DCHECK_EQ(prev, 0u); - - std::atomic unique_shard_cnt = 0; - - auto update_shard_cnd = [&] { - EngineShard* shard = EngineShard::tlocal(); - auto journal = shard->journal(); - - if (journal != nullptr) { - TxId last_tx = journal->GetLastTxId(); - if (last_tx == txid_) { - unique_shard_cnt.fetch_add(1, std::memory_order_relaxed); - } +uint32_t Transaction::CalcMultiNumOfShardJournals() const { + uint32_t shard_journals_cnt = 0; + for (bool was_shard_write : multi_->shard_journal_write) { + if (was_shard_write) { + ++shard_journals_cnt; } - this->DecreaseRunCnt(); - }; - - for (ShardId i = 0; i < shard_data_.size(); ++i) { - shard_set->Add(i, std::move(update_shard_cnd)); } - WaitForShardCallbacks(); - - unique_shard_cnt_ = unique_shard_cnt.load(std::memory_order_release); + return shard_journals_cnt; } void Transaction::Schedule() { @@ -1089,10 +1075,12 @@ void Transaction::UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard CHECK_GE(DecreaseRunCnt(), 1u); } -void Transaction::UnlockMultiShardCb(const std::vector& sharded_keys, EngineShard* shard) { +void Transaction::UnlockMultiShardCb(const std::vector& sharded_keys, EngineShard* shard, + uint32_t shard_journals_cnt) { auto journal = shard->journal(); - if (journal != nullptr && journal->GetLastTxId() == txid_) { - journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, unique_shard_cnt_, {}); + + if (journal != nullptr && multi_->shard_journal_write[shard->shard_id()] == true) { + journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_journals_cnt, {}); } if (multi_->multi_opts & CO::GLOBAL_TRANS) { @@ -1237,7 +1225,6 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) { auto cmd = facade::ToSV(cmd_with_full_args_.front()); entry_payload = make_pair(cmd, GetShardArgs(shard->shard_id())); } - LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false); } @@ -1245,6 +1232,9 @@ void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload& uint32_t shard_cnt, bool multi_commands) const { auto journal = shard->journal(); CHECK(journal); + if (multi_) { + multi_->shard_journal_write[shard->shard_id()] = true; + } auto opcode = (multi_ || multi_commands) ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND; journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, std::move(payload)); } diff --git a/src/server/transaction.h b/src/server/transaction.h index 0b114f68b..e8c4b79c4 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -250,6 +250,11 @@ class Transaction { absl::flat_hash_map lock_counts; std::vector keys; + // The shard_journal_write vector variable is used to determine the number of shards + // involved in a multi-command transaction. This information is utilized by replicas when + // executing multi-command. For every write to a shard journal, the corresponding index in the + // vector is marked as true. + absl::InlinedVector shard_journal_write; uint32_t multi_opts = 0; // options of the parent transaction. // Whether this transaction can lock more keys during its progress. @@ -298,11 +303,15 @@ class Transaction { void UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard* shard); - void UnlockMultiShardCb(const std::vector& sharded_keys, EngineShard* shard); + void UnlockMultiShardCb(const std::vector& sharded_keys, EngineShard* shard, + uint32_t shard_journals_cnt); - // Calculate number of unqiue shards for multi transaction after alll commands were executed. - // This value is used in stable state replication to allow applying the command atomically. - void CalculateUnqiueShardCntForMulti(); + // In a multi-command transaction, we determine the number of shard journals that we wrote entries + // to by updating the shard_journal_write vector during command execution. The total number of + // shard journals written to can be found by summing the true values in the vector. This value is + // then written to each shard journal with the journal EXEC op, enabling replication to + // synchronize the multi-shard transaction. + uint32_t CalcMultiNumOfShardJournals() const; void WaitForShardCallbacks() { run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); });