mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
chore: remove pubsub semantics from Connection::WeakRef (#4483)
Make it generic and move pubsub related logic into channel_store.
This commit is contained in:
parent
85cc443448
commit
7d0530547b
3 changed files with 20 additions and 23 deletions
|
@ -1886,6 +1886,10 @@ void Connection::TrackRequestSize(bool enable) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Connection::EnsureMemoryBudget(unsigned tid) {
|
||||||
|
thread_queue_backpressure[tid].EnsureBelowLimit();
|
||||||
|
}
|
||||||
|
|
||||||
Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, unsigned thread_id,
|
Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, unsigned thread_id,
|
||||||
uint32_t client_id)
|
uint32_t client_id)
|
||||||
: ptr_{ptr}, thread_id_{thread_id}, client_id_{client_id} {
|
: ptr_{ptr}, thread_id_{thread_id}, client_id_{client_id} {
|
||||||
|
@ -1896,7 +1900,9 @@ unsigned Connection::WeakRef::Thread() const {
|
||||||
}
|
}
|
||||||
|
|
||||||
Connection* Connection::WeakRef::Get() const {
|
Connection* Connection::WeakRef::Get() const {
|
||||||
|
// We should never access the connection object from other threads.
|
||||||
DCHECK_EQ(ProactorBase::me()->GetPoolIndex(), int(thread_id_));
|
DCHECK_EQ(ProactorBase::me()->GetPoolIndex(), int(thread_id_));
|
||||||
|
|
||||||
// The connection can only be deleted on this thread, so
|
// The connection can only be deleted on this thread, so
|
||||||
// this pointer is valid until the next suspension.
|
// this pointer is valid until the next suspension.
|
||||||
// Note: keeping a shared_ptr doesn't prolong the lifetime because
|
// Note: keeping a shared_ptr doesn't prolong the lifetime because
|
||||||
|
@ -1912,15 +1918,6 @@ uint32_t Connection::WeakRef::GetClientId() const {
|
||||||
return client_id_;
|
return client_id_;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Connection::WeakRef::EnsureMemoryBudget() const {
|
|
||||||
// Simple optimization: If a connection was closed, don't check memory budget.
|
|
||||||
if (!ptr_.expired()) {
|
|
||||||
thread_queue_backpressure[thread_id_].EnsureBelowLimit();
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool Connection::WeakRef::operator<(const WeakRef& other) {
|
bool Connection::WeakRef::operator<(const WeakRef& other) {
|
||||||
return client_id_ < other.client_id_;
|
return client_id_ < other.client_id_;
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,6 +57,7 @@ class Connection : public util::Connection {
|
||||||
public:
|
public:
|
||||||
static void Init(unsigned io_threads);
|
static void Init(unsigned io_threads);
|
||||||
static void Shutdown();
|
static void Shutdown();
|
||||||
|
static void ShutdownThreadLocal();
|
||||||
|
|
||||||
Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
|
Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
|
||||||
ServiceInterface* service);
|
ServiceInterface* service);
|
||||||
|
@ -196,9 +197,6 @@ class Connection : public util::Connection {
|
||||||
// Returns client id.Thread-safe.
|
// Returns client id.Thread-safe.
|
||||||
uint32_t GetClientId() const;
|
uint32_t GetClientId() const;
|
||||||
|
|
||||||
// Ensure owner thread's memory budget. If expired, skips and returns false. Thread-safe.
|
|
||||||
bool EnsureMemoryBudget() const;
|
|
||||||
|
|
||||||
bool operator<(const WeakRef& other);
|
bool operator<(const WeakRef& other);
|
||||||
bool operator==(const WeakRef& other) const;
|
bool operator==(const WeakRef& other) const;
|
||||||
|
|
||||||
|
@ -230,10 +228,6 @@ class Connection : public util::Connection {
|
||||||
// Add InvalidationMessage to dispatch queue.
|
// Add InvalidationMessage to dispatch queue.
|
||||||
virtual void SendInvalidationMessageAsync(InvalidationMessage);
|
virtual void SendInvalidationMessageAsync(InvalidationMessage);
|
||||||
|
|
||||||
// Must be called before sending pubsub messages to ensure the threads pipeline queue limit is not
|
|
||||||
// reached. Blocks until free space is available. Controlled with `pipeline_queue_limit` flag.
|
|
||||||
void EnsureAsyncMemoryBudget();
|
|
||||||
|
|
||||||
// Register hook that is executen when the connection breaks.
|
// Register hook that is executen when the connection breaks.
|
||||||
void RegisterBreakHook(BreakerCb breaker_cb);
|
void RegisterBreakHook(BreakerCb breaker_cb);
|
||||||
|
|
||||||
|
@ -248,8 +242,6 @@ class Connection : public util::Connection {
|
||||||
// Borrow weak reference to connection. Can be called from any thread.
|
// Borrow weak reference to connection. Can be called from any thread.
|
||||||
WeakRef Borrow();
|
WeakRef Borrow();
|
||||||
|
|
||||||
static void ShutdownThreadLocal();
|
|
||||||
|
|
||||||
bool IsCurrentlyDispatching() const;
|
bool IsCurrentlyDispatching() const;
|
||||||
|
|
||||||
std::string GetClientInfo(unsigned thread_id) const;
|
std::string GetClientInfo(unsigned thread_id) const;
|
||||||
|
@ -309,6 +301,7 @@ class Connection : public util::Connection {
|
||||||
static void SetPipelineBufferLimit(unsigned tid, size_t val);
|
static void SetPipelineBufferLimit(unsigned tid, size_t val);
|
||||||
static void GetRequestSizeHistogramThreadLocal(std::string* hist);
|
static void GetRequestSizeHistogramThreadLocal(std::string* hist);
|
||||||
static void TrackRequestSize(bool enable);
|
static void TrackRequestSize(bool enable);
|
||||||
|
static void EnsureMemoryBudget(unsigned tid);
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void OnShutdown() override;
|
void OnShutdown() override;
|
||||||
|
|
|
@ -131,14 +131,21 @@ unsigned ChannelStore::SendMessages(std::string_view channel, facade::ArgRange m
|
||||||
// Make sure none of the threads publish buffer limits is reached. We don't reserve memory ahead
|
// Make sure none of the threads publish buffer limits is reached. We don't reserve memory ahead
|
||||||
// and don't prevent the buffer from possibly filling, but the approach is good enough for
|
// and don't prevent the buffer from possibly filling, but the approach is good enough for
|
||||||
// limiting fast producers. Most importantly, we can use DispatchBrief below as we block here
|
// limiting fast producers. Most importantly, we can use DispatchBrief below as we block here
|
||||||
optional<uint32_t> last_thread;
|
int32_t last_thread = -1;
|
||||||
for (auto& sub : subscribers) {
|
for (auto& sub : subscribers) {
|
||||||
DCHECK_LE(last_thread.value_or(0), sub.Thread());
|
int sub_thread = sub.Thread();
|
||||||
if (last_thread && *last_thread == sub.Thread()) // skip same thread
|
DCHECK_LE(last_thread, sub_thread);
|
||||||
|
if (last_thread == sub_thread) // skip same thread
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
if (sub.EnsureMemoryBudget()) // Invalid pointers are skipped
|
if (sub.IsExpired())
|
||||||
last_thread = sub.Thread();
|
continue;
|
||||||
|
|
||||||
|
// Make sure the connection thread has enough memory budget to accept the message.
|
||||||
|
// This is a heuristic and not entirely hermetic since the connection memory might
|
||||||
|
// get filled again.
|
||||||
|
facade::Connection::EnsureMemoryBudget(sub.Thread());
|
||||||
|
last_thread = sub_thread;
|
||||||
}
|
}
|
||||||
|
|
||||||
auto subscribers_ptr = make_shared<decltype(subscribers)>(std::move(subscribers));
|
auto subscribers_ptr = make_shared<decltype(subscribers)>(std::move(subscribers));
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue