diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 11339f3f2..1b75cd95d 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -136,7 +136,7 @@ void Transaction::InitShardData(absl::Span shard_index, siz sd.arg_count = si.args.size(); sd.arg_start = args_.size(); - // Multi transactions can re-intitialize on different shards, so clear ACTIVE flag. + // Multi transactions can re-initialize on different shards, so clear ACTIVE flag. if (multi_) sd.local_mask &= ~ACTIVE; @@ -212,7 +212,7 @@ void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) { /** * * There are 4 options that we consider here: - * a. T spans a single shard and its not multi. + * a. T spans a single shard and it's not multi. * unique_shard_id_ is predefined before the schedule() is called. * In that case only a single thread will be scheduled and it will use shard_data[0] just because * shard_data.size() = 1. Coordinator thread can access any data because there is a @@ -828,7 +828,7 @@ void Transaction::ExecuteAsync() { // We do not necessarily Execute this transaction in 'cb' below. It well may be that it will be // executed by the engine shard once it has been armed and coordinator thread will finish the // transaction before engine shard thread stops accessing it. Therefore, we increase reference - // by number of callbacks accessesing 'this' to allow callbacks to execute shard->Execute(this); + // by number of callbacks accessing 'this' to allow callbacks to execute shard->Execute(this); // safely. use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed); @@ -844,6 +844,15 @@ void Transaction::ExecuteAsync() { // IsArmedInShard in other threads. run_count_.store(unique_shard_cnt_, memory_order_release); + // Execute inline when we can. We can't use coordinator_index_ because we may offload this + // function to run in a different thread. 
+ if (unique_shard_cnt_ == 1 && ServerState::tlocal()->thread_index() == unique_shard_id_) { + DVLOG(1) << "Short-circuit ExecuteAsync " << DebugId(); + EngineShard::tlocal()->PollExecution("exec_cb", this); + intrusive_ptr_release(this); // against use_count_.fetch_add above. + return; + } + // We verify seq lock has the same generation number. See below for more info. auto cb = [seq, this] { EngineShard* shard = EngineShard::tlocal(); @@ -856,7 +865,7 @@ void Transaction::ExecuteAsync() { DVLOG(3) << "PollExecCb " << DebugId() << " sid(" << shard->shard_id() << ") " << run_count_.load(memory_order_relaxed); - // We also make sure that for mult-operation transactions like Multi/Eval + // We also make sure that for multi-operation transactions like Multi/Eval // this callback runs on a correct operation. We want to avoid a situation // where the first operation is executed and the second operation is armed and // now this callback from the previous operation finally runs and calls PollExecution. diff --git a/src/server/transaction.h b/src/server/transaction.h index 6bb5eabe3..24a0a99da 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -131,7 +131,7 @@ class Transaction { // UNUSED = 1 << 1, OUT_OF_ORDER = 1 << 2, // Whether it can run as out of order KEYLOCK_ACQUIRED = 1 << 3, // Whether its key locks are acquired - SUSPENDED_Q = 1 << 4, // Whether is suspened (by WatchInShard()) + SUSPENDED_Q = 1 << 4, // Whether it is suspended (by WatchInShard()) AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended()) EXPIRED_Q = 1 << 6, // Whether it timed out and should be dropped UNLOCK_MULTI = 1 << 7, // Whether this shard executed UnlockMultiShardCb @@ -333,7 +333,7 @@ class Transaction { // this is the only variable that is accessed by both shard and coordinator threads. std::atomic_bool is_armed{false}; - // We pad with some memory so that atomic loads won't cause false sharing betweem threads. 
+ // We pad with some memory so that atomic loads won't cause false sharing between threads. char pad[46]; // to make sure PerShardData is 64 bytes and takes full cacheline. uint32_t arg_start = 0; // Indices into args_ array.