fix(transaction): Replace with armed sync point (#2708)

1. Replaces run_barrier as a synchronization point with is_armed + an embedded blocking counter for awaiting running jobs
2. Replaces IsArmedInShard + GetLocalMask + is_armed.exchange chain with a single DisarmInShard() / DisarmInShardWhen

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2024-03-14 17:40:32 +03:00 committed by GitHub
parent 7e0536fd4c
commit 9c6e6a96b7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 103 additions and 128 deletions

View file

@ -53,7 +53,7 @@ void AnalyzeTxQueue(const EngineShard* shard, const TxQueue* txq) {
const Transaction* cont_tx = shard->GetContTx();
if (cont_tx) {
absl::StrAppend(&msg, " continuation_tx: ", cont_tx->DebugId(), " ",
cont_tx->IsArmedInShard(shard->shard_id()) ? " armed" : "");
cont_tx->DEBUG_IsArmedInShard(shard->shard_id()) ? " armed" : "");
}
LOG(WARNING) << msg;
@ -97,34 +97,6 @@ uint16_t trans_id(const Transaction* ptr) {
} // namespace
void Transaction::PhasedBarrier::Start(uint32_t count) {
DCHECK_EQ(DEBUG_Count(), 0u);
count_.store(count, memory_order_release);
}
bool Transaction::PhasedBarrier::Active() const {
return count_.load(memory_order_acquire) > 0;
}
void Transaction::PhasedBarrier::Dec(Transaction* keep_alive) {
// Prevent transaction from being destroyed after count was decreased and Wait() unlocked,
// but before this thread finished notifying.
::boost::intrusive_ptr guard(keep_alive);
uint32_t before = count_.fetch_sub(1);
CHECK_GE(before, 1u) << keep_alive->DEBUG_PrintFailState(EngineShard::tlocal()->shard_id());
if (before == 1)
ec_.notify();
}
void Transaction::PhasedBarrier::Wait() {
ec_.await([this] { return count_.load(memory_order_acquire) == 0; });
}
uint32_t Transaction::PhasedBarrier::DEBUG_Count() const {
return count_.load(memory_order_relaxed);
}
bool Transaction::BatonBarrier::IsClaimed() const {
return claimed_.load(memory_order_relaxed);
}
@ -401,7 +373,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
for (const auto& sd : shard_data_) {
// sd.local_mask may be non-zero for multi transactions with instant locking.
// Specifically EVALs may maintain state between calls.
DCHECK_EQ(sd.local_mask & ARMED, 0);
DCHECK(!sd.is_armed.load(memory_order_relaxed));
if (!multi_) {
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
}
@ -529,7 +501,7 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
DCHECK(IsAtomicMulti()); // Every command determines it's own active shards
sd.local_mask &= ~ACTIVE; // so remove ACTIVE flags, but keep KEYLOCK_ACQUIRED
}
DCHECK_EQ(sd.local_mask & ARMED, 0);
DCHECK(!sd.is_armed.load(memory_order_relaxed));
}
if (multi_->mode == NON_ATOMIC) {
@ -588,16 +560,12 @@ void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdA
// Runs in the dbslice thread. Returns true if the transaction continues running in the thread.
bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
DCHECK(run_barrier_.Active());
DCHECK_GT(txid_, 0u);
CHECK(cb_ptr_) << DebugId();
unsigned idx = SidToId(shard->shard_id());
auto& sd = shard_data_[idx];
CHECK(sd.local_mask & ARMED) << DEBUG_PrintFailState(shard->shard_id());
sd.local_mask &= ~ARMED;
sd.stats.total_runs++;
DCHECK_GT(run_barrier_.DEBUG_Count(), 0u);
@ -720,7 +688,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
}
}
run_barrier_.Dec(this); // From this point on we can not access 'this'.
FinishHop(); // From this point on we can not access 'this'.
return !is_concluding;
}
@ -824,10 +792,9 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
DCHECK(shard_data_.size() == 1 || multi_);
InitTxTime();
shard_data_[SidToId(unique_shard_id_)].local_mask |= ARMED;
// Start new phase, be careful with writes until phase end!
run_barrier_.Start(1);
shard_data_[SidToId(unique_shard_id_)].is_armed.store(true, memory_order_release);
auto schedule_cb = [this, &was_ooo] {
bool run_fast = ScheduleUniqueShard(EngineShard::tlocal());
@ -835,7 +802,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
// We didn't decrease the barrier, so the scope is valid UNTIL Dec() below
DCHECK_EQ(run_barrier_.DEBUG_Count(), 1u);
was_ooo = true;
run_barrier_.Dec(this);
FinishHop();
}
// Otherwise it's not safe to access the function scope, as
// ScheduleUniqueShard() -> PollExecution() might have unlocked the barrier below.
@ -961,18 +928,19 @@ void Transaction::ExecuteAsync() {
DCHECK(!IsAtomicMulti() || multi_->lock_mode.has_value());
DCHECK_LE(shard_data_.size(), 1024u);
// Set armed flags on all active shards. Copy indices for dispatching poll tasks,
// because local_mask can be written concurrently after starting a new phase.
// Hops can start executing immediately after being armed, so we
// initialize the run barrier before arming, as well as copy indices
// of active shards to avoid reading concurrently accessed shard data.
std::bitset<1024> poll_flags(0);
run_barrier_.Start(unique_shard_cnt_);
// Set armed flags on all active shards.
std::atomic_thread_fence(memory_order_release); // once to avoid flushing poll_flags in loop
IterateActiveShards([&poll_flags](auto& sd, auto i) {
sd.local_mask |= ARMED;
sd.is_armed.store(true, memory_order_relaxed);
poll_flags.set(i, true);
});
// Start new phase: release semantics. From here we can be discovered by IsArmedInShard(),
// and thus picked by a foreign thread's PollExecution(). Careful with data access!
run_barrier_.Start(unique_shard_cnt_);
auto* ss = ServerState::tlocal();
if (unique_shard_cnt_ == 1 && ss->thread_index() == unique_shard_id_ &&
ss->AllowInlineScheduling()) {
@ -994,6 +962,11 @@ void Transaction::ExecuteAsync() {
});
}
void Transaction::FinishHop() {
boost::intrusive_ptr<Transaction> guard(this); // Keep alive until Dec() fully finishes
run_barrier_.Dec();
}
void Transaction::Conclude() {
if (!IsScheduled())
return;
@ -1062,9 +1035,6 @@ Transaction::RunnableResult Transaction::RunQuickie(EngineShard* shard) {
DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id();
DCHECK(cb_ptr_) << DebugId() << " " << shard->shard_id();
CHECK(sd.local_mask & ARMED) << DEBUG_PrintFailState(shard->shard_id());
sd.local_mask &= ~ARMED;
sd.stats.total_runs++;
// Calling the callback in somewhat safe way
@ -1126,9 +1096,21 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const {
return res;
}
bool Transaction::IsArmedInShard(ShardId sid) const {
// Barrier has acquire semantics
return run_barrier_.Active() && (shard_data_[SidToId(sid)].local_mask & ARMED);
uint16_t Transaction::DisarmInShard(ShardId sid) {
auto& sd = shard_data_[SidToId(sid)];
// NOTE: Maybe compare_exchange is worth it to avoid redundant writes
return sd.is_armed.exchange(false, memory_order_acquire) ? sd.local_mask : 0;
}
pair<uint16_t, bool> Transaction::DisarmInShardWhen(ShardId sid, uint16_t relevant_flags) {
auto& sd = shard_data_[SidToId(sid)];
if (sd.is_armed.load(memory_order_acquire)) {
bool relevant = sd.local_mask & relevant_flags;
if (relevant)
CHECK(sd.is_armed.exchange(false, memory_order_release));
return {sd.local_mask, relevant};
}
return {0, false};
}
bool Transaction::IsActive(ShardId sid) const {
@ -1143,12 +1125,6 @@ bool Transaction::IsActive(ShardId sid) const {
return shard_data_[SidToId(sid)].local_mask & ACTIVE;
}
uint16_t Transaction::GetLocalMask(ShardId sid) const {
DCHECK(IsActive(sid));
DCHECK_GT(run_barrier_.DEBUG_Count(), 0u);
return shard_data_[SidToId(sid)].local_mask;
}
IntentLock::Mode Transaction::LockMode() const {
return cid_->IsReadOnly() ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
}
@ -1185,12 +1161,13 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
// Fast path. If none of the keys are locked, we can run briefly atomically on the thread
// without acquiring them at all.
if (quick_run) {
CHECK(sd.is_armed.exchange(false, memory_order_relaxed));
auto result = RunQuickie(shard);
local_result_ = result.status;
if (result.flags & RunnableResult::AVOID_CONCLUDING) {
// If we want to run again, we have to actually schedule this transaction
DCHECK_EQ(sd.local_mask & ARMED, 0);
DCHECK(!sd.is_armed.load(memory_order_relaxed));
continue_scheduling = true;
} else {
LogAutoJournalOnShard(shard, result);
@ -1400,7 +1377,7 @@ void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) {
// Resume processing of transaction queue
shard->PollExecution("unwatchcb", nullptr);
run_barrier_.Dec(this);
FinishHop();
}
OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {