chore: Introduce pipeline back-pressure (#3152)

* chore: Introduce pipeline back-pressure

Also, improve synchronization primitives by replacing them with
thread-local variations.

Before the change, on my local machine with Dragonfly running with 8 threads,
`memtier_benchmark  -c 10 --threads 8  --command="PING"  --key-maximum 100000000  --hide-histogram --distinct-client-seed --pipeline=20 --test-time=10`

reached 10M qps with 0.327ms p99.9.

After the change, the same command showed 13.8M qps with 0.2ms p99.9.
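
The mechanism, stripped of Dragonfly's fiber machinery: the input side stops enqueueing pipelined commands once the per-I/O-thread dispatch queue exceeds a byte budget (the new `pipeline_buffer_limit` flag, 8MB by default), and resumes once the execution side drains back below it. Below is a minimal standalone sketch using plain std::thread/std::condition_variable; the constant and type names are illustrative only, not taken from the codebase:

    #include <condition_variable>
    #include <cstddef>
    #include <deque>
    #include <mutex>
    #include <string>
    #include <thread>

    // Illustrative per-thread budget; the real limit comes from the pipeline_buffer_limit flag.
    constexpr size_t kPipelineBufferLimit = 1 << 20;

    struct DispatchQueue {
      std::mutex mu;
      std::condition_variable cnd;
      std::deque<std::string> msgs;
      size_t queued_bytes = 0;
      bool closing = false;

      // Producer side (input loop): block while the queue is over budget.
      void Push(std::string msg) {
        std::unique_lock lk(mu);
        cnd.wait(lk, [&] { return queued_bytes < kPipelineBufferLimit || closing; });
        if (closing) return;
        queued_bytes += msg.size();
        msgs.push_back(std::move(msg));
        cnd.notify_all();  // wake the consumer
      }

      // Consumer side (execution fiber): drain messages and release back-pressure.
      void Run() {
        std::unique_lock lk(mu);
        while (true) {
          cnd.wait(lk, [&] { return !msgs.empty() || closing; });
          if (msgs.empty() && closing) return;
          queued_bytes -= msgs.front().size();
          msgs.pop_front();  // (command execution elided)
          cnd.notify_all();  // unblock a producer waiting on the budget
        }
      }
    };

    int main() {
      DispatchQueue q;
      std::thread consumer([&] { q.Run(); });
      for (int i = 0; i < 1000; ++i) q.Push(std::string(4096, 'x'));
      { std::lock_guard lk(q.mu); q.closing = true; }
      q.cnd.notify_all();
      consumer.join();
    }

In the actual change the wait lives in DispatchSingle and uses fb2::CondVarAny with fb2::NoOpLock, since producer and consumer are fibers on the same thread and need no mutex, and ExecutionFiber calls pipeline_cnd.notify_all() once the queue is drained or back under the limit.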
---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Roman Gershman 2024-06-10 12:39:41 +03:00 committed by GitHub
parent 8eb9d48c3a
commit 007d4854db
10 changed files with 138 additions and 55 deletions


@ -67,6 +67,7 @@ jobs:
ctest -V -L DFLY
build-macos:
if: false
runs-on: macos-12
timeout-minutes: 45
steps:


@ -30,6 +30,7 @@
#endif
using namespace std;
using facade::operator""_MB;
ABSL_FLAG(bool, tcp_nodelay, true,
"Configures dragonfly connections with socket option TCP_NODELAY");
@ -44,10 +45,13 @@ ABSL_FLAG(string, admin_bind, "",
"If set, the admin consol TCP connection would be bind the given address. "
"This supports both HTTP and RESP protocols");
ABSL_FLAG(uint64_t, request_cache_limit, 1ULL << 26, // 64MB
ABSL_FLAG(uint64_t, request_cache_limit, 64_MB,
"Amount of memory to use for request cache in bytes - per IO thread.");
ABSL_FLAG(uint64_t, subscriber_thread_limit, 1ULL << 27, // 128MB
ABSL_FLAG(uint64_t, pipeline_buffer_limit, 8_MB,
"Amount of memory to use for parsing pipeline requests - per IO thread.");
ABSL_FLAG(uint64_t, publish_buffer_limit, 128_MB,
"Amount of memory to use for storing pub commands in bytes - per IO thread");
ABSL_FLAG(bool, no_tls_on_admin_port, false, "Allow non-tls connections on admin port");
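
The limits above are expressed with the `_MB` user-defined literal pulled in via `using facade::operator""_MB`. As an assumption about what such a literal looks like (the actual definition may differ), something along these lines reproduces the old hard-coded values:

    // Hypothetical sketch of a megabyte literal like facade::operator""_MB;
    // the real definition in the codebase may differ.
    constexpr unsigned long long operator""_MB(unsigned long long v) {
      return v << 20;  // 1 MB == 2^20 bytes
    }

    static_assert(64_MB == (1ULL << 26));   // old request_cache_limit default
    static_assert(128_MB == (1ULL << 27));  // old subscriber_thread_limit default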
@ -254,8 +258,7 @@ thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_poo
thread_local Connection::QueueBackpressure Connection::tl_queue_backpressure_;
void Connection::QueueBackpressure::EnsureBelowLimit() {
ec.await(
[this] { return subscriber_bytes.load(memory_order_relaxed) <= subscriber_thread_limit; });
ec.await([this] { return subscriber_bytes.load(memory_order_relaxed) <= publish_buffer_limit; });
}
struct Connection::Shutdown {
@ -521,12 +524,6 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
last_interaction_ = creation_time_;
id_ = next_id.fetch_add(1, memory_order_relaxed);
queue_backpressure_ = &tl_queue_backpressure_;
if (queue_backpressure_->subscriber_thread_limit == 0) {
queue_backpressure_->subscriber_thread_limit = absl::GetFlag(FLAGS_subscriber_thread_limit);
queue_backpressure_->pipeline_cache_limit = absl::GetFlag(FLAGS_request_cache_limit);
}
migration_enabled_ = absl::GetFlag(FLAGS_migrate_connections);
// Create shared_ptr with empty value and associate it with `this` pointer (aliasing constructor).
@ -596,16 +593,38 @@ void Connection::OnPostMigrateThread() {
}
}
void Connection::HandleRequests() {
void Connection::OnConnectionStart() {
DCHECK(queue_backpressure_ == nullptr);
ThisFiber::SetName("DflyConnection");
// We must initialize tl_queue_backpressure_ here and not in the c'tor because a connection object
// may be created in a different thread from where it runs.
if (tl_queue_backpressure_.publish_buffer_limit == 0) {
tl_queue_backpressure_.publish_buffer_limit = absl::GetFlag(FLAGS_publish_buffer_limit);
tl_queue_backpressure_.pipeline_cache_limit = absl::GetFlag(FLAGS_request_cache_limit);
tl_queue_backpressure_.pipeline_buffer_limit = absl::GetFlag(FLAGS_pipeline_buffer_limit);
if (tl_queue_backpressure_.publish_buffer_limit == 0 ||
tl_queue_backpressure_.pipeline_cache_limit == 0 ||
tl_queue_backpressure_.pipeline_buffer_limit == 0) {
LOG(ERROR) << "Buffer limit settings are 0";
exit(-1);
}
}
queue_backpressure_ = &tl_queue_backpressure_;
stats_ = &tl_facade_stats->conn_stats;
}
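
The comment above is the crux of moving this initialization out of the constructor: thread_local data belongs to the thread that executes the code, and a connection may be constructed on a different thread than the one that later runs it. A tiny sketch of the pitfall, with made-up names:

    #include <cassert>
    #include <cstddef>
    #include <thread>

    struct Limits {
      size_t pipeline_buffer_limit = 0;
    };
    thread_local Limits tl_limits;  // each thread gets its own copy

    int main() {
      // The thread that constructs a connection writes only its own copy.
      tl_limits.pipeline_buffer_limit = 8u << 20;

      size_t seen = 0;
      std::thread io_thread([&] {
        // The thread that runs the connection sees a fresh, zero-initialized copy,
        // so the limits must be initialized there, which is what OnConnectionStart does.
        seen = tl_limits.pipeline_buffer_limit;
      });
      io_thread.join();
      assert(seen == 0);  // not 8MB
    }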
void Connection::HandleRequests() {
VLOG(1) << "[" << id_ << "] HandleRequests";
if (absl::GetFlag(FLAGS_tcp_nodelay) && !socket_->IsUDS()) {
int val = 1;
int res = setsockopt(socket_->native_handle(), IPPROTO_TCP, TCP_NODELAY, &val, sizeof(val));
DCHECK_EQ(res, 0);
}
stats_ = &tl_facade_stats->conn_stats;
auto remote_ep = RemoteEndpointStr();
FiberSocketBase* peer = socket_.get();
@ -882,7 +901,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
// After the client disconnected.
cc_->conn_closing = true; // Signal dispatch to close.
evc_.notify();
cnd_.notify_one();
phase_ = SHUTTING_DOWN;
VLOG(2) << "Before dispatch_fb.join()";
dispatch_fb_.JoinIfNeeded();
@ -934,34 +953,56 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
}
}
void Connection::DispatchCommand(bool has_more, absl::FunctionRef<void()> dispatch_sync,
absl::FunctionRef<MessageHandle()> dispatch_async) {
// Avoid sync dispatch if we can interleave with an ongoing async dispatch
bool can_dispatch_sync = !cc_->async_dispatch;
void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_cb,
absl::FunctionRef<MessageHandle()> cmd_msg_cb) {
DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
bool optimize_for_async = has_more;
// Avoid sync dispatch if we already have pending async messages or
// can potentially receive some (subscriptions > 0)
if (dispatch_q_.size() > 0 || cc_->subscriptions > 0)
can_dispatch_sync = false;
if (optimize_for_async &&
queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes)) {
fb2::NoOpLock noop;
queue_backpressure_->pipeline_cnd.wait(noop, [this] {
return !queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes) ||
(dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
});
if (cc_->conn_closing)
return;
// prefer synchronous dispatching to save memory.
optimize_for_async = false;
}
// Avoid sync dispatch if we can interleave with an ongoing async dispatch.
bool can_dispatch_sync = !cc_->async_dispatch && dispatch_q_.empty() && cc_->subscriptions == 0;
// Dispatch async if we're handling a pipeline or if we can't dispatch sync.
if (has_more || !can_dispatch_sync) {
SendAsync(dispatch_async());
if (optimize_for_async || !can_dispatch_sync) {
SendAsync(cmd_msg_cb());
if (dispatch_q_.size() > 10)
ThisFiber::Yield();
auto epoch = fb2::FiberSwitchEpoch();
if (async_fiber_epoch_ == epoch) {
// If we pushed too many items without context switching - yield
if (++async_streak_len_ >= 10 && !cc_->async_dispatch) {
async_streak_len_ = 0;
ThisFiber::Yield();
}
} else {
async_streak_len_ = 0;
async_fiber_epoch_ = epoch;
}
} else {
ShrinkPipelinePool(); // Gradually release pipeline request pool.
{
cc_->sync_dispatch = true;
dispatch_sync();
invoke_cb();
cc_->sync_dispatch = false;
}
last_interaction_ = time(nullptr);
// We might have blocked the dispatch queue from processing, wake it up.
if (dispatch_q_.size() > 0)
evc_.notify();
cnd_.notify_one();
}
}
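
The epoch/streak bookkeeping above replaces the previous size-based `if (dispatch_q_.size() > 10) Yield()`: the input fiber now yields only after enqueueing many messages without any fiber switch happening in between, so a consumer that already keeps up is never forced into a gratuitous switch. A simplified standalone sketch of the heuristic; CurrentSwitchEpoch and Yield are stand-ins for fb2::FiberSwitchEpoch and ThisFiber::Yield:

    #include <cstdint>
    #include <iostream>

    // Stand-ins for the fiber runtime; in the real code the epoch advances
    // whenever any fiber switch happens on the thread.
    static uint64_t g_epoch = 0;
    uint64_t CurrentSwitchEpoch() { return g_epoch; }
    void Yield() { ++g_epoch; std::cout << "yield\n"; }

    struct AsyncStreak {
      unsigned len = 0;    // enqueues observed within the current epoch
      uint64_t epoch = 0;  // epoch seen at the last enqueue

      // Call after each asynchronous enqueue (simplified from DispatchSingle).
      void OnEnqueue() {
        uint64_t now = CurrentSwitchEpoch();
        if (epoch == now) {
          // No fiber switch since the previous enqueue: after 10 in a row,
          // yield so the execution fiber gets a chance to drain the queue.
          if (++len >= 10) {
            len = 0;
            Yield();
          }
        } else {
          // A switch already happened on its own; just reset the streak.
          len = 0;
          epoch = now;
        }
      }
    };

    int main() {
      AsyncStreak streak;
      // Yields only when 10 enqueues accumulate without an intervening switch.
      for (int i = 0; i < 25; ++i) streak.OnEnqueue();
    }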
@ -993,7 +1034,8 @@ Connection::ParserStatus Connection::ParseRedis(SinkReplyBuilder* orig_builder)
if (tl_traffic_logger.log_file && IsMain() /* log only on the main interface */) {
LogTraffic(id_, has_more, absl::MakeSpan(parse_args), service_->GetContextInfo(cc_.get()));
}
DispatchCommand(has_more, dispatch_sync, dispatch_async);
DispatchSingle(has_more, dispatch_sync, dispatch_async);
}
io_buf_.ConsumeInput(consumed);
} while (RedisParser::OK == result && !orig_builder->GetError());
@ -1049,7 +1091,7 @@ auto Connection::ParseMemcache() -> ParserStatus {
return NEED_MORE;
}
}
DispatchCommand(total_len < io_buf_.InputLen(), dispatch_sync, dispatch_async);
DispatchSingle(total_len < io_buf_.InputLen(), dispatch_sync, dispatch_async);
io_buf_.ConsumeInput(total_len);
} while (!builder->GetError());
@ -1084,7 +1126,7 @@ void Connection::OnBreakCb(int32_t mask) {
cc_->conn_closing = true;
BreakOnce(mask);
evc_.notify(); // Notify dispatch fiber.
cnd_.notify_one(); // Notify dispatch fiber.
}
void Connection::HandleMigrateRequest() {
@ -1248,7 +1290,7 @@ void Connection::SquashPipeline(facade::SinkReplyBuilder* builder) {
auto& pmsg = get<PipelineMessagePtr>(msg.handle);
squash_cmds.push_back(absl::MakeSpan(pmsg->args));
}
stats_->squashed_commands += squash_cmds.size();
cc_->async_dispatch = true;
size_t dispatched = service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), cc_.get());
@ -1286,6 +1328,7 @@ void Connection::ClearPipelinedMessages() {
}
dispatch_q_.clear();
queue_backpressure_->pipeline_cnd.notify_all();
queue_backpressure_->ec.notifyAll();
}
@ -1318,18 +1361,21 @@ std::string Connection::DebugInfo() const {
// into the dispatch queue and DispatchFiber will run those commands asynchronously with
// InputLoop. Note: in some cases, InputLoop may decide to dispatch directly and bypass the
// DispatchFiber.
void Connection::DispatchFiber(util::FiberSocketBase* peer) {
ThisFiber::SetName("DispatchFiber");
void Connection::ExecutionFiber(util::FiberSocketBase* peer) {
ThisFiber::SetName("ExecutionFiber");
SinkReplyBuilder* builder = cc_->reply_builder();
DispatchOperations dispatch_op{builder, this};
size_t squashing_threshold = absl::GetFlag(FLAGS_pipeline_squash);
uint64_t prev_epoch = fb2::FiberSwitchEpoch();
fb2::NoOpLock noop_lk;
while (!builder->GetError()) {
DCHECK_EQ(socket()->proactor(), ProactorBase::me());
evc_.await(
[this] { return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch); });
cnd_.wait(noop_lk, [this] {
return cc_->conn_closing || (!dispatch_q_.empty() && !cc_->sync_dispatch);
});
if (cc_->conn_closing)
break;
@ -1351,6 +1397,9 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
builder->SetBatchMode(dispatch_q_.size() > 1);
bool subscriber_over_limit =
stats_->dispatch_queue_subscriber_bytes >= queue_backpressure_->publish_buffer_limit;
// Special case: if the dispatch queue accumulated a big number of commands,
// we can try to squash them
// It is only enabled if the threshold is reached and the whole dispatch queue
@ -1374,6 +1423,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
if (ShouldEndDispatchFiber(msg)) {
RecycleMessage(std::move(msg));
CHECK(dispatch_q_.empty()) << DebugInfo();
queue_backpressure_->pipeline_cnd.notify_all();
return; // don't set conn closing flag
}
@ -1383,11 +1433,20 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
RecycleMessage(std::move(msg));
}
queue_backpressure_->ec.notify();
DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
if (!queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes) ||
dispatch_q_.empty()) {
queue_backpressure_->pipeline_cnd.notify_all(); // very cheap if no one is waiting on it.
}
if (subscriber_over_limit &&
stats_->dispatch_queue_subscriber_bytes < queue_backpressure_->publish_buffer_limit)
queue_backpressure_->ec.notify();
}
DCHECK(cc_->conn_closing || builder->GetError());
cc_->conn_closing = true;
queue_backpressure_->pipeline_cnd.notify_all();
}
Connection::PipelineMessagePtr Connection::FromArgs(RespVec args, mi_heap_t* heap) {
@ -1527,7 +1586,7 @@ void Connection::LaunchDispatchFiberIfNeeded() {
if (!dispatch_fb_.IsJoinable() && !migration_in_process_) {
VLOG(1) << "[" << id_ << "] LaunchDispatchFiberIfNeeded ";
dispatch_fb_ = fb2::Fiber(fb2::Launch::post, "connection_dispatch",
[&, peer = socket_.get()]() { DispatchFiber(peer); });
[this, peer = socket_.get()]() { ExecutionFiber(peer); });
}
}
@ -1573,7 +1632,7 @@ void Connection::SendAsync(MessageHandle msg) {
// Don't notify if a sync dispatch is in progress, it will wake after finishing.
if (dispatch_q_.size() == 1 && !cc_->sync_dispatch) {
evc_.notify();
cnd_.notify_one();
}
}
@ -1699,6 +1758,7 @@ void Connection::BreakOnce(uint32_t ev_mask) {
Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure,
unsigned thread, uint32_t client_id)
: ptr_{ptr}, backpressure_{backpressure}, thread_{thread}, client_id_{client_id} {
DCHECK(backpressure);
}
unsigned Connection::WeakRef::Thread() const {


@ -61,6 +61,10 @@ class Connection : public util::Connection {
ServiceInterface* service);
~Connection();
// A callback called by Listener::OnConnectionStart in the same thread where
// HandleRequests will run.
void OnConnectionStart();
using BreakerCb = std::function<void(uint32_t)>;
using ShutdownCb = std::function<void()>;
using ShutdownHandle = unsigned;
@ -215,7 +219,6 @@ class Connection : public util::Connection {
uint32_t client_id_;
};
public:
// Add PubMessage to dispatch queue.
// Virtual because behavior is overridden in test_utils.
virtual void SendPubMessageAsync(PubMessage);
@ -318,18 +321,28 @@ class Connection : public util::Connection {
struct DispatchCleanup;
struct Shutdown;
// Keeps track of total per-thread sizes of dispatch queues to
// limit memory taken up by messages from PUBLISH commands and slow down clients
// producing them to quickly via EnsureAsyncMemoryBudget.
// Keeps track of total per-thread sizes of dispatch queues to limit memory taken up by messages
// in these queues.
struct QueueBackpressure {
// Block until memory usage is below limit, can be called from any thread
// Block until subscriber memory usage is below limit, can be called from any thread.
void EnsureBelowLimit();
bool IsPipelineBufferOverLimit(size_t size) const {
return size >= pipeline_buffer_limit;
}
// Used by publisher/subscriber actors to make sure we do not publish too many messages
// into the queue. Thread-safe to allow safe access in EnsureBelowLimit.
util::fb2::EventCount ec;
std::atomic_size_t subscriber_bytes = 0;
size_t subscriber_thread_limit = 0; // cached flag subscriber_thread_limit
size_t pipeline_cache_limit = 0; // cached flag pipeline_cache_limit
// Used by pipelining/execution fiber to throttle the incoming pipeline messages.
// Used together with pipeline_buffer_limit to limit the pipeline usage per thread.
util::fb2::CondVarAny pipeline_cnd;
size_t publish_buffer_limit = 0; // cached flag publish_buffer_limit
size_t pipeline_cache_limit = 0; // cached flag pipeline_cache_limit
size_t pipeline_buffer_limit = 0; // cached flag for buffer size in bytes
};
private:
@ -346,14 +359,15 @@ class Connection : public util::Connection {
// Returns true if HTTP header is detected.
io::Result<bool> CheckForHttpProto(util::FiberSocketBase* peer);
// Dispatch Redis or MC command. `has_more` should indicate whether the buffer has more commands
// Dispatches a single (Redis or MC) command.
// `has_more` should indicate whether the io buffer has more commands
// (pipelining in progress). Performs async dispatch if forced (already in async mode) or if
// has_more is true, otherwise uses synchronous dispatch.
void DispatchCommand(bool has_more, absl::FunctionRef<void()> sync_dispatch,
absl::FunctionRef<MessageHandle()> async_dispatch);
void DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_cb,
absl::FunctionRef<MessageHandle()> cmd_msg_cb);
// Handles events from dispatch queue.
void DispatchFiber(util::FiberSocketBase* peer);
void ExecutionFiber(util::FiberSocketBase* peer);
void SendAsync(MessageHandle msg);
@ -394,7 +408,7 @@ class Connection : public util::Connection {
void BreakOnce(uint32_t ev_mask);
std::deque<MessageHandle> dispatch_q_; // dispatch queue
util::fb2::EventCount evc_; // dispatch queue waker
util::fb2::CondVarAny cnd_; // dispatch queue waker
util::fb2::Fiber dispatch_fb_; // dispatch fiber (if started)
size_t pending_pipeline_cmd_cnt_ = 0; // how many queued async commands in dispatch_q
@ -413,12 +427,15 @@ class Connection : public util::Connection {
ServiceInterface* service_;
time_t creation_time_, last_interaction_;
Phase phase_ = SETUP;
std::string name_;
unsigned parser_error_ = 0;
// Number of times we enqueued requests asynchronously during the same async_fiber_epoch_.
unsigned async_streak_len_ = 0;
uint64_t async_fiber_epoch_ = 0;
BreakerCb breaker_cb_;
// Used by redis parser to avoid allocations
@ -430,7 +447,7 @@ class Connection : public util::Connection {
// Pointer to corresponding queue backpressure struct.
// Needed for access from different threads by EnsureAsyncMemoryBudget().
QueueBackpressure* queue_backpressure_;
QueueBackpressure* queue_backpressure_ = nullptr;
util::fb2::ProactorBase* migration_request_ = nullptr;


@ -325,6 +325,8 @@ void Listener::OnConnectionStart(util::Connection* conn) {
facade::Connection* facade_conn = static_cast<facade::Connection*>(conn);
VLOG(1) << "Opening connection " << facade_conn->GetClientId();
facade_conn->OnConnectionStart();
absl::base_internal::SpinLockHolder lock{&mutex_};
int32_t prev_cnt = per_thread_[id].num_connections++;
++conn_cnt_;


@ -20,7 +20,7 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats);
ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
// To break this code deliberately if we add/remove a field to this struct.
static_assert(kSizeConnStats == 112u);
static_assert(kSizeConnStats == 120u);
ADD(read_buf_capacity);
ADD(dispatch_queue_entries);
@ -37,6 +37,7 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
ADD(num_replicas);
ADD(num_blocked_clients);
ADD(num_migrations);
ADD(squashed_commands);
return *this;
}
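
The sizeof assert works as a tripwire: the new uint64_t squashed_commands field grows ConnectionStats from 112 to 120 bytes, so the assert fails until operator+= is updated to aggregate the field. The pattern in isolation, on a reduced, illustrative struct:

    #include <cstdint>

    // Reduced, illustrative stand-in for ConnectionStats.
    struct Stats {
      uint64_t num_migrations = 0;
      uint64_t squashed_commands = 0;  // the newly added field

      Stats& operator+=(const Stats& o) {
        // Fails to compile the moment a field is added or removed, forcing the
        // author to revisit the aggregation below (112 -> 120 in the real struct).
        static_assert(sizeof(Stats) == 16u);
        num_migrations += o.num_migrations;
        squashed_commands += o.squashed_commands;
        return *this;
      }
    };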


@ -66,6 +66,7 @@ struct ConnectionStats {
uint32_t num_replicas = 0;
uint32_t num_blocked_clients = 0;
uint64_t num_migrations = 0;
uint64_t squashed_commands = 0;
ConnectionStats& operator+=(const ConnectionStats& o);
};


@ -23,7 +23,6 @@ extern "C" {
#include <utility>
#include "base/logging.h"
#include "facade/dragonfly_connection.h"
#include "facade/redis_parser.h"
#include "server/error.h"
#include "server/journal/executor.h"


@ -2102,6 +2102,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("total_commands_processed", conn_stats.command_cnt);
append("instantaneous_ops_per_sec", m.qps);
append("total_pipelined_commands", conn_stats.pipelined_cmd_cnt);
append("total_pipelined_squashed_commands", conn_stats.squashed_commands);
append("pipelined_latency_usec", conn_stats.pipelined_cmd_latency);
append("total_net_input_bytes", conn_stats.io_read_bytes);
append("connection_migrations", conn_stats.num_migrations);


@ -59,6 +59,7 @@ TestConnection::TestConnection(Protocol protocol, io::StringSink* sink)
: facade::Connection(protocol, nullptr, nullptr, nullptr), sink_(sink) {
cc_.reset(new dfly::ConnectionContext(sink_, this));
SetSocket(ProactorBase::me()->CreateSocket());
OnConnectionStart();
}
void TestConnection::SendPubMessageAsync(PubMessage pmsg) {


@ -334,7 +334,7 @@ will eventually unblock when it disconnects.
@pytest.mark.slow
@dfly_args({"proactor_threads": "1", "subscriber_thread_limit": "100"})
@dfly_args({"proactor_threads": "1", "publish_buffer_limit": "100"})
async def test_publish_stuck(df_server: DflyInstance, async_client: aioredis.Redis):
reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port, limit=10)
writer.write(b"SUBSCRIBE channel\r\n")