diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 3e374b643..8cc4caca4 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1633,10 +1633,10 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret }); ++ServerState::tlocal()->stats.eval_shardlocal_coordination_cnt; - boost::intrusive_ptr stub_tx = new Transaction{tx}; + boost::intrusive_ptr stub_tx = new Transaction{tx, *sid}; cntx->transaction = stub_tx.get(); - tx->PrepareMultiForScheduleSingleHop(*sid, 0, args); + tx->PrepareMultiForScheduleSingleHop(*sid, tx->GetDbIndex(), args); tx->ScheduleSingleHop([&](Transaction*, EngineShard*) { result = interpreter->RunFunction(eval_args.sha, &error); return OpStatus::OK; diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index bb4fbee0f..5550007f9 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -53,7 +53,7 @@ MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(Shar auto& sinfo = sharded_[sid]; if (!sinfo.local_tx) { if (IsAtomic()) { - sinfo.local_tx = new Transaction{cntx_->transaction}; + sinfo.local_tx = new Transaction{cntx_->transaction, sid}; } else { sinfo.local_tx = new Transaction{base_cid_}; sinfo.local_tx->StartMultiNonAtomic(); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 903d22be4..b3b4972a0 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -52,8 +52,11 @@ Transaction::Transaction(const CommandId* cid) : cid_{cid} { } } -Transaction::Transaction(const Transaction* parent) - : multi_{make_unique()}, txid_{parent->txid()} { +Transaction::Transaction(const Transaction* parent, ShardId shard_id) + : multi_{make_unique()}, + txid_{parent->txid()}, + unique_shard_cnt_{1}, + unique_shard_id_{shard_id} { if (parent->multi_) { multi_->mode = parent->multi_->mode; } else { @@ -235,7 +238,9 @@ void Transaction::InitByKeys(KeyIndex key_index) { bool needs_reverse_mapping = cid_->opt_mask() & CO::REVERSE_MAPPING; // Stub transactions always operate only on single shard. - if ((key_index.HasSingleKey() && !IsAtomicMulti()) || (multi_ && multi_->role == SQUASHED_STUB)) { + bool is_stub = multi_ && multi_->role == SQUASHED_STUB; + + if ((key_index.HasSingleKey() && !IsAtomicMulti()) || is_stub) { DCHECK_GT(key_index.step, 0u); // We don't have to split the arguments by shards, so we can copy them directly. StoreKeysInArgs(key_index, needs_reverse_mapping); @@ -246,7 +251,10 @@ void Transaction::InitByKeys(KeyIndex key_index) { shard_data_.front().local_mask |= ACTIVE; unique_shard_cnt_ = 1; - unique_shard_id_ = Shard(args_.front(), shard_set->size()); // TODO: Squashed bug + if (is_stub) // stub transactions don't migrate + DCHECK_EQ(unique_shard_id_, Shard(args_.front(), shard_set->size())); + else + unique_shard_id_ = Shard(args_.front(), shard_set->size()); return; } @@ -381,7 +389,9 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) { DCHECK(multi_); DCHECK(!cb_ptr_); - unique_shard_id_ = 0; + if (multi_->role != SQUASHED_STUB) // stub transactions don't migrate between threads + unique_shard_id_ = 0; + unique_shard_cnt_ = 0; args_.clear(); cid_ = cid; diff --git a/src/server/transaction.h b/src/server/transaction.h index e26a37352..ac63ad301 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -145,7 +145,8 @@ class Transaction { public: explicit Transaction(const CommandId* cid); - explicit Transaction(const Transaction* parent); + // Initialize transaction for squashing placed on a specific shard with a given parent tx + explicit Transaction(const Transaction* parent, ShardId shard_id); // Initialize from command (args) on specific db. OpStatus InitByArgs(DbIndex index, CmdArgList args);