diff --git a/src/server/blocking_controller.cc b/src/server/blocking_controller.cc
index 23b13fa6c..b468fb2d4 100644
--- a/src/server/blocking_controller.cc
+++ b/src/server/blocking_controller.cc
@@ -211,16 +211,6 @@ void BlockingController::AwakeWatched(DbIndex db_index, string_view db_key) {
   }
 }
 
-void BlockingController::RegisterAwaitForConverge(Transaction* t) {
-  TxId notify_id = t->notify_txid();
-
-  DVLOG(1) << "RegisterForConverge " << t->DebugId() << " at notify " << notify_id;
-
-  // t->notify_txid might improve in parallel. it does not matter since convergence
-  // will happen even with stale notify_id.
-  waiting_convergence_.emplace(notify_id, t);
-}
-
 // Internal function called from ProcessAwakened().
 // Marks the queue as active and notifies the first transaction in the queue.
 void BlockingController::NotifyWatchQueue(WatchQueue* wq) {
@@ -245,6 +235,8 @@ void BlockingController::NotifyWatchQueue(WatchQueue* wq) {
   } while (!queue.empty());
 }
 
+#if 0
+
 void BlockingController::OnTxFinish() {
   VLOG(1) << "OnTxFinish [" << owner_->shard_id() << "]";
 
@@ -276,9 +268,17 @@ void BlockingController::OnTxFinish() {
   } while (!waiting_convergence_.empty());
 }
 
-void BlockingController::NotifyConvergence(Transaction* tx) {
-  LOG(FATAL) << "TBD";
+
+void BlockingController::RegisterAwaitForConverge(Transaction* t) {
+  TxId notify_id = t->notify_txid();
+
+  DVLOG(1) << "RegisterForConverge " << t->DebugId() << " at notify " << notify_id;
+
+  // t->notify_txid might improve in parallel. it does not matter since convergence
+  // will happen even with stale notify_id.
+  waiting_convergence_.emplace(notify_id, t);
 }
+#endif
 
 size_t BlockingController::NumWatched(DbIndex db_indx) const {
   auto it = watched_dbs_.find(db_indx);
diff --git a/src/server/blocking_controller.h b/src/server/blocking_controller.h
index 27bb868e9..329ecaa52 100644
--- a/src/server/blocking_controller.h
+++ b/src/server/blocking_controller.h
@@ -44,9 +44,9 @@ class BlockingController {
   // Called from operations that create keys like lpush, rename etc.
   void AwakeWatched(DbIndex db_index, std::string_view db_key);
 
-  void OnTxFinish();
+  // void OnTxFinish();
 
-  void RegisterAwaitForConverge(Transaction* t);
+  // void RegisterAwaitForConverge(Transaction* t);
 
   size_t NumWatched(DbIndex db_indx) const;
 
@@ -60,7 +60,7 @@ class BlockingController {
   /// or null if all transactions in the queue have expired..
   void NotifyWatchQueue(WatchQueue* wq);
 
-  void NotifyConvergence(Transaction* tx);
+  // void NotifyConvergence(Transaction* tx);
 
   EngineShard* owner_;
 
@@ -75,6 +75,6 @@ class BlockingController {
   // could awaken arbitrary number of keys.
   absl::flat_hash_set<Transaction*> awakened_transactions_;
 
-  absl::btree_multimap<TxId, Transaction*> waiting_convergence_;
+  // absl::btree_multimap<TxId, Transaction*> waiting_convergence_;
 };
 
 }  // namespace dfly
diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc
index 7db252ee7..8b7cecdda 100644
--- a/src/server/engine_shard_set.cc
+++ b/src/server/engine_shard_set.cc
@@ -143,8 +143,6 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
       DVLOG(1) << "RunContTrans: " << continuation_trans_->DebugId() << " keep: " << to_keep;
       if (!to_keep) {
         continuation_trans_ = nullptr;
-        if (blocking_controller_)
-          blocking_controller_->OnTxFinish();
       }
     }
   }
@@ -196,9 +194,6 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
         continuation_trans_ = head;
         break;
       }
