chore: clean up TaskQueue since we do not need multiple fibers for it (#3348)

* chore: clean up TaskQueue since we do not need multiple fibers for it

Implement TaskQueue as a wrapper around FiberQueue.

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Roman Gershman 2024-07-21 10:27:53 +03:00 committed by GitHub
parent 7b2603aa46
commit c46d95db2f
GPG key ID: B5690EEEBB952194
4 changed files with 22 additions and 147 deletions
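
For orientation before the diffs: the commit replaces TaskQueue's hand-rolled machinery (an mpmc queue, two EventCounts, and an array of consumer fibers) with a thin wrapper over helio's util::fb2::FiberQueue driven by a single consumer fiber. The public surface (TryAdd/Add/Await/Start/Shutdown) stays the same; the constructor loses its consumer-fiber-count argument and Shutdown() now also joins the consumer fiber. The sketch below is a hedged illustration of the typical lifecycle — the owning class and its names are hypothetical, only the dfly::TaskQueue calls come from the header shown further down.

#include <string_view>

#include "core/task_queue.h"

// Hypothetical owner illustrating the TaskQueue lifecycle this commit keeps
// intact; only the constructor loses its consumer-fiber-count argument.
class QueueOwnerExample {
 public:
  QueueOwnerExample() : queue_(128) {  // queue capacity; fiber count is gone
  }

  // Start() spawns the single consumer fiber in the calling thread.
  void Init(std::string_view name) {
    queue_.Start(name);
  }

  // Producers on any thread enqueue fire-and-forget callbacks...
  void PostWork() {
    queue_.Add([] { /* runs on the consumer fiber */ });
  }

  // ...or block until the callback has run on the consumer and returned.
  int ComputeOnConsumer() {
    return queue_.Await([] { return 42; });
  }

  // Shutdown() drains the queue and, after this commit, also joins the fiber.
  void Stop() {
    queue_.Shutdown();
  }

 private:
  dfly::TaskQueue queue_;
};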


@@ -6,7 +6,7 @@ set(SEARCH_LIB query_parser)
add_library(dfly_core bloom.cc compact_object.cc dragonfly_core.cc extent_tree.cc
interpreter.cc mi_memory_resource.cc sds_utils.cc
segment_allocator.cc score_map.cc small_string.cc sorted_map.cc
tx_queue.cc dense_set.cc allocation_tracker.cc task_queue.cc
tx_queue.cc dense_set.cc allocation_tracker.cc
string_set.cc string_map.cc detail/bitpacking.cc)
cxx_link(dfly_core base absl::flat_hash_map absl::str_format redis_lib TRDP::lua lua_modules


@@ -1,74 +0,0 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "core/task_queue.h"
#include <absl/strings/str_cat.h>
#include "base/logging.h"
using namespace std;
using namespace util;
namespace dfly {
TaskQueue::TaskQueue(unsigned consumer_fb_cnt, unsigned queue_size)
: queue_(queue_size),
num_consumers_(consumer_fb_cnt),
consumer_fiber_(new fb2::Fiber[consumer_fb_cnt]) {
}
void TaskQueue::Start(string_view base_name) {
CHECK_GT(num_consumers_, 0u);
for (unsigned i = 0; i < num_consumers_; ++i) {
CHECK(!consumer_fiber_[i].IsJoinable());
string name = absl::StrCat(base_name, "_", i);
consumer_fiber_[i] = fb2::Fiber(name, [this] { TaskLoop(); });
}
}
void TaskQueue::Shutdown() {
is_closed_.store(true, memory_order_seq_cst);
pull_ec_.notifyAll();
for (unsigned i = 0; i < num_consumers_; ++i) {
consumer_fiber_[i].Join();
}
consumer_fiber_.reset();
}
void TaskQueue::TaskLoop() {
bool is_closed = false;
CbFunc func;
auto cb = [&] {
if (queue_.try_dequeue(func)) {
push_ec_.notify();
return true;
}
if (is_closed_.load(std::memory_order_acquire)) {
is_closed = true;
return true;
}
return false;
};
while (true) {
pull_ec_.await(cb);
if (is_closed)
break;
try {
concurrency_level_++;
func();
concurrency_level_--; // We check-fail on exception, so it's safe to decrement here.
} catch (std::exception& e) {
LOG(FATAL) << "Exception " << e.what();
}
}
}
} // namespace dfly
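
The TaskLoop deleted above is exactly the machinery this commit delegates to FiberQueue: a consumer fiber that parks on an EventCount until it can either dequeue a task or observe shutdown. Below is a minimal standalone sketch of that pull pattern, using the same helio primitives the file used; it is an illustration, not code from the commit.

#include <atomic>
#include <functional>

#include "base/mpmc_bounded_queue.h"
#include "util/fibers/synchronization.h"  // util::fb2::EventCount

// Sketch of the pull pattern used by the removed TaskLoop: await() keeps the
// fiber parked until its predicate returns true, i.e. until a task was
// dequeued or shutdown was requested. Illustration only.
void ConsumeUntilClosed(base::mpmc_bounded_queue<std::function<void()>>* queue,
                        util::fb2::EventCount* pull_ec,
                        util::fb2::EventCount* push_ec,
                        std::atomic_bool* is_closed) {
  std::function<void()> func;
  while (true) {
    bool closed = false;
    pull_ec->await([&] {
      if (queue->try_dequeue(func)) {
        push_ec->notify();  // a slot freed up; wake a producer blocked in Add()
        return true;
      }
      if (is_closed->load(std::memory_order_acquire)) {
        closed = true;
        return true;
      }
      return false;  // keep waiting; producers notify pull_ec after enqueueing
    });
    if (closed)
      break;
    func();  // run the task on this fiber
  }
}

After this commit, that responsibility moves to util::fb2::FiberQueue::Run(), which the new Start() runs on a single fiber, so Dragonfly no longer maintains this loop itself.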


@@ -4,103 +4,52 @@
#pragma once
#include "base/mpmc_bounded_queue.h"
#include "util/fibers/detail/result_mover.h"
#include "util/fibers/fiberqueue_threadpool.h"
#include "util/fibers/fibers.h"
#include "util/fibers/synchronization.h"
namespace dfly {
/**
* MPSC task-queue that is handled by a single consumer thread.
* The design is based on helio's FiberQueue with the difference that we can run multiple
* consumer Fibers in the consumer thread to allow better CPU utilization in case the
* tasks are not CPU bound.
*
* Another difference - TaskQueue manages its consumer fibers itself.
* TODO: consider moving to util/fibers.
* The queue is just a wrapper around FiberQueue that manages its fiber itself.
*/
class TaskQueue {
public:
explicit TaskQueue(unsigned consumer_fb_cnt, unsigned queue_size = 128);
template <typename F> bool TryAdd(F&& f) {
if (queue_.try_enqueue(std::forward<F>(f))) {
pull_ec_.notify();
return true;
}
return false;
explicit TaskQueue(unsigned queue_size = 128) : queue_(queue_size) {
}
template <typename F> bool TryAdd(F&& f) {
return queue_.TryAdd(std::forward<F>(f));
}
/**
* @brief Submits a callback into the queue. Should not be called after calling Shutdown().
*
* @tparam F - callback type
* @param f - callback object
* @return true if Add() had to preempt, false if the fast path without preemptions was followed.
*/
template <typename F> bool Add(F&& f) {
if (TryAdd(std::forward<F>(f))) {
return false;
}
bool result = false;
while (true) {
auto key = push_ec_.prepareWait();
if (TryAdd(std::forward<F>(f))) {
break;
}
result = true;
push_ec_.wait(key.epoch());
}
return result;
return queue_.Add(std::forward<F>(f));
}
template <typename F> auto Await(F&& f) -> decltype(f()) {
util::fb2::Done done;
using ResultType = decltype(f());
util::detail::ResultMover<ResultType> mover;
Add([&mover, f = std::forward<F>(f), done]() mutable {
mover.Apply(f);
done.Notify();
});
done.Wait();
return std::move(mover).get();
return queue_.Await(std::forward<F>(f));
}
/**
* @brief Start running consumer loop in the caller thread by spawning fibers.
* Returns immediately.
*/
void Start(std::string_view base_name);
void Start(std::string_view base_name) {
consumer_fiber_ = util::fb2::Fiber(base_name, [this] { queue_.Run(); });
}
/**
* @brief Notifies Run() function to empty the queue and to exit.
* Does not block.
* @brief Notifies Run() function to empty the queue and to exit and waits for the consumer
* fiber to finish.
*/
void Shutdown();
// Returns number of callbacks being currently called by the queue.
// Valid only from the consumer thread.
unsigned concurrency_level() const {
return concurrency_level_;
void Shutdown() {
queue_.Shutdown();
consumer_fiber_.JoinIfNeeded();
}
private:
void TaskLoop();
typedef std::function<void()> CbFunc;
using FuncQ = base::mpmc_bounded_queue<CbFunc>;
FuncQ queue_;
util::fb2::EventCount push_ec_, pull_ec_;
std::atomic_bool is_closed_{false};
unsigned num_consumers_;
std::unique_ptr<util::fb2::Fiber[]> consumer_fiber_;
unsigned concurrency_level_ = 0;
util::fb2::FiberQueue queue_;
util::fb2::Fiber consumer_fiber_;
};
} // namespace dfly
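
The Add() doc comment retained above encodes the producer-side contract: TryAdd() gives up without blocking when the bounded queue is full, while Add() always enqueues and its return value reports whether the calling fiber had to preempt while waiting for space. A hedged illustration follows; the helper names and the counter are hypothetical, not part of Dragonfly.

#include <cstdint>
#include <functional>

#include "core/task_queue.h"

// Non-blocking submit: returns false instead of waiting when the queue is full.
bool PostIfRoom(dfly::TaskQueue* tq, std::function<void()> task) {
  return tq->TryAdd(std::move(task));
}

// Blocking submit: always enqueues; Add() returns true only if the calling
// fiber had to preempt because the queue was full, which we count here.
void PostAlways(dfly::TaskQueue* tq, std::function<void()> task,
                uint64_t* preempt_count) {
  if (tq->Add(std::move(task)))
    ++*preempt_count;
}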


@@ -374,7 +374,7 @@ uint32_t EngineShard::DefragTask() {
}
EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap)
: queue_(1, kQueueLen),
: queue_(kQueueLen),
txq_([](const Transaction* t) { return t->txid(); }),
mi_resource_(heap),
shard_id_(pb->GetPoolIndex()) {