Mirror of https://github.com/dragonflydb/dragonfly.git (synced 2025-05-11 10:25:47 +02:00)
chore: connection fixes (#2192)
* chore: add more states to client connections
* fix: clear pipelined messages before close
* fix: skip same thread on backpressure

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Co-authored-by: Roman Gershman <roman@dragonflydb.io>
parent 79dbd5ff91
commit d21f82a5f9
7 changed files with 146 additions and 46 deletions
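All three fixes revolve around the per-thread dispatch-queue backpressure. As orientation for the hunks below, here is a simplified, hypothetical model of that bookkeeping: the member names (bytes, subscriber_bytes, limit, subscriber_thread_limit) follow the patch, but dfly::EventCount is replaced by a plain mutex and condition variable and the rest is a stand-in rather than the actual implementation.

#include <atomic>
#include <condition_variable>
#include <cstddef>
#include <mutex>

// Simplified stand-in for the per-thread backpressure bookkeeping touched by this commit.
struct QueueBackpressureSketch {
  std::atomic_size_t bytes{0};                  // queued pipeline bytes on this thread
  std::atomic_size_t subscriber_bytes{0};       // queued pubsub bytes on this thread
  size_t limit = 1ULL << 27;                    // would be read from --pipeline_queue_limit
  size_t subscriber_thread_limit = 1ULL << 27;  // would be read from --subscriber_thread_limit

  std::mutex mu;
  std::condition_variable cv;

  // Producer side: block until both counters are below their limits.
  void EnsureBelowLimit() {
    std::unique_lock lk(mu);
    cv.wait(lk, [this] {
      return bytes.load(std::memory_order_relaxed) <= limit &&
             subscriber_bytes.load(std::memory_order_relaxed) <= subscriber_thread_limit;
    });
  }

  // Consumer side: called after a queued message is dispatched or recycled.
  void OnConsumed(size_t used_mem, bool is_pub) {
    bytes.fetch_sub(used_mem, std::memory_order_relaxed);
    if (is_pub)
      subscriber_bytes.fetch_sub(used_mem, std::memory_order_relaxed);
    cv.notify_all();
  }
};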
@@ -43,8 +43,8 @@ ABSL_FLAG(string, admin_bind, "",
ABSL_FLAG(uint64_t, request_cache_limit, 1ULL << 26, // 64MB
          "Amount of memory to use for request cache in bytes - per IO thread.");

ABSL_FLAG(uint64_t, pipeline_queue_limit, 1ULL << 27, // 128MB
          "Amount of memory to use for storing pipelined commands in bytes - per IO thread");
ABSL_FLAG(uint64_t, subscriber_thread_limit, 1ULL << 27, // 128MB
          "Amount of memory to use for storing pub commands in bytes - per IO thread");

ABSL_FLAG(bool, no_tls_on_admin_port, false, "Allow non-tls connections on admin port");

@@ -108,7 +108,8 @@ thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_poo
thread_local Connection::QueueBackpressure Connection::tl_queue_backpressure_;

void Connection::QueueBackpressure::EnsureBelowLimit() {
  ec.await([this] { return bytes.load(memory_order_relaxed) <= limit; });
  ec.await(
      [this] { return subscriber_bytes.load(memory_order_relaxed) <= subscriber_thread_limit; });
}

struct Connection::Shutdown {
@@ -203,8 +204,10 @@ size_t Connection::MessageHandle::UsedMemory() const {
  size_t operator()(const MonitorMessage& msg) {
    return msg.capacity();
  }
  size_t operator()(const AclUpdateMessage& msg) {
    return 0;
  size_t operator()(const AclUpdateMessagePtr& msg) {
    return sizeof(AclUpdateMessage) + msg->username.capacity() * sizeof(string) +
           msg->commands.capacity() * sizeof(vector<int>) +
           msg->categories.capacity() * sizeof(uint32_t);
  }
  size_t operator()(const MigrationRequestMessage& msg) {
    return 0;
@@ -218,12 +221,16 @@ size_t Connection::MessageHandle::UsedMemory() const {
}

bool Connection::MessageHandle::IsIntrusive() const {
  return holds_alternative<AclUpdateMessage>(handle) ||
  return holds_alternative<AclUpdateMessagePtr>(handle) ||
         holds_alternative<CheckpointMessage>(handle);
}

bool Connection::MessageHandle::IsPipelineMsg() const {
  return get_if<PipelineMessagePtr>(&this->handle) != nullptr;
  return holds_alternative<PipelineMessagePtr>(handle);
}

bool Connection::MessageHandle::IsPubMsg() const {
  return holds_alternative<PubMessagePtr>(handle);
}

void Connection::DispatchOperations::operator()(const MonitorMessage& msg) {
@@ -300,8 +307,8 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
  id_ = next_id.fetch_add(1, memory_order_relaxed);

  queue_backpressure_ = &tl_queue_backpressure_;
  if (queue_backpressure_->limit == 0) {
    queue_backpressure_->limit = absl::GetFlag(FLAGS_pipeline_queue_limit);
  if (queue_backpressure_->subscriber_thread_limit == 0) {
    queue_backpressure_->subscriber_thread_limit = absl::GetFlag(FLAGS_subscriber_thread_limit);
    queue_backpressure_->pipeline_cache_limit = absl::GetFlag(FLAGS_request_cache_limit);
  }

@@ -455,8 +462,10 @@ std::pair<std::string, std::string> Connection::GetClientInfoBeforeAfterTid() co
  int my_cpu_id = sched_getcpu();
#endif

  static constexpr string_view PHASE_NAMES[] = {"setup", "readsock", "process"};
  static_assert(PHASE_NAMES[PROCESS] == "process");
  static constexpr string_view PHASE_NAMES[] = {"setup", "readsock", "process", "shutting_down",
                                                "preclose"};
  static_assert(NUM_PHASES == ABSL_ARRAYSIZE(PHASE_NAMES));
  static_assert(PHASE_NAMES[SHUTTING_DOWN] == "shutting_down");

  absl::StrAppend(&before, "id=", id_, " addr=", re, " laddr=", le);
  absl::StrAppend(&before, " fd=", socket_->native_handle(), " name=", name_);
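The pair of static_asserts in this hunk keeps the phase enum and its display names from drifting apart. A minimal, self-contained version of the same pattern (generic standalone file, not the connection code itself) would be:

#include <string_view>

enum Phase { SETUP, READ_SOCKET, PROCESS, SHUTTING_DOWN, PRECLOSE, NUM_PHASES };

constexpr std::string_view kPhaseNames[] = {"setup", "readsock", "process", "shutting_down",
                                            "preclose"};

// Both checks fail at compile time if a phase is added without a name,
// or a name is changed without updating the enum.
static_assert(NUM_PHASES == sizeof(kPhaseNames) / sizeof(kPhaseNames[0]));
static_assert(kPhaseNames[SHUTTING_DOWN] == "shutting_down");

int main() {
}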
@@ -591,10 +600,15 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
  // After the client disconnected.
  cc_->conn_closing = true;  // Signal dispatch to close.
  evc_.notify();
  phase_ = SHUTTING_DOWN;

  VLOG(1) << "Before dispatch_fb.join()";
  dispatch_fb_.JoinIfNeeded();
  VLOG(1) << "After dispatch_fb.join()";

  phase_ = PRECLOSE;

  ClearPipelinedMessages();
  DCHECK(dispatch_q_.empty());

  service_->OnClose(cc_.get());
@@ -820,6 +834,8 @@ void Connection::HandleMigrateRequest() {
    queue_backpressure_ = &tl_queue_backpressure_;
  }

  DCHECK(dispatch_q_.empty());

  // In case we Yield()ed in Migrate() above, dispatch_fb_ might have been started.
  LaunchDispatchFiberIfNeeded();
}
@@ -975,6 +991,22 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {
    RecycleMessage(MessageHandle{std::move(msg)});
  }

void Connection::ClearPipelinedMessages() {
  DispatchOperations dispatch_op{cc_->reply_builder(), this};

  // Recycle messages even from a disconnecting client to properly keep track of memory stats,
  // as well as to avoid pubsub backpressure leakage.
  for (auto& msg : dispatch_q_) {
    FiberAtomicGuard guard;  // don't suspend when concluding to avoid getting new messages
    if (msg.IsIntrusive())
      visit(dispatch_op, msg.handle);  // to not miss checkpoints
    RecycleMessage(std::move(msg));
  }

  dispatch_q_.clear();
  queue_backpressure_->ec.notifyAll();
}

// DispatchFiber handles commands coming from the InputLoop.
// Thus, InputLoop can quickly read data from the input buffer, parse it and push
// into the dispatch queue and DispatchFiber will run those commands asynchronously with
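The reason ClearPipelinedMessages still visits intrusive messages is that a checkpoint carries a blocking counter some other fiber is waiting on; dropping the message silently would leave that waiter stuck. Below is a hedged, standalone reduction of that idea using std::latch as a stand-in for util::fb2::BlockingCounter (the types and flow are illustrative only):

#include <latch>
#include <thread>
#include <vector>

// Stand-in for a checkpoint: every connection that received it must
// count the latch down exactly once, even if it is already closing.
int main() {
  std::latch checkpoint_done(3);  // three connections got the checkpoint message

  std::vector<std::thread> connections;
  for (int i = 0; i < 3; ++i) {
    connections.emplace_back([&checkpoint_done] {
      // A closing connection must still "visit" the message here;
      // otherwise checkpoint_done.wait() below never returns.
      checkpoint_done.count_down();
    });
  }

  checkpoint_done.wait();  // the coordinator unblocks only if nobody dropped the message
  for (auto& t : connections)
    t.join();
}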
@@ -1025,8 +1057,8 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {

    if (ShouldEndDispatchFiber(msg)) {
      RecycleMessage(std::move(msg));
      DCHECK(dispatch_q_.empty());
      return;
      CHECK(dispatch_q_.empty());
      return;  // don't set conn closing flag
    }

    cc_->async_dispatch = true;
@@ -1041,15 +1073,6 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {

  DCHECK(cc_->conn_closing || builder->GetError());
  cc_->conn_closing = true;

  // Recycle messages even from a disconnecting client to properly keep track of memory stats
  for (auto& msg : dispatch_q_) {
    FiberAtomicGuard guard;  // don't suspend when concluding to avoid getting new messages
    if (msg.IsIntrusive())
      visit(dispatch_op, msg.handle);  // to not miss checkpoints
    RecycleMessage(std::move(msg));
  }
  dispatch_q_.clear();
}

Connection::PipelineMessagePtr Connection::FromArgs(RespVec args, mi_heap_t* heap) {
@@ -1127,6 +1150,7 @@ void Connection::ShutdownThreadLocal() {
bool Connection::IsCurrentlyDispatching() const {
  if (!cc_)
    return false;

  return cc_->async_dispatch || cc_->sync_dispatch;
}

@@ -1140,7 +1164,7 @@ void Connection::SendMonitorMessageAsync(string msg) {
}

void Connection::SendAclUpdateAsync(AclUpdateMessage msg) {
  SendAsync({std::move(msg)});
  SendAsync({make_unique<AclUpdateMessage>(std::move(msg))});
}

void Connection::SendCheckpoint(fb2::BlockingCounter bc) {
@@ -1163,18 +1187,26 @@ void Connection::SendAsync(MessageHandle msg) {
  DCHECK(owner());
  DCHECK_EQ(ProactorBase::me(), socket_->proactor());

  // We still deliver control messages even to closing connections, as messages
  // like checkpoints always expect to be handled.
  // "Closing" connections might still be processing commands, as we don't interrupt them.
  // So we still want to deliver control messages to them (like checkpoints).
  if (cc_->conn_closing && !msg.IsIntrusive())
    return;

  LaunchDispatchFiberIfNeeded();
  // If we launch while closing, it won't be awaited. Control messages will be processed on cleanup.
  if (!cc_->conn_closing)
    LaunchDispatchFiberIfNeeded();

  DCHECK_NE(phase_, PRECLOSE);  // No more messages are processed after this point

  size_t used_mem = msg.UsedMemory();
  queue_backpressure_->bytes.fetch_add(used_mem, memory_order_relaxed);
  stats_->dispatch_queue_entries++;
  stats_->dispatch_queue_bytes += used_mem;

  if (msg.IsPubMsg()) {
    queue_backpressure_->subscriber_bytes.fetch_add(used_mem, memory_order_relaxed);
    stats_->dispatch_queue_subscriber_bytes += used_mem;
  }

  if (msg.IsIntrusive()) {
    auto it = dispatch_q_.begin();
    while (it < dispatch_q_.end() && it->IsIntrusive())
@@ -1192,11 +1224,15 @@ void Connection::SendAsync(MessageHandle msg) {

void Connection::RecycleMessage(MessageHandle msg) {
  size_t used_mem = msg.UsedMemory();
  queue_backpressure_->bytes.fetch_sub(used_mem, memory_order_relaxed);

  stats_->dispatch_queue_bytes -= used_mem;
  stats_->dispatch_queue_entries--;

  if (msg.IsPubMsg()) {
    queue_backpressure_->subscriber_bytes.fetch_sub(used_mem, memory_order_relaxed);
    stats_->dispatch_queue_subscriber_bytes -= used_mem;
  }

  // Retain pipeline message in pool.
  if (auto* pipe = get_if<PipelineMessagePtr>(&msg.handle); pipe) {
    dispatch_q_cmds_count_--;

@@ -120,6 +120,7 @@ class Connection : public util::Connection {
  // Requests are allocated on the mimalloc heap and thus require a custom deleter.
  using PipelineMessagePtr = std::unique_ptr<PipelineMessage, MessageDeleter>;
  using PubMessagePtr = std::unique_ptr<PubMessage, MessageDeleter>;
  using AclUpdateMessagePtr = std::unique_ptr<AclUpdateMessage>;

  // Variant wrapper around different message types
  struct MessageHandle {
@@ -130,13 +131,17 @@ class Connection : public util::Connection {
    bool IsIntrusive() const;

    bool IsPipelineMsg() const;
    bool IsPubMsg() const;

    std::variant<MonitorMessage, PubMessagePtr, PipelineMessagePtr, AclUpdateMessage,
    std::variant<MonitorMessage, PubMessagePtr, PipelineMessagePtr, AclUpdateMessagePtr,
                 MigrationRequestMessage, CheckpointMessage>
        handle;
  };

  enum Phase { SETUP, READ_SOCKET, PROCESS, NUM_PHASES };
  static_assert(sizeof(MessageHandle) <= 40,
                "Big structs should use indirection to avoid wasting deque space!");

  enum Phase { SETUP, READ_SOCKET, PROCESS, SHUTTING_DOWN, PRECLOSE, NUM_PHASES };

 public:
  // Add PubMessage to dispatch queue.
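The new static_assert on sizeof(MessageHandle) is the reason AclUpdateMessage is now boxed behind a unique_ptr: every queued message pays for the largest variant alternative, so a rarely used, large alternative should be held by pointer. A standalone illustration with made-up payload types (not the real message structs) is:

#include <cstdio>
#include <memory>
#include <string>
#include <variant>
#include <vector>

// Made-up payloads: a large, rarely sent message and its boxed form.
struct BigMessage {
  std::string username;
  std::vector<std::vector<std::string>> commands;
  std::vector<unsigned> categories;
};
using BigMessagePtr = std::unique_ptr<BigMessage>;

using FatHandle = std::variant<std::string, BigMessage>;      // every queue slot pays for BigMessage
using SlimHandle = std::variant<std::string, BigMessagePtr>;  // indirection keeps slots small

int main() {
  std::printf("fat handle: %zu bytes, slim handle: %zu bytes\n", sizeof(FatHandle),
              sizeof(SlimHandle));
  // On a typical 64-bit build the boxed variant is far smaller, which is what the
  // `sizeof(MessageHandle) <= 40` assert in the header guards against regressing.
}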
@@ -153,8 +158,8 @@ class Connection : public util::Connection {
  // decrement it once finished.
  void SendCheckpoint(util::fb2::BlockingCounter bc);

  // Must be called before SendAsync to ensure the connection dispatch queue is not overfilled.
  // Blocks until free space is available.
  // Must be called before sending pubsub messages to ensure the thread's pipeline queue limit is not
  // reached. Blocks until free space is available. Controlled with `pipeline_queue_limit` flag.
  void EnsureAsyncMemoryBudget();

  // Register hook that is executed on connection shutdown.
@@ -226,16 +231,17 @@ class Connection : public util::Connection {
  struct Shutdown;

  // Keeps track of total per-thread sizes of dispatch queues to
  // limit memory taken up by pipelined / pubsub commands and slow down clients
  // limit memory taken up by messages from PUBLISH commands and slow down clients
  // producing them too quickly via EnsureAsyncMemoryBudget.
  struct QueueBackpressure {
    // Block until memory usage is below limit, can be called from any thread
    void EnsureBelowLimit();

    dfly::EventCount ec;
    std::atomic_size_t bytes = 0;
    size_t limit = 0;
    size_t pipeline_cache_limit = 0;
    std::atomic_size_t subscriber_bytes = 0;

    size_t subscriber_thread_limit = 0;  // cached flag subscriber_thread_limit
    size_t pipeline_cache_limit = 0;     // cached flag pipeline_cache_limit
  };

 private:
@@ -285,6 +291,9 @@ class Connection : public util::Connection {
  // Squashes pipelined commands from the dispatch queue to spread load over all threads
  void SquashPipeline(facade::SinkReplyBuilder*);

  // Clear pipelined messages, dispatching only intrusive ones.
  void ClearPipelinedMessages();

 private:
  std::pair<std::string, std::string> GetClientInfoBeforeAfterTid() const;
  std::deque<MessageHandle> dispatch_q_;  // dispatch queue

@@ -27,11 +27,12 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats);

ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
  // To break this code deliberately if we add/remove a field to this struct.
  static_assert(kSizeConnStats == 136u);
  static_assert(kSizeConnStats == 144u);

  ADD(read_buf_capacity);
  ADD(dispatch_queue_entries);
  ADD(dispatch_queue_bytes);
  ADD(dispatch_queue_subscriber_bytes);
  ADD(pipeline_cmd_cache_bytes);
  ADD(io_read_cnt);
  ADD(io_read_bytes);

@@ -39,9 +39,10 @@ struct CmdArgListFormatter {
struct ConnectionStats {
  absl::flat_hash_map<std::string, uint64_t> err_count_map;

  size_t read_buf_capacity = 0;       // total capacity of input buffers
  size_t dispatch_queue_entries = 0;  // total number of dispatch queue entries
  size_t dispatch_queue_bytes = 0;    // total size of all dispatch queue entries
  size_t read_buf_capacity = 0;                // total capacity of input buffers
  size_t dispatch_queue_entries = 0;           // total number of dispatch queue entries
  size_t dispatch_queue_bytes = 0;             // total size of all dispatch queue entries
  size_t dispatch_queue_subscriber_bytes = 0;  // total size of all publish messages

  size_t pipeline_cmd_cache_bytes = 0;

@@ -2076,14 +2076,21 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
  int num_published = subscribers.size();

  if (!subscribers.empty()) {
    // Make sure neither of the subscribers' buffers is filled up.
    // Make sure neither of the threads' limits is reached.
    // This check actually doesn't reserve any memory ahead and doesn't prevent the buffer
    // from eventually filling up, especially if multiple clients are unblocked simultaneously
    // from eventually filling up, especially if multiple clients are unblocked simultaneously,
    // but is generally good enough to limit too fast producers.
    // Most importantly, this approach allows not blocking and not awaiting in the dispatch below,
    // thus not adding any overhead to backpressure checks.
    for (auto& sub : subscribers)
    optional<uint32_t> last_thread;
    for (auto& sub : subscribers) {
      DCHECK_LE(last_thread.value_or(0), sub.thread_id);
      if (last_thread && *last_thread == sub.thread_id)  // skip same thread
        continue;

      sub.conn_cntx->conn()->EnsureAsyncMemoryBudget();
      last_thread = sub.thread_id;
    }

    auto subscribers_ptr = make_shared<decltype(subscribers)>(std::move(subscribers));
    auto buf = shared_ptr<char[]>{new char[channel.size() + msg.size()]};
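Since PUBLISH collects subscribers sorted by thread id, the backpressure check only has to run once per thread rather than once per subscriber; that is what the last_thread bookkeeping above does. A self-contained sketch of the same deduplication with a placeholder subscriber type (not the server's real types):

#include <cstdint>
#include <cstdio>
#include <optional>
#include <vector>

// Placeholder subscriber entry: only the owning thread id matters for this sketch.
struct Subscriber {
  uint32_t thread_id;
};

// Run the potentially blocking per-thread budget check once per thread id.
// Relies on `subs` being sorted by thread_id, as the DCHECK in the diff asserts.
void EnsureBudgetOncePerThread(const std::vector<Subscriber>& subs) {
  std::optional<uint32_t> last_thread;
  for (const auto& sub : subs) {
    if (last_thread && *last_thread == sub.thread_id)
      continue;  // same thread as the previous subscriber: already checked
    std::printf("ensure budget on thread %u\n", sub.thread_id);
    last_thread = sub.thread_id;
  }
}

int main() {
  EnsureBudgetOncePerThread({{0}, {0}, {1}, {2}, {2}});  // checks threads 0, 1, 2 once each
}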
@@ -2339,12 +2346,13 @@ void Service::OnClose(facade::ConnectionContext* cntx) {

    if (conn_state.subscribe_info) {
      DCHECK(!conn_state.subscribe_info->patterns.empty());

      auto token = conn_state.subscribe_info->borrow_token;
      server_cntx->PUnsubscribeAll(false);
      // Check that all borrowers finished processing
      token.Wait();
      DCHECK(!conn_state.subscribe_info);
      token.Wait();  // Same as above
    }

    DCHECK(!conn_state.subscribe_info);
  }

  DeactivateMonitoring(server_cntx);

@@ -1635,6 +1635,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
    append("small_string_bytes", m.small_string_bytes);
    append("pipeline_cache_bytes", m.conn_stats.pipeline_cmd_cache_bytes);
    append("dispatch_queue_bytes", m.conn_stats.dispatch_queue_bytes);
    append("dispatch_queue_subscriber_bytes", m.conn_stats.dispatch_queue_subscriber_bytes);
    append("dispatch_queue_peak_bytes", m.peak_stats.conn_dispatch_queue_bytes);
    append("client_read_buffer_peak_bytes", m.peak_stats.conn_read_buf_capacity);

@@ -287,6 +287,50 @@ async def test_multi_pubsub(async_client):
        assert state, message


"""
Test that pubsub clients who are stuck on backpressure from a slow client (the one in the test doesn't read messages at all)
will eventually unblock when it disconnects.
"""


@pytest.mark.asyncio
@pytest.mark.slow
@dfly_args({"proactor_threads": "1", "pipeline_queue_limit": "100"})
async def test_publish_stuck(df_server: DflyInstance, async_client: aioredis.Redis):
    reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port, limit=10)
    writer.write(b"SUBSCRIBE channel\r\n")
    await writer.drain()

    async def pub_task():
        payload = "msg" * 1000
        p = async_client.pipeline()
        for _ in range(1000):
            p.publish("channel", payload)
        await p.execute()

    publishers = [asyncio.create_task(pub_task()) for _ in range(20)]

    await asyncio.sleep(5)

    # Check we reached the limit
    pub_bytes = int((await async_client.info())["dispatch_queue_pub_bytes"])
    assert pub_bytes >= 100

    await asyncio.sleep(0.1)

    # Make sure processing is stalled
    new_pub_bytes = int((await async_client.info())["dispatch_queue_pub_bytes"])
    assert new_pub_bytes == pub_bytes

    writer.write(b"QUIT\r\n")
    await writer.drain()
    writer.close()

    # Make sure all publishers unblock eventually
    for pub in asyncio.as_completed(publishers):
        await pub


@pytest.mark.asyncio
async def test_subscribers_with_active_publisher(df_server: DflyInstance, max_connections=100):
    # TODO: I am not sure how to customize the max connections for the pool.