diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 1bd9ff651..481d11981 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -108,8 +108,10 @@ class RoundRobinSharder { } static void Destroy() { - round_robin_shards_.clear(); round_robin_shards_tl_cache_.clear(); + + std::lock_guard guard(mutex_); + round_robin_shards_.clear(); } static bool IsEnabled() { @@ -307,7 +309,7 @@ uint32_t EngineShard::DefragTask() { return kRunAtLowPriority; } -EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap) +EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap) : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap), @@ -317,17 +319,6 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* queue_.Run(); }); - if (update_db_time) { - uint32_t clock_cycle_ms = 1000 / std::max(1, GetFlag(FLAGS_hz)); - if (clock_cycle_ms == 0) - clock_cycle_ms = 1; - - fiber_periodic_ = MakeFiber([this, index = pb->GetIndex(), period_ms = clock_cycle_ms] { - ThisFiber::SetName(absl::StrCat("shard_periodic", index)); - RunPeriodic(std::chrono::milliseconds(period_ms)); - }); - } - tmp_str1 = sdsempty(); db_slice_.UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0); @@ -355,12 +346,23 @@ void EngineShard::Shutdown() { ProactorBase::me()->RemoveOnIdleTask(defrag_task_); } +void EngineShard::StartPeriodicFiber(util::ProactorBase* pb) { + uint32_t clock_cycle_ms = 1000 / std::max(1, GetFlag(FLAGS_hz)); + if (clock_cycle_ms == 0) + clock_cycle_ms = 1; + + fiber_periodic_ = MakeFiber([this, index = pb->GetIndex(), period_ms = clock_cycle_ms] { + ThisFiber::SetName(absl::StrCat("shard_periodic", index)); + RunPeriodic(std::chrono::milliseconds(period_ms)); + }); +} + void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) { CHECK(shard_ == nullptr) << pb->GetIndex(); mi_heap_t* data_heap = ServerState::tlocal()->data_heap(); void* ptr = mi_heap_malloc_aligned(data_heap, sizeof(EngineShard), alignof(EngineShard)); - shard_ = new (ptr) EngineShard(pb, update_db_time, data_heap); + shard_ = new (ptr) EngineShard(pb, data_heap); CompactObj::InitThreadLocal(shard_->memory_resource()); SmallString::InitThreadLocal(data_heap); @@ -380,6 +382,11 @@ void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) { RoundRobinSharder::Init(); shard_->shard_search_indices_.reset(new ShardDocIndices()); + + if (update_db_time) { + // Must be last, as it accesses objects initialized above. + shard_->StartPeriodicFiber(pb); + } } void EngineShard::DestroyThreadLocal() { diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 0464ea6dc..0968ca171 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -174,11 +174,13 @@ class EngineShard { void ResetScanState(); }; - EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap); + EngineShard(util::ProactorBase* pb, mi_heap_t* heap); // blocks the calling fiber. void Shutdown(); // called before destructing EngineShard. + void StartPeriodicFiber(util::ProactorBase* pb); + void Heartbeat(); void RunPeriodic(std::chrono::milliseconds period_ms);