chore: Add names to fibers that were missing them (#1667)

This commit is contained in:
Roy Jacobson 2023-08-08 13:01:50 +02:00 committed by GitHub
parent 16e512c60d
commit 4001a94b22
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 20 additions and 17 deletions

View file

@ -440,7 +440,8 @@ io::Result<bool> Connection::CheckForHttpProto(FiberSocketBase* peer) {
void Connection::ConnectionFlow(FiberSocketBase* peer) { void Connection::ConnectionFlow(FiberSocketBase* peer) {
stats_ = service_->GetThreadLocalConnectionStats(); stats_ = service_->GetThreadLocalConnectionStats();
auto dispatch_fb = MakeFiber(dfly::Launch::dispatch, [&] { DispatchFiber(peer); }); auto dispatch_fb =
fb2::Fiber(dfly::Launch::dispatch, "connection_dispatch", [&] { DispatchFiber(peer); });
++stats_->num_conns; ++stats_->num_conns;
++stats_->conn_received_cnt; ++stats_->conn_received_cnt;

View file

@ -351,7 +351,7 @@ GenericError Context::ReportErrorInternal(GenericError&& err) {
CHECK(!err_handler_fb_.IsJoinable()); CHECK(!err_handler_fb_.IsJoinable());
if (err_handler_) if (err_handler_)
err_handler_fb_ = MakeFiber(err_handler_, err_); err_handler_fb_ = fb2::Fiber("report_internal_error", err_handler_, err_);
Cancellation::Cancel(); Cancellation::Cancel();
return err_; return err_;

View file

@ -544,7 +544,7 @@ void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) {
void DbSlice::FlushSlots(SlotSet slot_ids) { void DbSlice::FlushSlots(SlotSet slot_ids) {
InvalidateSlotWatches(slot_ids); InvalidateSlotWatches(slot_ids);
util::MakeFiber([this, slot_ids = std::move(slot_ids)]() mutable { fb2::Fiber("flush_slots", [this, slot_ids = std::move(slot_ids)]() mutable {
FlushSlotsFb(slot_ids); FlushSlotsFb(slot_ids);
}).Detach(); }).Detach();
} }
@ -578,7 +578,7 @@ void DbSlice::FlushDb(DbIndex db_ind) {
mi_heap_collect(ServerState::tlocal()->data_heap(), true); mi_heap_collect(ServerState::tlocal()->data_heap(), true);
}; };
util::MakeFiber(std::move(cb)).Detach(); fb2::Fiber("flush_db", std::move(cb)).Detach();
return; return;
} }
@ -598,7 +598,7 @@ void DbSlice::FlushDb(DbIndex db_ind) {
} }
} }
MakeFiber([all_dbs = std::move(all_dbs)]() mutable { fb2::Fiber("flush_all", [all_dbs = std::move(all_dbs)]() mutable {
for (auto& db : all_dbs) { for (auto& db : all_dbs) {
db.reset(); db.reset();
} }

View file

@ -482,7 +482,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
flow->saver->StartSnapshotInShard(true, cntx->GetCancellation(), shard); flow->saver->StartSnapshotInShard(true, cntx->GetCancellation(), shard);
} }
flow->full_sync_fb = MakeFiber(&DflyCmd::FullSyncFb, this, flow, cntx); flow->full_sync_fb = fb2::Fiber("full_sync", &DflyCmd::FullSyncFb, this, flow, cntx);
return OpStatus::OK; return OpStatus::OK;
} }
@ -565,7 +565,7 @@ auto DflyCmd::CreateSyncSession(ConnectionContext* cntx)
// Spawn external fiber to allow destructing the context from outside // Spawn external fiber to allow destructing the context from outside
// and return from the handler immediately. // and return from the handler immediately.
util::MakeFiber(&DflyCmd::StopReplication, this, sync_id).Detach(); fb2::Fiber("stop_replication", &DflyCmd::StopReplication, this, sync_id).Detach();
}; };
string address = cntx->owner()->RemoteEndpointAddress(); string address = cntx->owner()->RemoteEndpointAddress();

View file

@ -545,8 +545,7 @@ BlockingController* EngineShard::EnsureBlockingController() {
} }
void EngineShard::TEST_EnableHeartbeat() { void EngineShard::TEST_EnableHeartbeat() {
fiber_periodic_ = MakeFiber([this, period_ms = 1] { fiber_periodic_ = fb2::Fiber("shard_periodic_TEST", [this, period_ms = 1] {
ThisFiber::SetName("shard_periodic_TEST");
RunPeriodic(std::chrono::milliseconds(period_ms)); RunPeriodic(std::chrono::milliseconds(period_ms));
}); });
} }

View file

