chore: fix wording around the dispatch fiber in dragonfly_connection (#4333)

Replace dispatch with async because we already have dispatch fiber in proactor code.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-12-18 11:22:55 +02:00 committed by GitHub
parent d0d720a375
commit 04e21c07da
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 42 additions and 59 deletions

View file

@ -303,20 +303,6 @@ void Connection::QueueBackpressure::EnsureBelowLimit() {
[this] { return subscriber_bytes.load(memory_order_relaxed) <= publish_buffer_limit; });
}
struct Connection::Shutdown {
absl::flat_hash_map<ShutdownHandle, ShutdownCb> map;
ShutdownHandle next_handle = 1;
ShutdownHandle Add(ShutdownCb cb) {
map[next_handle] = std::move(cb);
return next_handle++;
}
void Remove(ShutdownHandle sh) {
map.erase(sh);
}
};
void Connection::PipelineMessage::SetArgs(const RespVec& args) {
auto* next = storage.data();
for (size_t i = 0; i < args.size(); ++i) {
@ -429,8 +415,8 @@ bool Connection::MessageHandle::IsReplying() const {
!get<MCPipelineMessagePtr>(handle)->cmd.no_reply);
}
struct Connection::DispatchOperations {
DispatchOperations(SinkReplyBuilder* b, Connection* me)
struct Connection::AsyncOperations {
AsyncOperations(SinkReplyBuilder* b, Connection* me)
: stats{&tl_facade_stats->conn_stats}, builder{b}, self(me) {
}
@ -452,12 +438,12 @@ struct Connection::DispatchOperations {
Connection* self = nullptr;
};
void Connection::DispatchOperations::operator()(const MonitorMessage& msg) {
void Connection::AsyncOperations::operator()(const MonitorMessage& msg) {
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
rbuilder->SendSimpleString(msg);
}
void Connection::DispatchOperations::operator()(const AclUpdateMessage& msg) {
void Connection::AsyncOperations::operator()(const AclUpdateMessage& msg) {
if (self->cntx()) {
if (msg.username == self->cntx()->authed_username) {
self->cntx()->acl_commands = msg.commands;
@ -467,7 +453,7 @@ void Connection::DispatchOperations::operator()(const AclUpdateMessage& msg) {
}
}
void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) {
void Connection::AsyncOperations::operator()(const PubMessage& pub_msg) {
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
unsigned i = 0;
array<string_view, 4> arr;
@ -483,7 +469,7 @@ void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) {
RedisReplyBuilder::CollectionType::PUSH);
}
void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg) {
void Connection::AsyncOperations::operator()(Connection::PipelineMessage& msg) {
DVLOG(2) << "Dispatching pipeline: " << ToSV(msg.args.front());
self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()},
@ -493,24 +479,24 @@ void Connection::DispatchOperations::operator()(Connection::PipelineMessage& msg
self->skip_next_squashing_ = false;
}
void Connection::DispatchOperations::operator()(const Connection::MCPipelineMessage& msg) {
void Connection::AsyncOperations::operator()(const Connection::MCPipelineMessage& msg) {
self->service_->DispatchMC(msg.cmd, msg.value,
static_cast<MCReplyBuilder*>(self->reply_builder_.get()),
self->cc_.get());
self->last_interaction_ = time(nullptr);
}
void Connection::DispatchOperations::operator()(const MigrationRequestMessage& msg) {
void Connection::AsyncOperations::operator()(const MigrationRequestMessage& msg) {
// no-op
}
void Connection::DispatchOperations::operator()(CheckpointMessage msg) {
void Connection::AsyncOperations::operator()(CheckpointMessage msg) {
VLOG(2) << "Decremented checkpoint at " << self->DebugInfo();
msg.bc->Dec();
}
void Connection::DispatchOperations::operator()(const InvalidationMessage& msg) {
void Connection::AsyncOperations::operator()(const InvalidationMessage& msg) {
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
DCHECK(rbuilder->IsResp3());
rbuilder->StartCollection(2, facade::RedisReplyBuilder::CollectionType::PUSH);
@ -606,7 +592,7 @@ void Connection::OnPreMigrateThread() {
migration_in_process_ = true;
socket_->CancelOnErrorCb();
DCHECK(!dispatch_fb_.IsJoinable()) << GetClientId();
DCHECK(!async_fb_.IsJoinable()) << GetClientId();
}
void Connection::OnPostMigrateThread() {
@ -617,11 +603,11 @@ void Connection::OnPostMigrateThread() {
socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); });
}
migration_in_process_ = false;
DCHECK(!dispatch_fb_.IsJoinable());
DCHECK(!async_fb_.IsJoinable());
// If someone had sent Async during the migration, we must create dispatch_fb_.
// If someone had sent Async during the migration, we must create async_fb_.
if (!dispatch_q_.empty()) {
LaunchDispatchFiberIfNeeded();
LaunchAsyncFiberIfNeeded();
}
// Update tl variables
@ -974,7 +960,7 @@ void Connection::ConnectionFlow() {
cnd_.notify_one();
phase_ = SHUTTING_DOWN;
VLOG(2) << "Before dispatch_fb.join()";
dispatch_fb_.JoinIfNeeded();
async_fb_.JoinIfNeeded();
VLOG(2) << "After dispatch_fb.join()";
phase_ = PRECLOSE;
@ -1048,6 +1034,7 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_
// prefer synchronous dispatching to save memory.
optimize_for_async = false;
last_interaction_ = time(nullptr);
}
// Avoid sync dispatch if we can interleave with an ongoing async dispatch.
@ -1222,9 +1209,9 @@ void Connection::HandleMigrateRequest() {
ProactorBase* dest = migration_request_;
if (dispatch_fb_.IsJoinable()) {
if (async_fb_.IsJoinable()) {
SendAsync({MigrationRequestMessage{}});
dispatch_fb_.Join();
async_fb_.Join();
}
// We don't support migrating with subscriptions as it would require moving thread local
@ -1237,9 +1224,9 @@ void Connection::HandleMigrateRequest() {
// We need to return early as the socket is closing and IoLoop will clean up.
// The reason that this is true is because of the following DCHECK
DCHECK(!dispatch_fb_.IsJoinable());
DCHECK(!async_fb_.IsJoinable());
// which can never trigger since we Joined on the dispatch_fb_ above and we are
// which can never trigger since we Joined on the async_fb_ above and we are
// atomic in respect to our proactor meaning that no other fiber will
// launch the DispatchFiber.
if (!this->Migrate(dest)) {
@ -1336,7 +1323,7 @@ auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
return parse_status;
}
bool Connection::ShouldEndDispatchFiber(const MessageHandle& msg) {
bool Connection::ShouldEndAsyncFiber(const MessageHandle& msg) {
if (!holds_alternative<MigrationRequestMessage>(msg.handle)) {
return false;
}
@ -1402,14 +1389,14 @@ void Connection::SquashPipeline() {
}
void Connection::ClearPipelinedMessages() {
DispatchOperations dispatch_op{reply_builder_.get(), this};
AsyncOperations async_op{reply_builder_.get(), this};
// Recycle messages even from disconnecting client to keep properly track of memory stats
// As well as to avoid pubsub backpressure leakege.
for (auto& msg : dispatch_q_) {
FiberAtomicGuard guard; // don't suspend when concluding to avoid getting new messages
if (msg.IsControl())
visit(dispatch_op, msg.handle); // to not miss checkpoints
visit(async_op, msg.handle); // to not miss checkpoints
RecycleMessage(std::move(msg));
}
@ -1428,7 +1415,7 @@ std::string Connection::DebugInfo() const {
absl::StrAppend(&info, "dispatch(s/a)=", cc_->sync_dispatch, " ", cc_->async_dispatch, ", ");
absl::StrAppend(&info, "closing=", cc_->conn_closing, ", ");
}
absl::StrAppend(&info, "dispatch_fiber:joinable=", dispatch_fb_.IsJoinable(), ", ");
absl::StrAppend(&info, "dispatch_fiber:joinable=", async_fb_.IsJoinable(), ", ");
bool intrusive_front = dispatch_q_.size() > 0 && dispatch_q_.front().IsControl();
absl::StrAppend(&info, "dispatch_queue:size=", dispatch_q_.size(), ", ");
@ -1452,10 +1439,10 @@ std::string Connection::DebugInfo() const {
// into the dispatch queue and DispatchFiber will run those commands asynchronously with
// InputLoop. Note: in some cases, InputLoop may decide to dispatch directly and bypass the
// DispatchFiber.
void Connection::ExecutionFiber() {
ThisFiber::SetName("ExecutionFiber");
void Connection::AsyncFiber() {
ThisFiber::SetName("AsyncFiber");
DispatchOperations dispatch_op{reply_builder_.get(), this};
AsyncOperations async_op{reply_builder_.get(), this};
size_t squashing_threshold = GetFlag(FLAGS_pipeline_squash);
@ -1511,7 +1498,7 @@ void Connection::ExecutionFiber() {
reply_builder_->Flush();
}
if (ShouldEndDispatchFiber(msg)) {
if (ShouldEndAsyncFiber(msg)) {
RecycleMessage(std::move(msg));
CHECK(dispatch_q_.empty()) << DebugInfo();
queue_backpressure_->pipeline_cnd.notify_all();
@ -1519,7 +1506,7 @@ void Connection::ExecutionFiber() {
}
cc_->async_dispatch = true;
std::visit(dispatch_op, msg.handle);
std::visit(async_op, msg.handle);
cc_->async_dispatch = false;
RecycleMessage(std::move(msg));
}
@ -1608,7 +1595,7 @@ bool Connection::Migrate(util::fb2::ProactorBase* dest) {
// Migrate is only used by DFLY Thread and Flow command which both check against
// the result of Migration and handle it explicitly in their flows so this can act
// as a weak if condition instead of a crash prone CHECK.
if (dispatch_fb_.IsJoinable() || cc_->conn_closing) {
if (async_fb_.IsJoinable() || cc_->conn_closing) {
return false;
}
@ -1675,11 +1662,10 @@ void Connection::SendInvalidationMessageAsync(InvalidationMessage msg) {
SendAsync({std::move(msg)});
}
void Connection::LaunchDispatchFiberIfNeeded() {
if (!dispatch_fb_.IsJoinable() && !migration_in_process_) {
VLOG(1) << "[" << id_ << "] LaunchDispatchFiberIfNeeded ";
dispatch_fb_ =
fb2::Fiber(fb2::Launch::post, "connection_dispatch", [this]() { ExecutionFiber(); });
void Connection::LaunchAsyncFiberIfNeeded() {
if (!async_fb_.IsJoinable() && !migration_in_process_) {
VLOG(1) << "[" << id_ << "] LaunchAsyncFiberIfNeeded ";
async_fb_ = fb2::Fiber(fb2::Launch::post, "connection_dispatch", [this]() { AsyncFiber(); });
}
}
@ -1695,7 +1681,7 @@ void Connection::SendAsync(MessageHandle msg) {
// If we launch while closing, it won't be awaited. Control messages will be processed on cleanup.
if (!cc_->conn_closing) {
LaunchDispatchFiberIfNeeded();
LaunchAsyncFiberIfNeeded();
}
DCHECK_NE(phase_, PRECLOSE); // No more messages are processed after this point

View file

@ -67,7 +67,6 @@ class Connection : public util::Connection {
using BreakerCb = std::function<void(uint32_t)>;
using ShutdownCb = std::function<void()>;
using ShutdownHandle = unsigned;
// PubSub message, either incoming message for active subscription or reply for new subscription.
struct PubMessage {
@ -116,7 +115,7 @@ class Connection : public util::Connection {
dfly::acl::AclPubSub pub_sub;
};
// Migration request message, the dispatch fiber stops to give way for thread migration.
// Migration request message, the async fiber stops to give way for thread migration.
struct MigrationRequestMessage {};
// Checkpoint message, used to track when the connection finishes executing the current command.
@ -322,9 +321,7 @@ class Connection : public util::Connection {
private:
enum ParserStatus { OK, NEED_MORE, ERROR };
struct DispatchOperations;
struct DispatchCleanup;
struct Shutdown;
struct AsyncOperations;
// Check protocol and handle connection.
void HandleRequests() final;
@ -345,8 +342,8 @@ class Connection : public util::Connection {
void DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_cb,
absl::FunctionRef<MessageHandle()> cmd_msg_cb);
// Handles events from dispatch queue.
void ExecutionFiber();
// Handles events from the dispatch queue.
void AsyncFiber();
void SendAsync(MessageHandle msg);
@ -368,9 +365,9 @@ class Connection : public util::Connection {
PipelineMessagePtr GetFromPipelinePool();
void HandleMigrateRequest();
bool ShouldEndDispatchFiber(const MessageHandle& msg);
bool ShouldEndAsyncFiber(const MessageHandle& msg);
void LaunchDispatchFiberIfNeeded(); // Dispatch fiber is started lazily
void LaunchAsyncFiberIfNeeded(); // Async fiber is started lazily
// Squashes pipelined commands from the dispatch queue to spread load over all threads
void SquashPipeline();
@ -385,7 +382,7 @@ class Connection : public util::Connection {
std::deque<MessageHandle> dispatch_q_; // dispatch queue
util::fb2::CondVarAny cnd_; // dispatch queue waker
util::fb2::Fiber dispatch_fb_; // dispatch fiber (if started)
util::fb2::Fiber async_fb_; // async fiber (if started)
uint64_t pending_pipeline_cmd_cnt_ = 0; // how many queued Redis async commands in dispatch_q