From cb82680aca7b1ad4f0a98418d3b45cbd5cd0e62f Mon Sep 17 00:00:00 2001 From: Vladislav Date: Wed, 3 May 2023 19:45:06 +0300 Subject: [PATCH] Remove blpop FindFirst hop after wakeup (#1168) Remove BLPOP hop after wake --- src/server/blocking_controller.cc | 2 +- src/server/list_family.cc | 103 +++++++++++++----------------- src/server/list_family_test.cc | 75 +--------------------- src/server/transaction.cc | 39 +++++++---- src/server/transaction.h | 20 +++--- 5 files changed, 83 insertions(+), 156 deletions(-) diff --git a/src/server/blocking_controller.cc b/src/server/blocking_controller.cc index 3682604cc..b6b082b72 100644 --- a/src/server/blocking_controller.cc +++ b/src/server/blocking_controller.cc @@ -287,7 +287,7 @@ void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueueMap* w Transaction* head = wi.get(); DVLOG(2) << "WQ-Pop " << head->DebugId() << " from key " << key; - if (head->NotifySuspended(owner_->committed_txid(), sid)) { + if (head->NotifySuspended(owner_->committed_txid(), sid, key)) { // We deliberately keep the notified transaction in the queue to know which queue // must handled when this transaction finished. wq->notify_txid = owner_->committed_txid(); diff --git a/src/server/list_family.cc b/src/server/list_family.cc index a5a035125..d536cdd3e 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -136,7 +136,7 @@ struct ShardFFResult { }; // Used by bpopper. -OpResult FindFirst(bool awaked_only, Transaction* trans) { +OpResult FindFirst(Transaction* trans) { VLOG(2) << "FindFirst::Find " << trans->DebugId(); // Holds Find results: (iterator to a found key, and its index in the passed arguments). @@ -145,31 +145,17 @@ OpResult FindFirst(bool awaked_only, Transaction* trans) { std::vector> find_res(shard_set->size()); fill(find_res.begin(), find_res.end(), OpStatus::KEY_NOTFOUND); - // We must capture notify_txid before we spawn callbacks. - // Otherwise, consider the following scenario: - // 0. The key is added in shard 0, with notify_txid = 100 - // 1. The cb runs first on shard1 and does not find anything. - // 2. A tx 99 runs on shard 1, adds a key, updates notify_txid to 99. - // 3. the cb on shard 0 runs and ignores the key due to lower notify_txid. - uint64_t notify_txid = trans->GetNotifyTxid(); - auto cb = [&](Transaction* t, EngineShard* shard) { auto args = t->GetShardArgs(shard->shard_id()); - // if requested to consider awaked shards only, we check the AWAKED_Q flag. - if (awaked_only && (t->GetLocalMask(shard->shard_id()) & Transaction::AWAKED_Q) == 0) { - return OpStatus::OK; - } - if (shard->committed_txid() <= notify_txid) { - OpResult> ff_res = - shard->db_slice().FindFirst(t->GetDbContext(), args); + OpResult> ff_res = + shard->db_slice().FindFirst(t->GetDbContext(), args); - if (ff_res) { - FFResult ff_result(ff_res->first->first.AsRef(), ff_res->second); - find_res[shard->shard_id()] = move(ff_result); - } else { - find_res[shard->shard_id()] = ff_res.status(); - } + if (ff_res) { + FFResult ff_result(ff_res->first->first.AsRef(), ff_res->second); + find_res[shard->shard_id()] = move(ff_result); + } else { + find_res[shard->shard_id()] = ff_res.status(); } return OpStatus::OK; @@ -229,6 +215,7 @@ class BPopper { private: void Pop(Transaction* t, EngineShard* shard); + void OpPop(Transaction* t, EngineShard* shard); ListDir dir_; @@ -258,25 +245,25 @@ BPopper::BPopper(ListDir dir) : dir_(dir) { } OpStatus BPopper::Run(Transaction* trans, unsigned msec) { - time_point tp = - msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max(); + auto tp = msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max(); bool is_multi = trans->IsMulti(); + trans->Schedule(); auto* stats = ServerState::tl_connection_stats(); - OpResult result = FindFirst(false, trans); + OpResult result = FindFirst(trans); - if (result.status() == OpStatus::KEY_NOTFOUND) { + if (result.ok()) { + ff_result_ = move(result.value()); + } else if (result.status() == OpStatus::KEY_NOTFOUND) { + // Close transaction and return. if (is_multi) { - // close transaction and return. auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; trans->Execute(std::move(cb), true); - return OpStatus::TIMED_OUT; } - // Block auto wcb = [](Transaction* t, EngineShard* shard) { return t->GetShardArgs(shard->shard_id()); }; @@ -288,25 +275,16 @@ OpStatus BPopper::Run(Transaction* trans, unsigned msec) { if (!wait_succeeded) return OpStatus::TIMED_OUT; - - // Now we have something for sure. - result = FindFirst(true, trans); // retry - must find something. - } - - if (!result) { + } else { // Could be the wrong-type error. // cleanups, locks removal etc. auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; trans->Execute(std::move(cb), true); DCHECK_NE(result.status(), OpStatus::KEY_NOTFOUND); - return result.status(); } - VLOG(1) << "Popping an element " << trans->DebugId(); - ff_result_ = move(result.value()); - auto cb = [this](Transaction* t, EngineShard* shard) { Pop(t, shard); return OpStatus::OK; @@ -317,27 +295,35 @@ OpStatus BPopper::Run(Transaction* trans, unsigned msec) { } void BPopper::Pop(Transaction* t, EngineShard* shard) { - if (shard->shard_id() == ff_result_.sid) { + if (auto wake_key = t->GetWakeKey(shard->shard_id()); wake_key) { + key_ = *wake_key; + OpPop(t, shard); + } else if (shard->shard_id() == ff_result_.sid) { ff_result_.key.GetString(&key_); - auto& db_slice = shard->db_slice(); - auto it_res = db_slice.Find(t->GetDbContext(), key_, OBJ_LIST); - CHECK(it_res) << t->DebugId() << " " << key_; // must exist and must be ok. - PrimeIterator it = *it_res; - quicklist* ql = GetQL(it->second); + OpPop(t, shard); + } +} - DVLOG(2) << "popping from " << key_ << " " << t->DebugId(); - db_slice.PreUpdate(t->GetDbIndex(), it); - value_ = ListPop(dir_, ql); - db_slice.PostUpdate(t->GetDbIndex(), it, key_); - if (quicklistCount(ql) == 0) { - DVLOG(1) << "deleting key " << key_ << " " << t->DebugId(); - CHECK(shard->db_slice().Del(t->GetDbIndex(), it)); - } - OpArgs op_args = t->GetOpArgs(shard); - if (op_args.shard->journal()) { - string command = dir_ == ListDir::LEFT ? "LPOP" : "RPOP"; - RecordJournal(op_args, command, ArgSlice{key_}, 1); - } +void BPopper::OpPop(Transaction* t, EngineShard* shard) { + auto& db_slice = shard->db_slice(); + auto it_res = db_slice.Find(t->GetDbContext(), key_, OBJ_LIST); + CHECK(it_res) << t->DebugId() << " " << key_; // must exist and must be ok. + PrimeIterator it = *it_res; + + quicklist* ql = GetQL(it->second); + + DVLOG(2) << "popping from " << key_ << " " << t->DebugId(); + db_slice.PreUpdate(t->GetDbIndex(), it); + value_ = ListPop(dir_, ql); + db_slice.PostUpdate(t->GetDbIndex(), it, key_); + if (quicklistCount(ql) == 0) { + DVLOG(1) << "deleting key " << key_ << " " << t->DebugId(); + CHECK(shard->db_slice().Del(t->GetDbIndex(), it)); + } + OpArgs op_args = t->GetOpArgs(shard); + if (op_args.shard->journal()) { + string command = dir_ == ListDir::LEFT ? "LPOP" : "RPOP"; + RecordJournal(op_args, command, ArgSlice{key_}, 1); } } @@ -1007,7 +993,6 @@ OpResult BPopPusher::RunSingle(Transaction* t, time_point tp) { // Block ++stats->num_blocked_clients; - bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb)); --stats->num_blocked_clients; diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index 5dd1530a4..dbe1080c1 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -233,82 +233,13 @@ TEST_F(ListFamilyTest, BLPopMultiPush) { pop_fb.Join(); + // We can't determine what key was popped, so only check result presence. + // It might not be first kKey3 "C" because of squashing and re-ordering. ASSERT_THAT(blpop_resp, ArrLen(2)); - auto resp_arr = blpop_resp.GetVec(); - EXPECT_THAT(resp_arr, ElementsAre(kKey1, "A")); + ASSERT_THAT(Run({"exists", kKey1, kKey2, kKey3}), IntArg(2)); ASSERT_EQ(0, NumWatched()); } -TEST_F(ListFamilyTest, BLPopSerialize) { - RespExpr blpop_resp; - - auto pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] { - blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"}); - }); - - WaitUntilLocked(0, kKey1); - - LOG(INFO) << "Starting multi"; - - TxClock cl1, cl2; - - auto p1_fb = pp_->at(1)->LaunchFiber([&] { - // auto resp = Run({"multi"}); // We use multi to assign ts to lpush. - // ASSERT_EQ(resp, "OK"); - Run({"lpush", kKey1, "A"}); - - /*for (unsigned i = 0; i < 10; ++i) { - // dummy command to prolong this transaction and make convergence more complicated. - Run({"exists", kKey1, kKey2, kKey3}); - } - - resp = Run({"exec"}); - - // Either this lpush has run first or the one below. - // In any case it must be that between 2 invocations of lpush (wrapped in multi) - // blpop will be triggered and it will empty the list again. Hence, in any case - // lpush kKey1 here and below should return 1. - ASSERT_THAT(resp, ArrLen(11));*/ - cl1 = GetDebugInfo("IO1").clock; - LOG(INFO) << "push1 ts: " << cl1; - }); - - auto p2_fb = pp_->at(2)->LaunchFiber([&] { - auto resp = Run({"multi"}); // We use multi to assign ts to lpush. - ASSERT_EQ(resp, "OK"); - for (unsigned i = 0; i < 10; ++i) { - // dummy command to prolong this transaction and make convergence more complicated. - Run({"exists", kKey1, kKey2, kKey3}); - } - Run({"lpush", kKey1, "B"}); - Run({"lpush", kKey2, "C"}); - - resp = Run({"exec"}); - - ASSERT_THAT(resp, ArrLen(12)); - /*auto sub_arr = resp.GetVec(); - EXPECT_THAT(sub_arr[0], IntArg(1)); - EXPECT_THAT(sub_arr[1], IntArg(1));*/ - - cl2 = GetDebugInfo("IO2").clock; - LOG(INFO) << "push2 ts: " << cl2; - }); - - p1_fb.Join(); - p2_fb.Join(); - - pop_fb.Join(); - ASSERT_THAT(blpop_resp, ArrLen(2)); - auto resp_arr = blpop_resp.GetVec(); - EXPECT_THAT(resp_arr, ElementsAre(kKey1, ArgType(RespExpr::STRING))); - - if (cl2 < cl1) { - EXPECT_EQ(resp_arr[1], "B"); - } else { - EXPECT_EQ(resp_arr[1], "A"); - } -} - TEST_F(ListFamilyTest, WrongTypeDoesNotWake) { RespExpr blpop_resp; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index fe5800a0a..053d4fac9 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1107,7 +1107,7 @@ bool Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provi auto wake_cb = [this] { return (coordinator_state_ & COORD_CANCELLED) || - notify_txid_.load(memory_order_relaxed) != kuint64max; + wakeup_requested_.load(memory_order_relaxed) > 0; }; cv_status status = cv_status::no_timeout; @@ -1257,7 +1257,7 @@ bool Transaction::IsGlobal() const { // Runs only in the shard thread. // Returns true if the transacton has changed its state from suspended to awakened, // false, otherwise. -bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) { +bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid, string_view key) { unsigned idx = SidToId(sid); auto& sd = shard_data_[idx]; unsigned local_mask = sd.local_mask; @@ -1266,6 +1266,12 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) { return false; } + // Wake a transaction only once on the first notify. + // We don't care about preserving the strict order with multiple operations running on blocking + // keys in parallel, because the internal order is not observable from outside either way. + if (wakeup_requested_.fetch_add(1, memory_order_relaxed) > 0) + return false; + DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask << " by commited_id " << committed_txid; @@ -1277,20 +1283,25 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) { sd.local_mask &= ~SUSPENDED_Q; sd.local_mask |= AWAKED_Q; - TxId notify_id = notify_txid_.load(memory_order_relaxed); - - while (committed_txid < notify_id) { - if (notify_txid_.compare_exchange_weak(notify_id, committed_txid, memory_order_relaxed)) { - // if we improved notify_txid_ - break. - blocking_ec_.notify(); // release barrier. - break; - } - } - return true; + // Find index of awakened key. + auto args = GetShardArgs(sid); + auto it = + find_if(args.begin(), args.end(), [key](auto arg) { return facade::ToSV(arg) == key; }); + DCHECK(it != args.end()); + sd.wake_key_pos = it - args.begin(); } - CHECK(sd.local_mask & AWAKED_Q); - return false; + blocking_ec_.notify(); + return true; +} + +optional Transaction::GetWakeKey(ShardId sid) const { + auto& sd = shard_data_[SidToId(sid)]; + if ((sd.local_mask & AWAKED_Q) == 0) + return nullopt; + + CHECK_NE(sd.wake_key_pos, UINT16_MAX); + return GetShardArgs(sid).at(sd.wake_key_pos); } void Transaction::LogAutoJournalOnShard(EngineShard* shard) { diff --git a/src/server/transaction.h b/src/server/transaction.h index 60094b985..94ce97e52 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -177,10 +177,8 @@ class Transaction { bool WaitOnWatch(const time_point& tp, WaitKeysProvider cb); // Returns true if transaction is awaked, false if it's timed-out and can be removed from the - // blocking queue. NotifySuspended may be called from (multiple) shard threads and - // with each call potentially improving the minimal wake_txid at which - // this transaction has been awaked. - bool NotifySuspended(TxId committed_ts, ShardId sid); + // blocking queue. + bool NotifySuspended(TxId committed_ts, ShardId sid, std::string_view key); // Cancel all blocking watches on shutdown. Set COORD_CANCELLED. void BreakOnShutdown(); @@ -257,10 +255,6 @@ class Transaction { return unique_shard_cnt_; } - TxId GetNotifyTxid() const { - return notify_txid_.load(std::memory_order_relaxed); - } - bool IsMulti() const { return bool(multi_); } @@ -275,6 +269,9 @@ class Transaction { return coordinator_state_ & COORD_OOO; } + // If blocking tx was woken up on this shard, get wake key. + std::optional GetWakeKey(ShardId sid) const; + OpArgs GetOpArgs(EngineShard* shard) const { return OpArgs{shard, this, GetDbContext()}; } @@ -329,7 +326,7 @@ class Transaction { std::atomic_bool is_armed{false}; // We pad with some memory so that atomic loads won't cause false sharing betweem threads. - char pad[48]; // to make sure PerShardData is 64 bytes and takes full cacheline. + char pad[46]; // to make sure PerShardData is 64 bytes and takes full cacheline. uint32_t arg_start = 0; // Indices into args_ array. uint16_t arg_count = 0; @@ -341,6 +338,9 @@ class Transaction { // Needed to rollback inconsistent schedulings or remove OOO transactions from // tx queue. uint32_t pq_pos = TxQueue::kEnd; + + // Index of key relative to args in shard that the shard was woken up after blocking wait. + uint16_t wake_key_pos = UINT16_MAX; }; static_assert(sizeof(PerShardData) == 64); // cacheline @@ -537,7 +537,7 @@ class Transaction { DbIndex db_index_{0}; uint64_t time_now_ms_{0}; - std::atomic notify_txid_{UINT64_MAX}; + std::atomic wakeup_requested_{0}; // whether tx was woken up std::atomic_uint32_t use_count_{0}, run_count_{0}, seqlock_{0}; // unique_shard_cnt_ and unique_shard_id_ are accessed only by coordinator thread.