From 623c5a85e3e0071c3d50e81bf163c3e3ad4bb8b0 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Sun, 19 Mar 2023 12:18:02 +0300 Subject: [PATCH] fix(server): Fix transaction index + shard_data multi re-use (#958) --- src/server/transaction.cc | 29 ++++++++++++++++++----------- src/server/transaction.h | 11 ++++++++--- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 1fc10bec9..9681ec9e9 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -236,15 +236,15 @@ void Transaction::InitByKeys(KeyIndex key_index) { DCHECK_LT(key_index.start, args.size()); bool needs_reverse_mapping = cid_->opt_mask() & CO::REVERSE_MAPPING; - bool single_key = key_index.HasSingleKey() && !IsAtomicMulti(); + bool single_key = key_index.HasSingleKey(); - if (single_key) { + if (single_key && !IsAtomicMulti()) { DCHECK_GT(key_index.step, 0u); // We don't have to split the arguments by shards, so we can copy them directly. StoreKeysInArgs(key_index, needs_reverse_mapping); - shard_data_.resize(1); + shard_data_.resize(IsMulti() ? shard_set->size() : 1); shard_data_.front().local_mask |= ACTIVE; unique_shard_cnt_ = 1; @@ -274,7 +274,7 @@ void Transaction::InitByKeys(KeyIndex key_index) { // Compress shard data, if we occupy only one shard. if (unique_shard_cnt_ == 1) { PerShardData* sd; - if (IsAtomicMulti()) { + if (IsMulti()) { sd = &shard_data_[unique_shard_id_]; } else { shard_data_.resize(1); @@ -380,7 +380,11 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) { cb_ = nullptr; if (multi_->mode == NON_ATOMIC) { - shard_data_.resize(0); + for (auto& sd : shard_data_) { + sd.arg_count = sd.arg_start = sd.local_mask = 0; + sd.pq_pos = TxQueue::kEnd; + DCHECK_EQ(sd.is_armed.load(memory_order_relaxed), false); + } txid_ = 0; coordinator_state_ = 0; } @@ -647,10 +651,11 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { // and directly dispatch the task to its destination shard. bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !IsAtomicMulti(); if (schedule_fast) { - DCHECK_EQ(1u, shard_data_.size()); + DCHECK_NE(unique_shard_id_, kInvalidSid); + DCHECK(shard_data_.size() == 1 || multi_->mode == NON_ATOMIC); // IsArmedInShard() first checks run_count_ before shard_data, so use release ordering. - shard_data_[0].is_armed.store(true, memory_order_relaxed); + shard_data_[SidToId(unique_shard_id_)].is_armed.store(true, memory_order_relaxed); run_count_.store(1, memory_order_release); time_now_ms_ = GetCurrentTimeMs(); @@ -842,12 +847,13 @@ void Transaction::ExecuteAsync() { void Transaction::RunQuickie(EngineShard* shard) { DCHECK(!IsAtomicMulti()); - DCHECK_EQ(1u, shard_data_.size()); + DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC); + DCHECK_NE(unique_shard_id_, kInvalidSid); DCHECK_EQ(0u, txid_); shard->IncQuickRun(); - auto& sd = shard_data_[0]; + auto& sd = shard_data_[SidToId(unique_shard_id_)]; DCHECK_EQ(0, sd.local_mask & (KEYLOCK_ACQUIRED | OUT_OF_ORDER)); DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id() << " " << args_[0]; @@ -910,12 +916,13 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const { bool Transaction::ScheduleUniqueShard(EngineShard* shard) { DCHECK(!IsAtomicMulti()); DCHECK_EQ(0u, txid_); - DCHECK_EQ(1u, shard_data_.size()); + DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC); + DCHECK_NE(unique_shard_id_, kInvalidSid); auto mode = Mode(); auto lock_args = GetLockArgs(shard->shard_id()); - auto& sd = shard_data_.front(); + auto& sd = shard_data_[SidToId(unique_shard_id_)]; DCHECK_EQ(TxQueue::kEnd, sd.pq_pos); // Fast path - for uncontended keys, just run the callback. diff --git a/src/server/transaction.h b/src/server/transaction.h index e7329ff16..5ea143290 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -287,7 +287,7 @@ class Transaction { // Accessed within shard thread. // Bitmask of LocalState enums. - uint16_t local_mask{0}; + uint16_t local_mask = 0; // Needed to rollback inconsistent schedulings or remove OOO transactions from // tx queue. @@ -452,6 +452,11 @@ class Transaction { // multiple threads access this array to synchronize between themselves using // PerShardData.state, it can be tricky. The complication comes from multi_ transactions where // scheduled transaction is accessed between operations as well. + + // Stores per-shard data. + // For non-multi transactions, it can be of size one in case only one shard is active + // (unique_shard_cnt_ = 1). + // Never access directly with index, always use SidToId. absl::InlinedVector shard_data_; // length = shard_count // Stores arguments of the transaction (i.e. keys + values) partitioned by shards. @@ -479,8 +484,8 @@ class Transaction { std::atomic_uint32_t use_count_{0}, run_count_{0}, seqlock_{0}; // unique_shard_cnt_ and unique_shard_id_ are accessed only by coordinator thread. - uint32_t unique_shard_cnt_{0}; // number of unique shards span by args_ - ShardId unique_shard_id_{kInvalidSid}; + uint32_t unique_shard_cnt_{0}; // Number of unique shards active + ShardId unique_shard_id_{kInvalidSid}; // Set if unique_shard_cnt_ = 1 util::fibers_ext::EventCount blocking_ec_; // Used to wake blocking transactions. util::fibers_ext::EventCount run_ec_; // Used to wait for shard callbacks