From 8e0080133c7c362967f3d859d18140e6dc9f0189 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 30 Mar 2023 06:55:28 +0300 Subject: [PATCH] fix: add missing barrier to fix reads in the coordinator fiber (#1009) Fixes #997. Signed-off-by: Roman Gershman --- src/facade/facade_test.h | 4 ++-- src/server/transaction.cc | 41 ++++++++++++++++++++++++--------------- src/server/transaction.h | 13 ++++++++++--- 3 files changed, 37 insertions(+), 21 deletions(-) diff --git a/src/facade/facade_test.h b/src/facade/facade_test.h index 929c25079..0f910468a 100644 --- a/src/facade/facade_test.h +++ b/src/facade/facade_test.h @@ -32,8 +32,8 @@ class RespMatcher { RespExpr::Type type_; std::string exp_str_; - int64_t exp_int_; - double_t exp_double_; + int64_t exp_int_ = 0; + double_t exp_double_ = 0; }; class RespTypeMatcher { diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 9681ec9e9..4f6750c25 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -371,13 +371,13 @@ void Transaction::StartMultiNonAtomic() { void Transaction::MultiSwitchCmd(const CommandId* cid) { DCHECK(multi_); - DCHECK(!cb_); + DCHECK(!cb_ptr_); unique_shard_id_ = 0; unique_shard_cnt_ = 0; args_.clear(); cid_ = cid; - cb_ = nullptr; + cb_ptr_ = nullptr; if (multi_->mode == NON_ATOMIC) { for (auto& sd : shard_data_) { @@ -399,7 +399,7 @@ string Transaction::DebugId() const { // Runs in the dbslice thread. Returns true if transaction needs to be kept in the queue. bool Transaction::RunInShard(EngineShard* shard) { DCHECK_GT(run_count_.load(memory_order_relaxed), 0u); - CHECK(cb_) << DebugId(); + CHECK(cb_ptr_) << DebugId(); DCHECK_GT(txid_, 0u); // Unlike with regular transactions we do not acquire locks upon scheduling @@ -447,10 +447,10 @@ bool Transaction::RunInShard(EngineShard* shard) { // if a transaction is suspended, we still run it because of brpoplpush/blmove case // that needs to run lpush on its suspended shard. 
- status = cb_(this, shard); + status = (*cb_ptr_)(this, shard); if (unique_shard_cnt_ == 1) { - cb_ = nullptr; // We can do it because only a single thread runs the callback. + cb_ptr_ = nullptr; // We can do it because only a single thread runs the callback. local_result_ = status; } else { if (status == OpStatus::OUT_OF_MEMORY) { @@ -639,8 +639,8 @@ bool Transaction::MultiData::IsIncrLocks() const { // transactions like set/mset/mget etc. Does not apply for more complicated cases like RENAME or // BLPOP where a data must be read from multiple shards before performing another hop. OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { - DCHECK(!cb_); - cb_ = std::move(cb); + DCHECK(!cb_ptr_); + cb_ptr_ = &cb; DCHECK(IsAtomicMulti() || (coordinator_state_ & COORD_SCHED) == 0); // Multi schedule in advance. coordinator_state_ |= (COORD_EXEC | COORD_EXEC_CONCLUDING); // Single hop means we conclude. @@ -671,10 +671,10 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { bool run_fast = ScheduleUniqueShard(EngineShard::tlocal()); if (run_fast) { was_ooo = true; - // it's important to DecreaseRunCnt only for run_fast and after run_eager is assigned. + // it's important to DecreaseRunCnt only for run_fast and after was_ooo is assigned. // If DecreaseRunCnt were called before ScheduleUniqueShard finishes // then WaitForShardCallbacks below could exit before schedule_cb assigns return value - // to run_eager and cause stack corruption. + // to was_ooo and cause stack corruption. CHECK_GE(DecreaseRunCnt(), 1u); } }; @@ -695,10 +695,14 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { WaitForShardCallbacks(); DVLOG(2) << "ScheduleSingleHop after Wait " << DebugId(); - if (was_ooo) + if (was_ooo) { coordinator_state_ |= COORD_OOO; + } - cb_ = nullptr; + if (schedule_fast) { + CHECK(!cb_ptr_); // we should have reset it within the callback. 
+ } + cb_ptr_ = nullptr; return local_result_; } @@ -756,8 +760,9 @@ void Transaction::Schedule() { // Runs in coordinator thread. void Transaction::Execute(RunnableType cb, bool conclude) { DCHECK(coordinator_state_ & COORD_SCHED); + DCHECK(!cb_ptr_); - cb_ = std::move(cb); + cb_ptr_ = &cb; coordinator_state_ |= COORD_EXEC; if (conclude) { @@ -772,7 +777,7 @@ void Transaction::Execute(RunnableType cb, bool conclude) { WaitForShardCallbacks(); DVLOG(1) << "Wait on Exec " << DebugId() << " completed"; - cb_ = nullptr; + cb_ptr_ = nullptr; } // Runs in coordinator thread. @@ -857,11 +862,11 @@ void Transaction::RunQuickie(EngineShard* shard) { DCHECK_EQ(0, sd.local_mask & (KEYLOCK_ACQUIRED | OUT_OF_ORDER)); DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id() << " " << args_[0]; - CHECK(cb_) << DebugId() << " " << shard->shard_id() << " " << args_[0]; + DCHECK(cb_ptr_) << DebugId() << " " << shard->shard_id() << " " << args_[0]; // Calling the callback in somewhat safe way try { - local_result_ = cb_(this, shard); + local_result_ = (*cb_ptr_)(this, shard); } catch (std::bad_alloc&) { LOG_FIRST_N(ERROR, 16) << " out of memory"; local_result_ = OpStatus::OUT_OF_MEMORY; @@ -872,7 +877,7 @@ void Transaction::RunQuickie(EngineShard* shard) { LogAutoJournalOnShard(shard); sd.is_armed.store(false, memory_order_relaxed); - cb_ = nullptr; // We can do it because only a single shard runs the callback. + cb_ptr_ = nullptr; // We can do it because only a single shard runs the callback. } // runs in coordinator thread. @@ -1206,6 +1211,10 @@ inline uint32_t Transaction::DecreaseRunCnt() { ::boost::intrusive_ptr guard(this); // We use release so that no stores will be reordered after. + // It's needed because we need to enforce that all stores executed before this point + // are visible right after run_count_ is unblocked in the coordinator thread. 
+ // The fact that run_ec_.notify() performs a release operation is not enough, because + // WaitForShardCallbacks might skip reading run_ec_ if run_count_ is already 0. uint32_t res = run_count_.fetch_sub(1, memory_order_release); if (res == 1) { run_ec_.notify(); diff --git a/src/server/transaction.h b/src/server/transaction.h index 5ea143290..707a35f1e 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -71,7 +72,7 @@ class Transaction { public: using time_point = ::std::chrono::steady_clock::time_point; // Runnable that is run on shards during hop executions (often named callback). - using RunnableType = std::function; + using RunnableType = absl::FunctionRef; // Provides keys to block on for specific shard. using WaitKeysProvider = std::function; @@ -406,7 +407,13 @@ class Transaction { void WaitForShardCallbacks() { run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); }); - seqlock_.fetch_add(1, std::memory_order_release); + // No reads after this fence will be reordered before it, and if a store operation is sequenced + // before some release operation that happened before the fence in another thread, this store + // will be visible after the fence. + // In this specific case we synchronize with DecreaseRunCnt that releases run_count_. + // See #997 before changing it. + std::atomic_thread_fence(std::memory_order_acquire); + seqlock_.fetch_add(1, std::memory_order_relaxed); } // Log command in shard's journal, if this is a write command with auto-journaling enabled. @@ -471,7 +478,7 @@ class Transaction { // Reverse argument mapping for ReverseArgIndex to convert from shard index to original index. std::vector reverse_index_; - RunnableType cb_; // Run on shard threads + RunnableType* cb_ptr_ = nullptr; // Run on shard threads const CommandId* cid_; // Underlying command std::unique_ptr multi_; // Initialized when the transaction is multi/exec.