From 9c6e6a96b78473c80c2f40a8b9bbfa7c2a285f51 Mon Sep 17 00:00:00 2001
From: Vladislav
Date: Thu, 14 Mar 2024 17:40:32 +0300
Subject: [PATCH] fix(transaction): Replace with armed sync point (#2708)

1. Replaces run_barrier as a synchronization point with is_armed + an embedded
   blocking counter for awaiting running jobs
2. Replaces the IsArmedInShard + GetLocalMask + is_armed.exchange chain with a
   single DisarmInShard() / DisarmInShardWhen()

(A simplified, self-contained sketch of the new arming protocol follows the diff.)

Signed-off-by: Vladislav Oleshko
---
 helio                             |  2 +-
 src/server/blocking_controller.cc |  4 +-
 src/server/engine_shard_set.cc    | 54 +++++++++--------
 src/server/main_service.cc        |  2 +-
 src/server/transaction.cc         | 97 ++++++++++++-------------
 src/server/transaction.h          | 72 +++++++++++------------
 6 files changed, 103 insertions(+), 128 deletions(-)

diff --git a/helio b/helio
index 4829d23bc..b3ed89d13 160000
--- a/helio
+++ b/helio
@@ -1 +1 @@
-Subproject commit 4829d23bca125186e0d34faebf294a7579acb398
+Subproject commit b3ed89d13ea3b7095e3a48915ec9c8cd01223039
diff --git a/src/server/blocking_controller.cc b/src/server/blocking_controller.cc
index 5a4ef6099..7f1584adf 100644
--- a/src/server/blocking_controller.cc
+++ b/src/server/blocking_controller.cc
@@ -123,7 +123,7 @@ void BlockingController::FinalizeWatched(ArgSlice args, Transaction* tx) {
   VLOG(1) << "FinalizeBlocking [" << owner_->shard_id() << "]" << tx->DebugId();
   bool removed = awakened_transactions_.erase(tx);
-  DCHECK(!removed || (tx->GetLocalMask(owner_->shard_id()) & Transaction::AWAKED_Q));
+  DCHECK(!removed || (tx->DEBUG_GetLocalMask(owner_->shard_id()) & Transaction::AWAKED_Q));
   auto dbit = watched_dbs_.find(tx->GetDbIndex());
@@ -138,7 +138,7 @@ void BlockingController::FinalizeWatched(ArgSlice args, Transaction* tx) {
   for (string_view key : args) {
     bool removed_awakened = wt.UnwatchTx(key, tx);
     CHECK(!removed_awakened || removed)
-        << tx->DebugId() << " " << key << " " << tx->GetLocalMask(owner_->shard_id());
+        << tx->DebugId() << " " << key << " " << tx->DEBUG_GetLocalMask(owner_->shard_id());
   }
   if (wt.queue_map.empty()) {
diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc
index 6aaa5ca7c..ed0a02440 100644
--- a/src/server/engine_shard_set.cc
+++ b/src/server/engine_shard_set.cc
@@ -448,18 +448,24 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
   ShardId sid = shard_id();
   stats_.poll_execution_total++;
-  // Check if the caller was handled by a previous poll.
-  if (trans && !trans->IsArmedInShard(sid))
+  // If any of the following flags are present, we are guaranteed to run in this function:
+  // 1. AWAKED_Q -> Blocking transactions are executed immediately after waking up, they don't
+  // occupy a place in txq and have highest priority
+  // 2. SUSPENDED_Q -> Suspended shards are run to clean up and finalize blocking keys
+  // 3. OUT_OF_ORDER -> Transactions without conflicting keys can run earlier than their position in
+  // txq is reached
+  uint16_t flags = Transaction::AWAKED_Q | Transaction::SUSPENDED_Q | Transaction::OUT_OF_ORDER;
+  auto [trans_mask, disarmed] =
+      trans ? trans->DisarmInShardWhen(sid, flags) : make_pair(uint16_t(0), false);
+
+  if (trans && trans_mask == 0)  // If not armed, it means that this poll task expired
     return;
-  auto local_mask = trans ? trans->GetLocalMask(sid) : 0;  // safe only when trans is armed
-
-  // Blocked transactions are executed immediately after waking up
-  if (local_mask & Transaction::AWAKED_Q) {
+  if (trans_mask & Transaction::AWAKED_Q) {
     CHECK(continuation_trans_ == nullptr || continuation_trans_ == trans)
        << continuation_trans_->DebugId() << " when polling " << trans->DebugId()
-        << "cont_mask: " << continuation_trans_->GetLocalMask(sid) << " vs "
-        << trans->GetLocalMask(sid);
+        << "cont_mask: " << continuation_trans_->DEBUG_GetLocalMask(sid) << " vs "
+        << trans->DEBUG_GetLocalMask(sid);
     // Commands like BRPOPLPUSH don't conclude immediately
     if (trans->RunInShard(this, false)) {
@@ -481,10 +487,11 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
   // Check the currently running transaction, we have to handle it first until it concludes
   if (continuation_trans_) {
-    if (trans == continuation_trans_)
+    bool is_self = continuation_trans_ == trans;
+    if (is_self)
       trans = nullptr;
-    if (continuation_trans_->IsArmedInShard(sid)) {
+    if ((is_self && disarmed) || continuation_trans_->DisarmInShard(sid)) {
       if (bool keep = run(continuation_trans_, false); !keep) {
         // if this holds, we can remove this check altogether.
         DCHECK(continuation_trans_ == nullptr);
@@ -503,10 +510,12 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
     head = get(txq_.Front());
-    VLOG(2) << "Considering head " << head->DebugId() << " isarmed: " << head->IsArmedInShard(sid);
+    VLOG(2) << "Considering head " << head->DebugId()
+            << " isarmed: " << head->DEBUG_IsArmedInShard(sid);
     // If the transaction isn't armed yet, it will be handled by a successive poll
-    if (!head->IsArmedInShard(sid))
+    bool should_run = (head == trans && disarmed) || head->DisarmInShard(sid);
+    if (!should_run)
      break;
     // Avoid processing the caller transaction below if we found it in the queue,
@@ -525,23 +534,12 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
       continuation_trans_ = head;
   }
-  // Either the poll had no caller or it was handled above
-  if (trans == nullptr)
-    return;
-
-  // If the pointer is valid, we didn't handle it above, so trans is still armed
-  DCHECK(trans->IsArmedInShard(sid));
-
-  // OOO means no transaction before us in the txq accesses our keys, so we can run earlier
-  bool is_ooo = local_mask & Transaction::OUT_OF_ORDER;
-
-  // Still suspended shards need to run just to finalize and unregister, so we can run anytime
-  bool is_suspended = local_mask & Transaction::SUSPENDED_Q;
-  DCHECK_EQ(local_mask & Transaction::AWAKED_Q, 0);
-
-  if (is_ooo || is_suspended) {
+  // If we disarmed, but didn't find ourselves in the loop, run now.
+  if (trans && disarmed) {
     DCHECK(trans != head);
+    DCHECK(trans_mask & (Transaction::OUT_OF_ORDER | Transaction::SUSPENDED_Q));
+
+    bool is_ooo = trans_mask & Transaction::OUT_OF_ORDER;
     bool keep = run(trans, is_ooo);
     if (is_ooo && !keep) {
       stats_.tx_ooo_total++;
@@ -728,7 +726,7 @@ auto EngineShard::AnalyzeTxQueue() const -> TxQueueInfo {
       max_db_id = trx->GetDbIndex();
     }
-    bool is_armed = trx->IsArmedInShard(sid);
+    bool is_armed = trx->DEBUG_IsArmedInShard(sid);
     DVLOG(1) << "Inspecting " << trx->DebugId() << " is_armed " << is_armed;
     if (is_armed) {
       info.tx_armed++;
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index 9ed4840b5..df05c8c11 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -520,7 +520,7 @@ void TxTable(const http::QueryArgs& args, HttpContext* send) {
       Transaction* trx = std::get<Transaction*>(value);
       absl::AlphaNum an2(trx->txid());
-      absl::AlphaNum an3(trx->IsArmedInShard(sid));
+      absl::AlphaNum an3(trx->DEBUG_IsArmedInShard(sid));
       SortedTable::Row({sid_an.Piece(), tid.Piece(), an2.Piece(), an3.Piece()}, &mine);
       cur = queue->Next(cur);
     } while (cur != queue->Head());
diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index 4d1c7765b..57274438d 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -53,7 +53,7 @@ void AnalyzeTxQueue(const EngineShard* shard, const TxQueue* txq) {
   const Transaction* cont_tx = shard->GetContTx();
   if (cont_tx) {
     absl::StrAppend(&msg, " continuation_tx: ", cont_tx->DebugId(), " ",
-                    cont_tx->IsArmedInShard(shard->shard_id()) ? " armed" : "");
+                    cont_tx->DEBUG_IsArmedInShard(shard->shard_id()) ? " armed" : "");
   }
   LOG(WARNING) << msg;
@@ -97,34 +97,6 @@ uint16_t trans_id(const Transaction* ptr) {
 }
 }  // namespace
-void Transaction::PhasedBarrier::Start(uint32_t count) {
-  DCHECK_EQ(DEBUG_Count(), 0u);
-  count_.store(count, memory_order_release);
-}
-
-bool Transaction::PhasedBarrier::Active() const {
-  return count_.load(memory_order_acquire) > 0;
-}
-
-void Transaction::PhasedBarrier::Dec(Transaction* keep_alive) {
-  // Prevent transaction from being destroyed after count was decreased and Wait() unlocked,
-  // but before this thread finished notifying.
-  ::boost::intrusive_ptr<Transaction> guard(keep_alive);
-
-  uint32_t before = count_.fetch_sub(1);
-  CHECK_GE(before, 1u) << keep_alive->DEBUG_PrintFailState(EngineShard::tlocal()->shard_id());
-  if (before == 1)
-    ec_.notify();
-}
-
-void Transaction::PhasedBarrier::Wait() {
-  ec_.await([this] { return count_.load(memory_order_acquire) == 0; });
-}
-
-uint32_t Transaction::PhasedBarrier::DEBUG_Count() const {
-  return count_.load(memory_order_relaxed);
-}
-
 bool Transaction::BatonBarrier::IsClaimed() const {
   return claimed_.load(memory_order_relaxed);
 }
@@ -401,7 +373,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
   for (const auto& sd : shard_data_) {
     // sd.local_mask may be non-zero for multi transactions with instant locking.
     // Specifically EVALs may maintain state between calls.
-    DCHECK_EQ(sd.local_mask & ARMED, 0);
+    DCHECK(!sd.is_armed.load(memory_order_relaxed));
     if (!multi_) {
       DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
     }
@@ -529,7 +501,7 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
       DCHECK(IsAtomicMulti());   // Every command determines it's own active shards
       sd.local_mask &= ~ACTIVE;  // so remove ACTIVE flags, but keep KEYLOCK_ACQUIRED
     }
-    DCHECK_EQ(sd.local_mask & ARMED, 0);
+    DCHECK(!sd.is_armed.load(memory_order_relaxed));
   }
   if (multi_->mode == NON_ATOMIC) {
@@ -588,16 +560,12 @@ void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdA
 // Runs in the dbslice thread. Returns true if the transaction continues running in the thread.
 bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
-  DCHECK(run_barrier_.Active());
   DCHECK_GT(txid_, 0u);
   CHECK(cb_ptr_) << DebugId();
   unsigned idx = SidToId(shard->shard_id());
   auto& sd = shard_data_[idx];
-  CHECK(sd.local_mask & ARMED) << DEBUG_PrintFailState(shard->shard_id());
-  sd.local_mask &= ~ARMED;
-
   sd.stats.total_runs++;
   DCHECK_GT(run_barrier_.DEBUG_Count(), 0u);
@@ -720,7 +688,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
     }
   }
-  run_barrier_.Dec(this);  // From this point on we can not access 'this'.
+  FinishHop();  // From this point on we can not access 'this'.
   return !is_concluding;
 }
@@ -824,10 +792,9 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
     DCHECK(shard_data_.size() == 1 || multi_);
     InitTxTime();
-    shard_data_[SidToId(unique_shard_id_)].local_mask |= ARMED;
-
     // Start new phase, be careful with writes until phase end!
     run_barrier_.Start(1);
+    shard_data_[SidToId(unique_shard_id_)].is_armed.store(true, memory_order_release);
     auto schedule_cb = [this, &was_ooo] {
       bool run_fast = ScheduleUniqueShard(EngineShard::tlocal());
@@ -835,7 +802,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
         // We didn't decrease the barrier, so the scope is valid UNTIL Dec() below
         DCHECK_EQ(run_barrier_.DEBUG_Count(), 1u);
         was_ooo = true;
-        run_barrier_.Dec(this);
+        FinishHop();
       }
       // Otherwise it's not safe to access the function scope, as
       // ScheduleUniqueShard() -> PollExecution() might have unlocked the barrier below.
@@ -961,18 +928,19 @@ void Transaction::ExecuteAsync() {
   DCHECK(!IsAtomicMulti() || multi_->lock_mode.has_value());
   DCHECK_LE(shard_data_.size(), 1024u);
-  // Set armed flags on all active shards. Copy indices for dispatching poll tasks,
-  // because local_mask can be written concurrently after starting a new phase.
+  // Hops can start executing immediately after being armed, so we
+  // initialize the run barrier before arming, as well as copy indices
+  // of active shards to avoid reading concurrently accessed shard data.
   std::bitset<1024> poll_flags(0);
+  run_barrier_.Start(unique_shard_cnt_);
+
+  // Set armed flags on all active shards.
+  std::atomic_thread_fence(memory_order_release);  // once to avoid flushing poll_flags in loop
   IterateActiveShards([&poll_flags](auto& sd, auto i) {
-    sd.local_mask |= ARMED;
+    sd.is_armed.store(true, memory_order_relaxed);
     poll_flags.set(i, true);
   });
-  // Start new phase: release semantics. From here we can be discovered by IsArmedInShard(),
-  // and thus picked by a foreign thread's PollExecution(). Careful with data access!
-  run_barrier_.Start(unique_shard_cnt_);
-
   auto* ss = ServerState::tlocal();
   if (unique_shard_cnt_ == 1 && ss->thread_index() == unique_shard_id_ &&
       ss->AllowInlineScheduling()) {
@@ -994,6 +962,11 @@ void Transaction::ExecuteAsync() {
   });
 }
+void Transaction::FinishHop() {
+  boost::intrusive_ptr<Transaction> guard(this);  // Keep alive until Dec() fully finishes
+  run_barrier_.Dec();
+}
+
 void Transaction::Conclude() {
   if (!IsScheduled())
     return;
@@ -1062,9 +1035,6 @@ Transaction::RunnableResult Transaction::RunQuickie(EngineShard* shard) {
   DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id();
   DCHECK(cb_ptr_) << DebugId() << " " << shard->shard_id();
-  CHECK(sd.local_mask & ARMED) << DEBUG_PrintFailState(shard->shard_id());
-  sd.local_mask &= ~ARMED;
-
   sd.stats.total_runs++;
   // Calling the callback in somewhat safe way
@@ -1126,9 +1096,21 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const {
   return res;
 }
-bool Transaction::IsArmedInShard(ShardId sid) const {
-  // Barrier has acquire semantics
-  return run_barrier_.Active() && (shard_data_[SidToId(sid)].local_mask & ARMED);
+uint16_t Transaction::DisarmInShard(ShardId sid) {
+  auto& sd = shard_data_[SidToId(sid)];
+  // NOTE: Maybe compare_exchange is worth it to avoid redundant writes
+  return sd.is_armed.exchange(false, memory_order_acquire) ? sd.local_mask : 0;
+}
+
+pair<uint16_t, bool> Transaction::DisarmInShardWhen(ShardId sid, uint16_t relevant_flags) {
+  auto& sd = shard_data_[SidToId(sid)];
+  if (sd.is_armed.load(memory_order_acquire)) {
+    bool relevant = sd.local_mask & relevant_flags;
+    if (relevant)
+      CHECK(sd.is_armed.exchange(false, memory_order_release));
+    return {sd.local_mask, relevant};
+  }
+  return {0, false};
 }
 bool Transaction::IsActive(ShardId sid) const {
@@ -1143,12 +1125,6 @@ bool Transaction::IsActive(ShardId sid) const {
   return shard_data_[SidToId(sid)].local_mask & ACTIVE;
 }
-uint16_t Transaction::GetLocalMask(ShardId sid) const {
-  DCHECK(IsActive(sid));
-  DCHECK_GT(run_barrier_.DEBUG_Count(), 0u);
-  return shard_data_[SidToId(sid)].local_mask;
-}
-
 IntentLock::Mode Transaction::LockMode() const {
   return cid_->IsReadOnly() ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
 }
@@ -1185,12 +1161,13 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
   // Fast path. If none of the keys are locked, we can run briefly atomically on the thread
   // without acquiring them at all.
   if (quick_run) {
+    CHECK(sd.is_armed.exchange(false, memory_order_relaxed));
     auto result = RunQuickie(shard);
     local_result_ = result.status;
     if (result.flags & RunnableResult::AVOID_CONCLUDING) {
       // If we want to run again, we have to actually schedule this transaction
-      DCHECK_EQ(sd.local_mask & ARMED, 0);
+      DCHECK(!sd.is_armed.load(memory_order_relaxed));
       continue_scheduling = true;
     } else {
       LogAutoJournalOnShard(shard, result);
@@ -1400,7 +1377,7 @@ void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) {
   // Resume processing of transaction queue
   shard->PollExecution("unwatchcb", nullptr);
-  run_barrier_.Dec(this);
+  FinishHop();
 }
 OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
diff --git a/src/server/transaction.h b/src/server/transaction.h
index 267d76224..f7114692a 100644
--- a/src/server/transaction.h
+++ b/src/server/transaction.h
@@ -10,6 +10,7 @@
 #include
 #include
+#include
 #include
 #include
 #include
@@ -21,6 +22,7 @@
 #include "server/common.h"
 #include "server/journal/types.h"
 #include "server/table.h"
+#include "util/fibers/synchronization.h"
 namespace dfly {
@@ -156,8 +158,8 @@ class Transaction {
   // State on specific shard.
   enum LocalMask : uint16_t {
-    ACTIVE = 1,      // Whether its active on this shard (to schedule or execute hops)
-    ARMED = 1 << 1,  // Whether its armed (the hop was prepared)
+    ACTIVE = 1,  // Whether its active on this shard (to schedule or execute hops)
+    // ARMED = 1 << 1,  // Whether its armed (the hop was prepared)
     // Whether it can run out of order. Undefined if KEYLOCK_ACQUIRED isn't set
     OUT_OF_ORDER = 1 << 2,
     // Whether its key locks are acquired, never set for global commands.
@@ -254,16 +256,17 @@ class Transaction {
   // Runs in the shard thread.
   KeyLockArgs GetLockArgs(ShardId sid) const;
-  // Returns true if the transaction is waiting for shard callbacks and the shard is armed.
-  // Safe to read transaction state (and update shard local) until following RunInShard() finishes.
-  bool IsArmedInShard(ShardId sid) const;
+  // If the transaction is armed, disarm it and return the local mask (ACTIVE is always set).
+  // Otherwise 0 is returned. Sync point (acquire).
+  uint16_t DisarmInShard(ShardId sid);
+
+  // Same as DisarmInShard, but the transaction is only disarmed if any of the req_flags is present.
+  // If the transaction is armed, returns the local mask and a flag whether it was disarmed.
+  std::pair<uint16_t, bool> DisarmInShardWhen(ShardId sid, uint16_t req_flags);
   // Returns if the transaction spans this shard. Safe only when the transaction is armed.
   bool IsActive(ShardId sid) const;
-  // Returns the state mask on this shard. Safe only when the transaction is armed (or blocked).
-  uint16_t GetLocalMask(ShardId sid) const;
-
   // If blocking tx was woken up on this shard, get wake key.
   std::optional<std::string_view> GetWakeKey(ShardId sid) const;
@@ -356,6 +359,14 @@ class Transaction {
     return shard_data_[SidToId(sid)].pq_pos;
   }
+  bool DEBUG_IsArmedInShard(ShardId sid) const {
+    return shard_data_[SidToId(sid)].is_armed.load(std::memory_order_relaxed);
+  }
+
+  uint16_t DEBUG_GetLocalMask(ShardId sid) const {
+    return shard_data_[SidToId(sid)].local_mask;
+  }
+
 private:
   // Holds number of locks for each IntentLock::Mode: shared and exlusive.
   struct LockCnt {
@@ -372,15 +383,23 @@ class Transaction {
   };
   struct alignas(64) PerShardData {
+    PerShardData() {
+    }
+    PerShardData(PerShardData&& other) noexcept {
+    }
+
+    // State of shard - bitmask with LocalState flags
+    uint16_t local_mask = 0;
+
+    // Set when the shard is prepared for another hop. Sync point. Cleared when execution starts.
+    std::atomic_bool is_armed = false;
+
     uint32_t arg_start = 0;  // Subspan in kv_args_ with local arguments.
     uint32_t arg_count = 0;
     // Position in the tx queue. OOO or cancelled schedules remove themselves by this index.
     TxQueue::Iterator pq_pos = TxQueue::kEnd;
-    // State of shard - bitmask with LocalState flags
-    uint16_t local_mask = 0;
-
     // Index of key relative to args in shard that the shard was woken up after blocking wait.
     uint16_t wake_key_pos = UINT16_MAX;
@@ -390,7 +409,7 @@ class Transaction {
     } stats;
     // Prevent "false sharing" between cache lines: occupy a full cache line (64 bytes)
-    char pad[64 - 4 * sizeof(uint32_t) - sizeof(Stats)];
+    char pad[64 - 5 * sizeof(uint32_t) - sizeof(Stats)];
   };
   static_assert(sizeof(PerShardData) == 64);  // cacheline
@@ -435,22 +454,6 @@ class Transaction {
     }
   };
-  // Barrier akin to helio's BlockingCounter, but with proper acquire semantics
-  // for polling work from other threads (active, inactive phases). And without heap allocation.
-  class PhasedBarrier {
-   public:
-    void Start(uint32_t count);  // Release: Store count
-    void Wait();                 // Acquire: Wait until count = 0
-
-    bool Active() const;                // Acquire: Return if count > 0. Use for polling for work
-    void Dec(Transaction* keep_alive);  // Release: Decrease count, notify ec on count = 0
-
-    uint32_t DEBUG_Count() const;  // Get current counter value
-   private:
-    std::atomic_uint32_t count_{0};
-    util::fb2::EventCount ec_{};
-  };
-
   // "Single claim - single modification" barrier. Multiple threads might try to claim it, only one
   // will succeed and will be allowed to modify the guarded object until it closes the barrier.
   // A closed barrier can't be claimed again or re-used in any way.
@@ -514,6 +517,9 @@ class Transaction {
   // Set ARMED flags, start run barrier and submit poll tasks. Doesn't wait for the run barrier
   void ExecuteAsync();
+  // Finish hop, decrement run barrier
+  void FinishHop();
+
   // Adds itself to watched queue in the shard. Must run in that shard thread.
   OpStatus WatchInShard(ArgSlice keys, EngineShard* shard, KeyReadyChecker krc);
@@ -579,14 +585,8 @@ class Transaction {
   }
 private:
-  // Main synchronization point for dispatching hop callbacks and waiting for them to finish.
-  // After scheduling, sequential hops are executed as follows:
-  // coordinator: Prepare hop, then Start(num_shards), dispatch poll jobs and Wait()
-  // tx queue: Once IsArmedInShard() /* checks Active() */ -> run in shard and Dec()
-  // As long as barrier is active, any writes by the coordinator are prohibited, so shard threads
-  // can safely read transaction state and modify per-shard state belonging to them.
-  // Inter-thread synchronization is provided by the barriers acquire/release pairs.
-  PhasedBarrier run_barrier_;
+  // Used for waiting for all hop callbacks to run.
+  util::fb2::EmbeddedBlockingCounter run_barrier_{0};
   // Stores per-shard data: state flags and keys. Index only with SidToId(shard index)!
   // Theoretically, same size as number of shards, but contains only a single element for
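
To make the change easier to review, here is a minimal, self-contained sketch of the synchronization pattern this patch switches to: a per-shard atomic is_armed flag acts as the sync point between the coordinator and shard threads, while a blocking counter is only used to await completion of all hop callbacks. This is not Dragonfly code: SimpleBlockingCounter stands in for helio's util::fb2::EmbeddedBlockingCounter, ShardSlot for Transaction::PerShardData, and the free functions only mirror the intent of Transaction::DisarmInShard() / DisarmInShardWhen(); the transaction queue, keep-alive (boost::intrusive_ptr) handling and error paths are omitted, and std::thread replaces shard fibers.

// Hypothetical, simplified model of the "armed flag + blocking counter" protocol (C++17).
#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// Stand-in for helio's util::fb2::EmbeddedBlockingCounter: counts outstanding hop callbacks.
class SimpleBlockingCounter {
 public:
  void Start(uint32_t count) {  // coordinator: call before arming any shard
    count_.store(count, std::memory_order_release);
  }
  void Dec() {  // shard thread: call once its hop callback finished
    if (count_.fetch_sub(1, std::memory_order_acq_rel) == 1) {
      std::lock_guard<std::mutex> lk(mu_);
      cv_.notify_all();
    }
  }
  void Wait() {  // coordinator: block until every armed shard ran
    std::unique_lock<std::mutex> lk(mu_);
    cv_.wait(lk, [this] { return count_.load(std::memory_order_acquire) == 0; });
  }

 private:
  std::atomic<uint32_t> count_{0};
  std::mutex mu_;
  std::condition_variable cv_;
};

// Subset of Transaction::LocalMask used below.
constexpr uint16_t ACTIVE = 1, OUT_OF_ORDER = 1 << 2, SUSPENDED_Q = 1 << 4;

// Stand-in for Transaction::PerShardData.
struct ShardSlot {
  uint16_t local_mask = ACTIVE;       // written by the coordinator only while not armed
  std::atomic<bool> is_armed{false};  // sync point between coordinator and shard thread
};

// Unconditional disarm: claim the hop and return the local mask, or 0 if not armed.
uint16_t DisarmInShard(ShardSlot& sd) {
  return sd.is_armed.exchange(false, std::memory_order_acquire) ? sd.local_mask : 0;
}

// Conditional disarm: claim the hop only if one of relevant_flags is set on this shard.
std::pair<uint16_t, bool> DisarmInShardWhen(ShardSlot& sd, uint16_t relevant_flags) {
  if (sd.is_armed.load(std::memory_order_acquire)) {
    bool relevant = sd.local_mask & relevant_flags;
    if (relevant)
      sd.is_armed.exchange(false, std::memory_order_release);
    return {sd.local_mask, relevant};
  }
  return {0, false};
}

int main() {
  constexpr unsigned kShards = 4;
  std::vector<ShardSlot> shards(kShards);
  SimpleBlockingCounter run_barrier;

  // Coordinator: start the barrier *before* arming, then arm with release stores so a
  // shard thread that observes is_armed == true also observes the prepared hop state.
  run_barrier.Start(kShards);
  for (auto& sd : shards)
    sd.is_armed.store(true, std::memory_order_release);

  // Shard threads: first try the conditional disarm (as PollExecution does for its caller),
  // then fall back to the unconditional one (as the tx queue loop does), run, and Dec().
  std::vector<std::thread> pool;
  for (unsigned i = 0; i < kShards; ++i) {
    pool.emplace_back([&shards, &run_barrier, i] {
      auto [mask, disarmed] = DisarmInShardWhen(shards[i], OUT_OF_ORDER | SUSPENDED_Q);
      if (!disarmed)
        mask = DisarmInShard(shards[i]);
      if (mask != 0) {
        std::cout << "shard " << i << " ran hop with mask " << mask << "\n";
        run_barrier.Dec();  // last access to shared transaction state from this thread
      }
    });
  }

  run_barrier.Wait();  // coordinator: all hop callbacks have finished
  for (auto& t : pool)
    t.join();
  return 0;
}

The ordering mirrored here is the invariant the patch relies on: the coordinator starts the barrier and fully prepares per-shard state before the release store to is_armed; a shard thread claims a hop with an acquire exchange, so a successful disarm also observes the prepared hop, and Dec() is its last access to shared state (in the real code FinishHop() additionally holds an intrusive_ptr so the transaction outlives the notification).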