From 4001a94b2286aa9526bf278eea39260706e7365d Mon Sep 17 00:00:00 2001 From: Roy Jacobson Date: Tue, 8 Aug 2023 13:01:50 +0200 Subject: [PATCH] chore: Add names to fibers that were missing them (#1667) --- src/facade/dragonfly_connection.cc | 3 ++- src/server/common.cc | 2 +- src/server/db_slice.cc | 6 +++--- src/server/dflycmd.cc | 4 ++-- src/server/engine_shard_set.cc | 3 +-- src/server/journal/streamer.cc | 2 +- src/server/replica.cc | 15 +++++++++------ src/server/snapshot.cc | 2 +- 8 files changed, 20 insertions(+), 17 deletions(-) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 642642035..f415b0969 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -440,7 +440,8 @@ io::Result Connection::CheckForHttpProto(FiberSocketBase* peer) { void Connection::ConnectionFlow(FiberSocketBase* peer) { stats_ = service_->GetThreadLocalConnectionStats(); - auto dispatch_fb = MakeFiber(dfly::Launch::dispatch, [&] { DispatchFiber(peer); }); + auto dispatch_fb = + fb2::Fiber(dfly::Launch::dispatch, "connection_dispatch", [&] { DispatchFiber(peer); }); ++stats_->num_conns; ++stats_->conn_received_cnt; diff --git a/src/server/common.cc b/src/server/common.cc index 81e2315db..5fd1aaa14 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -351,7 +351,7 @@ GenericError Context::ReportErrorInternal(GenericError&& err) { CHECK(!err_handler_fb_.IsJoinable()); if (err_handler_) - err_handler_fb_ = MakeFiber(err_handler_, err_); + err_handler_fb_ = fb2::Fiber("report_internal_error", err_handler_, err_); Cancellation::Cancel(); return err_; diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 7b2fc493b..ec218a277 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -544,7 +544,7 @@ void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) { void DbSlice::FlushSlots(SlotSet slot_ids) { InvalidateSlotWatches(slot_ids); - util::MakeFiber([this, slot_ids = std::move(slot_ids)]() mutable { + fb2::Fiber("flush_slots", [this, slot_ids = std::move(slot_ids)]() mutable { FlushSlotsFb(slot_ids); }).Detach(); } @@ -578,7 +578,7 @@ void DbSlice::FlushDb(DbIndex db_ind) { mi_heap_collect(ServerState::tlocal()->data_heap(), true); }; - util::MakeFiber(std::move(cb)).Detach(); + fb2::Fiber("flush_db", std::move(cb)).Detach(); return; } @@ -598,7 +598,7 @@ void DbSlice::FlushDb(DbIndex db_ind) { } } - MakeFiber([all_dbs = std::move(all_dbs)]() mutable { + fb2::Fiber("flush_all", [all_dbs = std::move(all_dbs)]() mutable { for (auto& db : all_dbs) { db.reset(); } diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 0860a0b3a..2dd0b2f6c 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -482,7 +482,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha flow->saver->StartSnapshotInShard(true, cntx->GetCancellation(), shard); } - flow->full_sync_fb = MakeFiber(&DflyCmd::FullSyncFb, this, flow, cntx); + flow->full_sync_fb = fb2::Fiber("full_sync", &DflyCmd::FullSyncFb, this, flow, cntx); return OpStatus::OK; } @@ -565,7 +565,7 @@ auto DflyCmd::CreateSyncSession(ConnectionContext* cntx) // Spawn external fiber to allow destructing the context from outside // and return from the handler immediately. - util::MakeFiber(&DflyCmd::StopReplication, this, sync_id).Detach(); + fb2::Fiber("stop_replication", &DflyCmd::StopReplication, this, sync_id).Detach(); }; string address = cntx->owner()->RemoteEndpointAddress(); diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index ca4c583d5..85f48099d 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -545,8 +545,7 @@ BlockingController* EngineShard::EnsureBlockingController() { } void EngineShard::TEST_EnableHeartbeat() { - fiber_periodic_ = MakeFiber([this, period_ms = 1] { - ThisFiber::SetName("shard_periodic_TEST"); + fiber_periodic_ = fb2::Fiber("shard_periodic_TEST", [this, period_ms = 1] { RunPeriodic(std::chrono::milliseconds(period_ms)); }); } diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 0054221ee..b1b092117 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -9,7 +9,7 @@ using namespace util; void JournalStreamer::Start(io::Sink* dest) { using namespace journal; - write_fb_ = MakeFiber(&JournalStreamer::WriterFb, this, dest); + write_fb_ = fb2::Fiber("journal_stream", &JournalStreamer::WriterFb, this, dest); journal_cb_id_ = journal_->RegisterOnChange([this](const Entry& entry, bool allow_await) { if (entry.opcode == Op::NOOP) { // No recode to write, just await if data was written so consumer will read the data. diff --git a/src/server/replica.cc b/src/server/replica.cc index 348251830..0f3557bfd 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -111,7 +111,7 @@ error_code Replica::Start(ConnectionContext* cntx) { RETURN_ON_ERR(check_connection_error(ec, "could not greet master ")); // 4. Spawn main coordination fiber. - sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this); + sync_fb_ = fb2::Fiber("main_replication", &Replica::MainReplicationFb, this); (*cntx)->SendOk(); return {}; @@ -511,7 +511,7 @@ error_code Replica::ConsumeRedisStream() { LOG(INFO) << "Transitioned into stable sync"; facade::CmdArgVec args_vector; - acks_fb_ = MakeFiber(&Replica::RedisStreamAcksFb, this); + acks_fb_ = fb2::Fiber("redis_acks", &Replica::RedisStreamAcksFb, this); while (true) { auto response = ReadRespReply(&io_buf, /*copy_msg=*/false); @@ -645,7 +645,8 @@ error_code DflyShardReplica::StartFullSyncFlow(BlockingCounter sb, Context* cntx // We can not discard io_buf because it may contain data // besides the response we parsed. Therefore we pass it further to ReplicateDFFb. - sync_fb_ = MakeFiber(&DflyShardReplica::FullSyncDflyFb, this, std::move(eof_token), sb, cntx); + sync_fb_ = fb2::Fiber("shard_full_sync", &DflyShardReplica::FullSyncDflyFb, this, + std::move(eof_token), sb, cntx); return error_code{}; } @@ -656,9 +657,11 @@ error_code DflyShardReplica::StartStableSyncFlow(Context* cntx) { CHECK(mythread); CHECK(Sock()->IsOpen()); - sync_fb_ = MakeFiber(&DflyShardReplica::StableSyncDflyReadFb, this, cntx); + sync_fb_ = + fb2::Fiber("shard_stable_sync_read", &DflyShardReplica::StableSyncDflyReadFb, this, cntx); if (use_multi_shard_exe_sync_) { - execution_fb_ = MakeFiber(&DflyShardReplica::StableSyncDflyExecFb, this, cntx); + execution_fb_ = + fb2::Fiber("shard_stable_sync_exec", &DflyShardReplica::StableSyncDflyExecFb, this, cntx); } return std::error_code{}; @@ -723,7 +726,7 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) { TransactionReader tx_reader{}; if (master_context_.version > DflyVersion::VER0) { - acks_fb_ = MakeFiber(&DflyShardReplica::StableSyncDflyAcksFb, this, cntx); + acks_fb_ = fb2::Fiber("shard_acks", &DflyShardReplica::StableSyncDflyAcksFb, this, cntx); } while (!cntx->IsCancelled()) { diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 34209ce9e..6771cae31 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -50,7 +50,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) { VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_; - snapshot_fb_ = MakeFiber([this, stream_journal, cll] { + snapshot_fb_ = fb2::Fiber("snapshot", [this, stream_journal, cll] { IterateBucketsFb(cll); db_slice_->UnregisterOnChange(snapshot_version_); if (cll->IsCancelled()) {