mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
fix(server): Don't recompute shard for squashed stub tx (#2017)
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
6a75c6ddc5
commit
cb9a45f2a9
4 changed files with 20 additions and 9 deletions
|
@ -1633,10 +1633,10 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
|
|||
});
|
||||
|
||||
++ServerState::tlocal()->stats.eval_shardlocal_coordination_cnt;
|
||||
boost::intrusive_ptr<Transaction> stub_tx = new Transaction{tx};
|
||||
boost::intrusive_ptr<Transaction> stub_tx = new Transaction{tx, *sid};
|
||||
cntx->transaction = stub_tx.get();
|
||||
|
||||
tx->PrepareMultiForScheduleSingleHop(*sid, 0, args);
|
||||
tx->PrepareMultiForScheduleSingleHop(*sid, tx->GetDbIndex(), args);
|
||||
tx->ScheduleSingleHop([&](Transaction*, EngineShard*) {
|
||||
result = interpreter->RunFunction(eval_args.sha, &error);
|
||||
return OpStatus::OK;
|
||||
|
|
|
@ -53,7 +53,7 @@ MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(Shar
|
|||
auto& sinfo = sharded_[sid];
|
||||
if (!sinfo.local_tx) {
|
||||
if (IsAtomic()) {
|
||||
sinfo.local_tx = new Transaction{cntx_->transaction};
|
||||
sinfo.local_tx = new Transaction{cntx_->transaction, sid};
|
||||
} else {
|
||||
sinfo.local_tx = new Transaction{base_cid_};
|
||||
sinfo.local_tx->StartMultiNonAtomic();
|
||||
|
|
|
@ -52,8 +52,11 @@ Transaction::Transaction(const CommandId* cid) : cid_{cid} {
|
|||
}
|
||||
}
|
||||
|
||||
Transaction::Transaction(const Transaction* parent)
|
||||
: multi_{make_unique<MultiData>()}, txid_{parent->txid()} {
|
||||
Transaction::Transaction(const Transaction* parent, ShardId shard_id)
|
||||
: multi_{make_unique<MultiData>()},
|
||||
txid_{parent->txid()},
|
||||
unique_shard_cnt_{1},
|
||||
unique_shard_id_{shard_id} {
|
||||
if (parent->multi_) {
|
||||
multi_->mode = parent->multi_->mode;
|
||||
} else {
|
||||
|
@ -235,7 +238,9 @@ void Transaction::InitByKeys(KeyIndex key_index) {
|
|||
bool needs_reverse_mapping = cid_->opt_mask() & CO::REVERSE_MAPPING;
|
||||
|
||||
// Stub transactions always operate only on single shard.
|
||||
if ((key_index.HasSingleKey() && !IsAtomicMulti()) || (multi_ && multi_->role == SQUASHED_STUB)) {
|
||||
bool is_stub = multi_ && multi_->role == SQUASHED_STUB;
|
||||
|
||||
if ((key_index.HasSingleKey() && !IsAtomicMulti()) || is_stub) {
|
||||
DCHECK_GT(key_index.step, 0u);
|
||||
// We don't have to split the arguments by shards, so we can copy them directly.
|
||||
StoreKeysInArgs(key_index, needs_reverse_mapping);
|
||||
|
@ -246,7 +251,10 @@ void Transaction::InitByKeys(KeyIndex key_index) {
|
|||
shard_data_.front().local_mask |= ACTIVE;
|
||||
|
||||
unique_shard_cnt_ = 1;
|
||||
unique_shard_id_ = Shard(args_.front(), shard_set->size()); // TODO: Squashed bug
|
||||
if (is_stub) // stub transactions don't migrate
|
||||
DCHECK_EQ(unique_shard_id_, Shard(args_.front(), shard_set->size()));
|
||||
else
|
||||
unique_shard_id_ = Shard(args_.front(), shard_set->size());
|
||||
|
||||
return;
|
||||
}
|
||||
|
@ -381,7 +389,9 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
|
|||
DCHECK(multi_);
|
||||
DCHECK(!cb_ptr_);
|
||||
|
||||
unique_shard_id_ = 0;
|
||||
if (multi_->role != SQUASHED_STUB) // stub transactions don't migrate between threads
|
||||
unique_shard_id_ = 0;
|
||||
|
||||
unique_shard_cnt_ = 0;
|
||||
args_.clear();
|
||||
cid_ = cid;
|
||||
|
|
|
@ -145,7 +145,8 @@ class Transaction {
|
|||
public:
|
||||
explicit Transaction(const CommandId* cid);
|
||||
|
||||
explicit Transaction(const Transaction* parent);
|
||||
// Initialize transaction for squashing placed on a specific shard with a given parent tx
|
||||
explicit Transaction(const Transaction* parent, ShardId shard_id);
|
||||
|
||||
// Initialize from command (args) on specific db.
|
||||
OpStatus InitByArgs(DbIndex index, CmdArgList args);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue