chore(transaction): Clean up scheduling code (#2422)

* chore(transaction): Clean scheduling code
This commit is contained in:
Vladislav 2024-01-17 17:33:48 +03:00 committed by GitHub
parent 9f3b118b87
commit bf89c7eac2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 75 additions and 80 deletions

View file

@ -308,6 +308,10 @@ class DbSlice {
return db_arr_[id].get();
}
const DbTable* GetDBTable(DbIndex id) const {
return db_arr_[id].get();
}
std::pair<PrimeTable*, ExpireTable*> GetTables(DbIndex id) {
return std::pair<PrimeTable*, ExpireTable*>(&db_arr_[id]->prime, &db_arr_[id]->expire);
}

View file

@ -165,7 +165,7 @@ class RoundRobinSharder {
static Mutex mutex_;
};
bool HasContendedLocks(unsigned shard_id, Transaction* trx, DbTable* table) {
bool HasContendedLocks(unsigned shard_id, Transaction* trx, const DbTable* table) {
bool has_contended_locks = false;
if (trx->IsMulti()) {
@ -713,7 +713,7 @@ void EngineShard::TEST_EnableHeartbeat() {
});
}
auto EngineShard::AnalyzeTxQueue() -> TxQueueInfo {
auto EngineShard::AnalyzeTxQueue() const -> TxQueueInfo {
const TxQueue* queue = txq();
ShardId sid = shard_id();
@ -742,7 +742,7 @@ auto EngineShard::AnalyzeTxQueue() -> TxQueueInfo {
if (trx->IsGlobal() || (trx->IsMulti() && trx->GetMultiMode() == Transaction::GLOBAL)) {
info.tx_global++;
} else {
DbTable* table = db_slice().GetDBTable(trx->GetDbIndex());
const DbTable* table = db_slice().GetDBTable(trx->GetDbIndex());
bool can_run = !HasContendedLocks(sid, trx, table);
if (can_run) {
info.tx_runnable++;
@ -754,7 +754,7 @@ auto EngineShard::AnalyzeTxQueue() -> TxQueueInfo {
// Analyze locks
for (unsigned i = 0; i <= max_db_id; ++i) {
DbTable* table = db_slice().GetDBTable(i);
const DbTable* table = db_slice().GetDBTable(i);
if (table == nullptr)
continue;

View file

@ -86,6 +86,10 @@ class EngineShard {
return &txq_;
}
const TxQueue* txq() const {
return &txq_;
}
TxId committed_txid() const {
return committed_txid_;
}
@ -174,7 +178,7 @@ class EngineShard {
std::string max_contention_lock_name;
};
TxQueueInfo AnalyzeTxQueue();
TxQueueInfo AnalyzeTxQueue() const;
private:
struct DefragTaskState {

View file

@ -31,6 +31,48 @@ atomic_uint64_t op_seq{1};
constexpr size_t kTransSize [[maybe_unused]] = sizeof(Transaction);
// Emits a rate-limited WARNING (at most once every 10 seconds per thread) when
// the shard's transaction queue length exceeds FLAGS_tx_queue_warning_len,
// including queue/lock statistics and the current continuation transaction.
void AnalyzeTxQueue(const EngineShard* shard, const TxQueue* txq) {
  const unsigned queue_limit = absl::GetFlag(FLAGS_tx_queue_warning_len);
  if (txq->size() <= queue_limit)
    return;

  // TODO: glog provides LOG_EVERY_T, which uses precise clock.
  // We should introduce inside helio LOG_PERIOD_ATLEAST macro that takes seconds and
  // uses low precision clock.
  static thread_local time_t last_log_time = 0;
  const time_t now = time(nullptr);
  if (now < last_log_time + 10)
    return;
  last_log_time = now;

  EngineShard::TxQueueInfo info = shard->AnalyzeTxQueue();
  string msg =
      StrCat("TxQueue is too long. Tx count:", info.tx_total, ", armed:", info.tx_armed,
             ", runnable:", info.tx_runnable, ", total locks: ", info.total_locks,
             ", contended locks: ", info.contended_locks, "\n");
  absl::StrAppend(&msg, "max contention score: ", info.max_contention_score,
                  ", lock: ", info.max_contention_lock_name,
                  ", poll_executions:", shard->stats().poll_execution_total);
  const Transaction* cont_tx = shard->GetContTx();
  if (cont_tx) {
    absl::StrAppend(&msg, " continuation_tx: ", cont_tx->DebugId(), " ",
                    cont_tx->IsArmedInShard(shard->shard_id()) ? " armed" : "");
  }
  LOG(WARNING) << msg;
}
// Records per-thread scheduling statistics for a just-scheduled transaction:
// the shard-width histogram and the GLOBAL/OOO/NORMAL type counter.
void RecordTxScheduleStats(const Transaction* tx) {
  auto* ss = ServerState::tlocal();
  DCHECK(ss);

  auto& stats = ss->stats;
  // Width histogram is 0-based; GetUniqueShardCnt() is at least 1 here.
  stats.tx_width_freq_arr[tx->GetUniqueShardCnt() - 1]++;

  if (tx->IsGlobal()) {
    stats.tx_type_cnt[ServerState::GLOBAL]++;
    return;
  }
  stats.tx_type_cnt[tx->IsOOO() ? ServerState::OOO : ServerState::NORMAL]++;
}
} // namespace
IntentLock::Mode Transaction::LockMode() const {
@ -464,10 +506,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
IntentLock::Mode mode = LockMode();
DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED) || (multi_ && multi_->mode == GLOBAL));
if (txq_ooo) {
DCHECK(sd.local_mask & OUT_OF_ORDER);
}
DCHECK(!txq_ooo || (sd.local_mask & OUT_OF_ORDER));
/*************************************************************************/
// Actually running the callback.
@ -577,36 +616,21 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
return !is_concluding;
}
// TODO: For multi-transactions we should be able to deduce mode() at run-time based
// on the context. For regular multi-transactions we can actually inspect all commands.
// For eval-like transactions - we can decide based on the command flavor (EVAL/EVALRO) or
// auto-tune based on the static analysis (by identifying commands with hardcoded command names).
void Transaction::ScheduleInternal() {
DCHECK(!shard_data_.empty());
DCHECK_EQ(0u, txid_);
DCHECK_EQ(0, coordinator_state_ & (COORD_SCHED | COORD_OOO));
DCHECK_GT(unique_shard_cnt_, 0u);
bool span_all = IsGlobal();
DVLOG(1) << "ScheduleInternal " << cid_->name() << " on " << unique_shard_cnt_ << " shards";
uint32_t num_shards;
std::function<bool(uint32_t)> is_active;
// TODO: For multi-transactions we should be able to deduce mode() at run-time based
// on the context. For regular multi-transactions we can actually inspect all commands.
// For eval-like transactions - we can decide based on the command flavor (EVAL/EVALRO) or
// auto-tune based on the static analysis (by identifying commands with hardcoded command names).
if (span_all) {
is_active = [](uint32_t) { return true; };
num_shards = shard_set->size();
} else {
num_shards = unique_shard_cnt_;
DCHECK_GT(num_shards, 0u);
is_active = [&](uint32_t i) {
return num_shards == 1 ? (i == unique_shard_id_) : shard_data_[i].local_mask & ACTIVE;
};
}
auto is_active = [this](uint32_t i) { return IsActive(i); };
// Loop until successfully scheduled in all shards.
ServerState* ss = ServerState::tlocal();
DVLOG(1) << "ScheduleInternal " << cid_->name() << " on " << num_shards << " shards";
DCHECK(ss);
while (true) {
txid_ = op_seq.fetch_add(1, memory_order_relaxed);
time_now_ms_ = GetCurrentTimeMs();
@ -621,26 +645,16 @@ void Transaction::ScheduleInternal() {
};
shard_set->RunBriefInParallel(std::move(cb), is_active);
if (success.load(memory_order_acquire) == num_shards) {
if (success.load(memory_order_acquire) == unique_shard_cnt_) {
coordinator_state_ |= COORD_SCHED;
bool ooo_disabled = IsAtomicMulti() && multi_->mode != LOCK_AHEAD;
DCHECK_GT(num_shards, 0u);
ss->stats.tx_width_freq_arr[num_shards - 1]++;
if (IsGlobal()) {
ss->stats.tx_type_cnt[ServerState::GLOBAL]++;
} else if (!ooo_disabled && lock_granted_cnt.load(memory_order_relaxed) == num_shards) {
// If we granted all locks, we can run out of order.
coordinator_state_ |= COORD_OOO;
ss->stats.tx_type_cnt[ServerState::OOO]++;
} else {
ss->stats.tx_type_cnt[ServerState::NORMAL]++;
if (lock_granted_cnt.load(memory_order_relaxed) == unique_shard_cnt_) {
coordinator_state_ |= COORD_OOO; // If we granted all locks, we can run out of order.
}
RecordTxScheduleStats(this);
VLOG(2) << "Scheduled " << DebugId()
<< " OutOfOrder: " << bool(coordinator_state_ & COORD_OOO)
<< " num_shards: " << num_shards;
<< " num_shards: " << unique_shard_cnt_;
break;
}
@ -1150,33 +1164,8 @@ pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
TxQueue::Iterator it = txq->Insert(this);
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
sd.pq_pos = it;
unsigned q_limit = absl::GetFlag(FLAGS_tx_queue_warning_len);
if (txq->size() > q_limit) {
static thread_local time_t last_log_time = 0;
// TODO: glog provides LOG_EVERY_T, which uses precise clock.
// We should introduce inside helio LOG_PERIOD_ATLEAST macro that takes seconds and
// uses low precision clock.
time_t now = time(nullptr);
if (now >= last_log_time + 10) {
last_log_time = now;
EngineShard::TxQueueInfo info = shard->AnalyzeTxQueue();
string msg =
StrCat("TxQueue is too long. Tx count:", info.tx_total, ", armed:", info.tx_armed,
", runnable:", info.tx_runnable, ", total locks: ", info.total_locks,
", contended locks: ", info.contended_locks, "\n");
absl::StrAppend(&msg, "max contention score: ", info.max_contention_score,
", lock: ", info.max_contention_lock_name,
", poll_executions:", shard->stats().poll_execution_total);
const Transaction* cont_tx = shard->GetContTx();
if (cont_tx) {
absl::StrAppend(&msg, " continuation_tx: ", cont_tx->DebugId(), " ",
cont_tx->IsArmedInShard(sid) ? " armed" : "");
}
LOG(WARNING) << msg;
}
}
AnalyzeTxQueue(shard, txq);
DVLOG(1) << "Insert into tx-queue, sid(" << sid << ") " << DebugId() << ", qlen " << txq->size();
return {true, lock_granted};

View file

@ -157,7 +157,6 @@ class Transaction {
// State on specific shard.
enum LocalMask : uint16_t {
ACTIVE = 1, // Set on all active shards.
// UNUSED = 1 << 1,
OUT_OF_ORDER = 1 << 2, // Whether it can run as out of order
KEYLOCK_ACQUIRED = 1 << 3, // Whether its key locks are acquired
SUSPENDED_Q = 1 << 4, // Whether is suspended (by WatchInShard())
@ -252,11 +251,11 @@ class Transaction {
// Runs in the shard thread.
KeyLockArgs GetLockArgs(ShardId sid) const;
//! Returns true if the transaction spans this shard_id.
//! Runs from the coordinator thread.
// Returns true if the transaction spans this shard_id.
// Runs from the coordinator thread.
bool IsActive(ShardId shard_id) const {
return unique_shard_cnt_ == 1 ? unique_shard_id_ == shard_id
: shard_data_[shard_id].arg_count > 0;
return unique_shard_cnt_ == 1 ? (unique_shard_id_ == shard_id)
: shard_data_[shard_id].local_mask & ACTIVE;
}
//! Returns true if the transaction is armed for execution on this sid (used to avoid
@ -420,7 +419,6 @@ class Transaction {
enum CoordinatorState : uint8_t {
COORD_SCHED = 1,
COORD_CONCLUDING = 1 << 1, // Whether it's the last hop of a transaction
COORD_BLOCKED = 1 << 2,
COORD_CANCELLED = 1 << 3,