mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
opt(server): Short-circuit ExecuteAsync(). (#1601)
* feat(server): Short-circuit ExecuteAsync(). * Do not leak (hopefully :) * Add documentation about coordinator_index_ * Use ServerState::tlocal()->thread_index()
This commit is contained in:
parent
4c85d5825d
commit
fba0800081
2 changed files with 15 additions and 6 deletions
|
@ -136,7 +136,7 @@ void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, siz
|
|||
sd.arg_count = si.args.size();
|
||||
sd.arg_start = args_.size();
|
||||
|
||||
// Multi transactions can re-intitialize on different shards, so clear ACTIVE flag.
|
||||
// Multi transactions can re-initialize on different shards, so clear ACTIVE flag.
|
||||
if (multi_)
|
||||
sd.local_mask &= ~ACTIVE;
|
||||
|
||||
|
@ -212,7 +212,7 @@ void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) {
|
|||
/**
|
||||
*
|
||||
* There are 4 options that we consider here:
|
||||
* a. T spans a single shard and its not multi.
|
||||
* a. T spans a single shard and it's not multi.
|
||||
* unique_shard_id_ is predefined before the schedule() is called.
|
||||
* In that case only a single thread will be scheduled and it will use shard_data[0] just because
|
||||
* shard_data.size() = 1. Coordinator thread can access any data because there is a
|
||||
|
@ -828,7 +828,7 @@ void Transaction::ExecuteAsync() {
|
|||
// We do not necessarily Execute this transaction in 'cb' below. It well may be that it will be
|
||||
// executed by the engine shard once it has been armed and coordinator thread will finish the
|
||||
// transaction before engine shard thread stops accessing it. Therefore, we increase reference
|
||||
// by number of callbacks accessesing 'this' to allow callbacks to execute shard->Execute(this);
|
||||
// by number of callbacks accessing 'this' to allow callbacks to execute shard->Execute(this);
|
||||
// safely.
|
||||
use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed);
|
||||
|
||||
|
@ -844,6 +844,15 @@ void Transaction::ExecuteAsync() {
|
|||
// IsArmedInShard in other threads.
|
||||
run_count_.store(unique_shard_cnt_, memory_order_release);
|
||||
|
||||
// Execute inline when we can. We can't use coordinator_index_ because we may offload this
|
||||
// function to run in a different thread.
|
||||
if (unique_shard_cnt_ == 1 && ServerState::tlocal()->thread_index() == unique_shard_id_) {
|
||||
DVLOG(1) << "Short-circuit ExecuteAsync " << DebugId();
|
||||
EngineShard::tlocal()->PollExecution("exec_cb", this);
|
||||
intrusive_ptr_release(this); // against use_count_.fetch_add above.
|
||||
return;
|
||||
}
|
||||
|
||||
// We verify seq lock has the same generation number. See below for more info.
|
||||
auto cb = [seq, this] {
|
||||
EngineShard* shard = EngineShard::tlocal();
|
||||
|
@ -856,7 +865,7 @@ void Transaction::ExecuteAsync() {
|
|||
DVLOG(3) << "PollExecCb " << DebugId() << " sid(" << shard->shard_id() << ") "
|
||||
<< run_count_.load(memory_order_relaxed);
|
||||
|
||||
// We also make sure that for mult-operation transactions like Multi/Eval
|
||||
// We also make sure that for multi-operation transactions like Multi/Eval
|
||||
// this callback runs on a correct operation. We want to avoid a situation
|
||||
// where the first operation is executed and the second operation is armed and
|
||||
// now this callback from the previous operation finally runs and calls PollExecution.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue