chore(transaction): Add debug stats for fail printing (#2600)

* chore(transaction): Add debug stats for per shard data

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2024-02-18 15:36:14 +03:00 committed by GitHub
parent 4d4fed6fec
commit 1b51e82e55
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 41 additions and 5 deletions

View file

@ -111,7 +111,7 @@ void Transaction::PhasedBarrier::Dec(Transaction* keep_alive) {
::boost::intrusive_ptr guard(keep_alive);
uint32_t before = count_.fetch_sub(1);
CHECK_GE(before, 1u);
CHECK_GE(before, 1u) << keep_alive->DEBUG_PrintFailState(EngineShard::tlocal()->shard_id());
if (before == 1)
ec_.notify();
}
@ -209,6 +209,7 @@ void Transaction::InitBase(DbIndex dbid, CmdArgList args) {
db_index_ = dbid;
full_args_ = args;
local_result_ = OpStatus::OK;
stats_.coordinator_index = ProactorBase::me() ? ProactorBase::me()->GetPoolIndex() : kInvalidSid;
}
void Transaction::InitGlobal() {
@ -600,10 +601,12 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
unsigned idx = SidToId(shard->shard_id());
auto& sd = shard_data_[idx];
CHECK(sd.local_mask & ARMED);
CHECK(sd.local_mask & ARMED) << DEBUG_PrintFailState(shard->shard_id());
sd.local_mask &= ~ARMED;
CHECK_GT(run_barrier_.DEBUG_Count(), 0u);
sd.stats.total_runs++;
DCHECK_GT(run_barrier_.DEBUG_Count(), 0u);
VLOG(2) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id() << " " << sd.local_mask;
bool was_suspended = sd.local_mask & SUSPENDED_Q;
@ -743,6 +746,8 @@ void Transaction::ScheduleInternal() {
// Loop until successfully scheduled in all shards.
while (true) {
stats_.schedule_attempts++;
txid_ = op_seq.fetch_add(1, memory_order_relaxed);
InitTxTime();
@ -1021,6 +1026,21 @@ void Transaction::FIX_ConcludeJournalExec() {
MultiReportJournalOnShard(EngineShard::tlocal());
}
string Transaction::DEBUG_PrintFailState(ShardId sid) const {
auto res = StrCat(
"usc: ", unique_shard_cnt_, ", name:", GetCId()->name(),
", usecnt:", use_count_.load(memory_order_relaxed), ", runcnt: ", run_barrier_.DEBUG_Count(),
", coordstate: ", coordinator_state_, ", coord native thread: ", stats_.coordinator_index,
", schedule attempts: ", stats_.schedule_attempts, ", report from sid: ", sid, "\n");
std::atomic_thread_fence(memory_order_acquire);
for (unsigned i = 0; i < shard_data_.size(); ++i) {
const auto& sd = shard_data_[i];
absl::StrAppend(&res, "- shard: ", i, " local_mask:", sd.local_mask,
" total_runs: ", sd.stats.total_runs, "\n");
}
return res;
}
void Transaction::EnableShard(ShardId sid) {
unique_shard_cnt_ = 1;
unique_shard_id_ = sid;
@ -1048,9 +1068,11 @@ Transaction::RunnableResult Transaction::RunQuickie(EngineShard* shard) {
DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id();
DCHECK(cb_ptr_) << DebugId() << " " << shard->shard_id();
CHECK(sd.local_mask & ARMED);
CHECK(sd.local_mask & ARMED) << DEBUG_PrintFailState(shard->shard_id());
sd.local_mask &= ~ARMED;
sd.stats.total_runs++;
// Calling the callback in somewhat safe way
RunnableResult result;
try {