diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 8a7d45343..2a970c8e8 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -1293,11 +1293,9 @@ OpResult GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des Transaction* transaction = cntx->transaction; if (transaction->GetUniqueShardCnt() == 1) { + transaction->ReviveAutoJournal(); // Safe to use RENAME with single shard auto cb = [&](Transaction* t, EngineShard* shard) { - auto ec = OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest); - // Incase of uniqe shard count we can use rename command in replica. - t->RenableAutoJournal(); - return ec; + return OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest); }; OpResult result = transaction->ScheduleSingleHopT(std::move(cb)); diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 99e20b338..fc06a484b 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -717,13 +717,10 @@ void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, Lis OpResult result; if (cntx->transaction->GetUniqueShardCnt() == 1) { + cntx->transaction->ReviveAutoJournal(); // On single shard we can use the auto journal flow. auto cb = [&](Transaction* t, EngineShard* shard) { - auto ec = OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir); - // On single shard we can use the auto journal flow. - t->RenableAutoJournal(); - return ec; + return OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir); }; - result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); } else { cntx->transaction->Schedule(); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 3ce72ed62..4d1c7765b 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -27,6 +27,7 @@ thread_local Transaction::TLTmpSpace Transaction::tmp_space; namespace { +// Global txid sequence atomic_uint64_t op_seq{1}; constexpr size_t kTransSize [[maybe_unused]] = sizeof(Transaction); @@ -90,12 +91,12 @@ std::ostream& operator<<(std::ostream& os, Transaction::time_point tp) { return os << ms << "ms"; } -} // namespace - -IntentLock::Mode Transaction::LockMode() const { - return cid_->IsReadOnly() ? IntentLock::SHARED : IntentLock::EXCLUSIVE; +uint16_t trans_id(const Transaction* ptr) { + return (intptr_t(ptr) >> 8) & 0xFFFF; } +} // namespace + void Transaction::PhasedBarrier::Start(uint32_t count) { DCHECK_EQ(DEBUG_Count(), 0u); count_.store(count, memory_order_release); @@ -159,13 +160,6 @@ cv_status Transaction::BatonBarrier::Wait(time_point tp) { return cv_status::no_timeout; } -/** - * @brief Construct a new Transaction:: Transaction object - * - * @param cid - * @param ess - * @param cs - */ Transaction::Transaction(const CommandId* cid) : cid_{cid} { InitTxTime(); string_view cmd_name(cid_->name()); @@ -1155,6 +1149,16 @@ uint16_t Transaction::GetLocalMask(ShardId sid) const { return shard_data_[SidToId(sid)].local_mask; } +IntentLock::Mode Transaction::LockMode() const { + return cid_->IsReadOnly() ? IntentLock::SHARED : IntentLock::EXCLUSIVE; +} + +OpArgs Transaction::GetOpArgs(EngineShard* shard) const { + DCHECK(IsActive(shard->shard_id())); + DCHECK((multi_ && multi_->role == SQUASHED_STUB) || (run_barrier_.DEBUG_Count() > 0)); + return OpArgs{shard, this, GetDbContext()}; +} + // Runs within a engine shard thread. // Optimized path that schedules and runs transactions out of order if possible. // Returns true if eagerly executed, false if the callback will be handled by the transaction @@ -1278,35 +1282,31 @@ bool Transaction::CancelShardCb(EngineShard* shard) { ShardId idx = SidToId(shard->shard_id()); auto& sd = shard_data_[idx]; - auto pos = sd.pq_pos; - if (pos == TxQueue::kEnd) + TxQueue::Iterator q_pos = exchange(sd.pq_pos, TxQueue::kEnd); + if (q_pos == TxQueue::kEnd) { + DCHECK_EQ(sd.local_mask & KEYLOCK_ACQUIRED, 0); return false; - - sd.pq_pos = TxQueue::kEnd; + } TxQueue* txq = shard->txq(); - TxQueue::Iterator head = txq->Head(); - auto val = txq->At(pos); - Transaction* trans = absl::get(val); - DCHECK(trans == this) << "Pos " << pos << ", txq size " << txq->size() << ", trans " << trans; - txq->Remove(pos); + bool was_head = txq->Head() == q_pos; + + Transaction* trans = absl::get(txq->At(q_pos)); + DCHECK(trans == this) << txq->size() << ' ' << sd.pq_pos << ' ' << trans->DebugId(); + txq->Remove(q_pos); - if (sd.local_mask & KEYLOCK_ACQUIRED) { - auto mode = LockMode(); - auto lock_args = GetLockArgs(shard->shard_id()); - DCHECK(lock_args.args.size() > 0); - shard->db_slice().Release(mode, lock_args); - sd.local_mask &= ~KEYLOCK_ACQUIRED; - } if (IsGlobal()) { shard->shard_lock()->Release(LockMode()); + } else { + auto lock_args = GetLockArgs(shard->shard_id()); + DCHECK(sd.local_mask & KEYLOCK_ACQUIRED); + DCHECK(!lock_args.args.empty()); + shard->db_slice().Release(LockMode(), lock_args); + sd.local_mask &= ~KEYLOCK_ACQUIRED; } - if (pos == head && !txq->Empty()) { - return true; - } - - return false; + // Check if we need to poll the next head + return was_head && !txq->Empty(); } // runs in engine-shard thread. @@ -1541,7 +1541,7 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul } // If autojournaling was disabled and not re-enabled, skip it - if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !renabled_auto_journal_.load(memory_order_relaxed)) + if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !re_enabled_auto_journal_) return; // TODO: Handle complex commands like LMPOP correctly once they are implemented. @@ -1586,6 +1586,12 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt unique_slot_checker_.GetUniqueSlotId(), {}, false); } +void Transaction::ReviveAutoJournal() { + DCHECK(cid_->opt_mask() & CO::NO_AUTOJOURNAL); + DCHECK_EQ(run_barrier_.DEBUG_Count(), 0u); // Can't be changed while dispatching + re_enabled_auto_journal_ = true; +} + void Transaction::CancelBlocking(std::function status_cb) { // We're on the owning thread of this transaction, so we can safely access it's data below. // First, check if it makes sense to proceed. diff --git a/src/server/transaction.h b/src/server/transaction.h index f1ea955b0..ba1c23df8 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -158,12 +158,13 @@ class Transaction { enum LocalMask : uint16_t { ACTIVE = 1, // Whether its active on this shard (to schedule or execute hops) ARMED = 1 << 1, // Whether its armed (the hop was prepared) - OUT_OF_ORDER = - 1 << 2, // Whether it can run out of order. Undefined if KEYLOCK_ACQUIRED isn't set - KEYLOCK_ACQUIRED = 1 << 3, // Whether its key locks are acquired - SUSPENDED_Q = 1 << 4, // Whether it suspended (by WatchInShard()) - AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended()) - UNLOCK_MULTI = 1 << 6, // Whether this shard executed UnlockMultiShardCb + // Whether it can run out of order. Undefined if KEYLOCK_ACQUIRED isn't set + OUT_OF_ORDER = 1 << 2, + // Whether its key locks are acquired, never set for global commands. + KEYLOCK_ACQUIRED = 1 << 3, + SUSPENDED_Q = 1 << 4, // Whether it suspended (by WatchInShard()) + AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended()) + UNLOCK_MULTI = 1 << 6, // Whether this shard executed UnlockMultiShardCb }; public: @@ -220,11 +221,6 @@ class Transaction { // Must be called from coordinator thread. void CancelBlocking(std::function); - // In some cases for non auto-journaling commands we want to enable the auto journal flow. - void RenableAutoJournal() { - renabled_auto_journal_.store(true, std::memory_order_relaxed); - } - // Prepare a squashed hop on given shards. // Only compatible with multi modes that acquire all locks ahead - global and lock_ahead. void PrepareSquashedMultiHop(const CommandId* cid, absl::FunctionRef enabled); @@ -268,6 +264,12 @@ class Transaction { // Returns the state mask on this shard. Safe only when the transaction is armed (or blocked). uint16_t GetLocalMask(ShardId sid) const; + // If blocking tx was woken up on this shard, get wake key. + std::optional GetWakeKey(ShardId sid) const; + + // Get OpArgs for specific shard + OpArgs GetOpArgs(EngineShard* shard) const; + uint32_t GetLocalTxqPos(ShardId sid) const { return shard_data_[SidToId(sid)].pq_pos; } @@ -311,13 +313,6 @@ class Transaction { bool IsGlobal() const; - // If blocking tx was woken up on this shard, get wake key. - std::optional GetWakeKey(ShardId sid) const; - - OpArgs GetOpArgs(EngineShard* shard) const { - return OpArgs{shard, this, GetDbContext()}; - } - DbContext GetDbContext() const { return DbContext{.db_index = db_index_, .time_now_ms = time_now_ms_}; } @@ -346,6 +341,10 @@ class Transaction { bool multi_commands, bool allow_await) const; void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const; + // Re-enable auto journal for commands marked as NO_AUTOJOURNAL. Call during setup. + void ReviveAutoJournal(); + + // Clear all state to make transaction re-usable void Refurbish(); // Get keys multi transaction was initialized with, normalized and unique @@ -377,7 +376,7 @@ class Transaction { uint32_t arg_count = 0; // Position in the tx queue. OOO or cancelled schedules remove themselves by this index. - uint32_t pq_pos = TxQueue::kEnd; + TxQueue::Iterator pq_pos = TxQueue::kEnd; // State of shard - bitmask with LocalState flags uint16_t local_mask = 0; @@ -603,8 +602,8 @@ class Transaction { // Stores the full undivided command. CmdArgList full_args_; - // True if NO_AUTOJOURNAL command asked to enable auto journal - std::atomic renabled_auto_journal_ = false; + // Set if a NO_AUTOJOURNAL command asked to enable auto journal again + bool re_enabled_auto_journal_ = false; // Reverse argument mapping for ReverseArgIndex to convert from shard index to original index. std::vector reverse_index_; @@ -661,10 +660,6 @@ template auto Transaction::ScheduleSingleHopT(F&& f) -> decltype(f( return res; } -inline uint16_t trans_id(const Transaction* ptr) { - return (intptr_t(ptr) >> 8) & 0xFFFF; -} - OpResult DetermineKeys(const CommandId* cid, CmdArgList args); } // namespace dfly