From 5625aa421d9cf538e1aca50a19fe15f01739f50d Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Tue, 4 Mar 2025 16:33:13 +0200 Subject: [PATCH] chore: Add debug logs to help tracking transactional deadlocks (#4669) * chore: reproduce a bug related to #4663. Add various debug logs to help track the deadlock. Add more assertions in helio and provide state time for fibers when printing stack traces. --------- Signed-off-by: Roman Gershman --- .pre-commit-config.yaml | 2 +- src/server/db_slice.cc | 2 ++ src/server/debugcmd.cc | 14 ++++++++------ src/server/engine_shard.cc | 22 +++++++++++++++++++++- src/server/engine_shard.h | 2 ++ src/server/protocol_client.cc | 1 + src/server/replica.cc | 10 +++++++--- src/server/server_family.cc | 1 + src/server/transaction.cc | 11 +++-------- 9 files changed, 46 insertions(+), 19 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1826eed6d..b8846b406 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,4 +1,4 @@ -default_stages: [commit] +default_stages: [pre-commit] exclude: | (?x)( src/redis/.* | diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 07c2d05b0..d83216d84 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -898,6 +898,8 @@ void DbSlice::FlushDbIndexes(const std::vector& indexes) { } void DbSlice::FlushDb(DbIndex db_ind) { + DVLOG(1) << "Flushing db " << db_ind; + // clear client tracking map. 
client_tracking_map_.clear(); diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index ed4377363..3fdc1f73a 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -1066,12 +1066,7 @@ void DebugCmd::TxAnalysis(facade::SinkReplyBuilder* builder) { string result; for (unsigned i = 0; i < shard_set->size(); ++i) { const auto& info = shard_info[i]; - StrAppend(&result, "shard", i, ":\n", " tx armed ", info.tx_armed, ", total: ", info.tx_total, - ",global:", info.tx_global, ",runnable:", info.tx_runnable, "\n"); - StrAppend(&result, " locks total:", info.total_locks, ",contended:", info.contended_locks, - "\n"); - StrAppend(&result, " max contention score: ", info.max_contention_score, - ",lock_name:", info.max_contention_lock, "\n"); + StrAppend(&result, "shard", i, ":\n", info.Format(), "\n"); } auto* rb = static_cast(builder); rb->SendVerbatimString(result); @@ -1114,7 +1109,14 @@ void DebugCmd::ObjHist(facade::SinkReplyBuilder* builder) { void DebugCmd::Stacktrace(facade::SinkReplyBuilder* builder) { fb2::Mutex m; shard_set->pool()->AwaitFiberOnAll([&m](unsigned index, ProactorBase* base) { + EngineShard* es = EngineShard::tlocal(); + string txq; + if (es) { + EngineShard::TxQueueInfo txq_info = es->AnalyzeTxQueue(); + txq = txq_info.Format(); + } std::unique_lock lk(m); + LOG_IF(INFO, !txq.empty()) << "Shard" << index << ": " << txq; fb2::detail::FiberInterface::PrintAllFiberStackTraces(); }); base::FlushLogs(); diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index e0428d560..c7c544185 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -5,6 +5,7 @@ #include "server/engine_shard.h" #include +#include #include "base/flags.h" #include "io/proc_reader.h" @@ -276,6 +277,25 @@ ShardId Shard(string_view v, ShardId shard_num) { return hash % shard_num; } +string EngineShard::TxQueueInfo::Format() const { + string res; + + if (tx_total > 0) { + absl::StrAppend(&res, "tx armed ", tx_armed, ", total: ", 
tx_total, ",global:", tx_global, + ",runnable:", tx_runnable, "\n"); + absl::StrAppend(&res, ", head: ", head.debug_id_info, "\n"); + } + if (total_locks > 0) { + absl::StrAppend(&res, "locks total:", total_locks, ",contended:", contended_locks, "\n"); + } + if (max_contention_score > 0) { + absl::StrAppend(&res, "max contention score: ", max_contention_score, + ", lock: ", max_contention_lock, "\n"); + } + + return res; +} + EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) { static_assert(sizeof(Stats) == 64); @@ -706,7 +726,7 @@ void EngineShard::RemoveContTx(Transaction* tx) { } void EngineShard::Heartbeat() { - DVLOG(2) << " Hearbeat"; + DVLOG(3) << " Hearbeat"; DCHECK(namespaces); CacheStats(); diff --git a/src/server/engine_shard.h b/src/server/engine_shard.h index 3a87b09f3..05c918a93 100644 --- a/src/server/engine_shard.h +++ b/src/server/engine_shard.h @@ -181,6 +181,8 @@ class EngineShard { // We can use a vector to hold debug info for all items in the txqueue TxQueueItem head; + + std::string Format() const; }; TxQueueInfo AnalyzeTxQueue() const; diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc index d17f64e9e..672d22905 100644 --- a/src/server/protocol_client.cc +++ b/src/server/protocol_client.cc @@ -347,6 +347,7 @@ bool ProtocolClient::CheckRespFirstTypes(initializer_list types) error_code ProtocolClient::SendCommand(string_view command) { string formatted_command = RedisReplyBuilderBase::SerializeCommand(command); + DCHECK(sock_->proactor() == ProactorBase::me()); auto ec = sock_->Write(io::Buffer(formatted_command)); if (!ec) TouchIoTime(); diff --git a/src/server/replica.cc b/src/server/replica.cc index 608685d18..2287c5e6b 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -87,7 +87,7 @@ Replica::~Replica() { static const char kConnErr[] = "could not connect to master: "; GenericError Replica::Start() { - VLOG(1) << "Starting replication"; + VLOG(1) << "Starting replication " 
<< this; ProactorBase* mythread = ProactorBase::me(); CHECK(mythread); @@ -138,7 +138,7 @@ void Replica::EnableReplication(facade::SinkReplyBuilder* builder) { } void Replica::Stop() { - VLOG(1) << "Stopping replication"; + VLOG(1) << "Stopping replication " << this; // Stops the loop in MainReplicationFb. proactor_->Await([this] { @@ -149,6 +149,7 @@ void Replica::Stop() { // Make sure the replica fully stopped and did all cleanup, // so we can freely release resources (connections). sync_fb_.JoinIfNeeded(); + DVLOG(1) << "MainReplicationFb stopped " << this; acks_fb_.JoinIfNeeded(); for (auto& flow : shard_flows_) { flow.reset(); @@ -183,7 +184,7 @@ std::error_code Replica::TakeOver(std::string_view timeout, bool save_flag) { } void Replica::MainReplicationFb() { - VLOG(1) << "Main replication fiber started"; + VLOG(1) << "Main replication fiber started " << this; // Switch shard states to replication. SetShardStates(true); @@ -546,11 +547,14 @@ error_code Replica::InitiateDflySync() { std::accumulate(is_full_sync.get(), is_full_sync.get() + num_df_flows, 0); if (num_full_flows == num_df_flows) { + DVLOG(1) << "Calling Flush on all slots " << this; + if (slot_range_.has_value()) { JournalExecutor{&service_}.FlushSlots(slot_range_.value()); } else { JournalExecutor{&service_}.FlushAll(); } + DVLOG(1) << "Flush on all slots ended " << this; } else if (num_full_flows == 0) { sync_type = "partial"; } else { diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 952fa1f64..1adc831a3 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -2903,6 +2903,7 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply // If the replication attempt failed, clean up global state. The replica should have stopped // internally. util::fb2::LockGuard lk(replicaof_mu_); // Only one REPLICAOF command can run at a time + // If there was an error above during Start we must not start the main replication fiber. 
// However, it could be the case that Start() above connected succefully and by the time // we acquire the lock, the context got cancelled because another ReplicaOf command diff --git a/src/server/transaction.cc b/src/server/transaction.cc index ca80a108c..d8b35dc2c 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -49,19 +49,14 @@ void AnalyzeTxQueue(const EngineShard* shard, const TxQueue* txq) { if (now >= last_log_time + 10) { last_log_time = now; EngineShard::TxQueueInfo info = shard->AnalyzeTxQueue(); - string msg = - StrCat("TxQueue is too long. Tx count:", info.tx_total, ", armed:", info.tx_armed, - ", runnable:", info.tx_runnable, ", total locks: ", info.total_locks, - ", contended locks: ", info.contended_locks, "\n"); - absl::StrAppend(&msg, "max contention score: ", info.max_contention_score, - ", lock: ", info.max_contention_lock, - ", poll_executions:", shard->stats().poll_execution_total); + string msg = StrCat("TxQueue is too long. ", info.Format()); + absl::StrAppend(&msg, "poll_executions:", shard->stats().poll_execution_total); + const Transaction* cont_tx = shard->GetContTx(); if (cont_tx) { absl::StrAppend(&msg, " continuation_tx: ", cont_tx->DebugId(shard->shard_id()), " ", cont_tx->DEBUG_IsArmedInShard(shard->shard_id()) ? " armed" : ""); } - absl::StrAppend(&msg, "\nTxQueue head debug info ", info.head.debug_id_info); LOG(WARNING) << msg; }