feat(server): Implement brpoplpush for single shard case.

A simple case where both src and dest keys are located in the same shard.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-01-07 17:06:59 +02:00 committed by Roman Gershman
parent fe2a491824
commit 9ca636e49d
9 changed files with 223 additions and 74 deletions

View file

@ -136,7 +136,7 @@ void BlockingController::RunStep(Transaction* completed_t) {
awakened_indices_.clear();
}
void BlockingController::AddWatched(Transaction* trans) {
void BlockingController::AddWatched(ArgSlice keys, Transaction* trans) {
VLOG(1) << "AddWatched [" << owner_->shard_id() << "] " << trans->DebugId();
auto [dbit, added] = watched_dbs_.emplace(trans->db_index(), nullptr);
@ -146,8 +146,7 @@ void BlockingController::AddWatched(Transaction* trans) {
DbWatchTable& wt = *dbit->second;
auto args = trans->ShardArgsInShard(owner_->shard_id());
for (auto key : args) {
for (auto key : keys) {
auto [res, inserted] = wt.queue_map.emplace(key, nullptr);
if (inserted) {
res->second.reset(new WatchQueue);
@ -167,7 +166,7 @@ void BlockingController::AddWatched(Transaction* trans) {
}
// Runs in O(N) complexity in the worst case.
void BlockingController::RemoveWatched(Transaction* trans) {
void BlockingController::RemoveWatched(ArgSlice keys, Transaction* trans) {
VLOG(1) << "RemoveWatched [" << owner_->shard_id() << "] " << trans->DebugId();
auto dbit = watched_dbs_.find(trans->db_index());
@ -175,11 +174,13 @@ void BlockingController::RemoveWatched(Transaction* trans) {
return;
DbWatchTable& wt = *dbit->second;
auto args = trans->ShardArgsInShard(owner_->shard_id());
for (auto key : args) {
for (auto key : keys) {
auto watch_it = wt.queue_map.find(key);
// This can happen in case of duplicate keys, or when we do not watch all of the
// argument keys, as with BRPOPLPUSH.
if (watch_it == wt.queue_map.end())
continue; // that can happen in case of duplicate keys
continue;
WatchQueue& wq = *watch_it->second;
for (auto items_it = wq.items.begin(); items_it != wq.items.end(); ++items_it) {

View file

@ -38,8 +38,8 @@ class BlockingController {
// TODO: consider moving all watched functions to
// EngineShard with separate per db map.
//! AddWatched adds a transaction to the blocking queue.
void AddWatched(Transaction* me);
void RemoveWatched(Transaction* me);
void AddWatched(ArgSlice watch_keys, Transaction* me);
void RemoveWatched(ArgSlice watch_keys, Transaction* me);
// Called from operations that create keys like lpush, rename etc.
void AwakeWatched(DbIndex db_index, std::string_view db_key);

View file

@ -66,11 +66,13 @@ void BlockingControllerTest::TearDown() {
TEST_F(BlockingControllerTest, Basic) {
shard_set->Await(0, [&] {
BlockingController bc(EngineShard::tlocal());
bc.AddWatched(trans_.get());
EngineShard* shard = EngineShard::tlocal();
BlockingController bc(shard);
auto keys = trans_->ShardArgsInShard(shard->shard_id());
bc.AddWatched(keys, trans_.get());
EXPECT_EQ(1, bc.NumWatched(0));
bc.RemoveWatched(trans_.get());
bc.RemoveWatched(keys, trans_.get());
EXPECT_EQ(0, bc.NumWatched(0));
});
}
@ -79,8 +81,10 @@ TEST_F(BlockingControllerTest, Timeout) {
time_point tp = steady_clock::now() + chrono::milliseconds(10);
trans_->Schedule();
auto keys = trans_->ShardArgsInShard(0);
auto cb = [&](Transaction* t, EngineShard* shard) { return t->WatchInShard(keys, shard); };
bool res = trans_->WaitOnWatch(tp);
bool res = trans_->WaitOnWatch(tp, cb);
EXPECT_FALSE(res);
unsigned num_watched = shard_set->Await(

View file

@ -502,11 +502,12 @@ size_t EngineShard::UsedMemory() const {
return mi_resource_.used() + zmalloc_used_memory_tl + SmallString::UsedThreadLocal();
}
void EngineShard::AddBlocked(Transaction* trans) {
BlockingController* EngineShard::EnsureBlockingController() {
if (!blocking_controller_) {
blocking_controller_.reset(new BlockingController(this));
}
blocking_controller_->AddWatched(trans);
return blocking_controller_.get();
}
void EngineShard::TEST_EnableHeartbeat() {

View file

@ -118,8 +118,7 @@ class EngineShard {
return tiered_storage_.get();
}
// Adds blocked transaction to the watch-list.
void AddBlocked(Transaction* trans);
BlockingController* EnsureBlockingController();
BlockingController* blocking_controller() {
return blocking_controller_.get();

View file

@ -63,6 +63,7 @@ namespace dfly {
using namespace std;
using namespace facade;
using absl::GetFlag;
using time_point = Transaction::time_point;
namespace {
@ -200,12 +201,26 @@ class BPopper {
string value_;
};
// Implements the blocking move used by BRPOPLPUSH: waits until `pop_key`
// holds an element, then pops it from the `popdir` end and pushes it onto
// the `pushdir` end of `push_key`.
class BPopPusher {
public:
BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir);
// Runs the blocking move with a timeout of `msec` milliseconds
// (0 means wait indefinitely). On success the result holds the moved
// value; otherwise the status is WRONG_TYPE or TIMED_OUT.
OpResult<string> Run(Transaction* t, unsigned msec);
private:
// Fast path: both keys map to the same shard, so a single-shard hop suffices.
OpResult<string> RunSingle(Transaction* t, time_point tp);
// Keys live on two different shards. NOTE(review): currently a stub that
// always reports TIMED_OUT — see the implementation.
OpResult<string> RunPair(Transaction* t, time_point tp);
// Views into the command arguments — presumably they outlive this object;
// TODO confirm lifetime at the callsite.
string_view pop_key_, push_key_;
ListDir popdir_, pushdir_;
};
BPopper::BPopper(ListDir dir) : dir_(dir) {
}
OpStatus BPopper::Run(Transaction* t, unsigned msec) {
using time_point = Transaction::time_point;
time_point tp =
msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max();
bool is_multi = t->IsMulti();
@ -225,8 +240,13 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
}
// Block
auto cb = [&](Transaction* t, EngineShard* shard) {
auto keys = t->ShardArgsInShard(shard->shard_id());
return t->WatchInShard(keys, shard);
};
++stats->num_blocked_clients;
bool wait_succeeded = t->WaitOnWatch(tp);
bool wait_succeeded = t->WaitOnWatch(tp, std::move(cb));
--stats->num_blocked_clients;
if (!wait_succeeded)
@ -361,6 +381,64 @@ OpResult<string> Peek(const OpArgs& op_args, string_view key, ListDir dir, bool
return absl::StrCat(entry.longval);
}
BPopPusher::BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir)
: pop_key_(pop_key), push_key_(push_key), popdir_(popdir), pushdir_(pushdir) {
}
// Entry point for the blocking move. Computes the wait deadline, schedules
// the transaction, and dispatches to the single-shard or two-shard variant
// based on how many shards the keys span.
OpResult<string> BPopPusher::Run(Transaction* t, unsigned msec) {
  time_point deadline = time_point::max();  // msec == 0 => wait indefinitely.
  if (msec) {
    deadline = chrono::steady_clock::now() + chrono::milliseconds(msec);
  }

  t->Schedule();

  return t->unique_shard_cnt() == 1 ? RunSingle(t, deadline) : RunPair(t, deadline);
}
// Executes the move when both keys reside on the same shard.
// First tries a non-blocking move; if the source list is missing (and we are
// not inside MULTI), watches pop_key_ and blocks until `tp` before retrying.
OpResult<string> BPopPusher::RunSingle(Transaction* t, time_point tp) {
OpResult<string> op_res;
bool is_multi = t->IsMulti();
// Performs the pop+push atomically on the single owning shard.
auto cb_move = [&](Transaction* t, EngineShard* shard) {
op_res = OpMoveSingleShard(t->GetOpArgs(shard), pop_key_, push_key_, popdir_, pushdir_);
return OpStatus::OK;
};
t->Execute(cb_move, false);
if (is_multi || op_res.status() != OpStatus::KEY_NOTFOUND) {
// Either the move resolved definitively, or we are inside MULTI where
// blocking is not allowed; in MULTI a missing source becomes TIMED_OUT.
if (op_res.status() == OpStatus::KEY_NOTFOUND) {
op_res = OpStatus::TIMED_OUT;
}
// No-op concluding hop so the scheduled transaction completes.
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
t->Execute(std::move(cb), true);
return op_res;
}
auto* stats = ServerState::tl_connection_stats();
// Watch only the source key — pushes to the destination key must not wake us.
auto wcb = [&](Transaction* t, EngineShard* shard) {
ArgSlice keys{&this->pop_key_, 1};
return t->WatchInShard(keys, shard);
};
// Block
++stats->num_blocked_clients;
bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb));
--stats->num_blocked_clients;
if (!wait_succeeded)
return OpStatus::TIMED_OUT;
// Woken by a push to pop_key_: retry the move as the concluding hop.
t->Execute(cb_move, true);
return op_res;
}
// Handles the case where pop_key_ and push_key_ reside on different shards.
// TODO: not implemented yet — unconditionally reports a timeout.
// NOTE(review): Run() has already scheduled the transaction; returning here
// without a concluding hop may leave it scheduled — verify before extending.
OpResult<string> BPopPusher::RunPair(Transaction* t, time_point tp) {
return OpStatus::TIMED_OUT;
}
OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
bool skip_notexist, absl::Span<std::string_view> vals) {
EngineShard* es = op_args.shard;
@ -705,20 +783,12 @@ OpResult<StringVec> OpRange(const OpArgs& op_args, std::string_view key, long st
return str_vec;
}
void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, ListDir src_dir,
ListDir dest_dir) {
OpResult<string> result;
if (cntx->transaction->unique_shard_cnt() == 1) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir);
};
result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
} else {
CHECK_EQ(2u, cntx->transaction->unique_shard_cnt());
OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view dest,
ListDir src_dir, ListDir dest_dir) {
DCHECK_EQ(2u, trans->unique_shard_cnt());
OpResult<string> find_res[2];
OpResult<string> result;
// Transaction is comprised of 2 hops:
// 1 - check for entries existence, their types and if possible -
@ -726,7 +796,6 @@ void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, Lis
// 2. If everything is ok, pop from source and push the peeked value into
// the destination.
//
cntx->transaction->Schedule();
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
DCHECK_EQ(1u, args.size());
@ -735,11 +804,11 @@ void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, Lis
return OpStatus::OK;
};
cntx->transaction->Execute(move(cb), false);
trans->Execute(move(cb), false);
if (!find_res[0] || find_res[1].status() == OpStatus::WRONG_TYPE) {
auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
cntx->transaction->Execute(move(cb), true);
trans->Execute(move(cb), true);
result = find_res[0] ? find_res[1] : find_res[0];
} else {
// Everything is ok, lets proceed with the mutations.
@ -757,9 +826,26 @@ void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, Lis
}
return OpStatus::OK;
};
cntx->transaction->Execute(move(cb), true);
trans->Execute(move(cb), true);
result = std::move(find_res[0].value());
}
return result;
}
void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, ListDir src_dir,
ListDir dest_dir) {
OpResult<string> result;
if (cntx->transaction->unique_shard_cnt() == 1) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir);
};
result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
} else {
cntx->transaction->Schedule();
result = MoveTwoShards(cntx->transaction, src, dest, src_dir, dest_dir);
}
if (result) {
@ -798,7 +884,22 @@ void BRPopLPush(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendError("timeout is negative");
}
BPopPusher bpop_pusher(src, dest, ListDir::RIGHT, ListDir::LEFT);
OpResult<string> op_res = bpop_pusher.Run(cntx->transaction, unsigned(timeout * 1000));
if (op_res) {
return (*cntx)->SendBulkString(*op_res);
}
switch (op_res.status()) {
case OpStatus::TIMED_OUT:
return (*cntx)->SendNull();
break;
default:
return (*cntx)->SendError(op_res.status());
break;
}
}
} // namespace

View file

@ -659,4 +659,45 @@ TEST_F(ListFamilyTest, TwoQueueBug451) {
f.Join();
}
// Covers BRPOPLPUSH when both keys land on the same shard: timeout on a
// missing source, an immediate successful move, WRONG_TYPE on a non-list
// destination, and the non-blocking behavior inside MULTI/EXEC.
TEST_F(ListFamilyTest, BRPopLPushSingleShard) {
// No source list yet: blocks for up to 50ms and then times out with nil.
EXPECT_THAT(Run({"brpoplpush", "x", "y", "0.05"}), ArgType(RespExpr::NIL));
EXPECT_THAT(Run({"lpush", "x", "val1"}), IntArg(1));
// Source has an element: the value moves immediately, no blocking.
EXPECT_EQ(Run({"brpoplpush", "x", "y", "0.01"}), "val1");
// Sanity check that both keys indeed hashed to a single shard.
ASSERT_EQ(1, GetDebugInfo().shards_count);
// The source list was emptied by the pop and should have been deleted.
EXPECT_THAT(Run({
"exists",
"x",
}),
IntArg(0));
Run({"set", "x", "str"});
// Destination `x` now holds a string: the command fails with WRONG_TYPE.
EXPECT_THAT(Run({"brpoplpush", "y", "x", "0.01"}), ErrArg("wrong kind of value"));
Run({"del", "x", "y"});
Run({"multi"});
Run({"brpoplpush", "y", "x", "0"});
RespExpr resp = Run({"exec"});
// Inside MULTI blocking is not allowed; a missing source yields nil.
EXPECT_THAT(resp, ArgType(RespExpr::NIL));
}
// Verifies that BRPOPLPUSH blocks until another connection pushes into the
// *source* key: a push to the destination key `y` happens first and should
// not satisfy the blocked pop; only the later push to `x` releases it.
TEST_F(ListFamilyTest, BRPopLPushSingleShardBlocking) {
RespExpr resp0;
// Run the fiber at creation.
auto fb0 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
resp0 = Run({"brpoplpush", "x", "y", "0"});
});
// NOTE(review): sleep-based synchronization — assumes 30us suffices for the
// fiber to reach the blocking point; may be flaky under load.
fibers_ext::SleepFor(30us);
pp_->at(1)->Await([&] { Run("B1", {"lpush", "y", "2"}); });
pp_->at(1)->Await([&] { Run("B1", {"lpush", "x", "1"}); });
fb0.Join();
// The element pushed into the source `x` is what the blocked client got.
ASSERT_EQ(resp0, "1");
}
// The two-shard case is not implemented yet (BPopPusher::RunPair is a stub),
// so the call simply times out and returns nil.
TEST_F(ListFamilyTest, BRPopLPushTwoShards) {
EXPECT_THAT(Run({"brpoplpush", "x", "z", "0.05"}), ArgType(RespExpr::NIL));
}
} // namespace dfly

View file

@ -406,7 +406,7 @@ bool Transaction::RunInShard(EngineShard* shard) {
sd.local_mask &= ~KEYLOCK_ACQUIRED;
if (was_suspended || (sd.local_mask & AWAKED_Q)) {
shard->blocking_controller()->RemoveWatched(this);
shard->blocking_controller()->RemoveWatched(ShardArgsInShard(shard->shard_id()), this);
}
}
sd.local_mask &= ~OUT_OF_ORDER;
@ -1011,12 +1011,12 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
return reverse_index_[sd.arg_start + arg_index];
}
bool Transaction::WaitOnWatch(const time_point& tp) {
bool Transaction::WaitOnWatch(const time_point& tp, RunnableType cb) {
// Assumes that transaction is pending and scheduled. TODO: To verify it with state machine.
VLOG(2) << "WaitOnWatch Start use_count(" << use_count() << ")";
using namespace chrono;
Execute([](Transaction* t, EngineShard* shard) { return t->AddToWatchedShardCb(shard); }, true);
Execute(cb, true);
coordinator_state_ |= COORD_BLOCKED;
@ -1077,14 +1077,16 @@ bool Transaction::WaitOnWatch(const time_point& tp) {
}
// Runs only in the shard thread.
OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) {
OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard) {
ShardId idx = SidToId(shard->shard_id());
auto& sd = shard_data_[idx];
CHECK_EQ(0, sd.local_mask & SUSPENDED_Q);
DCHECK_EQ(0, sd.local_mask & ARMED);
shard->AddBlocked(this);
auto* bc = shard->EnsureBlockingController();
bc->AddWatched(keys, this);
sd.local_mask |= SUSPENDED_Q;
DVLOG(1) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask;
@ -1100,7 +1102,7 @@ void Transaction::ExpireShardCb(EngineShard* shard) {
sd.local_mask |= EXPIRED_Q;
sd.local_mask &= ~KEYLOCK_ACQUIRED;
shard->blocking_controller()->RemoveWatched(this);
shard->blocking_controller()->RemoveWatched(ShardArgsInShard(shard->shard_id()), this);
// Need to see why I decided to call this.
// My guess - probably to trigger the run of stalled transactions in case

View file

@ -161,7 +161,7 @@ class Transaction {
// or b) tp is reached. If tp is time_point::max() then waits indefinitely.
// Expects that the transaction had been scheduled before, and uses Execute(.., true) to register.
// Returns false if timeout occurred, true if was notified by one of the keys.
bool WaitOnWatch(const time_point& tp);
bool WaitOnWatch(const time_point& tp, RunnableType cb);
// Returns true if transaction is awaked, false if it's timed-out and can be removed from the
// blocking queue. NotifySuspended may be called from (multiple) shard threads and
@ -191,6 +191,9 @@ class Transaction {
return db_index_;
}
// Adds itself to watched queue in the shard. Must run in that shard thread.
OpStatus WatchInShard(ArgSlice keys, EngineShard* shard);
private:
struct LockCnt {
unsigned cnt[2] = {0, 0};
@ -223,9 +226,6 @@ class Transaction {
// Returns true if we need to follow up with PollExecution on this shard.
bool CancelShardCb(EngineShard* shard);
// Shard callbacks used within Execute calls
OpStatus AddToWatchedShardCb(EngineShard* shard);
void ExpireShardCb(EngineShard* shard);
void UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, EngineShard* shard);