mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
bug(server): global command stalls on server load with pipeline mode (#1909)
* bug(server): global command stalls on server load with pipeline mode. Fixes #1797. The bug: a global command is unable to schedule into the txq under a high load of pipelined commands; only after the load finishes does the global transaction get scheduled into the txq. The reason is that when we start a global transaction we take the shard lock up front, while all other transactions keep entering the txq. They compete with the global tx on the order in which they are inserted into the queue, and that order must be preserved for transaction atomicity. Because the global tx needs to be inserted into all shard queues, its chance of being scheduled in a consistent order with all the other transactions is low. The solution: acquire the global shard lock inside ScheduleInShard instead. Locking closer to the scheduling decision reduces the number of transactions already in the queue, so the global tx now has a much higher chance of being ordered correctly. Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
08ed830d27
commit
36ac31427d
4 changed files with 13 additions and 7 deletions
|
@ -1225,6 +1225,7 @@ Metrics ServerFamily::GetMetrics() const {
|
|||
result.ooo_tx_transaction_cnt += ss->stats.ooo_tx_cnt;
|
||||
result.eval_io_coordination_cnt += ss->stats.eval_io_coordination_cnt;
|
||||
result.eval_shardlocal_coordination_cnt += ss->stats.eval_shardlocal_coordination_cnt;
|
||||
result.tx_schedule_cancel_cnt += ss->stats.tx_schedule_cancel_cnt;
|
||||
|
||||
service_.mutable_registry()->MergeCallStats(
|
||||
index, [&dest_map = result.cmd_stats_map](string_view name, const CmdCallStats& src) {
|
||||
|
@ -1400,6 +1401,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
|
|||
append("defrag_task_invocation_total", m.shard_stats.defrag_task_invocation_total);
|
||||
append("eval_io_coordination_total", m.eval_io_coordination_cnt);
|
||||
append("eval_shardlocal_coordination_total", m.eval_shardlocal_coordination_cnt);
|
||||
append("tx_schedule_cancel_total", m.tx_schedule_cancel_cnt);
|
||||
}
|
||||
|
||||
if (should_enter("TIERED", true)) {
|
||||
|
|
|
@ -67,6 +67,7 @@ struct Metrics {
|
|||
uint64_t ooo_tx_transaction_cnt = 0;
|
||||
uint64_t eval_io_coordination_cnt = 0;
|
||||
uint64_t eval_shardlocal_coordination_cnt = 0;
|
||||
uint64_t tx_schedule_cancel_cnt = 0;
|
||||
uint32_t traverse_ttl_per_sec = 0;
|
||||
uint32_t delete_ttl_per_sec = 0;
|
||||
bool is_master = true;
|
||||
|
|
|
@ -94,6 +94,7 @@ class ServerState { // public struct - to allow initialization.
|
|||
uint64_t ooo_tx_cnt = 0;
|
||||
uint64_t eval_io_coordination_cnt = 0;
|
||||
uint64_t eval_shardlocal_coordination_cnt = 0;
|
||||
uint64_t tx_schedule_cancel_cnt = 0;
|
||||
};
|
||||
|
||||
static ServerState* tlocal() {
|
||||
|
|
|
@ -551,16 +551,9 @@ void Transaction::ScheduleInternal() {
|
|||
// on the context. For regular multi-transactions we can actually inspect all commands.
|
||||
// For eval-like transactions - we can decided based on the command flavor (EVAL/EVALRO) or
|
||||
// auto-tune based on the static analysis (by identifying commands with hardcoded command names).
|
||||
IntentLock::Mode mode = Mode();
|
||||
|
||||
if (span_all) {
|
||||
is_active = [](uint32_t) { return true; };
|
||||
num_shards = shard_set->size();
|
||||
|
||||
// Lock shards
|
||||
auto cb = [mode](EngineShard* shard) { shard->shard_lock()->Acquire(mode); };
|
||||
shard_set->RunBriefInParallel(std::move(cb));
|
||||
VLOG(1) << "Global shard lock acquired";
|
||||
} else {
|
||||
num_shards = unique_shard_cnt_;
|
||||
DCHECK_GT(num_shards, 0u);
|
||||
|
@ -601,6 +594,7 @@ void Transaction::ScheduleInternal() {
|
|||
}
|
||||
|
||||
VLOG(2) << "Cancelling " << DebugId();
|
||||
ServerState::tlocal()->stats.tx_schedule_cancel_cnt += 1;
|
||||
|
||||
atomic_bool should_poll_execution{false};
|
||||
auto cancel = [&](EngineShard* shard) {
|
||||
|
@ -1047,6 +1041,11 @@ pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
|
|||
return {false, false};
|
||||
}
|
||||
|
||||
if (IsGlobal()) {
|
||||
shard->shard_lock()->Acquire(mode);
|
||||
VLOG(1) << "Global shard lock acquired";
|
||||
}
|
||||
|
||||
TxQueue::Iterator it = txq->Insert(this);
|
||||
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
|
||||
sd.pq_pos = it;
|
||||
|
@ -1080,6 +1079,9 @@ bool Transaction::CancelShardCb(EngineShard* shard) {
|
|||
shard->db_slice().Release(mode, lock_args);
|
||||
sd.local_mask &= ~KEYLOCK_ACQUIRED;
|
||||
}
|
||||
if (IsGlobal()) {
|
||||
shard->shard_lock()->Release(Mode());
|
||||
}
|
||||
|
||||
if (pos == head && !txq->Empty()) {
|
||||
return true;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue