diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 8477198f3..e1cd34f9c 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -575,7 +575,7 @@ void DebugCmd::Exec() { fb2::Mutex mu; std::map freq_cnt; - ess.pool()->Await([&](auto*) { + ess.pool()->AwaitFiberOnAll([&](auto*) { for (const auto& k_v : ServerState::tlocal()->exec_freq_count) { unique_lock lk(mu); freq_cnt[k_v.first] += k_v.second; diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 577c9a9e9..45f086833 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -201,9 +201,12 @@ EngineShardSet* shard_set = nullptr; uint64_t TEST_current_time_ms = 0; EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) { + static_assert(sizeof(Stats) == 32); + defrag_attempt_total += o.defrag_attempt_total; defrag_realloc_total += o.defrag_realloc_total; defrag_task_invocation_total += o.defrag_task_invocation_total; + poll_execution_total += o.poll_execution_total; return *this; } @@ -441,9 +444,10 @@ void EngineShard::DestroyThreadLocal() { // Only runs in its own thread. void EngineShard::PollExecution(const char* context, Transaction* trans) { DVLOG(2) << "PollExecution " << context << " " << (trans ? trans->DebugId() : "") << " " - << txq_.size() << " " << continuation_trans_; + << txq_.size() << " " << (continuation_trans_ ? continuation_trans_->DebugId() : ""); ShardId sid = shard_id(); + stats_.poll_execution_total++; uint16_t trans_mask = trans ? trans->GetLocalMask(sid) : 0; if (trans_mask & Transaction::AWAKED_Q) { @@ -457,23 +461,27 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { } } + string dbg_id; + if (continuation_trans_) { if (trans == continuation_trans_) trans = nullptr; if (continuation_trans_->IsArmedInShard(sid)) { + if (VLOG_IS_ON(1)) { + dbg_id = continuation_trans_->DebugId(); + } bool to_keep = continuation_trans_->RunInShard(this, false); - DVLOG(1) << "RunContTrans: " << (continuation_trans_ ? continuation_trans_->DebugId() : "") - << " keep: " << to_keep; + DVLOG(1) << "RunContTrans: " << dbg_id << " keep: " << to_keep; if (!to_keep) { + // if this holds, we can remove this check altogether. + DCHECK(continuation_trans_ == nullptr); continuation_trans_ = nullptr; } } } Transaction* head = nullptr; - string dbg_id; - if (continuation_trans_ == nullptr) { while (!txq_.Empty()) { // we must check every iteration so that if the current transaction awakens @@ -701,25 +709,27 @@ void EngineShard::TEST_EnableHeartbeat() { auto EngineShard::AnalyzeTxQueue() -> TxQueueInfo { const TxQueue* queue = txq(); + ShardId sid = shard_id(); TxQueueInfo info; + if (queue->Empty()) return info; auto cur = queue->Head(); info.tx_total = queue->size(); unsigned max_db_id = 0; - ShardId sid = shard_id(); do { auto value = queue->At(cur); Transaction* trx = std::get(value); - // find maximum index of databases used by transactions if (trx->GetDbIndex() > max_db_id) { max_db_id = trx->GetDbIndex(); } - if (trx->IsArmedInShard(sid)) { + bool is_armed = trx->IsArmedInShard(sid); + DVLOG(1) << "Inspecting " << trx->DebugId() << " is_armed " << is_armed; + if (is_armed) { info.tx_armed++; if (trx->IsGlobal() || (trx->IsMulti() && trx->GetMultiMode() == Transaction::GLOBAL)) { diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 43b0894ec..d45eb42c6 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -40,7 +40,7 @@ class EngineShard { uint64_t defrag_attempt_total = 0; uint64_t defrag_realloc_total = 0; uint64_t defrag_task_invocation_total = 0; - + uint64_t poll_execution_total = 0; Stats& operator+=(const Stats&); }; diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index 71c4bb8e6..752694065 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -239,6 +239,9 @@ bool MultiCommandSquasher::ExecuteSquashed() { } void MultiCommandSquasher::Run() { + DVLOG(1) << "Trying to squash " << cmds_.size() << " commands for transaction " + << cntx_->transaction->DebugId(); + for (auto& cmd : cmds_) { auto res = TrySquash(&cmd); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 3e35e8b5e..5f9b226c0 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -14,7 +14,7 @@ #include "server/journal/journal.h" #include "server/server_state.h" -ABSL_FLAG(uint32_t, tx_queue_warning_len, 30, +ABSL_FLAG(uint32_t, tx_queue_warning_len, 40, "Length threshold for warning about long transaction queue"); namespace dfly { @@ -33,7 +33,7 @@ constexpr size_t kTransSize [[maybe_unused]] = sizeof(Transaction); } // namespace -IntentLock::Mode Transaction::Mode() const { +IntentLock::Mode Transaction::LockMode() const { return cid_->IsReadOnly() ? IntentLock::SHARED : IntentLock::EXCLUSIVE; } @@ -166,7 +166,7 @@ void Transaction::RecordMultiLocks(const KeyIndex& key_index) { auto lock_key = [this](string_view key) { multi_->locks.emplace(KeyLockArgs::GetLockKey(key)); }; - multi_->lock_mode.emplace(Mode()); + multi_->lock_mode.emplace(LockMode()); for (size_t i = key_index.start; i < key_index.end; i += key_index.step) lock_key(ArgS(full_args_, i)); if (key_index.bonus) @@ -362,6 +362,8 @@ void Transaction::StartMultiGlobal(DbIndex dbid) { } void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgList keys) { + DVLOG(1) << "StartMultiLockedAhead on " << keys.size() << " keys"; + DCHECK(multi_); DCHECK(shard_data_.empty()); // Make sure default InitByArgs didn't run. @@ -383,7 +385,7 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) { if (multi_->role != SQUASHED_STUB) // stub transactions don't migrate between threads unique_shard_id_ = 0; - + multi_->cmd_seq_num++; unique_shard_cnt_ = 0; args_.clear(); @@ -411,8 +413,12 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) { string Transaction::DebugId() const { DCHECK_GT(use_count_.load(memory_order_relaxed), 0u); - - return StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")"); + string res = StrCat(Name(), "@", txid_, "/", unique_shard_cnt_); + if (multi_) { + absl::StrAppend(&res, ":", multi_->cmd_seq_num); + } + absl::StrAppend(&res, " (", trans_id(this), ")"); + return res; } void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdArgList args) { @@ -449,7 +455,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { bool is_concluding = (coordinator_state_ & COORD_EXEC_CONCLUDING); bool tx_stop_runnig = is_concluding && !IsAtomicMulti(); - IntentLock::Mode mode = Mode(); + IntentLock::Mode mode = LockMode(); DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED) || (multi_ && multi_->mode == GLOBAL)); @@ -516,7 +522,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { if (IsGlobal()) { DCHECK(!awaked_prerun && !became_suspended); // Global transactions can not be blocking. VLOG(2) << "Releasing shard lock"; - shard->shard_lock()->Release(Mode()); + shard->shard_lock()->Release(LockMode()); } else { // not global. largs = GetLockArgs(idx); DCHECK(sd.local_mask & KEYLOCK_ACQUIRED); @@ -585,6 +591,7 @@ void Transaction::ScheduleInternal() { // Loop until successfully scheduled in all shards. ServerState* ss = ServerState::tlocal(); + DVLOG(1) << "ScheduleInternal " << cid_->name() << " on " << num_shards << " shards"; DCHECK(ss); while (true) { txid_ = op_seq.fetch_add(1, memory_order_relaxed); @@ -827,9 +834,9 @@ void Transaction::Execute(RunnableType cb, bool conclude) { ExecuteAsync(); - DVLOG(1) << "Wait on Exec " << DebugId(); + DVLOG(1) << "Execute::WaitForCbs " << DebugId(); WaitForShardCallbacks(); - DVLOG(1) << "Wait on Exec " << DebugId() << " completed"; + DVLOG(1) << "Execute::WaitForCbs " << DebugId() << " completed"; cb_ptr_ = nullptr; } @@ -1023,7 +1030,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC); DCHECK_NE(unique_shard_id_, kInvalidSid); - auto mode = Mode(); + auto mode = LockMode(); auto lock_args = GetLockArgs(shard->shard_id()); auto& sd = shard_data_[SidToId(unique_shard_id_)]; @@ -1045,8 +1052,12 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { shard->db_slice().Acquire(mode, lock_args); sd.local_mask |= KEYLOCK_ACQUIRED; - DVLOG(1) << "Rescheduling into TxQueue " << DebugId(); + DVLOG(1) << "Rescheduling " << DebugId() << " into TxQueue of size " << shard->txq()->size(); + // If there are blocked transactons waiting for this tx keys, we will add this transaction + // to the tx-queue (these keys will be contended). This will happen even if the queue was empty. + // In that case we must poll the queue, because there will be no other callback trigerring the + // queue before us. shard->PollExecution("schedule_unique", nullptr); return false; @@ -1062,7 +1073,7 @@ pair Transaction::ScheduleInShard(EngineShard* shard) { TxQueue* txq = shard->txq(); KeyLockArgs lock_args; - IntentLock::Mode mode = Mode(); + IntentLock::Mode mode = LockMode(); bool lock_granted = false; ShardId sid = SidToId(shard->shard_id()); @@ -1112,12 +1123,20 @@ pair Transaction::ScheduleInShard(EngineShard* shard) { if (now >= last_log_time + 10) { last_log_time = now; EngineShard::TxQueueInfo info = shard->AnalyzeTxQueue(); - LOG(WARNING) << "TxQueue is too long. Tx count:" << info.tx_total - << ", armed:" << info.tx_armed << ", runnable:" << info.tx_runnable - << ", total locks: " << info.total_locks - << ", contended locks: " << info.contended_locks << "\n" - << "max contention score: " << info.max_contention_score - << ", lock: " << info.max_contention_lock_name; + string msg = + StrCat("TxQueue is too long. Tx count:", info.tx_total, ", armed:", info.tx_armed, + ", runnable:", info.tx_runnable, ", total locks: ", info.total_locks, + ", contended locks: ", info.contended_locks, "\n"); + absl::StrAppend(&msg, "max contention score: ", info.max_contention_score, + ", lock: ", info.max_contention_lock_name, + "poll_executions:", shard->stats().poll_execution_total); + const Transaction* cont_tx = shard->GetContTx(); + if (cont_tx) { + absl::StrAppend(&msg, " continuation_tx: ", cont_tx->DebugId(), " ", + cont_tx->IsArmedInShard(sid) ? " armed" : ""); + } + + LOG(WARNING) << msg; } } DVLOG(1) << "Insert into tx-queue, sid(" << sid << ") " << DebugId() << ", qlen " << txq->size(); @@ -1143,14 +1162,14 @@ bool Transaction::CancelShardCb(EngineShard* shard) { txq->Remove(pos); if (sd.local_mask & KEYLOCK_ACQUIRED) { - auto mode = Mode(); + auto mode = LockMode(); auto lock_args = GetLockArgs(shard->shard_id()); DCHECK(lock_args.args.size() > 0); shard->db_slice().Release(mode, lock_args); sd.local_mask &= ~KEYLOCK_ACQUIRED; } if (IsGlobal()) { - shard->shard_lock()->Release(Mode()); + shard->shard_lock()->Release(LockMode()); } if (pos == head && !txq->Empty()) { @@ -1257,7 +1276,7 @@ OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard) { void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) { auto lock_args = GetLockArgs(shard->shard_id()); - shard->db_slice().Release(Mode(), lock_args); + shard->db_slice().Release(LockMode(), lock_args); unsigned sd_idx = SidToId(shard->shard_id()); auto& sd = shard_data_[sd_idx]; diff --git a/src/server/transaction.h b/src/server/transaction.h index 403ab127d..479b56dfa 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -261,7 +261,7 @@ class Transaction { return txid_; } - IntentLock::Mode Mode() const; // Based on command mask + IntentLock::Mode LockMode() const; // Based on command mask std::string_view Name() const; // Based on command name @@ -387,6 +387,7 @@ class Transaction { // executing multi-command. For every write to a shard journal, the corresponding index in the // vector is marked as true. absl::InlinedVector shard_journal_write; + unsigned cmd_seq_num = 0; // used for debugging purposes. }; enum CoordinatorState : uint8_t {