diff --git a/src/server/blocking_controller_test.cc b/src/server/blocking_controller_test.cc
index 4699ddf2e..5321950da 100644
--- a/src/server/blocking_controller_test.cc
+++ b/src/server/blocking_controller_test.cc
@@ -45,7 +45,7 @@ void BlockingControllerTest::SetUp() {
   shard_set = new EngineShardSet(pp_.get());
   shard_set->Init(kNumThreads, false);

-  trans_.reset(new Transaction{&cid_, 0});
+  trans_.reset(new Transaction{&cid_});

   str_vec_.assign({"blpop", "x", "z", "0"});
   for (auto& s : str_vec_) {
diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc
index ad44fa8b2..7d5eb6a00 100644
--- a/src/server/debugcmd.cc
+++ b/src/server/debugcmd.cc
@@ -224,8 +224,7 @@ void DebugCmd::Load(string_view filename) {
   };

   const CommandId* cid = sf_.service().FindCmd("FLUSHALL");
-  intrusive_ptr<Transaction> flush_trans(
-      new Transaction{cid, ServerState::tlocal()->thread_index()});
+  intrusive_ptr<Transaction> flush_trans(new Transaction{cid});
   flush_trans->InitByArgs(0, {});
   VLOG(1) << "Performing flush";
   error_code ec = sf_.Drakarys(flush_trans.get(), DbSlice::kDbAll);
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index b278f7c6e..715c81765 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -943,7 +943,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
   DCHECK(dfly_cntx->transaction == nullptr);

   if (cid->IsTransactional()) {
-    dist_trans.reset(new Transaction{cid, etl.thread_index()});
+    dist_trans.reset(new Transaction{cid});

     if (!dist_trans->IsMulti()) {  // Multi command initialize themself based on their mode.
       if (auto st = dist_trans->InitByArgs(dfly_cntx->conn_state.db_index, args_no_cmd);
diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc
index 9b8609731..b5a96e0b1 100644
--- a/src/server/multi_command_squasher.cc
+++ b/src/server/multi_command_squasher.cc
@@ -41,7 +41,7 @@ MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(Shar
     if (IsAtomic()) {
       sinfo.local_tx = new Transaction{cntx_->transaction};
     } else {
-      sinfo.local_tx = new Transaction{cntx_->transaction->GetCId(), sid};
+      sinfo.local_tx = new Transaction{cntx_->transaction->GetCId()};
       sinfo.local_tx->StartMultiNonAtomic();
     }
   }
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index 92a680702..fe311ccdf 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -1110,8 +1110,7 @@ GenericError DoPartialSave(fs::path full_filename, const dfly::StringVec& script
 GenericError ServerFamily::DoSave() {
   const CommandId* cid = service().FindCmd("SAVE");
   CHECK_NOTNULL(cid);
-  boost::intrusive_ptr<Transaction> trans(
-      new Transaction{cid, ServerState::tlocal()->thread_index()});
+  boost::intrusive_ptr<Transaction> trans(new Transaction{cid});
   trans->InitByArgs(0, {});
   return DoSave(absl::GetFlag(FLAGS_df_snapshot_format), {}, trans.get());
 }
diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index 0ec8495cb..2234ab8e1 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -41,8 +41,7 @@ IntentLock::Mode Transaction::Mode() const {
  * @param ess
  * @param cs
  */
-Transaction::Transaction(const CommandId* cid, uint32_t thread_index)
-    : cid_{cid}, coordinator_index_(thread_index) {
+Transaction::Transaction(const CommandId* cid) : cid_{cid} {
   string_view cmd_name(cid_->name());
   if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") {
     multi_.reset(new MultiData);
@@ -695,7 +694,8 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
     }
   };

-  if (coordinator_index_ == unique_shard_id_ && ServerState::tlocal()->AllowInlineScheduling()) {
+  if (auto* ss = ServerState::tlocal();
+      ss->thread_index() == unique_shard_id_ && ss->AllowInlineScheduling()) {
     DVLOG(2) << "Inline scheduling a transaction";
     schedule_cb();
   } else {
@@ -837,9 +837,9 @@ void Transaction::ExecuteAsync() {
   // IsArmedInShard in other threads.
   run_count_.store(unique_shard_cnt_, memory_order_release);

-  // Execute inline when we can. We can't use coordinator_index_ because we may offload this
-  // function to run in a different thread.
-  if (unique_shard_cnt_ == 1 && ServerState::tlocal()->thread_index() == unique_shard_id_) {
+  auto* ss = ServerState::tlocal();
+  if (unique_shard_cnt_ == 1 && ss->thread_index() == unique_shard_id_ &&
+      ss->AllowInlineScheduling()) {
     DVLOG(1) << "Short-circuit ExecuteAsync " << DebugId();
     EngineShard::tlocal()->PollExecution("exec_cb", this);
     intrusive_ptr_release(this);  // against use_count_.fetch_add above.
diff --git a/src/server/transaction.h b/src/server/transaction.h
index b8d17a9b7..54d1c59d9 100644
--- a/src/server/transaction.h
+++ b/src/server/transaction.h
@@ -138,7 +138,7 @@ class Transaction {
   };

  public:
-  explicit Transaction(const CommandId* cid, uint32_t thread_index);
+  explicit Transaction(const CommandId* cid);

   explicit Transaction(const Transaction* parent);

@@ -560,7 +560,6 @@ class Transaction {
   // they read this variable the coordinator thread is stalled and can not cause data races.
   // If COORDINATOR_XXX has been set, it means we passed or crossed stage XXX.
   uint8_t coordinator_state_ = 0;
-  uint32_t coordinator_index_;  // thread_index of the coordinator thread.

   // Used for single-hop transactions with unique_shards_ == 1, hence no data-race.
   OpStatus local_result_ = OpStatus::OK;