diff --git a/src/core/fibers.h b/src/core/fibers.h index 90050cea6..287249dcd 100644 --- a/src/core/fibers.h +++ b/src/core/fibers.h @@ -5,10 +5,12 @@ // An import header that centralizes all the imports from helio project regarding fibers +#include "util/fiber_sched_algo.h" #include "util/fibers/event_count.h" #include "util/fibers/fiber.h" #include "util/fibers/fiberqueue_threadpool.h" #include "util/fibers/fibers_ext.h" +#include "util/fibers/simple_channel.h" namespace dfly { @@ -18,10 +20,12 @@ using util::fibers_ext::Done; using util::fibers_ext::EventCount; using util::fibers_ext::Fiber; using util::fibers_ext::FiberQueue; +using util::fibers_ext::FiberQueueThreadPool; using util::fibers_ext::Future; using util::fibers_ext::Launch; using util::fibers_ext::Mutex; using util::fibers_ext::Promise; +using util::fibers_ext::SimpleChannel; using CondVar = ::boost::fibers::condition_variable; } // namespace dfly diff --git a/src/core/uring.h b/src/core/uring.h new file mode 100644 index 000000000..aaa230ed7 --- /dev/null +++ b/src/core/uring.h @@ -0,0 +1,17 @@ +// Copyright 2023, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include "util/uring/proactor.h" +#include "util/uring/uring_file.h" + +namespace dfly { + +using util::uring::FiberCall; +using util::uring::LinuxFile; +using util::uring::OpenLinux; +using util::uring::OpenRead; + +} // namespace dfly diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index bd8d4ec9d..5eda4e6af 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -14,8 +14,6 @@ #include "facade/memcache_parser.h" #include "facade/redis_parser.h" #include "facade/service_interface.h" -#include "util/fiber_sched_algo.h" -#include "util/fibers/fiber.h" #ifdef DFLY_USE_SSL #include "util/tls/tls_socket.h" @@ -393,7 +391,7 @@ void Connection::UnregisterShutdownHook(ShutdownHandle id) { } void Connection::HandleRequests() { - FiberProps::SetName("DflyConnection"); + ThisFiber::SetName("DflyConnection"); LinuxSocketBase* lsb = static_cast(socket_.get()); @@ -841,7 +839,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variantreply_builder(); DispatchOperations dispatch_op{builder, this}; diff --git a/src/facade/dragonfly_listener.cc b/src/facade/dragonfly_listener.cc index 0dc7b1bfd..627c857f7 100644 --- a/src/facade/dragonfly_listener.cc +++ b/src/facade/dragonfly_listener.cc @@ -46,6 +46,8 @@ CONFIG_enum(tls_auth_clients, "yes", "", tls_auth_clients_enum, tls_auth_clients namespace facade { using namespace util; +using util::detail::SafeErrorMessage; + using absl::GetFlag; namespace { @@ -152,7 +154,7 @@ error_code Listener::ConfigureServerSocket(int fd) { constexpr int kInterval = 300; // 300 seconds is ok to start checking for liveness. if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0) { - LOG(WARNING) << "Could not set reuse addr on socket " << detail::SafeErrorMessage(errno); + LOG(WARNING) << "Could not set reuse addr on socket " << SafeErrorMessage(errno); } bool success = ConfigureKeepAlive(fd, kInterval); @@ -165,7 +167,7 @@ error_code Listener::ConfigureServerSocket(int fd) { // Ignore the error on UDS. if (getsockopt(fd, SOL_SOCKET, SO_DOMAIN, &socket_type, &length) != 0 || socket_type != AF_UNIX) { - LOG(WARNING) << "Could not configure keep alive " << detail::SafeErrorMessage(myerr); + LOG(WARNING) << "Could not configure keep alive " << SafeErrorMessage(myerr); } } diff --git a/src/facade/dragonfly_listener.h b/src/facade/dragonfly_listener.h index 0f3735ced..0ea514f63 100644 --- a/src/facade/dragonfly_listener.h +++ b/src/facade/dragonfly_listener.h @@ -7,7 +7,6 @@ #include #include "facade/facade_types.h" -#include "util/fibers/fiber.h" #include "util/http/http_handler.h" #include "util/listener_interface.h" diff --git a/src/server/common.cc b/src/server/common.cc index dda6f1d19..2475f3f21 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -27,6 +27,7 @@ extern "C" { namespace dfly { using namespace std; +using namespace util; atomic_uint64_t used_mem_peak(0); atomic_uint64_t used_mem_current(0); @@ -311,7 +312,7 @@ GenericError Context::ReportErrorInternal(GenericError&& err) { CHECK(!err_handler_fb_.IsJoinable()); if (err_handler_) - err_handler_fb_ = Fiber{err_handler_, err_}; + err_handler_fb_ = MakeFiber(err_handler_, err_); Cancellation::Cancel(); return err_; diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index d8d526c14..6faccbcf3 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -10,7 +10,6 @@ #include "server/server_family.h" #include "server/server_state.h" #include "src/facade/dragonfly_connection.h" -#include "util/proactor_base.h" namespace dfly { diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 1863d8151..f18e7b7f4 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -13,9 +13,6 @@ extern "C" { #include "server/journal/journal.h" #include "server/server_state.h" #include "server/tiered_storage.h" -#include "util/fiber_sched_algo.h" -#include "util/fibers/fiber.h" -#include "util/proactor_base.h" namespace dfly { diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 9f27ca534..0cdeaf4a9 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -11,10 +11,6 @@ #include "server/conn_context.h" #include "server/table.h" -namespace util { -class ProactorBase; -} - namespace dfly { using facade::OpResult; diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 779598ddb..575a66b4f 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -19,8 +19,6 @@ #include "server/server_state.h" #include "server/string_family.h" #include "server/transaction.h" -#include "util/fiber_sched_algo.h" -#include "util/fibers/fiber.h" using namespace std; @@ -314,7 +312,7 @@ void DebugCmd::Populate(CmdArgList args) { void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view prefix, unsigned value_len, bool populate_random_values) { - FiberProps::SetName("populate_range"); + ThisFiber::SetName("populate_range"); VLOG(1) << "PopulateRange: " << from << "-" << (from + len - 1); string key = absl::StrCat(prefix, ":"); diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index c3b620fa0..91e100023 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -383,7 +383,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha flow->saver->StartSnapshotInShard(true, cntx->GetCancellation(), shard); } - flow->full_sync_fb = Fiber(&DflyCmd::FullSyncFb, this, flow, cntx); + flow->full_sync_fb = MakeFiber(&DflyCmd::FullSyncFb, this, flow, cntx); return OpStatus::OK; } diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index 3ab16b9d1..84a15b853 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -10,7 +10,6 @@ #include #include "server/conn_context.h" -#include "util/fibers/fiber.h" namespace facade { class RedisReplyBuilder; diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 3412051d9..4a4c7ef0d 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -15,7 +15,6 @@ extern "C" { #include "server/server_state.h" #include "server/tiered_storage.h" #include "server/transaction.h" -#include "util/fiber_sched_algo.h" #include "util/varz.h" using namespace std; @@ -532,7 +531,7 @@ BlockingController* EngineShard::EnsureBlockingController() { void EngineShard::TEST_EnableHeartbeat() { fiber_periodic_ = MakeFiber([this, period_ms = 1] { - FiberProps::SetName("shard_periodic_TEST"); + ThisFiber::SetName("shard_periodic_TEST"); RunPeriodic(std::chrono::milliseconds(period_ms)); }); } diff --git a/src/server/io_mgr.cc b/src/server/io_mgr.cc index bd0197af6..b3883db4b 100644 --- a/src/server/io_mgr.cc +++ b/src/server/io_mgr.cc @@ -10,7 +10,6 @@ #include "base/flags.h" #include "base/logging.h" #include "facade/facade_types.h" -#include "util/uring/proactor.h" ABSL_FLAG(bool, backing_file_direct, false, "If true uses O_DIRECT to open backing files"); @@ -19,8 +18,12 @@ namespace dfly { using namespace std; using namespace util; using namespace facade; -using uring::FiberCall; + +#ifdef USE_FB2 +#else using uring::Proactor; +using uring::SubmitEntry; +#endif namespace { @@ -44,13 +47,13 @@ error_code IoMgr::Open(const string& path) { if (absl::GetFlag(FLAGS_backing_file_direct)) { kFlags |= O_DIRECT; } - auto res = uring::OpenLinux(path, kFlags, 0666); + auto res = OpenLinux(path, kFlags, 0666); if (!res) return res.error(); backing_file_ = move(res.value()); Proactor* proactor = (Proactor*)ProactorBase::me(); { - uring::FiberCall fc(proactor); + FiberCall fc(proactor); fc->PrepFallocate(backing_file_->fd(), 0, 0, kInitialSize); FiberCall::IoResult io_res = fc.Get(); if (io_res < 0) { @@ -58,7 +61,7 @@ error_code IoMgr::Open(const string& path) { } } { - uring::FiberCall fc(proactor); + FiberCall fc(proactor); fc->PrepFadvise(backing_file_->fd(), 0, 0, POSIX_FADV_RANDOM); FiberCall::IoResult io_res = fc.Get(); if (io_res < 0) { @@ -78,7 +81,7 @@ error_code IoMgr::GrowAsync(size_t len, GrowCb cb) { Proactor* proactor = (Proactor*)ProactorBase::me(); - uring::SubmitEntry entry = proactor->GetSubmitEntry( + SubmitEntry entry = proactor->GetSubmitEntry( [this, cb = move(cb)](Proactor::IoResult res, uint32_t, int64_t arg) { this->flags.grow_progress = 0; sz_ += (res == 0 ? arg : 0); @@ -102,7 +105,7 @@ error_code IoMgr::WriteAsync(size_t offset, string_view blob, WriteCb cb) { cb(res); }; - uring::SubmitEntry se = proactor->GetSubmitEntry(move(ring_cb), 0); + SubmitEntry se = proactor->GetSubmitEntry(move(ring_cb), 0); se.PrepWrite(backing_file_->fd(), blob.data(), blob.size(), offset); return error_code{}; diff --git a/src/server/io_mgr.h b/src/server/io_mgr.h index 61de32525..99fa55bbe 100644 --- a/src/server/io_mgr.h +++ b/src/server/io_mgr.h @@ -7,14 +7,15 @@ #include #include -#include "util/uring/uring_file.h" +#include "core/uring.h" namespace dfly { class IoMgr { public: // first arg - io result. - // using WriteCb = fu2::function_base; + // using WriteCb = fu2::function_base; using WriteCb = std::function; // (io_res, ) @@ -46,7 +47,7 @@ class IoMgr { } private: - std::unique_ptr backing_file_; + std::unique_ptr backing_file_; size_t sz_ = 0; union { diff --git a/src/server/io_utils.h b/src/server/io_utils.h index 5f0409699..a3e74468c 100644 --- a/src/server/io_utils.h +++ b/src/server/io_utils.h @@ -5,7 +5,6 @@ #include "base/io_buf.h" #include "io/io.h" #include "server/common.h" -#include "util/fibers/event_count.h" namespace dfly { @@ -62,7 +61,7 @@ class BufferedStreamerBase : public io::Sink { protected: bool producer_done_ = false; // whether producer is done unsigned buffered_ = 0; // how many entries are buffered - EventCount waker_{}; // two sided waker + EventCount waker_; // two sided waker const Cancellation* cll_; // global cancellation diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index 1ccfbcb12..65335f8e6 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -11,6 +11,7 @@ #include #include "base/logging.h" +#include "util/uring/uring_file.h" namespace dfly { namespace journal { @@ -84,8 +85,7 @@ std::error_code JournalSlice::Open(std::string_view dir) { // https://www.evanjones.ca/durability-filesystem.html // NOTE: O_DSYNC is omitted. constexpr auto kJournalFlags = O_CLOEXEC | O_CREAT | O_TRUNC | O_RDWR; - io::Result> res = - uring::OpenLinux(shard_path_, kJournalFlags, 0666); + io::Result> res = OpenLinux(shard_path_, kJournalFlags, 0666); if (!res) { return res.error(); } diff --git a/src/server/journal/journal_slice.h b/src/server/journal/journal_slice.h index e8e5bb0ca..c811a0aff 100644 --- a/src/server/journal/journal_slice.h +++ b/src/server/journal/journal_slice.h @@ -8,9 +8,9 @@ #include #include "base/ring_buffer.h" +#include "core/uring.h" #include "server/common.h" #include "server/journal/types.h" -#include "util/uring/uring_file.h" namespace dfly { namespace journal { @@ -49,10 +49,10 @@ class JournalSlice { struct RingItem; std::string shard_path_; - std::unique_ptr shard_file_; + std::unique_ptr shard_file_; std::optional> ring_buffer_; - util::fibers_ext::SharedMutex cb_mu_; + util::SharedMutex cb_mu_; std::vector> change_cb_arr_; size_t file_offset_ = 0; diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index ce118b522..308c6a84f 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -5,9 +5,10 @@ #include "server/journal/streamer.h" namespace dfly { +using namespace util; void JournalStreamer::Start(io::Sink* dest) { - write_fb_ = Fiber(&JournalStreamer::WriterFb, this, dest); + write_fb_ = MakeFiber(&JournalStreamer::WriterFb, this, dest); journal_cb_id_ = journal_->RegisterOnChange([this](const journal::Entry& entry, bool allow_await) { if (entry.opcode == journal::Op::NOOP) { diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index e9cfb3145..68939c128 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -7,7 +7,6 @@ #include "server/io_utils.h" #include "server/journal/journal.h" #include "server/journal/serializer.h" -#include "util/fibers/fiber.h" namespace dfly { diff --git a/src/server/replica.cc b/src/server/replica.cc index 4b6806c63..783adea2f 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -25,7 +25,6 @@ extern "C" { #include "server/main_service.h" #include "server/rdb_load.h" #include "strings/human_readable.h" -#include "util/proactor_base.h" ABSL_FLAG(bool, enable_multi_shard_sync, false, "Execute multi shards commands on replica syncrhonized"); @@ -160,7 +159,7 @@ bool Replica::Start(ConnectionContext* cntx) { cntx_.Reset(absl::bind_front(&Replica::DefaultErrorHandler, this)); // 5. Spawn main coordination fiber. - sync_fb_ = Fiber(&Replica::MainReplicationFb, this); + sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this); (*cntx)->SendOk(); return true; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 99093d16c..a8f2cd220 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -70,13 +70,11 @@ ABSL_DECLARE_FLAG(uint32_t, hz); namespace dfly { namespace fs = std::filesystem; -namespace uring = util::uring; using absl::GetFlag; using absl::StrCat; using namespace facade; using namespace util; -using fibers_ext::FiberQueueThreadPool; using http::StringResponse; using strings::HumanReadableNumBytes; @@ -165,17 +163,17 @@ bool IsValidSaveScheduleNibble(string_view time, unsigned int max) { // takes ownership over the file. class LinuxWriteWrapper : public io::Sink { public: - LinuxWriteWrapper(uring::LinuxFile* lf) : lf_(lf) { + LinuxWriteWrapper(LinuxFile* lf) : lf_(lf) { } io::Result WriteSome(const iovec* v, uint32_t len) final; - std::error_code Close() { + error_code Close() { return lf_->Close(); } private: - std::unique_ptr lf_; + unique_ptr lf_; off_t offset_ = 0; }; @@ -184,7 +182,7 @@ class RdbSnapshot { RdbSnapshot(FiberQueueThreadPool* fq_tp) : fq_tp_(fq_tp) { } - error_code Start(SaveMode save_mode, const std::string& path, const StringVec& lua_scripts); + error_code Start(SaveMode save_mode, const string& path, const StringVec& lua_scripts); void StartInShard(EngineShard* shard); error_code SaveBody(); @@ -201,8 +199,8 @@ class RdbSnapshot { private: bool started_ = false; FiberQueueThreadPool* fq_tp_; - std::unique_ptr io_sink_; - std::unique_ptr saver_; + unique_ptr io_sink_; + unique_ptr saver_; RdbTypeFreqMap freq_map_; Cancellation cll_{}; @@ -226,7 +224,7 @@ error_code RdbSnapshot::Start(SaveMode save_mode, const std::string& path, return res.error(); io_sink_.reset(*res); } else { - auto res = uring::OpenLinux(path, kRdbWriteFlags, 0666); + auto res = OpenLinux(path, kRdbWriteFlags, 0666); if (!res) { return res.error(); } @@ -608,7 +606,7 @@ error_code ServerFamily::LoadRdb(const std::string& rdb_file) { if (fq_threadpool_) { res = util::OpenFiberReadFile(rdb_file, fq_threadpool_.get()); } else { - res = uring::OpenRead(rdb_file); + res = OpenRead(rdb_file); } if (res) { diff --git a/src/server/server_family.h b/src/server/server_family.h index b160c2d1d..be6ba113b 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -10,8 +10,6 @@ #include "facade/redis_parser.h" #include "server/channel_store.h" #include "server/engine_shard_set.h" -#include "util/fibers/fiber.h" -#include "util/proactor_pool.h" namespace util { class AcceptServer; @@ -189,7 +187,7 @@ class ServerFamily { std::atomic_bool is_saving_{false}; Done is_snapshot_done_; - std::unique_ptr fq_threadpool_; + std::unique_ptr fq_threadpool_; }; } // namespace dfly diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 204f94f02..1c38df3ed 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -18,8 +18,6 @@ extern "C" { #include "server/journal/journal.h" #include "server/rdb_extensions.h" #include "server/rdb_save.h" -#include "util/fiber_sched_algo.h" -#include "util/proactor_base.h" namespace dfly { @@ -107,7 +105,7 @@ void SliceSnapshot::Join() { void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) { { auto fiber_name = absl::StrCat("SliceSnapshot-", ProactorBase::GetIndex()); - FiberProps::SetName(std::move(fiber_name)); + ThisFiber::SetName(std::move(fiber_name)); } PrimeTable::Cursor cursor; diff --git a/src/server/snapshot.h b/src/server/snapshot.h index b1666ef4e..f28d5fd04 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -12,7 +12,6 @@ #include "server/db_slice.h" #include "server/rdb_save.h" #include "server/table.h" -#include "util/fibers/simple_channel.h" namespace dfly { @@ -55,8 +54,7 @@ class SliceSnapshot { std::string value; }; - using RecordChannel = - ::util::fibers_ext::SimpleChannel>; + using RecordChannel = SimpleChannel>; SliceSnapshot(DbSlice* slice, RecordChannel* dest, CompressionMode compression_mode); ~SliceSnapshot(); diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index 837e4266e..c986b41ae 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -14,7 +14,6 @@ extern "C" { #include "base/logging.h" #include "server/db_slice.h" #include "server/engine_shard_set.h" -#include "util/proactor_base.h" ABSL_FLAG(uint32_t, tiered_storage_max_pending_writes, 32, "Maximal number of pending writes per thread"); diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h index 91485eab4..1fc5fbe2a 100644 --- a/src/server/tiered_storage.h +++ b/src/server/tiered_storage.h @@ -9,7 +9,6 @@ #include "server/common.h" #include "server/io_mgr.h" #include "server/table.h" -#include "util/fibers/event_count.h" namespace dfly {