From ce7497071c8da273ced1128f0787c12c9403a483 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Mon, 25 Dec 2023 11:48:55 +0200 Subject: [PATCH] feat: introduce 'debug tx' command and periodic overload logs (#2333) This command shows the current state of transaction queues, specifically how many armed (ready to run) transactions there, how loaded these queue are and how many locks there are in each shard. In addition, if a tx queue becomes too long, we will output warning logs about the state of the queue, in order to be able to identify the bottlenecks post-factum. Signed-off-by: Roman Gershman --- src/core/intent_lock.h | 14 ++++++- src/server/db_slice.cc | 4 +- src/server/debugcmd.cc | 67 +++++++++------------------------- src/server/engine_shard_set.cc | 67 ++++++++++++++++++++++++++++++++++ src/server/engine_shard_set.h | 23 ++++++++++++ src/server/transaction.cc | 23 ++++++++++++ 6 files changed, 146 insertions(+), 52 deletions(-) diff --git a/src/core/intent_lock.h b/src/core/intent_lock.h index eb22b69ab..0bf4e62d8 100644 --- a/src/core/intent_lock.h +++ b/src/core/intent_lock.h @@ -23,12 +23,24 @@ class IntentLock { return m == SHARED || cnt_[EXCLUSIVE] == 1; } + // Returns true if lock can be acquired using `m` mode. bool Check(Mode m) const { unsigned s = cnt_[EXCLUSIVE]; if (s) return false; - return (m == SHARED) ? true : IsFree(); + return (m == SHARED) ? true : cnt_[SHARED] == 0; + } + + // Returns true if this lock would block transactions from running unless they are at the head + // of the transaction queue (first ones) + bool IsContended() const { + return (cnt_[EXCLUSIVE] > 1) || (cnt_[EXCLUSIVE] == 1 && cnt_[SHARED] > 0); + } + + // A heuristic function to estimate the contention amount with a single score. 
+ unsigned ContentionScore() const { + return cnt_[EXCLUSIVE] * 256 + cnt_[SHARED]; } void Release(Mode m, unsigned val = 1) { diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 8e17ed55b..c4f75e6c8 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -922,12 +922,12 @@ bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) { if (lock_args.args.size() == 1) { string_view key = KeyLockArgs::GetLockKey(lock_args.args.front()); lock_acquired = lt[key].Acquire(mode); - uniq_keys_ = {key}; + uniq_keys_ = {key}; // needed only for tests. } else { uniq_keys_.clear(); for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { - auto s = KeyLockArgs::GetLockKey(lock_args.args[i]); + string_view s = KeyLockArgs::GetLockKey(lock_args.args[i]); if (uniq_keys_.insert(s).second) { bool res = lt[s].Acquire(mode); lock_acquired &= res; diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 3c2bdf5d3..210daf66f 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -248,6 +248,8 @@ void DebugCmd::Run(CmdArgList args) { " Prints the stacktraces of all current fibers to the logs.", "SHARDS", " Prints memory usage and key stats per shard, as well as min/max indicators.", + "TX", + " Performs transaction analysis per shard.", "HELP", " Prints this help.", }; @@ -282,7 +284,7 @@ void DebugCmd::Run(CmdArgList args) { return Inspect(key); } - if (subcmd == "TRANSACTION") { + if (subcmd == "TX") { return TxAnalysis(); } @@ -659,60 +661,27 @@ void DebugCmd::Watched() { } void DebugCmd::TxAnalysis() { - atomic_uint32_t queue_len{0}, free_cnt{0}, armed_cnt{0}; - - using SvLockTable = absl::flat_hash_map; - vector lock_table_arr(shard_set->size()); + vector shard_info(shard_set->size()); auto cb = [&](EngineShard* shard) { - ShardId sid = shard->shard_id(); - - TxQueue* queue = shard->txq(); - - if (queue->Empty()) - return; - - auto cur = queue->Head(); - do { - auto value = queue->At(cur); - 
Transaction* trx = std::get(value); - queue_len.fetch_add(1, std::memory_order_relaxed); - - if (trx->IsArmedInShard(sid)) { - armed_cnt.fetch_add(1, std::memory_order_relaxed); - - IntentLock::Mode mode = trx->Mode(); - - // We consider keys from the currently assigned command inside the transaction. - // Meaning that for multi-tx it does not take into account all the keys. - KeyLockArgs lock_args = trx->GetLockArgs(sid); - auto& lock_table = lock_table_arr[sid]; - - // We count locks ourselves and do not rely on the lock table inside dbslice. - // The reason for this - to account for ordering information. - // For example, consider T1, T2 both residing in the queue and both lock 'x' exclusively. - // DbSlice::CheckLock returns false for both transactions, but T1 in practice owns the lock. - bool can_take = true; - for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { - string_view s = lock_args.args[i]; - bool was_ack = lock_table[s].Acquire(mode); - if (!was_ack) { - can_take = false; - } - } - - if (can_take) { - free_cnt.fetch_add(1, std::memory_order_relaxed); - } - } - cur = queue->Next(cur); - } while (cur != queue->Head()); + auto& info = shard_info[shard->shard_id()]; + info = shard->AnalyzeTxQueue(); }; shard_set->RunBriefInParallel(cb); - cntx_->SendSimpleString(absl::StrCat("queue_len:", queue_len.load(), "armed: ", armed_cnt.load(), - " free:", free_cnt.load())); + string result; + for (unsigned i = 0; i < shard_set->size(); ++i) { + const auto& info = shard_info[i]; + StrAppend(&result, "shard", i, ":\n", " tx armed ", info.tx_armed, ", total: ", info.tx_total, + ",global:", info.tx_global, ",runnable:", info.tx_runnable, "\n"); + StrAppend(&result, " locks total:", info.total_locks, ",contended:", info.contended_locks, + "\n"); + StrAppend(&result, " max contention score: ", info.max_contention_score, + ",lock_name:", info.max_contention_lock_name, "\n"); + } + auto* rb = static_cast(cntx_->reply_builder()); + 
rb->SendVerbatimString(result); } void DebugCmd::ObjHist() { diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 45d518e03..01c1ec1f0 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -708,6 +708,73 @@ void EngineShard::TEST_EnableHeartbeat() { }); } +auto EngineShard::AnalyzeTxQueue() -> TxQueueInfo { + const TxQueue* queue = txq(); + + TxQueueInfo info; + if (queue->Empty()) + return info; + + auto cur = queue->Head(); + info.tx_total = queue->size(); + unsigned max_db_id = 0; + ShardId sid = shard_id(); + + do { + auto value = queue->At(cur); + Transaction* trx = std::get(value); + + // find maximum index of databases used by transactions + if (trx->GetDbIndex() > max_db_id) { + max_db_id = trx->GetDbIndex(); + } + + if (trx->IsArmedInShard(sid)) { + info.tx_armed++; + + if (trx->IsGlobal() || (trx->IsMulti() && trx->GetMultiMode() == Transaction::GLOBAL)) { + info.tx_global++; + } else { + KeyLockArgs lock_args = trx->GetLockArgs(sid); + DbTable* table = db_slice().GetDBTable(trx->GetDbIndex()); + bool can_run = true; + for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { + string_view s = KeyLockArgs::GetLockKey(lock_args.args[i]); + auto it = table->trans_locks.find(s); + DCHECK(it != table->trans_locks.end()); + if (it != table->trans_locks.end()) { + if (it->second.IsContended()) { + can_run = false; + break; + } + } + } + if (can_run) { + info.tx_runnable++; + } + } + } + cur = queue->Next(cur); + } while (cur != queue->Head()); + + // Analyze locks + for (unsigned i = 0; i <= max_db_id; ++i) { + DbTable* table = db_slice().GetDBTable(i); + info.total_locks += table->trans_locks.size(); + for (const auto& k_v : table->trans_locks) { + if (k_v.second.IsContended()) { + info.contended_locks++; + if (k_v.second.ContentionScore() > info.max_contention_score) { + info.max_contention_score = k_v.second.ContentionScore(); + info.max_contention_lock_name = k_v.first; + } + } + } + 
} + + return info; +} + /** diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 3454534a8..43b0894ec 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -153,6 +153,29 @@ class EngineShard { void TEST_EnableHeartbeat(); + struct TxQueueInfo { + // Armed - those that the coordinator has armed with callbacks and wants them to run. + // Runnable - those that could run (they own the locks) but probably can not run due + // to head of line blocking in the transaction queue i.e. there is a transaction that + // either is not armed or not runnable that is blocking the runnable transactions. + // tx_total is the size of the transaction queue. + unsigned tx_armed = 0, tx_total = 0, tx_runnable = 0, tx_global = 0; + + // total_locks - total number of the transaction locks in the shard. + unsigned total_locks = 0; + + // contended_locks - number of locks that are contended by more than one transaction. + unsigned contended_locks = 0; + + // The score of the lock with maximum contention (see IntentLock::ContentionScore for details). 
+ unsigned max_contention_score = 0; + + // the lock name with maximum contention + std::string max_contention_lock_name; + }; + + TxQueueInfo AnalyzeTxQueue(); + private: struct DefragTaskState { size_t dbid = 0u; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 0de467a20..798ecd1b5 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -14,6 +14,9 @@ #include "server/journal/journal.h" #include "server/server_state.h" +ABSL_FLAG(uint32_t, tx_queue_warning_len, 30, + "Length threshold for warning about long transaction queue"); + namespace dfly { using namespace std; @@ -1098,7 +1101,25 @@ pair Transaction::ScheduleInShard(EngineShard* shard) { TxQueue::Iterator it = txq->Insert(this); DCHECK_EQ(TxQueue::kEnd, sd.pq_pos); sd.pq_pos = it; + unsigned q_limit = absl::GetFlag(FLAGS_tx_queue_warning_len); + if (txq->size() > q_limit) { + static thread_local time_t last_log_time = 0; + // TODO: glog provides LOG_EVERY_T, which uses precise clock. + // We should introduce inside helio LOG_PERIOD_ATLEAST macro that takes seconds and + // uses low precision clock. + time_t now = time(nullptr); + if (now >= last_log_time + 10) { + last_log_time = now; + EngineShard::TxQueueInfo info = shard->AnalyzeTxQueue(); + LOG(WARNING) << "TxQueue is too long. Tx count:" << info.tx_total + << ", armed:" << info.tx_armed << ", runnable:" << info.tx_runnable + << ", total locks: " << info.total_locks + << ", contended locks: " << info.contended_locks << "\n" + << "max contention score: " << info.max_contention_score + << ", lock: " << info.max_contention_lock_name; + } + } DVLOG(1) << "Insert into tx-queue, sid(" << sid << ") " << DebugId() << ", qlen " << txq->size(); return {true, lock_granted}; @@ -1330,6 +1351,8 @@ inline uint32_t Transaction::DecreaseRunCnt() { } bool Transaction::IsGlobal() const { + // Please note that a transaction can be non-global even if multi_->mode == GLOBAL. 
+ // It happens when a transaction is squashed and switches to execute different commands. return global_; }