mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
fix(server): client pause work while blocking commands run (#2584)
fix #2576 fix #2661 Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
8ef92629c5
commit
7e4527098b
7 changed files with 83 additions and 9 deletions
|
@ -392,6 +392,7 @@ DispatchTracker::DispatchTracker(absl::Span<facade::Listener* const> listeners,
|
|||
issuer_{issuer},
|
||||
ignore_paused_{ignore_paused},
|
||||
ignore_blocked_{ignore_blocked} {
|
||||
bc_ = make_unique<util::fb2::BlockingCounter>(0);
|
||||
}
|
||||
|
||||
void DispatchTracker::TrackOnThread() {
|
||||
|
@ -400,7 +401,15 @@ void DispatchTracker::TrackOnThread() {
|
|||
}
|
||||
|
||||
bool DispatchTracker::Wait(absl::Duration duration) {
|
||||
return bc_.WaitFor(absl::ToChronoMilliseconds(duration));
|
||||
bool res = bc_->WaitFor(absl::ToChronoMilliseconds(duration));
|
||||
if (!res && ignore_blocked_) {
|
||||
// We track all connections again because a connection might became blocked between the time
|
||||
// we call tracking the last time.
|
||||
bc_.reset(new util::fb2::BlockingCounter(0));
|
||||
TrackAll();
|
||||
res = bc_->WaitFor(absl::ToChronoMilliseconds(duration));
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
void DispatchTracker::TrackAll() {
|
||||
|
@ -410,7 +419,7 @@ void DispatchTracker::TrackAll() {
|
|||
|
||||
void DispatchTracker::Handle(unsigned thread_index, util::Connection* conn) {
|
||||
if (auto* fconn = static_cast<facade::Connection*>(conn); fconn != issuer_)
|
||||
fconn->SendCheckpoint(bc_, ignore_paused_, ignore_blocked_);
|
||||
fconn->SendCheckpoint(*bc_, ignore_paused_, ignore_blocked_);
|
||||
}
|
||||
|
||||
} // namespace facade
|
||||
|
|
|
@ -103,7 +103,7 @@ class DispatchTracker {
|
|||
|
||||
std::vector<facade::Listener*> listeners_;
|
||||
facade::Connection* issuer_;
|
||||
util::fb2::BlockingCounter bc_{0};
|
||||
std::unique_ptr<util::fb2::BlockingCounter> bc_;
|
||||
bool ignore_paused_;
|
||||
bool ignore_blocked_;
|
||||
};
|
||||
|
|
|
@ -107,6 +107,10 @@ class CommandId : public facade::CommandId {
|
|||
return opt_mask_ & CO::WRITE;
|
||||
}
|
||||
|
||||
bool IsBlocking() const {
|
||||
return opt_mask_ & CO::BLOCKING;
|
||||
}
|
||||
|
||||
static const char* OptName(CO::CommandOpt fl);
|
||||
|
||||
CommandId&& SetHandler(Handler f) && {
|
||||
|
|
|
@ -1327,7 +1327,9 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
|
|||
// paired with shardlocal eval
|
||||
const bool is_eval = CO::IsEvalKind(ArgS(args, 0));
|
||||
|
||||
if (!is_multi && !is_eval && cid != nullptr) {
|
||||
const bool is_blocking = cid != nullptr && cid->IsBlocking();
|
||||
|
||||
if (!is_multi && !is_eval && !is_blocking && cid != nullptr) {
|
||||
stored_cmds.reserve(args_list.size());
|
||||
stored_cmds.emplace_back(cid, tail_args);
|
||||
continue;
|
||||
|
|
|
@ -559,9 +559,13 @@ string_view GetRedisMode() {
|
|||
std::optional<fb2::Fiber> Pause(absl::Span<facade::Listener* const> listeners,
|
||||
facade::Connection* conn, ClientPause pause_state,
|
||||
std::function<bool()> is_pause_in_progress) {
|
||||
// Set global pause state and track commands that are running when the pause state is flipped.
|
||||
// Exlude already paused commands from the busy count.
|
||||
DispatchTracker tracker{listeners, conn, true /* ignore paused commands */};
|
||||
// Track connections and set pause state to be able to wait untill all running transactions read
|
||||
// the new pause state. Exlude already paused commands from the busy count. Exlude tracking
|
||||
// blocked connections because: a) If the connection is blocked it is puased. b) We read pause
|
||||
// state after waking from blocking so if the trasaction was waken by another running
|
||||
// command that did not pause on the new state yet we will pause after waking up.
|
||||
DispatchTracker tracker{listeners, conn, true /* ignore paused commands */,
|
||||
true /*ignore blocking*/};
|
||||
shard_set->pool()->Await([&tracker, pause_state](util::ProactorBase* pb) {
|
||||
// Commands don't suspend before checking the pause state, so
|
||||
// it's impossible to deadlock on waiting for a command that will be paused.
|
||||
|
@ -569,7 +573,6 @@ std::optional<fb2::Fiber> Pause(absl::Span<facade::Listener* const> listeners,
|
|||
ServerState::tlocal()->SetPauseState(pause_state, true);
|
||||
});
|
||||
|
||||
// TODO handle blocking commands
|
||||
// Wait for all busy commands to finish running before replying to guarantee
|
||||
// that no more (write) operations will occur.
|
||||
const absl::Duration kDispatchTimeout = absl::Seconds(1);
|
||||
|
|
|
@ -1349,7 +1349,7 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p
|
|||
auto* stats = ServerState::tl_connection_stats();
|
||||
++stats->num_blocked_clients;
|
||||
DVLOG(1) << "WaitOnWatch wait for " << tp << " " << DebugId();
|
||||
|
||||
// TBD set connection blocking state
|
||||
// Wait for the blocking barrier to be closed.
|
||||
// Note: It might return immediately if another thread already notified us.
|
||||
cv_status status = blocking_barrier_.Wait(tp);
|
||||
|
@ -1357,6 +1357,9 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p
|
|||
DVLOG(1) << "WaitOnWatch done " << int(status) << " " << DebugId();
|
||||
--stats->num_blocked_clients;
|
||||
|
||||
// TBD set connection pause state
|
||||
ServerState::tlocal()->AwaitPauseState(true); // blocking are always write commands
|
||||
|
||||
OpStatus result = OpStatus::OK;
|
||||
if (status == cv_status::timeout) {
|
||||
result = OpStatus::TIMED_OUT;
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
import random
|
||||
import string
|
||||
import pytest
|
||||
import asyncio
|
||||
import time
|
||||
|
@ -721,3 +722,55 @@ async def test_nested_client_pause(async_client: aioredis.Redis):
|
|||
await asyncio.sleep(0.0)
|
||||
assert p3.done()
|
||||
await p3
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_blocking_command_client_pause(async_client: aioredis.Redis):
|
||||
"""
|
||||
1. Check client pause success when blocking transaction is running
|
||||
2. lpush is paused after running client puase
|
||||
3. once puased is finished lpush will run and blpop will pop the pushed value
|
||||
"""
|
||||
|
||||
async def blocking_command():
|
||||
res = await async_client.execute_command("blpop key 2")
|
||||
assert res == ["key", "value"]
|
||||
|
||||
async def lpush_command():
|
||||
await async_client.execute_command("lpush key value")
|
||||
|
||||
blocking = asyncio.create_task(blocking_command())
|
||||
await asyncio.sleep(0.1)
|
||||
|
||||
res = await async_client.execute_command("client pause 1000")
|
||||
assert res == "OK"
|
||||
|
||||
lpush = asyncio.create_task(lpush_command())
|
||||
assert not lpush.done()
|
||||
|
||||
await lpush
|
||||
await blocking
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_multiple_blocking_commands_client_pause(async_client: aioredis.Redis):
|
||||
"""
|
||||
Check running client pause command simultaneously with running multiple blocking command
|
||||
from multiple connections
|
||||
"""
|
||||
|
||||
async def just_blpop():
|
||||
key = "".join(random.choices(string.ascii_letters, k=3))
|
||||
await async_client.execute_command(f"blpop {key} 2")
|
||||
|
||||
async def client_pause():
|
||||
res = await async_client.execute_command("client pause 1000")
|
||||
assert res == "OK"
|
||||
|
||||
tasks = [just_blpop() for _ in range(20)]
|
||||
tasks.append(client_pause())
|
||||
|
||||
all = asyncio.gather(*tasks)
|
||||
|
||||
assert not all.done()
|
||||
await all
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue