From 963023f07c75919f549b3e1b71ebe736fb818042 Mon Sep 17 00:00:00 2001
From: Vladislav
Date: Sun, 11 Feb 2024 12:06:36 +0300
Subject: [PATCH] chore(transaction): Simplify armed state (#2508)

* chore(transaction): Simplify armed state

Remove the atomic is_armed variable and turn it into a regular flag in the
local state mask. This is now possible because execution phases are clearly
defined by the phased barrier and, for blocking commands, the baton barrier.

---------

Signed-off-by: Vladislav
---
 src/server/blocking_controller_test.cc | 12 +++---
 src/server/test_utils.cc               |  4 +-
 src/server/transaction.cc              | 50 +++++++++++++++---------
 src/server/transaction.h               | 54 +++++++++----------------
 4 files changed, 58 insertions(+), 62 deletions(-)

diff --git a/src/server/blocking_controller_test.cc b/src/server/blocking_controller_test.cc
index 52bf62cdf..b68843759 100644
--- a/src/server/blocking_controller_test.cc
+++ b/src/server/blocking_controller_test.cc
@@ -56,7 +56,7 @@ void BlockingControllerTest::SetUp() {
 
   trans_.reset(new Transaction{&cid_});
 
-  str_vec_.assign({"blpop", "x", "z", "0"});
+  str_vec_.assign({"x", "z", "0"});
   for (auto& s : str_vec_) {
     arg_vec_.emplace_back(s);
   }
@@ -78,16 +78,16 @@ void BlockingControllerTest::TearDown() {
 }
 
 TEST_F(BlockingControllerTest, Basic) {
-  shard_set->Await(0, [&] {
-    EngineShard* shard = EngineShard::tlocal();
+  trans_->ScheduleSingleHop([&](Transaction* t, EngineShard* shard) {
     BlockingController bc(shard);
 
-    auto keys = trans_->GetShardArgs(shard->shard_id());
+    auto keys = t->GetShardArgs(shard->shard_id());
     bc.AddWatched(
-        keys, [](auto...) { return true; }, trans_.get());
+        keys, [](auto...) { return true; }, t);
     EXPECT_EQ(1, bc.NumWatched(0));
 
-    bc.FinalizeWatched(keys, trans_.get());
+    bc.FinalizeWatched(keys, t);
     EXPECT_EQ(0, bc.NumWatched(0));
+    return OpStatus::OK;
   });
 }
diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc
index 3bb207a7d..4951c7f4d 100644
--- a/src/server/test_utils.cc
+++ b/src/server/test_utils.cc
@@ -247,9 +247,7 @@ void BaseFamilyTest::ResetService() {
           auto it = head;
           do {
             Transaction* trans = std::get<Transaction*>(es->txq()->At(it));
-            LOG(ERROR) << "Transaction " << trans->DebugId() << " "
-                       << trans->GetLocalMask(es->shard_id()) << " "
-                       << trans->IsArmedInShard(es->shard_id());
+            LOG(ERROR) << "Transaction " << trans->DebugId(es->shard_id());
             it = txq->Next(it);
           } while (it != head);
         }
diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index 0bdb82474..ff83b5786 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -124,21 +124,21 @@ uint32_t Transaction::PhasedBarrier::DEBUG_Count() const {
   return count_.load(memory_order_relaxed);
 }
 
-bool Transaction::BatonBarrierrier::IsClaimed() const {
+bool Transaction::BatonBarrier::IsClaimed() const {
   return claimed_.load(memory_order_relaxed);
 }
 
-bool Transaction::BatonBarrierrier::TryClaim() {
+bool Transaction::BatonBarrier::TryClaim() {
  return !claimed_.exchange(true, memory_order_relaxed);  // exchange() returning false means we were first, i.e. the claim succeeded
 }
 
-void Transaction::BatonBarrierrier::Close() {
+void Transaction::BatonBarrier::Close() {
   DCHECK(claimed_.load(memory_order_relaxed));
   closed_.store(true, memory_order_relaxed);
   ec_.notify();  // release
 }
 
-cv_status Transaction::BatonBarrierrier::Wait(time_point tp) {
+cv_status Transaction::BatonBarrier::Wait(time_point tp) {
   auto cb = [this] { return closed_.load(memory_order_acquire); };
 
   if (tp != time_point::max()) {
@@ -407,7 +407,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
   for (const auto& sd : shard_data_) {
    // sd.local_mask may be non-zero for multi transactions with instant locking.
    // Specifically EVALs may maintain state between calls.
-    DCHECK(!sd.is_armed.load(std::memory_order_relaxed));
+    DCHECK_EQ(sd.local_mask & ARMED, 0);
     if (!multi_) {
       DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
     }
@@ -533,7 +533,7 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
       DCHECK(IsAtomicMulti());   // Every command determines its own active shards
       sd.local_mask &= ~ACTIVE;  // so remove ACTIVE flags, but keep KEYLOCK_ACQUIRED
     }
-    DCHECK(!sd.is_armed.load(memory_order_relaxed));
+    DCHECK_EQ(sd.local_mask & ARMED, 0);
   }
 
   if (multi_->mode == NON_ATOMIC) {
@@ -547,13 +547,18 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
     multi_->role = DEFAULT;
 }
 
-string Transaction::DebugId() const {
+string Transaction::DebugId(std::optional<ShardId> sid) const {
   DCHECK_GT(use_count_.load(memory_order_relaxed), 0u);
 
   string res = StrCat(Name(), "@", txid_, "/", unique_shard_cnt_);
   if (multi_) {
     absl::StrAppend(&res, ":", multi_->cmd_seq_num);
   }
-  absl::StrAppend(&res, " (", trans_id(this), ")");
+  absl::StrAppend(&res, " {id=", trans_id(this));
+  if (sid) {
+    absl::StrAppend(&res, ",mask[", *sid, "]=", int(shard_data_[SidToId(*sid)].local_mask),
+                    ",txqpos[]=", shard_data_[SidToId(*sid)].pq_pos);
+  }
+  absl::StrAppend(&res, "}");
   return res;
 }
 
@@ -573,16 +578,13 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
   DCHECK_GT(txid_, 0u);
   CHECK(cb_ptr_) << DebugId();
 
-  // Unlike with regular transactions we do not acquire locks upon scheduling
-  // because Scheduling is done before multi-exec batch is executed. Therefore we
-  // lock keys right before the execution of each statement.
-
   unsigned idx = SidToId(shard->shard_id());
   auto& sd = shard_data_[idx];
 
-  CHECK(sd.is_armed.exchange(false, memory_order_relaxed));
-  CHECK_GT(run_barrier_.DEBUG_Count(), 0u);
+  CHECK(sd.local_mask & ARMED);
+  sd.local_mask &= ~ARMED;
 
+  CHECK_GT(run_barrier_.DEBUG_Count(), 0u);
   VLOG(2) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id() << " " << sd.local_mask;
 
   bool was_suspended = sd.local_mask & SUSPENDED_Q;
@@ -799,7 +801,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
     DCHECK(shard_data_.size() == 1 || multi_->mode == NON_ATOMIC);
     InitTxTime();
 
-    shard_data_[SidToId(unique_shard_id_)].is_armed.store(true, memory_order_relaxed);
+    shard_data_[SidToId(unique_shard_id_)].local_mask |= ARMED;
 
     // Start new phase, be careful with writes until phase end!
     run_barrier_.Start(1);
@@ -931,7 +933,7 @@ void Transaction::ExecuteAsync() {
   DCHECK(!IsAtomicMulti() || multi_->lock_mode.has_value());
 
   // Set armed flags on all active shards
-  IterateActiveShards([](auto& sd, auto i) { sd.is_armed.store(true, memory_order_relaxed); });
+  IterateActiveShards([](auto& sd, auto i) { sd.local_mask |= ARMED; });
 
   // Start new phase: release semantics. From here we can be discovered by IsArmedInShard(),
   // and thus picked by a foreign thread's PollExecution(). Careful with writes until phase end!
@@ -1010,7 +1012,8 @@ Transaction::RunnableResult Transaction::RunQuickie(EngineShard* shard) {
   DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id();
   DCHECK(cb_ptr_) << DebugId() << " " << shard->shard_id();
 
-  CHECK(sd.is_armed.exchange(false, memory_order_relaxed));
+  CHECK(sd.local_mask & ARMED);
+  sd.local_mask &= ~ARMED;
 
   // Calling the callback in a somewhat safe way
   RunnableResult result;
@@ -1071,6 +1074,11 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const {
   return res;
 }
 
+bool Transaction::IsArmedInShard(ShardId sid) const {
+  // Barrier has acquire semantics
+  return run_barrier_.Active() && (shard_data_[SidToId(sid)].local_mask & ARMED);
+}
+
 bool Transaction::IsActive(ShardId sid) const {
   // If we have only one shard, we often don't store information about all shards, so determine it
   // solely by id
@@ -1083,6 +1091,12 @@ bool Transaction::IsActive(ShardId sid) const {
   return shard_data_[SidToId(sid)].local_mask & ACTIVE;
 }
 
+uint16_t Transaction::GetLocalMask(ShardId sid) const {
+  DCHECK(IsActive(sid));
+  DCHECK_GT(run_barrier_.DEBUG_Count(), 0u);
+  return shard_data_[SidToId(sid)].local_mask;
+}
+
 // Runs within an engine shard thread.
 // Optimized path that schedules and runs transactions out of order if possible.
 // Returns true if eagerly executed, false if the callback will be handled by the transaction
@@ -1114,7 +1128,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
 
     if (result.flags & RunnableResult::AVOID_CONCLUDING) {
       // If we want to run again, we have to actually schedule this transaction
-      DCHECK_EQ(sd.is_armed, false);
+      DCHECK_EQ(sd.local_mask & ARMED, 0);
       continue_scheduling = true;
     } else {
       LogAutoJournalOnShard(shard, result);
diff --git a/src/server/transaction.h b/src/server/transaction.h
index 04f521749..ea071975a 100644
--- a/src/server/transaction.h
+++ b/src/server/transaction.h
@@ -156,11 +156,12 @@ class Transaction {
 
   // State on specific shard.
   enum LocalMask : uint16_t {
-    ACTIVE = 1,  // Set on all active shards.
+    ACTIVE = 1,      // Whether it's active on this shard (to schedule or execute hops)
+    ARMED = 1 << 1,  // Whether it's armed (the hop was prepared)
     OUT_OF_ORDER =
-        1 << 2,  // Whether it can run out of order. Undefined if KEYLOCK_ACQUIRED is not set.
+        1 << 2,  // Whether it can run out of order. Undefined if KEYLOCK_ACQUIRED isn't set
     KEYLOCK_ACQUIRED = 1 << 3,  // Whether its key locks are acquired
-    SUSPENDED_Q = 1 << 4,  // Whether is suspended (by WatchInShard())
+    SUSPENDED_Q = 1 << 4,  // Whether it's suspended (by WatchInShard())
     AWAKED_Q = 1 << 5,      // Whether it was awakened (by NotifySuspended())
     UNLOCK_MULTI = 1 << 6,  // Whether this shard executed UnlockMultiShardCb
   };
@@ -253,23 +254,15 @@ class Transaction {
   // Runs in the shard thread.
   KeyLockArgs GetLockArgs(ShardId sid) const;
 
-  // Returns true if the transaction spans this shard_id.
-  bool IsActive(ShardId shard_id) const;
-
   // Returns true if the transaction is waiting for shard callbacks and the shard is armed.
   // Safe to read transaction state (and update shard local) until following RunInShard() finishes.
-  bool IsArmedInShard(ShardId sid) const {
-    if (sid >= shard_data_.size())  // For multi transactions shard_data_ spans all shards.
-      sid = 0;
+  bool IsArmedInShard(ShardId sid) const;
 
-    // Barrier has acquire semantics
-    return run_barrier_.Active() && shard_data_[sid].is_armed.load(std::memory_order_relaxed);
-  }
+  // Returns whether the transaction spans this shard. Safe only when the transaction is armed.
+  bool IsActive(ShardId sid) const;
 
-  // Called from engine set shard threads.
-  uint16_t GetLocalMask(ShardId sid) const {
-    return shard_data_[SidToId(sid)].local_mask;
-  }
+  // Returns the state mask on this shard. Safe only when the transaction is armed (or blocked).
+  uint16_t GetLocalMask(ShardId sid) const;
 
   uint32_t GetLocalTxqPos(ShardId sid) const {
     return shard_data_[SidToId(sid)].pq_pos;
   }
@@ -325,7 +318,8 @@ class Transaction {
     return cid_;
   }
 
-  std::string DebugId() const;
+  // Returns debug information about the transaction, including shard-local info if a shard id is passed
+  std::string DebugId(std::optional<ShardId> sid = std::nullopt) const;
 
   // Prepares for running ScheduleSingleHop() for a single-shard multi tx.
   // It is safe to call ScheduleSingleHop() after calling this method, but the callback passed
@@ -364,30 +358,20 @@ class Transaction {
   };
 
   struct alignas(64) PerShardData {
-    PerShardData(PerShardData&&) noexcept {
-    }
-
-    PerShardData() = default;
-
-    // this is the only variable that is accessed by both shard and coordinator threads.
-    std::atomic_bool is_armed{false};
-
-    // We pad with some memory so that atomic loads won't cause false sharing between threads.
-    char pad[46];  // to make sure PerShardData is 64 bytes and takes full cacheline.
-
-    uint32_t arg_start = 0;  // Indices into args_ array.
+    uint32_t arg_start = 0;  // Subspan in kv_args_ with local arguments.
     uint32_t arg_count = 0;
 
-    // Needed to rollback inconsistent schedulings or remove OOO transactions from
-    // tx queue.
+    // Position in the tx queue. OOO or cancelled schedules remove themselves by this index.
     uint32_t pq_pos = TxQueue::kEnd;
 
-    // Accessed within shard thread.
-    // Bitmask of LocalMask enums.
+    // State of the shard - a bitmask of LocalMask flags.
     uint16_t local_mask = 0;
 
     // Index of key relative to args in shard that the shard was woken up after blocking wait.
     uint16_t wake_key_pos = UINT16_MAX;
+
+    // Prevent "false sharing" between cache lines: occupy a full cache line (64 bytes).
+    char pad[64 - 4 * sizeof(uint32_t)];
   };
 
   static_assert(sizeof(PerShardData) == 64);  // cacheline
@@ -449,7 +433,7 @@ class Transaction {
   // "Single claim - single modification" barrier. Multiple threads might try to claim it, only one
   // will succeed and will be allowed to modify the guarded object until it closes the barrier.
   // A closed barrier can't be claimed again or re-used in any way.
-  class BatonBarrierrier {
+  class BatonBarrier {
   public:
    bool IsClaimed() const;  // Return if barrier is claimed, only for peeking
    bool TryClaim();         // Return if the barrier was claimed successfully
@@ -616,7 +600,7 @@ class Transaction {
   UniqueSlotChecker unique_slot_checker_;
 
   // Barrier for waking blocking transactions that ensures exclusivity of waking operation.
-  BatonBarrierrier blocking_barrier_{};
+  BatonBarrier blocking_barrier_{};
 
   // Transaction coordinator state, written and read by coordinator thread.
   uint8_t coordinator_state_ = 0;
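
A minimal standalone sketch of the publication pattern this change relies on (not part of the diff; PhaseBarrier, ShardState, Arm and IsArmed are simplified stand-ins, not the actual Transaction::PhasedBarrier, PerShardData or coordinator/shard code): the coordinator sets the plain ARMED bit before the barrier's release-style Start(), and a shard thread checks the barrier with an acquire-style Active() before reading the mask, which is why the flag no longer needs to be an std::atomic_bool.

#include <atomic>
#include <cstdint>

// Simplified stand-in for a phased barrier: Start() publishes with release
// semantics, Active() observes with acquire semantics.
class PhaseBarrier {
 public:
  void Start(uint32_t count) {
    count_.store(count, std::memory_order_release);  // publishes all prior plain writes
  }
  bool Active() const {
    return count_.load(std::memory_order_acquire) > 0;  // makes those writes visible here
  }

 private:
  std::atomic<uint32_t> count_{0};
};

constexpr uint16_t ARMED = 1 << 1;  // mirrors LocalMask::ARMED

struct ShardState {
  uint16_t local_mask = 0;  // plain flag, no std::atomic needed
};

// Coordinator thread: arm the shard *before* starting the phase.
void Arm(ShardState& sd, PhaseBarrier& barrier) {
  sd.local_mask |= ARMED;  // plain write, sequenced before the release store in Start()
  barrier.Start(1);
}

// Shard thread: mirrors IsArmedInShard() - the acquire load in Active() orders
// the read of local_mask after the coordinator's write.
bool IsArmed(const ShardState& sd, const PhaseBarrier& barrier) {
  return barrier.Active() && (sd.local_mask & ARMED);
}

If Active() returns true, the acquire load synchronizes with the release store in Start(), so the earlier plain write to local_mask is guaranteed to be visible - the same check order the patched IsArmedInShard() uses by testing run_barrier_.Active() before reading the mask.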