Update transaction and enable OOO for regular transactions (#769)

* refactor(server): Update ScheduleSingleHop

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Vladislav 2023-02-09 20:36:55 +03:00 committed by GitHub
parent 41c1ebab18
commit 07973d40eb
4 changed files with 57 additions and 110 deletions

View file

@@ -87,16 +87,6 @@ void Journal::UnregisterOnChange(uint32_t id) {
journal_slice.UnregisterOnChange(id);
}
bool Journal::SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards) {
if (!journal_slice.IsOpen() || lameduck_.load(memory_order_relaxed))
return false;
// TODO: Handle tx entries.
// journal_slice.AddLogRecord(Entry::Sched(txid));
return true;
}
LSN Journal::GetLsn() const {
return journal_slice.cur_lsn();
}
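
For context on the journal hunk above: SchedStartTx accepts work only while the journal is open and not in lameduck mode, and its caller (visible later in this diff, inside ScheduleInternal) logs the transaction start only when it returns true. Below is a minimal standalone sketch of that guard/call pattern, assuming simplified stand-in types — ToyJournal and its members are illustrative only, not Dragonfly's actual Journal API.

```cpp
#include <atomic>
#include <cstdint>
#include <cstdio>

using TxId = uint64_t;

// Illustrative stand-in for the journal: the real one keeps per-shard
// slices and richer entry types.
class ToyJournal {
 public:
  // Mirrors the guard above: refuse to schedule when the journal is not
  // open or is shutting down (lameduck).
  bool SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards) {
    (void)num_keys;  // Unused in this sketch.
    if (!open_ || lameduck_.load(std::memory_order_relaxed))
      return false;
    std::printf("journal: start tx %llu over %u shards\n",
                static_cast<unsigned long long>(txid), num_shards);
    return true;
  }

  void Shutdown() { lameduck_.store(true, std::memory_order_relaxed); }

 private:
  bool open_ = true;
  std::atomic_bool lameduck_{false};
};

int main() {
  ToyJournal j;
  // Coordinator side: append entries only if the journal accepted the tx.
  if (j.SchedStartTx(/*txid=*/42, /*num_keys=*/0, /*num_shards=*/2)) {
    // ... per-shard entries would be logged here ...
  }
  j.Shutdown();
  // After lameduck is set, scheduling is rejected and nothing gets logged.
  std::printf("accepted after shutdown: %d\n", j.SchedStartTx(43, 0, 2));
  return 0;
}
```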

View file

@@ -34,10 +34,6 @@ class Journal {
uint32_t RegisterOnChange(ChangeCallback cb);
void UnregisterOnChange(uint32_t id);
// Returns true if the transaction was scheduled, false if the journal is inactive
// or in lameduck mode and does not log new transactions.
bool SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards);
/*
void AddCmd(TxId txid, Op opcode, Span args) {
OpArgs(txid, opcode, args);

View file

@@ -429,7 +429,6 @@ void Transaction::ScheduleInternal() {
DCHECK_EQ(0, coordinator_state_ & (COORD_SCHED | COORD_OOO));
bool span_all = IsGlobal();
bool single_hop = (coordinator_state_ & COORD_EXEC_CONCLUDING);
uint32_t num_shards;
std::function<bool(uint32_t)> is_active;
@@ -456,57 +455,45 @@ void Transaction::ScheduleInternal() {
};
}
// Loop until successfully scheduled in all shards.
while (true) {
txid_ = op_seq.fetch_add(1, memory_order_relaxed);
time_now_ms_ = GetCurrentTimeMs();
atomic_uint32_t lock_granted_cnt{0};
atomic_uint32_t success{0};
time_now_ms_ = GetCurrentTimeMs();
auto cb = [&](EngineShard* shard) {
pair<bool, bool> res = ScheduleInShard(shard);
success.fetch_add(res.first, memory_order_relaxed);
lock_granted_cnt.fetch_add(res.second, memory_order_relaxed);
auto [is_success, is_granted] = ScheduleInShard(shard);
success.fetch_add(is_success, memory_order_relaxed);
lock_granted_cnt.fetch_add(is_granted, memory_order_relaxed);
};
shard_set->RunBriefInParallel(std::move(cb), is_active);
if (success.load(memory_order_acquire) == num_shards) {
// We allow out of order execution only for single hop transactions.
// It might be possible to do it for multi-hop transactions as well, but currently it is
// too complicated to reason about.
if (single_hop && lock_granted_cnt.load(memory_order_relaxed) == num_shards) {
// OOO cannot happen with span-all transactions. We ensure it in ScheduleInShard when we
// refuse to acquire locks for these transactions.
DCHECK(!span_all);
coordinator_state_ |= COORD_SCHED;
// If we granted all locks, we can run out of order.
if (!span_all && lock_granted_cnt.load(memory_order_relaxed) == num_shards) {
// Currently we don't support OOO for incremental locking. So far they are global.
DCHECK(!(multi_ && multi_->is_expanding));
coordinator_state_ |= COORD_OOO;
}
VLOG(2) << "Scheduled " << DebugId()
<< " OutOfOrder: " << bool(coordinator_state_ & COORD_OOO)
<< " num_shards: " << num_shards;
if (mode == IntentLock::EXCLUSIVE) {
journal::Journal* j = ServerState::tlocal()->journal();
// TODO: we may want to pass a custom command name into the journal.
if (j && j->SchedStartTx(txid_, 0, num_shards)) {
}
}
coordinator_state_ |= COORD_SCHED;
break;
}
VLOG(2) << "Cancelling " << DebugId();
atomic_bool should_poll_execution{false};
auto cancel = [&](EngineShard* shard) {
bool res = CancelShardCb(shard);
if (res) {
should_poll_execution.store(true, memory_order_relaxed);
}
};
shard_set->RunBriefInParallel(std::move(cancel), is_active);
// We must follow up with PollExecution because in rare cases with multi-trans
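
To summarize the scheduling loop above in one place: the coordinator takes a fresh txid, runs ScheduleInShard on every active shard in parallel, and then either retries (some shard failed, so the attempt is cancelled everywhere) or marks the transaction scheduled — additionally flagging it out-of-order (COORD_OOO) when every shard also granted its locks immediately and the transaction is not span-all. The sketch below distills that decision into a single-threaded function; ShardResult, ScheduleOnce, and the flag values are simplified stand-ins for illustration, not the real Transaction API.

```cpp
#include <cstdint>
#include <vector>

// Flags modeled after coordinator_state_ bits; values are illustrative.
enum CoordFlags : uint8_t { COORD_SCHED = 1, COORD_OOO = 2 };

// What each shard reports back from ScheduleInShard.
struct ShardResult {
  bool scheduled;      // The shard accepted the transaction into its queue.
  bool locks_granted;  // Its intent locks were granted immediately.
};

// One iteration of the scheduling loop, reduced to the final decision.
uint8_t ScheduleOnce(const std::vector<ShardResult>& shards, bool span_all) {
  unsigned success = 0, lock_granted_cnt = 0;
  for (const auto& r : shards) {
    success += r.scheduled;
    lock_granted_cnt += r.locks_granted;
  }

  if (success != shards.size())
    return 0;  // Some shard failed: cancel everywhere and retry with a new txid.

  uint8_t state = COORD_SCHED;
  // OOO only when every shard granted its locks right away and the transaction
  // is not span-all (global transactions never acquire intent locks).
  if (!span_all && lock_granted_cnt == shards.size())
    state |= COORD_OOO;
  return state;
}

int main() {
  // Two shards scheduled, but one had to queue behind conflicting locks:
  // the transaction is scheduled yet not out-of-order.
  std::vector<ShardResult> shards = {{true, true}, {true, false}};
  uint8_t s = ScheduleOnce(shards, /*span_all=*/false);
  return (s & COORD_SCHED) && !(s & COORD_OOO) ? 0 : 1;
}
```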
@@ -549,44 +536,36 @@ void Transaction::MultiData::AddLocks(IntentLock::Mode mode) {
// BLPOP where data must be read from multiple shards before performing another hop.
OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
DCHECK(!cb_);
cb_ = std::move(cb);
// single hop -> concluding.
coordinator_state_ |= (COORD_EXEC | COORD_EXEC_CONCLUDING);
DCHECK(multi_ || (coordinator_state_ & COORD_SCHED) == 0); // Only multi transactions schedule in advance.
coordinator_state_ |= (COORD_EXEC | COORD_EXEC_CONCLUDING); // Single hop means we conclude.
if (!multi_) { // for non-multi transactions we schedule exactly once.
DCHECK_EQ(0, coordinator_state_ & COORD_SCHED);
}
bool was_ooo = false;
// If we run only on one shard and conclude, we can avoid scheduling at all
// and directly dispatch the task to its destination shard.
bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !multi_;
bool run_eager = false;
if (schedule_fast) { // Single shard (local) optimization.
// We never resize shard_data because that would affect MULTI transaction correctness.
if (schedule_fast) {
DCHECK_EQ(1u, shard_data_.size());
// IsArmedInShard() first checks run_count_ before shard_data, so use release ordering.
shard_data_[0].local_mask |= ARMED;
// memory_order_release because we do not want it to be reordered with shard_data writes
// above.
// IsArmedInShard() first checks run_count_ before accessing shard_data.
run_count_.fetch_add(1, memory_order_release);
time_now_ms_ = GetCurrentTimeMs();
// Please note that schedule_cb can not update any data on ScheduleSingleHop stack when
// run_fast is false.
// since ScheduleSingleHop can finish before ScheduleUniqueShard returns.
// The problematic flow is as follows: ScheduleUniqueShard schedules into TxQueue
// (hence run_fast is false), and then calls PollExecute that in turn runs
// the callback which calls DecreaseRunCnt.
// As a result WaitForShardCallbacks below is unblocked before schedule_cb returns.
// However, if run_fast is true, then we may mutate stack variables, but only
// before DecreaseRunCnt is called.
auto schedule_cb = [&] {
// NOTE: schedule_cb cannot update data on stack when run_fast is false.
// This is because ScheduleSingleHop can finish before the callback returns.
// This happens when ScheduleUniqueShard schedules into TxQueue (hence run_fast is false), and
// then calls PollExecute that in turn runs the callback which calls DecreaseRunCnt. As a result
// WaitForShardCallbacks below is unblocked before schedule_cb returns. However, if run_fast is
// true, then we may mutate stack variables, but only before DecreaseRunCnt is called.
auto schedule_cb = [this, &was_ooo] {
bool run_fast = ScheduleUniqueShard(EngineShard::tlocal());
if (run_fast) {
run_eager = true;
was_ooo = true;
// It's important to call DecreaseRunCnt only for run_fast and after run_eager is assigned.
// If DecreaseRunCnt were called before ScheduleUniqueShard finishes
// then WaitForShardCallbacks below could exit before schedule_cb assigns return value
@@ -594,17 +573,15 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
CHECK_GE(DecreaseRunCnt(), 1u);
}
};
shard_set->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier.
} else {
// Transaction spans multiple shards or it's global (like flushdb) or multi.
// Note that the logic here is a bit different from the public Schedule() function.
if (multi_) {
if (multi_->is_expanding)
multi_->AddLocks(Mode());
} else {
// This transaction either spans multiple shards and/or is multi.
if (!multi_) // Multi transactions schedule in advance.
ScheduleInternal();
}
if (multi_ && multi_->is_expanding)
multi_->AddLocks(Mode());
ExecuteAsync();
}
@@ -612,12 +589,11 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
DVLOG(1) << "ScheduleSingleHop before Wait " << DebugId() << " " << run_count_.load();
WaitForShardCallbacks();
DVLOG(1) << "ScheduleSingleHop after Wait " << DebugId();
if (run_eager) {
if (was_ooo)
coordinator_state_ |= COORD_OOO;
}
cb_ = nullptr;
return local_result_;
}
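
The fast path above relies on an ordering contract: the single shard is armed, run_count_ is incremented with release ordering, and the shard callback may write the coordinator's stack variable (was_ooo) only before it decrements the counter, because that decrement is what lets WaitForShardCallbacks return and the stack frame disappear. Here is a compact standalone model of that handshake; armed, run_count, and the std::thread standing in for a shard fiber are simplifications, not the real members.

```cpp
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <thread>

int main() {
  std::atomic<uint32_t> run_count{0};  // Stands in for run_count_.
  bool armed = false;                  // Stands in for local_mask |= ARMED.
  bool was_ooo = false;                // Lives on the coordinator's stack.

  armed = true;
  // Release: in the real code the shard may already be polling, and this pairs
  // with its acquire load of run_count_ so that it also observes 'armed'.
  run_count.fetch_add(1, std::memory_order_release);

  std::thread shard([&] {
    // IsArmedInShard-style check: look at run_count first, then the flag.
    if (run_count.load(std::memory_order_acquire) > 0 && armed) {
      bool run_fast = true;  // Pretend the hop ran immediately (eagerly).
      if (run_fast)
        was_ooo = true;      // Must be written before the decrement below.
    }
    // This decrement unblocks the wait below, so nothing on the coordinator's
    // stack may be touched after it.
    run_count.fetch_sub(1, std::memory_order_release);
  });

  // WaitForShardCallbacks: a spin here instead of the real blocking wait.
  while (run_count.load(std::memory_order_acquire) != 0)
    std::this_thread::yield();

  std::printf("was_ooo=%d\n", was_ooo);  // Safe to read only after the wait.
  shard.join();
  return 0;
}
```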
@@ -709,19 +685,7 @@ void Transaction::ExecuteAsync() {
// safely.
use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed);
bool is_global = IsGlobal();
if (unique_shard_cnt_ == 1) {
shard_data_[SidToId(unique_shard_id_)].local_mask |= ARMED;
} else {
for (ShardId i = 0; i < shard_data_.size(); ++i) {
auto& sd = shard_data_[i];
if (!is_global && sd.arg_count == 0)
continue;
DCHECK_LT(sd.arg_count, 1u << 15);
sd.local_mask |= ARMED;
}
}
IterateActiveShards([](PerShardData& sd, auto i) { sd.local_mask |= ARMED; });
uint32_t seq = seqlock_.load(memory_order_relaxed);
@@ -762,16 +726,7 @@ void Transaction::ExecuteAsync() {
};
// IsArmedInShard is the protector of non-thread safe data.
if (!is_global && unique_shard_cnt_ == 1) {
shard_set->Add(unique_shard_id_, std::move(cb)); // serves as a barrier.
} else {
for (ShardId i = 0; i < shard_data_.size(); ++i) {
auto& sd = shard_data_[i];
if (!is_global && sd.arg_count == 0)
continue;
shard_set->Add(i, cb); // serves as a barrier.
}
}
IterateActiveShards([&cb](PerShardData& sd, auto i) { shard_set->Add(i, cb); });
}
void Transaction::RunQuickie(EngineShard* shard) {
@@ -811,26 +766,17 @@ void Transaction::UnwatchBlocking(bool should_expire, WaitKeysProvider wcb) {
run_count_.store(unique_shard_cnt_, memory_order_release);
auto expire_cb = [&] {
auto expire_cb = [this, &wcb, should_expire] {
EngineShard* es = EngineShard::tlocal();
ArgSlice wkeys = wcb(this, es);
UnwatchShardCb(wkeys, should_expire, es);
};
if (unique_shard_cnt_ == 1) {
DCHECK_LT(unique_shard_id_, shard_set->size());
shard_set->Add(unique_shard_id_, move(expire_cb));
} else {
for (ShardId i = 0; i < shard_data_.size(); ++i) {
auto& sd = shard_data_[i];
DCHECK_EQ(0, sd.local_mask & ARMED);
if (sd.arg_count == 0)
continue;
shard_set->Add(i, expire_cb);
}
}
IterateActiveShards([&expire_cb](PerShardData& sd, auto i) {
DCHECK_EQ(0, sd.local_mask & ARMED);
shard_set->Add(i, expire_cb);
});
// Wait for all callbacks to conclude.
WaitForShardCallbacks();
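
The unwatch path uses the same fan-out/wait barrier as the execution paths: run_count_ is set to the number of active shards, the callback is queued on each of them, and WaitForShardCallbacks blocks until every callback has decremented the counter. A minimal sketch of that barrier follows, assuming std::threads as stand-ins for the shard fibers that shard_set->Add would normally target.

```cpp
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <thread>
#include <vector>

int main() {
  constexpr unsigned kActiveShards = 3;  // Stands in for unique_shard_cnt_.
  std::atomic<uint32_t> run_count{0};    // Stands in for run_count_.

  // Publish the expected number of callbacks before dispatching any of them.
  run_count.store(kActiveShards, std::memory_order_release);

  std::vector<std::thread> shards;
  for (unsigned i = 0; i < kActiveShards; ++i) {
    shards.emplace_back([i, &run_count] {
      std::printf("shard %u: unwatching keys\n", i);  // expire_cb body goes here.
      run_count.fetch_sub(1, std::memory_order_release);
    });
  }

  // WaitForShardCallbacks: returns only after every shard ran its callback.
  while (run_count.load(std::memory_order_acquire) != 0)
    std::this_thread::yield();

  for (auto& t : shards)
    t.join();
  return 0;
}
```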

View file

@@ -341,6 +341,21 @@ class Transaction {
return sid < shard_data_.size() ? sid : 0;
}
// Iterate over shards and run a function accepting (PerShardData&, ShardId) on all active ones.
template <typename F> void IterateActiveShards(F&& f) {
bool is_global = IsGlobal();
if (unique_shard_cnt_ == 1) {
auto i = unique_shard_id_;
f(shard_data_[SidToId(i)], i);
} else {
for (ShardId i = 0; i < shard_data_.size(); ++i) {
if (auto& sd = shard_data_[i]; is_global || sd.arg_count > 0) {
f(sd, i);
}
}
}
}
private:
// shard_data spans all the shards in ess_.
// I wish we could use a dense array of size [0..uniq_shards] but since
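
To see the new helper outside the diff, the sketch below reproduces IterateActiveShards over toy types and uses it the way the ExecuteAsync call site above does, arming every active shard with one lambda. ToyTx, PerShardData, and the ARMED constant are reduced stand-ins here, not the real Transaction members.

```cpp
#include <cstdint>
#include <cstdio>
#include <vector>

using ShardId = uint32_t;
constexpr uint16_t ARMED = 1 << 0;  // Illustrative bit value.

struct PerShardData {
  uint32_t arg_count = 0;  // Number of keys this shard holds for the tx.
  uint16_t local_mask = 0;
};

struct ToyTx {
  std::vector<PerShardData> shard_data;
  uint32_t unique_shard_cnt = 0;
  ShardId unique_shard_id = 0;
  bool is_global = false;

  // Same trick as in the header above: single-shard transactions keep a
  // one-element shard_data array, so out-of-range ids map to slot 0.
  ShardId SidToId(ShardId sid) const {
    return sid < shard_data.size() ? sid : 0;
  }

  // Same shape as Transaction::IterateActiveShards: a single-shard transaction
  // visits exactly one entry; otherwise shards with no args are skipped unless
  // the transaction is global.
  template <typename F> void IterateActiveShards(F&& f) {
    if (unique_shard_cnt == 1) {
      auto i = unique_shard_id;
      f(shard_data[SidToId(i)], i);
    } else {
      for (ShardId i = 0; i < shard_data.size(); ++i) {
        if (auto& sd = shard_data[i]; is_global || sd.arg_count > 0)
          f(sd, i);
      }
    }
  }
};

int main() {
  ToyTx tx;
  tx.shard_data = {{2, 0}, {0, 0}, {1, 0}};  // Shard 1 holds no keys.
  tx.unique_shard_cnt = 2;

  // The ExecuteAsync call site reduces to exactly this: arm every active
  // shard with one lambda instead of two hand-rolled loops.
  tx.IterateActiveShards([](PerShardData& sd, ShardId i) {
    sd.local_mask |= ARMED;
    std::printf("armed shard %u\n", i);
  });
  return 0;
}
```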