diff --git a/src/server/blocking_controller.cc b/src/server/blocking_controller.cc index eb1f234f7..bf470f37f 100644 --- a/src/server/blocking_controller.cc +++ b/src/server/blocking_controller.cc @@ -136,7 +136,7 @@ void BlockingController::RunStep(Transaction* completed_t) { awakened_indices_.clear(); } -void BlockingController::AddWatched(Transaction* trans) { +void BlockingController::AddWatched(ArgSlice keys, Transaction* trans) { VLOG(1) << "AddWatched [" << owner_->shard_id() << "] " << trans->DebugId(); auto [dbit, added] = watched_dbs_.emplace(trans->db_index(), nullptr); @@ -146,8 +146,7 @@ void BlockingController::AddWatched(Transaction* trans) { DbWatchTable& wt = *dbit->second; - auto args = trans->ShardArgsInShard(owner_->shard_id()); - for (auto key : args) { + for (auto key : keys) { auto [res, inserted] = wt.queue_map.emplace(key, nullptr); if (inserted) { res->second.reset(new WatchQueue); @@ -167,7 +166,7 @@ void BlockingController::AddWatched(Transaction* trans) { } // Runs in O(N) complexity in the worst case. -void BlockingController::RemoveWatched(Transaction* trans) { +void BlockingController::RemoveWatched(ArgSlice keys, Transaction* trans) { VLOG(1) << "RemoveWatched [" << owner_->shard_id() << "] " << trans->DebugId(); auto dbit = watched_dbs_.find(trans->db_index()); @@ -175,11 +174,13 @@ void BlockingController::RemoveWatched(Transaction* trans) { return; DbWatchTable& wt = *dbit->second; - auto args = trans->ShardArgsInShard(owner_->shard_id()); - for (auto key : args) { + for (auto key : keys) { auto watch_it = wt.queue_map.find(key); + + // that can happen in case of duplicate keys or when we do not watch on all the argument keys + // like with BLPOPRPUSH. if (watch_it == wt.queue_map.end()) - continue; // that can happen in case of duplicate keys + continue; WatchQueue& wq = *watch_it->second; for (auto items_it = wq.items.begin(); items_it != wq.items.end(); ++items_it) { diff --git a/src/server/blocking_controller.h b/src/server/blocking_controller.h index a1dc82483..61ae33eb0 100644 --- a/src/server/blocking_controller.h +++ b/src/server/blocking_controller.h @@ -38,8 +38,8 @@ class BlockingController { // TODO: consider moving all watched functions to // EngineShard with separate per db map. //! AddWatched adds a transaction to the blocking queue. - void AddWatched(Transaction* me); - void RemoveWatched(Transaction* me); + void AddWatched(ArgSlice watch_keys, Transaction* me); + void RemoveWatched(ArgSlice watch_keys, Transaction* me); // Called from operations that create keys like lpush, rename etc. void AwakeWatched(DbIndex db_index, std::string_view db_key); diff --git a/src/server/blocking_controller_test.cc b/src/server/blocking_controller_test.cc index 9e2f87670..b64106ba9 100644 --- a/src/server/blocking_controller_test.cc +++ b/src/server/blocking_controller_test.cc @@ -66,11 +66,13 @@ void BlockingControllerTest::TearDown() { TEST_F(BlockingControllerTest, Basic) { shard_set->Await(0, [&] { - BlockingController bc(EngineShard::tlocal()); - bc.AddWatched(trans_.get()); + EngineShard* shard = EngineShard::tlocal(); + BlockingController bc(shard); + auto keys = trans_->ShardArgsInShard(shard->shard_id()); + bc.AddWatched(keys, trans_.get()); EXPECT_EQ(1, bc.NumWatched(0)); - bc.RemoveWatched(trans_.get()); + bc.RemoveWatched(keys, trans_.get()); EXPECT_EQ(0, bc.NumWatched(0)); }); } @@ -79,8 +81,10 @@ TEST_F(BlockingControllerTest, Timeout) { time_point tp = steady_clock::now() + chrono::milliseconds(10); trans_->Schedule(); + auto keys = trans_->ShardArgsInShard(0); + auto cb = [&](Transaction* t, EngineShard* shard) { return t->WatchInShard(keys, shard); }; - bool res = trans_->WaitOnWatch(tp); + bool res = trans_->WaitOnWatch(tp, cb); EXPECT_FALSE(res); unsigned num_watched = shard_set->Await( diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 1fdd3ea5b..5914c4465 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -502,11 +502,12 @@ size_t EngineShard::UsedMemory() const { return mi_resource_.used() + zmalloc_used_memory_tl + SmallString::UsedThreadLocal(); } -void EngineShard::AddBlocked(Transaction* trans) { +BlockingController* EngineShard::EnsureBlockingController() { if (!blocking_controller_) { blocking_controller_.reset(new BlockingController(this)); } - blocking_controller_->AddWatched(trans); + + return blocking_controller_.get(); } void EngineShard::TEST_EnableHeartbeat() { diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 16acb37c3..32075b07c 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -118,8 +118,7 @@ class EngineShard { return tiered_storage_.get(); } - // Adds blocked transaction to the watch-list. - void AddBlocked(Transaction* trans); + BlockingController* EnsureBlockingController(); BlockingController* blocking_controller() { return blocking_controller_.get(); diff --git a/src/server/list_family.cc b/src/server/list_family.cc index bd81546a7..a4b28c3a9 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -63,6 +63,7 @@ namespace dfly { using namespace std; using namespace facade; using absl::GetFlag; +using time_point = Transaction::time_point; namespace { @@ -200,12 +201,26 @@ class BPopper { string value_; }; +class BPopPusher { + public: + BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir); + + // Returns WRONG_TYPE, OK. + // If OK is returned then use result() to fetch the value. + OpResult Run(Transaction* t, unsigned msec); + + private: + OpResult RunSingle(Transaction* t, time_point tp); + OpResult RunPair(Transaction* t, time_point tp); + + string_view pop_key_, push_key_; + ListDir popdir_, pushdir_; +}; + BPopper::BPopper(ListDir dir) : dir_(dir) { } OpStatus BPopper::Run(Transaction* t, unsigned msec) { - using time_point = Transaction::time_point; - time_point tp = msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max(); bool is_multi = t->IsMulti(); @@ -225,8 +240,13 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { } // Block + auto cb = [&](Transaction* t, EngineShard* shard) { + auto keys = t->ShardArgsInShard(shard->shard_id()); + return t->WatchInShard(keys, shard); + }; + ++stats->num_blocked_clients; - bool wait_succeeded = t->WaitOnWatch(tp); + bool wait_succeeded = t->WaitOnWatch(tp, std::move(cb)); --stats->num_blocked_clients; if (!wait_succeeded) @@ -361,6 +381,64 @@ OpResult Peek(const OpArgs& op_args, string_view key, ListDir dir, bool return absl::StrCat(entry.longval); } +BPopPusher::BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir) + : pop_key_(pop_key), push_key_(push_key), popdir_(popdir), pushdir_(pushdir) { +} + +OpResult BPopPusher::Run(Transaction* t, unsigned msec) { + time_point tp = + msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max(); + + t->Schedule(); + + if (t->unique_shard_cnt() == 1) { + return RunSingle(t, tp); + } + + return RunPair(t, tp); +} + +OpResult BPopPusher::RunSingle(Transaction* t, time_point tp) { + OpResult op_res; + bool is_multi = t->IsMulti(); + auto cb_move = [&](Transaction* t, EngineShard* shard) { + op_res = OpMoveSingleShard(t->GetOpArgs(shard), pop_key_, push_key_, popdir_, pushdir_); + return OpStatus::OK; + }; + t->Execute(cb_move, false); + + if (is_multi || op_res.status() != OpStatus::KEY_NOTFOUND) { + if (op_res.status() == OpStatus::KEY_NOTFOUND) { + op_res = OpStatus::TIMED_OUT; + } + auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; + t->Execute(std::move(cb), true); + return op_res; + } + + auto* stats = ServerState::tl_connection_stats(); + auto wcb = [&](Transaction* t, EngineShard* shard) { + ArgSlice keys{&this->pop_key_, 1}; + return t->WatchInShard(keys, shard); + }; + + // Block + ++stats->num_blocked_clients; + + bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb)); + --stats->num_blocked_clients; + + if (!wait_succeeded) + return OpStatus::TIMED_OUT; + + t->Execute(cb_move, true); + return op_res; +} + +OpResult BPopPusher::RunPair(Transaction* t, time_point tp) { + return OpStatus::TIMED_OUT; +} + OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir dir, bool skip_notexist, absl::Span vals) { EngineShard* es = op_args.shard; @@ -705,6 +783,56 @@ OpResult OpRange(const OpArgs& op_args, std::string_view key, long st return str_vec; } +OpResult MoveTwoShards(Transaction* trans, string_view src, string_view dest, + ListDir src_dir, ListDir dest_dir) { + DCHECK_EQ(2u, trans->unique_shard_cnt()); + + OpResult find_res[2]; + OpResult result; + + // Transaction is comprised of 2 hops: + // 1 - check for entries existence, their types and if possible - + // read the value we may move from the source list. + // 2. If everything is ok, pop from source and push the peeked value into + // the destination. + // + auto cb = [&](Transaction* t, EngineShard* shard) { + auto args = t->ShardArgsInShard(shard->shard_id()); + DCHECK_EQ(1u, args.size()); + bool is_dest = args.front() == dest; + find_res[is_dest] = Peek(t->GetOpArgs(shard), args.front(), src_dir, !is_dest); + return OpStatus::OK; + }; + + trans->Execute(move(cb), false); + + if (!find_res[0] || find_res[1].status() == OpStatus::WRONG_TYPE) { + auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; + trans->Execute(move(cb), true); + result = find_res[0] ? find_res[1] : find_res[0]; + } else { + // Everything is ok, lets proceed with the mutations. + auto cb = [&](Transaction* t, EngineShard* shard) { + auto args = t->ShardArgsInShard(shard->shard_id()); + bool is_dest = args.front() == dest; + OpArgs op_args = t->GetOpArgs(shard); + + if (is_dest) { + string_view val{find_res[0].value()}; + absl::Span span{&val, 1}; + OpPush(op_args, args.front(), dest_dir, false, span); + } else { + OpPop(op_args, args.front(), src_dir, 1, false); + } + return OpStatus::OK; + }; + trans->Execute(move(cb), true); + result = std::move(find_res[0].value()); + } + + return result; +} + void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, ListDir src_dir, ListDir dest_dir) { OpResult result; @@ -716,50 +844,8 @@ void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, Lis result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); } else { - CHECK_EQ(2u, cntx->transaction->unique_shard_cnt()); - - OpResult find_res[2]; - - // Transaction is comprised of 2 hops: - // 1 - check for entries existence, their types and if possible - - // read the value we may move from the source list. - // 2. If everything is ok, pop from source and push the peeked value into - // the destination. - // cntx->transaction->Schedule(); - auto cb = [&](Transaction* t, EngineShard* shard) { - auto args = t->ShardArgsInShard(shard->shard_id()); - DCHECK_EQ(1u, args.size()); - bool is_dest = args.front() == dest; - find_res[is_dest] = Peek(t->GetOpArgs(shard), args.front(), src_dir, !is_dest); - return OpStatus::OK; - }; - - cntx->transaction->Execute(move(cb), false); - - if (!find_res[0] || find_res[1].status() == OpStatus::WRONG_TYPE) { - auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; - cntx->transaction->Execute(move(cb), true); - result = find_res[0] ? find_res[1] : find_res[0]; - } else { - // Everything is ok, lets proceed with the mutations. - auto cb = [&](Transaction* t, EngineShard* shard) { - auto args = t->ShardArgsInShard(shard->shard_id()); - bool is_dest = args.front() == dest; - OpArgs op_args = t->GetOpArgs(shard); - - if (is_dest) { - string_view val{find_res[0].value()}; - absl::Span span{&val, 1}; - OpPush(op_args, args.front(), dest_dir, false, span); - } else { - OpPop(op_args, args.front(), src_dir, 1, false); - } - return OpStatus::OK; - }; - cntx->transaction->Execute(move(cb), true); - result = std::move(find_res[0].value()); - } + result = MoveTwoShards(cntx->transaction, src, dest, src_dir, dest_dir); } if (result) { @@ -798,7 +884,22 @@ void BRPopLPush(CmdArgList args, ConnectionContext* cntx) { return (*cntx)->SendError("timeout is negative"); } - return (*cntx)->SendNull(); + BPopPusher bpop_pusher(src, dest, ListDir::RIGHT, ListDir::LEFT); + OpResult op_res = bpop_pusher.Run(cntx->transaction, unsigned(timeout * 1000)); + + if (op_res) { + return (*cntx)->SendBulkString(*op_res); + } + + switch (op_res.status()) { + case OpStatus::TIMED_OUT: + return (*cntx)->SendNull(); + break; + + default: + return (*cntx)->SendError(op_res.status()); + break; + } } } // namespace diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index c0116733a..ee5feddef 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -659,4 +659,45 @@ TEST_F(ListFamilyTest, TwoQueueBug451) { f.Join(); } +TEST_F(ListFamilyTest, BRPopLPushSingleShard) { + EXPECT_THAT(Run({"brpoplpush", "x", "y", "0.05"}), ArgType(RespExpr::NIL)); + + EXPECT_THAT(Run({"lpush", "x", "val1"}), IntArg(1)); + EXPECT_EQ(Run({"brpoplpush", "x", "y", "0.01"}), "val1"); + ASSERT_EQ(1, GetDebugInfo().shards_count); + + EXPECT_THAT(Run({ + "exists", + "x", + }), + IntArg(0)); + Run({"set", "x", "str"}); + EXPECT_THAT(Run({"brpoplpush", "y", "x", "0.01"}), ErrArg("wrong kind of value")); + + Run({"del", "x", "y"}); + Run({"multi"}); + Run({"brpoplpush", "y", "x", "0"}); + RespExpr resp = Run({"exec"}); + EXPECT_THAT(resp, ArgType(RespExpr::NIL)); +} + +TEST_F(ListFamilyTest, BRPopLPushSingleShardBlocking) { + RespExpr resp0, resp1; + + // Run the fiber at creation. + auto fb0 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + resp0 = Run({"brpoplpush", "x", "y", "0"}); + }); + fibers_ext::SleepFor(30us); + pp_->at(1)->Await([&] { Run("B1", {"lpush", "y", "2"}); }); + + pp_->at(1)->Await([&] { Run("B1", {"lpush", "x", "1"}); }); + fb0.Join(); + ASSERT_EQ(resp0, "1"); +} + +TEST_F(ListFamilyTest, BRPopLPushTwoShards) { + EXPECT_THAT(Run({"brpoplpush", "x", "z", "0.05"}), ArgType(RespExpr::NIL)); +} + } // namespace dfly diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 291bde53c..77c5b7466 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -406,7 +406,7 @@ bool Transaction::RunInShard(EngineShard* shard) { sd.local_mask &= ~KEYLOCK_ACQUIRED; if (was_suspended || (sd.local_mask & AWAKED_Q)) { - shard->blocking_controller()->RemoveWatched(this); + shard->blocking_controller()->RemoveWatched(ShardArgsInShard(shard->shard_id()), this); } } sd.local_mask &= ~OUT_OF_ORDER; @@ -1011,12 +1011,12 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const { return reverse_index_[sd.arg_start + arg_index]; } -bool Transaction::WaitOnWatch(const time_point& tp) { +bool Transaction::WaitOnWatch(const time_point& tp, RunnableType cb) { // Assumes that transaction is pending and scheduled. TODO: To verify it with state machine. VLOG(2) << "WaitOnWatch Start use_count(" << use_count() << ")"; using namespace chrono; - Execute([](Transaction* t, EngineShard* shard) { return t->AddToWatchedShardCb(shard); }, true); + Execute(cb, true); coordinator_state_ |= COORD_BLOCKED; @@ -1077,14 +1077,16 @@ bool Transaction::WaitOnWatch(const time_point& tp) { } // Runs only in the shard thread. -OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) { +OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard) { ShardId idx = SidToId(shard->shard_id()); auto& sd = shard_data_[idx]; CHECK_EQ(0, sd.local_mask & SUSPENDED_Q); DCHECK_EQ(0, sd.local_mask & ARMED); - shard->AddBlocked(this); + auto* bc = shard->EnsureBlockingController(); + bc->AddWatched(keys, this); + sd.local_mask |= SUSPENDED_Q; DVLOG(1) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask; @@ -1100,7 +1102,7 @@ void Transaction::ExpireShardCb(EngineShard* shard) { sd.local_mask |= EXPIRED_Q; sd.local_mask &= ~KEYLOCK_ACQUIRED; - shard->blocking_controller()->RemoveWatched(this); + shard->blocking_controller()->RemoveWatched(ShardArgsInShard(shard->shard_id()), this); // Need to see why I decided to call this. // My guess - probably to trigger the run of stalled transactions in case diff --git a/src/server/transaction.h b/src/server/transaction.h index bfdeb2318..60281439c 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -161,7 +161,7 @@ class Transaction { // or b) tp is reached. If tp is time_point::max() then waits indefinitely. // Expects that the transaction had been scheduled before, and uses Execute(.., true) to register. // Returns false if timeout occurred, true if was notified by one of the keys. - bool WaitOnWatch(const time_point& tp); + bool WaitOnWatch(const time_point& tp, RunnableType cb); // Returns true if transaction is awaked, false if it's timed-out and can be removed from the // blocking queue. NotifySuspended may be called from (multiple) shard threads and @@ -191,6 +191,9 @@ class Transaction { return db_index_; } + // Adds itself to watched queue in the shard. Must run in that shard thread. + OpStatus WatchInShard(ArgSlice keys, EngineShard* shard); + private: struct LockCnt { unsigned cnt[2] = {0, 0}; @@ -223,9 +226,6 @@ class Transaction { // Returns true if we need to follow up with PollExecution on this shard. bool CancelShardCb(EngineShard* shard); - // Shard callbacks used within Execute calls - OpStatus AddToWatchedShardCb(EngineShard* shard); - void ExpireShardCb(EngineShard* shard); void UnlockMultiShardCb(const std::vector& sharded_keys, EngineShard* shard);