diff --git a/src/server/blocking_controller.cc b/src/server/blocking_controller.cc index 68d943ac4..3682604cc 100644 --- a/src/server/blocking_controller.cc +++ b/src/server/blocking_controller.cc @@ -40,6 +40,11 @@ struct BlockingController::WatchQueue { state = SUSPENDED; notify_txid = UINT64_MAX; } + + auto Find(Transaction* tx) const { + return find_if(items.begin(), items.end(), + [tx](const WatchItem& wi) { return wi.get() == tx; }); + } }; // Watch state per db. @@ -50,30 +55,64 @@ struct BlockingController::DbWatchTable { // they reference key objects in queue_map. absl::flat_hash_set awakened_keys; - void RemoveEntry(WatchQueueMap::iterator it); - // returns true if awake event was added. // Requires that the key queue be in the required state. - bool AddAwakeEvent(WatchQueue::State cur_state, string_view key); + bool AddAwakeEvent(string_view key); + + // Removes tx from the watch queue of key. Returns true only if tx was the head of an ACTIVE queue. + bool UnwatchTx(string_view key, Transaction* tx); }; +bool BlockingController::DbWatchTable::UnwatchTx(string_view key, Transaction* tx) { + auto wq_it = queue_map.find(key); + + // With duplicate keys we may miss here because an earlier iteration + // on the same key could have removed the queue. + if (wq_it == queue_map.end()) + return false; + + WatchQueue* wq = wq_it->second.get(); + DCHECK(!wq->items.empty()); + + bool res = false; + if (wq->state == WatchQueue::ACTIVE && wq->items.front().get() == tx) { + wq->items.pop_front(); + + // We suspend the queue and send its key for re-verification. + // If the key is still present, the queue will be reactivated later. + wq->state = WatchQueue::SUSPENDED; + + if (!wq->items.empty()) + awakened_keys.insert(wq_it->first); // send for further validation. + res = true; + } else { + // tx can be awakened because of some other key while this queue is still + // suspended, and we still need to clean it up. + // The item to remove does not have to be the first one in the queue. 
+ // It is also possible that this shard has not been awakened at all; if this transaction is in the queue + // we must clean it up. + + if (auto it = wq->Find(tx); it != wq->items.end()) { + wq->items.erase(it); + } + } + + if (wq->items.empty()) { + queue_map.erase(wq_it); + } + return res; +} + BlockingController::BlockingController(EngineShard* owner) : owner_(owner) { } BlockingController::~BlockingController() { } -void BlockingController::DbWatchTable::RemoveEntry(WatchQueueMap::iterator it) { - DVLOG(2) << "Erasing watchqueue key " << it->first; - - awakened_keys.erase(it->first); - queue_map.erase(it); -} - -bool BlockingController::DbWatchTable::AddAwakeEvent(WatchQueue::State cur_state, string_view key) { +bool BlockingController::DbWatchTable::AddAwakeEvent(string_view key) { auto it = queue_map.find(key); - if (it == queue_map.end() || it->second->state != cur_state) + if (it == queue_map.end() || it->second->state != WatchQueue::SUSPENDED) return false; /// nobody watches this key or state does not match. string_view dbkey = it->first; @@ -81,31 +120,82 @@ bool BlockingController::DbWatchTable::AddAwakeEvent(WatchQueue::State cur_state return awakened_keys.insert(dbkey).second; } -// Processes potentially awakened keys and verifies that these are indeed -// awakened to eliminate false positives. -// In addition, optionally removes completed_t from the front of the watch queues. -void BlockingController::RunStep(Transaction* completed_t) { - VLOG(1) << "RunStep [" << owner_->shard_id() << "] " << completed_t; +// Removes tx from the watch queues of the given keys and adds its db index to awakened_indices_. 
+void BlockingController::FinalizeWatched(KeyLockArgs lock_args, Transaction* tx) { + DCHECK(tx); - if (completed_t) { - awakened_transactions_.erase(completed_t); + ShardId sid = owner_->shard_id(); - auto dbit = watched_dbs_.find(completed_t->GetDbIndex()); - if (dbit != watched_dbs_.end()) { - DbWatchTable& wt = *dbit->second; + uint16_t local_mask = tx->GetLocalMask(sid); + VLOG(1) << "FinalizeBlocking [" << sid << "]" << tx->DebugId() << " " << local_mask; - ShardId sid = owner_->shard_id(); - KeyLockArgs lock_args = completed_t->GetLockArgs(sid); + bool is_awakened = local_mask & Transaction::AWAKED_Q; - for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { - string_view key = lock_args.args[i]; - if (wt.AddAwakeEvent(WatchQueue::ACTIVE, key)) { - awakened_indices_.emplace(completed_t->GetDbIndex()); - } - } + if (is_awakened) + awakened_transactions_.erase(tx); + + auto dbit = watched_dbs_.find(tx->GetDbIndex()); + + // Can happen if it was the only transaction in the queue and it was notified and removed. + if (dbit == watched_dbs_.end()) + return; + + DbWatchTable& wt = *dbit->second; + + // Unwatch the keys of the processed transaction so that we can awake the next one in the queue + // in case those keys still exist. + for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { + string_view key = lock_args.args[i]; + bool removed_awakened = wt.UnwatchTx(key, tx); + if (removed_awakened) { + CHECK(is_awakened) << tx->DebugId() << " " << key << " " << local_mask; } } + if (wt.queue_map.empty()) { + watched_dbs_.erase(dbit); + } + awakened_indices_.emplace(tx->GetDbIndex()); // NOTE(review): also runs when the db entry was just erased above - confirm NotifyPending tolerates that. +} + +// Same as above but takes the keys as an ArgSlice. TODO: remove the duplication. 
+void BlockingController::FinalizeWatched(ArgSlice args, Transaction* tx) { + DCHECK(tx); + + ShardId sid = owner_->shard_id(); + + VLOG(1) << "FinalizeBlocking [" << sid << "]" << tx->DebugId(); + + uint16_t local_mask = tx->GetLocalMask(sid); + bool is_awakened = local_mask & Transaction::AWAKED_Q; + + if (is_awakened) + awakened_transactions_.erase(tx); + + auto dbit = watched_dbs_.find(tx->GetDbIndex()); + + // Can happen if it was the only transaction in the queue and it was notified and removed. + if (dbit == watched_dbs_.end()) + return; + + DbWatchTable& wt = *dbit->second; + + // Unwatch the keys of the processed transaction so that we can awake the next one in the queue + // in case those keys still exist. + for (string_view key : args) { + bool removed_awakened = wt.UnwatchTx(key, tx); + if (removed_awakened) { + CHECK(is_awakened) << tx->DebugId() << " " << key << " " << local_mask; + } + } + + if (wt.queue_map.empty()) { + watched_dbs_.erase(dbit); + } + awakened_indices_.emplace(tx->GetDbIndex()); // NOTE(review): also runs when the db entry was just erased above - confirm NotifyPending tolerates that. +} + +void BlockingController::NotifyPending() { DbContext context; context.time_now_ms = GetCurrentTimeMs(); @@ -163,57 +253,19 @@ void BlockingController::AddWatched(ArgSlice keys, Transaction* trans) { } } -// Runs in O(N) complexity in the worst case. -void BlockingController::RemoveWatched(ArgSlice keys, Transaction* trans) { - VLOG(1) << "RemoveWatched [" << owner_->shard_id() << "] " << trans->DebugId(); - - auto dbit = watched_dbs_.find(trans->GetDbIndex()); - if (dbit == watched_dbs_.end()) - return; - - DbWatchTable& wt = *dbit->second; - for (auto key : keys) { - auto watch_it = wt.queue_map.find(key); - - // that can happen in case of duplicate keys or when we do not watch on all the argument keys - // like with BLPOPRPUSH. 
- if (watch_it == wt.queue_map.end()) - continue; - - WatchQueue& wq = *watch_it->second; - for (auto items_it = wq.items.begin(); items_it != wq.items.end(); ++items_it) { - if (items_it->trans == trans) { - wq.items.erase(items_it); - break; - } - } - // again, we may not find trans if we searched for the same key several times. - - if (wq.items.empty()) { - wt.RemoveEntry(watch_it); - } - } - - if (wt.queue_map.empty()) { - watched_dbs_.erase(dbit); - } -} - // Called from commands like lpush. void BlockingController::AwakeWatched(DbIndex db_index, string_view db_key) { auto it = watched_dbs_.find(db_index); if (it == watched_dbs_.end()) return; - VLOG(1) << "AwakeWatched: db(" << db_index << ") " << db_key; - DbWatchTable& wt = *it->second; DCHECK(!wt.queue_map.empty()); - if (wt.AddAwakeEvent(WatchQueue::SUSPENDED, db_key)) { + if (wt.AddAwakeEvent(db_key)) { + VLOG(1) << "AwakeWatched: db(" << db_index << ") " << db_key; + awakened_indices_.insert(db_index); - } else { - DVLOG(1) << "Skipped awakening " << db_index; } } @@ -224,6 +276,7 @@ void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueueMap* w DVLOG(1) << "Notify WQ: [" << owner_->shard_id() << "] " << key; WatchQueue* wq = w_it->second.get(); + DCHECK_EQ(wq->state, WatchQueue::SUSPENDED); wq->state = WatchQueue::ACTIVE; auto& queue = wq->items; @@ -232,15 +285,17 @@ void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueueMap* w do { WatchItem& wi = queue.front(); Transaction* head = wi.get(); - DVLOG(2) << "Pop " << head << " from key " << key; - - queue.pop_front(); + DVLOG(2) << "WQ-Pop " << head->DebugId() << " from key " << key; if (head->NotifySuspended(owner_->committed_txid(), sid)) { + // We deliberately keep the notified transaction in the queue to know which queue + // must be handled when this transaction finishes. 
wq->notify_txid = owner_->committed_txid(); awakened_transactions_.insert(head); break; } + + queue.pop_front(); } while (!queue.empty()); if (wq->items.empty()) { @@ -248,51 +303,6 @@ void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueueMap* w } } -#if 0 - -void BlockingController::OnTxFinish() { - VLOG(1) << "OnTxFinish [" << owner_->shard_id() << "]"; - - if (waiting_convergence_.empty()) - return; - - TxQueue* txq = owner_->txq(); - if (txq->Empty()) { - for (const auto& k_v : waiting_convergence_) { - NotifyConvergence(k_v.second); - } - waiting_convergence_.clear(); - return; - } - - TxId txq_score = txq->HeadScore(); - do { - auto tx_waiting = waiting_convergence_.begin(); - Transaction* trans = tx_waiting->second; - - // Instead of taking the map key, we use upto date notify_txid - // which could meanwhile improve (decrease). Not important though. - TxId notifyid = trans->notify_txid(); - if (owner_->committed_txid() < notifyid && txq_score <= notifyid) - break; // we can not converge for notifyid so we can not converge for larger ts as well. - - waiting_convergence_.erase(tx_waiting); - NotifyConvergence(trans); - } while (!waiting_convergence_.empty()); -} - - -void BlockingController::RegisterAwaitForConverge(Transaction* t) { - TxId notify_id = t->notify_txid(); - - DVLOG(1) << "RegisterForConverge " << t->DebugId() << " at notify " << notify_id; - - // t->notify_txid might improve in parallel. it does not matter since convergence - // will happen even with stale notify_id. 
- waiting_convergence_.emplace(notify_id, t); -} -#endif - size_t BlockingController::NumWatched(DbIndex db_indx) const { auto it = watched_dbs_.find(db_indx); if (it == watched_dbs_.end()) diff --git a/src/server/blocking_controller.h b/src/server/blocking_controller.h index a5f2ae361..fb83fe934 100644 --- a/src/server/blocking_controller.h +++ b/src/server/blocking_controller.h @@ -28,22 +28,18 @@ class BlockingController { return awakened_transactions_; } - // Iterates over awakened key candidates in each db and moves verified ones into - // global verified_awakened_ array. - // Returns true if there are active awakened keys, false otherwise. - // It has 2 responsibilities. - // 1: to go over potential wakened keys, verify them and activate watch queues. - // 2: if t is awaked and finished running - to remove it from the head - // of the queue and notify the next one. - // If t is null then second part is omitted. - void RunStep(Transaction* t); + void FinalizeWatched(KeyLockArgs lock_args, Transaction* tx); + + // Same as the FinalizeWatched overload above but takes the keys as an ArgSlice. + void FinalizeWatched(ArgSlice args, Transaction* tx); + // Goes over potentially awakened keys, verifies them and activates their watch queues. + void NotifyPending(); // Blocking API // TODO: consider moving all watched functions to // EngineShard with separate per db map. //! AddWatched adds a transaction to the blocking queue. void AddWatched(ArgSlice watch_keys, Transaction* me); - void RemoveWatched(ArgSlice watch_keys, Transaction* me); // Called from operations that create keys like lpush, rename etc. 
void AwakeWatched(DbIndex db_index, std::string_view db_key); @@ -52,10 +48,6 @@ class BlockingController { size_t NumWatched(DbIndex db_indx) const; std::vector GetWatchedKeys(DbIndex db_indx) const; - void RemoveAwaked(Transaction* trans) { - awakened_transactions_.erase(trans); - } - private: struct WatchQueue; struct DbWatchTable; diff --git a/src/server/blocking_controller_test.cc b/src/server/blocking_controller_test.cc index 2b9349a92..382899a04 100644 --- a/src/server/blocking_controller_test.cc +++ b/src/server/blocking_controller_test.cc @@ -75,7 +75,7 @@ TEST_F(BlockingControllerTest, Basic) { bc.AddWatched(keys, trans_.get()); EXPECT_EQ(1, bc.NumWatched(0)); - bc.RemoveWatched(keys, trans_.get()); + bc.FinalizeWatched(keys, trans_.get()); EXPECT_EQ(0, bc.NumWatched(0)); }); } diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 1aadf9d0f..a2479420a 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -297,8 +297,6 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { bool keep = trans->RunInShard(this); if (keep) { return; - } else { - blocking_controller_->RemoveAwaked(trans); } } diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 3600ab951..aad6b30fa 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -135,7 +135,8 @@ struct ShardFFResult { ShardId sid = kInvalidSid; }; -OpResult FindFirst(Transaction* trans) { +// Used by BPopper. If awaked_only is set, shards without the AWAKED_Q flag are skipped. +OpResult FindFirst(bool awaked_only, Transaction* trans) { VLOG(2) << "FindFirst::Find " << trans->DebugId(); // Holds Find results: (iterator to a found key, and its index in the passed arguments). @@ -144,18 +145,33 @@ OpResult FindFirst(Transaction* trans) { std::vector> find_res(shard_set->size()); fill(find_res.begin(), find_res.end(), OpStatus::KEY_NOTFOUND); - auto cb = [&find_res](auto* t, EngineShard* shard) { + // We must capture notify_txid before we spawn callbacks. 
+ // Otherwise, consider the following scenario: + // 0. The key is added in shard 0, with notify_txid = 100. + // 1. The cb runs first on shard 1 and does not find anything. + // 2. A tx 99 runs on shard 1, adds a key, updates notify_txid to 99. + // 3. The cb on shard 0 runs and ignores the key due to the lower notify_txid. + uint64_t notify_txid = trans->GetNotifyTxid(); + + auto cb = [&](Transaction* t, EngineShard* shard) { auto args = t->GetShardArgs(shard->shard_id()); - - OpResult> ff_res = - shard->db_slice().FindFirst(t->GetDbContext(), args); - - if (ff_res) { - FFResult ff_result(ff_res->first->first.AsRef(), ff_res->second); - find_res[shard->shard_id()] = move(ff_result); - } else { - find_res[shard->shard_id()] = ff_res.status(); + // If requested to consider awakened shards only, skip shards without the AWAKED_Q flag. + if (awaked_only && (t->GetLocalMask(shard->shard_id()) & Transaction::AWAKED_Q) == 0) { + return OpStatus::OK; } + + if (shard->committed_txid() <= notify_txid) { + OpResult> ff_res = + shard->db_slice().FindFirst(t->GetDbContext(), args); + + if (ff_res) { + FFResult ff_result(ff_res->first->first.AsRef(), ff_res->second); + find_res[shard->shard_id()] = move(ff_result); + } else { + find_res[shard->shard_id()] = ff_res.status(); + } + } + return OpStatus::OK; }; @@ -249,7 +265,7 @@ OpStatus BPopper::Run(Transaction* trans, unsigned msec) { auto* stats = ServerState::tl_connection_stats(); - OpResult result = FindFirst(trans); + OpResult result = FindFirst(false, trans); if (result.status() == OpStatus::KEY_NOTFOUND) { if (is_multi) { @@ -261,7 +277,7 @@ OpStatus BPopper::Run(Transaction* trans, unsigned msec) { } // Block - auto wcb = [&](Transaction* t, EngineShard* shard) { + auto wcb = [](Transaction* t, EngineShard* shard) { return t->GetShardArgs(shard->shard_id()); }; @@ -274,7 +290,7 @@ OpStatus BPopper::Run(Transaction* trans, unsigned msec) { return OpStatus::TIMED_OUT; // Now we have something for sure. 
- result = FindFirst(trans); // retry - must find something. + result = FindFirst(true, trans); // retry - must find something. } if (!result) { @@ -557,6 +573,14 @@ OpResult MoveTwoShards(Transaction* trans, string_view src, string_view ArgSlice span{&val, 1}; OpPush(op_args, key, dest_dir, false, span, true); + + // blocking_controller does not have to be set with non-blocking transactions. + if (shard->blocking_controller()) { + // Hack, again: since we hacked which queue we are waiting on (see RunPair), + // we must clean up the src key here manually. See RunPair for why we do this. + // In short - we suspended on "src" on both shards. + shard->blocking_controller()->FinalizeWatched(ArgSlice{&src, 1}, t); + } } else { DVLOG(1) << "Popping value from list: " << key; OpPop(op_args, key, src_dir, 1, false, true); diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index 3d8d3e534..b757ee50e 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -730,6 +730,35 @@ TEST_F(ListFamilyTest, BRPopLPushSingleShardBlocking) { ASSERT_EQ(0, NumWatched()); } +TEST_F(ListFamilyTest, BRPopContended) { + RespExpr resp; + atomic_bool done{false}; + constexpr auto kNumFibers = 4; + + // Run the fiber at creation. 
+ fibers_ext::Fiber fb[kNumFibers]; + for (int i = 0; i < kNumFibers; i++) { + fb[i] = pp_->at(1)->LaunchFiber(fibers::launch::dispatch, [&] { + string id = StrCat("id", i); + while (!done) { + Run(id, {"brpop", "k0", "k1", "k2", "k3", "k4", "0.1"}); + }; + }); + } + + for (int i = 0; i < 500; i++) { + string key = absl::StrCat("k", i % 3); + Run({"lpush", key, "foo"}); + } + + done = true; + for (int i = 0; i < kNumFibers; i++) { + fb[i].Join(); + } + ASSERT_EQ(0, NumWatched()); + ASSERT_FALSE(HasAwakened()); +} + TEST_F(ListFamilyTest, BRPopLPushTwoShards) { RespExpr resp; EXPECT_THAT(Run({"brpoplpush", "x", "z", "0.05"}), ArgType(RespExpr::NIL)); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 766e92860..d63981f18 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -505,7 +505,10 @@ bool Transaction::RunInShard(EngineShard* shard) { // of the queue and notify the next one. // RunStep is also called for global transactions because of commands like MOVE. if (shard->blocking_controller()) { - shard->blocking_controller()->RunStep(awaked_prerun ? this : nullptr); + if (awaked_prerun || was_suspended) { + shard->blocking_controller()->FinalizeWatched(largs, this); + } + shard->blocking_controller()->NotifyPending(); } } @@ -1114,9 +1117,11 @@ void Transaction::UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard auto& sd = shard_data_[sd_idx]; sd.local_mask |= EXPIRED_Q; sd.local_mask &= ~KEYLOCK_ACQUIRED; - } + shard->blocking_controller()->FinalizeWatched(wkeys, this); + DCHECK(!shard->blocking_controller()->awakened_transactions().contains(this)); - shard->blocking_controller()->RemoveWatched(wkeys, this); + shard->blocking_controller()->NotifyPending(); + } // Need to see why I decided to call this. 
// My guess - probably to trigger the run of stalled transactions in case @@ -1168,9 +1173,10 @@ void Transaction::UnlockMultiShardCb(const std::vector& sharded_keys, E shard->ShutdownMulti(this); - // notify awakened transactions. + // Notify awakened transactions. Not sure we need it here because it's done after + // each operation. if (shard->blocking_controller()) - shard->blocking_controller()->RunStep(nullptr); + shard->blocking_controller()->NotifyPending(); shard->PollExecution("unlockmulti", nullptr); this->DecreaseRunCnt();