From 07a6dc0712324e74cf311a8eb97a99e9cf9e30fe Mon Sep 17 00:00:00 2001 From: Vladislav Date: Mon, 22 Jan 2024 10:38:10 +0300 Subject: [PATCH] feat(transaction): Independent out of order execution (#2426) Previously, transactions would run out of order only when all shards determined that the keys locks were free. With this change, each shard might decide to run out of order independently if the locks are free. COORD_OOO is now deprecated and the OUT_OF_ORDER per-shard flag should is used to indicate it Signed-off-by: Vladislav Oleshko --- src/server/conn_context.h | 1 - src/server/engine_shard_set.cc | 8 +++-- src/server/engine_shard_set.h | 1 + src/server/main_service.cc | 3 -- src/server/multi_test.cc | 49 +++++++++++++++++++++++++-- src/server/server_family.cc | 11 +++--- src/server/server_state.cc | 2 +- src/server/server_state.h | 4 +-- src/server/transaction.cc | 62 ++++++++++++++-------------------- src/server/transaction.h | 26 +++++--------- 10 files changed, 96 insertions(+), 71 deletions(-) diff --git a/src/server/conn_context.h b/src/server/conn_context.h index 33fe6802e..ea6b86bde 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -173,7 +173,6 @@ class ConnectionContext : public facade::ConnectionContext { struct DebugInfo { uint32_t shards_count = 0; TxClock clock = 0; - bool is_ooo = false; // number of commands in the last exec body. unsigned exec_body_len = 0; diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 614c46f10..6e5f3c433 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -208,12 +208,13 @@ EngineShardSet* shard_set = nullptr; uint64_t TEST_current_time_ms = 0; EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) { - static_assert(sizeof(Stats) == 32); + static_assert(sizeof(Stats) == 40); defrag_attempt_total += o.defrag_attempt_total; defrag_realloc_total += o.defrag_realloc_total; defrag_task_invocation_total += o.defrag_task_invocation_total; poll_execution_total += o.poll_execution_total; + tx_ooo_total += o.tx_ooo_total; return *this; } @@ -553,7 +554,6 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { DCHECK(trans != head); dbg_id.clear(); - if (VLOG_IS_ON(1)) { dbg_id = trans->DebugId(); } @@ -561,6 +561,10 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { bool txq_ooo = trans_mask & Transaction::OUT_OF_ORDER; bool keep = trans->RunInShard(this, txq_ooo); + if (txq_ooo && !keep) { + stats_.tx_ooo_total++; + } + // If the transaction concluded, it must remove itself from the tx queue. // Otherwise it is required to stay there to keep the relative order. if (txq_ooo && !trans->IsMulti()) diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index a52007a34..0007f47b6 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -41,6 +41,7 @@ class EngineShard { uint64_t defrag_realloc_total = 0; uint64_t defrag_task_invocation_total = 0; uint64_t poll_execution_total = 0; + uint64_t tx_ooo_total = 0; Stats& operator+=(const Stats&); }; diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 75857bdb3..26cbc94b7 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1240,10 +1240,7 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo if (cntx->transaction && !cntx->conn_state.exec_info.IsRunning() && cntx->conn_state.script_info == nullptr) { - bool is_ooo = cntx->transaction->IsOOO(); - cntx->last_command_debug.clock = cntx->transaction->txid(); - cntx->last_command_debug.is_ooo = is_ooo; } return true; diff --git a/src/server/multi_test.cc b/src/server/multi_test.cc index 6a1994167..e66b53ecf 100644 --- a/src/server/multi_test.cc +++ b/src/server/multi_test.cc @@ -235,6 +235,51 @@ TEST_F(MultiTest, MultiConsistent) { ASSERT_FALSE(service_->IsShardSetLocked()); } +TEST_F(MultiTest, MultiConsistent2) { + if (auto mode = absl::GetFlag(FLAGS_multi_exec_mode); mode == Transaction::NON_ATOMIC) { + GTEST_SKIP() << "Skipped MultiConsistent2 test because multi_exec_mode is non atomic"; + return; + } + + const int kKeyCount = 50; + const int kRuns = 50; + const int kJobs = 20; + + vector all_keys(kKeyCount); + for (size_t i = 0; i < kKeyCount; i++) + all_keys[i] = absl::StrCat("key", i); + + auto cb = [&](string id) { + for (size_t r = 0; r < kRuns; r++) { + size_t num_keys = (rand() % 5) + 1; + set keys; + for (size_t i = 0; i < num_keys; i++) + keys.insert(all_keys[rand() % kKeyCount]); + + Run(id, {"MULTI"}); + for (auto key : keys) + Run(id, {"INCR", key}); + for (auto key : keys) + Run(id, {"DECR", key}); + auto resp = Run(id, {"EXEC"}); + + ASSERT_EQ(resp.GetVec().size(), keys.size() * 2); + for (size_t i = 0; i < keys.size(); i++) { + EXPECT_EQ(resp.GetVec()[i].GetInt(), optional(1)); + EXPECT_EQ(resp.GetVec()[i + keys.size()].GetInt(), optional(0)); + } + } + }; + + vector fbs(kJobs); + for (size_t i = 0; i < kJobs; i++) { + fbs[i] = pp_->at(i % pp_->size())->LaunchFiber([i, cb]() { cb(absl::StrCat("worker", i)); }); + } + + for (auto& fb : fbs) + fb.Join(); +} + TEST_F(MultiTest, MultiRename) { RespExpr resp = Run({"mget", kKey1, kKey4}); ASSERT_EQ(1, GetDebugInfo().shards_count); @@ -511,9 +556,9 @@ TEST_F(MultiTest, MultiOOO) { // OOO works in LOCK_AHEAD mode. int mode = absl::GetFlag(FLAGS_multi_exec_mode); if (mode == Transaction::LOCK_AHEAD || mode == Transaction::NON_ATOMIC) - EXPECT_EQ(200, metrics.coordinator_stats.tx_type_cnt[ServerState::OOO]); + EXPECT_EQ(200, metrics.shard_stats.tx_ooo_total); else - EXPECT_EQ(0, metrics.coordinator_stats.tx_type_cnt[ServerState::OOO]); + EXPECT_EQ(0, metrics.shard_stats.tx_ooo_total); } // Lua scripts lock their keys ahead and thus can run out of order. diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 17ceadc7e..acaf1cf41 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1163,8 +1163,7 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) { AppendMetricHeader("transaction_types_total", "Transaction counts by their types", MetricType::COUNTER, &resp->body()); - const char* kTxTypeNames[ServerState::NUM_TX_TYPES] = {"global", "normal", "ooo", "quick", - "inline"}; + const char* kTxTypeNames[ServerState::NUM_TX_TYPES] = {"global", "normal", "quick", "inline"}; for (unsigned type = 0; type < ServerState::NUM_TX_TYPES; ++type) { if (tc[type] > 0) { AppendMetricValue("transaction_types_total", tc[type], {"type"}, {kTxTypeNames[type]}, @@ -1917,8 +1916,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { if (should_enter("TRANSACTION", true)) { const auto& tc = m.coordinator_stats.tx_type_cnt; string val = StrCat("global=", tc[ServerState::GLOBAL], ",normal=", tc[ServerState::NORMAL], - ",ooo=", tc[ServerState::OOO], ",quick=", tc[ServerState::QUICK], - ",inline=", tc[ServerState::INLINE]); + ",quick=", tc[ServerState::QUICK], ",inline=", tc[ServerState::INLINE]); append("tx_type_cnt", val); val.clear(); for (unsigned width = 0; width < shard_set->size(); ++width) { @@ -1931,12 +1929,13 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { val.pop_back(); // last comma. append("tx_width_freq", val); } + append("tx_shard_ooo_total", m.shard_stats.tx_ooo_total); + append("tx_schedule_cancel_total", m.coordinator_stats.tx_schedule_cancel_cnt); + append("tx_queue_len", m.tx_queue_len); append("eval_io_coordination_total", m.coordinator_stats.eval_io_coordination_cnt); append("eval_shardlocal_coordination_total", m.coordinator_stats.eval_shardlocal_coordination_cnt); append("eval_squashed_flushes", m.coordinator_stats.eval_squashed_flushes); - append("tx_schedule_cancel_total", m.coordinator_stats.tx_schedule_cancel_cnt); - append("tx_queue_len", m.tx_queue_len); append("multi_squash_execution_total", m.coordinator_stats.multi_squash_executions); append("multi_squash_execution_hop_usec", m.coordinator_stats.multi_squash_exec_hop_usec); append("multi_squash_execution_reply_usec", m.coordinator_stats.multi_squash_exec_reply_usec); diff --git a/src/server/server_state.cc b/src/server/server_state.cc index b12ca7aa7..fa9404813 100644 --- a/src/server/server_state.cc +++ b/src/server/server_state.cc @@ -48,7 +48,7 @@ auto ServerState::Stats::operator=(Stats&& other) -> Stats& { } ServerState::Stats& ServerState::Stats::Add(unsigned num_shards, const ServerState::Stats& other) { - static_assert(sizeof(Stats) == 13 * 8, "Stats size mismatch"); + static_assert(sizeof(Stats) == 12 * 8, "Stats size mismatch"); for (int i = 0; i < NUM_TX_TYPES; ++i) { this->tx_type_cnt[i] += other.tx_type_cnt[i]; diff --git a/src/server/server_state.h b/src/server/server_state.h index 4b6c52f70..abf6d196a 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -93,15 +93,15 @@ class ServerState { // public struct - to allow initialization. void operator=(const ServerState&) = delete; public: - enum TxType { GLOBAL, NORMAL, OOO, QUICK, INLINE, NUM_TX_TYPES }; + enum TxType { GLOBAL, NORMAL, QUICK, INLINE, NUM_TX_TYPES }; struct Stats { std::array tx_type_cnt; + uint64_t tx_schedule_cancel_cnt = 0; uint64_t eval_io_coordination_cnt = 0; uint64_t eval_shardlocal_coordination_cnt = 0; uint64_t eval_squashed_flushes = 0; - uint64_t tx_schedule_cancel_cnt = 0; uint64_t multi_squash_executions = 0; uint64_t multi_squash_exec_hop_usec = 0; uint64_t multi_squash_exec_reply_usec = 0; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 9bf1dc5a8..655030fba 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -66,8 +66,6 @@ void RecordTxScheduleStats(const Transaction* tx) { ss->stats.tx_width_freq_arr[tx->GetUniqueShardCnt() - 1]++; if (tx->IsGlobal()) { ss->stats.tx_type_cnt[ServerState::GLOBAL]++; - } else if (tx->IsOOO()) { - ss->stats.tx_type_cnt[ServerState::OOO]++; } else { ss->stats.tx_type_cnt[ServerState::NORMAL]++; } @@ -623,7 +621,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { void Transaction::ScheduleInternal() { DCHECK(!shard_data_.empty()); DCHECK_EQ(0u, txid_); - DCHECK_EQ(0, coordinator_state_ & (COORD_SCHED | COORD_OOO)); + DCHECK_EQ(0, coordinator_state_ & COORD_SCHED); DCHECK_GT(unique_shard_cnt_, 0u); DVLOG(1) << "ScheduleInternal " << cid_->name() << " on " << unique_shard_cnt_ << " shards"; @@ -635,27 +633,19 @@ void Transaction::ScheduleInternal() { txid_ = op_seq.fetch_add(1, memory_order_relaxed); time_now_ms_ = GetCurrentTimeMs(); - atomic_uint32_t lock_granted_cnt{0}; - atomic_uint32_t success{0}; - - auto cb = [&](EngineShard* shard) { - auto [is_success, is_granted] = ScheduleInShard(shard); - success.fetch_add(is_success, memory_order_relaxed); - lock_granted_cnt.fetch_add(is_granted, memory_order_relaxed); + atomic_uint32_t schedule_fails = 0; + auto cb = [this, &schedule_fails](EngineShard* shard) { + if (!ScheduleInShard(shard)) { + schedule_fails.fetch_add(1, memory_order_relaxed); + } }; shard_set->RunBriefInParallel(std::move(cb), is_active); - if (success.load(memory_order_acquire) == unique_shard_cnt_) { + if (schedule_fails.load(memory_order_relaxed) == 0) { coordinator_state_ |= COORD_SCHED; - if (lock_granted_cnt.load(memory_order_relaxed) == unique_shard_cnt_) { - coordinator_state_ |= COORD_OOO; // If we granted all locks, we can run out of order. - } RecordTxScheduleStats(this); - VLOG(2) << "Scheduled " << DebugId() - << " OutOfOrder: " << bool(coordinator_state_ & COORD_OOO) - << " num_shards: " << unique_shard_cnt_; - + VLOG(2) << "Scheduled " << DebugId() << " num_shards: " << unique_shard_cnt_; break; } @@ -689,12 +679,6 @@ void Transaction::ScheduleInternal() { } } } - - if (IsOOO()) { - for (auto& sd : shard_data_) { - sd.local_mask |= OUT_OF_ORDER; - } - } } // Optimized "Schedule and execute" function for the most common use-case of a single hop @@ -775,7 +759,6 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { if (schedule_fast) { CHECK(!cb_ptr_); // we should have reset it within the callback. if (was_ooo) { - coordinator_state_ |= COORD_OOO; ss->stats.tx_type_cnt[run_inline ? ServerState::INLINE : ServerState::QUICK]++; } else { ss->stats.tx_type_cnt[ServerState::NORMAL]++; @@ -1120,44 +1103,49 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { } // This function should not block since it's run via RunBriefInParallel. -pair Transaction::ScheduleInShard(EngineShard* shard) { - DCHECK(shard_data_[SidToId(shard->shard_id())].local_mask & ACTIVE); +bool Transaction::ScheduleInShard(EngineShard* shard) { + ShardId sid = SidToId(shard->shard_id()); + auto& sd = shard_data_[sid]; + + DCHECK(sd.local_mask & ACTIVE); + DCHECK_EQ(sd.local_mask & KEYLOCK_ACQUIRED, 0); + sd.local_mask &= ~OUT_OF_ORDER; // If a more recent transaction already commited, we abort if (shard->committed_txid() >= txid_) - return {false, false}; + return false; TxQueue* txq = shard->txq(); KeyLockArgs lock_args; IntentLock::Mode mode = LockMode(); bool lock_granted = false; - ShardId sid = SidToId(shard->shard_id()); - auto& sd = shard_data_[sid]; - - // Acquire intent locks + // Acquire intent locks. Intent locks are always acquired, even if already locked by others. if (!IsGlobal()) { lock_args = GetLockArgs(shard->shard_id()); - // Key locks are acquired even if the shard is locked since intent locks are always acquired bool shard_unlocked = shard->shard_lock()->Check(mode); bool keys_unlocked = shard->db_slice().Acquire(mode, lock_args); + lock_granted = shard_unlocked && keys_unlocked; - lock_granted = keys_unlocked && shard_unlocked; sd.local_mask |= KEYLOCK_ACQUIRED; + if (lock_granted) { + sd.local_mask |= OUT_OF_ORDER; + } + DVLOG(3) << "Lock granted " << lock_granted << " for trans " << DebugId(); } // If the new transaction requires reordering of the pending queue (i.e. it comes before tail) // and some other transaction already locked its keys we can not reorder 'trans' because - // that other transaction could have deduced that it can run OOO and eagerly execute. Hence, we + // the transaction could have deduced that it can run OOO and eagerly execute. Hence, we // fail this scheduling attempt for trans. if (!txq->Empty() && txid_ < txq->TailScore() && !lock_granted) { if (sd.local_mask & KEYLOCK_ACQUIRED) { shard->db_slice().Release(mode, lock_args); sd.local_mask &= ~KEYLOCK_ACQUIRED; } - return {false, false}; + return false; } if (IsGlobal()) { @@ -1172,7 +1160,7 @@ pair Transaction::ScheduleInShard(EngineShard* shard) { AnalyzeTxQueue(shard, txq); DVLOG(1) << "Insert into tx-queue, sid(" << sid << ") " << DebugId() << ", qlen " << txq->size(); - return {true, lock_granted}; + return true; } bool Transaction::CancelShardCb(EngineShard* shard) { diff --git a/src/server/transaction.h b/src/server/transaction.h index 18d16c1ce..d3d5bb357 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -156,8 +156,9 @@ class Transaction { // State on specific shard. enum LocalMask : uint16_t { - ACTIVE = 1, // Set on all active shards. - OUT_OF_ORDER = 1 << 2, // Whether it can run as out of order + ACTIVE = 1, // Set on all active shards. + OUT_OF_ORDER = + 1 << 2, // Whether it can run out of order. Undefined if KEYLOCK_ACQUIRED is not set. KEYLOCK_ACQUIRED = 1 << 3, // Whether its key locks are acquired SUSPENDED_Q = 1 << 4, // Whether is suspended (by WatchInShard()) AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended()) @@ -311,10 +312,6 @@ class Transaction { bool IsGlobal() const; - bool IsOOO() const { - return coordinator_state_ & COORD_OOO; - } - // If blocking tx was woken up on this shard, get wake key. std::optional GetWakeKey(ShardId sid) const; @@ -424,7 +421,6 @@ class Transaction { COORD_CONCLUDING = 1 << 1, // Whether its the last hop of a transaction COORD_BLOCKED = 1 << 2, COORD_CANCELLED = 1 << 3, - COORD_OOO = 1 << 4, }; struct PerShardCache { @@ -470,12 +466,9 @@ class Transaction { // Returns true if transaction ran out-of-order during the scheduling phase. bool ScheduleUniqueShard(EngineShard* shard); - // Schedule on shards transaction queue. - // Returns pair(schedule_success, lock_granted) - // schedule_success is true if transaction was scheduled on db_slice. - // lock_granted is true if lock was granted for all the keys on this shard. - // Runs in the shard thread. - std::pair ScheduleInShard(EngineShard* shard); + // Schedule on shards transaction queue. Returns true if scheduled successfully, + // false if inconsistent order was detected and the schedule needs to be cancelled. + bool ScheduleInShard(EngineShard* shard); // Optimized version of RunInShard for single shard uncontended cases. RunnableResult RunQuickie(EngineShard* shard); @@ -547,12 +540,11 @@ class Transaction { // Iterate over shards and run function accepting (PerShardData&, ShardId) on all active ones. template void IterateActiveShards(F&& f) { - if (!global_ && unique_shard_cnt_ == 1) { // unique_shard_id_ is set only for non-global. - auto i = unique_shard_id_; - f(shard_data_[SidToId(i)], i); + if (unique_shard_cnt_ == 1) { + f(shard_data_[SidToId(unique_shard_id_)], unique_shard_id_); } else { for (ShardId i = 0; i < shard_data_.size(); ++i) { - if (auto& sd = shard_data_[i]; global_ || (sd.local_mask & ACTIVE)) { + if (auto& sd = shard_data_[i]; sd.local_mask & ACTIVE) { f(sd, i); } }