From 4d4fed6feccab0b7d4f98f115fc97ad6c4c1414f Mon Sep 17 00:00:00 2001 From: Vladislav Date: Sun, 18 Feb 2024 12:25:57 +0300 Subject: [PATCH] chore(transaction): Untie scheduling from multi status (#2590) * chore(transaction): Untie scheduling from multi status Idea: We decide whether we have to schedule not based on our multi status (atomic multi), but solely based on the fact if COORD_SCHED is set Goal: Being able to use ScheduleSingleHop()/Schedule() for multi transactions, and thus later allow single hop multi transactions --------- Signed-off-by: Vladislav Oleshko --- src/server/command_registry.cc | 4 ++ src/server/command_registry.h | 2 + src/server/transaction.cc | 101 ++++++++++++++++++++++----------- src/server/transaction.h | 39 ++++++++----- 4 files changed, 98 insertions(+), 48 deletions(-) diff --git a/src/server/command_registry.cc b/src/server/command_registry.cc index 033681189..43aef8769 100644 --- a/src/server/command_registry.cc +++ b/src/server/command_registry.cc @@ -55,6 +55,10 @@ bool CommandId::IsTransactional() const { return false; } +bool CommandId::IsMultiTransactional() const { + return CO::IsTransKind(name()) || CO::IsEvalKind(name()); +} + uint64_t CommandId::Invoke(CmdArgList args, ConnectionContext* cntx) const { int64_t before = absl::GetCurrentTimeNanos(); handler_(args, cntx); diff --git a/src/server/command_registry.h b/src/server/command_registry.h index 20069ebec..8b42e65f5 100644 --- a/src/server/command_registry.h +++ b/src/server/command_registry.h @@ -97,6 +97,8 @@ class CommandId : public facade::CommandId { bool IsTransactional() const; + bool IsMultiTransactional() const; + bool IsReadOnly() const { return opt_mask_ & CO::READONLY; } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index f07f3dd68..988113de6 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -193,8 +193,7 @@ Transaction::Transaction(const Transaction* parent, ShardId shard_id, std::optio multi_->role = SQUASHED_STUB; multi_->shard_journal_write.resize(1); - time_now_ms_ = parent->time_now_ms_; - + MultiUpdateWithParent(parent); if (slot_id.has_value()) { unique_slot_checker_.Add(*slot_id); } @@ -447,7 +446,6 @@ void Transaction::PrepareSquashedMultiHop(const CommandId* cid, MultiSwitchCmd(cid); - multi_->role = SQUASHER; InitBase(db_index_, {}); // Because squashing already determines active shards by partitioning commands, @@ -465,6 +463,8 @@ void Transaction::PrepareSquashedMultiHop(const CommandId* cid, shard_data_[i].arg_start = 0; shard_data_[i].arg_count = 0; } + + MultiBecomeSquasher(); } void Transaction::StartMultiGlobal(DbIndex dbid) { @@ -479,7 +479,7 @@ void Transaction::StartMultiGlobal(DbIndex dbid) { ScheduleInternal(); } -void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgVec keys) { +void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgVec keys, bool skip_scheduling) { DVLOG(1) << "StartMultiLockedAhead on " << keys.size() << " keys"; DCHECK(multi_); @@ -493,7 +493,8 @@ void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgVec keys) { InitBase(dbid, absl::MakeSpan(keys)); InitByKeys(KeyIndex::Range(0, keys.size())); - ScheduleInternal(); + if (!skip_scheduling) + ScheduleInternal(); full_args_ = {nullptr, 0}; // InitBase set it to temporary keys, now we reset it. } @@ -543,10 +544,28 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) { DCHECK_EQ(coordinator_state_, 0u); } + // Each hop needs to be prepared, reset role if (multi_->role == SQUASHER) multi_->role = DEFAULT; } +void Transaction::MultiUpdateWithParent(const Transaction* parent) { + // Disabled because of single shard lua optimization + // DCHECK(multi_); + // DCHECK(parent->multi_); // it might not be a squasher yet, but certainly is multi + DCHECK_EQ(multi_->role, SQUASHED_STUB); + txid_ = parent->txid_; + time_now_ms_ = parent->time_now_ms_; + unique_slot_checker_ = parent->unique_slot_checker_; +} + +void Transaction::MultiBecomeSquasher() { + DCHECK(multi_->mode == GLOBAL || multi_->mode == LOCK_AHEAD); + DCHECK_GT(GetUniqueShardCnt(), 0u); // initialized and determined active shards + DCHECK(cid_->IsMultiTransactional()); // proper base command set + multi_->role = SQUASHER; +} + string Transaction::DebugId(std::optional sid) const { DCHECK_GT(use_count_.load(memory_order_relaxed), 0u); string res = StrCat(Name(), "@", txid_, "/", unique_shard_cnt_); @@ -681,6 +700,11 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { // This is the last hop, so clear cont_trans if its held by the current tx shard->RemoveContTx(this); + if (IsAtomicMulti()) { // Can only be true if run through ScheduleSingleHop + DCHECK(cid_->IsMultiTransactional()); + MultiReportJournalOnShard(shard); + } + // It has 2 responsibilities. // 1: to go over potential wakened keys, verify them and activate watch queues. // 2: if this transaction was notified and finished running - to remove it from the head @@ -708,10 +732,10 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { // For eval-like transactions - we can decide based on the command flavor (EVAL/EVALRO) or // auto-tune based on the static analysis (by identifying commands with hardcoded command names). void Transaction::ScheduleInternal() { - DCHECK(!shard_data_.empty()); - DCHECK_EQ(0u, txid_); - DCHECK_EQ(0, coordinator_state_ & COORD_SCHED); + DCHECK_EQ(txid_, 0u); + DCHECK_EQ(coordinator_state_ & COORD_SCHED, 0); DCHECK_GT(unique_shard_cnt_, 0u); + DCHECK(!IsAtomicMulti() || cid_->IsMultiTransactional()); DVLOG(1) << "ScheduleInternal " << cid_->name() << " on " << unique_shard_cnt_ << " shards"; @@ -776,26 +800,29 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { } DCHECK(!cb_ptr_); - DCHECK(IsAtomicMulti() || (coordinator_state_ & COORD_SCHED) == 0); // Multi schedule in advance. - cb_ptr_ = &cb; - if (IsAtomicMulti()) { + // We can be already scheduled if we're part of a multi transaction. Note: If a multi tx isn't + // scheduled, we assume it's not mimicking the interface, but actually preparing a single hop. + bool scheduled = (coordinator_state_ & COORD_SCHED) > 0; + if (scheduled) { + DCHECK(IsAtomicMulti()); multi_->concluding = true; } else { - coordinator_state_ |= COORD_CONCLUDING; // Single hop means we conclude. + // For multi it only makes sense with squashing and thus a proper underlying command + DCHECK(!IsAtomicMulti() || (multi_->role == SQUASHER && cid_->IsMultiTransactional())); + coordinator_state_ |= COORD_CONCLUDING; } // If we run only on one shard and conclude, we can possibly avoid scheduling at all // and directly run the callback on the destination thread if the locks are free. - bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !IsAtomicMulti(); - + bool schedule_fast = !scheduled && (unique_shard_cnt_ == 1) && !IsGlobal(); bool was_ooo = false, was_inline = false; if (schedule_fast) { DCHECK_NE(unique_shard_id_, kInvalidSid); DCHECK(IsActive(unique_shard_id_)); - DCHECK(shard_data_.size() == 1 || multi_->mode == NON_ATOMIC); + DCHECK(shard_data_.size() == 1 || multi_); InitTxTime(); shard_data_[SidToId(unique_shard_id_)].local_mask |= ARMED; @@ -825,7 +852,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { } } else { // This transaction either spans multiple shards and/or is multi, which schedule in advance. - if (!IsAtomicMulti()) + if (!scheduled) ScheduleInternal(); ExecuteAsync(); @@ -845,6 +872,9 @@ void Transaction::ReportWritesSquashedMulti(absl::FunctionRef had DCHECK(multi_); for (unsigned i = 0; i < multi_->shard_journal_write.size(); i++) multi_->shard_journal_write[i] |= had_write(i); + + // Update imemdiately if we decide to conclude after one hop without UnlockMulti + multi_->shard_journal_cnt = CalcMultiNumOfShardJournals(); } // Runs in the coordinator fiber. @@ -853,7 +883,8 @@ void Transaction::UnlockMulti() { DCHECK(multi_); DCHECK_GE(GetUseCount(), 1u); // Greater-equal because there may be callbacks in progress. - if (multi_->mode == NON_ATOMIC) + // Return if we either didn't schedule at all (and thus run) or already did conclude + if ((coordinator_state_ & COORD_SCHED) == 0 || (coordinator_state_ & COORD_CONCLUDING) > 0) return; multi_->frozen_keys_set.clear(); @@ -864,15 +895,14 @@ void Transaction::UnlockMulti() { (*sharded_keys)[sid].emplace_back(key); } - unsigned shard_journals_cnt = - ServerState::tlocal()->journal() ? CalcMultiNumOfShardJournals() : 0; + multi_->shard_journal_cnt = ServerState::tlocal()->journal() ? CalcMultiNumOfShardJournals() : 0; use_count_.fetch_add(shard_data_.size(), std::memory_order_relaxed); DCHECK_EQ(shard_data_.size(), shard_set->size()); for (ShardId i = 0; i < shard_data_.size(); ++i) { - shard_set->Add(i, [this, sharded_keys, i, shard_journals_cnt]() { - this->UnlockMultiShardCb((*sharded_keys)[i], EngineShard::tlocal(), shard_journals_cnt); + shard_set->Add(i, [this, sharded_keys, i]() { + this->UnlockMultiShardCb((*sharded_keys)[i], EngineShard::tlocal()); intrusive_ptr_release(this); }); } @@ -894,7 +924,7 @@ void Transaction::Schedule() { if (multi_ && multi_->role == SQUASHED_STUB) return; - if (!IsAtomicMulti()) + if ((coordinator_state_ & COORD_SCHED) == 0) ScheduleInternal(); } @@ -987,10 +1017,8 @@ void Transaction::FIX_ConcludeJournalExec() { if (!multi_->shard_journal_write.front()) return; - if (auto journal = EngineShard::tlocal()->journal(); journal != nullptr) { - journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, 1, - unique_slot_checker_.GetUniqueSlotId(), {}, false); - } + multi_->shard_journal_cnt = 1; + MultiReportJournalOnShard(EngineShard::tlocal()); } void Transaction::EnableShard(ShardId sid) { @@ -1363,16 +1391,21 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) { return result; } -void Transaction::UnlockMultiShardCb(absl::Span sharded_keys, - EngineShard* shard, uint32_t shard_journals_cnt) { - DCHECK(multi_ && multi_->lock_mode); - - auto journal = shard->journal(); - - if (journal != nullptr && multi_->shard_journal_write[shard->shard_id()]) { - journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_journals_cnt, +void Transaction::MultiReportJournalOnShard(EngineShard* shard) const { + DCHECK_EQ(EngineShard::tlocal(), shard); + auto* journal = shard->journal(); + size_t write_idx = multi_->role == SQUASHED_STUB ? 0 : shard->shard_id(); + if (journal != nullptr && multi_->shard_journal_write[write_idx]) { + journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, multi_->shard_journal_cnt, unique_slot_checker_.GetUniqueSlotId(), {}, true); } +} + +void Transaction::UnlockMultiShardCb(absl::Span sharded_keys, + EngineShard* shard) { + DCHECK(multi_ && multi_->lock_mode); + + MultiReportJournalOnShard(shard); if (multi_->mode == GLOBAL) { shard->shard_lock()->Release(IntentLock::EXCLUSIVE); diff --git a/src/server/transaction.h b/src/server/transaction.h index 4e59f990b..b8ac0c0e8 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -233,13 +233,11 @@ class Transaction { void StartMultiGlobal(DbIndex dbid); // Start multi in LOCK_AHEAD mode with given keys. - void StartMultiLockedAhead(DbIndex dbid, CmdArgVec keys); + void StartMultiLockedAhead(DbIndex dbid, CmdArgVec keys, bool skip_scheduling = false); // Start multi in NON_ATOMIC mode. void StartMultiNonAtomic(); - void InitTxTime(); - // Report which shards had write commands that executed on stub transactions // and thus did not mark itself in MultiData::shard_journal_write. void ReportWritesSquashedMulti(absl::FunctionRef had_write); @@ -250,6 +248,12 @@ class Transaction { // Set new command for multi transaction. void MultiSwitchCmd(const CommandId* cid); + // Copy txid, time and unique slot from parent + void MultiUpdateWithParent(const Transaction* parent); + + // Set squasher role + void MultiBecomeSquasher(); + // Returns locking arguments needed for DbSlice to Acquire/Release transactional locks. // Runs in the shard thread. KeyLockArgs GetLockArgs(ShardId sid) const; @@ -297,6 +301,14 @@ class Transaction { return multi_->mode; } + // Whether the transaction is multi and runs in an atomic mode. + // This, instead of just IsMulti(), should be used to check for the possibility of + // different optimizations, because they can safely be applied to non-atomic multi + // transactions as well. + bool IsAtomicMulti() const { + return multi_ && (multi_->mode == LOCK_AHEAD || multi_->mode == GLOBAL); + } + bool IsGlobal() const; // If blocking tx was woken up on this shard, get wake key. @@ -389,12 +401,14 @@ class Transaction { // Set if the multi command is concluding to avoid ambiguity with COORD_CONCLUDING bool concluding = false; + unsigned cmd_seq_num = 0; // used for debugging purposes. + // The shard_journal_write vector variable is used to determine the number of shards // involved in a multi-command transaction. This information is utilized by replicas when // executing multi-command. For every write to a shard journal, the corresponding index in the // vector is marked as true. absl::InlinedVector shard_journal_write; - unsigned cmd_seq_num = 0; // used for debugging purposes. + unsigned shard_journal_cnt; }; enum CoordinatorState : uint8_t { @@ -507,8 +521,13 @@ class Transaction { // Run callback inline as part of multi stub. OpStatus RunSquashedMultiCb(RunnableType cb); - void UnlockMultiShardCb(absl::Span sharded_keys, EngineShard* shard, - uint32_t shard_journals_cnt); + // Set time_now_ms_ + void InitTxTime(); + + // If journaling is enabled, report final exec opcode to finish the chain of commands. + void MultiReportJournalOnShard(EngineShard* shard) const; + + void UnlockMultiShardCb(absl::Span sharded_keys, EngineShard* shard); // In a multi-command transaction, we determine the number of shard journals that we wrote entries // to by updating the shard_journal_write vector during command execution. The total number of @@ -525,14 +544,6 @@ class Transaction { return use_count_.load(std::memory_order_relaxed); } - // Whether the transaction is multi and runs in an atomic mode. - // This, instead of just IsMulti(), should be used to check for the possibility of - // different optimizations, because they can safely be applied to non-atomic multi - // transactions as well. - bool IsAtomicMulti() const { - return multi_ && multi_->mode != NON_ATOMIC; - } - bool IsActiveMulti() const { return multi_ && multi_->role != SQUASHED_STUB; }