chore(transaction): Simplify PollExecution() (#2457)

* chore(transaction): Simplify PollExecution()

Remove seqlock_ from transaction. This change is possible because:
- We don't re-use shard_data[0] for multi transactions anymore
- We disarm atomically and poll callbacks are stateless

This makes it safe to call PollExecution() unconditionally that will determine on it's own whether the caller needs to run or is already expired

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2024-01-26 14:34:17 +03:00 committed by GitHub
parent ea5955962e
commit 7b6181641c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 59 additions and 80 deletions

View file

@ -324,12 +324,12 @@ OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty
}
auto wcb = [](Transaction* t, EngineShard* shard) { return t->GetShardArgs(shard->shard_id()); };
*block_flag = true;
const auto key_checker = [req_obj_type](EngineShard* owner, const DbContext& context,
Transaction*, std::string_view key) -> bool {
return owner->db_slice().FindReadOnly(context, key, req_obj_type).ok();
};
*block_flag = true;
auto status = trans->WaitOnWatch(limit_tp, std::move(wcb), key_checker);
*block_flag = false;

View file

@ -457,20 +457,32 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
ShardId sid = shard_id();
stats_.poll_execution_total++;
uint16_t trans_mask = trans ? trans->GetLocalMask(sid) : 0;
if (trans_mask & Transaction::AWAKED_Q) {
CHECK(continuation_trans_ == nullptr)
<< continuation_trans_->DebugId() << " when polling " << trans->DebugId()
<< "cont_mask: " << continuation_trans_->GetLocalMask(sid) << " vs " << trans_mask;
// Check if the caller was handled by a previous poll.
if (trans && !trans->IsArmedInShard(sid))
return;
bool keep = trans->RunInShard(this, false);
if (keep) {
auto local_mask = trans ? trans->GetLocalMask(sid) : 0; // safe only when trans is armed
// Blocked transactions are executed immediately after waking up
if (local_mask & Transaction::AWAKED_Q) {
CHECK(continuation_trans_ == nullptr || continuation_trans_ == trans)
<< continuation_trans_->DebugId() << " when polling " << trans->DebugId()
<< "cont_mask: " << continuation_trans_->GetLocalMask(sid) << " vs "
<< trans->GetLocalMask(sid);
// Commands like BRPOPLPUSH don't conclude immediately
if (trans->RunInShard(this, false)) {
continuation_trans_ = trans;
return;
}
trans = nullptr; // Avoid handling the caller below
continuation_trans_ = nullptr;
}
string dbg_id;
// Check the currently running transaction, we have to handle it first until it concludes
if (continuation_trans_) {
if (trans == continuation_trans_)
trans = nullptr;
@ -479,7 +491,9 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
if (VLOG_IS_ON(1)) {
dbg_id = continuation_trans_->DebugId();
}
bool to_keep = continuation_trans_->RunInShard(this, false);
DVLOG(1) << "RunContTrans: " << dbg_id << " keep: " << to_keep;
if (!to_keep) {
// if this holds, we can remove this check altogether.
@ -489,44 +503,36 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
}
}
// Progress on the transaction queue if no transaction is running currently.
Transaction* head = nullptr;
if (continuation_trans_ == nullptr) {
while (!txq_.Empty()) {
// we must check every iteration so that if the current transaction awakens
// another transaction, the loop won't proceed further and will break, because we must run
// the notified transaction before all other transactions in the queue can proceed.
bool has_awaked_trans = blocking_controller_ && blocking_controller_->HasAwakedTransaction();
if (has_awaked_trans)
// Break if there are any awakened transactions, as we must give way to them
// before continuing to handle regular transactions from the queue.
if (blocking_controller_ && blocking_controller_->HasAwakedTransaction())
break;
auto val = txq_.Front();
head = absl::get<Transaction*>(val);
head = get<Transaction*>(txq_.Front());
// The fact that Tx is in the queue, already means that coordinator fiber will not progress,
// hence here it's enough to test for run_count and check local_mask.
bool is_armed = head->IsArmedInShard(sid);
VLOG(2) << "Considering head " << head->DebugId() << " isarmed: " << is_armed;
// If the transaction isn't armed yet, it will be handled by a successive poll
if (!is_armed)
break;
// It could be that head is processed and unblocks multi-hop transaction .
// The transaction will schedule again and will arm another callback.
// Then we will reach invalid state by running trans after this loop,
// which is not what we want.
// This function should not process 2 different callbacks for the same transaction.
// Hence we make sure to reset trans if it has been processed via tx-queue.
// Avoid processing the caller transaction below if we found it in the queue,
// because it most likely won't have enough time to arm itself again.
if (head == trans)
trans = nullptr;
TxId txid = head->txid();
// committed_txid_ is strictly increasing when processed via TxQueue.
DCHECK_LT(committed_txid_, txid);
// We update committed_txid_ before calling RunInShard() to avoid cases where
// a transaction stalls the execution with IO while another fiber queries this shard for
// committed_txid_ (for example during the scheduling).
// Update commited_txid before running, because RunInShard might block on i/o.
// This way scheduling transactions won't see an understated value.
DCHECK_LT(committed_txid_, txid); // strictly increasing when processed via txq
committed_txid_ = txid;
if (VLOG_IS_ON(2)) {
dbg_id = head->DebugId();
}
@ -545,12 +551,21 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
DVLOG(1) << "Skipped TxQueue " << continuation_trans_;
}
// we need to run trans if it's OOO or when trans is blocked in this shard.
bool should_run = trans_mask & (Transaction::OUT_OF_ORDER | Transaction::SUSPENDED_Q);
// Either the poll had no caller or it was handled above
if (trans == nullptr)
return;
// It may be that there are other transactions that touch those keys but they necessary ordered
// after trans in the queue, hence it's safe to run trans out of order.
if (trans && should_run) {
// If the pointer is valid, we didn't handle it above, so trans is still armed
DCHECK(trans->IsArmedInShard(sid));
// OOO means no transaction before us in the txq accesses our keys, so we can run earlier
bool is_ooo = local_mask & Transaction::OUT_OF_ORDER;
// Still suspended shards need to run just to finalize and unregister, so we can run anytime
bool is_suspended = local_mask & Transaction::SUSPENDED_Q;
DCHECK_EQ(local_mask & Transaction::AWAKED_Q, 0);
if (is_ooo || is_suspended) {
DCHECK(trans != head);
dbg_id.clear();
@ -558,16 +573,14 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
dbg_id = trans->DebugId();
}
bool txq_ooo = trans_mask & Transaction::OUT_OF_ORDER;
bool keep = trans->RunInShard(this, txq_ooo);
if (txq_ooo && !keep) {
bool keep = trans->RunInShard(this, is_ooo);
if (is_ooo && !keep) {
stats_.tx_ooo_total++;
}
// If the transaction concluded, it must remove itself from the tx queue.
// Otherwise it is required to stay there to keep the relative order.
if (txq_ooo && !trans->IsMulti())
if (is_ooo && !trans->IsMulti())
DCHECK_EQ(keep, trans->GetLocalTxqPos(sid) != TxQueue::kEnd);
DLOG_IF(INFO, !dbg_id.empty()) << "Eager run " << sid << ", " << dbg_id << ", keep " << keep;

View file

@ -883,8 +883,6 @@ void Transaction::ExecuteAsync() {
IterateActiveShards(
[](PerShardData& sd, auto i) { sd.is_armed.store(true, memory_order_relaxed); });
uint32_t seq = seqlock_.load(memory_order_relaxed);
// this fence prevents that a read or write operation before a release fence will be reordered
// with a write operation after a release fence. Specifically no writes below will be reordered
// upwards. Important, because it protects non-threadsafe local_mask from being accessed by
@ -900,42 +898,10 @@ void Transaction::ExecuteAsync() {
return;
}
// We verify seq lock has the same generation number. See below for more info.
auto cb = [seq, this] {
EngineShard* shard = EngineShard::tlocal();
auto cb = [this] {
EngineShard::tlocal()->PollExecution("exec_cb", this);
bool is_armed = IsArmedInShard(shard->shard_id());
// First we check that this shard should run a callback by checking IsArmedInShard.
if (is_armed) {
uint32_t seq_after = seqlock_.load(memory_order_relaxed);
DVLOG(3) << "PollExecCb " << DebugId() << " sid(" << shard->shard_id() << ") "
<< run_count_.load(memory_order_relaxed);
// We also make sure that for multi-operation transactions like Multi/Eval
// this callback runs on a correct operation. We want to avoid a situation
// where the first operation is executed and the second operation is armed and
// now this callback from the previous operation finally runs and calls PollExecution.
// It is usually ok, but for single shard operations we abuse index 0 in shard_data_
// Therefore we may end up with a situation where this old callback runs on shard 7,
// accessing shard_data_[0] that now represents shard 5 for the next operation.
// seqlock provides protection for that so each cb will only run on the operation it has
// been tasked with.
// We also must first check is_armed and only then seqlock. The first check ensures that
// the coordinator thread crossed
// "run_count_.store(unique_shard_cnt_, memory_order_release);" barrier and our seqlock_
// is valid.
if (seq_after == seq) {
// shard->PollExecution(this) does not necessarily execute this transaction.
// Therefore, everything that should be handled during the callback execution
// should go into RunInShard.
shard->PollExecution("exec_cb", this);
} else {
VLOG(1) << "Skipping PollExecution " << DebugId() << " sid(" << shard->shard_id() << ")";
}
}
DVLOG(3) << "ptr_release " << DebugId() << " " << seq;
DVLOG(3) << "ptr_release " << DebugId();
intrusive_ptr_release(this); // against use_count_.fetch_add above.
};
@ -993,6 +959,8 @@ Transaction::RunnableResult Transaction::RunQuickie(EngineShard* shard) {
DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id();
DCHECK(cb_ptr_) << DebugId() << " " << shard->shard_id();
CHECK(sd.is_armed.exchange(false, memory_order_relaxed));
// Calling the callback in somewhat safe way
RunnableResult result;
try {
@ -1008,7 +976,6 @@ Transaction::RunnableResult Transaction::RunQuickie(EngineShard* shard) {
// Handling the result, along with conclusion and journaling, is done by the caller
sd.is_armed.store(false, memory_order_relaxed);
cb_ptr_ = nullptr; // We can do it because only a single shard runs the callback.
return result;
}

View file

@ -504,7 +504,6 @@ class Transaction {
// In this specific case we synchronize with DecreaseRunCnt that releases run_count_.
// See #997 before changing it.
std::atomic_thread_fence(std::memory_order_acquire);
seqlock_.fetch_add(1, std::memory_order_relaxed);
}
// Log command in shard's journal, if this is a write command with auto-journaling enabled.
@ -582,7 +581,7 @@ class Transaction {
uint64_t time_now_ms_{0};
std::atomic_uint32_t wakeup_requested_{0}; // whether tx was woken up
std::atomic_uint32_t use_count_{0}, run_count_{0}, seqlock_{0};
std::atomic_uint32_t use_count_{0}, run_count_{0};
// unique_shard_cnt_ and unique_shard_id_ are accessed only by coordinator thread.
uint32_t unique_shard_cnt_{0}; // Number of unique shards active