fix: add missing barrier to fix reads in the coordinator fiber (#1009)

Fixes #997.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Roman Gershman authored 2023-03-30 06:55:28 +03:00; committed by GitHub
parent 0312b66244
commit 8e0080133c
3 changed files with 37 additions and 21 deletions


@@ -32,8 +32,8 @@ class RespMatcher {
   RespExpr::Type type_;
   std::string exp_str_;
-  int64_t exp_int_;
-  double_t exp_double_;
+  int64_t exp_int_ = 0;
+  double_t exp_double_ = 0;
 };
 
 class RespTypeMatcher {


@@ -371,13 +371,13 @@ void Transaction::StartMultiNonAtomic() {
 void Transaction::MultiSwitchCmd(const CommandId* cid) {
   DCHECK(multi_);
-  DCHECK(!cb_);
+  DCHECK(!cb_ptr_);
 
   unique_shard_id_ = 0;
   unique_shard_cnt_ = 0;
   args_.clear();
   cid_ = cid;
-  cb_ = nullptr;
+  cb_ptr_ = nullptr;
 
   if (multi_->mode == NON_ATOMIC) {
     for (auto& sd : shard_data_) {
@@ -399,7 +399,7 @@ string Transaction::DebugId() const {
 // Runs in the dbslice thread. Returns true if transaction needs to be kept in the queue.
 bool Transaction::RunInShard(EngineShard* shard) {
   DCHECK_GT(run_count_.load(memory_order_relaxed), 0u);
-  CHECK(cb_) << DebugId();
+  CHECK(cb_ptr_) << DebugId();
   DCHECK_GT(txid_, 0u);
 
   // Unlike with regular transactions we do not acquire locks upon scheduling
@@ -447,10 +447,10 @@ bool Transaction::RunInShard(EngineShard* shard) {
   // if a transaction is suspended, we still run it because of brpoplpush/blmove case
   // that needs to run lpush on its suspended shard.
-  status = cb_(this, shard);
+  status = (*cb_ptr_)(this, shard);
 
   if (unique_shard_cnt_ == 1) {
-    cb_ = nullptr;  // We can do it because only a single thread runs the callback.
+    cb_ptr_ = nullptr;  // We can do it because only a single thread runs the callback.
     local_result_ = status;
   } else {
     if (status == OpStatus::OUT_OF_MEMORY) {
@@ -639,8 +639,8 @@ bool Transaction::MultiData::IsIncrLocks() const {
 // transactions like set/mset/mget etc. Does not apply for more complicated cases like RENAME or
 // BLPOP where a data must be read from multiple shards before performing another hop.
 OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
-  DCHECK(!cb_);
-  cb_ = std::move(cb);
+  DCHECK(!cb_ptr_);
+  cb_ptr_ = &cb;
 
   DCHECK(IsAtomicMulti() || (coordinator_state_ & COORD_SCHED) == 0);  // Multi schedule in advance.
   coordinator_state_ |= (COORD_EXEC | COORD_EXEC_CONCLUDING);  // Single hop means we conclude.
@@ -671,10 +671,10 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
     bool run_fast = ScheduleUniqueShard(EngineShard::tlocal());
     if (run_fast) {
       was_ooo = true;
-      // it's important to DecreaseRunCnt only for run_fast and after run_eager is assigned.
+      // it's important to DecreaseRunCnt only for run_fast and after was_ooo is assigned.
       // If DecreaseRunCnt were called before ScheduleUniqueShard finishes
       // then WaitForShardCallbacks below could exit before schedule_cb assigns return value
-      // to run_eager and cause stack corruption.
+      // to was_ooo and cause stack corruption.
       CHECK_GE(DecreaseRunCnt(), 1u);
     }
   };
@@ -695,10 +695,14 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
   WaitForShardCallbacks();
   DVLOG(2) << "ScheduleSingleHop after Wait " << DebugId();
 
-  if (was_ooo)
+  if (was_ooo) {
     coordinator_state_ |= COORD_OOO;
+  }
 
-  cb_ = nullptr;
+  if (schedule_fast) {
+    CHECK(!cb_ptr_);  // we should have reset it within the callback.
+  }
+  cb_ptr_ = nullptr;
 
   return local_result_;
 }
@@ -756,8 +760,9 @@ void Transaction::Schedule() {
 // Runs in coordinator thread.
 void Transaction::Execute(RunnableType cb, bool conclude) {
   DCHECK(coordinator_state_ & COORD_SCHED);
+  DCHECK(!cb_ptr_);
 
-  cb_ = std::move(cb);
+  cb_ptr_ = &cb;
   coordinator_state_ |= COORD_EXEC;
 
   if (conclude) {
@@ -772,7 +777,7 @@ void Transaction::Execute(RunnableType cb, bool conclude) {
   WaitForShardCallbacks();
   DVLOG(1) << "Wait on Exec " << DebugId() << " completed";
 
-  cb_ = nullptr;
+  cb_ptr_ = nullptr;
 }
 
 // Runs in coordinator thread.
@@ -857,11 +862,11 @@ void Transaction::RunQuickie(EngineShard* shard) {
   DCHECK_EQ(0, sd.local_mask & (KEYLOCK_ACQUIRED | OUT_OF_ORDER));
 
   DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id() << " " << args_[0];
-  CHECK(cb_) << DebugId() << " " << shard->shard_id() << " " << args_[0];
+  DCHECK(cb_ptr_) << DebugId() << " " << shard->shard_id() << " " << args_[0];
 
   // Calling the callback in somewhat safe way
   try {
-    local_result_ = cb_(this, shard);
+    local_result_ = (*cb_ptr_)(this, shard);
   } catch (std::bad_alloc&) {
     LOG_FIRST_N(ERROR, 16) << " out of memory";
     local_result_ = OpStatus::OUT_OF_MEMORY;
@@ -872,7 +877,7 @@ void Transaction::RunQuickie(EngineShard* shard) {
   LogAutoJournalOnShard(shard);
 
   sd.is_armed.store(false, memory_order_relaxed);
-  cb_ = nullptr;  // We can do it because only a single shard runs the callback.
+  cb_ptr_ = nullptr;  // We can do it because only a single shard runs the callback.
 }
 
 // runs in coordinator thread.
@@ -1206,6 +1211,10 @@ inline uint32_t Transaction::DecreaseRunCnt() {
   ::boost::intrusive_ptr<Transaction> guard(this);
 
   // We use release so that no stores will be reordered after.
+  // It's needed because we need to enforce that all stores executed before this point
+  // are visible right after run_count_ is unblocked in the coordinator thread.
+  // The fact that run_ec_.notify() does a release operation is not enough, because
+  // WaitForShardCallbacks might skip reading run_ec_ if run_count_ is already 0.
   uint32_t res = run_count_.fetch_sub(1, memory_order_release);
   if (res == 1) {
     run_ec_.notify();
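
The new comment is the crux of #997, so it is worth spelling out why run_ec_.notify() alone is not enough. An eventcount's await() typically checks the predicate first and returns immediately when it already holds; on that fast path the waiter never touches the eventcount's internal atomics, so the release inside notify() has nothing to pair with. A minimal stand-in sketch follows (EventCountSketch is invented for illustration; it is not helio's actual EventCount):

    #include <atomic>
    #include <condition_variable>
    #include <cstdint>
    #include <mutex>

    class EventCountSketch {
     public:
      template <typename Pred> void await(Pred pred) {
        if (pred())
          return;  // Fast path: epoch_ is never read here, so the release in
                   // notify() cannot synchronize with this thread.
        std::unique_lock lk(mu_);
        cv_.wait(lk, pred);  // Slow path, simplified; real eventcounts avoid the mutex.
      }

      void notify() {
        epoch_.fetch_add(1, std::memory_order_release);  // release applies to epoch_ only
        cv_.notify_all();
      }

     private:
      std::atomic<uint32_t> epoch_{0};
      std::mutex mu_;
      std::condition_variable cv_;
    };

    int main() {
      EventCountSketch ec;
      std::atomic<int> done{1};
      // Predicate already true: await() returns on the fast path, bypassing epoch_.
      ec.await([&] { return done.load(std::memory_order_relaxed) == 1; });
    }

Hence the release must ride on run_count_ itself, via the fetch_sub above, and the coordinator must perform a matching acquire once it observes zero.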


@@ -7,6 +7,7 @@
 #include <absl/container/flat_hash_map.h>
 #include <absl/container/flat_hash_set.h>
 #include <absl/container/inlined_vector.h>
+#include <absl/functional/function_ref.h>
 
 #include <string_view>
 #include <variant>
@@ -71,7 +72,7 @@ class Transaction {
  public:
   using time_point = ::std::chrono::steady_clock::time_point;
 
   // Runnable that is run on shards during hop executions (often named callback).
-  using RunnableType = std::function<OpStatus(Transaction* t, EngineShard*)>;
+  using RunnableType = absl::FunctionRef<OpStatus(Transaction* t, EngineShard*)>;
 
   // Provides keys to block on for specific shard.
   using WaitKeysProvider = std::function<ArgSlice(Transaction*, EngineShard* shard)>;
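
Unlike std::function, absl::FunctionRef is a non-owning view of a callable, so it can no longer be copied into the transaction and kept alive there; this is why the diff stores a RunnableType* (cb_ptr_) that points at the coordinator's stack-resident argument and is cleared before that argument goes out of scope. A reduced sketch of the lifetime contract (Hop, Run and the names in it are invented for illustration, not the actual Transaction API):

    #include <absl/functional/function_ref.h>
    #include <cstdio>

    using Runnable = absl::FunctionRef<int()>;

    struct Hop {
      Runnable* cb_ptr = nullptr;  // non-owning; points at the caller's stack object

      int Run(Runnable cb) {
        cb_ptr = &cb;           // safe only because Run() blocks until the work finishes
        int res = (*cb_ptr)();  // invoke through the pointer, as RunInShard does
        cb_ptr = nullptr;       // must be cleared before `cb` goes out of scope
        return res;
      }
    };

    int main() {
      Hop hop;
      std::printf("%d\n", hop.Run([] { return 7; }));  // prints 7
    }

The payoff is that hops avoid std::function's potential heap allocation on every command; the cost is the lifetime discipline enforced by the DCHECK(!cb_ptr_) checks added in this commit.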
@@ -406,7 +407,13 @@ class Transaction {
   void WaitForShardCallbacks() {
     run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); });
 
-    seqlock_.fetch_add(1, std::memory_order_release);
+    // No reads after this fence will be reordered before it, and if a store is sequenced
+    // before some release operation that happened before the fence in another thread, that
+    // store will be visible after the fence.
+    // In this specific case we synchronize with DecreaseRunCnt that releases run_count_.
+    // See #997 before changing it.
+    std::atomic_thread_fence(std::memory_order_acquire);
+    seqlock_.fetch_add(1, std::memory_order_relaxed);
   }
 
   // Log command in shard's journal, if this is a write command with auto-journaling enabled.
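
The pairing described in the new comment reduces to a two-thread demo; a hedged sketch follows, with all names invented and std::thread standing in for Dragonfly's fibers. Per the C++ fence rules, the release fetch_sub synchronizes with the acquire fence because a relaxed load sequenced before the fence reads the value the fetch_sub wrote:

    #include <atomic>
    #include <cassert>
    #include <cstdint>
    #include <thread>

    std::atomic<uint32_t> run_count{1};
    int local_result = 0;  // plain field: written by the shard, read by the coordinator

    void shard_thread() {
      local_result = 42;                                  // publish the result...
      run_count.fetch_sub(1, std::memory_order_release);  // ...then release run_count
    }

    void coordinator_thread() {
      // Relaxed spin, like the await() predicate above.
      while (run_count.load(std::memory_order_relaxed) != 0) {
      }
      // Without this fence the relaxed load creates no synchronizes-with edge and
      // reading local_result below would be a data race; this is the #997 bug.
      std::atomic_thread_fence(std::memory_order_acquire);
      assert(local_result == 42);
    }

    int main() {
      std::thread a(shard_thread), b(coordinator_thread);
      a.join();
      b.join();
    }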
@@ -471,7 +478,7 @@ class Transaction {
   // Reverse argument mapping for ReverseArgIndex to convert from shard index to original index.
   std::vector<uint32_t> reverse_index_;
 
-  RunnableType cb_;  // Run on shard threads
+  RunnableType* cb_ptr_ = nullptr;  // Run on shard threads
 
   const CommandId* cid_;  // Underlying command
 
   std::unique_ptr<MultiData> multi_;  // Initialized when the transaction is multi/exec.