diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 39725001d..19b880e3f 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -303,20 +303,6 @@ void Connection::QueueBackpressure::EnsureBelowLimit() { [this] { return subscriber_bytes.load(memory_order_relaxed) <= publish_buffer_limit; }); } -struct Connection::Shutdown { - absl::flat_hash_map map; - ShutdownHandle next_handle = 1; - - ShutdownHandle Add(ShutdownCb cb) { - map[next_handle] = std::move(cb); - return next_handle++; - } - - void Remove(ShutdownHandle sh) { - map.erase(sh); - } -}; - void Connection::PipelineMessage::SetArgs(const RespVec& args) { auto* next = storage.data(); for (size_t i = 0; i < args.size(); ++i) { @@ -429,8 +415,8 @@ bool Connection::MessageHandle::IsReplying() const { !get(handle)->cmd.no_reply); } -struct Connection::DispatchOperations { - DispatchOperations(SinkReplyBuilder* b, Connection* me) +struct Connection::AsyncOperations { + AsyncOperations(SinkReplyBuilder* b, Connection* me) : stats{&tl_facade_stats->conn_stats}, builder{b}, self(me) { } @@ -452,12 +438,12 @@ struct Connection::DispatchOperations { Connection* self = nullptr; }; -void Connection::DispatchOperations::operator()(const MonitorMessage& msg) { +void Connection::AsyncOperations::operator()(const MonitorMessage& msg) { RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder; rbuilder->SendSimpleString(msg); } -void Connection::DispatchOperations::operator()(const AclUpdateMessage& msg) { +void Connection::AsyncOperations::operator()(const AclUpdateMessage& msg) { if (self->cntx()) { if (msg.username == self->cntx()->authed_username) { self->cntx()->acl_commands = msg.commands; @@ -467,7 +453,7 @@ void Connection::DispatchOperations::operator()(const AclUpdateMessage& msg) { } } -void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) { +void Connection::AsyncOperations::operator()(const PubMessage& pub_msg) { RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder; unsigned i = 0; array arr; @@ -483,7 +469,7 @@ void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) { RedisReplyBuilder::CollectionType::PUSH); } -void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg) { +void Connection::AsyncOperations::operator()(Connection::PipelineMessage& msg) { DVLOG(2) << "Dispatching pipeline: " << ToSV(msg.args.front()); self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, @@ -493,24 +479,24 @@ void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg self->skip_next_squashing_ = false; } -void Connection::DispatchOperations::operator()(const Connection::MCPipelineMessage& msg) { +void Connection::AsyncOperations::operator()(const Connection::MCPipelineMessage& msg) { self->service_->DispatchMC(msg.cmd, msg.value, static_cast(self->reply_builder_.get()), self->cc_.get()); self->last_interaction_ = time(nullptr); } -void Connection::DispatchOperations::operator()(const MigrationRequestMessage& msg) { +void Connection::AsyncOperations::operator()(const MigrationRequestMessage& msg) { // no-op } -void Connection::DispatchOperations::operator()(CheckpointMessage msg) { +void Connection::AsyncOperations::operator()(CheckpointMessage msg) { VLOG(2) << "Decremented checkpoint at " << self->DebugInfo(); msg.bc->Dec(); } -void Connection::DispatchOperations::operator()(const InvalidationMessage& msg) { +void Connection::AsyncOperations::operator()(const InvalidationMessage& msg) { RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder; DCHECK(rbuilder->IsResp3()); rbuilder->StartCollection(2, facade::RedisReplyBuilder::CollectionType::PUSH); @@ -606,7 +592,7 @@ void Connection::OnPreMigrateThread() { migration_in_process_ = true; socket_->CancelOnErrorCb(); - DCHECK(!dispatch_fb_.IsJoinable()) << GetClientId(); + DCHECK(!async_fb_.IsJoinable()) << GetClientId(); } void Connection::OnPostMigrateThread() { @@ -617,11 +603,11 @@ void Connection::OnPostMigrateThread() { socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); }); } migration_in_process_ = false; - DCHECK(!dispatch_fb_.IsJoinable()); + DCHECK(!async_fb_.IsJoinable()); - // If someone had sent Async during the migration, we must create dispatch_fb_. + // If someone had sent Async during the migration, we must create async_fb_. if (!dispatch_q_.empty()) { - LaunchDispatchFiberIfNeeded(); + LaunchAsyncFiberIfNeeded(); } // Update tl variables @@ -974,7 +960,7 @@ void Connection::ConnectionFlow() { cnd_.notify_one(); phase_ = SHUTTING_DOWN; VLOG(2) << "Before dispatch_fb.join()"; - dispatch_fb_.JoinIfNeeded(); + async_fb_.JoinIfNeeded(); VLOG(2) << "After dispatch_fb.join()"; phase_ = PRECLOSE; @@ -1048,6 +1034,7 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef invoke_ // prefer synchronous dispatching to save memory. optimize_for_async = false; + last_interaction_ = time(nullptr); } // Avoid sync dispatch if we can interleave with an ongoing async dispatch. @@ -1222,9 +1209,9 @@ void Connection::HandleMigrateRequest() { ProactorBase* dest = migration_request_; - if (dispatch_fb_.IsJoinable()) { + if (async_fb_.IsJoinable()) { SendAsync({MigrationRequestMessage{}}); - dispatch_fb_.Join(); + async_fb_.Join(); } // We don't support migrating with subscriptions as it would require moving thread local @@ -1237,9 +1224,9 @@ void Connection::HandleMigrateRequest() { // We need to return early as the socket is closing and IoLoop will clean up. // The reason that this is true is because of the following DCHECK - DCHECK(!dispatch_fb_.IsJoinable()); + DCHECK(!async_fb_.IsJoinable()); - // which can never trigger since we Joined on the dispatch_fb_ above and we are + // which can never trigger since we Joined on the async_fb_ above and we are // atomic in respect to our proactor meaning that no other fiber will // launch the DispatchFiber. if (!this->Migrate(dest)) { @@ -1336,7 +1323,7 @@ auto Connection::IoLoop() -> variant { return parse_status; } -bool Connection::ShouldEndDispatchFiber(const MessageHandle& msg) { +bool Connection::ShouldEndAsyncFiber(const MessageHandle& msg) { if (!holds_alternative(msg.handle)) { return false; } @@ -1402,14 +1389,14 @@ void Connection::SquashPipeline() { } void Connection::ClearPipelinedMessages() { - DispatchOperations dispatch_op{reply_builder_.get(), this}; + AsyncOperations async_op{reply_builder_.get(), this}; // Recycle messages even from disconnecting client to keep properly track of memory stats // As well as to avoid pubsub backpressure leakege. for (auto& msg : dispatch_q_) { FiberAtomicGuard guard; // don't suspend when concluding to avoid getting new messages if (msg.IsControl()) - visit(dispatch_op, msg.handle); // to not miss checkpoints + visit(async_op, msg.handle); // to not miss checkpoints RecycleMessage(std::move(msg)); } @@ -1428,7 +1415,7 @@ std::string Connection::DebugInfo() const { absl::StrAppend(&info, "dispatch(s/a)=", cc_->sync_dispatch, " ", cc_->async_dispatch, ", "); absl::StrAppend(&info, "closing=", cc_->conn_closing, ", "); } - absl::StrAppend(&info, "dispatch_fiber:joinable=", dispatch_fb_.IsJoinable(), ", "); + absl::StrAppend(&info, "dispatch_fiber:joinable=", async_fb_.IsJoinable(), ", "); bool intrusive_front = dispatch_q_.size() > 0 && dispatch_q_.front().IsControl(); absl::StrAppend(&info, "dispatch_queue:size=", dispatch_q_.size(), ", "); @@ -1452,10 +1439,10 @@ std::string Connection::DebugInfo() const { // into the dispatch queue and DispatchFiber will run those commands asynchronously with // InputLoop. Note: in some cases, InputLoop may decide to dispatch directly and bypass the // DispatchFiber. -void Connection::ExecutionFiber() { - ThisFiber::SetName("ExecutionFiber"); +void Connection::AsyncFiber() { + ThisFiber::SetName("AsyncFiber"); - DispatchOperations dispatch_op{reply_builder_.get(), this}; + AsyncOperations async_op{reply_builder_.get(), this}; size_t squashing_threshold = GetFlag(FLAGS_pipeline_squash); @@ -1511,7 +1498,7 @@ void Connection::ExecutionFiber() { reply_builder_->Flush(); } - if (ShouldEndDispatchFiber(msg)) { + if (ShouldEndAsyncFiber(msg)) { RecycleMessage(std::move(msg)); CHECK(dispatch_q_.empty()) << DebugInfo(); queue_backpressure_->pipeline_cnd.notify_all(); @@ -1519,7 +1506,7 @@ void Connection::ExecutionFiber() { } cc_->async_dispatch = true; - std::visit(dispatch_op, msg.handle); + std::visit(async_op, msg.handle); cc_->async_dispatch = false; RecycleMessage(std::move(msg)); } @@ -1608,7 +1595,7 @@ bool Connection::Migrate(util::fb2::ProactorBase* dest) { // Migrate is only used by DFLY Thread and Flow command which both check against // the result of Migration and handle it explicitly in their flows so this can act // as a weak if condition instead of a crash prone CHECK. - if (dispatch_fb_.IsJoinable() || cc_->conn_closing) { + if (async_fb_.IsJoinable() || cc_->conn_closing) { return false; } @@ -1675,11 +1662,10 @@ void Connection::SendInvalidationMessageAsync(InvalidationMessage msg) { SendAsync({std::move(msg)}); } -void Connection::LaunchDispatchFiberIfNeeded() { - if (!dispatch_fb_.IsJoinable() && !migration_in_process_) { - VLOG(1) << "[" << id_ << "] LaunchDispatchFiberIfNeeded "; - dispatch_fb_ = - fb2::Fiber(fb2::Launch::post, "connection_dispatch", [this]() { ExecutionFiber(); }); +void Connection::LaunchAsyncFiberIfNeeded() { + if (!async_fb_.IsJoinable() && !migration_in_process_) { + VLOG(1) << "[" << id_ << "] LaunchAsyncFiberIfNeeded "; + async_fb_ = fb2::Fiber(fb2::Launch::post, "connection_dispatch", [this]() { AsyncFiber(); }); } } @@ -1695,7 +1681,7 @@ void Connection::SendAsync(MessageHandle msg) { // If we launch while closing, it won't be awaited. Control messages will be processed on cleanup. if (!cc_->conn_closing) { - LaunchDispatchFiberIfNeeded(); + LaunchAsyncFiberIfNeeded(); } DCHECK_NE(phase_, PRECLOSE); // No more messages are processed after this point diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 401bbfe14..3eced43ae 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -67,7 +67,6 @@ class Connection : public util::Connection { using BreakerCb = std::function; using ShutdownCb = std::function; - using ShutdownHandle = unsigned; // PubSub message, either incoming message for active subscription or reply for new subscription. struct PubMessage { @@ -116,7 +115,7 @@ class Connection : public util::Connection { dfly::acl::AclPubSub pub_sub; }; - // Migration request message, the dispatch fiber stops to give way for thread migration. + // Migration request message, the async fiber stops to give way for thread migration. struct MigrationRequestMessage {}; // Checkpoint message, used to track when the connection finishes executing the current command. @@ -322,9 +321,7 @@ class Connection : public util::Connection { private: enum ParserStatus { OK, NEED_MORE, ERROR }; - struct DispatchOperations; - struct DispatchCleanup; - struct Shutdown; + struct AsyncOperations; // Check protocol and handle connection. void HandleRequests() final; @@ -345,8 +342,8 @@ class Connection : public util::Connection { void DispatchSingle(bool has_more, absl::FunctionRef invoke_cb, absl::FunctionRef cmd_msg_cb); - // Handles events from dispatch queue. - void ExecutionFiber(); + // Handles events from the dispatch queue. + void AsyncFiber(); void SendAsync(MessageHandle msg); @@ -368,9 +365,9 @@ class Connection : public util::Connection { PipelineMessagePtr GetFromPipelinePool(); void HandleMigrateRequest(); - bool ShouldEndDispatchFiber(const MessageHandle& msg); + bool ShouldEndAsyncFiber(const MessageHandle& msg); - void LaunchDispatchFiberIfNeeded(); // Dispatch fiber is started lazily + void LaunchAsyncFiberIfNeeded(); // Async fiber is started lazily // Squashes pipelined commands from the dispatch queue to spread load over all threads void SquashPipeline(); @@ -385,7 +382,7 @@ class Connection : public util::Connection { std::deque dispatch_q_; // dispatch queue util::fb2::CondVarAny cnd_; // dispatch queue waker - util::fb2::Fiber dispatch_fb_; // dispatch fiber (if started) + util::fb2::Fiber async_fb_; // async fiber (if started) uint64_t pending_pipeline_cmd_cnt_ = 0; // how many queued Redis async commands in dispatch_q