Mirror of https://github.com/dragonflydb/dragonfly.git, synced 2025-05-10 18:05:44 +02:00
chore(transaction): Simplify armed state (#2508)
* chore(transaction): Simplify armed state

Remove the atomic is_armed variable and turn it into a regular local state flag.
This is now possible because we have clearly defined phases, with the phased
barrier and the baton barrier for blocking commands.

---------

Signed-off-by: Vladislav <vlad@dragonflydb.io>

parent 795d00021d
commit 963023f07c

4 changed files with 58 additions and 62 deletions
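The essence of the change, before the diff: each per-shard slot previously held a std::atomic_bool is_armed that the coordinator and shard threads synchronized on directly. Arming is now a plain ARMED bit inside local_mask, and cross-thread visibility comes from the run barrier instead: the coordinator sets the bit with an ordinary write and then opens the phase with release semantics, and shard threads check the barrier with acquire semantics before reading the mask. A minimal sketch of that publication pattern (simplified stand-ins, not the actual Dragonfly classes):

    #include <atomic>
    #include <cstdint>

    // Stand-in for Transaction::PhasedBarrier: Start() publishes with
    // release semantics, Active() observes with acquire semantics.
    class RunBarrier {
     public:
      void Start(uint32_t count) { count_.store(count, std::memory_order_release); }
      bool Active() const { return count_.load(std::memory_order_acquire) > 0; }

     private:
      std::atomic<uint32_t> count_{0};
    };

    constexpr uint16_t ARMED = 1 << 1;

    struct ShardSlot {
      uint16_t local_mask = 0;  // plain field: no atomics needed anymore
    };

    struct TxSketch {
      ShardSlot sd;
      RunBarrier run_barrier;

      // Coordinator thread: write plain state first, then open the phase.
      void Arm() {
        sd.local_mask |= ARMED;  // ordinary write
        run_barrier.Start(1);    // release: publishes the write above
      }

      // Shard thread: the acquire load inside Active() makes the earlier
      // plain write to local_mask visible before it is read.
      bool IsArmedInShard() const {
        return run_barrier.Active() && (sd.local_mask & ARMED);
      }
    };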
src/server/blocking_controller_test.cc

@@ -56,7 +56,7 @@ void BlockingControllerTest::SetUp() {
 
   trans_.reset(new Transaction{&cid_});
 
-  str_vec_.assign({"blpop", "x", "z", "0"});
+  str_vec_.assign({"x", "z", "0"});
   for (auto& s : str_vec_) {
     arg_vec_.emplace_back(s);
   }
@@ -78,16 +78,16 @@ void BlockingControllerTest::TearDown() {
 }
 
 TEST_F(BlockingControllerTest, Basic) {
-  shard_set->Await(0, [&] {
-    EngineShard* shard = EngineShard::tlocal();
+  trans_->ScheduleSingleHop([&](Transaction* t, EngineShard* shard) {
     BlockingController bc(shard);
-    auto keys = trans_->GetShardArgs(shard->shard_id());
+    auto keys = t->GetShardArgs(shard->shard_id());
     bc.AddWatched(
-        keys, [](auto...) { return true; }, trans_.get());
+        keys, [](auto...) { return true; }, t);
     EXPECT_EQ(1, bc.NumWatched(0));
 
-    bc.FinalizeWatched(keys, trans_.get());
+    bc.FinalizeWatched(keys, t);
     EXPECT_EQ(0, bc.NumWatched(0));
+    return OpStatus::OK;
   });
 }
src/server/test_utils.cc

@@ -247,9 +247,7 @@ void BaseFamilyTest::ResetService() {
       auto it = head;
       do {
         Transaction* trans = std::get<Transaction*>(es->txq()->At(it));
-        LOG(ERROR) << "Transaction " << trans->DebugId() << " "
-                   << trans->GetLocalMask(es->shard_id()) << " "
-                   << trans->IsArmedInShard(es->shard_id());
+        LOG(ERROR) << "Transaction " << trans->DebugId(es->shard_id());
         it = txq->Next(it);
       } while (it != head);
     }
src/server/transaction.cc

@@ -124,21 +124,21 @@ uint32_t Transaction::PhasedBarrier::DEBUG_Count() const {
   return count_.load(memory_order_relaxed);
 }
 
-bool Transaction::BatonBarrierrier::IsClaimed() const {
+bool Transaction::BatonBarrier::IsClaimed() const {
   return claimed_.load(memory_order_relaxed);
 }
 
-bool Transaction::BatonBarrierrier::TryClaim() {
+bool Transaction::BatonBarrier::TryClaim() {
   return !claimed_.exchange(true, memory_order_relaxed);  // false means first means success
 }
 
-void Transaction::BatonBarrierrier::Close() {
+void Transaction::BatonBarrier::Close() {
   DCHECK(claimed_.load(memory_order_relaxed));
   closed_.store(true, memory_order_relaxed);
   ec_.notify();  // release
 }
 
-cv_status Transaction::BatonBarrierrier::Wait(time_point tp) {
+cv_status Transaction::BatonBarrier::Wait(time_point tp) {
   auto cb = [this] { return closed_.load(memory_order_acquire); };
 
   if (tp != time_point::max()) {
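For context on the baton barrier the commit message refers to: it implements a single-claim, single-modification handshake. Many wakers may race on TryClaim(), exactly one wins the right to modify the blocked transaction, and its Close() releases the owner sleeping in Wait(). A rough standalone sketch of the same protocol, using std::condition_variable in place of the fiber EventCount the real code relies on:

    #include <atomic>
    #include <chrono>
    #include <condition_variable>
    #include <mutex>

    // Standalone sketch of the "single claim - single modification" protocol,
    // not the real BatonBarrier (which is built on a fiber EventCount).
    class BatonBarrierSketch {
     public:
      // Only the first caller gets true and may modify the guarded object.
      bool TryClaim() { return !claimed_.exchange(true); }

      // The claimer publishes its modification and releases the waiter.
      void Close() {
        {
          std::lock_guard<std::mutex> lk(mu_);
          closed_ = true;
        }
        cv_.notify_all();
      }

      // The owner blocks until the barrier is closed or the deadline passes.
      std::cv_status Wait(std::chrono::steady_clock::time_point tp) {
        std::unique_lock<std::mutex> lk(mu_);
        return cv_.wait_until(lk, tp, [this] { return closed_; })
                   ? std::cv_status::no_timeout
                   : std::cv_status::timeout;
      }

     private:
      std::atomic_bool claimed_{false};
      std::mutex mu_;
      std::condition_variable cv_;
      bool closed_ = false;
    };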
@@ -407,7 +407,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
   for (const auto& sd : shard_data_) {
     // sd.local_mask may be non-zero for multi transactions with instant locking.
     // Specifically EVALs may maintain state between calls.
-    DCHECK(!sd.is_armed.load(std::memory_order_relaxed));
+    DCHECK_EQ(sd.local_mask & ARMED, 0);
    if (!multi_) {
       DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
     }
@@ -533,7 +533,7 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
       DCHECK(IsAtomicMulti());   // Every command determines it's own active shards
       sd.local_mask &= ~ACTIVE;  // so remove ACTIVE flags, but keep KEYLOCK_ACQUIRED
     }
-    DCHECK(!sd.is_armed.load(memory_order_relaxed));
+    DCHECK_EQ(sd.local_mask & ARMED, 0);
   }
 
   if (multi_->mode == NON_ATOMIC) {
@@ -547,13 +547,18 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
     multi_->role = DEFAULT;
 }
 
-string Transaction::DebugId() const {
+string Transaction::DebugId(std::optional<ShardId> sid) const {
   DCHECK_GT(use_count_.load(memory_order_relaxed), 0u);
   string res = StrCat(Name(), "@", txid_, "/", unique_shard_cnt_);
   if (multi_) {
     absl::StrAppend(&res, ":", multi_->cmd_seq_num);
   }
-  absl::StrAppend(&res, " (", trans_id(this), ")");
+  absl::StrAppend(&res, " {id=", trans_id(this));
+  if (sid) {
+    absl::StrAppend(&res, ",mask[", *sid, "]=", int(shard_data_[SidToId(*sid)].local_mask),
+                    ",txqpos[]=", shard_data_[SidToId(*sid)].pq_pos);
+  }
+  absl::StrAppend(&res, "}");
   return res;
 }
 
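With the optional shard argument, a single log line now carries the shard-local state that the old LOG in test_utils.cc stitched together from three getters. Illustratively, with hypothetical values (trans_id() assumed here to print the transaction's address, and txqpos showing the not-in-queue marker TxQueue::kEnd), a blocked BLPOP could render as:

    BLPOP@1042/1 {id=0x5604a3c81200,mask[0]=17,txqpos[]=4294967295}

where mask 17 = ACTIVE | SUSPENDED_Q.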
@@ -573,16 +578,13 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
   DCHECK_GT(txid_, 0u);
   CHECK(cb_ptr_) << DebugId();
 
-  // Unlike with regular transactions we do not acquire locks upon scheduling
-  // because Scheduling is done before multi-exec batch is executed. Therefore we
-  // lock keys right before the execution of each statement.
-
   unsigned idx = SidToId(shard->shard_id());
   auto& sd = shard_data_[idx];
 
-  CHECK(sd.is_armed.exchange(false, memory_order_relaxed));
-  CHECK_GT(run_barrier_.DEBUG_Count(), 0u);
+  CHECK(sd.local_mask & ARMED);
+  sd.local_mask &= ~ARMED;
 
+  CHECK_GT(run_barrier_.DEBUG_Count(), 0u);
   VLOG(2) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id() << " " << sd.local_mask;
 
   bool was_suspended = sd.local_mask & SUSPENDED_Q;
@@ -799,7 +801,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
   DCHECK(shard_data_.size() == 1 || multi_->mode == NON_ATOMIC);
 
   InitTxTime();
-  shard_data_[SidToId(unique_shard_id_)].is_armed.store(true, memory_order_relaxed);
+  shard_data_[SidToId(unique_shard_id_)].local_mask |= ARMED;
 
   // Start new phase, be careful with writes until phase end!
   run_barrier_.Start(1);
@@ -931,7 +933,7 @@ void Transaction::ExecuteAsync() {
   DCHECK(!IsAtomicMulti() || multi_->lock_mode.has_value());
 
   // Set armed flags on all active shards
-  IterateActiveShards([](auto& sd, auto i) { sd.is_armed.store(true, memory_order_relaxed); });
+  IterateActiveShards([](auto& sd, auto i) { sd.local_mask |= ARMED; });
 
   // Start new phase: release semantics. From here we can be discovered by IsArmedInShard(),
   // and thus picked by a foreign thread's PollExecution(). Careful with writes until phase end!
@@ -1010,7 +1012,8 @@ Transaction::RunnableResult Transaction::RunQuickie(EngineShard* shard) {
   DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id();
   DCHECK(cb_ptr_) << DebugId() << " " << shard->shard_id();
 
-  CHECK(sd.is_armed.exchange(false, memory_order_relaxed));
+  CHECK(sd.local_mask & ARMED);
+  sd.local_mask &= ~ARMED;
 
   // Calling the callback in somewhat safe way
   RunnableResult result;
@@ -1071,6 +1074,11 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const {
   return res;
 }
 
+bool Transaction::IsArmedInShard(ShardId sid) const {
+  // Barrier has acquire semantics
+  return run_barrier_.Active() && (shard_data_[SidToId(sid)].local_mask & ARMED);
+}
+
 bool Transaction::IsActive(ShardId sid) const {
   // If we have only one shard, we often don't store infromation about all shards, so determine it
   // solely by id

@@ -1083,6 +1091,12 @@ bool Transaction::IsActive(ShardId sid) const {
   return shard_data_[SidToId(sid)].local_mask & ACTIVE;
 }
 
+uint16_t Transaction::GetLocalMask(ShardId sid) const {
+  DCHECK(IsActive(sid));
+  DCHECK_GT(run_barrier_.DEBUG_Count(), 0u);
+  return shard_data_[SidToId(sid)].local_mask;
+}
+
 // Runs within a engine shard thread.
 // Optimized path that schedules and runs transactions out of order if possible.
 // Returns true if eagerly executed, false if the callback will be handled by the transaction
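The contract these two new out-of-line helpers encode: plain shard state may be read only while the run barrier is active, because the coordinator's writes happen-before run_barrier_.Start() and the Active() check is an acquire load; the DCHECKs in GetLocalMask() assert exactly that window. A hypothetical, pseudocode-style caller sketch against the API above (PollSketch is illustrative, not the real PollExecution):

    // Gate on IsArmedInShard() first, read the mask second.
    void PollSketch(Transaction* trans, ShardId sid) {
      if (!trans->IsArmedInShard(sid))  // acquire: is the phase open?
        return;                         // nothing published for us yet

      uint16_t mask = trans->GetLocalMask(sid);  // plain read, now safe
      if (mask & Transaction::OUT_OF_ORDER) {
        // ... may run the hop eagerly ...
      }
    }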
@@ -1114,7 +1128,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
 
   if (result.flags & RunnableResult::AVOID_CONCLUDING) {
     // If we want to run again, we have to actually schedule this transaction
-    DCHECK_EQ(sd.is_armed, false);
+    DCHECK_EQ(sd.local_mask & ARMED, 0);
     continue_scheduling = true;
   } else {
     LogAutoJournalOnShard(shard, result);
src/server/transaction.h

@@ -156,11 +156,12 @@ class Transaction {
 
   // State on specific shard.
   enum LocalMask : uint16_t {
-    ACTIVE = 1,  // Set on all active shards.
+    ACTIVE = 1,      // Whether its active on this shard (to schedule or execute hops)
+    ARMED = 1 << 1,  // Whether its armed (the hop was prepared)
     OUT_OF_ORDER =
-        1 << 2,  // Whether it can run out of order. Undefined if KEYLOCK_ACQUIRED is not set.
+        1 << 2,  // Whether it can run out of order. Undefined if KEYLOCK_ACQUIRED isn't set
     KEYLOCK_ACQUIRED = 1 << 3,  // Whether its key locks are acquired
-    SUSPENDED_Q = 1 << 4,  // Whether is suspended (by WatchInShard())
+    SUSPENDED_Q = 1 << 4,  // Whether it suspended (by WatchInShard())
     AWAKED_Q = 1 << 5,      // Whether it was awakened (by NotifySuspended())
     UNLOCK_MULTI = 1 << 6,  // Whether this shard executed UnlockMultiShardCb
   };
@@ -253,23 +254,15 @@ class Transaction {
   // Runs in the shard thread.
   KeyLockArgs GetLockArgs(ShardId sid) const;
 
-  // Returns true if the transaction spans this shard_id.
-  bool IsActive(ShardId shard_id) const;
-
   // Returns true if the transaction is waiting for shard callbacks and the shard is armed.
   // Safe to read transaction state (and update shard local) until following RunInShard() finishes.
-  bool IsArmedInShard(ShardId sid) const {
-    if (sid >= shard_data_.size())  // For multi transactions shard_data_ spans all shards.
-      sid = 0;
-
-    // Barrier has acquire semantics
-    return run_barrier_.Active() && shard_data_[sid].is_armed.load(std::memory_order_relaxed);
-  }
+  bool IsArmedInShard(ShardId sid) const;
 
-  // Called from engine set shard threads.
-  uint16_t GetLocalMask(ShardId sid) const {
-    return shard_data_[SidToId(sid)].local_mask;
-  }
+  // Returns if the transaction spans this shard. Safe only when the transaction is armed.
+  bool IsActive(ShardId sid) const;
+
+  // Returns the state mask on this shard. Safe only when the transaction is armed (or blocked).
+  uint16_t GetLocalMask(ShardId sid) const;
 
   uint32_t GetLocalTxqPos(ShardId sid) const {
     return shard_data_[SidToId(sid)].pq_pos;
@@ -325,7 +318,8 @@ class Transaction {
     return cid_;
   }
 
-  std::string DebugId() const;
+  // Return debug information about a transaction, include shard local info if passed
+  std::string DebugId(std::optional<ShardId> sid = std::nullopt) const;
 
   // Prepares for running ScheduleSingleHop() for a single-shard multi tx.
   // It is safe to call ScheduleSingleHop() after calling this method, but the callback passed
@@ -364,30 +358,20 @@ class Transaction {
   };
 
   struct alignas(64) PerShardData {
-    PerShardData(PerShardData&&) noexcept {
-    }
-
-    PerShardData() = default;
-
-    // this is the only variable that is accessed by both shard and coordinator threads.
-    std::atomic_bool is_armed{false};
-
-    // We pad with some memory so that atomic loads won't cause false sharing between threads.
-    char pad[46];  // to make sure PerShardData is 64 bytes and takes full cacheline.
-
-    uint32_t arg_start = 0;  // Indices into args_ array.
+    uint32_t arg_start = 0;  // Subspan in kv_args_ with local arguments.
     uint32_t arg_count = 0;
 
-    // Needed to rollback inconsistent schedulings or remove OOO transactions from
-    // tx queue.
+    // Position in the tx queue. OOO or cancelled schedules remove themselves by this index.
     uint32_t pq_pos = TxQueue::kEnd;
 
-    // Accessed within shard thread.
-    // Bitmask of LocalMask enums.
+    // State of shard - bitmask with LocalState flags
     uint16_t local_mask = 0;
 
     // Index of key relative to args in shard that the shard was woken up after blocking wait.
     uint16_t wake_key_pos = UINT16_MAX;
+
+    // Prevent "false sharing" between cache lines: occupy a full cache line (64 bytes)
+    char pad[64 - 4 * sizeof(uint32_t)];
   };
 
   static_assert(sizeof(PerShardData) == 64);  // cacheline
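The new pad expression is worth a quick arithmetic check: the payload is arg_start (4) + arg_count (4) + pq_pos (4) + local_mask (2) + wake_key_pos (2) = 16 bytes, the pad is 64 - 4 * sizeof(uint32_t) = 48 bytes, and 16 + 48 = 64, which the existing static_assert pins to one cache line. A compilable restatement of that check (a sketch, not the real header):

    #include <cstdint>

    struct alignas(64) PerShardDataSketch {
      uint32_t arg_start = 0;
      uint32_t arg_count = 0;
      uint32_t pq_pos = 0;
      uint16_t local_mask = 0;
      uint16_t wake_key_pos = UINT16_MAX;
      char pad[64 - 4 * sizeof(uint32_t)];  // 48 bytes of tail padding
    };

    // 16 payload bytes + 48 pad bytes == 64: exactly one cache line, so
    // adjacent per-shard slots never share a line (no false sharing).
    static_assert(sizeof(PerShardDataSketch) == 64, "one cache line");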
@@ -449,7 +433,7 @@ class Transaction {
   // "Single claim - single modification" barrier. Multiple threads might try to claim it, only one
   // will succeed and will be allowed to modify the guarded object until it closes the barrier.
   // A closed barrier can't be claimed again or re-used in any way.
-  class BatonBarrierrier {
+  class BatonBarrier {
   public:
    bool IsClaimed() const;  // Return if barrier is claimed, only for peeking
    bool TryClaim();         // Return if the barrier was claimed successfully
@@ -616,7 +600,7 @@ class Transaction {
   UniqueSlotChecker unique_slot_checker_;
 
   // Barrier for waking blocking transactions that ensures exclusivity of waking operation.
-  BatonBarrierrier blocking_barrier_{};
+  BatonBarrier blocking_barrier_{};
 
   // Transaction coordinator state, written and read by coordinator thread.
   uint8_t coordinator_state_ = 0;