From b1e688b33f711c452e591ef3a275411c5b007c70 Mon Sep 17 00:00:00 2001 From: adiholden Date: Tue, 9 Apr 2024 09:49:33 +0300 Subject: [PATCH] bug(server): set connection flags block/pause flag on all blocking commands (#2816) * bug(server): set connection blocking and pause flags on all blocking commands Signed-off-by: adi_holden --- src/facade/conn_context.h | 5 ++-- src/server/blocking_controller_test.cc | 5 +++- src/server/container_utils.cc | 6 ++--- src/server/container_utils.h | 3 +-- src/server/list_family.cc | 35 +++++++++++++++----------- src/server/stream_family.cc | 3 ++- src/server/transaction.cc | 9 ++++--- src/server/transaction.h | 3 ++- src/server/zset_family.cc | 3 ++- tests/dragonfly/connection_test.py | 19 +++++++++----- 10 files changed, 54 insertions(+), 37 deletions(-) diff --git a/src/facade/conn_context.h b/src/facade/conn_context.h index 9ba641108..b5bce8b4f 100644 --- a/src/facade/conn_context.h +++ b/src/facade/conn_context.h @@ -97,10 +97,9 @@ class ConnectionContext { bool async_dispatch : 1; // whether this connection is amid an async dispatch bool sync_dispatch : 1; // whether this connection is amid a sync dispatch bool journal_emulated : 1; // whether it is used to dispatch journal commands - bool paused : 1; // whether this connection is paused due to CLIENT PAUSE - + bool paused = false; // whether this connection is paused due to CLIENT PAUSE // whether it's blocked on blocking commands like BLPOP, needs to be addressable - bool blocked; + bool blocked = false; // How many async subscription sources are active: monitor and/or pubsub - at most 2. 
uint8_t subscriptions; diff --git a/src/server/blocking_controller_test.cc b/src/server/blocking_controller_test.cc index b68843759..e9af80b7a 100644 --- a/src/server/blocking_controller_test.cc +++ b/src/server/blocking_controller_test.cc @@ -93,11 +93,14 @@ TEST_F(BlockingControllerTest, Basic) { TEST_F(BlockingControllerTest, Timeout) { time_point tp = steady_clock::now() + chrono::milliseconds(10); + bool blocked; + bool paused; trans_->Schedule(); auto cb = [&](Transaction* t, EngineShard* shard) { return trans_->GetShardArgs(0); }; - facade::OpStatus status = trans_->WaitOnWatch(tp, cb, [](auto...) { return true; }); + facade::OpStatus status = trans_->WaitOnWatch( + tp, cb, [](auto...) { return true; }, &blocked, &paused); EXPECT_EQ(status, facade::OpStatus::TIMED_OUT); unsigned num_watched = shard_set->Await( diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc index ab5caab7a..0af7f1b35 100644 --- a/src/server/container_utils.cc +++ b/src/server/container_utils.cc @@ -258,7 +258,7 @@ string_view LpGetView(uint8_t* lp_it, uint8_t int_buf[]) { OpResult RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type, BlockingResultCb func, unsigned limit_ms, - bool* block_flag) { + bool* block_flag, bool* pause_flag) { string result_key; // Fast path. If we have only a single shard, we can run opportunistically with a single hop. 
@@ -317,9 +317,7 @@ OpResult RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty return owner->db_slice().FindReadOnly(context, key, req_obj_type).ok(); }; - *block_flag = true; - auto status = trans->WaitOnWatch(limit_tp, std::move(wcb), key_checker); - *block_flag = false; + auto status = trans->WaitOnWatch(limit_tp, std::move(wcb), key_checker, block_flag, pause_flag); if (status != OpStatus::OK) return status; diff --git a/src/server/container_utils.h b/src/server/container_utils.h index 508af5c0b..e149ce95f 100644 --- a/src/server/container_utils.h +++ b/src/server/container_utils.h @@ -87,10 +87,9 @@ using BlockingResultCb = // Block until a any key of the transaction becomes non-empty and executes the callback. // If multiple keys are non-empty when this function is called, the callback is executed // immediately with the first key listed in the tx arguments. -// The block flag is set to true while the transaction is blocking. OpResult RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type, BlockingResultCb cb, unsigned limit_ms, - bool* block_flag); + bool* block_flag, bool* pause_flag); }; // namespace container_utils diff --git a/src/server/list_family.cc b/src/server/list_family.cc index fc06a484b..ca5bec84d 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -164,11 +164,11 @@ class BPopPusher { // Returns WRONG_TYPE, OK. // If OK is returned then use result() to fetch the value. 
- OpResult Run(Transaction* t, unsigned limit_ms); + OpResult Run(ConnectionContext* cntx, unsigned limit_ms); private: - OpResult RunSingle(Transaction* t, time_point tp); - OpResult RunPair(Transaction* t, time_point tp); + OpResult RunSingle(ConnectionContext* cntx, time_point tp); + OpResult RunPair(ConnectionContext* cntx, time_point tp); string_view pop_key_, push_key_; ListDir popdir_, pushdir_; @@ -765,7 +765,7 @@ void BRPopLPush(CmdArgList args, ConnectionContext* cntx) { } BPopPusher bpop_pusher(src, dest, ListDir::RIGHT, ListDir::LEFT); - OpResult op_res = bpop_pusher.Run(cntx->transaction, unsigned(timeout * 1000)); + OpResult op_res = bpop_pusher.Run(cntx, unsigned(timeout * 1000)); auto* rb = static_cast(cntx->reply_builder()); if (op_res) { @@ -808,7 +808,7 @@ void BLMove(CmdArgList args, ConnectionContext* cntx) { } BPopPusher bpop_pusher(src, dest, *src_dir, *dest_dir); - OpResult op_res = bpop_pusher.Run(cntx->transaction, unsigned(timeout * 1000)); + OpResult op_res = bpop_pusher.Run(cntx, unsigned(timeout * 1000)); auto* rb = static_cast(cntx->reply_builder()); if (op_res) { @@ -831,20 +831,21 @@ BPopPusher::BPopPusher(string_view pop_key, string_view push_key, ListDir popdir : pop_key_(pop_key), push_key_(push_key), popdir_(popdir), pushdir_(pushdir) { } -OpResult BPopPusher::Run(Transaction* t, unsigned limit_ms) { +OpResult BPopPusher::Run(ConnectionContext* cntx, unsigned limit_ms) { time_point tp = limit_ms ? 
chrono::steady_clock::now() + chrono::milliseconds(limit_ms) : time_point::max(); - t->Schedule(); + cntx->transaction->Schedule(); - if (t->GetUniqueShardCnt() == 1) { - return RunSingle(t, tp); + if (cntx->transaction->GetUniqueShardCnt() == 1) { + return RunSingle(cntx, tp); } - return RunPair(t, tp); + return RunPair(cntx, tp); } -OpResult BPopPusher::RunSingle(Transaction* t, time_point tp) { +OpResult BPopPusher::RunSingle(ConnectionContext* cntx, time_point tp) { + Transaction* t = cntx->transaction; OpResult op_res; bool is_multi = t->IsMulti(); auto cb_move = [&](Transaction* t, EngineShard* shard) { @@ -873,14 +874,16 @@ OpResult BPopPusher::RunSingle(Transaction* t, time_point tp) { return owner->db_slice().FindReadOnly(context, key, OBJ_LIST).ok(); }; // Block - if (auto status = t->WaitOnWatch(tp, std::move(wcb), key_checker); status != OpStatus::OK) + auto status = t->WaitOnWatch(tp, std::move(wcb), key_checker, &(cntx->blocked), &(cntx->paused)); + if (status != OpStatus::OK) return status; t->Execute(cb_move, true); return op_res; } -OpResult BPopPusher::RunPair(Transaction* t, time_point tp) { +OpResult BPopPusher::RunPair(ConnectionContext* cntx, time_point tp) { + Transaction* t = cntx->transaction; bool is_multi = t->IsMulti(); OpResult op_res = MoveTwoShards(t, pop_key_, push_key_, popdir_, pushdir_, false); @@ -902,7 +905,8 @@ OpResult BPopPusher::RunPair(Transaction* t, time_point tp) { return owner->db_slice().FindReadOnly(context, key, OBJ_LIST).ok(); }; - if (auto status = t->WaitOnWatch(tp, std::move(wcb), key_checker); status != OpStatus::OK) + if (auto status = t->WaitOnWatch(tp, std::move(wcb), key_checker, &cntx->blocked, &cntx->paused); + status != OpStatus::OK) return status; return MoveTwoShards(t, pop_key_, push_key_, popdir_, pushdir_, true); @@ -1194,7 +1198,8 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn }; OpResult popped_key = container_utils::RunCbOnFirstNonEmptyBlocking( - transaction, 
OBJ_LIST, std::move(cb), unsigned(timeout * 1000), &cntx->blocked); + transaction, OBJ_LIST, std::move(cb), unsigned(timeout * 1000), &cntx->blocked, + &cntx->paused); auto* rb = static_cast(cntx->reply_builder()); if (popped_key) { diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 9d10fc012..d700fb502 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -2833,7 +2833,8 @@ void XReadBlock(ReadOpts opts, ConnectionContext* cntx) { return streamCompareID(&last_id, &sitem.group->last_id) > 0; }; - if (auto status = cntx->transaction->WaitOnWatch(tp, std::move(wcb), key_checker); + if (auto status = cntx->transaction->WaitOnWatch(tp, std::move(wcb), key_checker, &cntx->blocked, + &cntx->paused); status != OpStatus::OK) return rb->SendNullArray(); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 8ae0e9dc7..2640f3cfd 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1190,7 +1190,7 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const { } OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider, - KeyReadyChecker krc) { + KeyReadyChecker krc, bool* block_flag, bool* pause_flag) { DCHECK(!blocking_barrier_.IsClaimed()); // Blocking barrier can't be re-used // Register keys on active shards blocking controllers and mark shard state as suspended. @@ -1206,16 +1206,19 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p auto* stats = ServerState::tl_connection_stats(); ++stats->num_blocked_clients; DVLOG(1) << "WaitOnWatch wait for " << tp << " " << DebugId(); - // TBD set connection blocking state + // Wait for the blocking barrier to be closed. // Note: It might return immediately if another thread already notified us. 
+ *block_flag = true; cv_status status = blocking_barrier_.Wait(tp); + *block_flag = false; DVLOG(1) << "WaitOnWatch done " << int(status) << " " << DebugId(); --stats->num_blocked_clients; - // TBD set connection pause state + *pause_flag = true; ServerState::tlocal()->AwaitPauseState(true); // blocking are always write commands + *pause_flag = false; OpStatus result = OpStatus::OK; if (status == cv_status::timeout) { diff --git a/src/server/transaction.h b/src/server/transaction.h index ab3dd50ad..e03b23734 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -214,7 +214,8 @@ class Transaction { // or b) tp is reached. If tp is time_point::max() then waits indefinitely. // Expects that the transaction had been scheduled before, and uses Execute(.., true) to register. // Returns false if timeout occurred, true if was notified by one of the keys. - facade::OpStatus WaitOnWatch(const time_point& tp, WaitKeysProvider cb, KeyReadyChecker krc); + facade::OpStatus WaitOnWatch(const time_point& tp, WaitKeysProvider cb, KeyReadyChecker krc, + bool* block_flag, bool* pause_flag); // Returns true if transaction is awaked, false if it's timed-out and can be removed from the // blocking queue. 
diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 7d74639c4..a27ddff04 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -1318,7 +1318,8 @@ void BZPopMinMax(CmdArgList args, ConnectionContext* cntx, bool is_max) { }; OpResult popped_key = container_utils::RunCbOnFirstNonEmptyBlocking( - transaction, OBJ_ZSET, std::move(cb), unsigned(timeout * 1000), &cntx->blocked); + transaction, OBJ_ZSET, std::move(cb), unsigned(timeout * 1000), &cntx->blocked, + &cntx->paused); auto* rb = static_cast(cntx->reply_builder()); if (popped_key) { diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index 59f80dccb..2ecbdfbf6 100755 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -691,6 +691,7 @@ async def test_nested_client_pause(async_client: aioredis.Redis): await p3 +@dfly_args({"proactor_threads": "4"}) async def test_blocking_command_client_pause(async_client: aioredis.Redis): """ 1. Check client pause success when blocking transaction is running @@ -698,14 +699,19 @@ async def test_blocking_command_client_pause(async_client: aioredis.Redis): 3. 
once puased is finished lpush will run and blpop will pop the pushed value """ - async def blocking_command(): - res = await async_client.execute_command("blpop key 2") - assert res == ["key", "value"] + async def blpop_command(): + res = await async_client.execute_command("blpop dest7 10") + assert res == ["dest7", "value"] + + async def brpoplpush_command(): + res = await async_client.execute_command("brpoplpush src dest7 2") + assert res == "value" async def lpush_command(): - await async_client.execute_command("lpush key value") + await async_client.execute_command("lpush src value") - blocking = asyncio.create_task(blocking_command()) + blpop = asyncio.create_task(blpop_command()) + brpoplpush = asyncio.create_task(brpoplpush_command()) await asyncio.sleep(0.1) res = await async_client.execute_command("client pause 1000") @@ -715,7 +721,8 @@ async def test_blocking_command_client_pause(async_client: aioredis.Redis): assert not lpush.done() await lpush - await blocking + await brpoplpush + await blpop async def test_multiple_blocking_commands_client_pause(async_client: aioredis.Redis):