chore: little transaction cleanup (#2608)

Make renabled_autojournal a regular bool, simplify CancelShardCb logic

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2024-03-08 09:50:09 +03:00 committed by GitHub
parent 22e413a00b
commit 292c5bcd71
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 63 additions and 67 deletions

View file

@ -1293,11 +1293,9 @@ OpResult<void> GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des
Transaction* transaction = cntx->transaction;
if (transaction->GetUniqueShardCnt() == 1) {
transaction->ReviveAutoJournal(); // Safe to use RENAME with single shard
auto cb = [&](Transaction* t, EngineShard* shard) {
auto ec = OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest);
// Incase of uniqe shard count we can use rename command in replica.
t->RenableAutoJournal();
return ec;
return OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest);
};
OpResult<void> result = transaction->ScheduleSingleHopT(std::move(cb));

View file

@ -717,13 +717,10 @@ void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, Lis
OpResult<string> result;
if (cntx->transaction->GetUniqueShardCnt() == 1) {
cntx->transaction->ReviveAutoJournal(); // On single shard we can use the auto journal flow.
auto cb = [&](Transaction* t, EngineShard* shard) {
auto ec = OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir);
// On single shard we can use the auto journal flow.
t->RenableAutoJournal();
return ec;
return OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir);
};
result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
} else {
cntx->transaction->Schedule();

View file

@ -27,6 +27,7 @@ thread_local Transaction::TLTmpSpace Transaction::tmp_space;
namespace {
// Global txid sequence
atomic_uint64_t op_seq{1};
constexpr size_t kTransSize [[maybe_unused]] = sizeof(Transaction);
@ -90,12 +91,12 @@ std::ostream& operator<<(std::ostream& os, Transaction::time_point tp) {
return os << ms << "ms";
}
} // namespace
IntentLock::Mode Transaction::LockMode() const {
return cid_->IsReadOnly() ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
uint16_t trans_id(const Transaction* ptr) {
return (intptr_t(ptr) >> 8) & 0xFFFF;
}
} // namespace
void Transaction::PhasedBarrier::Start(uint32_t count) {
DCHECK_EQ(DEBUG_Count(), 0u);
count_.store(count, memory_order_release);
@ -159,13 +160,6 @@ cv_status Transaction::BatonBarrier::Wait(time_point tp) {
return cv_status::no_timeout;
}
/**
* @brief Construct a new Transaction:: Transaction object
*
* @param cid
* @param ess
* @param cs
*/
Transaction::Transaction(const CommandId* cid) : cid_{cid} {
InitTxTime();
string_view cmd_name(cid_->name());
@ -1155,6 +1149,16 @@ uint16_t Transaction::GetLocalMask(ShardId sid) const {
return shard_data_[SidToId(sid)].local_mask;
}
IntentLock::Mode Transaction::LockMode() const {
return cid_->IsReadOnly() ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
}
OpArgs Transaction::GetOpArgs(EngineShard* shard) const {
DCHECK(IsActive(shard->shard_id()));
DCHECK((multi_ && multi_->role == SQUASHED_STUB) || (run_barrier_.DEBUG_Count() > 0));
return OpArgs{shard, this, GetDbContext()};
}
// Runs within a engine shard thread.
// Optimized path that schedules and runs transactions out of order if possible.
// Returns true if eagerly executed, false if the callback will be handled by the transaction
@ -1278,35 +1282,31 @@ bool Transaction::CancelShardCb(EngineShard* shard) {
ShardId idx = SidToId(shard->shard_id());
auto& sd = shard_data_[idx];
auto pos = sd.pq_pos;
if (pos == TxQueue::kEnd)
TxQueue::Iterator q_pos = exchange(sd.pq_pos, TxQueue::kEnd);
if (q_pos == TxQueue::kEnd) {
DCHECK_EQ(sd.local_mask & KEYLOCK_ACQUIRED, 0);
return false;
sd.pq_pos = TxQueue::kEnd;
}
TxQueue* txq = shard->txq();
TxQueue::Iterator head = txq->Head();
auto val = txq->At(pos);
Transaction* trans = absl::get<Transaction*>(val);
DCHECK(trans == this) << "Pos " << pos << ", txq size " << txq->size() << ", trans " << trans;
txq->Remove(pos);
bool was_head = txq->Head() == q_pos;
Transaction* trans = absl::get<Transaction*>(txq->At(q_pos));
DCHECK(trans == this) << txq->size() << ' ' << sd.pq_pos << ' ' << trans->DebugId();
txq->Remove(q_pos);
if (sd.local_mask & KEYLOCK_ACQUIRED) {
auto mode = LockMode();
auto lock_args = GetLockArgs(shard->shard_id());
DCHECK(lock_args.args.size() > 0);
shard->db_slice().Release(mode, lock_args);
sd.local_mask &= ~KEYLOCK_ACQUIRED;
}
if (IsGlobal()) {
shard->shard_lock()->Release(LockMode());
} else {
auto lock_args = GetLockArgs(shard->shard_id());
DCHECK(sd.local_mask & KEYLOCK_ACQUIRED);
DCHECK(!lock_args.args.empty());
shard->db_slice().Release(LockMode(), lock_args);
sd.local_mask &= ~KEYLOCK_ACQUIRED;
}
if (pos == head && !txq->Empty()) {
return true;
}
return false;
// Check if we need to poll the next head
return was_head && !txq->Empty();
}
// runs in engine-shard thread.
@ -1541,7 +1541,7 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul
}
// If autojournaling was disabled and not re-enabled, skip it
if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !renabled_auto_journal_.load(memory_order_relaxed))
if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !re_enabled_auto_journal_)
return;
// TODO: Handle complex commands like LMPOP correctly once they are implemented.
@ -1586,6 +1586,12 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt
unique_slot_checker_.GetUniqueSlotId(), {}, false);
}
void Transaction::ReviveAutoJournal() {
DCHECK(cid_->opt_mask() & CO::NO_AUTOJOURNAL);
DCHECK_EQ(run_barrier_.DEBUG_Count(), 0u); // Can't be changed while dispatching
re_enabled_auto_journal_ = true;
}
void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {
// We're on the owning thread of this transaction, so we can safely access it's data below.
// First, check if it makes sense to proceed.

View file

@ -158,12 +158,13 @@ class Transaction {
enum LocalMask : uint16_t {
ACTIVE = 1, // Whether its active on this shard (to schedule or execute hops)
ARMED = 1 << 1, // Whether its armed (the hop was prepared)
OUT_OF_ORDER =
1 << 2, // Whether it can run out of order. Undefined if KEYLOCK_ACQUIRED isn't set
KEYLOCK_ACQUIRED = 1 << 3, // Whether its key locks are acquired
SUSPENDED_Q = 1 << 4, // Whether it suspended (by WatchInShard())
AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended())
UNLOCK_MULTI = 1 << 6, // Whether this shard executed UnlockMultiShardCb
// Whether it can run out of order. Undefined if KEYLOCK_ACQUIRED isn't set
OUT_OF_ORDER = 1 << 2,
// Whether its key locks are acquired, never set for global commands.
KEYLOCK_ACQUIRED = 1 << 3,
SUSPENDED_Q = 1 << 4, // Whether it suspended (by WatchInShard())
AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended())
UNLOCK_MULTI = 1 << 6, // Whether this shard executed UnlockMultiShardCb
};
public:
@ -220,11 +221,6 @@ class Transaction {
// Must be called from coordinator thread.
void CancelBlocking(std::function<OpStatus(ArgSlice)>);
// In some cases for non auto-journaling commands we want to enable the auto journal flow.
void RenableAutoJournal() {
renabled_auto_journal_.store(true, std::memory_order_relaxed);
}
// Prepare a squashed hop on given shards.
// Only compatible with multi modes that acquire all locks ahead - global and lock_ahead.
void PrepareSquashedMultiHop(const CommandId* cid, absl::FunctionRef<bool(ShardId)> enabled);
@ -268,6 +264,12 @@ class Transaction {
// Returns the state mask on this shard. Safe only when the transaction is armed (or blocked).
uint16_t GetLocalMask(ShardId sid) const;
// If blocking tx was woken up on this shard, get wake key.
std::optional<std::string_view> GetWakeKey(ShardId sid) const;
// Get OpArgs for specific shard
OpArgs GetOpArgs(EngineShard* shard) const;
uint32_t GetLocalTxqPos(ShardId sid) const {
return shard_data_[SidToId(sid)].pq_pos;
}
@ -311,13 +313,6 @@ class Transaction {
bool IsGlobal() const;
// If blocking tx was woken up on this shard, get wake key.
std::optional<std::string_view> GetWakeKey(ShardId sid) const;
OpArgs GetOpArgs(EngineShard* shard) const {
return OpArgs{shard, this, GetDbContext()};
}
DbContext GetDbContext() const {
return DbContext{.db_index = db_index_, .time_now_ms = time_now_ms_};
}
@ -346,6 +341,10 @@ class Transaction {
bool multi_commands, bool allow_await) const;
void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const;
// Re-enable auto journal for commands marked as NO_AUTOJOURNAL. Call during setup.
void ReviveAutoJournal();
// Clear all state to make transaction re-usable
void Refurbish();
// Get keys multi transaction was initialized with, normalized and unique
@ -377,7 +376,7 @@ class Transaction {
uint32_t arg_count = 0;
// Position in the tx queue. OOO or cancelled schedules remove themselves by this index.
uint32_t pq_pos = TxQueue::kEnd;
TxQueue::Iterator pq_pos = TxQueue::kEnd;
// State of shard - bitmask with LocalState flags
uint16_t local_mask = 0;
@ -603,8 +602,8 @@ class Transaction {
// Stores the full undivided command.
CmdArgList full_args_;
// True if NO_AUTOJOURNAL command asked to enable auto journal
std::atomic<bool> renabled_auto_journal_ = false;
// Set if a NO_AUTOJOURNAL command asked to enable auto journal again
bool re_enabled_auto_journal_ = false;
// Reverse argument mapping for ReverseArgIndex to convert from shard index to original index.
std::vector<uint32_t> reverse_index_;
@ -661,10 +660,6 @@ template <typename F> auto Transaction::ScheduleSingleHopT(F&& f) -> decltype(f(
return res;
}
inline uint16_t trans_id(const Transaction* ptr) {
return (intptr_t(ptr) >> 8) & 0xFFFF;
}
OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args);
} // namespace dfly