diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 945843a30..eff17ce1e 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -82,6 +82,14 @@ void RecordTxScheduleFastStats(const Transaction* tx, bool was_ooo, bool was_inl ss->stats.tx_width_freq_arr[0]++; } +std::ostream& operator<<(std::ostream& os, Transaction::time_point tp) { + using namespace chrono; + if (tp == Transaction::time_point::max()) + return os << "inf"; + size_t ms = duration_cast(tp - Transaction::time_point::clock::now()).count(); + return os << ms << "ms"; +} + } // namespace IntentLock::Mode Transaction::LockMode() const { @@ -116,6 +124,41 @@ uint32_t Transaction::PhasedBarrier::DEBUG_Count() const { return count_.load(memory_order_relaxed); } +bool Transaction::BatonBarrierrier::IsClaimed() const { + return claimed_.load(memory_order_relaxed); +} + +bool Transaction::BatonBarrierrier::TryClaim() { + return !claimed_.exchange(true, memory_order_relaxed); // false means first means success +} + +void Transaction::BatonBarrierrier::Close() { + DCHECK(claimed_.load(memory_order_relaxed)); + closed_.store(true, memory_order_relaxed); + ec_.notify(); // release +} + +cv_status Transaction::BatonBarrierrier::Wait(time_point tp) { + auto cb = [this] { return closed_.load(memory_order_acquire); }; + + if (tp != time_point::max()) { + // Wait until timepoint and return immediately if we finished without a timeout + if (ec_.await_until(cb, tp) == cv_status::no_timeout) + return cv_status::no_timeout; + + // We timed out and claimed the barrier, so no one will be able to claim it anymore + if (TryClaim()) { + closed_.store(true, memory_order_relaxed); // Purely formal + return cv_status::timeout; + } + + // fallthrough: otherwise a modification is in progress, wait for it below + } + + ec_.await(cb); + return cv_status::no_timeout; +} + /** * @brief Construct a new Transaction:: Transaction object * @@ -1205,42 +1248,24 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const { OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider, KeyReadyChecker krc) { - DVLOG(2) << "WaitOnWatch " << DebugId(); - using namespace chrono; + DCHECK(!blocking_barrier_.IsClaimed()); // Blocking barrier can't be re-used + // Register keys on active shards blocking controllers and mark shard state as suspended. auto cb = [&](Transaction* t, EngineShard* shard) { auto keys = wkeys_provider(t, shard); return t->WatchInShard(keys, shard, krc); }; - Execute(std::move(cb), true); - coordinator_state_ |= COORD_BLOCKED; - - auto wake_cb = [this] { - return (coordinator_state_ & COORD_CANCELLED) || - wakeup_requested_.load(memory_order_relaxed) > 0; - }; - auto* stats = ServerState::tl_connection_stats(); ++stats->num_blocked_clients; + DVLOG(1) << "WaitOnWatch wait for " << tp << " " << DebugId(); - if (DCHECK_IS_ON()) { - int64_t ms = -1; - if (tp != time_point::max()) - ms = duration_cast(tp - time_point::clock::now()).count(); - DVLOG(1) << "WaitOnWatch TimeWait for " << ms << " ms " << DebugId(); - } - - cv_status status = cv_status::no_timeout; - if (tp == time_point::max()) { - blocking_ec_.await(std::move(wake_cb)); - } else { - status = blocking_ec_.await_until(std::move(wake_cb), tp); - } + // Wait for the blocking barrier to be closed. + // Note: It might return immediately if another thread already notified us. + cv_status status = blocking_barrier_.Wait(tp); DVLOG(1) << "WaitOnWatch done " << int(status) << " " << DebugId(); - --stats->num_blocked_clients; OpStatus result = OpStatus::OK; @@ -1250,38 +1275,32 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p result = local_result_; } + // If we don't follow up with an "action" hop, we must clean up manually on all shards. if (result != OpStatus::OK) ExpireBlocking(wkeys_provider); - coordinator_state_ &= ~COORD_BLOCKED; return result; } -// Runs only in the shard thread. OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard, KeyReadyChecker krc) { - ShardId idx = SidToId(shard->shard_id()); + auto& sd = shard_data_[SidToId(shard->shard_id())]; - auto& sd = shard_data_[idx]; CHECK_EQ(0, sd.local_mask & SUSPENDED_Q); - - auto* bc = shard->EnsureBlockingController(); - bc->AddWatched(keys, std::move(krc), this); - sd.local_mask |= SUSPENDED_Q; sd.local_mask &= ~OUT_OF_ORDER; - DVLOG(2) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask - << ", first_key:" << keys.front(); + + shard->EnsureBlockingController()->AddWatched(keys, std::move(krc), this); + DVLOG(2) << "WatchInShard " << DebugId() << ", first_key:" << keys.front(); return OpStatus::OK; } void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) { + // Blocking transactions don't release keys when suspending, release them now. auto lock_args = GetLockArgs(shard->shard_id()); shard->db_slice().Release(LockMode(), lock_args); - unsigned sd_idx = SidToId(shard->shard_id()); - auto& sd = shard_data_[sd_idx]; - sd.local_mask |= EXPIRED_Q; + auto& sd = shard_data_[SidToId(shard->shard_id())]; sd.local_mask &= ~KEYLOCK_ACQUIRED; shard->blocking_controller()->FinalizeWatched(wkeys, this); @@ -1361,40 +1380,32 @@ bool Transaction::IsGlobal() const { // Returns true if the transacton has changed its state from suspended to awakened, // false, otherwise. bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid, string_view key) { - unsigned idx = SidToId(sid); - auto& sd = shard_data_[idx]; - unsigned local_mask = sd.local_mask; - - if (local_mask & Transaction::EXPIRED_Q) { - return false; - } - // Wake a transaction only once on the first notify. // We don't care about preserving the strict order with multiple operations running on blocking // keys in parallel, because the internal order is not observable from outside either way. - if (wakeup_requested_.fetch_add(1, memory_order_relaxed) > 0) + if (!blocking_barrier_.TryClaim()) return false; - DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask << " by commited_id " - << committed_txid; + auto& sd = shard_data_[SidToId(sid)]; - // local_mask could be awaked (i.e. not suspended) if the transaction has been - // awakened by another key or awakened by the same key multiple times. - if (local_mask & SUSPENDED_Q) { - DCHECK_EQ(0u, local_mask & AWAKED_Q); + DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << sd.local_mask + << " by commited_id " << committed_txid; - sd.local_mask &= ~SUSPENDED_Q; - sd.local_mask |= AWAKED_Q; + // We're the first and only to wake this transaction, expect the shard to be suspended + CHECK(sd.local_mask & SUSPENDED_Q); + CHECK_EQ(sd.local_mask & AWAKED_Q, 0); - // Find index of awakened key. - auto args = GetShardArgs(sid); - auto it = - find_if(args.begin(), args.end(), [key](auto arg) { return facade::ToSV(arg) == key; }); - DCHECK(it != args.end()); - sd.wake_key_pos = it - args.begin(); - } + // Find index of awakened key + auto args = GetShardArgs(sid); + auto it = find_if(args.begin(), args.end(), [key](auto arg) { return facade::ToSV(arg) == key; }); + CHECK(it != args.end()); - blocking_ec_.notify(); + // Change state to awaked and store index of awakened key + sd.local_mask &= ~SUSPENDED_Q; + sd.local_mask |= AWAKED_Q; + sd.wake_key_pos = it - args.begin(); + + blocking_barrier_.Close(); return true; } @@ -1474,7 +1485,10 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt } void Transaction::CancelBlocking(std::function status_cb) { - if ((coordinator_state_ & COORD_BLOCKED) == 0) + // We're on the owning thread of this transaction, so we can safely access it's data below. + // We still need to claim the blocking barrier, but as this function is often called blindly, we + // want to check first if it makes sense to even proceed. + if (blocking_barrier_.IsClaimed()) return; OpStatus status = OpStatus::CANCELLED; @@ -1490,9 +1504,13 @@ void Transaction::CancelBlocking(std::function status_cb) { if (status == OpStatus::OK) return; + // Check if someone else is about to wake us up + if (!blocking_barrier_.TryClaim()) + return; + coordinator_state_ |= COORD_CANCELLED; local_result_ = status; - blocking_ec_.notify(); + blocking_barrier_.Close(); } OpResult DetermineKeys(const CommandId* cid, CmdArgList args) { diff --git a/src/server/transaction.h b/src/server/transaction.h index eba2f9f2e..15710d1c7 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -162,8 +162,7 @@ class Transaction { KEYLOCK_ACQUIRED = 1 << 3, // Whether its key locks are acquired SUSPENDED_Q = 1 << 4, // Whether is suspended (by WatchInShard()) AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended()) - EXPIRED_Q = 1 << 6, // Whether it timed out and should be dropped - UNLOCK_MULTI = 1 << 7, // Whether this shard executed UnlockMultiShardCb + UNLOCK_MULTI = 1 << 6, // Whether this shard executed UnlockMultiShardCb }; public: @@ -412,8 +411,7 @@ class Transaction { enum CoordinatorState : uint8_t { COORD_SCHED = 1, COORD_CONCLUDING = 1 << 1, // Whether its the last hop of a transaction - COORD_BLOCKED = 1 << 2, - COORD_CANCELLED = 1 << 3, + COORD_CANCELLED = 1 << 2, }; // Auxiliary structure used during initialization @@ -443,6 +441,25 @@ class Transaction { EventCount ec_{}; }; + // "Single claim - single modification" barrier. Multiple threads might try to claim it, only one + // will succeed and will be allowed to modify the guarded object until it closes the barrier. + // A closed barrier can't be claimed again or re-used in any way. + class BatonBarrierrier { + public: + bool IsClaimed() const; // Return if barrier is claimed, only for peeking + bool TryClaim(); // Return if the barrier was claimed successfully + void Close(); // Close barrier after it was claimed + + // Wait for barrier until time_point, or indefinitely if time_point::max() was passed. + // After Wait returns, the barrier is guaranteed to be closed, including expiration. + std::cv_status Wait(time_point); + + private: + std::atomic_bool claimed_{false}; + std::atomic_bool closed_{false}; + EventCount ec_{}; + }; + private: // Init basic fields and reset re-usable. void InitBase(DbIndex dbid, CmdArgList args); @@ -484,6 +501,7 @@ class Transaction { // Optimized version of RunInShard for single shard uncontended cases. RunnableResult RunQuickie(EngineShard* shard); + // Set ARMED flags, start run barrier and submit poll tasks. Doesn't wait for the run barrier void ExecuteAsync(); // Adds itself to watched queue in the shard. Must run in that shard thread. @@ -592,8 +610,8 @@ class Transaction { ShardId unique_shard_id_{kInvalidSid}; // Set if unique_shard_cnt_ = 1 UniqueSlotChecker unique_slot_checker_; - std::atomic_uint32_t wakeup_requested_{0}; // incremented when blocking transaction gets notified - EventCount blocking_ec_; // to wait for wakeup_requested > 0 (or cancelled) + // Barrier for waking blocking transactions that ensures exclusivity of waking operation. + BatonBarrierrier blocking_barrier_{}; // Transaction coordinator state, written and read by coordinator thread. uint8_t coordinator_state_ = 0;