From 72e90bb7295f988e7ac32c4d719acc9c1a18d7b5 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Wed, 27 Apr 2022 10:36:25 +0300 Subject: [PATCH] More work on blocking commands like BLPOP. Fixes #1 and fixes #24. --- helio | 2 +- src/server/CMakeLists.txt | 5 +- src/server/blocking_controller.cc | 291 ++++++++++++++++++++++ src/server/blocking_controller.h | 80 +++++++ src/server/blocking_controller_test.cc | 99 ++++++++ src/server/engine_shard_set.cc | 318 ++++--------------------- src/server/engine_shard_set.h | 76 ++---- src/server/generic_family.cc | 12 +- src/server/list_family.cc | 28 +-- src/server/list_family_test.cc | 139 +++++++---- src/server/transaction.cc | 120 ++++++---- src/server/transaction.h | 6 +- 12 files changed, 739 insertions(+), 437 deletions(-) create mode 100644 src/server/blocking_controller.cc create mode 100644 src/server/blocking_controller.h create mode 100644 src/server/blocking_controller_test.cc diff --git a/helio b/helio index 9b96ae52b..a024151f2 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 9b96ae52be8ccf35c959c366757ae30174d84a0e +Subproject commit a024151f24180d493b51909b6853cfd16ae6367d diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 85c63dabf..34ee337d2 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -1,7 +1,8 @@ add_executable(dragonfly dfly_main.cc) cxx_link(dragonfly base dragonfly_lib) -add_library(dragonfly_lib channel_slice.cc command_registry.cc common.cc config_flags.cc +add_library(dragonfly_lib blocking_controller.cc channel_slice.cc command_registry.cc + common.cc config_flags.cc conn_context.cc db_slice.cc debugcmd.cc engine_shard_set.cc generic_family.cc hset_family.cc io_mgr.cc list_family.cc main_service.cc rdb_load.cc rdb_save.cc replica.cc @@ -22,7 +23,7 @@ cxx_test(set_family_test dfly_test_lib LABELS DFLY) cxx_test(string_family_test dfly_test_lib LABELS DFLY) cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb 
testdata/redis6_small.rdb LABELS DFLY) cxx_test(zset_family_test dfly_test_lib LABELS DFLY) - +cxx_test(blocking_controller_test dragonfly_lib LABELS DFLY) add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY) add_dependencies(check_dfly dragonfly_test list_family_test diff --git a/src/server/blocking_controller.cc b/src/server/blocking_controller.cc new file mode 100644 index 000000000..23b13fa6c --- /dev/null +++ b/src/server/blocking_controller.cc @@ -0,0 +1,291 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/blocking_controller.h" + +#include + +extern "C" { +#include "redis/object.h" +} + +#include "base/logging.h" +#include "server/engine_shard_set.h" +#include "server/transaction.h" + +namespace dfly { + +using namespace std; + +struct WatchItem { + Transaction* trans; + + Transaction* get() { + return trans; + } + + WatchItem(Transaction* t) : trans(t) { + } +}; + +struct BlockingController::WatchQueue { + deque items; + TxId notify_txid = UINT64_MAX; + + // Updated by both coordinator and shard threads but at different times. + enum State { SUSPENDED, ACTIVE } state = SUSPENDED; + + void Suspend() { + state = SUSPENDED; + notify_txid = UINT64_MAX; + } +}; + +// Watch state per db. +struct BlockingController::DbWatchTable { + WatchQueueMap queue_map; + + // awakened keys point to blocked keys that can potentially be unblocked. + // they reference key objects in queue_map. + absl::flat_hash_set awakened_keys; + + void RemoveEntry(WatchQueueMap::iterator it); + + // returns true if awake event was added. + // Requires that the key queue be in the required state. 
+ bool AddAwakeEvent(WatchQueue::State cur_state, string_view key); +}; + +BlockingController::BlockingController(EngineShard* owner) : owner_(owner) { +} + +BlockingController::~BlockingController() { +} + +void BlockingController::DbWatchTable::RemoveEntry(WatchQueueMap::iterator it) { + DVLOG(1) << "Erasing watchqueue key " << it->first; + + awakened_keys.erase(it->first); + queue_map.erase(it); +} + +bool BlockingController::DbWatchTable::AddAwakeEvent(WatchQueue::State cur_state, string_view key) { + auto it = queue_map.find(key); + + if (it == queue_map.end() || it->second->state != cur_state) + return false; /// nobody watches this key or state does not match. + + string_view dbkey = it->first; + + return awakened_keys.insert(dbkey).second; +} + +// Processes potentially awakened keys and verifies that these are indeed +// awakened to eliminate false positives. +// In addition, optionally removes completed_t from the front of the watch queues. +void BlockingController::RunStep(Transaction* completed_t) { + VLOG(1) << "RunStep [" << owner_->shard_id() << "] " << completed_t; + + if (completed_t) { + awakened_transactions_.erase(completed_t); + + auto dbit = watched_dbs_.find(completed_t->db_index()); + if (dbit != watched_dbs_.end()) { + DbWatchTable& wt = *dbit->second; + + ShardId sid = owner_->shard_id(); + KeyLockArgs lock_args = completed_t->GetLockArgs(sid); + + for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { + string_view key = lock_args.args[i]; + if (wt.AddAwakeEvent(WatchQueue::ACTIVE, key)) { + awakened_indices_.emplace(completed_t->db_index()); + } + } + } + } + + for (DbIndex index : awakened_indices_) { + auto dbit = watched_dbs_.find(index); + if (dbit == watched_dbs_.end()) + continue; + + DbWatchTable& wt = *dbit->second; + for (auto key : wt.awakened_keys) { + string_view sv_key = static_cast(key); + + // Double verify we still got the item. 
+ auto [it, exp_it] = owner_->db_slice().FindExt(index, sv_key); + if (!IsValid(it) || it->second.ObjType() != OBJ_LIST) // Only LIST is allowed to block. + continue; + + auto w_it = wt.queue_map.find(sv_key); + CHECK(w_it != wt.queue_map.end()); + DVLOG(1) << "NotifyWatchQueue " << key; + WatchQueue* wq = w_it->second.get(); + NotifyWatchQueue(wq); + if (wq->items.empty()) { + wt.queue_map.erase(w_it); + } + } + wt.awakened_keys.clear(); + + if (wt.queue_map.empty()) { + watched_dbs_.erase(dbit); + } + } + awakened_indices_.clear(); +} + +void BlockingController::AddWatched(Transaction* trans) { + VLOG(1) << "AddWatched [" << owner_->shard_id() << "] " << trans->DebugId(); + + auto [dbit, added] = watched_dbs_.emplace(trans->db_index(), nullptr); + if (added) { + dbit->second.reset(new DbWatchTable); + } + + DbWatchTable& wt = *dbit->second; + + auto args = trans->ShardArgsInShard(owner_->shard_id()); + for (auto key : args) { + auto [res, inserted] = wt.queue_map.emplace(key, nullptr); + if (inserted) { + res->second.reset(new WatchQueue); + } + + res->second->items.emplace_back(trans); + } +} + +// Runs in O(N) complexity. 
+void BlockingController::RemoveWatched(Transaction* trans) { + VLOG(1) << "RemoveWatched [" << owner_->shard_id() << "] " << trans->DebugId(); + + auto dbit = watched_dbs_.find(trans->db_index()); + CHECK(dbit != watched_dbs_.end()); + + DbWatchTable& wt = *dbit->second; + auto args = trans->ShardArgsInShard(owner_->shard_id()); + for (auto key : args) { + auto watch_it = wt.queue_map.find(key); + CHECK(watch_it != wt.queue_map.end()); + + WatchQueue& wq = *watch_it->second; + bool erased = false; + for (auto items_it = wq.items.begin(); items_it != wq.items.end(); ++items_it) { + if (items_it->trans == trans) { + wq.items.erase(items_it); + erased = true; + break; + } + } + CHECK(erased); + + if (wq.items.empty()) { + wt.RemoveEntry(watch_it); + } + } + + if (wt.queue_map.empty()) { + watched_dbs_.erase(dbit); + } + + awakened_transactions_.erase(trans); +} + +// Called from commands like lpush. +void BlockingController::AwakeWatched(DbIndex db_index, string_view db_key) { + auto it = watched_dbs_.find(db_index); + if (it == watched_dbs_.end()) + return; + + VLOG(1) << "AwakeWatched: db(" << db_index << ") " << db_key; + + DbWatchTable& wt = *it->second; + DCHECK(!wt.queue_map.empty()); + + if (wt.AddAwakeEvent(WatchQueue::SUSPENDED, db_key)) { + awakened_indices_.insert(db_index); + } +} + +void BlockingController::RegisterAwaitForConverge(Transaction* t) { + TxId notify_id = t->notify_txid(); + + DVLOG(1) << "RegisterForConverge " << t->DebugId() << " at notify " << notify_id; + + // t->notify_txid might improve in parallel. It does not matter since convergence + // will happen even with stale notify_id. + waiting_convergence_.emplace(notify_id, t); +} + +// Internal function called from RunStep(). +// Marks the queue as active and notifies the first transaction in the queue. 
+void BlockingController::NotifyWatchQueue(WatchQueue* wq) { + VLOG(1) << "Notify WQ: [" << owner_->shard_id() << "]"; + + wq->state = WatchQueue::ACTIVE; + + auto& queue = wq->items; + ShardId sid = owner_->shard_id(); + + do { + WatchItem& wi = queue.front(); + Transaction* head = wi.get(); + + queue.pop_front(); + + if (head->NotifySuspended(owner_->committed_txid(), sid)) { + wq->notify_txid = owner_->committed_txid(); + awakened_transactions_.insert(head); + break; + } + } while (!queue.empty()); +} + +void BlockingController::OnTxFinish() { + VLOG(1) << "OnTxFinish [" << owner_->shard_id() << "]"; + + if (waiting_convergence_.empty()) + return; + + TxQueue* txq = owner_->txq(); + if (txq->Empty()) { + for (const auto& k_v : waiting_convergence_) { + NotifyConvergence(k_v.second); + } + waiting_convergence_.clear(); + return; + } + + TxId txq_score = txq->HeadScore(); + do { + auto tx_waiting = waiting_convergence_.begin(); + Transaction* trans = tx_waiting->second; + + // Instead of taking the map key, we use the up-to-date notify_txid + // which could meanwhile improve (decrease). Not important though. + TxId notifyid = trans->notify_txid(); + if (owner_->committed_txid() < notifyid && txq_score <= notifyid) + break; // we can not converge for notifyid so we can not converge for larger ts as well. + + waiting_convergence_.erase(tx_waiting); + NotifyConvergence(trans); + } while (!waiting_convergence_.empty()); +} + +void BlockingController::NotifyConvergence(Transaction* tx) { + LOG(FATAL) << "TBD"; +} + +size_t BlockingController::NumWatched(DbIndex db_indx) const { + auto it = watched_dbs_.find(db_indx); + if (it == watched_dbs_.end()) + return 0; + + return it->second->queue_map.size(); +} + +} // namespace dfly diff --git a/src/server/blocking_controller.h b/src/server/blocking_controller.h new file mode 100644 index 000000000..27bb868e9 --- /dev/null +++ b/src/server/blocking_controller.h @@ -0,0 +1,80 @@ +// Copyright 2022, Roman Gershman. 
All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include +#include +#include + +#include "base/string_view_sso.h" +#include "server/common.h" + +namespace dfly { + +class Transaction; + +class BlockingController { + public: + explicit BlockingController(EngineShard* owner); + ~BlockingController(); + + bool HasAwakedTransaction() const { + return !awakened_transactions_.empty(); + } + + // Iterates over awakened key candidates in each db and, for every verified one, + // notifies its watch queue; notified transactions are tracked in awakened_transactions_. + // Returns nothing: use HasAwakedTransaction() to check for awakened transactions. + // It has 2 responsibilities. + // 1: to go over potential awakened keys, verify them and activate watch queues. + // 2: if t is awaked and finished running - to remove it from the head + // of the queue and notify the next one. + // If t is null then second part is omitted. + void RunStep(Transaction* t); + + // Blocking API + // TODO: consider moving all watched functions to + // EngineShard with separate per db map. + //! AddWatched adds a transaction to the blocking queue. + void AddWatched(Transaction* me); + void RemoveWatched(Transaction* me); + + // Called from operations that create keys like lpush, rename etc. + void AwakeWatched(DbIndex db_index, std::string_view db_key); + + void OnTxFinish(); + + void RegisterAwaitForConverge(Transaction* t); + + size_t NumWatched(DbIndex db_indx) const; + + private: + struct WatchQueue; + struct DbWatchTable; + + using WatchQueueMap = absl::flat_hash_map>; + + /// Marks the queue as active and pops transactions from its head until one of them + /// accepts the notification (NotifySuspended() returns true) or the queue becomes empty. + void NotifyWatchQueue(WatchQueue* wq); + + void NotifyConvergence(Transaction* tx); + + EngineShard* owner_; + + absl::flat_hash_map> watched_dbs_; + + // serves as a temporary queue that aggregates all the possible awakened dbs. + // flushed by RunStep(). + absl::flat_hash_set awakened_indices_; + + // tracks currently notified and awaked transactions. 
+ // There can be multiple transactions like this because a transaction + // could awaken arbitrary number of keys. + absl::flat_hash_set awakened_transactions_; + + absl::btree_multimap waiting_convergence_; +}; +} // namespace dfly diff --git a/src/server/blocking_controller_test.cc b/src/server/blocking_controller_test.cc new file mode 100644 index 000000000..3f56ec61e --- /dev/null +++ b/src/server/blocking_controller_test.cc @@ -0,0 +1,99 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/blocking_controller.h" + +#include + +#include "base/logging.h" +#include "server/command_registry.h" +#include "server/engine_shard_set.h" +#include "server/transaction.h" +#include "util/uring/uring_pool.h" + +namespace dfly { + +using namespace util; +using namespace std; +using namespace std::chrono; +using namespace testing; + +class BlockingControllerTest : public Test { + protected: + BlockingControllerTest() : cid_("blpop", 0, -3, 1, -2, 1) { + } + void SetUp() override; + void TearDown() override; + + std::unique_ptr pp_; + std::unique_ptr ess_; + boost::intrusive_ptr trans_; + CommandId cid_; + StringVec str_vec_; + CmdArgVec arg_vec_; +}; + +constexpr size_t kNumThreads = 3; + +void BlockingControllerTest::SetUp() { + pp_.reset(new uring::UringPool(16, kNumThreads)); + pp_->Run(); + ess_.reset(new EngineShardSet(pp_.get())); + ess_->Init(kNumThreads); + + auto cb = [&](uint32_t index, ProactorBase* pb) { ess_->InitThreadLocal(pb, false); }; + + pp_->AwaitFiberOnAll(cb); + + trans_.reset(new Transaction{&cid_, ess_.get()}); + + str_vec_.assign({"blpop", "x", "z", "0"}); + for (auto& s : str_vec_) { + arg_vec_.emplace_back(s); + } + + trans_->InitByArgs(0, {arg_vec_.data(), arg_vec_.size()}); + CHECK_EQ(0u, Shard("x", ess_->size())); + CHECK_EQ(2u, Shard("z", ess_->size())); + + const TestInfo* const test_info = UnitTest::GetInstance()->current_test_info(); + LOG(INFO) << "Starting " << 
test_info->name(); +} + +void BlockingControllerTest::TearDown() { + ess_->RunBlockingInParallel([](EngineShard*) { EngineShard::DestroyThreadLocal(); }); + ess_.reset(); + pp_->Stop(); + pp_.reset(); +} + +TEST_F(BlockingControllerTest, Basic) { + ess_->Await(0, [&] { + BlockingController bc(EngineShard::tlocal()); + bc.AddWatched(trans_.get()); + EXPECT_EQ(1, bc.NumWatched(0)); + + bc.RemoveWatched(trans_.get()); + EXPECT_EQ(0, bc.NumWatched(0)); + }); +} + +TEST_F(BlockingControllerTest, Timeout) { + time_point tp = steady_clock::now() + chrono::milliseconds(10); + + trans_->Schedule(); + + bool res = trans_->WaitOnWatch(tp); + + EXPECT_FALSE(res); + unsigned num_watched = + ess_->Await(0, [&] { return EngineShard::tlocal()->blocking_controller()->NumWatched(0); }); + + EXPECT_EQ(0, num_watched); + trans_.reset(); + + +} + +} // namespace dfly diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index a5838d4bb..7db252ee7 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -10,6 +10,7 @@ extern "C" { } #include "base/logging.h" +#include "server/blocking_controller.h" #include "server/tiered_storage.h" #include "server/transaction.h" #include "util/fiber_sched_algo.h" @@ -33,35 +34,6 @@ vector cached_stats; // initialized in EngineShard thread_local EngineShard* EngineShard::shard_ = nullptr; constexpr size_t kQueueLen = 64; -struct WatchItem { - ::boost::intrusive_ptr trans; - - WatchItem(Transaction* t) : trans(t) { - } -}; - -struct EngineShard::WatchQueue { - deque items; - TxId notify_txid = UINT64_MAX; - - // Updated by both coordinator and shard threads but at different times. 
- enum State { SUSPENDED, ACTIVE } state = SUSPENDED; - - void Suspend() { - state = SUSPENDED; - notify_txid = UINT64_MAX; - } -}; - -bool EngineShard::DbWatchTable::RemoveEntry(WatchQueueMap::iterator it) { - DVLOG(1) << "Erasing watchqueue key " << it->first; - - awakened_keys.erase(it->first); - queue_map.erase(it); - - return queue_map.empty(); -} - EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap) : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap), db_slice_(pb->GetIndex(), this) { @@ -125,7 +97,7 @@ void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) { shard_->tiered_storage_.reset(new TieredStorage(&shard_->db_slice_)); error_code ec = shard_->tiered_storage_->Open(fn); - CHECK(!ec) << ec.message(); // TODO + CHECK(!ec) << ec.message(); // TODO } } @@ -171,12 +143,13 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { DVLOG(1) << "RunContTrans: " << continuation_trans_->DebugId() << " keep: " << to_keep; if (!to_keep) { continuation_trans_ = nullptr; - OnTxFinish(); + if (blocking_controller_) + blocking_controller_->OnTxFinish(); } } } - bool has_awaked_trans = HasAwakedTransaction(); + bool has_awaked_trans = blocking_controller_ && blocking_controller_->HasAwakedTransaction(); Transaction* head = nullptr; string dbg_id; @@ -188,6 +161,8 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { // The fact that Tx is in the queue, already means that coordinator fiber will not progress, // hence here it's enough to test for run_count and check local_mask. 
bool is_armed = head->IsArmedInShard(sid); + DVLOG(2) << "Considering head " << head->DebugId() << " isarmed: " << is_armed; + if (!is_armed) break; @@ -213,6 +188,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { } bool keep = head->RunInShard(this); + // We should not access head from this point since RunInShard callback decrements refcount. DLOG_IF(INFO, !dbg_id.empty()) << "RunHead " << dbg_id << ", keep " << keep; @@ -221,7 +197,8 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { break; } - OnTxFinish(); + if (blocking_controller_) + blocking_controller_->OnTxFinish(); } // while(!txq_.Empty()) } else { // if (continuation_trans_ == nullptr && !has_awaked_trans) DVLOG(1) << "Skipped TxQueue " << continuation_trans_ << " " << has_awaked_trans; @@ -230,8 +207,10 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { // For SUSPENDED_Q - if transaction has not been notified, it will still be // in the watch queue. We need to unlock an Execute by running a noop. if (trans_mask & Transaction::SUSPENDED_Q) { - TxId notify_txid = trans->notify_txid(); - DCHECK(HasResultConverged(notify_txid)); + // This case happens when some other shard notified the transaction and now it + // runs FindFirst on all shards. + // TxId notify_txid = trans->notify_txid(); + // DCHECK(HasResultConverged(notify_txid)); trans->RunNoop(this); return; } @@ -239,7 +218,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { // If trans is out of order, i.e. locks keys that previous transactions have not locked. // It may be that there are other transactions that touch those keys but they necessary ordered // after trans in the queue, hence it's safe to run trans out of order. - if (trans && trans_mask & Transaction::OUT_OF_ORDER) { + if (trans && (trans_mask & Transaction::OUT_OF_ORDER)) { DCHECK(trans != head); DCHECK(!trans->IsMulti()); // multi, global transactions can not be OOO. 
DCHECK(trans_mask & Transaction::ARMED); @@ -259,239 +238,15 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { } } -// Internal function called from ProcessAwakened(). -// Marks the queue as active and notifies the first transaction in the queue. -Transaction* EngineShard::NotifyWatchQueue(WatchQueue* wq) { - wq->state = WatchQueue::ACTIVE; - - auto& q = wq->items; - ShardId sid = shard_id(); - - do { - const WatchItem& wi = q.front(); - Transaction* head = wi.trans.get(); - - if (head->NotifySuspended(committed_txid_, sid)) { - wq->notify_txid = committed_txid_; - return head; - } - - q.pop_front(); - } while (!q.empty()); - - return nullptr; -} - -// Processes potentially awakened keys and verifies that these are indeed -// awakened to eliminate false positives. -// In addition, optionally removes completed_t from the watch queues. -void EngineShard::ProcessAwakened(Transaction* completed_t) { - for (DbIndex index : awakened_indices_) { - DbWatchTable& wt = watched_dbs_[index]; - - for (auto key : wt.awakened_keys) { - string_view sv_key = static_cast(key); - auto [it, exp_it] = db_slice_.FindExt(index, sv_key); // Double verify we still got the item. - if (!IsValid(it) || it->second.ObjType() != OBJ_LIST) // Only LIST is allowed to block. 
- continue; - - auto w_it = wt.queue_map.find(sv_key); - CHECK(w_it != wt.queue_map.end()); - DVLOG(1) << "NotifyWatchQueue " << key; - - Transaction* t2 = NotifyWatchQueue(w_it->second.get()); - if (t2) { - awakened_transactions_.insert(t2); - } - } - wt.awakened_keys.clear(); - } - awakened_indices_.clear(); - - if (!completed_t) - return; - - auto dbit = watched_dbs_.find(completed_t->db_index()); - if (dbit == watched_dbs_.end()) - return; - - DbWatchTable& wt = dbit->second; - KeyLockArgs lock_args = completed_t->GetLockArgs(shard_id()); - - for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { - string_view key = lock_args.args[i]; - auto w_it = wt.queue_map.find(key); - - if (w_it == wt.queue_map.end() || w_it->second->state != WatchQueue::ACTIVE) - continue; - - WatchQueue& wq = *w_it->second; - - DCHECK_LE(wq.notify_txid, committed_txid_); - - auto& queue = wq.items; - DCHECK(!queue.empty()); // since it's active - - if (queue.front().trans != completed_t) - continue; - - DVLOG(1) << "Wakening next transaction for key " << key; - - do { - const WatchItem& bi = queue.front(); - Transaction* head = bi.trans.get(); - - if (head->NotifySuspended(wq.notify_txid, shard_id())) - break; - queue.pop_front(); - } while (!queue.empty()); - - if (queue.empty()) { - wt.RemoveEntry(w_it); - } - } - - if (wt.queue_map.empty()) { - watched_dbs_.erase(dbit); - } - awakened_transactions_.erase(completed_t); -} - -void EngineShard::AddWatched(string_view key, Transaction* me) { - DbWatchTable& wt = watched_dbs_[me->db_index()]; - auto [res, inserted] = wt.queue_map.emplace(key, nullptr); - if (inserted) { - res->second.reset(new WatchQueue); - } - - res->second->items.emplace_back(me); -} - -// Runs in O(N) complexity. 
-bool EngineShard::RemovedWatched(string_view key, Transaction* me) { - auto dbit = watched_dbs_.find(me->db_index()); - CHECK(dbit != watched_dbs_.end()); - - DbWatchTable& wt = dbit->second; - auto watch_it = wt.queue_map.find(key); - CHECK(watch_it != wt.queue_map.end()); - - WatchQueue& wq = *watch_it->second; - for (auto j = wq.items.begin(); j != wq.items.end(); ++j) { - if (j->trans == me) { - wq.items.erase(j); - if (wq.items.empty()) { - if (wt.RemoveEntry(watch_it)) { - watched_dbs_.erase(dbit); - } - } - return true; - } - } - - LOG(FATAL) << "should not happen"; - - return false; -} - -void EngineShard::GCWatched(const KeyLockArgs& largs) { - auto dbit = watched_dbs_.find(largs.db_index); - CHECK(dbit != watched_dbs_.end()); - - DbWatchTable& wt = dbit->second; - - for (size_t i = 0; i < largs.args.size(); i += largs.key_step) { - string_view key = largs.args[i]; - auto watch_it = wt.queue_map.find(key); - CHECK(watch_it != wt.queue_map.end()); - - WatchQueue& wq = *watch_it->second; - DCHECK(!wq.items.empty()); - do { - auto local_mask = wq.items.front().trans->GetLocalMask(shard_id()); - if ((local_mask & Transaction::EXPIRED_Q) == 0) { - break; - } - wq.items.pop_front(); - } while (!wq.items.empty()); - - if (wq.items.empty()) { - if (wt.RemoveEntry(watch_it)) { - watched_dbs_.erase(dbit); - return; - } - } - } -} - -// Called from commands like lpush. -void EngineShard::AwakeWatched(DbIndex db_index, string_view db_key) { - auto it = watched_dbs_.find(db_index); - if (it == watched_dbs_.end()) - return; - - DbWatchTable& wt = it->second; - DCHECK(!wt.queue_map.empty()); - - auto wit = wt.queue_map.find(db_key); - - if (wit == wt.queue_map.end()) - return; /// Similarly, nobody watches this key. - - string_view key = wit->first; - - // Already awakened this key. 
- if (wt.awakened_keys.find(key) != wt.awakened_keys.end()) - return; - - wt.awakened_keys.insert(wit->first); - awakened_indices_.insert(db_index); -} - void EngineShard::ShutdownMulti(Transaction* multi) { if (continuation_trans_ == multi) { continuation_trans_ = nullptr; } - OnTxFinish(); -} - -void EngineShard::WaitForConvergence(TxId notifyid, Transaction* t) { - DVLOG(1) << "ConvergeNotification " << t->DebugId() << " at notify " << notifyid; - waiting_convergence_.emplace(notifyid, t); -} - -void EngineShard::OnTxFinish() { - DCHECK(continuation_trans_ == nullptr); // By definition of OnTxFinish. - - if (waiting_convergence_.empty()) - return; - - if (txq_.Empty()) { - for (const auto& k_v : waiting_convergence_) { - NotifyConvergence(k_v.second); - } - waiting_convergence_.clear(); - return; - } - - TxId txq_score = txq_.HeadScore(); - do { - auto tx_waiting = waiting_convergence_.begin(); - - // Instead of taking the map key, we use upto date notify_txid - // That could meanwhile improve. Not important though. - TxId notifyid = tx_waiting->second->notify_txid(); - if (notifyid > committed_txid_ && txq_score <= tx_waiting->first) - break; - auto nh = waiting_convergence_.extract(tx_waiting); - NotifyConvergence(nh.mapped()); - } while (!waiting_convergence_.empty()); -} - -void EngineShard::NotifyConvergence(Transaction* tx) { - LOG(FATAL) << "TBD"; + if (blocking_controller_) + blocking_controller_->OnTxFinish(); } +#if 0 // There are several cases that contain proof of convergence for this shard: // 1. txq_ empty - it means that anything that is goonna be scheduled will already be scheduled // with txid > notifyid. @@ -499,15 +254,35 @@ void EngineShard::NotifyConvergence(Transaction* tx) { // notifyid. // 3. committed_txid_ == notifyid, then if a transaction in progress (continuation_trans_ != NULL) // the this transaction can still affect the result, hence we require continuation_trans_ is null -// which will point to converged result @notifyid. -// 4. 
Finally with committed_txid_ < notifyid and continuation_trans_ == nullptr, +// which will point to converged result @notifyid. However, we never awake a transaction +// when there is a multi-hop transaction in progress to avoid false positives. +// Therefore, continuation_trans_ must always be null when calling this function. +// 4. Finally with committed_txid_ < notifyid. // we can check if the next in line (HeadScore) is after notifyid in that case we can also // conclude regarding the result convergence for this shard. +// bool EngineShard::HasResultConverged(TxId notifyid) const { - return txq_.Empty() || committed_txid_ > notifyid || - (continuation_trans_ == nullptr && - (committed_txid_ == notifyid || txq_.HeadScore() > notifyid)); + CHECK(continuation_trans_ == nullptr); + + if (committed_txid_ >= notifyid) + return true; + + // This could happen if a single lpush (not in transaction) woke multi-shard blpop. + DVLOG(1) << "HasResultConverged: cmtxid - " << committed_txid_ << " vs " << notifyid; + + // We must check for txq head - it's not an optimization - we need it for correctness. + // If a multi-transaction has been scheduled and it does not have any presence in + // this shard (no actual keys) and we won't check for it HasResultConverged will + // return false. The blocked transaction will wait for this shard to progress and + // will also block other shards from progressing (where it has been notified). + // If this multi-transaction has presence in those shards, it won't progress there as well. + // Therefore, we will get a deadlock. By checking txid of the head we will avoid this situation: + // if the head.txid is after notifyid then this shard obviously converged. + // if the head.txid <= notifyid that transaction will be able to progress in other shards. + // and we must wait for it to finish. 
+ return txq_.Empty() || txq_.HeadScore() > notifyid; } +#endif void EngineShard::CacheStats() { #if 0 @@ -542,6 +317,13 @@ size_t EngineShard::UsedMemory() const { return mi_resource_.used() + zmalloc_used_memory_tl + SmallString::UsedThreadLocal(); } +void EngineShard::AddBlocked(Transaction* trans) { + if (!blocking_controller_) { + blocking_controller_.reset(new BlockingController(this)); + } + blocking_controller_->AddWatched(trans); +} + /** diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 6527ec871..6725d2e02 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -25,6 +25,7 @@ extern "C" { namespace dfly { class TieredStorage; +class BlockingController; class EngineShard { public: @@ -89,34 +90,9 @@ class EngineShard { return &shard_lock_; } - // Iterates over awakened key candidates in each db and moves verified ones into - // global verified_awakened_ array. - // Returns true if there are active awakened keys, false otherwise. - // It has 2 responsibilities. - // 1: to go over potential wakened keys, verify them and activate watch queues. - // 2: if t is awaked and finished running - to remove it from the head - // of the queue and notify the next one. - // If t is null then second part is omitted. - void ProcessAwakened(Transaction* t); - - // Blocking API - // TODO: consider moving all watched functions to - // EngineShard with separate per db map. - //! AddWatched adds a transaction to the blocking queue. - void AddWatched(std::string_view key, Transaction* me); - bool RemovedWatched(std::string_view key, Transaction* me); - void GCWatched(const KeyLockArgs& lock_args); - - void AwakeWatched(DbIndex db_index, std::string_view db_key); - - bool HasAwakedTransaction() const { - return !awakened_transactions_.empty(); - } // TODO: Awkward interface. I should solve it somehow. 
void ShutdownMulti(Transaction* multi); - void WaitForConvergence(TxId notifyid, Transaction* t); - bool HasResultConverged(TxId notifyid) const; void IncQuickRun() { stats_.quick_runs++; @@ -131,44 +107,41 @@ class EngineShard { TieredStorage* tiered_storage() { return tiered_storage_.get(); } + // Adds blocked transaction to the watch-list. + void AddBlocked(Transaction* trans); + + BlockingController* blocking_controller() { + return blocking_controller_.get(); + } + // for everyone to use for string transformations during atomic cpu sequences. sds tmp_str1, tmp_str2; +#if 0 + size_t TEST_WatchedDbsLen() const { + return watched_dbs_.size(); + } + + size_t TEST_AwakenIndicesLen() const { + return awakened_indices_.size(); + } + + size_t TEST_AwakenTransLen() const { + return awakened_transactions_.size(); + } +#endif + + bool HasResultConverged(TxId notifyid) const; + private: EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap); // blocks the calling fiber. void Shutdown(); // called before destructing EngineShard. - struct WatchQueue; - void OnTxFinish(); - void NotifyConvergence(Transaction* tx); void CacheStats(); - /// Returns the notified transaction, - /// or null if all transactions in the queue have expired.. - Transaction* NotifyWatchQueue(WatchQueue* wq); - - using WatchQueueMap = absl::flat_hash_map>; - - // Watch state per db. - struct DbWatchTable { - WatchQueueMap queue_map; - - // awakened keys point to blocked keys that can potentially be unblocked. - // they reference key objects in queue_map. - absl::flat_hash_set awakened_keys; - - // Returns true if queue_map is empty and DbWatchTable can be removed as well. 
- bool RemoveEntry(WatchQueueMap::iterator it); - }; - - absl::flat_hash_map watched_dbs_; - absl::flat_hash_set awakened_indices_; - absl::flat_hash_set awakened_transactions_; - - absl::btree_multimap waiting_convergence_; ::util::fibers_ext::FiberQueue queue_; ::boost::fibers::fiber fiber_q_; @@ -188,6 +161,7 @@ class EngineShard { uint32_t periodic_task_ = 0; uint64_t task_iters_ = 0; std::unique_ptr tiered_storage_; + std::unique_ptr blocking_controller_; static thread_local EngineShard* shard_; }; diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index da5874cc5..14f809a57 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -13,6 +13,7 @@ extern "C" { #include "server/command_registry.h" #include "server/conn_context.h" #include "server/engine_shard_set.h" +#include "server/blocking_controller.h" #include "server/error.h" #include "server/transaction.h" #include "util/varz.h" @@ -155,8 +156,8 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) { dest_it = db_slice.AddNew(db_indx_, dest_key, std::move(pv_), src_res_.expire_ts); } - if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST) { - es->AwakeWatched(db_indx_, dest_key); + if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) { + es->blocking_controller()->AwakeWatched(db_indx_, dest_key); } } @@ -580,7 +581,8 @@ OpResult GenericFamily::OpExists(const OpArgs& op_args, ArgSlice keys) OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, string_view to_key, bool skip_exists) { - auto& db_slice = op_args.shard->db_slice(); + auto* es = op_args.shard; + auto& db_slice = es->db_slice(); auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, from_key); if (!IsValid(from_it)) return OpStatus::KEY_NOTFOUND; @@ -619,8 +621,8 @@ OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, to_it = db_slice.AddNew(op_args.db_ind, to_key, std::move(from_obj), exp_ts); } 
- if (!is_prior_list && to_it->second.ObjType() == OBJ_LIST) { - op_args.shard->AwakeWatched(op_args.db_ind, to_key); + if (!is_prior_list && to_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) { + es->blocking_controller()->AwakeWatched(op_args.db_ind, to_key); } return OpStatus::OK; } diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 9705d566a..e67a35392 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -11,6 +11,8 @@ extern "C" { #include #include "base/logging.h" + +#include "server/blocking_controller.h" #include "server/command_registry.h" #include "server/conn_context.h" #include "server/engine_shard_set.h" @@ -146,7 +148,6 @@ BPopper::BPopper(ListDir dir) : dir_(dir) { } OpStatus BPopper::Run(Transaction* t, unsigned msec) { - OpResult result; using time_point = Transaction::time_point; time_point tp = @@ -158,19 +159,11 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { auto* stats = ServerState::tl_connection_stats(); - while (true) { - result = t->FindFirst(); - - if (result) - break; - - if (result.status() != OpStatus::KEY_NOTFOUND) { // Some error occurred. - // We could be registered in the queue due to previous iterations. - t->UnregisterWatch(); - break; - } + OpResult result = t->FindFirst(); + if (result.status() == OpStatus::KEY_NOTFOUND) { if (is_multi) { + // close transaction and return. auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; t->Execute(std::move(cb), true); @@ -183,10 +176,14 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { if (!wait_succeeded) return OpStatus::TIMED_OUT; + + result = t->FindFirst(); // retry - must find something. 
} - if (!result) + if (!result) { + t->UnregisterWatch(); return result.status(); + } VLOG(1) << "Popping an element"; find_sid_ = result->sid; @@ -546,11 +543,10 @@ OpResult ListFamily::OpPush(const OpArgs& op_args, std::string_view ke quicklistPush(ql, es->tmp_str1, sdslen(es->tmp_str1), pos); } - if (new_key) { - // TODO: to use PrimeKey for watched table. + if (new_key && es->blocking_controller()) { string tmp; string_view key = it->first.GetSlice(&tmp); - es->AwakeWatched(op_args.db_ind, key); + es->blocking_controller()->AwakeWatched(op_args.db_ind, key); } else { es->db_slice().PostUpdate(op_args.db_ind, it); } diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index 32dad5e6d..7d70b3a12 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -137,7 +137,7 @@ TEST_F(ListFamilyTest, BLPopMultiple) { EXPECT_THAT(resp0.GetVec(), ElementsAre(kKey1, "3")); ASSERT_FALSE(IsLocked(0, kKey1)); ASSERT_FALSE(IsLocked(0, kKey2)); - ess_->RunBriefInParallel([](EngineShard* es) { ASSERT_FALSE(es->HasAwakedTransaction()); }); + // ess_->RunBriefInParallel([](EngineShard* es) { ASSERT_FALSE(es->HasAwakedTransaction()); }); } TEST_F(ListFamilyTest, BLPopTimeout) { @@ -159,8 +159,10 @@ TEST_F(ListFamilyTest, BLPopTimeout) { TEST_F(ListFamilyTest, BLPopTimeout2) { Run({"BLPOP", "blist1", "blist2", "0.1"}); + Run({"RPUSH", "blist2", "d"}); Run({"RPUSH", "blist2", "hello"}); + auto resp = Run({"BLPOP", "blist1", "blist2", "1"}); ASSERT_THAT(resp, ArrLen(2)); ASSERT_THAT(resp.GetVec(), ElementsAre("blist2", "d")); @@ -168,48 +170,48 @@ TEST_F(ListFamilyTest, BLPopTimeout2) { Run({"RPUSH", "blist1", "a"}); Run({"DEL", "blist2"}); Run({"RPUSH", "blist2", "d"}); - // Run({"BLPOP", "blist1", "blist2", "1"}); + Run({"BLPOP", "blist1", "blist2", "1"}); } -TEST_F(ListFamilyTest, LRem) { - auto resp = Run({"rpush", kKey1, "a", "b", "a", "c"}); - ASSERT_THAT(resp, IntArg(4)); - resp = Run({"lrem", kKey1, "2", "a"}); - 
ASSERT_THAT(resp, IntArg(2)); +TEST_F(ListFamilyTest, BLPopMultiPush) { + Run({"exists", kKey1, kKey2, kKey3}); + ASSERT_EQ(3, GetDebugInfo().shards_count); + RespExpr blpop_resp; + auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"}); + }); - resp = Run({"lrange", kKey1, "0", "1"}); - ASSERT_THAT(resp, ArrLen(2)); - ASSERT_THAT(resp.GetVec(), ElementsAre("b", "c")); -} + do { + this_fiber::sleep_for(30us); + } while (!IsLocked(0, kKey1)); -TEST_F(ListFamilyTest, LTrim) { - Run({"rpush", kKey1, "a", "b", "c", "d"}); - ASSERT_EQ(Run({"ltrim", kKey1, "-2", "-1"}), "OK"); - auto resp = Run({"lrange", kKey1, "0", "1"}); - ASSERT_THAT(resp, ArrLen(2)); - ASSERT_THAT(resp.GetVec(), ElementsAre("c", "d")); - ASSERT_EQ(Run({"ltrim", kKey1, "0", "0"}), "OK"); - ASSERT_EQ(Run({"lrange", kKey1, "0", "1"}), "c"); -} + auto p1_fb = pp_->at(1)->LaunchFiber([&] { + for (unsigned i = 0; i < 100; ++i) { + // a filler command to create scheduling queue. 
+ Run({"exists", kKey1, kKey2, kKey3}); + } + }); -TEST_F(ListFamilyTest, LRange) { - auto resp = Run({"lrange", kKey1, "0", "5"}); - ASSERT_THAT(resp, ArrLen(0)); - Run({"rpush", kKey1, "0", "1", "2"}); - resp = Run({"lrange", kKey1, "-2", "-1"}); + auto p2_fb = pp_->at(2)->LaunchFiber([&] { + Run({"multi"}); + Run({"lpush", kKey3, "C"}); + Run({"exists", kKey2}); + Run({"lpush", kKey2, "B"}); + Run({"exists", kKey1}); + Run({"lpush", kKey1, "A"}); + Run({"exists", kKey1, kKey2, kKey3}); + auto resp = Run({"exec"}); + ASSERT_THAT(resp, ArrLen(6)); + }); - ASSERT_THAT(resp, ArrLen(2)); - ASSERT_THAT(resp.GetVec(), ElementsAre("1", "2")); -} + p1_fb.join(); + p2_fb.join(); -TEST_F(ListFamilyTest, Lset) { - Run({"rpush", kKey1, "0", "1", "2"}); - ASSERT_EQ(Run({"lset", kKey1, "0", "bar"}), "OK"); - ASSERT_EQ(Run({"lpop", kKey1}), "bar"); - ASSERT_EQ(Run({"lset", kKey1, "-1", "foo"}), "OK"); - ASSERT_EQ(Run({"rpop", kKey1}), "foo"); - Run({"rpush", kKey2, "a"}); - ASSERT_THAT(Run({"lset", kKey2, "1", "foo"}), ErrArg("index out of range")); + pop_fb.join(); + + ASSERT_THAT(blpop_resp, ArrLen(2)); + auto resp_arr = blpop_resp.GetVec(); + EXPECT_THAT(resp_arr, ElementsAre(kKey1, "A")); } TEST_F(ListFamilyTest, BLPopSerialize) { @@ -228,16 +230,22 @@ TEST_F(ListFamilyTest, BLPopSerialize) { TxClock cl1, cl2; auto p1_fb = pp_->at(1)->LaunchFiber([&] { - auto resp = Run({"multi"}); // We use multi to assign ts to lpush. - ASSERT_EQ(resp, "OK"); + // auto resp = Run({"multi"}); // We use multi to assign ts to lpush. + // ASSERT_EQ(resp, "OK"); Run({"lpush", kKey1, "A"}); + + /*for (unsigned i = 0; i < 10; ++i) { + // dummy command to prolong this transaction and make convergence more complicated. + Run({"exists", kKey1, kKey2, kKey3}); + } + resp = Run({"exec"}); // Either this lpush has run first or the one below. // In any case it must be that between 2 invocations of lpush (wrapped in multi) // blpop will be triggerred and it will empty the list again. 
Hence, in any case // lpush kKey1 here and below should return 1. - ASSERT_THAT(resp, IntArg(1)); + ASSERT_THAT(resp, ArrLen(11));*/ cl1 = GetDebugInfo("IO1").clock; LOG(INFO) << "push1 ts: " << cl1; }); @@ -245,12 +253,20 @@ TEST_F(ListFamilyTest, BLPopSerialize) { auto p2_fb = pp_->at(2)->LaunchFiber([&] { auto resp = Run({"multi"}); // We use multi to assign ts to lpush. ASSERT_EQ(resp, "OK"); + for (unsigned i = 0; i < 10; ++i) { + // dummy command to prolong this transaction and make convergence more complicated. + Run({"exists", kKey1, kKey2, kKey3}); + } Run({"lpush", kKey1, "B"}); Run({"lpush", kKey2, "C"}); + resp = Run({"exec"}); - ASSERT_THAT(resp, ArrLen(2)); - EXPECT_THAT(resp.GetVec(), ElementsAre(IntArg(1), IntArg(1))); + ASSERT_THAT(resp, ArrLen(12)); + /*auto sub_arr = resp.GetVec(); + EXPECT_THAT(sub_arr[0], IntArg(1)); + EXPECT_THAT(sub_arr[1], IntArg(1));*/ + cl2 = GetDebugInfo("IO2").clock; LOG(INFO) << "push2 ts: " << cl2; }); @@ -355,4 +371,45 @@ TEST_F(ListFamilyTest, BPopRename) { EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "bar")); } +TEST_F(ListFamilyTest, LRem) { + auto resp = Run({"rpush", kKey1, "a", "b", "a", "c"}); + ASSERT_THAT(resp, IntArg(4)); + resp = Run({"lrem", kKey1, "2", "a"}); + ASSERT_THAT(resp, IntArg(2)); + + resp = Run({"lrange", kKey1, "0", "1"}); + ASSERT_THAT(resp, ArrLen(2)); + ASSERT_THAT(resp.GetVec(), ElementsAre("b", "c")); +} + +TEST_F(ListFamilyTest, LTrim) { + Run({"rpush", kKey1, "a", "b", "c", "d"}); + ASSERT_EQ(Run({"ltrim", kKey1, "-2", "-1"}), "OK"); + auto resp = Run({"lrange", kKey1, "0", "1"}); + ASSERT_THAT(resp, ArrLen(2)); + ASSERT_THAT(resp.GetVec(), ElementsAre("c", "d")); + ASSERT_EQ(Run({"ltrim", kKey1, "0", "0"}), "OK"); + ASSERT_EQ(Run({"lrange", kKey1, "0", "1"}), "c"); +} + +TEST_F(ListFamilyTest, LRange) { + auto resp = Run({"lrange", kKey1, "0", "5"}); + ASSERT_THAT(resp, ArrLen(0)); + Run({"rpush", kKey1, "0", "1", "2"}); + resp = Run({"lrange", kKey1, "-2", "-1"}); + + 
ASSERT_THAT(resp, ArrLen(2)); + ASSERT_THAT(resp.GetVec(), ElementsAre("1", "2")); +} + +TEST_F(ListFamilyTest, Lset) { + Run({"rpush", kKey1, "0", "1", "2"}); + ASSERT_EQ(Run({"lset", kKey1, "0", "bar"}), "OK"); + ASSERT_EQ(Run({"lpop", kKey1}), "bar"); + ASSERT_EQ(Run({"lset", kKey1, "-1", "foo"}), "OK"); + ASSERT_EQ(Run({"rpop", kKey1}), "foo"); + Run({"rpush", kKey2, "a"}); + ASSERT_THAT(Run({"lset", kKey2, "1", "foo"}), ErrArg("index out of range")); +} + } // namespace dfly diff --git a/src/server/transaction.cc b/src/server/transaction.cc index be80ab05a..8a206585f 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -7,6 +7,7 @@ #include #include "base/logging.h" +#include "server/blocking_controller.h" #include "server/command_registry.h" #include "server/db_slice.h" #include "server/engine_shard_set.h" @@ -410,7 +411,8 @@ bool Transaction::RunInShard(EngineShard* shard) { // 1: to go over potential wakened keys, verify them and activate watch queues. // 2: if this transaction was notified and finished running - to remove it from the head // of the queue and notify the next one. - shard->ProcessAwakened(awaked_prerun ? this : nullptr); + if (shard->blocking_controller()) + shard->blocking_controller()->RunStep(awaked_prerun ? this : nullptr); } } @@ -444,7 +446,7 @@ void Transaction::RunNoop(EngineShard* shard) { if (sd.local_mask & SUSPENDED_Q) { sd.local_mask |= EXPIRED_Q; - shard->GCWatched(largs); + shard->blocking_controller()->RemoveWatched(this); } } // Decrease run count after we update all the data in the transaction object. 
@@ -517,7 +519,7 @@ void Transaction::ScheduleInternal() { DVLOG(1) << "Cancelling " << DebugId(); auto cancel = [&](EngineShard* shard) { - success.fetch_sub(CancelInShard(shard), memory_order_relaxed); + success.fetch_sub(CancelShardCb(shard), memory_order_relaxed); }; ess_->RunBriefInParallel(std::move(cancel), is_active); @@ -645,7 +647,8 @@ void Transaction::UnlockMulti() { shard->ShutdownMulti(this); // notify awakened transactions. - shard->ProcessAwakened(nullptr); + if (shard->blocking_controller()) + shard->blocking_controller()->RunStep(nullptr); shard->PollExecution("unlockmulti", nullptr); this->DecreaseRunCnt(); @@ -665,6 +668,8 @@ void Transaction::UnlockMulti() { // Runs in coordinator thread. void Transaction::Execute(RunnableType cb, bool conclude) { + DCHECK(coordinator_state_ & COORD_SCHED); + cb_ = std::move(cb); coordinator_state_ |= COORD_EXEC; @@ -730,7 +735,7 @@ void Transaction::ExecuteAsync() { uint32_t seq_after = seqlock_.fetch_add(0, memory_order_release); bool should_poll = (seq_after == seq) && (local_mask & ARMED); - DVLOG(2) << "EngineShard::Exec " << DebugId() << " sid:" << shard->shard_id() << " " + DVLOG(2) << "PollExecCb " << DebugId() << " sid(" << shard->shard_id() << ") " << run_count_.load(memory_order_relaxed) << ", should_poll: " << should_poll; // We verify that this callback is still relevant. @@ -790,42 +795,25 @@ void Transaction::RunQuickie(EngineShard* shard) { } // runs in coordinator thread. -// Marks the transaction as expired but does not remove it from the waiting queue. +// Marks the transaction as expired and removes it from the waiting queue. 
void Transaction::ExpireBlocking() { DVLOG(1) << "ExpireBlocking " << DebugId(); DCHECK(!IsGlobal()); run_count_.store(unique_shard_cnt_, memory_order_release); - auto expire_cb = [this] { - EngineShard* shard = EngineShard::tlocal(); - - auto lock_args = GetLockArgs(shard->shard_id()); - shard->db_slice().Release(Mode(), lock_args); - - unsigned sd_idx = SidToId(shard->shard_id()); - auto& sd = shard_data_[sd_idx]; - sd.local_mask |= EXPIRED_Q; - sd.local_mask &= ~KEYLOCK_ACQUIRED; - - // Need to see why I decided to call this. - // My guess - probably to trigger the run of stalled transactions in case - // this shard concurrently awoke this transaction and stalled the processing - // of TxQueue. - shard->PollExecution("expirecb", nullptr); - - CHECK_GE(DecreaseRunCnt(), 1u); - }; + auto expire_cb = [this] { ExpireShardCb(EngineShard::tlocal()); }; if (unique_shard_cnt_ == 1) { DCHECK_LT(unique_shard_id_, ess_->size()); - ess_->Add(unique_shard_id_, std::move(expire_cb)); + ess_->Add(unique_shard_id_, move(expire_cb)); } else { for (ShardId i = 0; i < shard_data_.size(); ++i) { auto& sd = shard_data_[i]; DCHECK_EQ(0, sd.local_mask & ARMED); if (sd.arg_count == 0) continue; + ess_->Add(i, expire_cb); } } @@ -950,7 +938,7 @@ pair Transaction::ScheduleInShard(EngineShard* shard) { return result; } -bool Transaction::CancelInShard(EngineShard* shard) { +bool Transaction::CancelShardCb(EngineShard* shard) { ShardId idx = SidToId(shard->shard_id()); auto& sd = shard_data_[idx]; @@ -1001,17 +989,16 @@ bool Transaction::WaitOnWatch(const time_point& tp) { VLOG(2) << "WaitOnWatch Start use_count(" << use_count() << ")"; using namespace chrono; - // wake_txid_.store(kuint64max, std::memory_order_relaxed); - Execute([](auto* t, auto* shard) { return t->AddToWatchedShardCb(shard); }, true); + Execute([](Transaction* t, EngineShard* shard) { return t->AddToWatchedShardCb(shard); }, true); + coordinator_state_ |= COORD_BLOCKED; - bool res = true; // returns false if timeout 
occurs. auto wake_cb = [this] { return (coordinator_state_ & COORD_CANCELLED) || notify_txid_.load(memory_order_relaxed) != kuint64max; }; - cv_status status = cv_status::no_timeout; + cv_status status = cv_status::no_timeout; if (tp == time_point::max()) { DVLOG(1) << "WaitOnWatch foreva " << DebugId(); blocking_ec_.await(move(wake_cb)); @@ -1031,20 +1018,14 @@ bool Transaction::WaitOnWatch(const time_point& tp) { return false; } +#if 0 // We were notified by a shard, so lets make sure that our notifications converged to a stable // form. if (unique_shard_cnt_ > 1) { run_count_.store(unique_shard_cnt_, memory_order_release); - auto converge_cb = [this] { - EngineShard* shard = EngineShard::tlocal(); - auto& sd = shard_data_[shard->shard_id()]; - TxId notify = notify_txid(); - if ((sd.local_mask & AWAKED_Q) || shard->HasResultConverged(notify)) { - CHECK_GE(DecreaseRunCnt(), 1u); - return; - } - shard->WaitForConvergence(notify, this); + auto converge_cb = [this] { + this->CheckForConvergence(EngineShard::tlocal()); }; for (ShardId i = 0; i < shard_data_.size(); ++i) { @@ -1059,11 +1040,12 @@ bool Transaction::WaitOnWatch(const time_point& tp) { WaitForShardCallbacks(); DVLOG(1) << "Convergence finished " << DebugId(); } +#endif // Lift blocking mask. coordinator_state_ &= ~COORD_BLOCKED; - return res; + return true; } void Transaction::UnregisterWatch() { @@ -1082,10 +1064,7 @@ OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) { CHECK_EQ(0, sd.local_mask & SUSPENDED_Q); DCHECK_EQ(0, sd.local_mask & ARMED); - auto args = ShardArgsInShard(shard->shard_id()); - for (auto s : args) { - shard->AddWatched(s, this); - } + shard->AddBlocked(this); sd.local_mask |= SUSPENDED_Q; DVLOG(1) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask; @@ -1106,14 +1085,53 @@ bool Transaction::RemoveFromWatchedShardCb(EngineShard* shard) { sd.local_mask &= ~kQueueMask; - // TODO: what if args have keys and values? 
- auto args = ShardArgsInShard(shard->shard_id()); - for (auto s : args) { - shard->RemovedWatched(s, this); - } + shard->blocking_controller()->RemoveWatched(this); + return true; } +void Transaction::ExpireShardCb(EngineShard* shard) { + auto lock_args = GetLockArgs(shard->shard_id()); + shard->db_slice().Release(Mode(), lock_args); + + unsigned sd_idx = SidToId(shard->shard_id()); + auto& sd = shard_data_[sd_idx]; + sd.local_mask |= EXPIRED_Q; + sd.local_mask &= ~KEYLOCK_ACQUIRED; + + shard->blocking_controller()->RemoveWatched(this); + + // Need to see why I decided to call this. + // My guess - probably to trigger the run of stalled transactions in case + // this shard concurrently awoke this transaction and stalled the processing + // of TxQueue. + shard->PollExecution("expirecb", nullptr); + + CHECK_GE(DecreaseRunCnt(), 1u); +} + +#if 0 +// HasResultConverged has detailed documentation on how convergence is determined. +void Transaction::CheckForConvergence(EngineShard* shard) { + unsigned idx = SidToId(shard->shard_id()); + auto& sd = shard_data_[idx]; + + TxId notify = notify_txid(); + + if ((sd.local_mask & AWAKED_Q) || shard->HasResultConverged(notify)) { + CHECK_GE(DecreaseRunCnt(), 1u); + return; + } + + LOG(DFATAL) << "TBD"; + + BlockingController* bc = shard->blocking_controller(); + CHECK(bc); // must be present because we have watched this shard before. + + bc->RegisterAwaitForConverge(this); +} +#endif + inline uint32_t Transaction::DecreaseRunCnt() { // to protect against cases where Transaction is destroyed before run_ec_.notify // finishes running. We can not put it inside the (res == 1) block because then it's too late. @@ -1132,7 +1150,7 @@ bool Transaction::IsGlobal() const { } // Runs only in the shard thread. -// Returns true if the transcton has changed its state from suspended to awakened, +// Returns true if the transacton has changed its state from suspended to awakened, // false, otherwise. 
bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) { unsigned idx = SidToId(sid); diff --git a/src/server/transaction.h b/src/server/transaction.h index e5157b840..e31fd4974 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -220,11 +220,13 @@ class Transaction { std::pair ScheduleInShard(EngineShard* shard); // Returns true if operation was cancelled for this shard. Runs in the shard thread. - bool CancelInShard(EngineShard* shard); + bool CancelShardCb(EngineShard* shard); // Shard callbacks used within Execute calls OpStatus AddToWatchedShardCb(EngineShard* shard); bool RemoveFromWatchedShardCb(EngineShard* shard); + void ExpireShardCb(EngineShard* shard); + void CheckForConvergence(EngineShard* shard); void WaitForShardCallbacks() { run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); }); @@ -251,7 +253,7 @@ class Transaction { // Bitmask of LocalState enums. uint16_t local_mask{0}; - // Needed to rollback invalid schedulings or remove OOO transactions from + // Needed to rollback inconsistent schedulings or remove OOO transactions from // tx queue. uint32_t pq_pos = TxQueue::kEnd;