diff --git a/src/core/intent_lock.h b/src/core/intent_lock.h index eb22b69ab..0bf4e62d8 100644 --- a/src/core/intent_lock.h +++ b/src/core/intent_lock.h @@ -23,12 +23,24 @@ class IntentLock { return m == SHARED || cnt_[EXCLUSIVE] == 1; } + // Returns true if lock can be acquired using `m` mode. bool Check(Mode m) const { unsigned s = cnt_[EXCLUSIVE]; if (s) return false; - return (m == SHARED) ? true : IsFree(); + return (m == SHARED) ? true : cnt_[SHARED] == 0; + } + + // Returns true if this lock would block transactions from running unless they are at the head + // of the transaction queue (first ones) + bool IsContended() const { + return (cnt_[EXCLUSIVE] > 1) || (cnt_[EXCLUSIVE] == 1 && cnt_[SHARED] > 0); + } + + // A heuristic function to estimate the contention amount with a single score. + unsigned ContentionScore() const { + return cnt_[EXCLUSIVE] * 256 + cnt_[SHARED]; } void Release(Mode m, unsigned val = 1) { diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 8e17ed55b..c4f75e6c8 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -922,12 +922,12 @@ bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) { if (lock_args.args.size() == 1) { string_view key = KeyLockArgs::GetLockKey(lock_args.args.front()); lock_acquired = lt[key].Acquire(mode); - uniq_keys_ = {key}; + uniq_keys_ = {key}; // needed only for tests. 
} else { uniq_keys_.clear(); for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { - auto s = KeyLockArgs::GetLockKey(lock_args.args[i]); + string_view s = KeyLockArgs::GetLockKey(lock_args.args[i]); if (uniq_keys_.insert(s).second) { bool res = lt[s].Acquire(mode); lock_acquired &= res; diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 3c2bdf5d3..210daf66f 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -248,6 +248,8 @@ void DebugCmd::Run(CmdArgList args) { " Prints the stacktraces of all current fibers to the logs.", "SHARDS", " Prints memory usage and key stats per shard, as well as min/max indicators.", + "TX", + " Performs transaction analysis per shard.", "HELP", " Prints this help.", }; @@ -282,7 +284,7 @@ void DebugCmd::Run(CmdArgList args) { return Inspect(key); } - if (subcmd == "TRANSACTION") { + if (subcmd == "TX") { return TxAnalysis(); } @@ -659,60 +661,27 @@ void DebugCmd::Watched() { } void DebugCmd::TxAnalysis() { - atomic_uint32_t queue_len{0}, free_cnt{0}, armed_cnt{0}; - - using SvLockTable = absl::flat_hash_map; - vector lock_table_arr(shard_set->size()); + vector shard_info(shard_set->size()); auto cb = [&](EngineShard* shard) { - ShardId sid = shard->shard_id(); - - TxQueue* queue = shard->txq(); - - if (queue->Empty()) - return; - - auto cur = queue->Head(); - do { - auto value = queue->At(cur); - Transaction* trx = std::get(value); - queue_len.fetch_add(1, std::memory_order_relaxed); - - if (trx->IsArmedInShard(sid)) { - armed_cnt.fetch_add(1, std::memory_order_relaxed); - - IntentLock::Mode mode = trx->Mode(); - - // We consider keys from the currently assigned command inside the transaction. - // Meaning that for multi-tx it does not take into account all the keys. - KeyLockArgs lock_args = trx->GetLockArgs(sid); - auto& lock_table = lock_table_arr[sid]; - - // We count locks ourselves and do not rely on the lock table inside dbslice. 
- // The reason for this - to account for ordering information. - // For example, consider T1, T2 both residing in the queue and both lock 'x' exclusively. - // DbSlice::CheckLock returns false for both transactions, but T1 in practice owns the lock. - bool can_take = true; - for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { - string_view s = lock_args.args[i]; - bool was_ack = lock_table[s].Acquire(mode); - if (!was_ack) { - can_take = false; - } - } - - if (can_take) { - free_cnt.fetch_add(1, std::memory_order_relaxed); - } - } - cur = queue->Next(cur); - } while (cur != queue->Head()); + auto& info = shard_info[shard->shard_id()]; + info = shard->AnalyzeTxQueue(); }; shard_set->RunBriefInParallel(cb); - cntx_->SendSimpleString(absl::StrCat("queue_len:", queue_len.load(), "armed: ", armed_cnt.load(), - " free:", free_cnt.load())); + string result; + for (unsigned i = 0; i < shard_set->size(); ++i) { + const auto& info = shard_info[i]; + StrAppend(&result, "shard", i, ":\n", " tx armed ", info.tx_armed, ", total: ", info.tx_total, + ",global:", info.tx_global, ",runnable:", info.tx_runnable, "\n"); + StrAppend(&result, " locks total:", info.total_locks, ",contended:", info.contended_locks, + "\n"); + StrAppend(&result, " max contention score: ", info.max_contention_score, + ",lock_name:", info.max_contention_lock_name, "\n"); + } + auto* rb = static_cast(cntx_->reply_builder()); + rb->SendVerbatimString(result); } void DebugCmd::ObjHist() { diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 45d518e03..01c1ec1f0 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -708,6 +708,73 @@ void EngineShard::TEST_EnableHeartbeat() { }); } +auto EngineShard::AnalyzeTxQueue() -> TxQueueInfo { + const TxQueue* queue = txq(); + + TxQueueInfo info; + if (queue->Empty()) + return info; + + auto cur = queue->Head(); + info.tx_total = queue->size(); + unsigned max_db_id = 0; + ShardId sid = 
shard_id(); + + do { + auto value = queue->At(cur); + Transaction* trx = std::get(value); + + // find maximum index of databases used by transactions + if (trx->GetDbIndex() > max_db_id) { + max_db_id = trx->GetDbIndex(); + } + + if (trx->IsArmedInShard(sid)) { + info.tx_armed++; + + if (trx->IsGlobal() || (trx->IsMulti() && trx->GetMultiMode() == Transaction::GLOBAL)) { + info.tx_global++; + } else { + KeyLockArgs lock_args = trx->GetLockArgs(sid); + DbTable* table = db_slice().GetDBTable(trx->GetDbIndex()); + bool can_run = true; + for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { + string_view s = KeyLockArgs::GetLockKey(lock_args.args[i]); + auto it = table->trans_locks.find(s); + DCHECK(it != table->trans_locks.end()); + if (it != table->trans_locks.end()) { + if (it->second.IsContended()) { + can_run = false; + break; + } + } + } + if (can_run) { + info.tx_runnable++; + } + } + } + cur = queue->Next(cur); + } while (cur != queue->Head()); + + // Analyze locks + for (unsigned i = 0; i <= max_db_id; ++i) { + DbTable* table = db_slice().GetDBTable(i); + info.total_locks += table->trans_locks.size(); + for (const auto& k_v : table->trans_locks) { + if (k_v.second.IsContended()) { + info.contended_locks++; + if (k_v.second.ContentionScore() > info.max_contention_score) { + info.max_contention_score = k_v.second.ContentionScore(); + info.max_contention_lock_name = k_v.first; + } + } + } + } + + return info; +} + /** diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 3454534a8..43b0894ec 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -153,6 +153,29 @@ class EngineShard { void TEST_EnableHeartbeat(); + struct TxQueueInfo { + // Armed - those that the coordinator has armed with callbacks and wants them to run. + // Runnable - those that could run (they own the locks) but probably can not run due + // to head of line blocking in the transaction queue i.e. 
there is a transaction that + // either is not armed or not runnable that is blocking the runnable transactions. + // tx_total is the size of the transaction queue. + unsigned tx_armed = 0, tx_total = 0, tx_runnable = 0, tx_global = 0; + + // total_locks - total number of the transaction locks in the shard. + unsigned total_locks = 0; + + // contended_locks - number of locks that are contended by more than one transaction. + unsigned contended_locks = 0; + + // The score of the lock with maximum contention (see IntentLock::ContentionScore for details). + unsigned max_contention_score = 0; + + // the lock name with maximum contention + std::string max_contention_lock_name; + }; + + TxQueueInfo AnalyzeTxQueue(); + private: struct DefragTaskState { size_t dbid = 0u; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 0de467a20..798ecd1b5 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -14,6 +14,9 @@ #include "server/journal/journal.h" #include "server/server_state.h" +ABSL_FLAG(uint32_t, tx_queue_warning_len, 30, + "Length threshold for warning about long transaction queue"); + namespace dfly { using namespace std; @@ -1098,7 +1101,25 @@ pair Transaction::ScheduleInShard(EngineShard* shard) { TxQueue::Iterator it = txq->Insert(this); DCHECK_EQ(TxQueue::kEnd, sd.pq_pos); sd.pq_pos = it; + unsigned q_limit = absl::GetFlag(FLAGS_tx_queue_warning_len); + if (txq->size() > q_limit) { + static thread_local time_t last_log_time = 0; + // TODO: glog provides LOG_EVERY_T, which uses precise clock. + // We should introduce inside helio LOG_PERIOD_ATLEAST macro that takes seconds and + // uses low precision clock. + time_t now = time(nullptr); + if (now >= last_log_time + 10) { + last_log_time = now; + EngineShard::TxQueueInfo info = shard->AnalyzeTxQueue(); + LOG(WARNING) << "TxQueue is too long. 
Tx count:" << info.tx_total + << ", armed:" << info.tx_armed << ", runnable:" << info.tx_runnable + << ", total locks: " << info.total_locks + << ", contended locks: " << info.contended_locks << "\n" + << "max contention score: " << info.max_contention_score + << ", lock: " << info.max_contention_lock_name; + } + } DVLOG(1) << "Insert into tx-queue, sid(" << sid << ") " << DebugId() << ", qlen " << txq->size(); return {true, lock_granted}; @@ -1330,6 +1351,8 @@ inline uint32_t Transaction::DecreaseRunCnt() { } bool Transaction::IsGlobal() const { + // Please note that a transaction can be non-global even if multi_->mode == GLOBAL. + // It happens when a transaction is squashed and switches to execute different commands. return global_; }