diff --git a/src/server/db_slice.h b/src/server/db_slice.h
index e6d0bd0e2..ee80c29a0 100644
--- a/src/server/db_slice.h
+++ b/src/server/db_slice.h
@@ -308,6 +308,10 @@ class DbSlice {
     return db_arr_[id].get();
   }
 
+  const DbTable* GetDBTable(DbIndex id) const {
+    return db_arr_[id].get();
+  }
+
   std::pair<PrimeTable*, ExpireTable*> GetTables(DbIndex id) {
     return std::pair(&db_arr_[id]->prime, &db_arr_[id]->expire);
   }
diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc
index 684ca4afb..cc63d4b3c 100644
--- a/src/server/engine_shard_set.cc
+++ b/src/server/engine_shard_set.cc
@@ -165,7 +165,7 @@ class RoundRobinSharder {
   static Mutex mutex_;
 };
 
-bool HasContendedLocks(unsigned shard_id, Transaction* trx, DbTable* table) {
+bool HasContendedLocks(unsigned shard_id, Transaction* trx, const DbTable* table) {
   bool has_contended_locks = false;
 
   if (trx->IsMulti()) {
@@ -713,7 +713,7 @@ void EngineShard::TEST_EnableHeartbeat() {
   });
 }
 
-auto EngineShard::AnalyzeTxQueue() -> TxQueueInfo {
+auto EngineShard::AnalyzeTxQueue() const -> TxQueueInfo {
   const TxQueue* queue = txq();
 
   ShardId sid = shard_id();
@@ -742,7 +742,7 @@ auto EngineShard::AnalyzeTxQueue() -> TxQueueInfo {
     if (trx->IsGlobal() || (trx->IsMulti() && trx->GetMultiMode() == Transaction::GLOBAL)) {
       info.tx_global++;
     } else {
-      DbTable* table = db_slice().GetDBTable(trx->GetDbIndex());
+      const DbTable* table = db_slice().GetDBTable(trx->GetDbIndex());
       bool can_run = !HasContendedLocks(sid, trx, table);
       if (can_run) {
         info.tx_runnable++;
@@ -754,7 +754,7 @@ auto EngineShard::AnalyzeTxQueue() -> TxQueueInfo {
 
   // Analyze locks
   for (unsigned i = 0; i <= max_db_id; ++i) {
-    DbTable* table = db_slice().GetDBTable(i);
+    const DbTable* table = db_slice().GetDBTable(i);
     if (table == nullptr)
       continue;
 
diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h
index 82b8bdf81..976671279 100644
--- a/src/server/engine_shard_set.h
+++ b/src/server/engine_shard_set.h
@@ -86,6 +86,10 @@ class EngineShard {
     return &txq_;
   }
 
+  const TxQueue* txq() const {
+    return &txq_;
+  }
+
   TxId committed_txid() const {
     return committed_txid_;
   }
@@ -174,7 +178,7 @@ class EngineShard {
     std::string max_contention_lock_name;
   };
 
-  TxQueueInfo AnalyzeTxQueue();
+  TxQueueInfo AnalyzeTxQueue() const;
 
  private:
   struct DefragTaskState {
diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index d6bd5730c..8d7450568 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -31,6 +31,48 @@ atomic_uint64_t op_seq{1};
 
 constexpr size_t kTransSize [[maybe_unused]] = sizeof(Transaction);
 
+void AnalyzeTxQueue(const EngineShard* shard, const TxQueue* txq) {
+  unsigned q_limit = absl::GetFlag(FLAGS_tx_queue_warning_len);
+  if (txq->size() > q_limit) {
+    static thread_local time_t last_log_time = 0;
+    // TODO: glog provides LOG_EVERY_T, which uses a precise clock.
+    // We should introduce a LOG_PERIOD_ATLEAST macro inside helio that takes seconds and
+    // uses a low-precision clock.
+    time_t now = time(nullptr);
+    if (now >= last_log_time + 10) {
+      last_log_time = now;
+      EngineShard::TxQueueInfo info = shard->AnalyzeTxQueue();
+      string msg =
+          StrCat("TxQueue is too long. Tx count:", info.tx_total, ", armed:", info.tx_armed,
+                 ", runnable:", info.tx_runnable, ", total locks: ", info.total_locks,
+                 ", contended locks: ", info.contended_locks, "\n");
+      absl::StrAppend(&msg, "max contention score: ", info.max_contention_score,
+                      ", lock: ", info.max_contention_lock_name,
+                      ", poll_executions:", shard->stats().poll_execution_total);
+      const Transaction* cont_tx = shard->GetContTx();
+      if (cont_tx) {
+        absl::StrAppend(&msg, " continuation_tx: ", cont_tx->DebugId(), " ",
+                        cont_tx->IsArmedInShard(shard->shard_id()) ? " armed" : "");
+      }
+
+      LOG(WARNING) << msg;
+    }
+  }
+}
+
+void RecordTxScheduleStats(const Transaction* tx) {
+  auto* ss = ServerState::tlocal();
+  DCHECK(ss);
+  ss->stats.tx_width_freq_arr[tx->GetUniqueShardCnt() - 1]++;
+  if (tx->IsGlobal()) {
+    ss->stats.tx_type_cnt[ServerState::GLOBAL]++;
+  } else if (tx->IsOOO()) {
+    ss->stats.tx_type_cnt[ServerState::OOO]++;
+  } else {
+    ss->stats.tx_type_cnt[ServerState::NORMAL]++;
+  }
+}
+
 }  // namespace
 
 IntentLock::Mode Transaction::LockMode() const {
@@ -464,10 +506,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
   IntentLock::Mode mode = LockMode();
 
   DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED) || (multi_ && multi_->mode == GLOBAL));
-
-  if (txq_ooo) {
-    DCHECK(sd.local_mask & OUT_OF_ORDER);
-  }
+  DCHECK(!txq_ooo || (sd.local_mask & OUT_OF_ORDER));
 
   /*************************************************************************/
   // Actually running the callback.
@@ -577,36 +616,21 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
   return !is_concluding;
 }
 
+// TODO: For multi-transactions we should be able to deduce mode() at run-time based
+// on the context. For regular multi-transactions we can actually inspect all commands.
+// For eval-like transactions we can decide based on the command flavor (EVAL/EVALRO) or
+// auto-tune based on static analysis (by identifying commands with hardcoded command names).
 void Transaction::ScheduleInternal() {
   DCHECK(!shard_data_.empty());
   DCHECK_EQ(0u, txid_);
   DCHECK_EQ(0, coordinator_state_ & (COORD_SCHED | COORD_OOO));
+  DCHECK_GT(unique_shard_cnt_, 0u);
 
-  bool span_all = IsGlobal();
+  DVLOG(1) << "ScheduleInternal " << cid_->name() << " on " << unique_shard_cnt_ << " shards";
 
-  uint32_t num_shards;
-  std::function<bool(uint32_t)> is_active;
-
-  // TODO: For multi-transactions we should be able to deduce mode() at run-time based
-  // on the context. For regular multi-transactions we can actually inspect all commands.
-  // For eval-like transactions - we can decided based on the command flavor (EVAL/EVALRO) or
-  // auto-tune based on the static analysis (by identifying commands with hardcoded command names).
-  if (span_all) {
-    is_active = [](uint32_t) { return true; };
-    num_shards = shard_set->size();
-  } else {
-    num_shards = unique_shard_cnt_;
-    DCHECK_GT(num_shards, 0u);
-
-    is_active = [&](uint32_t i) {
-      return num_shards == 1 ? (i == unique_shard_id_) : shard_data_[i].local_mask & ACTIVE;
-    };
-  }
+  auto is_active = [this](uint32_t i) { return IsActive(i); };
   // Loop until successfully scheduled in all shards.
-  ServerState* ss = ServerState::tlocal();
-  DVLOG(1) << "ScheduleInternal " << cid_->name() << " on " << num_shards << " shards";
-  DCHECK(ss);
 
   while (true) {
     txid_ = op_seq.fetch_add(1, memory_order_relaxed);
     time_now_ms_ = GetCurrentTimeMs();
@@ -621,26 +645,16 @@ void Transaction::ScheduleInternal() {
     };
     shard_set->RunBriefInParallel(std::move(cb), is_active);
 
-    if (success.load(memory_order_acquire) == num_shards) {
+    if (success.load(memory_order_acquire) == unique_shard_cnt_) {
      coordinator_state_ |= COORD_SCHED;
-      bool ooo_disabled = IsAtomicMulti() && multi_->mode != LOCK_AHEAD;
-
-      DCHECK_GT(num_shards, 0u);
-
-      ss->stats.tx_width_freq_arr[num_shards - 1]++;
-
-      if (IsGlobal()) {
-        ss->stats.tx_type_cnt[ServerState::GLOBAL]++;
-      } else if (!ooo_disabled && lock_granted_cnt.load(memory_order_relaxed) == num_shards) {
-        // If we granted all locks, we can run out of order.
-        coordinator_state_ |= COORD_OOO;
-        ss->stats.tx_type_cnt[ServerState::OOO]++;
-      } else {
-        ss->stats.tx_type_cnt[ServerState::NORMAL]++;
+      if (lock_granted_cnt.load(memory_order_relaxed) == unique_shard_cnt_) {
+        coordinator_state_ |= COORD_OOO;  // If we granted all locks, we can run out of order.
       }
+
+      RecordTxScheduleStats(this);
       VLOG(2) << "Scheduled " << DebugId()
               << " OutOfOrder: " << bool(coordinator_state_ & COORD_OOO)
-              << " num_shards: " << num_shards;
+              << " num_shards: " << unique_shard_cnt_;
 
       break;
     }
@@ -1150,33 +1164,8 @@ pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
   TxQueue::Iterator it = txq->Insert(this);
   DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
   sd.pq_pos = it;
-  unsigned q_limit = absl::GetFlag(FLAGS_tx_queue_warning_len);
-  if (txq->size() > q_limit) {
-    static thread_local time_t last_log_time = 0;
-    // TODO: glog provides LOG_EVERY_T, which uses precise clock.
-    // We should introduce inside helio LOG_PERIOD_ATLEAST macro that takes seconds and
-    // uses low precision clock.
-    time_t now = time(nullptr);
-    if (now >= last_log_time + 10) {
-      last_log_time = now;
-      EngineShard::TxQueueInfo info = shard->AnalyzeTxQueue();
-      string msg =
-          StrCat("TxQueue is too long. Tx count:", info.tx_total, ", armed:", info.tx_armed,
-                 ", runnable:", info.tx_runnable, ", total locks: ", info.total_locks,
-                 ", contended locks: ", info.contended_locks, "\n");
-      absl::StrAppend(&msg, "max contention score: ", info.max_contention_score,
-                      ", lock: ", info.max_contention_lock_name,
-                      ", poll_executions:", shard->stats().poll_execution_total);
-      const Transaction* cont_tx = shard->GetContTx();
-      if (cont_tx) {
-        absl::StrAppend(&msg, " continuation_tx: ", cont_tx->DebugId(), " ",
-                        cont_tx->IsArmedInShard(sid) ? " armed" : "");
-      }
-
-      LOG(WARNING) << msg;
-    }
-  }
+  AnalyzeTxQueue(shard, txq);
 
   DVLOG(1) << "Insert into tx-queue, sid(" << sid << ") " << DebugId() << ", qlen " << txq->size();
 
   return {true, lock_granted};
diff --git a/src/server/transaction.h b/src/server/transaction.h
index fa49c0910..c6ecb518f 100644
--- a/src/server/transaction.h
+++ b/src/server/transaction.h
@@ -156,8 +156,7 @@ class Transaction {
 
   // State on specific shard.
   enum LocalMask : uint16_t {
-    ACTIVE = 1,  // Set on all active shards.
-    // UNUSED = 1 << 1,
+    ACTIVE = 1,                 // Set on all active shards.
     OUT_OF_ORDER = 1 << 2,      // Whether it can run as out of order
     KEYLOCK_ACQUIRED = 1 << 3,  // Whether its key locks are acquired
     SUSPENDED_Q = 1 << 4,       // Whether is suspended (by WatchInShard())
@@ -252,11 +251,11 @@ class Transaction {
   // Runs in the shard thread.
   KeyLockArgs GetLockArgs(ShardId sid) const;
 
-  //! Returns true if the transaction spans this shard_id.
-  //! Runs from the coordinator thread.
+  // Returns true if the transaction spans this shard_id.
+  // Runs from the coordinator thread.
   bool IsActive(ShardId shard_id) const {
-    return unique_shard_cnt_ == 1 ? unique_shard_id_ == shard_id
-                                  : shard_data_[shard_id].arg_count > 0;
+    return unique_shard_cnt_ == 1 ? (unique_shard_id_ == shard_id)
+                                  : shard_data_[shard_id].local_mask & ACTIVE;
   }
 
   //! Returns true if the transaction is armed for execution on this sid (used to avoid
@@ -420,7 +419,6 @@ class Transaction {
 
   enum CoordinatorState : uint8_t {
     COORD_SCHED = 1,
-    COORD_CONCLUDING = 1 << 1,  // Whether its the last hop of a transaction
    COORD_BLOCKED = 1 << 2,
     COORD_CANCELLED = 1 << 3,
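
Review note on the const-correctness chain above: marking EngineShard::AnalyzeTxQueue() const is what forces the new const overloads (txq() const, GetDBTable() const) and the const DbTable* parameter of HasContendedLocks. A toy, self-contained illustration of the pattern; the types below are stand-ins, not Dragonfly's:

#include <memory>
#include <vector>

struct Table {};

class Slice {
 public:
  Table* GetTable(unsigned id) {
    return arr_[id].get();
  }
  // Const overload, analogous to the DbSlice::GetDBTable(DbIndex) const added above.
  const Table* GetTable(unsigned id) const {
    return arr_[id].get();
  }

 private:
  std::vector<std::unique_ptr<Table>> arr_;
};

class Shard {
 public:
  // Analogous to AnalyzeTxQueue() const: `this` is const here, so slice_ is a
  // const Slice and only the const GetTable overload is viable; everything the
  // result is passed to must therefore accept const Table*.
  bool HasTable(unsigned id) const {
    const Table* t = slice_.GetTable(id);
    return t != nullptr;
  }

 private:
  Slice slice_;
};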
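
The TODO kept inside the new AnalyzeTxQueue helper proposes a LOG_PERIOD_ATLEAST macro for helio. Below is a minimal sketch of what such a macro could look like, assuming glog-style LOG streams; the macro name comes from the TODO, but the expansion (and the helper LogPeriodElapsed) is an assumption, not existing helio API:

#include <ctime>

#include "base/logging.h"  // glog-style LOG(severity); exact path is an assumption

// Returns true at most once per `period_sec` seconds per call site, using the
// cheap low-precision time() clock instead of the precise clock behind glog's
// LOG_EVERY_T. `last` points to a thread_local slot owned by the call site.
inline bool LogPeriodElapsed(time_t period_sec, time_t* last) {
  time_t now = time(nullptr);
  if (now >= *last + period_sec) {
    *last = now;
    return true;
  }
  return false;
}

#define LPA_CONCAT_INNER(a, b) a##b
#define LPA_CONCAT(a, b) LPA_CONCAT_INNER(a, b)

// Usage: LOG_PERIOD_ATLEAST(WARNING, 10) << "TxQueue is too long. ...";
// Note: the macro declares a variable, so it cannot be the sole statement of
// an unbraced if/else.
#define LOG_PERIOD_ATLEAST(severity, secs)                             \
  static thread_local time_t LPA_CONCAT(lpa_last_log_, __LINE__) = 0;  \
  if (LogPeriodElapsed((secs), &LPA_CONCAT(lpa_last_log_, __LINE__)))  \
  LOG(severity)

With a macro along these lines, the hand-rolled last_log_time/now throttling block in AnalyzeTxQueue would collapse into a single rate-limited LOG statement.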
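
ScheduleInternal now decides COORD_OOO solely from "all shard locks granted" and defers the stats bookkeeping to RecordTxScheduleStats. The sketch below restates that classification in a self-contained form; Stats and TxView are simplified stand-ins for ServerState::stats and the Transaction accessors, kept only to make the branching explicit:

#include <array>
#include <cstdint>

enum TxType : unsigned { GLOBAL, OOO, NORMAL, NUM_TX_TYPES };

struct Stats {
  std::array<uint64_t, NUM_TX_TYPES> tx_type_cnt{};
  std::array<uint64_t, 64> tx_width_freq_arr{};  // bucket i counts transactions of width i+1
};

struct TxView {  // minimal view of the fields RecordTxScheduleStats reads
  unsigned unique_shard_cnt;  // > 0, enforced by DCHECK_GT in ScheduleInternal
  bool is_global;             // spans all shards
  bool is_ooo;                // COORD_OOO: every shard granted its locks at schedule time
};

void RecordScheduleStats(Stats* stats, const TxView& tx) {
  stats->tx_width_freq_arr[tx.unique_shard_cnt - 1]++;
  if (tx.is_global) {
    stats->tx_type_cnt[GLOBAL]++;
  } else if (tx.is_ooo) {
    stats->tx_type_cnt[OOO]++;
  } else {
    stats->tx_type_cnt[NORMAL]++;
  }
}

One behavioral nuance worth flagging in review: the removed code additionally suppressed OOO for atomic multi transactions via ooo_disabled, whereas the new condition is the lock grant count alone; IsOOO() is presumably derived from COORD_OOO.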