feat: introduce 'debug tx' command and periodic overload logs (#2333)

This command shows the current state of transaction queues,
specifically how many armed (ready to run) transactions there,
how loaded these queue are and how many locks there are in each shard.

In addition, if a tx queue becomes too long, we will output warning logs about
the state of the queue, in order to be able to identify
the bottlenecks post-factum.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-12-25 11:48:55 +02:00 committed by GitHub
parent a360b308c9
commit ce7497071c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 146 additions and 52 deletions

View file

@ -23,12 +23,24 @@ class IntentLock {
return m == SHARED || cnt_[EXCLUSIVE] == 1;
}
// Returns true if lock can be acquired using `m` mode.
bool Check(Mode m) const {
unsigned s = cnt_[EXCLUSIVE];
if (s)
return false;
return (m == SHARED) ? true : IsFree();
return (m == SHARED) ? true : cnt_[SHARED] == 0;
}
// Returns true if this lock would block transactions from running unless they are at the head
// of the transaction queue (first ones)
bool IsContended() const {
return (cnt_[EXCLUSIVE] > 1) || (cnt_[EXCLUSIVE] == 1 && cnt_[SHARED] > 0);
}
// A heuristic function to estimate the contention amount with a single score.
unsigned ContentionScore() const {
return cnt_[EXCLUSIVE] * 256 + cnt_[SHARED];
}
void Release(Mode m, unsigned val = 1) {

View file

@ -922,12 +922,12 @@ bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
if (lock_args.args.size() == 1) {
string_view key = KeyLockArgs::GetLockKey(lock_args.args.front());
lock_acquired = lt[key].Acquire(mode);
uniq_keys_ = {key};
uniq_keys_ = {key}; // needed only for tests.
} else {
uniq_keys_.clear();
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
auto s = KeyLockArgs::GetLockKey(lock_args.args[i]);
string_view s = KeyLockArgs::GetLockKey(lock_args.args[i]);
if (uniq_keys_.insert(s).second) {
bool res = lt[s].Acquire(mode);
lock_acquired &= res;

View file

@ -248,6 +248,8 @@ void DebugCmd::Run(CmdArgList args) {
" Prints the stacktraces of all current fibers to the logs.",
"SHARDS",
" Prints memory usage and key stats per shard, as well as min/max indicators.",
"TX",
" Performs transaction analysis per shard.",
"HELP",
" Prints this help.",
};
@ -282,7 +284,7 @@ void DebugCmd::Run(CmdArgList args) {
return Inspect(key);
}
if (subcmd == "TRANSACTION") {
if (subcmd == "TX") {
return TxAnalysis();
}
@ -659,60 +661,27 @@ void DebugCmd::Watched() {
}
void DebugCmd::TxAnalysis() {
atomic_uint32_t queue_len{0}, free_cnt{0}, armed_cnt{0};
using SvLockTable = absl::flat_hash_map<string_view, IntentLock>;
vector<SvLockTable> lock_table_arr(shard_set->size());
vector<EngineShard::TxQueueInfo> shard_info(shard_set->size());
auto cb = [&](EngineShard* shard) {
ShardId sid = shard->shard_id();
TxQueue* queue = shard->txq();
if (queue->Empty())
return;
auto cur = queue->Head();
do {
auto value = queue->At(cur);
Transaction* trx = std::get<Transaction*>(value);
queue_len.fetch_add(1, std::memory_order_relaxed);
if (trx->IsArmedInShard(sid)) {
armed_cnt.fetch_add(1, std::memory_order_relaxed);
IntentLock::Mode mode = trx->Mode();
// We consider keys from the currently assigned command inside the transaction.
// Meaning that for multi-tx it does not take into account all the keys.
KeyLockArgs lock_args = trx->GetLockArgs(sid);
auto& lock_table = lock_table_arr[sid];
// We count locks ourselves and do not rely on the lock table inside dbslice.
// The reason for this - to account for ordering information.
// For example, consider T1, T2 both residing in the queue and both lock 'x' exclusively.
// DbSlice::CheckLock returns false for both transactions, but T1 in practice owns the lock.
bool can_take = true;
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
string_view s = lock_args.args[i];
bool was_ack = lock_table[s].Acquire(mode);
if (!was_ack) {
can_take = false;
}
}
if (can_take) {
free_cnt.fetch_add(1, std::memory_order_relaxed);
}
}
cur = queue->Next(cur);
} while (cur != queue->Head());
auto& info = shard_info[shard->shard_id()];
info = shard->AnalyzeTxQueue();
};
shard_set->RunBriefInParallel(cb);
cntx_->SendSimpleString(absl::StrCat("queue_len:", queue_len.load(), "armed: ", armed_cnt.load(),
" free:", free_cnt.load()));
string result;
for (unsigned i = 0; i < shard_set->size(); ++i) {
const auto& info = shard_info[i];
StrAppend(&result, "shard", i, ":\n", " tx armed ", info.tx_armed, ", total: ", info.tx_total,
",global:", info.tx_global, ",runnable:", info.tx_runnable, "\n");
StrAppend(&result, " locks total:", info.total_locks, ",contended:", info.contended_locks,
"\n");
StrAppend(&result, " max contention score: ", info.max_contention_score,
",lock_name:", info.max_contention_lock_name, "\n");
}
auto* rb = static_cast<RedisReplyBuilder*>(cntx_->reply_builder());
rb->SendVerbatimString(result);
}
void DebugCmd::ObjHist() {

View file

@ -708,6 +708,73 @@ void EngineShard::TEST_EnableHeartbeat() {
});
}
auto EngineShard::AnalyzeTxQueue() -> TxQueueInfo {
const TxQueue* queue = txq();
TxQueueInfo info;
if (queue->Empty())
return info;
auto cur = queue->Head();
info.tx_total = queue->size();
unsigned max_db_id = 0;
ShardId sid = shard_id();
do {
auto value = queue->At(cur);
Transaction* trx = std::get<Transaction*>(value);
// find maximum index of databases used by transactions
if (trx->GetDbIndex() > max_db_id) {
max_db_id = trx->GetDbIndex();
}
if (trx->IsArmedInShard(sid)) {
info.tx_armed++;
if (trx->IsGlobal() || (trx->IsMulti() && trx->GetMultiMode() == Transaction::GLOBAL)) {
info.tx_global++;
} else {
KeyLockArgs lock_args = trx->GetLockArgs(sid);
DbTable* table = db_slice().GetDBTable(trx->GetDbIndex());
bool can_run = true;
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
string_view s = KeyLockArgs::GetLockKey(lock_args.args[i]);
auto it = table->trans_locks.find(s);
DCHECK(it != table->trans_locks.end());
if (it != table->trans_locks.end()) {
if (it->second.IsContended()) {
can_run = false;
break;
}
}
}
if (can_run) {
info.tx_runnable++;
}
}
}
cur = queue->Next(cur);
} while (cur != queue->Head());
// Analyze locks
for (unsigned i = 0; i <= max_db_id; ++i) {
DbTable* table = db_slice().GetDBTable(i);
info.total_locks += table->trans_locks.size();
for (const auto& k_v : table->trans_locks) {
if (k_v.second.IsContended()) {
info.contended_locks++;
if (k_v.second.ContentionScore() > info.max_contention_score) {
info.max_contention_score = k_v.second.ContentionScore();
info.max_contention_lock_name = k_v.first;
}
}
}
}
return info;
}
/**

View file

@ -153,6 +153,29 @@ class EngineShard {
void TEST_EnableHeartbeat();
struct TxQueueInfo {
// Armed - those that the coordinator has armed with callbacks and wants them to run.
// Runnable - those that could run (they own the locks) but probably can not run due
// to head of line blocking in the transaction queue i.e. there is a transaction that
// either is not armed or not runnable that is blocking the runnable transactions.
// tx_total is the size of the transaction queue.
unsigned tx_armed = 0, tx_total = 0, tx_runnable = 0, tx_global = 0;
// total_locks - total number of the transaction locks in the shard.
unsigned total_locks = 0;
// contended_locks - number of locks that are contended by more than one transaction.
unsigned contended_locks = 0;
// The score of the lock with maximum contention (see IntentLock::ContetionScore for details).
unsigned max_contention_score = 0;
// the lock name with maximum contention
std::string max_contention_lock_name;
};
TxQueueInfo AnalyzeTxQueue();
private:
struct DefragTaskState {
size_t dbid = 0u;

View file

@ -14,6 +14,9 @@
#include "server/journal/journal.h"
#include "server/server_state.h"
ABSL_FLAG(uint32_t, tx_queue_warning_len, 30,
"Length threshold for warning about long transaction queue");
namespace dfly {
using namespace std;
@ -1098,7 +1101,25 @@ pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
TxQueue::Iterator it = txq->Insert(this);
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
sd.pq_pos = it;
unsigned q_limit = absl::GetFlag(FLAGS_tx_queue_warning_len);
if (txq->size() > q_limit) {
static thread_local time_t last_log_time = 0;
// TODO: glog provides LOG_EVERY_T, which uses precise clock.
// We should introduce inside helio LOG_PERIOD_ATLEAST macro that takes seconds and
// uses low precision clock.
time_t now = time(nullptr);
if (now >= last_log_time + 10) {
last_log_time = now;
EngineShard::TxQueueInfo info = shard->AnalyzeTxQueue();
LOG(WARNING) << "TxQueue is too long. Tx count:" << info.tx_total
<< ", armed:" << info.tx_armed << ", runnable:" << info.tx_runnable
<< ", total locks: " << info.total_locks
<< ", contended locks: " << info.contended_locks << "\n"
<< "max contention score: " << info.max_contention_score
<< ", lock: " << info.max_contention_lock_name;
}
}
DVLOG(1) << "Insert into tx-queue, sid(" << sid << ") " << DebugId() << ", qlen " << txq->size();
return {true, lock_granted};
@ -1330,6 +1351,8 @@ inline uint32_t Transaction::DecreaseRunCnt() {
}
bool Transaction::IsGlobal() const {
// Please note that a transaction can be non-global even if multi_->mode == GLOBAL.
// It happens when a transaction is squashed and switches to execute differrent commands.
return global_;
}