chore(transaction): Use PhasedBarrier for easier synchronization (#2455)

chore(transaction): Use PhasedBarrier for easier synchronization

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2024-01-30 19:43:06 +03:00 committed by GitHub
parent 503891b1fa
commit 90a9f05e36
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 113 additions and 155 deletions

View file

@ -71,12 +71,51 @@ void RecordTxScheduleStats(const Transaction* tx) {
}
}
void RecordTxScheduleFastStats(const Transaction* tx, bool was_ooo, bool was_inline) {
DCHECK_EQ(tx->GetUniqueShardCnt(), 1u);
auto* ss = ServerState::tlocal();
if (was_ooo) {
ss->stats.tx_type_cnt[was_inline ? ServerState::INLINE : ServerState::QUICK]++;
} else {
ss->stats.tx_type_cnt[ServerState::NORMAL]++;
}
ss->stats.tx_width_freq_arr[0]++;
}
} // namespace
IntentLock::Mode Transaction::LockMode() const {
return cid_->IsReadOnly() ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
}
void Transaction::PhasedBarrier::Start(uint32_t count) {
DCHECK_EQ(DEBUG_Count(), 0u);
count_.store(count, memory_order_release);
}
bool Transaction::PhasedBarrier::Active() const {
return count_.load(memory_order_acquire) > 0;
}
void Transaction::PhasedBarrier::Dec(Transaction* keep_alive) {
// Prevent transaction from being destroyed after count was decreased and Wait() unlocked,
// but before this thread finished notifying.
::boost::intrusive_ptr guard(keep_alive);
uint32_t before = count_.fetch_sub(1);
CHECK_GE(before, 1u);
if (before == 1)
ec_.notify();
}
void Transaction::PhasedBarrier::Wait() {
ec_.await([this] { return count_.load(memory_order_acquire) == 0; });
}
uint32_t Transaction::PhasedBarrier::DEBUG_Count() const {
return count_.load(memory_order_relaxed);
}
/**
* @brief Construct a new Transaction:: Transaction object
*
@ -246,26 +285,6 @@ void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) {
}
}
/**
*
* There are 4 options that we consider here:
* a. T spans a single shard and it's not multi.
* unique_shard_id_ is predefined before the schedule() is called.
* In that case only a single thread will be scheduled and it will use shard_data[0] just because
* shard_data.size() = 1. Coordinator thread can access any data because there is a
* schedule barrier between InitByArgs and RunInShard/IsArmedInShard functions.
* b. T spans multiple shards and its not multi
* In that case multiple threads will be scheduled. Similarly they have a schedule barrier,
* and IsArmedInShard can read any variable from shard_data[x].
* c. Trans spans a single shard and it's multi. shard_data has size of ess_.size.
* IsArmedInShard will check shard_data[x].
* d. Trans spans multiple shards and it's multi. Similarly shard_data[x] will be checked.
* unique_shard_cnt_ and unique_shard_id_ are not accessed until shard_data[x] is armed, hence
* we have a barrier between coordinator and engine-threads. Therefore there should not be
* data races.
*
**/
void Transaction::InitByKeys(const KeyIndex& key_index) {
if (key_index.start == full_args_.size()) { // eval with 0 keys.
CHECK(absl::StartsWith(cid_->name(), "EVAL")) << cid_->name();
@ -499,9 +518,9 @@ void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdA
// Runs in the dbslice thread. Returns true if the transaction continues running in the thread.
bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
DCHECK_GT(run_count_.load(memory_order_relaxed), 0u);
CHECK(cb_ptr_) << DebugId();
DCHECK(run_barrier_.Active());
DCHECK_GT(txid_, 0u);
CHECK(cb_ptr_) << DebugId();
// Unlike with regular transactions we do not acquire locks upon scheduling
// because Scheduling is done before multi-exec batch is executed. Therefore we
@ -511,7 +530,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
auto& sd = shard_data_[idx];
CHECK(sd.is_armed.exchange(false, memory_order_relaxed));
CHECK_GT(run_count_.load(memory_order_relaxed), 0u);
CHECK_GT(run_barrier_.DEBUG_Count(), 0u);
VLOG(2) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id() << " " << sd.local_mask;
@ -626,9 +645,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
}
}
DecreaseRunCnt();
// From this point on we can not access 'this'.
run_barrier_.Dec(this); // From this point on we can not access 'this'.
return !is_concluding;
}
@ -718,71 +735,56 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
coordinator_state_ |= COORD_CONCLUDING; // Single hop means we conclude.
}
// If we run only on one shard and conclude, we can avoid scheduling at all
// and directly dispatch the task to its destination shard.
// If we run only on one shard and conclude, we can possibly avoid scheduling at all
// and directly run the callback on the destination thread if the locks are free.
bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !IsAtomicMulti();
bool was_ooo = false;
bool run_inline = false;
ServerState* ss = nullptr;
bool was_ooo = false, was_inline = false;
if (schedule_fast) {
DCHECK_NE(unique_shard_id_, kInvalidSid);
DCHECK(IsActive(unique_shard_id_));
DCHECK(shard_data_.size() == 1 || multi_->mode == NON_ATOMIC);
// IsArmedInShard() first checks run_count_ before shard_data, so use release ordering.
shard_data_[SidToId(unique_shard_id_)].is_armed.store(true, memory_order_relaxed);
run_count_.store(1, memory_order_release);
time_now_ms_ = GetCurrentTimeMs();
shard_data_[SidToId(unique_shard_id_)].is_armed.store(true, memory_order_relaxed);
// NOTE: schedule_cb cannot update data on stack when run_fast is false.
// This is because ScheduleSingleHop can finish before the callback returns.
// Start new phase, be careful with writes until phase end!
run_barrier_.Start(1);
// This happens when ScheduleUniqueShard schedules into TxQueue (hence run_fast is false), and
// then calls PollExecute that in turn runs the callback which calls DecreaseRunCnt. As a result
// WaitForShardCallbacks below is unblocked before schedule_cb returns. However, if run_fast is
// true, then we may mutate stack variables, but only before DecreaseRunCnt is called.
auto schedule_cb = [this, &was_ooo] {
bool run_fast = ScheduleUniqueShard(EngineShard::tlocal());
if (run_fast) {
// We didn't decrease the barrier, so the scope is valid UNTIL Dec() below
DCHECK_EQ(run_barrier_.DEBUG_Count(), 1u);
was_ooo = true;
// it's important to DecreaseRunCnt only for run_fast and after was_ooo is assigned.
// If DecreaseRunCnt were called before ScheduleUniqueShard finishes
// then WaitForShardCallbacks below could exit before schedule_cb assigns return value
// to was_ooo and cause stack corruption.
DecreaseRunCnt();
run_barrier_.Dec(this);
}
// Otherwise it's not safe to access the function scope, as
// ScheduleUniqueShard() -> PollExecution() might have unlocked the barrier below.
};
ss = ServerState::tlocal();
auto* ss = ServerState::tlocal();
if (ss->thread_index() == unique_shard_id_ && ss->AllowInlineScheduling()) {
DVLOG(2) << "Inline scheduling a transaction";
schedule_cb();
run_inline = true;
was_inline = true;
} else {
shard_set->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier.
}
} else { // This transaction either spans multiple shards and/or is multi.
if (!IsAtomicMulti()) // Multi schedule in advance.
} else {
// This transaction either spans multiple shards and/or is multi, which schedule in advance.
if (!IsAtomicMulti())
ScheduleInternal();
ExecuteAsync();
}
DVLOG(2) << "ScheduleSingleHop before Wait " << DebugId() << " " << run_count_.load();
WaitForShardCallbacks();
DVLOG(2) << "ScheduleSingleHop after Wait " << DebugId();
run_barrier_.Wait();
if (schedule_fast) {
CHECK(!cb_ptr_); // we should have reset it within the callback.
if (was_ooo) {
ss->stats.tx_type_cnt[run_inline ? ServerState::INLINE : ServerState::QUICK]++;
} else {
ss->stats.tx_type_cnt[ServerState::NORMAL]++;
}
ss->stats.tx_width_freq_arr[0]++;
RecordTxScheduleFastStats(this, was_ooo, was_inline);
}
cb_ptr_ = nullptr;
return local_result_;
@ -814,9 +816,6 @@ void Transaction::UnlockMulti() {
unsigned shard_journals_cnt =
ServerState::tlocal()->journal() ? CalcMultiNumOfShardJournals() : 0;
uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed);
DCHECK_EQ(prev, 0u);
use_count_.fetch_add(shard_data_.size(), std::memory_order_relaxed);
for (ShardId i = 0; i < shard_data_.size(); ++i) {
shard_set->Add(i, [this, sharded_keys, i, shard_journals_cnt]() {
@ -867,9 +866,7 @@ void Transaction::Execute(RunnableType cb, bool conclude) {
ExecuteAsync();
DVLOG(1) << "Execute::WaitForCbs " << DebugId();
WaitForShardCallbacks();
DVLOG(1) << "Execute::WaitForCbs " << DebugId() << " completed";
run_barrier_.Wait();
cb_ptr_ = nullptr;
}
@ -877,46 +874,33 @@ void Transaction::Execute(RunnableType cb, bool conclude) {
// Runs in coordinator thread.
void Transaction::ExecuteAsync() {
DVLOG(1) << "ExecuteAsync " << DebugId();
DCHECK_GT(unique_shard_cnt_, 0u);
DCHECK_GT(use_count_.load(memory_order_relaxed), 0u);
DCHECK(!IsAtomicMulti() || multi_->lock_mode.has_value());
// We do not necessarily Execute this transaction in 'cb' below. It well may be that it will be
// executed by the engine shard once it has been armed and coordinator thread will finish the
// transaction before engine shard thread stops accessing it. Therefore, we increase reference
// by number of callbacks accessing 'this' to allow callbacks to execute shard->Execute(this);
// safely.
use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed);
// Set armed flags on all active shards
IterateActiveShards([](auto& sd, auto i) { sd.is_armed.store(true, memory_order_relaxed); });
// We access sd.is_armed outside of shard-threads but we guard it with run_count_ release.
IterateActiveShards(
[](PerShardData& sd, auto i) { sd.is_armed.store(true, memory_order_relaxed); });
// this fence prevents that a read or write operation before a release fence will be reordered
// with a write operation after a release fence. Specifically no writes below will be reordered
// upwards. Important, because it protects non-threadsafe local_mask from being accessed by
// IsArmedInShard in other threads.
run_count_.store(unique_shard_cnt_, memory_order_release);
// Start new phase: release semantics. From here we can be discovered by IsArmedInShard(),
// and thus picked by a foreign thread's PollExecution(). Careful with writes until phase end!
run_barrier_.Start(unique_shard_cnt_);
auto* ss = ServerState::tlocal();
if (unique_shard_cnt_ == 1 && ss->thread_index() == unique_shard_id_ &&
ss->AllowInlineScheduling()) {
DVLOG(1) << "Short-circuit ExecuteAsync " << DebugId();
EngineShard::tlocal()->PollExecution("exec_cb", this);
intrusive_ptr_release(this); // against use_count_.fetch_add above.
return;
}
auto cb = [this] {
EngineShard::tlocal()->PollExecution("exec_cb", this);
use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed); // for each pointer from poll_cb
auto poll_cb = [this] {
EngineShard::tlocal()->PollExecution("exec_cb", this);
DVLOG(3) << "ptr_release " << DebugId();
intrusive_ptr_release(this); // against use_count_.fetch_add above.
};
// IsArmedInShard is the protector of non-thread safe data.
IterateActiveShards([&cb](PerShardData& sd, auto i) { shard_set->Add(i, cb); });
IterateActiveShards([&poll_cb](PerShardData& sd, auto i) { shard_set->Add(i, poll_cb); });
}
void Transaction::Conclude() {
@ -990,8 +974,7 @@ Transaction::RunnableResult Transaction::RunQuickie(EngineShard* shard) {
void Transaction::ExpireBlocking(WaitKeysProvider wcb) {
DCHECK(!IsGlobal());
DVLOG(1) << "ExpireBlocking " << DebugId();
run_count_.store(unique_shard_cnt_, memory_order_release);
run_barrier_.Start(unique_shard_cnt_);
auto expire_cb = [this, &wcb] {
EngineShard* es = EngineShard::tlocal();
@ -999,7 +982,7 @@ void Transaction::ExpireBlocking(WaitKeysProvider wcb) {
};
IterateActiveShards([&expire_cb](PerShardData& sd, auto i) { shard_set->Add(i, expire_cb); });
WaitForShardCallbacks();
run_barrier_.Wait();
DVLOG(1) << "ExpireBlocking finished " << DebugId();
}
@ -1302,8 +1285,7 @@ void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) {
// Resume processing of transaction queue
shard->PollExecution("unwatchcb", nullptr);
DecreaseRunCnt();
run_barrier_.Dec(this);
}
OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
@ -1363,28 +1345,6 @@ void Transaction::UnlockMultiShardCb(absl::Span<const std::string_view> sharded_
shard->blocking_controller()->NotifyPending();
shard->PollExecution("unlockmulti", nullptr);
this->DecreaseRunCnt();
}
void Transaction::DecreaseRunCnt() {
// to protect against cases where Transaction is destroyed before run_ec_.notify
// finishes running. We can not put it inside the (res == 1) block because then it's too late.
::boost::intrusive_ptr guard(this);
// We use release so that no stores will be reordered after.
// It's needed because we need to enforce that all stores executed before this point
// are visible right after run_count_ is unblocked in the coordinator thread.
// The fact that run_ec_.notify() does release operation is not enough, because
// WaitForCallbacks might skip reading run_ec_ if run_count_ is already 0.
uint32_t res = run_count_.fetch_sub(1, memory_order_release);
CHECK_GE(res, 1u) << unique_shard_cnt_ << " " << unique_shard_id_ << " " << cid_->name() << " "
<< use_count_.load(memory_order_relaxed) << " " << uint32_t(coordinator_state_);
if (res == 1) {
run_ec_.notify();
}
}
bool Transaction::IsGlobal() const {