diff --git a/helio b/helio
index fb46b481e..f9e28c79d 160000
--- a/helio
+++ b/helio
@@ -1 +1 @@
-Subproject commit fb46b481e3eb82ecbc7bbf1d22b2fda7f5fac409
+Subproject commit f9e28c79d3f9234ab0f094a7101cad8b5847c184
diff --git a/src/core/task_queue.cc b/src/core/task_queue.cc
index 209ff5ffd..39fe7c236 100644
--- a/src/core/task_queue.cc
+++ b/src/core/task_queue.cc
@@ -4,8 +4,35 @@
 
 #include "core/task_queue.h"
 
+#include <absl/strings/str_cat.h>
+
+#include "base/logging.h"
+
+using namespace std;
 
 namespace dfly {
 
 __thread unsigned TaskQueue::blocked_submitters_ = 0;
 
+TaskQueue::TaskQueue(unsigned queue_size, unsigned start_size, unsigned pool_max_size)
+    : queue_(queue_size), consumer_fibers_(start_size), pool_max_size_(pool_max_size) {
+  CHECK_GT(start_size, 0u);
+  CHECK_LE(start_size, pool_max_size);
+}
+
+void TaskQueue::Start(std::string_view base_name) {
+  for (size_t i = 0; i < consumer_fibers_.size(); ++i) {
+    auto& fb = consumer_fibers_[i];
+    CHECK(!fb.IsJoinable());
+
+    string name = absl::StrCat(base_name, "/", i);
+    fb = util::fb2::Fiber(name, [this] { queue_.Run(); });
+  }
+}
+
+void TaskQueue::Shutdown() {
+  queue_.Shutdown();
+  for (auto& fb : consumer_fibers_)
+    fb.JoinIfNeeded();
+}
+
 }  // namespace dfly
diff --git a/src/core/task_queue.h b/src/core/task_queue.h
index 7b223bc98..4004731d4 100644
--- a/src/core/task_queue.h
+++ b/src/core/task_queue.h
@@ -15,8 +15,8 @@ namespace dfly {
  */
 class TaskQueue {
  public:
-  explicit TaskQueue(unsigned queue_size = 128) : queue_(queue_size) {
-  }
+  // TODO: to add a mechanism to moderate pool size. Currently it's static with pool_start_size.
+  TaskQueue(unsigned queue_size, unsigned pool_start_size, unsigned pool_max_size);
   template <typename F> bool TryAdd(F&& f) {
     return queue_.TryAdd(std::forward<F>(f));
   }
@@ -51,18 +51,13 @@
    * @brief Start running consumer loop in the caller thread by spawning fibers.
    * Returns immediately.
    */
-  void Start(std::string_view base_name) {
-    consumer_fiber_ = util::fb2::Fiber(base_name, [this] { queue_.Run(); });
-  }
+  void Start(std::string_view base_name);
 
   /**
    * @brief Notifies Run() function to empty the queue and to exit and waits for the consumer
    * fiber to finish.
    */
-  void Shutdown() {
-    queue_.Shutdown();
-    consumer_fiber_.JoinIfNeeded();
-  }
+  void Shutdown();
 
   static unsigned blocked_submitters() {
     return blocked_submitters_;
@@ -70,7 +65,9 @@
   }
 
  private:
   util::fb2::FiberQueue queue_;
-  util::fb2::Fiber consumer_fiber_;
+  std::vector<util::fb2::Fiber> consumer_fibers_;
+  unsigned pool_max_size_;
+
   static __thread unsigned blocked_submitters_;
 };
diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc
index 66e75a730..013bb3f82 100644
--- a/src/server/engine_shard.cc
+++ b/src/server/engine_shard.cc
@@ -371,7 +371,8 @@ uint32_t EngineShard::DefragTask() {
 }
 
 EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap)
-    : queue_(kQueueLen),
+    : queue_(kQueueLen, 1, 1),
+      queue2_(kQueueLen / 2, 2, 2),
       txq_([](const Transaction* t) { return t->txid(); }),
       mi_resource_(heap),
       shard_id_(pb->GetPoolIndex()) {
@@ -379,6 +380,7 @@ EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap)
 
   defrag_task_ = pb->AddOnIdleTask([this]() { return DefragTask(); });
   queue_.Start(absl::StrCat("shard_queue_", shard_id()));
