chore(transaction): Simplify PollExecution (#2712)

This commit is contained in:
Vladislav 2024-03-12 09:09:29 +03:00 committed by GitHub
parent a3dc9382bb
commit 9ccf2b9871
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -472,6 +472,12 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
}
string dbg_id;
auto run = [this, &dbg_id](Transaction* tx, bool is_ooo) -> bool /* keep */ {
dbg_id = VLOG_IS_ON(1) ? tx->DebugId() : "";
bool keep = tx->RunInShard(this, is_ooo);
DLOG_IF(INFO, !dbg_id.empty()) << dbg_id << ", keep " << keep << ", ooo " << is_ooo;
return keep;
};
// Check the currently running transaction, we have to handle it first until it concludes
if (continuation_trans_) {
@ -479,14 +485,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
trans = nullptr;
if (continuation_trans_->IsArmedInShard(sid)) {
if (VLOG_IS_ON(1)) {
dbg_id = continuation_trans_->DebugId();
}
bool to_keep = continuation_trans_->RunInShard(this, false);
DVLOG(1) << "RunContTrans: " << dbg_id << " keep: " << to_keep;
if (!to_keep) {
if (bool keep = run(continuation_trans_, false); !keep) {
// if this holds, we can remove this check altogether.
DCHECK(continuation_trans_ == nullptr);
continuation_trans_ = nullptr;
@ -496,50 +495,34 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
// Progress on the transaction queue if no transaction is running currently.
Transaction* head = nullptr;
if (continuation_trans_ == nullptr) {
while (!txq_.Empty()) {
// Break if there are any awakened transactions, as we must give way to them
// before continuing to handle regular transactions from the queue.
if (blocking_controller_ && blocking_controller_->HasAwakedTransaction())
break;
while (continuation_trans_ == nullptr && !txq_.Empty()) {
// Break if there are any awakened transactions, as we must give way to them
// before continuing to handle regular transactions from the queue.
if (blocking_controller_ && blocking_controller_->HasAwakedTransaction())
break;
head = get<Transaction*>(txq_.Front());
head = get<Transaction*>(txq_.Front());
bool is_armed = head->IsArmedInShard(sid);
VLOG(2) << "Considering head " << head->DebugId() << " isarmed: " << is_armed;
VLOG(2) << "Considering head " << head->DebugId() << " isarmed: " << head->IsArmedInShard(sid);
// If the transaction isn't armed yet, it will be handled by a successive poll
if (!is_armed)
break;
// If the transaction isn't armed yet, it will be handled by a successive poll
if (!head->IsArmedInShard(sid))
break;
// Avoid processing the caller transaction below if we found it in the queue,
// because it most likely won't have enough time to arm itself again.
if (head == trans)
trans = nullptr;
// Avoid processing the caller transaction below if we found it in the queue,
// because it most likely won't have enough time to arm itself again.
if (head == trans)
trans = nullptr;
TxId txid = head->txid();
TxId txid = head->txid();
// Update commited_txid before running, because RunInShard might block on i/o.
// This way scheduling transactions won't see an understated value.
DCHECK_LT(committed_txid_, txid); // strictly increasing when processed via txq
committed_txid_ = txid;
// Update commited_txid before running, because RunInShard might block on i/o.
// This way scheduling transactions won't see an understated value.
DCHECK_LT(committed_txid_, txid); // strictly increasing when processed via txq
committed_txid_ = txid;
if (VLOG_IS_ON(2)) {
dbg_id = head->DebugId();
}
bool keep = head->RunInShard(this, false);
// We should not access head from this point since RunInShard callback decrements refcount.
DLOG_IF(INFO, !dbg_id.empty()) << "RunHead " << dbg_id << ", keep " << keep;
if (keep) {
continuation_trans_ = head;
break;
}
} // while(!txq_.Empty())
} else { // if (continuation_trans_ == nullptr)
DVLOG(1) << "Skipped TxQueue " << continuation_trans_;
if (bool keep = run(head, false); keep)
continuation_trans_ = head;
}
// Either the poll had no caller or it was handled above
@ -559,12 +542,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
if (is_ooo || is_suspended) {
DCHECK(trans != head);
dbg_id.clear();
if (VLOG_IS_ON(1)) {
dbg_id = trans->DebugId();
}
bool keep = trans->RunInShard(this, is_ooo);
bool keep = run(trans, is_ooo);
if (is_ooo && !keep) {
stats_.tx_ooo_total++;
}
@ -573,8 +551,6 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
// Otherwise it is required to stay there to keep the relative order.
if (is_ooo && !trans->IsMulti())
DCHECK_EQ(keep, trans->GetLocalTxqPos(sid) != TxQueue::kEnd);
DLOG_IF(INFO, !dbg_id.empty()) << "Eager run " << sid << ", " << dbg_id << ", keep " << keep;
}
}