diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc index 371d0edff..448316ac0 100644 --- a/src/server/container_utils.cc +++ b/src/server/container_utils.cc @@ -324,12 +324,12 @@ OpResult RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty } auto wcb = [](Transaction* t, EngineShard* shard) { return t->GetShardArgs(shard->shard_id()); }; - - *block_flag = true; const auto key_checker = [req_obj_type](EngineShard* owner, const DbContext& context, Transaction*, std::string_view key) -> bool { return owner->db_slice().FindReadOnly(context, key, req_obj_type).ok(); }; + + *block_flag = true; auto status = trans->WaitOnWatch(limit_tp, std::move(wcb), key_checker); *block_flag = false; diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 6e5f3c433..e6df641d9 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -457,20 +457,32 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { ShardId sid = shard_id(); stats_.poll_execution_total++; - uint16_t trans_mask = trans ? trans->GetLocalMask(sid) : 0; - if (trans_mask & Transaction::AWAKED_Q) { - CHECK(continuation_trans_ == nullptr) - << continuation_trans_->DebugId() << " when polling " << trans->DebugId() - << "cont_mask: " << continuation_trans_->GetLocalMask(sid) << " vs " << trans_mask; + // Check if the caller was handled by a previous poll. + if (trans && !trans->IsArmedInShard(sid)) + return; - bool keep = trans->RunInShard(this, false); - if (keep) { + auto local_mask = trans ? 
trans->GetLocalMask(sid) : 0; // safe only when trans is armed + + // Blocked transactions are executed immediately after waking up + if (local_mask & Transaction::AWAKED_Q) { + CHECK(continuation_trans_ == nullptr || continuation_trans_ == trans) + << continuation_trans_->DebugId() << " when polling " << trans->DebugId() + << "cont_mask: " << continuation_trans_->GetLocalMask(sid) << " vs " + << trans->GetLocalMask(sid); + + // Commands like BRPOPLPUSH don't conclude immediately + if (trans->RunInShard(this, false)) { + continuation_trans_ = trans; return; } + + trans = nullptr; // Avoid handling the caller below + continuation_trans_ = nullptr; } string dbg_id; + // Check the currently running transaction, we have to handle it first until it concludes if (continuation_trans_) { if (trans == continuation_trans_) trans = nullptr; @@ -479,7 +491,9 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { if (VLOG_IS_ON(1)) { dbg_id = continuation_trans_->DebugId(); } + bool to_keep = continuation_trans_->RunInShard(this, false); + DVLOG(1) << "RunContTrans: " << dbg_id << " keep: " << to_keep; if (!to_keep) { // if this holds, we can remove this check altogether. @@ -489,44 +503,36 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { } } + // Progress on the transaction queue if no transaction is running currently. Transaction* head = nullptr; if (continuation_trans_ == nullptr) { while (!txq_.Empty()) { - // we must check every iteration so that if the current transaction awakens - // another transaction, the loop won't proceed further and will break, because we must run - // the notified transaction before all other transactions in the queue can proceed. - bool has_awaked_trans = blocking_controller_ && blocking_controller_->HasAwakedTransaction(); - if (has_awaked_trans) + // Break if there are any awakened transactions, as we must give way to them + // before continuing to handle regular transactions from the queue. 
+ if (blocking_controller_ && blocking_controller_->HasAwakedTransaction()) break; - auto val = txq_.Front(); - head = absl::get(val); + head = get(txq_.Front()); - // The fact that Tx is in the queue, already means that coordinator fiber will not progress, - // hence here it's enough to test for run_count and check local_mask. bool is_armed = head->IsArmedInShard(sid); VLOG(2) << "Considering head " << head->DebugId() << " isarmed: " << is_armed; + // If the transaction isn't armed yet, it will be handled by a successive poll if (!is_armed) break; - // It could be that head is processed and unblocks multi-hop transaction . - // The transaction will schedule again and will arm another callback. - // Then we will reach invalid state by running trans after this loop, - // which is not what we want. - // This function should not process 2 different callbacks for the same transaction. - // Hence we make sure to reset trans if it has been processed via tx-queue. + // Avoid processing the caller transaction below if we found it in the queue, + // because it most likely won't have enough time to arm itself again. if (head == trans) trans = nullptr; + TxId txid = head->txid(); - // committed_txid_ is strictly increasing when processed via TxQueue. - DCHECK_LT(committed_txid_, txid); - - // We update committed_txid_ before calling RunInShard() to avoid cases where - // a transaction stalls the execution with IO while another fiber queries this shard for - // committed_txid_ (for example during the scheduling). + // Update committed_txid before running, because RunInShard might block on i/o. + // This way scheduling transactions won't see an understated value. 
+ DCHECK_LT(committed_txid_, txid); // strictly increasing when processed via txq committed_txid_ = txid; + if (VLOG_IS_ON(2)) { dbg_id = head->DebugId(); } @@ -545,12 +551,21 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { DVLOG(1) << "Skipped TxQueue " << continuation_trans_; } - // we need to run trans if it's OOO or when trans is blocked in this shard. - bool should_run = trans_mask & (Transaction::OUT_OF_ORDER | Transaction::SUSPENDED_Q); + // Either the poll had no caller or it was handled above + if (trans == nullptr) + return; - // It may be that there are other transactions that touch those keys but they necessary ordered - // after trans in the queue, hence it's safe to run trans out of order. - if (trans && should_run) { + // If the pointer is valid, we didn't handle it above, so trans is still armed + DCHECK(trans->IsArmedInShard(sid)); + + // OOO means no transaction before us in the txq accesses our keys, so we can run earlier + bool is_ooo = local_mask & Transaction::OUT_OF_ORDER; + + // Still suspended shards need to run just to finalize and unregister, so we can run anytime + bool is_suspended = local_mask & Transaction::SUSPENDED_Q; + DCHECK_EQ(local_mask & Transaction::AWAKED_Q, 0); + + if (is_ooo || is_suspended) { DCHECK(trans != head); dbg_id.clear(); @@ -558,16 +573,14 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { dbg_id = trans->DebugId(); } - bool txq_ooo = trans_mask & Transaction::OUT_OF_ORDER; - bool keep = trans->RunInShard(this, txq_ooo); - - if (txq_ooo && !keep) { + bool keep = trans->RunInShard(this, is_ooo); + if (is_ooo && !keep) { stats_.tx_ooo_total++; } // If the transaction concluded, it must remove itself from the tx queue. // Otherwise it is required to stay there to keep the relative order. 
- if (txq_ooo && !trans->IsMulti()) + if (is_ooo && !trans->IsMulti()) DCHECK_EQ(keep, trans->GetLocalTxqPos(sid) != TxQueue::kEnd); DLOG_IF(INFO, !dbg_id.empty()) << "Eager run " << sid << ", " << dbg_id << ", keep " << keep; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index ed607b7a4..d2e0c7e09 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -883,8 +883,6 @@ void Transaction::ExecuteAsync() { IterateActiveShards( [](PerShardData& sd, auto i) { sd.is_armed.store(true, memory_order_relaxed); }); - uint32_t seq = seqlock_.load(memory_order_relaxed); - // this fence prevents that a read or write operation before a release fence will be reordered // with a write operation after a release fence. Specifically no writes below will be reordered // upwards. Important, because it protects non-threadsafe local_mask from being accessed by @@ -900,42 +898,10 @@ void Transaction::ExecuteAsync() { return; } - // We verify seq lock has the same generation number. See below for more info. - auto cb = [seq, this] { - EngineShard* shard = EngineShard::tlocal(); + auto cb = [this] { + EngineShard::tlocal()->PollExecution("exec_cb", this); - bool is_armed = IsArmedInShard(shard->shard_id()); - // First we check that this shard should run a callback by checking IsArmedInShard. - if (is_armed) { - uint32_t seq_after = seqlock_.load(memory_order_relaxed); - - DVLOG(3) << "PollExecCb " << DebugId() << " sid(" << shard->shard_id() << ") " - << run_count_.load(memory_order_relaxed); - - // We also make sure that for multi-operation transactions like Multi/Eval - // this callback runs on a correct operation. We want to avoid a situation - // where the first operation is executed and the second operation is armed and - // now this callback from the previous operation finally runs and calls PollExecution. 
- // It is usually ok, but for single shard operations we abuse index 0 in shard_data_ - // Therefore we may end up with a situation where this old callback runs on shard 7, - // accessing shard_data_[0] that now represents shard 5 for the next operation. - // seqlock provides protection for that so each cb will only run on the operation it has - // been tasked with. - // We also must first check is_armed and only then seqlock. The first check ensures that - // the coordinator thread crossed - // "run_count_.store(unique_shard_cnt_, memory_order_release);" barrier and our seqlock_ - // is valid. - if (seq_after == seq) { - // shard->PollExecution(this) does not necessarily execute this transaction. - // Therefore, everything that should be handled during the callback execution - // should go into RunInShard. - shard->PollExecution("exec_cb", this); - } else { - VLOG(1) << "Skipping PollExecution " << DebugId() << " sid(" << shard->shard_id() << ")"; - } - } - - DVLOG(3) << "ptr_release " << DebugId() << " " << seq; + DVLOG(3) << "ptr_release " << DebugId(); intrusive_ptr_release(this); // against use_count_.fetch_add above. }; @@ -993,6 +959,8 @@ Transaction::RunnableResult Transaction::RunQuickie(EngineShard* shard) { DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id(); DCHECK(cb_ptr_) << DebugId() << " " << shard->shard_id(); + CHECK(sd.is_armed.exchange(false, memory_order_relaxed)); + // Calling the callback in somewhat safe way RunnableResult result; try { @@ -1008,7 +976,6 @@ Transaction::RunnableResult Transaction::RunQuickie(EngineShard* shard) { // Handling the result, along with conclusion and journaling, is done by the caller - sd.is_armed.store(false, memory_order_relaxed); cb_ptr_ = nullptr; // We can do it because only a single shard runs the callback. 
return result; } diff --git a/src/server/transaction.h b/src/server/transaction.h index 82ded12cf..67d14d6d4 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -504,7 +504,6 @@ class Transaction { // In this specific case we synchronize with DecreaseRunCnt that releases run_count_. // See #997 before changing it. std::atomic_thread_fence(std::memory_order_acquire); - seqlock_.fetch_add(1, std::memory_order_relaxed); } // Log command in shard's journal, if this is a write command with auto-journaling enabled. @@ -582,7 +581,7 @@ class Transaction { uint64_t time_now_ms_{0}; std::atomic_uint32_t wakeup_requested_{0}; // whether tx was woken up - std::atomic_uint32_t use_count_{0}, run_count_{0}, seqlock_{0}; + std::atomic_uint32_t use_count_{0}, run_count_{0}; // unique_shard_cnt_ and unique_shard_id_ are accessed only by coordinator thread. uint32_t unique_shard_cnt_{0}; // Number of unique shards active