diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index cff6b7916..a04645cdb 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -1886,6 +1886,10 @@ void Connection::TrackRequestSize(bool enable) { } } +void Connection::EnsureMemoryBudget(unsigned tid) { + thread_queue_backpressure[tid].EnsureBelowLimit(); +} + Connection::WeakRef::WeakRef(std::shared_ptr ptr, unsigned thread_id, uint32_t client_id) : ptr_{ptr}, thread_id_{thread_id}, client_id_{client_id} { @@ -1896,7 +1900,9 @@ unsigned Connection::WeakRef::Thread() const { } Connection* Connection::WeakRef::Get() const { + // We should never access the connection object from other threads. DCHECK_EQ(ProactorBase::me()->GetPoolIndex(), int(thread_id_)); + // The connection can only be deleted on this thread, so // this pointer is valid until the next suspension. // Note: keeping a shared_ptr doesn't prolong the lifetime because @@ -1912,15 +1918,6 @@ uint32_t Connection::WeakRef::GetClientId() const { return client_id_; } -bool Connection::WeakRef::EnsureMemoryBudget() const { - // Simple optimization: If a connection was closed, don't check memory budget. - if (!ptr_.expired()) { - thread_queue_backpressure[thread_id_].EnsureBelowLimit(); - return true; - } - return false; -} - bool Connection::WeakRef::operator<(const WeakRef& other) { return client_id_ < other.client_id_; } diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index eb4f7cc52..f5494396b 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -57,6 +57,7 @@ class Connection : public util::Connection { public: static void Init(unsigned io_threads); static void Shutdown(); + static void ShutdownThreadLocal(); Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx, ServiceInterface* service); @@ -196,9 +197,6 @@ class Connection : public util::Connection { // Returns client id.Thread-safe. uint32_t GetClientId() const; - // Ensure owner thread's memory budget. If expired, skips and returns false. Thread-safe. - bool EnsureMemoryBudget() const; - bool operator<(const WeakRef& other); bool operator==(const WeakRef& other) const; @@ -230,10 +228,6 @@ class Connection : public util::Connection { // Add InvalidationMessage to dispatch queue. virtual void SendInvalidationMessageAsync(InvalidationMessage); - // Must be called before sending pubsub messages to ensure the threads pipeline queue limit is not - // reached. Blocks until free space is available. Controlled with `pipeline_queue_limit` flag. - void EnsureAsyncMemoryBudget(); - // Register hook that is executen when the connection breaks. void RegisterBreakHook(BreakerCb breaker_cb); @@ -248,8 +242,6 @@ class Connection : public util::Connection { // Borrow weak reference to connection. Can be called from any thread. WeakRef Borrow(); - static void ShutdownThreadLocal(); - bool IsCurrentlyDispatching() const; std::string GetClientInfo(unsigned thread_id) const; @@ -309,6 +301,7 @@ class Connection : public util::Connection { static void SetPipelineBufferLimit(unsigned tid, size_t val); static void GetRequestSizeHistogramThreadLocal(std::string* hist); static void TrackRequestSize(bool enable); + static void EnsureMemoryBudget(unsigned tid); protected: void OnShutdown() override; diff --git a/src/server/channel_store.cc b/src/server/channel_store.cc index cd055ef8d..06d4d3b10 100644 --- a/src/server/channel_store.cc +++ b/src/server/channel_store.cc @@ -131,14 +131,21 @@ unsigned ChannelStore::SendMessages(std::string_view channel, facade::ArgRange m // Make sure none of the threads publish buffer limits is reached. We don't reserve memory ahead // and don't prevent the buffer from possibly filling, but the approach is good enough for // limiting fast producers. Most importantly, we can use DispatchBrief below as we block here - optional last_thread; + int32_t last_thread = -1; for (auto& sub : subscribers) { - DCHECK_LE(last_thread.value_or(0), sub.Thread()); - if (last_thread && *last_thread == sub.Thread()) // skip same thread + int sub_thread = sub.Thread(); + DCHECK_LE(last_thread, sub_thread); + if (last_thread == sub_thread) // skip same thread continue; - if (sub.EnsureMemoryBudget()) // Invalid pointers are skipped - last_thread = sub.Thread(); + if (sub.IsExpired()) + continue; + + // Make sure the connection thread has enough memory budget to accept the message. + // This is a heuristic and not entirely hermetic since the connection memory might + // get filled again. + facade::Connection::EnsureMemoryBudget(sub.Thread()); + last_thread = sub_thread; } auto subscribers_ptr = make_shared(std::move(subscribers));