-
-      if (blocking_controller_)
-        blocking_controller_->OnTxFinish();
     }  // while(!txq_.Empty())
   } else {  // if (continuation_trans_ == nullptr && !has_awaked_trans)
     DVLOG(1) << "Skipped TxQueue " << continuation_trans_ << " " << has_awaked_trans;
@@ -242,8 +237,6 @@ void EngineShard::ShutdownMulti(Transaction* multi) {
   if (continuation_trans_ == multi) {
     continuation_trans_ = nullptr;
   }
-  if (blocking_controller_)
-    blocking_controller_->OnTxFinish();
 }
 
 #if 0
diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index ceb244049..c8b125636 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -543,56 +543,11 @@ void Transaction::UnlockMulti() {
     sharded_keys[sid].push_back(k_v);
   }
 
-  auto cb = [&] {
-    EngineShard* shard = EngineShard::tlocal();
-
-    if (multi_->multi_opts & CO::GLOBAL_TRANS) {
-      shard->shard_lock()->Release(IntentLock::EXCLUSIVE);
-    }
-
-    ShardId sid = shard->shard_id();
-    for (const auto& k_v : sharded_keys[sid]) {
-      auto release = [&](IntentLock::Mode mode) {
-        if (k_v.second.cnt[mode]) {
-          shard->db_slice().Release(mode, this->db_index_, k_v.first, k_v.second.cnt[mode]);
-        }
-      };
-
-      release(IntentLock::SHARED);
-      release(IntentLock::EXCLUSIVE);
-    }
-
-    auto& sd = shard_data_[SidToId(shard->shard_id())];
-
-    // It does not have to be that all shards in multi transaction execute this tx.
-    // Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from
-    // there.
-    if (sd.pq_pos != TxQueue::kEnd) {
-      DVLOG(1) << "unlockmulti: TxPopFront " << DebugId();
-
-      TxQueue* txq = shard->txq();
-      DCHECK(!txq->Empty());
-      Transaction* trans = absl::get<Transaction*>(txq->Front());
-      DCHECK(trans == this);
-      txq->PopFront();
-      sd.pq_pos = TxQueue::kEnd;
-    }
-
-    shard->ShutdownMulti(this);
-
-    // notify awakened transactions.
-    if (shard->blocking_controller())
-      shard->blocking_controller()->RunStep(nullptr);
-    shard->PollExecution("unlockmulti", nullptr);
-
-    this->DecreaseRunCnt();
-  };
-
   uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed);
   DCHECK_EQ(prev, 0u);
 
   for (ShardId i = 0; i < shard_data_.size(); ++i) {
-    ess_->Add(i, cb);
+    ess_->Add(i, [&] { UnlockMultiShardCb(sharded_keys, EngineShard::tlocal()); });
   }
   WaitForShardCallbacks();
   DCHECK_GE(use_count(), 1u);
@@ -1044,6 +999,49 @@ void Transaction::ExpireShardCb(EngineShard* shard) {
   CHECK_GE(DecreaseRunCnt(), 1u);
 }
 
+void Transaction::UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, EngineShard* shard) {
+  if (multi_->multi_opts & CO::GLOBAL_TRANS) {
+    shard->shard_lock()->Release(IntentLock::EXCLUSIVE);
+  }
+
+  ShardId sid = shard->shard_id();
+  for (const auto& k_v : sharded_keys[sid]) {
+    auto release = [&](IntentLock::Mode mode) {
+      if (k_v.second.cnt[mode]) {
+        shard->db_slice().Release(mode, db_index_, k_v.first, k_v.second.cnt[mode]);
+      }
+    };
+
+    release(IntentLock::SHARED);
+    release(IntentLock::EXCLUSIVE);
+  }
+
+  auto& sd = shard_data_[SidToId(shard->shard_id())];
+
+  // It does not have to be that all shards in multi transaction execute this tx.
+  // Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from
+  // there.
+  if (sd.pq_pos != TxQueue::kEnd) {
+    DVLOG(1) << "unlockmulti: TxPopFront " << DebugId();
+
+    TxQueue* txq = shard->txq();
+    DCHECK(!txq->Empty());
+    Transaction* trans = absl::get<Transaction*>(txq->Front());
+    DCHECK(trans == this);
+    txq->PopFront();
+    sd.pq_pos = TxQueue::kEnd;
+  }
+
+  shard->ShutdownMulti(this);
+
+  // notify awakened transactions.
+  if (shard->blocking_controller())
+    shard->blocking_controller()->RunStep(nullptr);
+  shard->PollExecution("unlockmulti", nullptr);
+
+  this->DecreaseRunCnt();
+}
+
 #if 0
 // HasResultConverged has detailed documentation on how convergence is determined.
 void Transaction::CheckForConvergence(EngineShard* shard) {
diff --git a/src/server/transaction.h b/src/server/transaction.h
index 4ffb84177..0cc995674 100644
--- a/src/server/transaction.h
+++ b/src/server/transaction.h
@@ -187,6 +187,13 @@ class Transaction {
   KeyLockArgs GetLockArgs(ShardId sid) const;
 
  private:
+
+  struct LockCnt {
+    unsigned cnt[2] = {0, 0};
+  };
+
+  using KeyList = std::vector<std::pair<std::string_view, LockCnt>>;
+
   unsigned SidToId(ShardId sid) const {
     return sid < shard_data_.size() ? sid : 0;
   }
@@ -215,7 +222,7 @@ class Transaction {
   OpStatus AddToWatchedShardCb(EngineShard* shard);
   bool RemoveFromWatchedShardCb(EngineShard* shard);
   void ExpireShardCb(EngineShard* shard);
-  void CheckForConvergence(EngineShard* shard);
+  void UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, EngineShard* shard);
 
   void WaitForShardCallbacks() {
     run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); });
@@ -251,9 +258,6 @@ class Transaction {
   };
   enum { kPerShardSize = sizeof(PerShardData) };
 
-  struct LockCnt {
-    unsigned cnt[2] = {0, 0};
-  };
 
   struct Multi {
     absl::flat_hash_map<std::string_view, LockCnt> locks;