fix(server): Fix break callback handling in case of thread migrations.

When a connection migrates to another thread, it also being handled by a different io_uring.
That means we need to cancel any pending io requests we had in the old io uring and reestablish them
in the new one.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2022-09-05 06:52:51 +03:00
parent 14e61f532f
commit c5a7e658a1
3 changed files with 45 additions and 19 deletions

2
helio

@ -1 +1 @@
Subproject commit 6bc0a8d49e534c8f7492ecbabc622c54b5ab7e72
Subproject commit 0f87cb9c5e9e846c721eb6c93603d6fdc14223bd

View file

@ -148,6 +148,7 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
Connection::~Connection() {
}
// Called from Connection::Shutdown() right after socket_->Shutdown call.
void Connection::OnShutdown() {
VLOG(1) << "Connection::OnShutdown";
@ -158,6 +159,26 @@ void Connection::OnShutdown() {
}
}
void Connection::OnPreMigrateThread() {
// If we migrating to another io_uring we should cancel any pending requests we have.
if (break_poll_id_ != kuint32max) {
uring::UringSocket* us = static_cast<uring::UringSocket*>(socket_.get());
us->CancelPoll(break_poll_id_);
break_poll_id_ = kuint32max;
}
}
void Connection::OnPostMigrateThread() {
// Once we migrated, we should rearm OnBreakCb callback.
if (breaker_cb_) {
DCHECK_EQ(kuint32max, break_poll_id_);
uring::UringSocket* us = static_cast<uring::UringSocket*>(socket_.get());
break_poll_id_ =
us->PollEvent(POLLERR | POLLHUP, [this](int32_t mask) { this->OnBreakCb(mask); });
}
}
auto Connection::RegisterShutdownHook(ShutdownCb cb) -> ShutdownHandle {
if (!shutdown_) {
shutdown_ = make_unique<Shutdown>();
@ -221,30 +242,19 @@ void Connection::HandleRequests() {
} else {
cc_.reset(service_->CreateContext(peer, this));
bool should_disarm_poller = false;
// TODO: to move this interface to LinuxSocketBase so we won't need to cast.
uring::UringSocket* us = static_cast<uring::UringSocket*>(socket_.get());
uint32_t poll_id = 0;
if (breaker_cb_) {
should_disarm_poller = true;
poll_id = us->PollEvent(POLLERR | POLLHUP, [&](int32_t mask) {
cc_->conn_closing = true;
if (mask > 0) {
VLOG(1) << "Got event " << mask;
breaker_cb_(mask);
}
evc_.notify(); // Notify dispatch fiber.
should_disarm_poller = false;
});
break_poll_id_ =
us->PollEvent(POLLERR | POLLHUP, [this](int32_t mask) { this->OnBreakCb(mask); });
}
ConnectionFlow(peer);
if (should_disarm_poller) {
us->CancelPoll(poll_id);
if (break_poll_id_ != kuint32max) {
us->CancelPoll(break_poll_id_);
}
cc_.reset();
}
}
@ -256,8 +266,7 @@ void Connection::RegisterOnBreak(BreakerCb breaker_cb) {
breaker_cb_ = breaker_cb;
}
void Connection::SendMsgVecAsync(const PubMessage& pub_msg,
fibers_ext::BlockingCounter bc) {
void Connection::SendMsgVecAsync(const PubMessage& pub_msg, fibers_ext::BlockingCounter bc) {
DCHECK(cc_);
if (cc_->conn_closing) {
@ -508,6 +517,19 @@ auto Connection::ParseMemcache() -> ParserStatus {
return OK;
}
void Connection::OnBreakCb(int32_t mask) {
if (mask <= 0)
return; // we cancelled the poller, which means we do not need to break from anything.
VLOG(1) << "Got event " << mask;
CHECK(cc_);
cc_->conn_closing = true;
break_poll_id_ = kuint32max; // do not attempt to cancel it.
breaker_cb_(mask);
evc_.notify(); // Notify dispatch fiber.
}
auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, ParserStatus> {
SinkReplyBuilder* builder = cc_->reply_builder();
ConnectionStats* stats = service_->GetThreadLocalConnectionStats();

View file

@ -74,6 +74,8 @@ class Connection : public util::Connection {
protected:
void OnShutdown() override;
void OnPreMigrateThread() override;
void OnPostMigrateThread() override;
private:
enum ParserStatus { OK, NEED_MORE, ERROR };
@ -97,6 +99,7 @@ class Connection : public util::Connection {
ParserStatus ParseRedis();
ParserStatus ParseMemcache();
void OnBreakCb(int32_t mask);
base::IoBuf io_buf_;
std::unique_ptr<RedisParser> redis_parser_;
@ -123,6 +126,7 @@ class Connection : public util::Connection {
unsigned parser_error_ = 0;
uint32_t id_;
uint32_t break_poll_id_ = UINT32_MAX;
Protocol protocol_;