From 07973d40eb4a3f1168876586d0029734115c5b25 Mon Sep 17 00:00:00 2001
From: Vladislav
Date: Thu, 9 Feb 2023 20:36:55 +0300
Subject: [PATCH] Update transaction and enable OOO for regular transactions (#769)

* refactor(server): Update ScheduleSingleHop

---------

Signed-off-by: Vladislav Oleshko
---
 src/server/journal/journal.cc |  10 ---
 src/server/journal/journal.h  |   4 -
 src/server/transaction.cc     | 138 +++++++++++-----------------------
 src/server/transaction.h      |  15 ++++
 4 files changed, 57 insertions(+), 110 deletions(-)

diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc
index 0acf23b9b..78e868b18 100644
--- a/src/server/journal/journal.cc
+++ b/src/server/journal/journal.cc
@@ -87,16 +87,6 @@ void Journal::UnregisterOnChange(uint32_t id) {
   journal_slice.UnregisterOnChange(id);
 }
 
-bool Journal::SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards) {
-  if (!journal_slice.IsOpen() || lameduck_.load(memory_order_relaxed))
-    return false;
-
-  // TODO: Handle tx entries.
-  // journal_slice.AddLogRecord(Entry::Sched(txid));
-
-  return true;
-}
-
 LSN Journal::GetLsn() const {
   return journal_slice.cur_lsn();
 }
diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h
index 9d75a56ab..1002cdf7b 100644
--- a/src/server/journal/journal.h
+++ b/src/server/journal/journal.h
@@ -34,10 +34,6 @@ class Journal {
   uint32_t RegisterOnChange(ChangeCallback cb);
   void UnregisterOnChange(uint32_t id);
 
-  // Returns true if transaction was scheduled, false if journal is inactive
-  // or in lameduck mode and does not log new transactions.
-  bool SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards);
-
   /*
   void AddCmd(TxId txid, Op opcode, Span args) {
     OpArgs(txid, opcode, args);
diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index f275a191d..1985ec163 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -429,7 +429,6 @@ void Transaction::ScheduleInternal() {
   DCHECK_EQ(0, coordinator_state_ & (COORD_SCHED | COORD_OOO));
 
   bool span_all = IsGlobal();
-  bool single_hop = (coordinator_state_ & COORD_EXEC_CONCLUDING);
 
   uint32_t num_shards;
   std::function<bool(uint32_t)> is_active;
@@ -456,57 +455,45 @@ void Transaction::ScheduleInternal() {
     };
   }
 
+  // Loop until successfully scheduled in all shards.
   while (true) {
     txid_ = op_seq.fetch_add(1, memory_order_relaxed);
+    time_now_ms_ = GetCurrentTimeMs();
 
     atomic_uint32_t lock_granted_cnt{0};
     atomic_uint32_t success{0};
 
-    time_now_ms_ = GetCurrentTimeMs();
-
     auto cb = [&](EngineShard* shard) {
-      pair<bool, bool> res = ScheduleInShard(shard);
-      success.fetch_add(res.first, memory_order_relaxed);
-      lock_granted_cnt.fetch_add(res.second, memory_order_relaxed);
+      auto [is_success, is_granted] = ScheduleInShard(shard);
+      success.fetch_add(is_success, memory_order_relaxed);
+      lock_granted_cnt.fetch_add(is_granted, memory_order_relaxed);
     };
-
     shard_set->RunBriefInParallel(std::move(cb), is_active);
 
     if (success.load(memory_order_acquire) == num_shards) {
-      // We allow out of order execution only for single hop transactions.
-      // It might be possible to do it for multi-hop transactions as well but currently is
-      // too complicated to reason about.
-      if (single_hop && lock_granted_cnt.load(memory_order_relaxed) == num_shards) {
-        // OOO can not happen with span-all transactions. We ensure it in ScheduleInShard when we
-        // refuse to acquire locks for these transactions..
-        DCHECK(!span_all);
+      coordinator_state_ |= COORD_SCHED;
+      // If we granted all locks, we can run out of order.
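+      // All shards granted their locks on the first attempt, so no conflicting
+      // transaction precedes us on these keys and we may start executing without
+      // waiting for our turn in the per-shard transaction queues.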
+      if (!span_all && lock_granted_cnt.load(memory_order_relaxed) == num_shards) {
+        // Currently we don't support OOO for incremental locking. So far those are global.
+        DCHECK(!(multi_ && multi_->is_expanding));
         coordinator_state_ |= COORD_OOO;
       }
       VLOG(2) << "Scheduled " << DebugId()
               << " OutOfOrder: " << bool(coordinator_state_ & COORD_OOO)
               << " num_shards: " << num_shards;
 
-      if (mode == IntentLock::EXCLUSIVE) {
-        journal::Journal* j = ServerState::tlocal()->journal();
-        // TODO: we may want to pass custom command name into journal.
-        if (j && j->SchedStartTx(txid_, 0, num_shards)) {
-        }
-      }
-      coordinator_state_ |= COORD_SCHED;
       break;
     }
 
     VLOG(2) << "Cancelling " << DebugId();
 
     atomic_bool should_poll_execution{false};
-
     auto cancel = [&](EngineShard* shard) {
       bool res = CancelShardCb(shard);
       if (res) {
         should_poll_execution.store(true, memory_order_relaxed);
       }
     };
-
     shard_set->RunBriefInParallel(std::move(cancel), is_active);
 
     // We must follow up with PollExecution because in rare cases with multi-trans
@@ -549,44 +536,36 @@ void Transaction::MultiData::AddLocks(IntentLock::Mode mode) {
 // BLPOP where a data must be read from multiple shards before performing another hop.
 OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
   DCHECK(!cb_);
-
   cb_ = std::move(cb);
 
-  // single hop -> concluding.
-  coordinator_state_ |= (COORD_EXEC | COORD_EXEC_CONCLUDING);
+  DCHECK(multi_ || (coordinator_state_ & COORD_SCHED) == 0);  // Only multi transactions schedule in advance.
+  coordinator_state_ |= (COORD_EXEC | COORD_EXEC_CONCLUDING);  // Single hop means we conclude.
 
-  if (!multi_) {  // for non-multi transactions we schedule exactly once.
-    DCHECK_EQ(0, coordinator_state_ & COORD_SCHED);
-  }
+  bool was_ooo = false;
 
+  // If we run only on one shard and conclude, we can avoid scheduling at all
+  // and directly dispatch the task to its destination shard.
   bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !multi_;
-  bool run_eager = false;
-
-  if (schedule_fast) {  // Single shard (local) optimization.
-    // We never resize shard_data because that would affect MULTI transaction correctness.
+  if (schedule_fast) {
     DCHECK_EQ(1u, shard_data_.size());
 
+    // IsArmedInShard() first checks run_count_ before shard_data, so use release ordering.
     shard_data_[0].local_mask |= ARMED;
-
-    // memory_order_release because we do not want it to be reordered with shard_data writes
-    // above.
-    // IsArmedInShard() first checks run_count_ before accessing shard_data.
    run_count_.fetch_add(1, memory_order_release);
+
     time_now_ms_ = GetCurrentTimeMs();
 
-    // Please note that schedule_cb can not update any data on ScheduleSingleHop stack when
-    // run_fast is false.
-    // since ScheduleSingleHop can finish before ScheduleUniqueShard returns.
-    // The problematic flow is as follows: ScheduleUniqueShard schedules into TxQueue
-    // (hence run_fast is false), and then calls PollExecute that in turn runs
-    // the callback which calls DecreaseRunCnt.
-    // As a result WaitForShardCallbacks below is unblocked before schedule_cb returns.
-    // However, if run_fast is true, then we may mutate stack variables, but only
-    // before DecreaseRunCnt is called.
-    auto schedule_cb = [&] {
+    // NOTE: schedule_cb cannot update data on the stack when run_fast is false.
+    // This is because ScheduleSingleHop can finish before the callback returns.
+
+    // This happens when ScheduleUniqueShard schedules into TxQueue (hence run_fast is false), and
+    // then calls PollExecute that in turn runs the callback which calls DecreaseRunCnt. As a result,
+    // WaitForShardCallbacks below is unblocked before schedule_cb returns. However, if run_fast is
+    // true, then we may mutate stack variables, but only before DecreaseRunCnt is called.
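+    // run_fast == true: the hop already ran inline on the free shard (out of order).
+    // run_fast == false: the transaction was added to TxQueue and the callback
+    // may run at any point after ScheduleUniqueShard returns.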
+    auto schedule_cb = [this, &was_ooo] {
       bool run_fast = ScheduleUniqueShard(EngineShard::tlocal());
       if (run_fast) {
-        run_eager = true;
+        was_ooo = true;
         // it's important to DecreaseRunCnt only for run_fast and after run_eager is assigned.
         // If DecreaseRunCnt were called before ScheduleUniqueShard finishes
         // then WaitForShardCallbacks below could exit before schedule_cb assigns return value
@@ -594,17 +573,15 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
         CHECK_GE(DecreaseRunCnt(), 1u);
       }
     };
-
     shard_set->Add(unique_shard_id_, std::move(schedule_cb));  // serves as a barrier.
   } else {
-    // Transaction spans multiple shards or it's global (like flushdb) or multi.
-    // Note that the logic here is a bit different from the public Schedule() function.
-    if (multi_) {
-      if (multi_->is_expanding)
-        multi_->AddLocks(Mode());
-    } else {
+    // This transaction spans multiple shards, is global, or is a multi transaction.
+
+    if (!multi_)  // Multi transactions are scheduled in advance.
       ScheduleInternal();
-    }
+
+    if (multi_ && multi_->is_expanding)
+      multi_->AddLocks(Mode());
 
     ExecuteAsync();
   }
@@ -612,12 +589,11 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
   DVLOG(1) << "ScheduleSingleHop before Wait " << DebugId() << " " << run_count_.load();
   WaitForShardCallbacks();
   DVLOG(1) << "ScheduleSingleHop after Wait " << DebugId();
-  if (run_eager) {
+
+  if (was_ooo)
     coordinator_state_ |= COORD_OOO;
-  }
 
   cb_ = nullptr;
-
   return local_result_;
 }
 
@@ -709,19 +685,7 @@ void Transaction::ExecuteAsync() {
   // safely.
   use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed);
 
-  bool is_global = IsGlobal();
-
-  if (unique_shard_cnt_ == 1) {
-    shard_data_[SidToId(unique_shard_id_)].local_mask |= ARMED;
-  } else {
-    for (ShardId i = 0; i < shard_data_.size(); ++i) {
-      auto& sd = shard_data_[i];
-      if (!is_global && sd.arg_count == 0)
-        continue;
-      DCHECK_LT(sd.arg_count, 1u << 15);
-      sd.local_mask |= ARMED;
-    }
-  }
+  IterateActiveShards([](PerShardData& sd, auto i) { sd.local_mask |= ARMED; });
 
   uint32_t seq = seqlock_.load(memory_order_relaxed);
 
@@ -762,16 +726,7 @@ void Transaction::ExecuteAsync() {
   };
 
   // IsArmedInShard is the protector of non-thread safe data.
-  if (!is_global && unique_shard_cnt_ == 1) {
-    shard_set->Add(unique_shard_id_, std::move(cb));  // serves as a barrier.
-  } else {
-    for (ShardId i = 0; i < shard_data_.size(); ++i) {
-      auto& sd = shard_data_[i];
-      if (!is_global && sd.arg_count == 0)
-        continue;
-      shard_set->Add(i, cb);  // serves as a barrier.
-    }
-  }
+  IterateActiveShards([&cb](PerShardData& sd, auto i) { shard_set->Add(i, cb); });
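+  // As before, each Add() serves as a barrier: by the time cb runs in the
+  // shard thread, the ARMED bits written above are visible to it.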
 }
 
 void Transaction::RunQuickie(EngineShard* shard) {
@@ -811,26 +766,17 @@ void Transaction::UnwatchBlocking(bool should_expire, WaitKeysProvider wcb) {
 
   run_count_.store(unique_shard_cnt_, memory_order_release);
 
-  auto expire_cb = [&] {
+  auto expire_cb = [this, &wcb, should_expire] {
     EngineShard* es = EngineShard::tlocal();
     ArgSlice wkeys = wcb(this, es);
     UnwatchShardCb(wkeys, should_expire, es);
   };
 
-  if (unique_shard_cnt_ == 1) {
-    DCHECK_LT(unique_shard_id_, shard_set->size());
-    shard_set->Add(unique_shard_id_, move(expire_cb));
-  } else {
-    for (ShardId i = 0; i < shard_data_.size(); ++i) {
-      auto& sd = shard_data_[i];
-      DCHECK_EQ(0, sd.local_mask & ARMED);
-      if (sd.arg_count == 0)
-        continue;
-
-      shard_set->Add(i, expire_cb);
-    }
-  }
+  IterateActiveShards([&expire_cb](PerShardData& sd, auto i) {
+    DCHECK_EQ(0, sd.local_mask & ARMED);
+    shard_set->Add(i, expire_cb);
+  });
 
   // Wait for all callbacks to conclude.
   WaitForShardCallbacks();
diff --git a/src/server/transaction.h b/src/server/transaction.h
index efd457cd7..c4f1f223a 100644
--- a/src/server/transaction.h
+++ b/src/server/transaction.h
@@ -341,6 +341,21 @@ class Transaction {
     return sid < shard_data_.size() ? sid : 0;
  }
 
+  // Run f, which accepts (PerShardData&, ShardId), on all active shards.
+  template <typename F> void IterateActiveShards(F&& f) {
+    bool is_global = IsGlobal();
+    if (unique_shard_cnt_ == 1) {
+      auto i = unique_shard_id_;
+      f(shard_data_[SidToId(i)], i);
+    } else {
+      for (ShardId i = 0; i < shard_data_.size(); ++i) {
+        if (auto& sd = shard_data_[i]; is_global || sd.arg_count > 0) {
+          f(sd, i);
+        }
+      }
+    }
+  }
+
  private:
   // shard_data spans all the shards in ess_.
   // I wish we could use a dense array of size [0..uniq_shards] but since
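
For reference, a minimal standalone sketch of the dispatch logic behind the new
IterateActiveShards helper. PerShardData and Tx below are reduced stand-ins, not
the real dragonfly classes; only the unique_shard_cnt == 1 fast path, the
arg_count/is_global filter, and SidToId mirror the patch (C++17 for the
if-initializer, as in the patch itself):

// Standalone sketch: the shard-iteration control flow, outside dragonfly.
#include <cstdint>
#include <cstdio>
#include <vector>

using ShardId = uint32_t;

struct PerShardData {
  uint32_t arg_count = 0;   // how many args target this shard; 0 => inactive
  uint32_t local_mask = 0;  // ARMED etc.
};

struct Tx {
  bool is_global = false;
  uint32_t unique_shard_cnt = 0;
  ShardId unique_shard_id = 0;
  std::vector<PerShardData> shard_data;  // size 1 for single-shard transactions

  // Mirrors Transaction::SidToId from the patch: single-shard transactions
  // keep their only entry at index 0.
  ShardId SidToId(ShardId sid) const {
    return sid < shard_data.size() ? sid : 0;
  }

  template <typename F> void IterateActiveShards(F&& f) {
    if (unique_shard_cnt == 1) {
      auto i = unique_shard_id;
      f(shard_data[SidToId(i)], i);  // single active shard, no loop needed
    } else {
      for (ShardId i = 0; i < shard_data.size(); ++i) {
        // Global transactions touch every shard; others only shards with args.
        if (auto& sd = shard_data[i]; is_global || sd.arg_count > 0)
          f(sd, i);
      }
    }
  }
};

int main() {
  Tx tx;
  tx.unique_shard_cnt = 2;
  tx.shard_data = {{2, 0}, {0, 0}, {3, 0}};  // shard 1 has no args -> skipped
  tx.IterateActiveShards([](PerShardData& sd, ShardId i) {
    sd.local_mask |= 1;  // stand-in for ARMED
    std::printf("armed shard %u\n", static_cast<unsigned>(i));
  });
  return 0;
}

Running it arms shards 0 and 2 and skips shard 1, which has no arguments: the
same filtering that the three hand-rolled loops replaced in ExecuteAsync and
UnwatchBlocking performed individually.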