bug(server): set connection flags block/pause flag on all blocking commands (#2816)

* bug((server)): set connection blocking and push flags on all blocking commands

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2024-04-09 09:49:33 +03:00 committed by GitHub
parent 23106d4be5
commit b1e688b33f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 54 additions and 37 deletions

View file

@ -97,10 +97,9 @@ class ConnectionContext {
bool async_dispatch : 1; // whether this connection is amid an async dispatch bool async_dispatch : 1; // whether this connection is amid an async dispatch
bool sync_dispatch : 1; // whether this connection is amid a sync dispatch bool sync_dispatch : 1; // whether this connection is amid a sync dispatch
bool journal_emulated : 1; // whether it is used to dispatch journal commands bool journal_emulated : 1; // whether it is used to dispatch journal commands
bool paused : 1; // whether this connection is paused due to CLIENT PAUSE bool paused = false; // whether this connection is paused due to CLIENT PAUSE
// whether it's blocked on blocking commands like BLPOP, needs to be addressable // whether it's blocked on blocking commands like BLPOP, needs to be addressable
bool blocked; bool blocked = false;
// How many async subscription sources are active: monitor and/or pubsub - at most 2. // How many async subscription sources are active: monitor and/or pubsub - at most 2.
uint8_t subscriptions; uint8_t subscriptions;

View file

@ -93,11 +93,14 @@ TEST_F(BlockingControllerTest, Basic) {
TEST_F(BlockingControllerTest, Timeout) { TEST_F(BlockingControllerTest, Timeout) {
time_point tp = steady_clock::now() + chrono::milliseconds(10); time_point tp = steady_clock::now() + chrono::milliseconds(10);
bool blocked;
bool paused;
trans_->Schedule(); trans_->Schedule();
auto cb = [&](Transaction* t, EngineShard* shard) { return trans_->GetShardArgs(0); }; auto cb = [&](Transaction* t, EngineShard* shard) { return trans_->GetShardArgs(0); };
facade::OpStatus status = trans_->WaitOnWatch(tp, cb, [](auto...) { return true; }); facade::OpStatus status = trans_->WaitOnWatch(
tp, cb, [](auto...) { return true; }, &blocked, &paused);
EXPECT_EQ(status, facade::OpStatus::TIMED_OUT); EXPECT_EQ(status, facade::OpStatus::TIMED_OUT);
unsigned num_watched = shard_set->Await( unsigned num_watched = shard_set->Await(

View file

@ -258,7 +258,7 @@ string_view LpGetView(uint8_t* lp_it, uint8_t int_buf[]) {
OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type, OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type,
BlockingResultCb func, unsigned limit_ms, BlockingResultCb func, unsigned limit_ms,
bool* block_flag) { bool* block_flag, bool* pause_flag) {
string result_key; string result_key;
// Fast path. If we have only a single shard, we can run opportunistically with a single hop. // Fast path. If we have only a single shard, we can run opportunistically with a single hop.
@ -317,9 +317,7 @@ OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty
return owner->db_slice().FindReadOnly(context, key, req_obj_type).ok(); return owner->db_slice().FindReadOnly(context, key, req_obj_type).ok();
}; };
*block_flag = true; auto status = trans->WaitOnWatch(limit_tp, std::move(wcb), key_checker, block_flag, pause_flag);
auto status = trans->WaitOnWatch(limit_tp, std::move(wcb), key_checker);
*block_flag = false;
if (status != OpStatus::OK) if (status != OpStatus::OK)
return status; return status;

View file

@ -87,10 +87,9 @@ using BlockingResultCb =
// Block until any key of the transaction becomes non-empty and executes the callback. // Block until any key of the transaction becomes non-empty and executes the callback.
// If multiple keys are non-empty when this function is called, the callback is executed // If multiple keys are non-empty when this function is called, the callback is executed
// immediately with the first key listed in the tx arguments. // immediately with the first key listed in the tx arguments.
// The block flag is set to true while the transaction is blocking.
OpResult<std::string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type, OpResult<std::string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type,
BlockingResultCb cb, unsigned limit_ms, BlockingResultCb cb, unsigned limit_ms,
bool* block_flag); bool* block_flag, bool* pause_flag);
}; // namespace container_utils }; // namespace container_utils

View file

@ -164,11 +164,11 @@ class BPopPusher {
// Returns WRONG_TYPE, OK. // Returns WRONG_TYPE, OK.
// If OK is returned then use result() to fetch the value. // If OK is returned then use result() to fetch the value.
OpResult<string> Run(Transaction* t, unsigned limit_ms); OpResult<string> Run(ConnectionContext* cntx, unsigned limit_ms);
private: private:
OpResult<string> RunSingle(Transaction* t, time_point tp); OpResult<string> RunSingle(ConnectionContext* cntx, time_point tp);
OpResult<string> RunPair(Transaction* t, time_point tp); OpResult<string> RunPair(ConnectionContext* cntx, time_point tp);
string_view pop_key_, push_key_; string_view pop_key_, push_key_;
ListDir popdir_, pushdir_; ListDir popdir_, pushdir_;
@ -765,7 +765,7 @@ void BRPopLPush(CmdArgList args, ConnectionContext* cntx) {
} }
BPopPusher bpop_pusher(src, dest, ListDir::RIGHT, ListDir::LEFT); BPopPusher bpop_pusher(src, dest, ListDir::RIGHT, ListDir::LEFT);
OpResult<string> op_res = bpop_pusher.Run(cntx->transaction, unsigned(timeout * 1000)); OpResult<string> op_res = bpop_pusher.Run(cntx, unsigned(timeout * 1000));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
if (op_res) { if (op_res) {
@ -808,7 +808,7 @@ void BLMove(CmdArgList args, ConnectionContext* cntx) {
} }
BPopPusher bpop_pusher(src, dest, *src_dir, *dest_dir); BPopPusher bpop_pusher(src, dest, *src_dir, *dest_dir);
OpResult<string> op_res = bpop_pusher.Run(cntx->transaction, unsigned(timeout * 1000)); OpResult<string> op_res = bpop_pusher.Run(cntx, unsigned(timeout * 1000));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
if (op_res) { if (op_res) {
@ -831,20 +831,21 @@ BPopPusher::BPopPusher(string_view pop_key, string_view push_key, ListDir popdir
: pop_key_(pop_key), push_key_(push_key), popdir_(popdir), pushdir_(pushdir) { : pop_key_(pop_key), push_key_(push_key), popdir_(popdir), pushdir_(pushdir) {
} }
OpResult<string> BPopPusher::Run(Transaction* t, unsigned limit_ms) { OpResult<string> BPopPusher::Run(ConnectionContext* cntx, unsigned limit_ms) {
time_point tp = time_point tp =
limit_ms ? chrono::steady_clock::now() + chrono::milliseconds(limit_ms) : time_point::max(); limit_ms ? chrono::steady_clock::now() + chrono::milliseconds(limit_ms) : time_point::max();
t->Schedule(); cntx->transaction->Schedule();
if (t->GetUniqueShardCnt() == 1) { if (cntx->transaction->GetUniqueShardCnt() == 1) {
return RunSingle(t, tp); return RunSingle(cntx, tp);
} }
return RunPair(t, tp); return RunPair(cntx, tp);
} }
OpResult<string> BPopPusher::RunSingle(Transaction* t, time_point tp) { OpResult<string> BPopPusher::RunSingle(ConnectionContext* cntx, time_point tp) {
Transaction* t = cntx->transaction;
OpResult<string> op_res; OpResult<string> op_res;
bool is_multi = t->IsMulti(); bool is_multi = t->IsMulti();
auto cb_move = [&](Transaction* t, EngineShard* shard) { auto cb_move = [&](Transaction* t, EngineShard* shard) {
@ -873,14 +874,16 @@ OpResult<string> BPopPusher::RunSingle(Transaction* t, time_point tp) {
return owner->db_slice().FindReadOnly(context, key, OBJ_LIST).ok(); return owner->db_slice().FindReadOnly(context, key, OBJ_LIST).ok();
}; };
// Block // Block
if (auto status = t->WaitOnWatch(tp, std::move(wcb), key_checker); status != OpStatus::OK) auto status = t->WaitOnWatch(tp, std::move(wcb), key_checker, &(cntx->blocked), &(cntx->paused));
if (status != OpStatus::OK)
return status; return status;
t->Execute(cb_move, true); t->Execute(cb_move, true);
return op_res; return op_res;
} }
OpResult<string> BPopPusher::RunPair(Transaction* t, time_point tp) { OpResult<string> BPopPusher::RunPair(ConnectionContext* cntx, time_point tp) {
Transaction* t = cntx->transaction;
bool is_multi = t->IsMulti(); bool is_multi = t->IsMulti();
OpResult<string> op_res = MoveTwoShards(t, pop_key_, push_key_, popdir_, pushdir_, false); OpResult<string> op_res = MoveTwoShards(t, pop_key_, push_key_, popdir_, pushdir_, false);
@ -902,7 +905,8 @@ OpResult<string> BPopPusher::RunPair(Transaction* t, time_point tp) {
return owner->db_slice().FindReadOnly(context, key, OBJ_LIST).ok(); return owner->db_slice().FindReadOnly(context, key, OBJ_LIST).ok();
}; };
if (auto status = t->WaitOnWatch(tp, std::move(wcb), key_checker); status != OpStatus::OK) if (auto status = t->WaitOnWatch(tp, std::move(wcb), key_checker, &cntx->blocked, &cntx->paused);
status != OpStatus::OK)
return status; return status;
return MoveTwoShards(t, pop_key_, push_key_, popdir_, pushdir_, true); return MoveTwoShards(t, pop_key_, push_key_, popdir_, pushdir_, true);
@ -1194,7 +1198,8 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn
}; };
OpResult<string> popped_key = container_utils::RunCbOnFirstNonEmptyBlocking( OpResult<string> popped_key = container_utils::RunCbOnFirstNonEmptyBlocking(
transaction, OBJ_LIST, std::move(cb), unsigned(timeout * 1000), &cntx->blocked); transaction, OBJ_LIST, std::move(cb), unsigned(timeout * 1000), &cntx->blocked,
&cntx->paused);
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
if (popped_key) { if (popped_key) {

View file

@ -2833,7 +2833,8 @@ void XReadBlock(ReadOpts opts, ConnectionContext* cntx) {
return streamCompareID(&last_id, &sitem.group->last_id) > 0; return streamCompareID(&last_id, &sitem.group->last_id) > 0;
}; };
if (auto status = cntx->transaction->WaitOnWatch(tp, std::move(wcb), key_checker); if (auto status = cntx->transaction->WaitOnWatch(tp, std::move(wcb), key_checker, &cntx->blocked,
&cntx->paused);
status != OpStatus::OK) status != OpStatus::OK)
return rb->SendNullArray(); return rb->SendNullArray();

View file

@ -1190,7 +1190,7 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
} }
OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider, OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider,
KeyReadyChecker krc) { KeyReadyChecker krc, bool* block_flag, bool* pause_flag) {
DCHECK(!blocking_barrier_.IsClaimed()); // Blocking barrier can't be re-used DCHECK(!blocking_barrier_.IsClaimed()); // Blocking barrier can't be re-used
// Register keys on active shards blocking controllers and mark shard state as suspended. // Register keys on active shards blocking controllers and mark shard state as suspended.
@ -1206,16 +1206,19 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p
auto* stats = ServerState::tl_connection_stats(); auto* stats = ServerState::tl_connection_stats();
++stats->num_blocked_clients; ++stats->num_blocked_clients;
DVLOG(1) << "WaitOnWatch wait for " << tp << " " << DebugId(); DVLOG(1) << "WaitOnWatch wait for " << tp << " " << DebugId();
// TBD set connection blocking state
// Wait for the blocking barrier to be closed. // Wait for the blocking barrier to be closed.
// Note: It might return immediately if another thread already notified us. // Note: It might return immediately if another thread already notified us.
*block_flag = true;
cv_status status = blocking_barrier_.Wait(tp); cv_status status = blocking_barrier_.Wait(tp);
*block_flag = false;
DVLOG(1) << "WaitOnWatch done " << int(status) << " " << DebugId(); DVLOG(1) << "WaitOnWatch done " << int(status) << " " << DebugId();
--stats->num_blocked_clients; --stats->num_blocked_clients;
// TBD set connection pause state *pause_flag = true;
ServerState::tlocal()->AwaitPauseState(true); // blocking are always write commands ServerState::tlocal()->AwaitPauseState(true); // blocking are always write commands
*pause_flag = false;
OpStatus result = OpStatus::OK; OpStatus result = OpStatus::OK;
if (status == cv_status::timeout) { if (status == cv_status::timeout) {

View file

@ -214,7 +214,8 @@ class Transaction {
// or b) tp is reached. If tp is time_point::max() then waits indefinitely. // or b) tp is reached. If tp is time_point::max() then waits indefinitely.
// Expects that the transaction had been scheduled before, and uses Execute(.., true) to register. // Expects that the transaction had been scheduled before, and uses Execute(.., true) to register.
// Returns false if timeout occurred, true if was notified by one of the keys. // Returns false if timeout occurred, true if was notified by one of the keys.
facade::OpStatus WaitOnWatch(const time_point& tp, WaitKeysProvider cb, KeyReadyChecker krc); facade::OpStatus WaitOnWatch(const time_point& tp, WaitKeysProvider cb, KeyReadyChecker krc,
bool* block_flag, bool* pause_flag);
// Returns true if transaction is awaked, false if it's timed-out and can be removed from the // Returns true if transaction is awaked, false if it's timed-out and can be removed from the
// blocking queue. // blocking queue.

View file

@ -1318,7 +1318,8 @@ void BZPopMinMax(CmdArgList args, ConnectionContext* cntx, bool is_max) {
}; };
OpResult<string> popped_key = container_utils::RunCbOnFirstNonEmptyBlocking( OpResult<string> popped_key = container_utils::RunCbOnFirstNonEmptyBlocking(
transaction, OBJ_ZSET, std::move(cb), unsigned(timeout * 1000), &cntx->blocked); transaction, OBJ_ZSET, std::move(cb), unsigned(timeout * 1000), &cntx->blocked,
&cntx->paused);
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
if (popped_key) { if (popped_key) {

View file

@ -691,6 +691,7 @@ async def test_nested_client_pause(async_client: aioredis.Redis):
await p3 await p3
@dfly_args({"proactor_threads": "4"})
async def test_blocking_command_client_pause(async_client: aioredis.Redis): async def test_blocking_command_client_pause(async_client: aioredis.Redis):
""" """
1. Check client pause success when blocking transaction is running 1. Check client pause success when blocking transaction is running
@ -698,14 +699,19 @@ async def test_blocking_command_client_pause(async_client: aioredis.Redis):
3. once paused is finished lpush will run and blpop will pop the pushed value 3. once paused is finished lpush will run and blpop will pop the pushed value
""" """
async def blocking_command(): async def blpop_command():
res = await async_client.execute_command("blpop key 2") res = await async_client.execute_command("blpop dest7 10")
assert res == ["key", "value"] assert res == ["dest7", "value"]
async def brpoplpush_command():
res = await async_client.execute_command("brpoplpush src dest7 2")
assert res == "value"
async def lpush_command(): async def lpush_command():
await async_client.execute_command("lpush key value") await async_client.execute_command("lpush src value")
blocking = asyncio.create_task(blocking_command()) blpop = asyncio.create_task(blpop_command())
brpoplpush = asyncio.create_task(brpoplpush_command())
await asyncio.sleep(0.1) await asyncio.sleep(0.1)
res = await async_client.execute_command("client pause 1000") res = await async_client.execute_command("client pause 1000")
@ -715,7 +721,8 @@ async def test_blocking_command_client_pause(async_client: aioredis.Redis):
assert not lpush.done() assert not lpush.done()
await lpush await lpush
await blocking await brpoplpush
await blpop
async def test_multiple_blocking_commands_client_pause(async_client: aioredis.Redis): async def test_multiple_blocking_commands_client_pause(async_client: aioredis.Redis):