+  queue2_.Start(absl::StrCat("l2_queue_", shard_id()));
 }
 
 EngineShard::~EngineShard() {
@@ -389,7 +391,7 @@ void EngineShard::Shutdown() {
   DVLOG(1) << "EngineShard::Shutdown";
 
   queue_.Shutdown();
-
+  queue2_.Shutdown();
   DCHECK(!fiber_periodic_.IsJoinable());
 
   ProactorBase::me()->RemoveOnIdleTask(defrag_task_);
diff --git a/src/server/engine_shard.h b/src/server/engine_shard.h
index 0bd5b3980..89b96178d 100644
--- a/src/server/engine_shard.h
+++ b/src/server/engine_shard.h
@@ -68,6 +68,10 @@
     return &queue_;
   }
 
+  TaskQueue* GetSecondaryQueue() {
+    return &queue2_;
+  }
+
   // Processes TxQueue, blocked transactions or any other execution state related to that
   // shard. Tries executing the passed transaction if possible (does not guarantee though).
   void PollExecution(const char* context, Transaction* trans);
@@ -223,7 +227,7 @@
   // return true if we did not complete the shard scan
   bool DoDefrag();
 
-  TaskQueue queue_;
+  TaskQueue queue_, queue2_;
 
   TxQueue txq_;
   MiMemoryResource mi_resource_;
diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc
index 63ec8f55b..0d6fa5c13 100644
--- a/src/server/engine_shard_set.cc
+++ b/src/server/engine_shard_set.cc
@@ -103,11 +103,11 @@ EngineShardSet* shard_set = nullptr;
 
 void EngineShardSet::Init(uint32_t sz, std::function<void()> shard_handler) {
   CHECK_EQ(0u, size());
-  shard_queue_.resize(sz);
-
+  shards_.reset(new EngineShard*[sz]);
+  size_ = sz;
   size_t max_shard_file_size = GetTieredFileLimit(sz);
-  pp_->AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) {
-    if (index < shard_queue_.size()) {
+  pp_->AwaitFiberOnAll([this](uint32_t index, ProactorBase* pb) {
+    if (index < size_) {
       InitThreadLocal(pb);
     }
   });
@@ -115,7 +115,7 @@
   namespaces.Init();
 
   pp_->AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) {
-    if (index < shard_queue_.size()) {
+    if (index < size_) {
       auto* shard = EngineShard::tlocal();
       shard->InitTieredStorage(pb, max_shard_file_size);
 
@@ -144,7 +144,7 @@ void EngineShardSet::Shutdown() {
 void EngineShardSet::InitThreadLocal(ProactorBase* pb) {
   EngineShard::InitThreadLocal(pb);
   EngineShard* es = EngineShard::tlocal();
-  shard_queue_[es->shard_id()] = es->GetFiberQueue();
+  shards_[es->shard_id()] = es;
 }
 
 void EngineShardSet::TEST_EnableCacheMode() {
diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h
index fa9916658..a49f70b48 100644
--- a/src/server/engine_shard_set.h
+++ b/src/server/engine_shard_set.h
@@ -45,7 +45,7 @@
   }
 
   uint32_t size() const {
-    return uint32_t(shard_queue_.size());
+    return size_;
   }
 
   util::ProactorPool* pool() {
@@ -63,13 +63,17 @@
 
   // Uses a shard queue to dispatch. Callback runs in a dedicated fiber.
   template <typename F> auto Await(ShardId sid, F&& f) {
-    return shard_queue_[sid]->Await(std::forward<F>(f));
+    return shards_[sid]->GetFiberQueue()->Await(std::forward<F>(f));
   }
 
   // Uses a shard queue to dispatch. Callback runs in a dedicated fiber.
   template <typename F> auto Add(ShardId sid, F&& f) {
-    assert(sid < shard_queue_.size());
-    return shard_queue_[sid]->Add(std::forward<F>(f));
+    assert(sid < size_);
+    return shards_[sid]->GetFiberQueue()->Add(std::forward<F>(f));
+  }
+
+  template <typename F> auto AddL2(ShardId sid, F&& f) {
+    return shards_[sid]->GetSecondaryQueue()->Add(std::forward<F>(f));
   }
 
   // Runs a brief function on all shards. Waits for it to complete.
@@ -94,8 +98,8 @@
   // The functions running inside the shard queue run atomically (sequentially)
   // with respect each other on the same shard.
   template <typename U> void AwaitRunningOnShardQueue(U&& func) {
-    util::fb2::BlockingCounter bc(shard_queue_.size());
-    for (size_t i = 0; i < shard_queue_.size(); ++i) {
+    util::fb2::BlockingCounter bc(size_);
+    for (size_t i = 0; i < size_; ++i) {
       Add(i, [&func, bc]() mutable {
         func(EngineShard::tlocal());
         bc->Dec();
@@ -112,7 +116,8 @@
   void InitThreadLocal(util::ProactorBase* pb);
 
   util::ProactorPool* pp_;
-  std::vector<TaskQueue*> shard_queue_;
+  std::unique_ptr<EngineShard*[]> shards_;
+  uint32_t size_ = 0;
 };
 
 template