From 90a9f05e36d262c24b4f2d75bd630a3d2cdb23af Mon Sep 17 00:00:00 2001
From: Vladislav
Date: Tue, 30 Jan 2024 19:43:06 +0300
Subject: [PATCH] chore(transaction): Use PhasedBarrier for easier
 synchronization (#2455)

chore(transaction): Use PhasedBarrier for easier synchronization

Signed-off-by: Vladislav Oleshko
---
 src/server/transaction.cc | 188 +++++++++++++++-----------------
 src/server/transaction.h  |  80 ++++++++--------
 2 files changed, 113 insertions(+), 155 deletions(-)

diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index 5db16d012..e9db7b5a2 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -71,12 +71,51 @@ void RecordTxScheduleStats(const Transaction* tx) {
   }
 }
 
+void RecordTxScheduleFastStats(const Transaction* tx, bool was_ooo, bool was_inline) {
+  DCHECK_EQ(tx->GetUniqueShardCnt(), 1u);
+  auto* ss = ServerState::tlocal();
+  if (was_ooo) {
+    ss->stats.tx_type_cnt[was_inline ? ServerState::INLINE : ServerState::QUICK]++;
+  } else {
+    ss->stats.tx_type_cnt[ServerState::NORMAL]++;
+  }
+  ss->stats.tx_width_freq_arr[0]++;
+}
+
 }  // namespace
 
 IntentLock::Mode Transaction::LockMode() const {
   return cid_->IsReadOnly() ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
 }
 
+void Transaction::PhasedBarrier::Start(uint32_t count) {
+  DCHECK_EQ(DEBUG_Count(), 0u);
+  count_.store(count, memory_order_release);
+}
+
+bool Transaction::PhasedBarrier::Active() const {
+  return count_.load(memory_order_acquire) > 0;
+}
+
+void Transaction::PhasedBarrier::Dec(Transaction* keep_alive) {
+  // Prevent transaction from being destroyed after count was decreased and Wait() unlocked,
+  // but before this thread finished notifying.
+  ::boost::intrusive_ptr guard(keep_alive);
+
+  uint32_t before = count_.fetch_sub(1);
+  CHECK_GE(before, 1u);
+  if (before == 1)
+    ec_.notify();
+}
+
+void Transaction::PhasedBarrier::Wait() {
+  ec_.await([this] { return count_.load(memory_order_acquire) == 0; });
+}
+
+uint32_t Transaction::PhasedBarrier::DEBUG_Count() const {
+  return count_.load(memory_order_relaxed);
+}
+
 /**
  * @brief Construct a new Transaction:: Transaction object
  *
@@ -246,26 +285,6 @@ void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) {
   }
 }
 
-/**
- *
- * There are 4 options that we consider here:
- * a. T spans a single shard and it's not multi.
- *    unique_shard_id_ is predefined before the schedule() is called.
- *    In that case only a single thread will be scheduled and it will use shard_data[0] just because
- *    shard_data.size() = 1. Coordinator thread can access any data because there is a
- *    schedule barrier between InitByArgs and RunInShard/IsArmedInShard functions.
- * b. T spans multiple shards and its not multi
- *    In that case multiple threads will be scheduled. Similarly they have a schedule barrier,
- *    and IsArmedInShard can read any variable from shard_data[x].
- * c. Trans spans a single shard and it's multi. shard_data has size of ess_.size.
- *    IsArmedInShard will check shard_data[x].
- * d. Trans spans multiple shards and it's multi. Similarly shard_data[x] will be checked.
- *    unique_shard_cnt_ and unique_shard_id_ are not accessed until shard_data[x] is armed, hence
- *    we have a barrier between coordinator and engine-threads. Therefore there should not be
- *    data races.
- *
- **/
-
 void Transaction::InitByKeys(const KeyIndex& key_index) {
   if (key_index.start == full_args_.size()) {  // eval with 0 keys.
    CHECK(absl::StartsWith(cid_->name(), "EVAL")) << cid_->name();
@@ -499,9 +518,9 @@ void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdA
 
 // Runs in the dbslice thread. Returns true if the transaction continues running in the thread.
 bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
-  DCHECK_GT(run_count_.load(memory_order_relaxed), 0u);
-  CHECK(cb_ptr_) << DebugId();
+  DCHECK(run_barrier_.Active());
   DCHECK_GT(txid_, 0u);
+  CHECK(cb_ptr_) << DebugId();
 
   // Unlike with regular transactions we do not acquire locks upon scheduling
   // because Scheduling is done before multi-exec batch is executed. Therefore we
@@ -511,7 +530,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
   auto& sd = shard_data_[idx];
 
   CHECK(sd.is_armed.exchange(false, memory_order_relaxed));
-  CHECK_GT(run_count_.load(memory_order_relaxed), 0u);
+  CHECK_GT(run_barrier_.DEBUG_Count(), 0u);
 
   VLOG(2) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id() << " " << sd.local_mask;
 
@@ -626,9 +645,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
     }
   }
 
-  DecreaseRunCnt();
-  // From this point on we can not access 'this'.
-
+  run_barrier_.Dec(this);  // From this point on we can not access 'this'.
   return !is_concluding;
 }
 
@@ -718,71 +735,56 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
     coordinator_state_ |= COORD_CONCLUDING;  // Single hop means we conclude.
   }
 
-  // If we run only on one shard and conclude, we can avoid scheduling at all
-  // and directly dispatch the task to its destination shard.
+  // If we run only on one shard and conclude, we can possibly avoid scheduling at all
+  // and directly run the callback on the destination thread if the locks are free.
  bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !IsAtomicMulti();
 
-  bool was_ooo = false;
-  bool run_inline = false;
-  ServerState* ss = nullptr;
+  bool was_ooo = false, was_inline = false;
 
  if (schedule_fast) {
    DCHECK_NE(unique_shard_id_, kInvalidSid);
    DCHECK(IsActive(unique_shard_id_));
    DCHECK(shard_data_.size() == 1 || multi_->mode == NON_ATOMIC);
 
-    // IsArmedInShard() first checks run_count_ before shard_data, so use release ordering.
-    shard_data_[SidToId(unique_shard_id_)].is_armed.store(true, memory_order_relaxed);
-    run_count_.store(1, memory_order_release);
    time_now_ms_ = GetCurrentTimeMs();
+    shard_data_[SidToId(unique_shard_id_)].is_armed.store(true, memory_order_relaxed);
 
-    // NOTE: schedule_cb cannot update data on stack when run_fast is false.
-    // This is because ScheduleSingleHop can finish before the callback returns.
+    // Start new phase, be careful with writes until phase end!
+    run_barrier_.Start(1);
 
-    // This happens when ScheduleUniqueShard schedules into TxQueue (hence run_fast is false), and
-    // then calls PollExecute that in turn runs the callback which calls DecreaseRunCnt. As a result
-    // WaitForShardCallbacks below is unblocked before schedule_cb returns. However, if run_fast is
-    // true, then we may mutate stack variables, but only before DecreaseRunCnt is called.
    auto schedule_cb = [this, &was_ooo] {
      bool run_fast = ScheduleUniqueShard(EngineShard::tlocal());
      if (run_fast) {
+        // We didn't decrease the barrier, so the scope is valid UNTIL Dec() below
+        DCHECK_EQ(run_barrier_.DEBUG_Count(), 1u);
        was_ooo = true;
-        // it's important to DecreaseRunCnt only for run_fast and after was_ooo is assigned.
-        // If DecreaseRunCnt were called before ScheduleUniqueShard finishes
-        // then WaitForShardCallbacks below could exit before schedule_cb assigns return value
-        // to was_ooo and cause stack corruption.
-        DecreaseRunCnt();
+        run_barrier_.Dec(this);
      }
+      // Otherwise it's not safe to access the function scope, as
+      // ScheduleUniqueShard() -> PollExecution() might have unlocked the barrier below.
    };
 
-    ss = ServerState::tlocal();
+    auto* ss = ServerState::tlocal();
    if (ss->thread_index() == unique_shard_id_ && ss->AllowInlineScheduling()) {
      DVLOG(2) << "Inline scheduling a transaction";
      schedule_cb();
-      run_inline = true;
+      was_inline = true;
    } else {
      shard_set->Add(unique_shard_id_, std::move(schedule_cb));  // serves as a barrier.
    }
-  } else {  // This transaction either spans multiple shards and/or is multi.
-    if (!IsAtomicMulti())  // Multi schedule in advance.
+  } else {
+    // This transaction either spans multiple shards and/or is multi, which schedules in advance.
+    if (!IsAtomicMulti())
      ScheduleInternal();
 
    ExecuteAsync();
  }
 
-  DVLOG(2) << "ScheduleSingleHop before Wait " << DebugId() << " " << run_count_.load();
-  WaitForShardCallbacks();
-  DVLOG(2) << "ScheduleSingleHop after Wait " << DebugId();
+  run_barrier_.Wait();
 
  if (schedule_fast) {
    CHECK(!cb_ptr_);  // we should have reset it within the callback.
-    if (was_ooo) {
-      ss->stats.tx_type_cnt[run_inline ? ServerState::INLINE : ServerState::QUICK]++;
-    } else {
-      ss->stats.tx_type_cnt[ServerState::NORMAL]++;
-    }
-    ss->stats.tx_width_freq_arr[0]++;
+    RecordTxScheduleFastStats(this, was_ooo, was_inline);
  }
 
  cb_ptr_ = nullptr;
  return local_result_;
@@ -814,9 +816,6 @@ void Transaction::UnlockMulti() {
  unsigned shard_journals_cnt = ServerState::tlocal()->journal() ? CalcMultiNumOfShardJournals() : 0;
 
-  uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed);
-  DCHECK_EQ(prev, 0u);
-
  use_count_.fetch_add(shard_data_.size(), std::memory_order_relaxed);
 
  for (ShardId i = 0; i < shard_data_.size(); ++i) {
    shard_set->Add(i, [this, sharded_keys, i, shard_journals_cnt]() {
@@ -867,9 +866,7 @@ void Transaction::Execute(RunnableType cb, bool conclude) {
 
  ExecuteAsync();
 
-  DVLOG(1) << "Execute::WaitForCbs " << DebugId();
-  WaitForShardCallbacks();
-  DVLOG(1) << "Execute::WaitForCbs " << DebugId() << " completed";
+  run_barrier_.Wait();
 
  cb_ptr_ = nullptr;
 }
 
@@ -877,46 +874,33 @@
 // Runs in coordinator thread.
 void Transaction::ExecuteAsync() {
  DVLOG(1) << "ExecuteAsync " << DebugId();
-
  DCHECK_GT(unique_shard_cnt_, 0u);
  DCHECK_GT(use_count_.load(memory_order_relaxed), 0u);
  DCHECK(!IsAtomicMulti() || multi_->lock_mode.has_value());
 
-  // We do not necessarily Execute this transaction in 'cb' below. It well may be that it will be
-  // executed by the engine shard once it has been armed and coordinator thread will finish the
-  // transaction before engine shard thread stops accessing it. Therefore, we increase reference
-  // by number of callbacks accessing 'this' to allow callbacks to execute shard->Execute(this);
-  // safely.
-  use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed);
+  // Set armed flags on all active shards
+  IterateActiveShards([](auto& sd, auto i) { sd.is_armed.store(true, memory_order_relaxed); });
 
-  // We access sd.is_armed outside of shard-threads but we guard it with run_count_ release.
-  IterateActiveShards(
-      [](PerShardData& sd, auto i) { sd.is_armed.store(true, memory_order_relaxed); });
-
-  // this fence prevents that a read or write operation before a release fence will be reordered
-  // with a write operation after a release fence. Specifically no writes below will be reordered
-  // upwards. Important, because it protects non-threadsafe local_mask from being accessed by
-  // IsArmedInShard in other threads.
-  run_count_.store(unique_shard_cnt_, memory_order_release);
+  // Start new phase: release semantics. From here we can be discovered by IsArmedInShard(),
+  // and thus picked by a foreign thread's PollExecution(). Careful with writes until phase end!
+  run_barrier_.Start(unique_shard_cnt_);
 
  auto* ss = ServerState::tlocal();
  if (unique_shard_cnt_ == 1 && ss->thread_index() == unique_shard_id_ &&
      ss->AllowInlineScheduling()) {
    DVLOG(1) << "Short-circuit ExecuteAsync " << DebugId();
    EngineShard::tlocal()->PollExecution("exec_cb", this);
-    intrusive_ptr_release(this);  // against use_count_.fetch_add above.
    return;
  }
 
-  auto cb = [this] {
-    EngineShard::tlocal()->PollExecution("exec_cb", this);
+  use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed);  // for each pointer from poll_cb
+  auto poll_cb = [this] {
+    EngineShard::tlocal()->PollExecution("exec_cb", this);
 
    DVLOG(3) << "ptr_release " << DebugId();
    intrusive_ptr_release(this);  // against use_count_.fetch_add above.
  };
-
-  // IsArmedInShard is the protector of non-thread safe data.
-  IterateActiveShards([&cb](PerShardData& sd, auto i) { shard_set->Add(i, cb); });
+  IterateActiveShards([&poll_cb](PerShardData& sd, auto i) { shard_set->Add(i, poll_cb); });
 }
 
 void Transaction::Conclude() {
@@ -990,8 +974,7 @@ Transaction::RunnableResult Transaction::RunQuickie(EngineShard* shard) {
 void Transaction::ExpireBlocking(WaitKeysProvider wcb) {
  DCHECK(!IsGlobal());
  DVLOG(1) << "ExpireBlocking " << DebugId();
-
-  run_count_.store(unique_shard_cnt_, memory_order_release);
+  run_barrier_.Start(unique_shard_cnt_);
 
  auto expire_cb = [this, &wcb] {
    EngineShard* es = EngineShard::tlocal();
@@ -999,7 +982,7 @@ void Transaction::ExpireBlocking(WaitKeysProvider wcb) {
  };
  IterateActiveShards([&expire_cb](PerShardData& sd, auto i) { shard_set->Add(i, expire_cb); });
 
-  WaitForShardCallbacks();
+  run_barrier_.Wait();
  DVLOG(1) << "ExpireBlocking finished " << DebugId();
 }
 
@@ -1302,8 +1285,7 @@ void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) {
  // Resume processing of transaction queue
  shard->PollExecution("unwatchcb", nullptr);
-
-  DecreaseRunCnt();
+  run_barrier_.Dec(this);
 }
 
 OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
@@ -1363,28 +1345,6 @@ void Transaction::UnlockMultiShardCb(absl::Span sharded_
    shard->blocking_controller()->NotifyPending();
 
  shard->PollExecution("unlockmulti", nullptr);
-
-  this->DecreaseRunCnt();
-}
-
-void Transaction::DecreaseRunCnt() {
-  // to protect against cases where Transaction is destroyed before run_ec_.notify
-  // finishes running. We can not put it inside the (res == 1) block because then it's too late.
-  ::boost::intrusive_ptr guard(this);
-
-  // We use release so that no stores will be reordered after.
-  // It's needed because we need to enforce that all stores executed before this point
-  // are visible right after run_count_ is unblocked in the coordinator thread.
-  // The fact that run_ec_.notify() does release operation is not enough, because
-  // WaitForCallbacks might skip reading run_ec_ if run_count_ is already 0.
-  uint32_t res = run_count_.fetch_sub(1, memory_order_release);
-
-  CHECK_GE(res, 1u) << unique_shard_cnt_ << " " << unique_shard_id_ << " " << cid_->name() << " "
-                    << use_count_.load(memory_order_relaxed) << " " << uint32_t(coordinator_state_);
-
-  if (res == 1) {
-    run_ec_.notify();
-  }
 }
 
 bool Transaction::IsGlobal() const {
diff --git a/src/server/transaction.h b/src/server/transaction.h
index 88fa8787e..a5dca3c2d 100644
--- a/src/server/transaction.h
+++ b/src/server/transaction.h
@@ -255,17 +255,14 @@ class Transaction {
  // Returns true if the transaction spans this shard_id.
  bool IsActive(ShardId shard_id) const;
 
-  //! Returns true if the transaction is armed for execution on this sid (used to avoid
-  //! duplicate runs). Supports local transactions under multi as well.
-  //! Can be used in contexts that wait for an event to happen.
+  // Returns true if the transaction is waiting for shard callbacks and the shard is armed.
+  // Safe to read transaction state (and update shard-local state) until the following RunInShard() finishes.
  bool IsArmedInShard(ShardId sid) const {
-    // For multi transactions shard_data_ spans all shards.
-    if (sid >= shard_data_.size())
+    if (sid >= shard_data_.size())  // For multi transactions shard_data_ spans all shards.
      sid = 0;
 
-    // We use acquire so that no reordering will move before this load.
-    return run_count_.load(std::memory_order_acquire) > 0 &&
-           shard_data_[sid].is_armed.load(std::memory_order_relaxed);
+    // Barrier has acquire semantics
+    return run_barrier_.Active() && shard_data_[sid].is_armed.load(std::memory_order_relaxed);
  }
 
  // Called from engine set shard threads.
@@ -419,6 +416,7 @@ class Transaction {
    COORD_CANCELLED = 1 << 3,
  };
 
+  // Auxiliary structure used during initialization
  struct PerShardCache {
    std::vector args;
    std::vector original_index;
@@ -429,6 +427,22 @@ class Transaction {
    }
  };
 
+  // Barrier akin to helio's BlockingCounter, but with proper acquire semantics for polling
+  // work from other threads (active, inactive phases), and without heap allocation.
+  class PhasedBarrier {
+   public:
+    void Start(uint32_t count);  // Release: Store count
+    void Wait();                 // Acquire: Wait until count = 0
+
+    bool Active() const;                // Acquire: Return if count > 0. Use for polling for work
+    void Dec(Transaction* keep_alive);  // Release: Decrease count, notify ec on count = 0
+
+    uint32_t DEBUG_Count() const;  // Get current counter value
+   private:
+    std::atomic_uint32_t count_{0};
+    EventCount ec_{};
+  };
+
 private:
  // Init basic fields and reset re-usable.
  void InitBase(DbIndex dbid, CmdArgList args);
@@ -496,24 +510,10 @@ class Transaction {
  // synchronize the multi-shard transaction.
  uint32_t CalcMultiNumOfShardJournals() const;
 
-  void WaitForShardCallbacks() {
-    run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); });
-
-    // no reads after this fence will be reordered before it, and if a store operation sequenced
-    // before some release operation that happened before the fence in another thread, this store
-    // will be visible after the fence.
-    // In this specific case we synchronize with DecreaseRunCnt that releases run_count_.
-    // See #997 before changing it.
-    std::atomic_thread_fence(std::memory_order_acquire);
-  }
-
  // Log command in shard's journal, if this is a write command with auto-journaling enabled.
-  // Should be called immediately after the last phase (hop).
+  // Should be called immediately after the last hop.
  void LogAutoJournalOnShard(EngineShard* shard, RunnableResult shard_result);
 
-  // Returns the previous value of run count.
-  void DecreaseRunCnt();
-
  uint32_t GetUseCount() const {
    return use_count_.load(std::memory_order_relaxed);
  }
@@ -548,17 +548,20 @@
  }
 
 private:
-  // shard_data spans all the shards in ess_.
-  // I wish we could use a dense array of size [0..uniq_shards] but since
-  // multiple threads access this array to synchronize between themselves using
-  // PerShardData.state, it can be tricky. The complication comes from multi_ transactions where
-  // scheduled transaction is accessed between operations as well.
+  // Main synchronization point for dispatching hop callbacks and waiting for them to finish.
+  // After scheduling, sequential hops are executed as follows:
+  //   coordinator: Prepare hop, then Start(num_shards), dispatch poll jobs and Wait()
+  //   tx queue:    Once IsArmedInShard() /* checks Active() */ -> run in shard and Dec()
+  // As long as the barrier is active, any writes by the coordinator are prohibited, so shard
+  // threads can safely read transaction state and modify per-shard state belonging to them.
+  // Inter-thread synchronization is provided by the barrier's acquire/release pairs.
+  PhasedBarrier run_barrier_;
 
-  // Stores per-shard data.
-  // For non-multi transactions, it can be of size one in case only one shard is active
-  // (unique_shard_cnt_ = 1).
-  // Never access directly with index, always use SidToId.
-  absl::InlinedVector shard_data_;  // length = shard_count
+  // Stores per-shard data: state flags and keys. Index only with SidToId(shard index)!
+  // Theoretically, same size as number of shards, but contains only a single element for
+  // single shard non-multi transactions (optimization).
+  // TODO: explore dense packing
+  absl::InlinedVector shard_data_;
 
  // Stores keys/values of the transaction partitioned by shards.
  // We need values as well since we reorder keys, and we need to know what value corresponds
@@ -583,21 +586,16 @@
  DbIndex db_index_{0};
  uint64_t time_now_ms_{0};
 
-  std::atomic_uint32_t wakeup_requested_{0};  // whether tx was woken up
-  std::atomic_uint32_t use_count_{0}, run_count_{0};
+  std::atomic_uint32_t use_count_{0};  // transaction exists only as an intrusive_ptr
 
-  // unique_shard_cnt_ and unique_shard_id_ are accessed only by coordinator thread.
  uint32_t unique_shard_cnt_{0};          // Number of unique shards active
  ShardId unique_shard_id_{kInvalidSid};  // Set if unique_shard_cnt_ = 1
  UniqueSlotChecker unique_slot_checker_;
 
-  EventCount blocking_ec_;  // Used to wake blocking transactions.
-  EventCount run_ec_;       // Used to wait for shard callbacks
+  std::atomic_uint32_t wakeup_requested_{0};  // incremented when blocking transaction gets notified
+  EventCount blocking_ec_;                    // to wait for wakeup_requested > 0 (or cancelled)
 
  // Transaction coordinator state, written and read by coordinator thread.
-  // Can be read by shard threads as long as we respect ordering rules, i.e. when
-  // they read this variable the coordinator thread is stalled and can not cause data races.
-  // If COORDINATOR_XXX has been set, it means we passed or crossed stage XXX.
  uint8_t coordinator_state_ = 0;
 
  // Result of callbacks. Usually written by single shard only, lock below for multishard oom error
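
Illustrative sketch (not part of the patch): the standalone C++17 program below reproduces the
coordination pattern that run_barrier_ implements between the coordinator and a shard thread.
helio's EventCount is replaced here by std::mutex + std::condition_variable, and all names
(Barrier, the armed flag, the worker thread) are hypothetical stand-ins rather than Dragonfly
APIs.

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <mutex>
#include <thread>

// Simplified analogue of Transaction::PhasedBarrier.
class Barrier {
 public:
  // Coordinator: open a new phase for `count` participants (release store).
  void Start(uint32_t count) {
    count_.store(count, std::memory_order_release);
  }

  // Worker: poll whether a phase is in progress (acquire load).
  bool Active() const {
    return count_.load(std::memory_order_acquire) > 0;
  }

  // Worker: report completion; wake the coordinator when the count reaches zero.
  void Dec() {
    if (count_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      std::lock_guard<std::mutex> lk(mu_);
      cv_.notify_all();
    }
  }

  // Coordinator: block until every participant has called Dec().
  void Wait() {
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [this] { return count_.load(std::memory_order_acquire) == 0; });
  }

 private:
  std::atomic<uint32_t> count_{0};
  std::mutex mu_;
  std::condition_variable cv_;
};

int main() {
  Barrier barrier;
  std::atomic<bool> armed{false};  // plays the role of PerShardData::is_armed

  std::thread shard([&] {
    // Poll for work. The armed flag may be read relaxed because Active() already
    // supplies the acquire half of the synchronization with Start().
    while (!(barrier.Active() && armed.load(std::memory_order_relaxed)))
      std::this_thread::yield();

    armed.store(false, std::memory_order_relaxed);
    std::puts("shard: running hop callback");
    barrier.Dec();  // after this call the coordinator may resume and mutate state again
  });

  // Coordinator: prepare the hop, arm the shard, open the phase, then wait.
  // Between Start() and Wait() the coordinator must not write shared transaction state.
  armed.store(true, std::memory_order_relaxed);
  barrier.Start(1);
  barrier.Wait();
  std::puts("coordinator: hop finished");

  shard.join();
  return 0;
}

The release in Start() paired with the acquire in Active()/Wait() is what allows the per-shard
armed flag and the rest of the transaction state to be accessed with relaxed ordering from the
shard thread, mirroring the reasoning in the run_barrier_ and IsArmedInShard() comments in the
patch above.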