From 69519b2c5bef3241d7fce910eb5c41487bc7afe2 Mon Sep 17 00:00:00 2001 From: adiholden Date: Thu, 2 Feb 2023 08:58:06 +0200 Subject: [PATCH] feat(list family): support blocking command for replication (#740) --- src/server/list_family.cc | 31 ++++++++++++++++++++--------- tests/dragonfly/replication_test.py | 8 ++++++-- 2 files changed, 28 insertions(+), 11 deletions(-) diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 6548b02a4..5332b7d54 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -191,7 +191,7 @@ class BPopper { } private: - OpStatus Pop(Transaction* t, EngineShard* shard); + void Pop(Transaction* t, EngineShard* shard); ListDir dir_; @@ -267,13 +267,21 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { VLOG(1) << "Popping an element " << t->DebugId(); ff_result_ = move(result.value()); - auto cb = [this](Transaction* t, EngineShard* shard) { return Pop(t, shard); }; + auto cb = [this](Transaction* t, EngineShard* shard) { + Pop(t, shard); + OpArgs op_args = t->GetOpArgs(shard); + if (op_args.shard->journal()) { + string command = dir_ == ListDir::LEFT ? "LPOP" : "RPOP"; + RecordJournal(op_args, command, ArgSlice{key_}, 1); + } + return OpStatus::OK; + }; t->Execute(std::move(cb), true); return OpStatus::OK; } -OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) { +void BPopper::Pop(Transaction* t, EngineShard* shard) { if (shard->shard_id() == ff_result_.sid) { ff_result_.key.GetString(&key_); auto& db_slice = shard->db_slice(); @@ -289,8 +297,6 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) { CHECK(shard->db_slice().Del(t->GetDbIndex(), it)); } } - - return OpStatus::OK; } OpResult OpMoveSingleShard(const OpArgs& op_args, string_view src, string_view dest, @@ -882,8 +888,13 @@ OpResult BPopPusher::RunSingle(Transaction* t, time_point tp) { OpResult op_res; bool is_multi = t->IsMulti(); auto cb_move = [&](Transaction* t, EngineShard* shard) { - op_res = OpMoveSingleShard(t->GetOpArgs(shard), pop_key_, push_key_, popdir_, pushdir_); - t->RenableAutoJournal(); // With single shard run auto journal flow. + OpArgs op_args = t->GetOpArgs(shard); + op_res = OpMoveSingleShard(op_args, pop_key_, push_key_, popdir_, pushdir_); + if (op_res) { + if (op_args.shard->journal()) { + RecordJournal(op_args, "RPOPLPUSH", ArgSlice{pop_key_, push_key_}, 1); + } + } return OpStatus::OK; }; t->Execute(cb_move, false); @@ -1341,8 +1352,10 @@ void ListFamily::Register(CommandRegistry* registry) { .SetHandler(RPopLPush) << CI{"BRPOPLPUSH", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING | CO::NO_AUTOJOURNAL, 4, 1, 2, 1} .SetHandler(BRPopLPush) - << CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BLPop) - << CI{"BRPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BRPop) + << CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING | CO::NO_AUTOJOURNAL, -3, 1, -2, 1} + .HFUNC(BLPop) + << CI{"BRPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING | CO::NO_AUTOJOURNAL, -3, 1, -2, 1} + .HFUNC(BRPop) << CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen) << CI{"LPOS", CO::READONLY | CO::FAST, -3, 1, 1, 1}.HFUNC(LPos) << CI{"LINDEX", CO::READONLY, 3, 1, 1, 1}.HFUNC(LIndex) diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index f9dc57847..1a742faf1 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -513,8 +513,12 @@ async def test_rewrites(df_local_factory): # Check there is no rewrite for RPOPLPUSH on single shard await check("RPOPLPUSH list list", r"RPOPLPUSH list list") - # Check there is no rewrite for BRPOPLPUSH on single shard - await check("BRPOPLPUSH list list 0", r"BRPOPLPUSH list list 0") + # Check BRPOPLPUSH on single shard turns into RPOPLPUSH + await check("BRPOPLPUSH list list 0", r"RPOPLPUSH list list") + # Check BLPOP turns into LPOP + await check("BLPOP list 0", r"LPOP list") + # Check BRPOP turns into RPOP + await check("BRPOP list 0", r"RPOP list") await c_master.lpush("list1s", "v1", "v2", "v3", "v4")