diff --git a/helio b/helio index b786e8270..1695b3241 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit b786e8270fe2a02cdc259d3ea14f93cff0b8b72b +Subproject commit 1695b324146b1b1341fa46fa3e35c694cd04e14c diff --git a/src/facade/dragonfly_listener.cc b/src/facade/dragonfly_listener.cc index a5d378053..1a3e97622 100644 --- a/src/facade/dragonfly_listener.cc +++ b/src/facade/dragonfly_listener.cc @@ -287,7 +287,7 @@ void Listener::PostShutdown() { } void Listener::OnConnectionStart(util::Connection* conn) { - unsigned id = conn->socket()->proactor()->GetIndex(); + unsigned id = conn->socket()->proactor()->GetPoolIndex(); DCHECK_LT(id, per_thread_.size()); facade::Connection* facade_conn = static_cast(conn); @@ -313,7 +313,7 @@ void Listener::OnConnectionStart(util::Connection* conn) { void Listener::OnConnectionClose(util::Connection* conn) { // TODO: We do not account for connections migrated to other threads. This is a rare case. - unsigned id = conn->socket()->proactor()->GetIndex(); + unsigned id = conn->socket()->proactor()->GetPoolIndex(); DCHECK_LT(id, per_thread_.size()); auto& pth = per_thread_[id]; diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index 0b357da6d..d9d91f323 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -141,7 +141,7 @@ vector ChangeSubscriptions(bool pattern, CmdArgList args, bool to_add, auto& sinfo = *conn->conn_state.subscribe_info.get(); auto& local_store = pattern ? sinfo.patterns : sinfo.channels; - int32_t tid = util::ProactorBase::GetIndex(); + int32_t tid = util::ProactorBase::me()->GetPoolIndex(); DCHECK_GE(tid, 0); ChannelStoreUpdater csu{pattern, to_add, conn, uint32_t(tid)}; diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 28be2dbb3..edb7a0397 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -217,7 +217,7 @@ void DflyCmd::Thread(CmdArgList args, ConnectionContext* cntx) { if (args.size() == 1) { // DFLY THREAD : returns connection thread index and number of threads. rb->StartArray(2); - rb->SendLong(ProactorBase::GetIndex()); + rb->SendLong(ProactorBase::me()->GetPoolIndex()); rb->SendLong(long(pool->size())); return; } @@ -230,7 +230,7 @@ void DflyCmd::Thread(CmdArgList args, ConnectionContext* cntx) { } if (num_thread < pool->size()) { - if (int(num_thread) != ProactorBase::GetIndex()) { + if (int(num_thread) != ProactorBase::me()->GetPoolIndex()) { cntx->conn()->Migrate(pool->at(num_thread)); } diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 8d9d336f0..c5f6a3797 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -317,8 +317,8 @@ EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap) : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap), - db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) { - fiber_q_ = MakeFiber([this, index = pb->GetIndex()] { + db_slice_(pb->GetPoolIndex(), GetFlag(FLAGS_cache_mode), this) { + fiber_q_ = MakeFiber([this, index = pb->GetPoolIndex()] { ThisFiber::SetName(absl::StrCat("shard_queue", index)); queue_.Run(); }); @@ -355,14 +355,14 @@ void EngineShard::StartPeriodicFiber(util::ProactorBase* pb) { if (clock_cycle_ms == 0) clock_cycle_ms = 1; - fiber_periodic_ = MakeFiber([this, index = pb->GetIndex(), period_ms = clock_cycle_ms] { + fiber_periodic_ = MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms] { ThisFiber::SetName(absl::StrCat("shard_periodic", index)); RunPeriodic(std::chrono::milliseconds(period_ms)); }); } void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) { - CHECK(shard_ == nullptr) << pb->GetIndex(); + CHECK(shard_ == nullptr) << pb->GetPoolIndex(); mi_heap_t* data_heap = ServerState::tlocal()->data_heap(); void* ptr = mi_heap_malloc_aligned(data_heap, sizeof(EngineShard), alignof(EngineShard)); diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 07e190520..48a2ce9d1 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -29,7 +29,7 @@ Journal::Journal() { } void Journal::StartInThread() { - journal_slice.Init(unsigned(ProactorBase::GetIndex())); + journal_slice.Init(unsigned(ProactorBase::me()->GetPoolIndex())); ServerState::tlocal()->set_journal(this); EngineShard* shard = EngineShard::tlocal(); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index ccec59e67..68cf13e6d 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -251,8 +251,8 @@ void SendMonitor(const std::string& msg) { const auto& monitor_repo = ServerState::tlocal()->Monitors(); const auto& monitors = monitor_repo.monitors(); if (!monitors.empty()) { - VLOG(1) << "thread " << util::ProactorBase::GetIndex() << " sending monitor message '" << msg - << "' for " << monitors.size(); + VLOG(1) << "thread " << ProactorBase::me()->GetPoolIndex() << " sending monitor message '" + << msg << "' for " << monitors.size(); for (auto monitor_conn : monitors) { // never preempts, so we can iterate safely. @@ -1815,8 +1815,8 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret }); if (*sid != ServerState::tlocal()->thread_index()) { - VLOG(1) << "Migrating connection " << cntx->conn() << " from " << ProactorBase::GetIndex() - << " to " << *sid; + VLOG(1) << "Migrating connection " << cntx->conn() << " from " + << ProactorBase::me()->GetPoolIndex() << " to " << *sid; cntx->conn()->RequestAsyncMigration(shard_set->pool()->at(*sid)); } } else { diff --git a/src/server/server_state.cc b/src/server/server_state.cc index 92b70c3d7..e53fcab0f 100644 --- a/src/server/server_state.cc +++ b/src/server/server_state.cc @@ -37,7 +37,7 @@ ServerState::Stats& ServerState::Stats::operator+=(const ServerState::Stats& oth void MonitorsRepo::Add(facade::Connection* connection) { VLOG(1) << "register connection " << " at address 0x" << std::hex << (const void*)connection << " for thread " - << util::ProactorBase::GetIndex(); + << util::ProactorBase::me()->GetPoolIndex(); monitors_.push_back(connection); } diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index c72f25477..bf59c52ea 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -176,7 +176,7 @@ void SliceSnapshot::Join() { // Serializes all the entries with version less than snapshot_version_. void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) { { - auto fiber_name = absl::StrCat("SliceSnapshot-", ProactorBase::GetIndex()); + auto fiber_name = absl::StrCat("SliceSnapshot-", ProactorBase::me()->GetPoolIndex()); ThisFiber::SetName(std::move(fiber_name)); } diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index 26eb3302b..26e3d6909 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -539,7 +539,7 @@ bool BaseFamilyTest::IsLocked(DbIndex db_index, std::string_view key) const { } string BaseFamilyTest::GetId() const { - int32 id = ProactorBase::GetIndex(); + int32 id = ProactorBase::me()->GetPoolIndex(); CHECK_GE(id, 0); return absl::StrCat("IO", id); }