fix "debug exec" command (#2354)

fix: fix "debug exec" command

It took a mutex lock inside an Await callback, which is prohibited.

In addition, we improved logging across the transaction code.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-01-01 18:29:20 +02:00 committed by GitHub
parent 4c2d37f3c6
commit fc1a70598d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 66 additions and 33 deletions

View file

@ -14,7 +14,7 @@
#include "server/journal/journal.h"
#include "server/server_state.h"
ABSL_FLAG(uint32_t, tx_queue_warning_len, 30,
ABSL_FLAG(uint32_t, tx_queue_warning_len, 40,
"Length threshold for warning about long transaction queue");
namespace dfly {
@ -33,7 +33,7 @@ constexpr size_t kTransSize [[maybe_unused]] = sizeof(Transaction);
} // namespace
IntentLock::Mode Transaction::Mode() const {
IntentLock::Mode Transaction::LockMode() const {
return cid_->IsReadOnly() ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
}
@ -166,7 +166,7 @@ void Transaction::RecordMultiLocks(const KeyIndex& key_index) {
auto lock_key = [this](string_view key) { multi_->locks.emplace(KeyLockArgs::GetLockKey(key)); };
multi_->lock_mode.emplace(Mode());
multi_->lock_mode.emplace(LockMode());
for (size_t i = key_index.start; i < key_index.end; i += key_index.step)
lock_key(ArgS(full_args_, i));
if (key_index.bonus)
@ -362,6 +362,8 @@ void Transaction::StartMultiGlobal(DbIndex dbid) {
}
void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgList keys) {
DVLOG(1) << "StartMultiLockedAhead on " << keys.size() << " keys";
DCHECK(multi_);
DCHECK(shard_data_.empty()); // Make sure default InitByArgs didn't run.
@ -383,7 +385,7 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
if (multi_->role != SQUASHED_STUB) // stub transactions don't migrate between threads
unique_shard_id_ = 0;
multi_->cmd_seq_num++;
unique_shard_cnt_ = 0;
args_.clear();
@ -411,8 +413,12 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
string Transaction::DebugId() const {
DCHECK_GT(use_count_.load(memory_order_relaxed), 0u);
return StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")");
string res = StrCat(Name(), "@", txid_, "/", unique_shard_cnt_);
if (multi_) {
absl::StrAppend(&res, ":", multi_->cmd_seq_num);
}
absl::StrAppend(&res, " (", trans_id(this), ")");
return res;
}
void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdArgList args) {
@ -449,7 +455,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
bool is_concluding = (coordinator_state_ & COORD_EXEC_CONCLUDING);
bool tx_stop_runnig = is_concluding && !IsAtomicMulti();
IntentLock::Mode mode = Mode();
IntentLock::Mode mode = LockMode();
DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED) || (multi_ && multi_->mode == GLOBAL));
@ -516,7 +522,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
if (IsGlobal()) {
DCHECK(!awaked_prerun && !became_suspended); // Global transactions can not be blocking.
VLOG(2) << "Releasing shard lock";
shard->shard_lock()->Release(Mode());
shard->shard_lock()->Release(LockMode());
} else { // not global.
largs = GetLockArgs(idx);
DCHECK(sd.local_mask & KEYLOCK_ACQUIRED);
@ -585,6 +591,7 @@ void Transaction::ScheduleInternal() {
// Loop until successfully scheduled in all shards.
ServerState* ss = ServerState::tlocal();
DVLOG(1) << "ScheduleInternal " << cid_->name() << " on " << num_shards << " shards";
DCHECK(ss);
while (true) {
txid_ = op_seq.fetch_add(1, memory_order_relaxed);
@ -827,9 +834,9 @@ void Transaction::Execute(RunnableType cb, bool conclude) {
ExecuteAsync();
DVLOG(1) << "Wait on Exec " << DebugId();
DVLOG(1) << "Execute::WaitForCbs " << DebugId();
WaitForShardCallbacks();
DVLOG(1) << "Wait on Exec " << DebugId() << " completed";
DVLOG(1) << "Execute::WaitForCbs " << DebugId() << " completed";
cb_ptr_ = nullptr;
}
@ -1023,7 +1030,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC);
DCHECK_NE(unique_shard_id_, kInvalidSid);
auto mode = Mode();
auto mode = LockMode();
auto lock_args = GetLockArgs(shard->shard_id());
auto& sd = shard_data_[SidToId(unique_shard_id_)];
@ -1045,8 +1052,12 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
shard->db_slice().Acquire(mode, lock_args);
sd.local_mask |= KEYLOCK_ACQUIRED;
DVLOG(1) << "Rescheduling into TxQueue " << DebugId();
DVLOG(1) << "Rescheduling " << DebugId() << " into TxQueue of size " << shard->txq()->size();
// If there are blocked transactions waiting for this tx's keys, we will add this transaction
// to the tx-queue (these keys will be contended). This will happen even if the queue was empty.
// In that case we must poll the queue, because there will be no other callback triggering the
// queue before us.
shard->PollExecution("schedule_unique", nullptr);
return false;
@ -1062,7 +1073,7 @@ pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
TxQueue* txq = shard->txq();
KeyLockArgs lock_args;
IntentLock::Mode mode = Mode();
IntentLock::Mode mode = LockMode();
bool lock_granted = false;
ShardId sid = SidToId(shard->shard_id());
@ -1112,12 +1123,20 @@ pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
if (now >= last_log_time + 10) {
last_log_time = now;
EngineShard::TxQueueInfo info = shard->AnalyzeTxQueue();
LOG(WARNING) << "TxQueue is too long. Tx count:" << info.tx_total
<< ", armed:" << info.tx_armed << ", runnable:" << info.tx_runnable
<< ", total locks: " << info.total_locks
<< ", contended locks: " << info.contended_locks << "\n"
<< "max contention score: " << info.max_contention_score
<< ", lock: " << info.max_contention_lock_name;
string msg =
StrCat("TxQueue is too long. Tx count:", info.tx_total, ", armed:", info.tx_armed,
", runnable:", info.tx_runnable, ", total locks: ", info.total_locks,
", contended locks: ", info.contended_locks, "\n");
absl::StrAppend(&msg, "max contention score: ", info.max_contention_score,
", lock: ", info.max_contention_lock_name,
"poll_executions:", shard->stats().poll_execution_total);
const Transaction* cont_tx = shard->GetContTx();
if (cont_tx) {
absl::StrAppend(&msg, " continuation_tx: ", cont_tx->DebugId(), " ",
cont_tx->IsArmedInShard(sid) ? " armed" : "");
}
LOG(WARNING) << msg;
}
}
DVLOG(1) << "Insert into tx-queue, sid(" << sid << ") " << DebugId() << ", qlen " << txq->size();
@ -1143,14 +1162,14 @@ bool Transaction::CancelShardCb(EngineShard* shard) {
txq->Remove(pos);
if (sd.local_mask & KEYLOCK_ACQUIRED) {
auto mode = Mode();
auto mode = LockMode();
auto lock_args = GetLockArgs(shard->shard_id());
DCHECK(lock_args.args.size() > 0);
shard->db_slice().Release(mode, lock_args);
sd.local_mask &= ~KEYLOCK_ACQUIRED;
}
if (IsGlobal()) {
shard->shard_lock()->Release(Mode());
shard->shard_lock()->Release(LockMode());
}
if (pos == head && !txq->Empty()) {
@ -1257,7 +1276,7 @@ OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard) {
void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) {
auto lock_args = GetLockArgs(shard->shard_id());
shard->db_slice().Release(Mode(), lock_args);
shard->db_slice().Release(LockMode(), lock_args);
unsigned sd_idx = SidToId(shard->shard_id());
auto& sd = shard_data_[sd_idx];