chore: Add debug logs to help tracking transactional deadlocks (#4669)

* chore: reproduce a bug related to #4663

Add various debug logs to help track the deadlock.

Add more assertions in helio and report the fiber state time
when printing stack traces.
---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2025-03-04 16:33:13 +02:00 committed by GitHub
parent 028e08076a
commit 5625aa421d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 46 additions and 19 deletions

View file

@ -1,4 +1,4 @@
default_stages: [commit] default_stages: [pre-commit]
exclude: | exclude: |
(?x)( (?x)(
src/redis/.* | src/redis/.* |

View file

@ -898,6 +898,8 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
} }
void DbSlice::FlushDb(DbIndex db_ind) { void DbSlice::FlushDb(DbIndex db_ind) {
DVLOG(1) << "Flushing db " << db_ind;
// clear client tracking map. // clear client tracking map.
client_tracking_map_.clear(); client_tracking_map_.clear();

View file

@ -1066,12 +1066,7 @@ void DebugCmd::TxAnalysis(facade::SinkReplyBuilder* builder) {
string result; string result;
for (unsigned i = 0; i < shard_set->size(); ++i) { for (unsigned i = 0; i < shard_set->size(); ++i) {
const auto& info = shard_info[i]; const auto& info = shard_info[i];
StrAppend(&result, "shard", i, ":\n", " tx armed ", info.tx_armed, ", total: ", info.tx_total, StrAppend(&result, "shard", i, ":\n", info.Format(), "\n");
",global:", info.tx_global, ",runnable:", info.tx_runnable, "\n");
StrAppend(&result, " locks total:", info.total_locks, ",contended:", info.contended_locks,
"\n");
StrAppend(&result, " max contention score: ", info.max_contention_score,
",lock_name:", info.max_contention_lock, "\n");
} }
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendVerbatimString(result); rb->SendVerbatimString(result);
@ -1114,7 +1109,14 @@ void DebugCmd::ObjHist(facade::SinkReplyBuilder* builder) {
void DebugCmd::Stacktrace(facade::SinkReplyBuilder* builder) { void DebugCmd::Stacktrace(facade::SinkReplyBuilder* builder) {
fb2::Mutex m; fb2::Mutex m;
shard_set->pool()->AwaitFiberOnAll([&m](unsigned index, ProactorBase* base) { shard_set->pool()->AwaitFiberOnAll([&m](unsigned index, ProactorBase* base) {
EngineShard* es = EngineShard::tlocal();
string txq;
if (es) {
EngineShard::TxQueueInfo txq_info = es->AnalyzeTxQueue();
txq = txq_info.Format();
}
std::unique_lock lk(m); std::unique_lock lk(m);
LOG_IF(INFO, !txq.empty()) << "Shard" << index << ": " << txq;
fb2::detail::FiberInterface::PrintAllFiberStackTraces(); fb2::detail::FiberInterface::PrintAllFiberStackTraces();
}); });
base::FlushLogs(); base::FlushLogs();

View file

@ -5,6 +5,7 @@
#include "server/engine_shard.h" #include "server/engine_shard.h"
#include <absl/strings/match.h> #include <absl/strings/match.h>
#include <absl/strings/str_cat.h>
#include "base/flags.h" #include "base/flags.h"
#include "io/proc_reader.h" #include "io/proc_reader.h"
@ -276,6 +277,25 @@ ShardId Shard(string_view v, ShardId shard_num) {
return hash % shard_num; return hash % shard_num;
} }
// Renders a human-readable, multi-line summary of the transaction-queue
// snapshot for debug logs (DEBUG TXANALYSIS, DEBUG STACKTRACE, long-queue
// warnings). Sections with no data are omitted so log lines stay short.
string EngineShard::TxQueueInfo::Format() const {
  string res;
  if (tx_total > 0) {
    absl::StrAppend(&res, "tx armed ", tx_armed, ", total: ", tx_total, ",global:", tx_global,
                    ",runnable:", tx_runnable, "\n");
    // The previous append already terminated the line with '\n'; starting this
    // one with ", head: " produced lines beginning with a stray ", ".
    absl::StrAppend(&res, "head: ", head.debug_id_info, "\n");
  }
  if (total_locks > 0) {
    absl::StrAppend(&res, "locks total:", total_locks, ",contended:", contended_locks, "\n");
  }
  if (max_contention_score > 0) {
    absl::StrAppend(&res, "max contention score: ", max_contention_score,
                    ", lock: ", max_contention_lock, "\n");
  }
  return res;
}
EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) { EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) {
static_assert(sizeof(Stats) == 64); static_assert(sizeof(Stats) == 64);
@ -706,7 +726,7 @@ void EngineShard::RemoveContTx(Transaction* tx) {
} }
void EngineShard::Heartbeat() { void EngineShard::Heartbeat() {
DVLOG(2) << " Hearbeat"; DVLOG(3) << " Hearbeat";
DCHECK(namespaces); DCHECK(namespaces);
CacheStats(); CacheStats();

View file

@ -181,6 +181,8 @@ class EngineShard {
// We can use a vector to hold debug info for all items in the txqueue // We can use a vector to hold debug info for all items in the txqueue
TxQueueItem head; TxQueueItem head;
std::string Format() const;
}; };
TxQueueInfo AnalyzeTxQueue() const; TxQueueInfo AnalyzeTxQueue() const;

View file

@ -347,6 +347,7 @@ bool ProtocolClient::CheckRespFirstTypes(initializer_list<RespExpr::Type> types)
error_code ProtocolClient::SendCommand(string_view command) { error_code ProtocolClient::SendCommand(string_view command) {
string formatted_command = RedisReplyBuilderBase::SerializeCommand(command); string formatted_command = RedisReplyBuilderBase::SerializeCommand(command);
DCHECK(sock_->proactor() == ProactorBase::me());
auto ec = sock_->Write(io::Buffer(formatted_command)); auto ec = sock_->Write(io::Buffer(formatted_command));
if (!ec) if (!ec)
TouchIoTime(); TouchIoTime();

View file

@ -87,7 +87,7 @@ Replica::~Replica() {
static const char kConnErr[] = "could not connect to master: "; static const char kConnErr[] = "could not connect to master: ";
GenericError Replica::Start() { GenericError Replica::Start() {
VLOG(1) << "Starting replication"; VLOG(1) << "Starting replication " << this;
ProactorBase* mythread = ProactorBase::me(); ProactorBase* mythread = ProactorBase::me();
CHECK(mythread); CHECK(mythread);
@ -138,7 +138,7 @@ void Replica::EnableReplication(facade::SinkReplyBuilder* builder) {
} }
void Replica::Stop() { void Replica::Stop() {
VLOG(1) << "Stopping replication"; VLOG(1) << "Stopping replication " << this;
// Stops the loop in MainReplicationFb. // Stops the loop in MainReplicationFb.
proactor_->Await([this] { proactor_->Await([this] {
@ -149,6 +149,7 @@ void Replica::Stop() {
// Make sure the replica fully stopped and did all cleanup, // Make sure the replica fully stopped and did all cleanup,
// so we can freely release resources (connections). // so we can freely release resources (connections).
sync_fb_.JoinIfNeeded(); sync_fb_.JoinIfNeeded();
DVLOG(1) << "MainReplicationFb stopped " << this;
acks_fb_.JoinIfNeeded(); acks_fb_.JoinIfNeeded();
for (auto& flow : shard_flows_) { for (auto& flow : shard_flows_) {
flow.reset(); flow.reset();
@ -183,7 +184,7 @@ std::error_code Replica::TakeOver(std::string_view timeout, bool save_flag) {
} }
void Replica::MainReplicationFb() { void Replica::MainReplicationFb() {
VLOG(1) << "Main replication fiber started"; VLOG(1) << "Main replication fiber started " << this;
// Switch shard states to replication. // Switch shard states to replication.
SetShardStates(true); SetShardStates(true);
@ -546,11 +547,14 @@ error_code Replica::InitiateDflySync() {
std::accumulate(is_full_sync.get(), is_full_sync.get() + num_df_flows, 0); std::accumulate(is_full_sync.get(), is_full_sync.get() + num_df_flows, 0);
if (num_full_flows == num_df_flows) { if (num_full_flows == num_df_flows) {
DVLOG(1) << "Calling Flush on all slots " << this;
if (slot_range_.has_value()) { if (slot_range_.has_value()) {
JournalExecutor{&service_}.FlushSlots(slot_range_.value()); JournalExecutor{&service_}.FlushSlots(slot_range_.value());
} else { } else {
JournalExecutor{&service_}.FlushAll(); JournalExecutor{&service_}.FlushAll();
} }
DVLOG(1) << "Flush on all slots ended " << this;
} else if (num_full_flows == 0) { } else if (num_full_flows == 0) {
sync_type = "partial"; sync_type = "partial";
} else { } else {

View file

@ -2903,6 +2903,7 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
// If the replication attempt failed, clean up global state. The replica should have stopped // If the replication attempt failed, clean up global state. The replica should have stopped
// internally. // internally.
util::fb2::LockGuard lk(replicaof_mu_); // Only one REPLICAOF command can run at a time util::fb2::LockGuard lk(replicaof_mu_); // Only one REPLICAOF command can run at a time
// If there was an error above during Start we must not start the main replication fiber. // If there was an error above during Start we must not start the main replication fiber.
// However, it could be the case that Start() above connected successfully and by the time // we acquire the lock, the context got cancelled because another ReplicaOf command
// we acquire the lock, the context got cancelled because another ReplicaOf command // we acquire the lock, the context got cancelled because another ReplicaOf command

View file

@ -49,19 +49,14 @@ void AnalyzeTxQueue(const EngineShard* shard, const TxQueue* txq) {
if (now >= last_log_time + 10) { if (now >= last_log_time + 10) {
last_log_time = now; last_log_time = now;
EngineShard::TxQueueInfo info = shard->AnalyzeTxQueue(); EngineShard::TxQueueInfo info = shard->AnalyzeTxQueue();
string msg = string msg = StrCat("TxQueue is too long. ", info.Format());
StrCat("TxQueue is too long. Tx count:", info.tx_total, ", armed:", info.tx_armed, absl::StrAppend(&msg, "poll_executions:", shard->stats().poll_execution_total);
", runnable:", info.tx_runnable, ", total locks: ", info.total_locks,
", contended locks: ", info.contended_locks, "\n");
absl::StrAppend(&msg, "max contention score: ", info.max_contention_score,
", lock: ", info.max_contention_lock,
", poll_executions:", shard->stats().poll_execution_total);
const Transaction* cont_tx = shard->GetContTx(); const Transaction* cont_tx = shard->GetContTx();
if (cont_tx) { if (cont_tx) {
absl::StrAppend(&msg, " continuation_tx: ", cont_tx->DebugId(shard->shard_id()), " ", absl::StrAppend(&msg, " continuation_tx: ", cont_tx->DebugId(shard->shard_id()), " ",
cont_tx->DEBUG_IsArmedInShard(shard->shard_id()) ? " armed" : ""); cont_tx->DEBUG_IsArmedInShard(shard->shard_id()) ? " armed" : "");
} }
absl::StrAppend(&msg, "\nTxQueue head debug info ", info.head.debug_id_info);
LOG(WARNING) << msg; LOG(WARNING) << msg;
} }