@ -9,7 +9,7 @@ using namespace util;
void JournalStreamer::Start(io::Sink* dest) { void JournalStreamer::Start(io::Sink* dest) {
using namespace journal; using namespace journal;
write_fb_ = MakeFiber(&JournalStreamer::WriterFb, this, dest); write_fb_ = fb2::Fiber("journal_stream", &JournalStreamer::WriterFb, this, dest);
journal_cb_id_ = journal_->RegisterOnChange([this](const Entry& entry, bool allow_await) { journal_cb_id_ = journal_->RegisterOnChange([this](const Entry& entry, bool allow_await) {
if (entry.opcode == Op::NOOP) { if (entry.opcode == Op::NOOP) {
// No recode to write, just await if data was written so consumer will read the data. // No recode to write, just await if data was written so consumer will read the data.

View file

@ -111,7 +111,7 @@ error_code Replica::Start(ConnectionContext* cntx) {
RETURN_ON_ERR(check_connection_error(ec, "could not greet master ")); RETURN_ON_ERR(check_connection_error(ec, "could not greet master "));
// 4. Spawn main coordination fiber. // 4. Spawn main coordination fiber.
sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this); sync_fb_ = fb2::Fiber("main_replication", &Replica::MainReplicationFb, this);
(*cntx)->SendOk(); (*cntx)->SendOk();
return {}; return {};
@ -511,7 +511,7 @@ error_code Replica::ConsumeRedisStream() {
LOG(INFO) << "Transitioned into stable sync"; LOG(INFO) << "Transitioned into stable sync";
facade::CmdArgVec args_vector; facade::CmdArgVec args_vector;
acks_fb_ = MakeFiber(&Replica::RedisStreamAcksFb, this); acks_fb_ = fb2::Fiber("redis_acks", &Replica::RedisStreamAcksFb, this);
while (true) { while (true) {
auto response = ReadRespReply(&io_buf, /*copy_msg=*/false); auto response = ReadRespReply(&io_buf, /*copy_msg=*/false);
@ -645,7 +645,8 @@ error_code DflyShardReplica::StartFullSyncFlow(BlockingCounter sb, Context* cntx
// We can not discard io_buf because it may contain data // We can not discard io_buf because it may contain data
// besides the response we parsed. Therefore we pass it further to ReplicateDFFb. // besides the response we parsed. Therefore we pass it further to ReplicateDFFb.
sync_fb_ = MakeFiber(&DflyShardReplica::FullSyncDflyFb, this, std::move(eof_token), sb, cntx); sync_fb_ = fb2::Fiber("shard_full_sync", &DflyShardReplica::FullSyncDflyFb, this,
std::move(eof_token), sb, cntx);
return error_code{}; return error_code{};
} }
@ -656,9 +657,11 @@ error_code DflyShardReplica::StartStableSyncFlow(Context* cntx) {
CHECK(mythread); CHECK(mythread);
CHECK(Sock()->IsOpen()); CHECK(Sock()->IsOpen());
sync_fb_ = MakeFiber(&DflyShardReplica::StableSyncDflyReadFb, this, cntx); sync_fb_ =
fb2::Fiber("shard_stable_sync_read", &DflyShardReplica::StableSyncDflyReadFb, this, cntx);
if (use_multi_shard_exe_sync_) { if (use_multi_shard_exe_sync_) {
execution_fb_ = MakeFiber(&DflyShardReplica::StableSyncDflyExecFb, this, cntx); execution_fb_ =
fb2::Fiber("shard_stable_sync_exec", &DflyShardReplica::StableSyncDflyExecFb, this, cntx);
} }
return std::error_code{}; return std::error_code{};
@ -723,7 +726,7 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) {
TransactionReader tx_reader{}; TransactionReader tx_reader{};
if (master_context_.version > DflyVersion::VER0) { if (master_context_.version > DflyVersion::VER0) {
acks_fb_ = MakeFiber(&DflyShardReplica::StableSyncDflyAcksFb, this, cntx); acks_fb_ = fb2::Fiber("shard_acks", &DflyShardReplica::StableSyncDflyAcksFb, this, cntx);
} }
while (!cntx->IsCancelled()) { while (!cntx->IsCancelled()) {

View file

@ -50,7 +50,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) {
VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_; VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_;
snapshot_fb_ = MakeFiber([this, stream_journal, cll] { snapshot_fb_ = fb2::Fiber("snapshot", [this, stream_journal, cll] {
IterateBucketsFb(cll); IterateBucketsFb(cll);
db_slice_->UnregisterOnChange(snapshot_version_); db_slice_->UnregisterOnChange(snapshot_version_);
if (cll->IsCancelled()) { if (cll->IsCancelled()) {