fix(server): Start periodic fiber only once init completes (#2023)

This commit is contained in:
Shahar Mike 2023-10-15 14:56:55 +03:00 committed by GitHub
parent bcfd1863c7
commit 838d01af32
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 24 additions and 15 deletions

View file

@ -108,8 +108,10 @@ class RoundRobinSharder {
} }
static void Destroy() { static void Destroy() {
round_robin_shards_.clear();
round_robin_shards_tl_cache_.clear(); round_robin_shards_tl_cache_.clear();
std::lock_guard guard(mutex_);
round_robin_shards_.clear();
} }
static bool IsEnabled() { static bool IsEnabled() {
@ -307,7 +309,7 @@ uint32_t EngineShard::DefragTask() {
return kRunAtLowPriority; return kRunAtLowPriority;
} }
EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap) EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap)
: queue_(kQueueLen), : queue_(kQueueLen),
txq_([](const Transaction* t) { return t->txid(); }), txq_([](const Transaction* t) { return t->txid(); }),
mi_resource_(heap), mi_resource_(heap),
@ -317,17 +319,6 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t*
queue_.Run(); queue_.Run();
}); });
if (update_db_time) {
uint32_t clock_cycle_ms = 1000 / std::max<uint32_t>(1, GetFlag(FLAGS_hz));
if (clock_cycle_ms == 0)
clock_cycle_ms = 1;
fiber_periodic_ = MakeFiber([this, index = pb->GetIndex(), period_ms = clock_cycle_ms] {
ThisFiber::SetName(absl::StrCat("shard_periodic", index));
RunPeriodic(std::chrono::milliseconds(period_ms));
});
}
tmp_str1 = sdsempty(); tmp_str1 = sdsempty();
db_slice_.UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0); db_slice_.UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0);
@ -355,12 +346,23 @@ void EngineShard::Shutdown() {
ProactorBase::me()->RemoveOnIdleTask(defrag_task_); ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
} }
void EngineShard::StartPeriodicFiber(util::ProactorBase* pb) {
uint32_t clock_cycle_ms = 1000 / std::max<uint32_t>(1, GetFlag(FLAGS_hz));
if (clock_cycle_ms == 0)
clock_cycle_ms = 1;
fiber_periodic_ = MakeFiber([this, index = pb->GetIndex(), period_ms = clock_cycle_ms] {
ThisFiber::SetName(absl::StrCat("shard_periodic", index));
RunPeriodic(std::chrono::milliseconds(period_ms));
});
}
void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) { void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
CHECK(shard_ == nullptr) << pb->GetIndex(); CHECK(shard_ == nullptr) << pb->GetIndex();
mi_heap_t* data_heap = ServerState::tlocal()->data_heap(); mi_heap_t* data_heap = ServerState::tlocal()->data_heap();
void* ptr = mi_heap_malloc_aligned(data_heap, sizeof(EngineShard), alignof(EngineShard)); void* ptr = mi_heap_malloc_aligned(data_heap, sizeof(EngineShard), alignof(EngineShard));
shard_ = new (ptr) EngineShard(pb, update_db_time, data_heap); shard_ = new (ptr) EngineShard(pb, data_heap);
CompactObj::InitThreadLocal(shard_->memory_resource()); CompactObj::InitThreadLocal(shard_->memory_resource());
SmallString::InitThreadLocal(data_heap); SmallString::InitThreadLocal(data_heap);
@ -380,6 +382,11 @@ void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
RoundRobinSharder::Init(); RoundRobinSharder::Init();
shard_->shard_search_indices_.reset(new ShardDocIndices()); shard_->shard_search_indices_.reset(new ShardDocIndices());
if (update_db_time) {
// Must be last, as it accesses objects initialized above.
shard_->StartPeriodicFiber(pb);
}
} }
void EngineShard::DestroyThreadLocal() { void EngineShard::DestroyThreadLocal() {

View file

@ -174,11 +174,13 @@ class EngineShard {
void ResetScanState(); void ResetScanState();
}; };
EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap); EngineShard(util::ProactorBase* pb, mi_heap_t* heap);
// blocks the calling fiber. // blocks the calling fiber.
void Shutdown(); // called before destructing EngineShard. void Shutdown(); // called before destructing EngineShard.
void StartPeriodicFiber(util::ProactorBase* pb);
void Heartbeat(); void Heartbeat();
void RunPeriodic(std::chrono::milliseconds period_ms); void RunPeriodic(std::chrono::milliseconds period_ms);