chore: call breaker_cb_ on shutdown (#3128)

* chore: call breaker_cb_ on shutdown
---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-06-06 08:09:40 +03:00 committed by GitHub
parent 229eeeb014
commit ce2e127183
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 14 additions and 30 deletions

View file

@ -150,7 +150,7 @@ jobs:
- if: ${{ hashFiles(format('{0}-{1}', matrix.dockerfile, inputs.build_type)) }}
name: Build release image for arm64
uses: docker/build-push-action@v3
uses: docker/build-push-action@v5
with:
context: .
platforms: linux/arm64

View file

@ -552,11 +552,7 @@ Connection::~Connection() {
void Connection::OnShutdown() {
VLOG(1) << "Connection::OnShutdown";
if (shutdown_cb_) {
for (const auto& k_v : shutdown_cb_->map) {
k_v.second();
}
}
BreakOnce(POLLHUP);
}
void Connection::OnPreMigrateThread() {
@ -600,21 +596,6 @@ void Connection::OnPostMigrateThread() {
}
}
auto Connection::RegisterShutdownHook(ShutdownCb cb) -> ShutdownHandle {
if (!shutdown_cb_) {
shutdown_cb_ = make_unique<Shutdown>();
}
return shutdown_cb_->Add(std::move(cb));
}
void Connection::UnregisterShutdownHook(ShutdownHandle id) {
if (shutdown_cb_) {
shutdown_cb_->Remove(id);
if (shutdown_cb_->map.empty())
shutdown_cb_.reset();
}
}
void Connection::HandleRequests() {
ThisFiber::SetName("DflyConnection");
@ -1103,8 +1084,7 @@ void Connection::OnBreakCb(int32_t mask) {
<< cc_->reply_builder()->IsSendActive() << " " << cc_->reply_builder()->GetError();
cc_->conn_closing = true;
breaker_cb_(mask);
BreakOnce(mask);
evc_.notify(); // Notify dispatch fiber.
}
@ -1546,7 +1526,7 @@ void Connection::SendInvalidationMessageAsync(InvalidationMessage msg) {
void Connection::LaunchDispatchFiberIfNeeded() {
if (!dispatch_fb_.IsJoinable() && !migration_in_process_) {
VLOG(1) << "LaunchDispatchFiberIfNeeded " << GetClientId();
VLOG(1) << "[" << id_ << "] LaunchDispatchFiberIfNeeded ";
dispatch_fb_ = fb2::Fiber(fb2::Launch::post, "connection_dispatch",
[&, peer = socket_.get()]() { DispatchFiber(peer); });
}
@ -1718,6 +1698,15 @@ void Connection::DecreaseStatsOnClose() {
--stats_->num_conns;
}
void Connection::BreakOnce(uint32_t ev_mask) {
if (breaker_cb_) {
DVLOG(1) << "[" << id_ << "] Connection::breaker_cb_ " << ev_mask;
auto fun = std::move(breaker_cb_);
DCHECK(!breaker_cb_);
fun(ev_mask);
}
}
Connection::WeakRef::WeakRef(std::shared_ptr<Connection> ptr, QueueBackpressure* backpressure,
unsigned thread, uint32_t client_id)
: ptr_{ptr}, backpressure_{backpressure}, thread_{thread}, client_id_{client_id} {

View file

@ -238,11 +238,6 @@ class Connection : public util::Connection {
// reached. Blocks until free space is available. Controlled with `pipeline_queue_limit` flag.
void EnsureAsyncMemoryBudget();
// Register hook that is executed on connection shutdown.
ShutdownHandle RegisterShutdownHook(ShutdownCb cb);
void UnregisterShutdownHook(ShutdownHandle id);
// Register hook that is executen when the connection breaks.
void RegisterBreakHook(BreakerCb breaker_cb);
@ -400,6 +395,7 @@ class Connection : public util::Connection {
private:
void DecreaseStatsOnClose();
void BreakOnce(uint32_t ev_mask);
std::deque<MessageHandle> dispatch_q_; // dispatch queue
util::fb2::EventCount evc_; // dispatch queue waker
@ -428,7 +424,6 @@ class Connection : public util::Connection {
unsigned parser_error_ = 0;
BreakerCb breaker_cb_;
std::unique_ptr<Shutdown> shutdown_cb_;
// Used by redis parser to avoid allocations
RespVec tmp_parse_args_;