diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index ff83b5786..f07f3dd68 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -760,12 +760,9 @@ void Transaction::ScheduleInternal() {
       // We do not need to wait for this callback to finish - just make sure it will eventually run.
       // See https://github.com/dragonflydb/dragonfly/issues/150 for more info.
       if (should_poll_execution.load(memory_order_relaxed)) {
-        for (uint32_t i = 0; i < shard_set->size(); ++i) {
-          if (!is_active(i))
-            continue;
-
+        IterateActiveShards([](const auto& sd, auto i) {
           shard_set->Add(i, [] { EngineShard::tlocal()->PollExecution("cancel_cleanup", nullptr); });
-        }
+        });
       }
     }
   }
@@ -871,6 +868,8 @@ void Transaction::UnlockMulti() {
       ServerState::tlocal()->journal() ? CalcMultiNumOfShardJournals() : 0;
 
   use_count_.fetch_add(shard_data_.size(), std::memory_order_relaxed);
+
+  DCHECK_EQ(shard_data_.size(), shard_set->size());
   for (ShardId i = 0; i < shard_data_.size(); ++i) {
     shard_set->Add(i, [this, sharded_keys, i, shard_journals_cnt]() {
       this->UnlockMultiShardCb((*sharded_keys)[i], EngineShard::tlocal(), shard_journals_cnt);
@@ -931,12 +930,18 @@ void Transaction::ExecuteAsync() {
   DCHECK_GT(unique_shard_cnt_, 0u);
   DCHECK_GT(use_count_.load(memory_order_relaxed), 0u);
   DCHECK(!IsAtomicMulti() || multi_->lock_mode.has_value());
+  DCHECK_LE(shard_data_.size(), 1024u);
 
-  // Set armed flags on all active shards
-  IterateActiveShards([](auto& sd, auto i) { sd.local_mask |= ARMED; });
+  // Set armed flags on all active shards. Copy indices for dispatching poll tasks,
+  // because local_mask can be written concurrently after starting a new phase.
+  std::bitset<1024> poll_flags(0);
+  IterateActiveShards([&poll_flags](auto& sd, auto i) {
+    sd.local_mask |= ARMED;
+    poll_flags.set(i, true);
+  });
 
   // Start new phase: release semantics. From here we can be discovered by IsArmedInShard(),
-  // and thus picked by a foreign thread's PollExecution(). Careful with writes until phase end!
+  // and thus picked by a foreign thread's PollExecution(). Careful with data access!
   run_barrier_.Start(unique_shard_cnt_);
 
   auto* ss = ServerState::tlocal();
@@ -954,7 +959,10 @@ void Transaction::ExecuteAsync() {
     DVLOG(3) << "ptr_release " << DebugId();
     intrusive_ptr_release(this);  // against use_count_.fetch_add above.
   };
-  IterateActiveShards([&poll_cb](PerShardData& sd, auto i) { shard_set->Add(i, poll_cb); });
+  IterateShards([&poll_cb, &poll_flags](PerShardData& sd, auto i) {
+    if (poll_flags.test(i))
+      shard_set->Add(i, poll_cb);
+  });
 }
 
 void Transaction::Conclude() {
diff --git a/src/server/transaction.h b/src/server/transaction.h
index 5a709eb0f..4e59f990b 100644
--- a/src/server/transaction.h
+++ b/src/server/transaction.h
@@ -541,19 +541,25 @@ class Transaction {
     return sid < shard_data_.size() ? sid : 0;
   }
 
-  // Iterate over shards and run function accepting (PerShardData&, ShardId) on all active ones.
-  template <typename F> void IterateActiveShards(F&& f) {
+  // Iterate over all available shards, run functor accepting (PerShardData&, ShardId)
+  template <typename F> void IterateShards(F&& f) {
     if (unique_shard_cnt_ == 1) {
       f(shard_data_[SidToId(unique_shard_id_)], unique_shard_id_);
     } else {
       for (ShardId i = 0; i < shard_data_.size(); ++i) {
-        if (auto& sd = shard_data_[i]; sd.local_mask & ACTIVE) {
-          f(sd, i);
-        }
+        f(shard_data_[i], i);
       }
     }
   }
 
+  // Iterate over ACTIVE shards, run functor accepting (PerShardData&, ShardId)
+  template <typename F> void IterateActiveShards(F&& f) {
+    IterateShards([&f](auto& sd, auto i) {
+      if (sd.local_mask & ACTIVE)
+        f(sd, i);
+    });
+  }
+
  private:
   // Main synchronization point for dispatching hop callbacks and waiting for them to finish.
   // After scheduling, sequential hops are executed as follows:
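
Note (not part of the patch): the transaction.h change splits the old helper into a generic IterateShards() that visits every slot, plus IterateActiveShards() as a thin filter layered on top of it. Below is a minimal standalone sketch of that layering; PerShard, ShardTable and the ACTIVE constant are placeholders invented for illustration, not Dragonfly's real PerShardData or Transaction definitions.

#include <cstdint>
#include <cstdio>
#include <vector>

// Illustrative stand-ins; names and layout are assumptions for this sketch.
using ShardId = uint32_t;
constexpr uint32_t ACTIVE = 1 << 0;

struct PerShard {
  uint32_t local_mask = 0;
};

class ShardTable {
 public:
  explicit ShardTable(ShardId n) : shards_(n) {
  }

  PerShard& operator[](ShardId i) {
    return shards_[i];
  }

  // Run f(PerShard&, ShardId) on every shard, active or not.
  template <typename F> void IterateShards(F&& f) {
    for (ShardId i = 0; i < shards_.size(); ++i)
      f(shards_[i], i);
  }

  // Run f only on shards whose ACTIVE bit is set, layered on top of IterateShards.
  template <typename F> void IterateActiveShards(F&& f) {
    IterateShards([&f](PerShard& sd, ShardId i) {
      if (sd.local_mask & ACTIVE)
        f(sd, i);
    });
  }

 private:
  std::vector<PerShard> shards_;
};

int main() {
  ShardTable table(4);
  table[0].local_mask |= ACTIVE;
  table[2].local_mask |= ACTIVE;
  table.IterateActiveShards(
      [](PerShard&, ShardId i) { std::printf("active shard %u\n", i); });
}

Keeping the ACTIVE filter in one wrapper lets call sites such as ExecuteAsync() choose explicitly whether they want the filtered view or the raw one driven by a precomputed index set.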
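
Note (not part of the patch): the ExecuteAsync() change captures the indices of the active shards into a 1024-bit bitset before run_barrier_.Start() publishes the new phase, so the later dispatch loop never re-reads local_mask, which foreign shard threads may already be mutating. The following is a minimal standalone sketch of that snapshot-before-publish pattern; ShardState, kShards, phase_started and the flag constants are placeholders for this sketch and stand in for Dragonfly's PerShardData, shard count and run barrier.

#include <atomic>
#include <bitset>
#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <thread>
#include <vector>

// Illustrative flag values; assumptions for this sketch only.
constexpr uint32_t ACTIVE = 1 << 0;
constexpr uint32_t ARMED = 1 << 1;

struct ShardState {
  std::atomic<uint32_t> local_mask{0};
};

int main() {
  constexpr std::size_t kShards = 8;
  std::vector<ShardState> shards(kShards);
  shards[1].local_mask = ACTIVE;
  shards[5].local_mask = ACTIVE;

  // 1. Arm the active shards and snapshot their indices into a fixed-size
  //    bitset while it is still safe to read local_mask.
  std::bitset<kShards> poll_flags;
  for (std::size_t i = 0; i < kShards; ++i) {
    if (shards[i].local_mask.load(std::memory_order_relaxed) & ACTIVE) {
      shards[i].local_mask.fetch_or(ARMED, std::memory_order_relaxed);
      poll_flags.set(i);
    }
  }

  // 2. Publish the new phase with release semantics. From here on, shard
  //    threads that observe ARMED may mutate local_mask concurrently.
  std::atomic<bool> phase_started{false};
  phase_started.store(true, std::memory_order_release);

  // 3. Dispatch poll tasks from the snapshot only; local_mask is never
  //    re-read here, so there is no race with the shard threads.
  std::vector<std::thread> tasks;
  for (std::size_t i = 0; i < kShards; ++i) {
    if (poll_flags.test(i))
      tasks.emplace_back([i] { std::printf("dispatch poll to shard %zu\n", i); });
  }
  for (auto& t : tasks)
    t.join();
}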