chore: make per-thread QueueBackpressure objects global (#4482)

Before this PR, QueueBackpressure objects are in fact referenced from other threads in an awkward way via
Connection::WeakRef::EnsureMemoryBudget().

This PR removes the complexity of accessing these objects from foreign threads.
This commit is contained in:
Roman Gershman 2025-01-20 12:53:36 +02:00 committed by GitHub
parent b017cdd1a0
commit d2209d9eea
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 89 additions and 87 deletions

View file

@ -92,6 +92,8 @@ using absl::GetFlag;
using nonstd::make_unexpected;
namespace facade {
namespace {
void SendProtocolError(RedisParser::Result pres, SinkReplyBuilder* builder) {
@ -269,11 +271,12 @@ thread_local uint32_t free_req_release_weight = 0;
const char* kPhaseName[Connection::NUM_PHASES] = {"SETUP", "READ", "PROCESS", "SHUTTING_DOWN",
"PRECLOSE"};
} // namespace
// Keeps track of total per-thread sizes of dispatch queues to limit memory taken up by messages
// in these queues.
struct Connection::QueueBackpressure {
struct QueueBackpressure {
QueueBackpressure() {
}
// Block until subscriber memory usage is below limit, can be called from any thread.
void EnsureBelowLimit();
@ -296,14 +299,23 @@ struct Connection::QueueBackpressure {
uint32_t pipeline_queue_max_len = 256; // cached flag for pipeline queue max length.
};
thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_pool_;
thread_local Connection::QueueBackpressure Connection::tl_queue_backpressure_;
void Connection::QueueBackpressure::EnsureBelowLimit() {
// Waits on pubsub_ec until this slot's subscriber_bytes counter is at or below
// publish_buffer_limit. The counter is read with relaxed ordering.
void QueueBackpressure::EnsureBelowLimit() {
  auto within_budget = [this] {
    return subscriber_bytes.load(memory_order_relaxed) <= publish_buffer_limit;
  };
  pubsub_ec.await(within_budget);
}
// Global array for each io thread to keep track of the total memory usage of the dispatch queues.
QueueBackpressure* thread_queue_backpressure = nullptr;
// Returns the QueueBackpressure slot that belongs to the calling io thread.
//
// thread_queue_backpressure is an array with one element per io thread, so it
// must be indexed by the caller's pool index. Dereferencing the array pointer
// directly would always yield slot 0 and make every thread share the same
// counters and wait queues, while Connection::WeakRef::EnsureMemoryBudget()
// waits on the slot selected by the connection's thread id.
//
// Must be called from a proactor thread, after the array has been allocated.
QueueBackpressure& GetQueueBackpressure() {
  DCHECK(thread_queue_backpressure != nullptr);
  return thread_queue_backpressure[ProactorBase::me()->GetPoolIndex()];
}
} // namespace
thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_pool_;
void Connection::PipelineMessage::SetArgs(const RespVec& args) {
auto* next = storage.data();
for (size_t i = 0; i < args.size(); ++i) {
@ -523,6 +535,30 @@ void UpdateLibNameVerMap(const string& name, const string& ver, int delta) {
}
} // namespace
void Connection::Init(unsigned io_threads) {
CHECK(thread_queue_backpressure == nullptr);
thread_queue_backpressure = new QueueBackpressure[io_threads];
for (unsigned i = 0; i < io_threads; ++i) {
auto& qbp = thread_queue_backpressure[i];
qbp.publish_buffer_limit = GetFlag(FLAGS_publish_buffer_limit);
qbp.pipeline_cache_limit = GetFlag(FLAGS_request_cache_limit);
qbp.pipeline_buffer_limit = GetFlag(FLAGS_pipeline_buffer_limit);
qbp.pipeline_queue_max_len = GetFlag(FLAGS_pipeline_queue_limit);
if (qbp.publish_buffer_limit == 0 || qbp.pipeline_cache_limit == 0 ||
qbp.pipeline_buffer_limit == 0 || qbp.pipeline_queue_max_len == 0) {
LOG(ERROR) << "pipeline flag limit is 0";
exit(-1);
}
}
}
void Connection::Shutdown() {
delete[] thread_queue_backpressure;
thread_queue_backpressure = nullptr;
}
Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
ServiceInterface* service)
: io_buf_(kMinReadSize),
@ -611,37 +647,14 @@ void Connection::OnPostMigrateThread() {
LaunchAsyncFiberIfNeeded();
}
// Update tl variables
queue_backpressure_ = &tl_queue_backpressure_;
stats_ = &tl_facade_stats->conn_stats;
++stats_->num_conns;
stats_->read_buf_capacity += io_buf_.Capacity();
}
void Connection::OnConnectionStart() {
DCHECK(queue_backpressure_ == nullptr);
ThisFiber::SetName("DflyConnection");
// We must initialize tl_queue_backpressure_ here and not in the c'tor because a connection object
// may be created in a differrent thread from where it runs.
if (tl_queue_backpressure_.publish_buffer_limit == 0) {
tl_queue_backpressure_.publish_buffer_limit = GetFlag(FLAGS_publish_buffer_limit);
tl_queue_backpressure_.pipeline_cache_limit = GetFlag(FLAGS_request_cache_limit);
tl_queue_backpressure_.pipeline_buffer_limit = GetFlag(FLAGS_pipeline_buffer_limit);
tl_queue_backpressure_.pipeline_queue_max_len = GetFlag(FLAGS_pipeline_queue_limit);
if (tl_queue_backpressure_.publish_buffer_limit == 0 ||
tl_queue_backpressure_.pipeline_cache_limit == 0 ||
tl_queue_backpressure_.pipeline_buffer_limit == 0 ||
tl_queue_backpressure_.pipeline_queue_max_len == 0) {
LOG(ERROR) << "pipeline flag limit is 0";
exit(-1);
}
}
queue_backpressure_ = &tl_queue_backpressure_;
stats_ = &tl_facade_stats->conn_stats;
}
@ -1016,19 +1029,18 @@ void Connection::ConnectionFlow() {
void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_cb,
absl::FunctionRef<MessageHandle()> cmd_msg_cb) {
DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
bool optimize_for_async = has_more;
if (optimize_for_async && queue_backpressure_->IsPipelineBufferOverLimit(
stats_->dispatch_queue_bytes, dispatch_q_.size())) {
QueueBackpressure& qbp = GetQueueBackpressure();
if (optimize_for_async &&
qbp.IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes, dispatch_q_.size())) {
stats_->pipeline_throttle_count++;
LOG_EVERY_T(WARNING, 10) << "Pipeline buffer over limit: pipeline_bytes "
<< stats_->dispatch_queue_bytes << " queue_size " << dispatch_q_.size()
<< ", consider increasing pipeline_buffer_limit/pipeline_queue_limit";
fb2::NoOpLock noop;
queue_backpressure_->pipeline_cnd.wait(noop, [this] {
bool over_limits = queue_backpressure_->IsPipelineBufferOverLimit(
stats_->dispatch_queue_bytes, dispatch_q_.size());
qbp.pipeline_cnd.wait(noop, [this, &qbp] {
bool over_limits =
qbp.IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes, dispatch_q_.size());
return !over_limits || (dispatch_q_.empty() && !cc_->async_dispatch) || cc_->conn_closing;
});
if (cc_->conn_closing)
@ -1418,8 +1430,9 @@ void Connection::ClearPipelinedMessages() {
}
dispatch_q_.clear();
queue_backpressure_->pipeline_cnd.notify_all();
queue_backpressure_->pubsub_ec.notifyAll();
QueueBackpressure& qbp = GetQueueBackpressure();
qbp.pipeline_cnd.notify_all();
qbp.pubsub_ec.notifyAll();
}
string Connection::DebugInfo() const {
@ -1465,7 +1478,7 @@ void Connection::AsyncFiber() {
uint64_t prev_epoch = fb2::FiberSwitchEpoch();
fb2::NoOpLock noop_lk;
QueueBackpressure& qbp = GetQueueBackpressure();
while (!reply_builder_->GetError()) {
DCHECK_EQ(socket()->proactor(), ProactorBase::me());
cnd_.wait(noop_lk, [this] {
@ -1493,7 +1506,7 @@ void Connection::AsyncFiber() {
reply_builder_->SetBatchMode(dispatch_q_.size() > 1);
bool subscriber_over_limit =
stats_->dispatch_queue_subscriber_bytes >= queue_backpressure_->publish_buffer_limit;
stats_->dispatch_queue_subscriber_bytes >= qbp.publish_buffer_limit;
// Special case: if the dispatch queue accumulated a big number of commands,
// we can try to squash them
@ -1518,7 +1531,7 @@ void Connection::AsyncFiber() {
if (ShouldEndAsyncFiber(msg)) {
RecycleMessage(std::move(msg));
CHECK(dispatch_q_.empty()) << DebugInfo();
queue_backpressure_->pipeline_cnd.notify_all();
qbp.pipeline_cnd.notify_all();
return; // don't set conn closing flag
}
@ -1528,21 +1541,18 @@ void Connection::AsyncFiber() {
RecycleMessage(std::move(msg));
}
DCHECK(queue_backpressure_ == &tl_queue_backpressure_);
if (!queue_backpressure_->IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes,
dispatch_q_.size()) ||
if (!qbp.IsPipelineBufferOverLimit(stats_->dispatch_queue_bytes, dispatch_q_.size()) ||
dispatch_q_.empty()) {
queue_backpressure_->pipeline_cnd.notify_all(); // very cheap if noone is waiting on it.
qbp.pipeline_cnd.notify_all(); // very cheap if noone is waiting on it.
}
if (subscriber_over_limit &&
stats_->dispatch_queue_subscriber_bytes < queue_backpressure_->publish_buffer_limit)
queue_backpressure_->pubsub_ec.notify();
if (subscriber_over_limit && stats_->dispatch_queue_subscriber_bytes < qbp.publish_buffer_limit)
qbp.pubsub_ec.notify();
}
DCHECK(cc_->conn_closing || reply_builder_->GetError());
cc_->conn_closing = true;
queue_backpressure_->pipeline_cnd.notify_all();
qbp.pipeline_cnd.notify_all();
}
Connection::PipelineMessagePtr Connection::FromArgs(RespVec args, mi_heap_t* heap) {
@ -1632,7 +1642,7 @@ Connection::WeakRef Connection::Borrow() {
// unsafe. All external mechanisms that borrow references should register subscriptions.
DCHECK_GT(cc_->subscriptions, 0);
return WeakRef(self_, queue_backpressure_, socket_->proactor()->GetPoolIndex(), id_);
return WeakRef(self_, socket_->proactor()->GetPoolIndex(), id_);
}
void Connection::ShutdownThreadLocal() {
@ -1709,7 +1719,8 @@ void Connection::SendAsync(MessageHandle msg) {
msg.dispatch_ts = ProactorBase::GetMonotonicTimeNs();
if (msg.IsPubMsg()) {
queue_backpressure_->subscriber_bytes.fetch_add(used_mem, memory_order_relaxed);
QueueBackpressure& qbp = GetQueueBackpressure();
qbp.subscriber_bytes.fetch_add(used_mem, memory_order_relaxed);
stats_->dispatch_queue_subscriber_bytes += used_mem;
}
@ -1739,8 +1750,9 @@ void Connection::RecycleMessage(MessageHandle msg) {
stats_->dispatch_queue_bytes -= used_mem;
stats_->dispatch_queue_entries--;
QueueBackpressure& qbp = GetQueueBackpressure();
if (msg.IsPubMsg()) {
queue_backpressure_->subscriber_bytes.fetch_sub(used_mem, memory_order_relaxed);
qbp.subscriber_bytes.fetch_sub(used_mem, memory_order_relaxed);
stats_->dispatch_queue_subscriber_bytes -= used_mem;
}
@ -1752,7 +1764,7 @@ void Connection::RecycleMessage(MessageHandle msg) {
// Retain pipeline message in pool.
if (auto* pipe = get_if<PipelineMessagePtr>(&msg.handle); pipe) {
pending_pipeline_cmd_cnt_--;
if (stats_->pipeline_cmd_cache_bytes < queue_backpressure_->pipeline_cache_limit) {
if (stats_->pipeline_cmd_cache_bytes < qbp.pipeline_cache_limit) {
stats_->pipeline_cmd_cache_bytes += (*pipe)->StorageCapacity();
pipeline_req_pool_.push_back(std::move(*pipe));
}
@ -1850,14 +1862,14 @@ void Connection::BreakOnce(uint32_t ev_mask) {
}
}
void Connection::SetMaxQueueLenThreadLocal(uint32_t val) {
tl_queue_backpressure_.pipeline_queue_max_len = val;
tl_queue_backpressure_.pipeline_cnd.notify_all();
void Connection::SetMaxQueueLenThreadLocal(unsigned tid, uint32_t val) {
thread_queue_backpressure[tid].pipeline_queue_max_len = val;
thread_queue_backpressure[tid].pipeline_cnd.notify_all();
}
void Connection::SetPipelineBufferLimit(size_t val) {
tl_queue_backpressure_.pipeline_buffer_limit = val;
tl_queue_backpressure_.pipeline_cnd.notify_all();
void Connection::SetPipelineBufferLimit(unsigned tid, size_t val) {
thread_queue_backpressure[tid].pipeline_buffer_limit = val;
thread_queue_backpressure[tid].pipeline_cnd.notify_all();
}
void Connection::GetRequestSizeHistogramThreadLocal(std::string* hist) {
@ -1874,18 +1886,17 @@ void Connection::TrackRequestSize(bool enable) {
}
}
Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure,
unsigned thread, uint32_t client_id)
: ptr_{ptr}, backpressure_{backpressure}, thread_{thread}, client_id_{client_id} {
DCHECK(backpressure);
// Builds a weak reference to a connection. Stores a weak_ptr to `ptr` together
// with the pool index of the connection's thread and the client id.
Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, unsigned thread_id,
                             uint32_t client_id)
    : ptr_(ptr),
      thread_id_(thread_id),
      client_id_(client_id) {
}
unsigned Connection::WeakRef::Thread() const {
return thread_;
return thread_id_;
}
Connection* Connection::WeakRef::Get() const {
DCHECK_EQ(ProactorBase::me()->GetPoolIndex(), int(thread_));
DCHECK_EQ(ProactorBase::me()->GetPoolIndex(), int(thread_id_));
// The connection can only be deleted on this thread, so
// this pointer is valid until the next suspension.
// Note: keeping a shared_ptr doesn't prolong the lifetime because
@ -1904,9 +1915,7 @@ uint32_t Connection::WeakRef::GetClientId() const {
bool Connection::WeakRef::EnsureMemoryBudget() const {
// Simple optimization: If a connection was closed, don't check memory budget.
if (!ptr_.expired()) {
// We don't rely on the connection ptr staying valid because we only access
// the threads backpressure
backpressure_->EnsureBelowLimit();
thread_queue_backpressure[thread_id_].EnsureBelowLimit();
return true;
}
return false;
@ -1931,7 +1940,6 @@ void ResetStats() {
cstats.io_read_bytes = 0;
tl_facade_stats->reply_stats = {};
if (io_req_size_hist)
io_req_size_hist->Clear();
}

View file

@ -54,9 +54,10 @@ class SinkReplyBuilder;
// For pipelined requests, monitor and pubsub messages it uses
// a separate dispatch queue that is processed on a separate fiber.
class Connection : public util::Connection {
struct QueueBackpressure;
public:
static void Init(unsigned io_threads);
static void Shutdown();
Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
ServiceInterface* service);
~Connection();
@ -204,12 +205,10 @@ class Connection : public util::Connection {
private:
friend class Connection;
WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure, unsigned thread,
uint32_t client_id);
WeakRef(std::shared_ptr<Connection> ptr, unsigned thread_id, uint32_t client_id);
std::weak_ptr<Connection> ptr_;
QueueBackpressure* backpressure_;
unsigned thread_;
unsigned thread_id_;
uint32_t client_id_;
};
@ -306,8 +305,8 @@ class Connection : public util::Connection {
bool IsHttp() const;
// Sets max queue length locally in the calling thread.
static void SetMaxQueueLenThreadLocal(uint32_t val);
static void SetPipelineBufferLimit(size_t val);
static void SetMaxQueueLenThreadLocal(unsigned tid, uint32_t val);
static void SetPipelineBufferLimit(unsigned tid, size_t val);
static void GetRequestSizeHistogramThreadLocal(std::string* hist);
static void TrackRequestSize(bool enable);
@ -427,19 +426,12 @@ class Connection : public util::Connection {
// Used to keep track of borrowed references. Does not really own itself
std::shared_ptr<Connection> self_;
// Pointer to corresponding queue backpressure struct.
// Needed for access from different threads by EnsureAsyncMemoryBudget().
QueueBackpressure* queue_backpressure_ = nullptr;
util::fb2::ProactorBase* migration_request_ = nullptr;
// Pooled pipeline messages per-thread
// Aggregated while handling pipelines, gradually released while handling regular commands.
static thread_local std::vector<PipelineMessagePtr> pipeline_req_pool_;
// Per-thread queue backpressure structs.
static thread_local QueueBackpressure tl_queue_backpressure_;
union {
uint16_t flags_;
struct {

View file

@ -778,6 +778,7 @@ Service::~Service() {
void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners) {
InitRedisTables();
facade::Connection::Init(pp_.size());
config_registry.RegisterSetter<MemoryBytesFlag>(
"maxmemory", [](const MemoryBytesFlag& flag) { max_memory_limit = flag.value; });
@ -800,12 +801,12 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
config_registry.RegisterSetter<uint32_t>("pipeline_queue_limit", [](uint32_t val) {
shard_set->pool()->AwaitBrief(
[val](unsigned, auto*) { facade::Connection::SetMaxQueueLenThreadLocal(val); });
[val](unsigned tid, auto*) { facade::Connection::SetMaxQueueLenThreadLocal(tid, val); });
});
config_registry.RegisterSetter<size_t>("pipeline_buffer_limit", [](size_t val) {
shard_set->pool()->AwaitBrief(
[val](unsigned, auto*) { facade::Connection::SetPipelineBufferLimit(val); });
[val](unsigned tid, auto*) { facade::Connection::SetPipelineBufferLimit(tid, val); });
});
config_registry.RegisterMutable("replica_partial_sync");
@ -910,6 +911,7 @@ void Service::Shutdown() {
// wait for all the pending callbacks to stop.
ThisFiber::SleepFor(10ms);
facade::Connection::Shutdown();
}
optional<ErrorReply> Service::CheckKeysOwnership(const CommandId* cid, CmdArgList args,