From c271e131762775a53c92bbab2633c792c33bb65f Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 30 Mar 2023 13:26:59 +0300 Subject: [PATCH] chore: import fiber related primitives under dfly namespace (#1012) This change removes most mentions of boost::fibers or util::fibers_ext. Instead it introduces "core/fibers.h" file that incorporates most of the primitives under dfly namespace. This is done in preparation to switching from Boost.Fibers to helio native fibers. Signed-off-by: Roman Gershman --- src/core/fibers.h | 27 +++++++++++++++++++ src/core/interpreter.h | 4 +-- src/facade/dragonfly_connection.cc | 16 ++++++------ src/facade/dragonfly_connection.h | 12 +++++---- src/server/channel_store.h | 5 ++-- src/server/common.cc | 2 +- src/server/common.h | 8 +++--- src/server/conn_context.h | 4 +-- src/server/debugcmd.cc | 6 ++--- src/server/dfly_main.cc | 4 +-- src/server/dflycmd.cc | 2 +- src/server/dflycmd.h | 10 ++++---- src/server/dragonfly_test.cc | 12 ++++----- src/server/engine_shard_set.h | 27 +++++++++---------- src/server/generic_family_test.cc | 2 +- src/server/io_mgr.cc | 2 +- src/server/io_utils.h | 6 ++--- src/server/journal/journal.cc | 3 +-- src/server/journal/journal.h | 2 +- src/server/journal/journal_slice.cc | 2 -- src/server/journal/journal_slice.h | 1 - src/server/journal/streamer.cc | 2 +- src/server/journal/streamer.h | 2 +- src/server/list_family_test.cc | 40 ++++++++++++++--------------- src/server/main_service.cc | 7 +++-- src/server/main_service.h | 2 +- src/server/multi_test.cc | 2 +- src/server/rdb_load.cc | 2 +- src/server/replica.cc | 18 ++++++------- src/server/replica.h | 20 +++++++-------- src/server/script_mgr.cc | 2 +- src/server/script_mgr.h | 2 +- src/server/server_family.cc | 16 ++++++------ src/server/server_family.h | 10 ++++---- src/server/snapshot.cc | 2 +- src/server/snapshot.h | 4 +-- src/server/test_utils.cc | 2 +- src/server/test_utils.h | 2 +- src/server/tiered_storage.h | 2 +- src/server/transaction.h | 5 ++-- 40 files changed, 160 insertions(+), 139 deletions(-) create mode 100644 src/core/fibers.h diff --git a/src/core/fibers.h b/src/core/fibers.h new file mode 100644 index 000000000..90050cea6 --- /dev/null +++ b/src/core/fibers.h @@ -0,0 +1,27 @@ +// Copyright 2023, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// +#pragma once + +// An import header that centralizes all the imports from helio project regarding fibers + +#include "util/fibers/event_count.h" +#include "util/fibers/fiber.h" +#include "util/fibers/fiberqueue_threadpool.h" +#include "util/fibers/fibers_ext.h" + +namespace dfly { + +using util::fibers_ext::Barrier; +using util::fibers_ext::BlockingCounter; +using util::fibers_ext::Done; +using util::fibers_ext::EventCount; +using util::fibers_ext::Fiber; +using util::fibers_ext::FiberQueue; +using util::fibers_ext::Future; +using util::fibers_ext::Launch; +using util::fibers_ext::Mutex; +using util::fibers_ext::Promise; +using CondVar = ::boost::fibers::condition_variable; + +} // namespace dfly diff --git a/src/core/interpreter.h b/src/core/interpreter.h index 08cb83761..4d644784c 100644 --- a/src/core/interpreter.h +++ b/src/core/interpreter.h @@ -8,7 +8,7 @@ #include #include "core/core_types.h" -#include "util/fibers/event_count.h" +#include "core/fibers.h" typedef struct lua_State lua_State; @@ -124,7 +124,7 @@ class InterpreterManager { void Return(Interpreter*); private: - ::util::fibers_ext::EventCount waker_; + EventCount waker_; std::vector available_; std::vector storage_; }; diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 31417ef48..bd8d4ec9d 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -359,17 +359,17 @@ void Connection::OnShutdown() { void Connection::OnPreMigrateThread() { // If we migrating to another io_uring we should cancel any pending requests we have. - if (break_poll_id_ != kuint32max) { + if (break_poll_id_ != UINT32_MAX) { auto* ls = static_cast(socket_.get()); ls->CancelPoll(break_poll_id_); - break_poll_id_ = kuint32max; + break_poll_id_ = UINT32_MAX; } } void Connection::OnPostMigrateThread() { // Once we migrated, we should rearm OnBreakCb callback. if (breaker_cb_) { - DCHECK_EQ(kuint32max, break_poll_id_); + DCHECK_EQ(UINT32_MAX, break_poll_id_); auto* ls = static_cast(socket_.get()); break_poll_id_ = @@ -448,7 +448,7 @@ void Connection::HandleRequests() { ConnectionFlow(peer); - if (break_poll_id_ != kuint32max) { + if (break_poll_id_ != UINT32_MAX) { us->CancelPoll(break_poll_id_); } @@ -513,7 +513,7 @@ string Connection::GetClientInfo(unsigned thread_id) const { return res; } -uint32 Connection::GetClientId() const { +uint32_t Connection::GetClientId() const { return id_; } @@ -558,7 +558,7 @@ io::Result Connection::CheckForHttpProto(FiberSocketBase* peer) { void Connection::ConnectionFlow(FiberSocketBase* peer) { stats_ = service_->GetThreadLocalConnectionStats(); - auto dispatch_fb = MakeFiber(fibers_ext::Launch::dispatch, [&] { DispatchFiber(peer); }); + auto dispatch_fb = MakeFiber(dfly::Launch::dispatch, [&] { DispatchFiber(peer); }); ++stats_->num_conns; ++stats_->conn_received_cnt; @@ -683,7 +683,7 @@ auto Connection::ParseRedis() -> ParserStatus { if (dispatch_q_.size() == 1) { evc_.notify(); } else if (dispatch_q_.size() > 10) { - fibers_ext::Yield(); + ThisFiber::Yield(); } } } @@ -762,7 +762,7 @@ void Connection::OnBreakCb(int32_t mask) { VLOG(1) << "Got event " << mask; CHECK(cc_); cc_->conn_closing = true; - break_poll_id_ = kuint32max; // do not attempt to cancel it. + break_poll_id_ = UINT32_MAX; // do not attempt to cancel it. breaker_cb_(mask); evc_.notify(); // Notify dispatch fiber. diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index a46b3fa8f..7508845b8 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -10,11 +10,13 @@ #include #include "base/io_buf.h" +#include "util/connection.h" +#include "util/http/http_handler.h" + +// +#include "core/fibers.h" #include "facade/facade_types.h" #include "facade/resp_expr.h" -#include "util/connection.h" -#include "util/fibers/fibers_ext.h" -#include "util/http/http_handler.h" typedef struct ssl_ctx_st SSL_CTX; typedef struct mi_heap_s mi_heap_t; @@ -92,7 +94,7 @@ class Connection : public util::Connection { std::string RemoteEndpointStr() const; std::string RemoteEndpointAddress() const; std::string LocalBindAddress() const; - uint32 GetClientId() const; + uint32_t GetClientId() const; void ShutdownSelf(); @@ -153,7 +155,7 @@ class Connection : public util::Connection { uint32_t pipeline_msg_cnt_ = 0; static thread_local std::vector free_req_pool_; - util::fibers_ext::EventCount evc_; + dfly::EventCount evc_; RespVec parse_args_; CmdArgVec cmd_vec_; diff --git a/src/server/channel_store.h b/src/server/channel_store.h index 767956cea..a60e6c59a 100644 --- a/src/server/channel_store.h +++ b/src/server/channel_store.h @@ -5,7 +5,6 @@ #include -#include #include #include "server/conn_context.h" @@ -54,7 +53,7 @@ class ChannelStore { static bool ByThread(const Subscriber& lhs, const Subscriber& rhs); ConnectionContext* conn_cntx; - util::fibers_ext::BlockingCounter borrow_token; // to keep connection alive + BlockingCounter borrow_token; // to keep connection alive uint32_t thread_id; std::string pattern; // non-empty if registered via psubscribe }; @@ -107,7 +106,7 @@ class ChannelStore { // Centralized controller to prevent overlaping updates. struct ControlBlock { std::atomic most_recent; - ::boost::fibers::mutex update_mu; // locked during updates. + Mutex update_mu; // locked during updates. }; private: diff --git a/src/server/common.cc b/src/server/common.cc index fb6e1f6d2..dda6f1d19 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -311,7 +311,7 @@ GenericError Context::ReportErrorInternal(GenericError&& err) { CHECK(!err_handler_fb_.IsJoinable()); if (err_handler_) - err_handler_fb_ = util::fibers_ext::Fiber{err_handler_, err_}; + err_handler_fb_ = Fiber{err_handler_, err_}; Cancellation::Cancel(); return err_; diff --git a/src/server/common.h b/src/server/common.h index 5f80d5f5a..1fba24bbc 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -12,9 +12,9 @@ #include #include +#include "core/fibers.h" #include "facade/facade_types.h" #include "facade/op_status.h" -#include "util/fibers/fiber.h" namespace dfly { @@ -215,7 +215,7 @@ template struct AggregateValue { } private: - util::fibers_ext::Mutex mu_{}; + Mutex mu_{}; T current_{}; }; @@ -319,10 +319,10 @@ class Context : protected Cancellation { private: GenericError err_; - util::fibers_ext::Mutex mu_; + Mutex mu_; ErrHandler err_handler_; - ::util::fibers_ext::Fiber err_handler_fb_; + Fiber err_handler_fb_; }; struct ScanOpts { diff --git a/src/server/conn_context.h b/src/server/conn_context.h index d655c2006..e3da287a8 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -6,9 +6,9 @@ #include +#include "core/fibers.h" #include "facade/conn_context.h" #include "server/common.h" -#include "util/fibers/fibers_ext.h" namespace dfly { @@ -82,7 +82,7 @@ struct ConnectionState { absl::flat_hash_set channels; absl::flat_hash_set patterns; - util::fibers_ext::BlockingCounter borrow_token{0}; + BlockingCounter borrow_token{0}; }; struct ReplicationInfo { diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 680753b88..779598ddb 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -297,7 +297,7 @@ void DebugCmd::Populate(CmdArgList args) { } ranges.emplace_back(from, total_count - from); - vector fb_arr(ranges.size()); + vector fb_arr(ranges.size()); for (size_t i = 0; i < ranges.size(); ++i) { auto range = ranges[i]; @@ -335,7 +335,7 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view ess.Add(sid, [=] { DoPopulateBatch(prefix, value_len, populate_random_values, params, shard_batch); if (i % 50 == 0) { - fibers_ext::Yield(); + ThisFiber::Yield(); } }); @@ -417,7 +417,7 @@ void DebugCmd::Inspect(string_view key) { } void DebugCmd::Watched() { - util::fibers_ext::Mutex mu; + Mutex mu; vector watched_keys; vector awaked_trans; diff --git a/src/server/dfly_main.cc b/src/server/dfly_main.cc index b1f2d75c8..3b005b910 100644 --- a/src/server/dfly_main.cc +++ b/src/server/dfly_main.cc @@ -176,8 +176,8 @@ std::optional GetRemoteVersion(ProactorBase* proactor, SSL_CTX* ssl } struct VersionMonitor { - fibers_ext::Fiber version_fiber_; - fibers_ext::Done monitor_ver_done_; + Fiber version_fiber_; + Done monitor_ver_done_; void Run(ProactorPool* proactor_pool); diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index c0436cd5b..c3b620fa0 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -383,7 +383,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha flow->saver->StartSnapshotInShard(true, cntx->GetCancellation(), shard); } - flow->full_sync_fb = fibers_ext::Fiber(&DflyCmd::FullSyncFb, this, flow, cntx); + flow->full_sync_fb = Fiber(&DflyCmd::FullSyncFb, this, flow, cntx); return OpStatus::OK; } diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index deb225511..3ab16b9d1 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -1,4 +1,4 @@ -// Copyright 2022, DragonflyDB authors. All rights reserved. +// Copyright 2023, DragonflyDB authors. All rights reserved. // See LICENSE for licensing terms. // @@ -85,8 +85,8 @@ class DflyCmd { facade::Connection* conn; - util::fibers_ext::Fiber full_sync_fb; // Full sync fiber. - std::unique_ptr saver; // Saver used by the full sync phase. + Fiber full_sync_fb; // Full sync fiber. + std::unique_ptr saver; // Saver used by the full sync phase. std::unique_ptr streamer; std::string eof_token; @@ -108,7 +108,7 @@ class DflyCmd { uint32_t listening_port; std::vector flows; - util::fibers_ext::Mutex mu; // See top of header for locking levels. + Mutex mu; // See top of header for locking levels. }; struct ReplicaRoleInfo { @@ -204,7 +204,7 @@ class DflyCmd { using ReplicaInfoMap = absl::btree_map>; ReplicaInfoMap replica_infos_; - util::fibers_ext::Mutex mu_; // Guard global operations. See header top for locking levels. + Mutex mu_; // Guard global operations. See header top for locking levels. }; } // namespace dfly diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 45eaa49d0..ed8c3acac 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -154,7 +154,7 @@ TEST_F(DflyEngineTest, EvalBug713) { // A auto fb0 = pp_->at(1)->LaunchFiber([&] { - fibers_ext::Yield(); + ThisFiber::Yield(); for (unsigned i = 0; i < 50; ++i) { Run({"eval", script, "3", kKeySid0, kKeySid1, kKeySid2}); } @@ -179,7 +179,7 @@ TEST_F(DflyEngineTest, EvalBug713b) { const char* script = "return redis.call('get', KEYS[1])"; const uint32_t kNumFibers = 20; - fibers_ext::Fiber fibers[kNumFibers]; + Fiber fibers[kNumFibers]; for (unsigned j = 0; j < kNumFibers; ++j) { fibers[j] = pp_->at(1)->LaunchFiber([j, script, this] { @@ -293,7 +293,7 @@ TEST_F(DflyEngineTest, FlushAll) { for (size_t i = 1; i < 100; ++i) { RespExpr resp = Run({"set", "foo", "bar"}); ASSERT_EQ(resp, "OK"); - fibers_ext::Yield(); + ThisFiber::Yield(); } }); @@ -553,13 +553,13 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) { shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) { EngineShard* shard = EngineShard::tlocal(); ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty! - fibers_ext::SleepFor(100ms); + ThisFiber::SleepFor(100ms); // make sure that the task that collect memory usage from all shard ran // for at least once, and that no defrag was done yet. auto stats = shard->stats(); for (int i = 0; i < 3; i++) { - fibers_ext::SleepFor(100ms); + ThisFiber::SleepFor(100ms); EXPECT_EQ(stats.defrag_realloc_total, 0); } }); @@ -578,7 +578,7 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) { auto stats = shard->stats(); for (int i = 0; i < kMaxDefragTriesForTests && stats.defrag_realloc_total == 0; i++) { stats = shard->stats(); - fibers_ext::SleepFor(220ms); + ThisFiber::SleepFor(220ms); } // make sure that we successfully found places to defrag in memory EXPECT_GT(stats.defrag_realloc_total, 0); diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index be54d1ec1..e1ca87bb6 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -13,14 +13,15 @@ extern "C" { #include #include "base/string_view_sso.h" +#include "util/proactor_pool.h" +#include "util/sliding_counter.h" + +// #include "core/external_alloc.h" +#include "core/fibers.h" #include "core/mi_memory_resource.h" #include "core/tx_queue.h" #include "server/db_slice.h" -#include "util/fibers/fiberqueue_threadpool.h" -#include "util/fibers/fibers_ext.h" -#include "util/proactor_pool.h" -#include "util/sliding_counter.h" namespace dfly { @@ -72,7 +73,7 @@ class EngineShard { return &mi_resource_; } - ::util::fibers_ext::FiberQueue* GetFiberQueue() { + FiberQueue* GetFiberQueue() { return &queue_; } @@ -188,8 +189,8 @@ class EngineShard { // return true if we did not complete the shard scan bool DoDefrag(); - ::util::fibers_ext::FiberQueue queue_; - util::fibers_ext::Fiber fiber_q_; + FiberQueue queue_; + Fiber fiber_q_; TxQueue txq_; MiMemoryResource mi_resource_; @@ -207,8 +208,8 @@ class EngineShard { IntentLock shard_lock_; uint32_t defrag_task_ = 0; - ::util::fibers_ext::Fiber fiber_periodic_; - ::util::fibers_ext::Done fiber_periodic_done_; + Fiber fiber_periodic_; + Done fiber_periodic_done_; DefragTaskState defrag_state_; std::unique_ptr tiered_storage_; @@ -275,7 +276,7 @@ class EngineShardSet { // The functions running inside the shard queue run atomically (sequentially) // with respect each other on the same shard. template void AwaitRunningOnShardQueue(U&& func) { - util::fibers_ext::BlockingCounter bc{unsigned(shard_queue_.size())}; + BlockingCounter bc{unsigned(shard_queue_.size())}; for (size_t i = 0; i < shard_queue_.size(); ++i) { Add(i, [&func, bc]() mutable { func(EngineShard::tlocal()); @@ -294,12 +295,12 @@ class EngineShardSet { void InitThreadLocal(util::ProactorBase* pb, bool update_db_time); util::ProactorPool* pp_; - std::vector shard_queue_; + std::vector shard_queue_; }; template void EngineShardSet::RunBriefInParallel(U&& func, P&& pred) const { - util::fibers_ext::BlockingCounter bc{0}; + BlockingCounter bc{0}; for (uint32_t i = 0; i < size(); ++i) { if (!pred(i)) @@ -316,7 +317,7 @@ void EngineShardSet::RunBriefInParallel(U&& func, P&& pred) const { } template void EngineShardSet::RunBlockingInParallel(U&& func) { - util::fibers_ext::BlockingCounter bc{size()}; + BlockingCounter bc{size()}; for (uint32_t i = 0; i < size(); ++i) { util::ProactorBase* dest = pp_->at(i); diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index d7cd24699..d9772d987 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -276,7 +276,7 @@ TEST_F(GenericFamilyTest, Move) { ASSERT_THAT(Run({"get", "a"}), "test"); // Check MOVE awakes blocking operations - auto fb_blpop = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { + auto fb_blpop = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] { Run({"select", "1"}); auto resp = Run({"blpop", "l", "0"}); ASSERT_THAT(resp, ArgType(RespExpr::ARRAY)); diff --git a/src/server/io_mgr.cc b/src/server/io_mgr.cc index 67acf7724..bd0197af6 100644 --- a/src/server/io_mgr.cc +++ b/src/server/io_mgr.cc @@ -136,7 +136,7 @@ error_code IoMgr::Read(size_t offset, io::MutableBytes dest) { void IoMgr::Shutdown() { while (flags_val) { - fibers_ext::SleepFor(200us); // TODO: hacky for now. + ThisFiber::SleepFor(200us); // TODO: hacky for now. } } diff --git a/src/server/io_utils.h b/src/server/io_utils.h index 807922845..5f0409699 100644 --- a/src/server/io_utils.h +++ b/src/server/io_utils.h @@ -60,9 +60,9 @@ class BufferedStreamerBase : public io::Sink { bool IsStopped(); protected: - bool producer_done_ = false; // whether producer is done - unsigned buffered_ = 0; // how many entries are buffered - ::util::fibers_ext::EventCount waker_{}; // two sided waker + bool producer_done_ = false; // whether producer is done + unsigned buffered_ = 0; // how many entries are buffered + EventCount waker_{}; // two sided waker const Cancellation* cll_; // global cancellation diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 77ae187a0..e541e85e9 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -17,7 +17,6 @@ namespace journal { namespace fs = std::filesystem; using namespace std; using namespace util; -namespace fibers = boost::fibers; namespace { @@ -55,7 +54,7 @@ error_code Journal::Close() { VLOG(1) << "Journal::Close"; - fibers_ext::Mutex ec_mu; + Mutex ec_mu; error_code res; lock_guard lk(state_mu_); diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index d7ef7634a..2182f74b0 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -53,7 +53,7 @@ class Journal { bool await); private: - mutable util::fibers_ext::Mutex state_mu_; + mutable Mutex state_mu_; std::atomic_bool lameduck_{false}; }; diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index 500244d27..1ccfbcb12 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -11,13 +11,11 @@ #include #include "base/logging.h" -#include "util/fibers/fibers_ext.h" namespace dfly { namespace journal { using namespace std; using namespace util; -namespace fibers = boost::fibers; namespace fs = std::filesystem; namespace { diff --git a/src/server/journal/journal_slice.h b/src/server/journal/journal_slice.h index 8c655a8ef..e8e5bb0ca 100644 --- a/src/server/journal/journal_slice.h +++ b/src/server/journal/journal_slice.h @@ -10,7 +10,6 @@ #include "base/ring_buffer.h" #include "server/common.h" #include "server/journal/types.h" -#include "util/fibers/fibers_ext.h" #include "util/uring/uring_file.h" namespace dfly { diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 6e9df49d6..ce118b522 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -7,7 +7,7 @@ namespace dfly { void JournalStreamer::Start(io::Sink* dest) { - write_fb_ = util::fibers_ext::Fiber(&JournalStreamer::WriterFb, this, dest); + write_fb_ = Fiber(&JournalStreamer::WriterFb, this, dest); journal_cb_id_ = journal_->RegisterOnChange([this](const journal::Entry& entry, bool allow_await) { if (entry.opcode == journal::Op::NOOP) { diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index 795cb3064..e9cfb3145 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -41,7 +41,7 @@ class JournalStreamer : protected BufferedStreamerBase { uint32_t journal_cb_id_{0}; journal::Journal* journal_; - util::fibers_ext::Fiber write_fb_{}; + Fiber write_fb_{}; JournalWriter writer_{this}; std::atomic_uint64_t record_cnt_{0}; diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index 9038a0cdc..e61d4c141 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -111,17 +111,17 @@ TEST_F(ListFamilyTest, BLPopBlocking) { RespExpr resp0, resp1; // Run the fiber at creation. - auto fb0 = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { + auto fb0 = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] { resp0 = Run({"blpop", "x", "0"}); LOG(INFO) << "pop0"; }); - fibers_ext::SleepFor(50us); + ThisFiber::SleepFor(50us); auto fb1 = pp_->at(1)->LaunchFiber([&] { resp1 = Run({"blpop", "x", "0"}); LOG(INFO) << "pop1"; }); - fibers_ext::SleepFor(30us); + ThisFiber::SleepFor(30us); RespExpr resp = pp_->at(1)->Await([&] { return Run("B1", {"lpush", "x", "2", "1"}); }); ASSERT_THAT(resp, IntArg(2)); @@ -150,7 +150,7 @@ TEST_F(ListFamilyTest, BLPopMultiple) { ASSERT_FALSE(IsLocked(0, kKey1)); ASSERT_FALSE(IsLocked(0, kKey2)); - auto fb1 = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { + auto fb1 = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] { resp0 = Run({"blpop", kKey1, kKey2, "0"}); }); @@ -203,7 +203,7 @@ TEST_F(ListFamilyTest, BLPopMultiPush) { Run({"exists", kKey1, kKey2, kKey3}); ASSERT_EQ(3, GetDebugInfo().shards_count); RespExpr blpop_resp; - auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { + auto pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] { blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"}); }); @@ -242,7 +242,7 @@ TEST_F(ListFamilyTest, BLPopMultiPush) { TEST_F(ListFamilyTest, BLPopSerialize) { RespExpr blpop_resp; - auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { + auto pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] { blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"}); }); @@ -312,7 +312,7 @@ TEST_F(ListFamilyTest, BLPopSerialize) { TEST_F(ListFamilyTest, WrongTypeDoesNotWake) { RespExpr blpop_resp; - auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { + auto pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] { blpop_resp = Run({"blpop", kKey1, "0"}); }); @@ -339,7 +339,7 @@ TEST_F(ListFamilyTest, WrongTypeDoesNotWake) { TEST_F(ListFamilyTest, BPopSameKeyTwice) { RespExpr blpop_resp; - auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { + auto pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] { blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"}); EXPECT_EQ(0, NumWatched()); }); @@ -352,7 +352,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) { ASSERT_THAT(blpop_resp, ArrLen(2)); EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "bar")); - pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { + pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] { blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"}); }); @@ -370,7 +370,7 @@ TEST_F(ListFamilyTest, BPopTwoKeysSameShard) { ASSERT_EQ(1, GetDebugInfo().shards_count); RespExpr blpop_resp; - auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { + auto pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] { blpop_resp = Run({"blpop", "x", "y", "0"}); EXPECT_FALSE(IsLocked(0, "y")); ASSERT_EQ(0, NumWatched()); @@ -391,7 +391,7 @@ TEST_F(ListFamilyTest, BPopRename) { Run({"exists", kKey1, kKey2}); ASSERT_EQ(2, GetDebugInfo().shards_count); - auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { + auto pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] { blpop_resp = Run({"blpop", kKey1, "0"}); }); @@ -409,7 +409,7 @@ TEST_F(ListFamilyTest, BPopRename) { TEST_F(ListFamilyTest, BPopFlush) { RespExpr blpop_resp; - auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { + auto pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] { blpop_resp = Run({"blpop", kKey1, "0"}); }); @@ -665,11 +665,11 @@ TEST_F(ListFamilyTest, TwoQueueBug451) { for (int i = 0; i < 300; i++) { Run(id, {"rpush", "a", "DATA"}); } - fibers_ext::SleepFor(50ms); + ThisFiber::SleepFor(50ms); running = false; }; - vector fbs; + vector fbs; // more likely to reproduce the bug if we start pop_fiber first. for (int i = 0; i < 2; i++) { @@ -715,10 +715,10 @@ TEST_F(ListFamilyTest, BRPopLPushSingleShardBlocking) { RespExpr resp; // Run the fiber at creation. - auto fb0 = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { + auto fb0 = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] { resp = Run({"brpoplpush", "x", "y", "0"}); }); - fibers_ext::SleepFor(30us); + ThisFiber::SleepFor(30us); pp_->at(1)->Await([&] { Run("B1", {"lpush", "y", "2"}); }); pp_->at(1)->Await([&] { Run("B1", {"lpush", "x", "1"}); }); @@ -735,9 +735,9 @@ TEST_F(ListFamilyTest, BRPopContended) { constexpr auto kNumFibers = 4; // Run the fiber at creation. - fibers_ext::Fiber fb[kNumFibers]; + Fiber fb[kNumFibers]; for (int i = 0; i < kNumFibers; i++) { - fb[i] = pp_->at(1)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { + fb[i] = pp_->at(1)->LaunchFiber(Launch::dispatch, [&] { string id = StrCat("id", i); while (!done) { Run(id, {"brpop", "k0", "k1", "k2", "k3", "k4", "0.1"}); @@ -772,11 +772,11 @@ TEST_F(ListFamilyTest, BRPopLPushTwoShards) { ASSERT_EQ(0, NumWatched()); // Run the fiber at creation. - auto fb0 = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] { + auto fb0 = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] { resp = Run({"brpoplpush", "x", "z", "0"}); }); - fibers_ext::SleepFor(30us); + ThisFiber::SleepFor(30us); RespExpr resp_push = pp_->at(1)->Await([&] { return Run("B1", {"lpush", "z", "val2"}); }); ASSERT_THAT(resp_push, IntArg(1)); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 6919abffd..6963e23e3 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -57,11 +57,10 @@ namespace dfly { #endif using namespace util; -using base::VarzValue; -using ::boost::intrusive_ptr; -namespace fibers = ::boost::fibers; using absl::GetFlag; using absl::StrCat; +using base::VarzValue; +using ::boost::intrusive_ptr; using namespace facade; namespace h2 = boost::beast::http; @@ -555,7 +554,7 @@ void Service::Shutdown() { pp_.Await([](ProactorBase* pb) { ServerState::tlocal()->Destroy(); }); // wait for all the pending callbacks to stop. - fibers_ext::SleepFor(10ms); + ThisFiber::SleepFor(10ms); } static void MultiSetError(ConnectionContext* cntx) { diff --git a/src/server/main_service.h b/src/server/main_service.h index ec5389f6d..472ffc233 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -128,7 +128,7 @@ class Service : public facade::ServiceInterface { CommandRegistry registry_; absl::flat_hash_map unknown_cmds_; - mutable util::fibers_ext::Mutex mu_; + mutable Mutex mu_; GlobalState global_state_ = GlobalState::ACTIVE; // protected by mu_; }; diff --git a/src/server/multi_test.cc b/src/server/multi_test.cc index f7b949176..5c95d2a43 100644 --- a/src/server/multi_test.cc +++ b/src/server/multi_test.cc @@ -208,7 +208,7 @@ TEST_F(MultiTest, MultiConsistent) { auto fb = pp_->at(1)->LaunchFiber([&] { RespExpr resp = Run({"multi"}); ASSERT_EQ(resp, "OK"); - fibers_ext::SleepFor(1ms); + ThisFiber::SleepFor(1ms); resp = Run({"get", kKey1}); ASSERT_EQ(resp, "QUEUED"); diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index d5986abca..12891b061 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1851,7 +1851,7 @@ error_code RdbLoader::Load(io::Source* src) { } void RdbLoader::FinishLoad(absl::Time start_time, size_t* keys_loaded) { - fibers_ext::BlockingCounter bc(shard_set->size()); + BlockingCounter bc(shard_set->size()); for (unsigned i = 0; i < shard_set->size(); ++i) { // Flush the remaining items. FlushShardAsync(i); diff --git a/src/server/replica.cc b/src/server/replica.cc index 7288d2f6f..77cae9f82 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -160,7 +160,7 @@ bool Replica::Start(ConnectionContext* cntx) { cntx_.Reset(absl::bind_front(&Replica::DefaultErrorHandler, this)); // 5. Spawn main coordination fiber. - sync_fb_ = fibers_ext::Fiber(&Replica::MainReplicationFb, this); + sync_fb_ = Fiber(&Replica::MainReplicationFb, this); (*cntx)->SendOk(); return true; @@ -196,7 +196,7 @@ void Replica::MainReplicationFb() { // 1. Connect socket. if ((state_mask_ & R_TCP_CONNECTED) == 0) { - fibers_ext::SleepFor(500ms); + ThisFiber::SleepFor(500ms); if (is_paused_) continue; @@ -488,7 +488,7 @@ error_code Replica::InitiatePSync() { // There is a data race condition in Redis-master code, where "ACK 0" handler may be // triggered before Redis is ready to transition to the streaming state and it silenty ignores // "ACK 0". We reduce the chance it happens with this delay. - fibers_ext::SleepFor(50ms); + ThisFiber::SleepFor(50ms); return error_code{}; } @@ -514,7 +514,7 @@ error_code Replica::InitiateDflySync() { } // Blocked on until all flows got full sync cut. - fibers_ext::BlockingCounter sync_block{num_df_flows_}; + BlockingCounter sync_block{num_df_flows_}; // Switch to new error handler that closes flow sockets. auto err_handler = [this, sync_block](const auto& ge) mutable { @@ -734,7 +734,7 @@ error_code Replica::SendNextPhaseRequest(bool stable) { return std::error_code{}; } -error_code Replica::StartFullSyncFlow(fibers_ext::BlockingCounter sb, Context* cntx) { +error_code Replica::StartFullSyncFlow(BlockingCounter sb, Context* cntx) { DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty()); RETURN_ON_ERR(ConnectAndAuth()); @@ -771,7 +771,7 @@ error_code Replica::StartFullSyncFlow(fibers_ext::BlockingCounter sb, Context* c // We can not discard io_buf because it may contain data // besides the response we parsed. Therefore we pass it further to ReplicateDFFb. - sync_fb_ = fibers_ext::Fiber(&Replica::FullSyncDflyFb, this, move(eof_token), sb, cntx); + sync_fb_ = Fiber(&Replica::FullSyncDflyFb, this, move(eof_token), sb, cntx); return error_code{}; } @@ -784,15 +784,15 @@ error_code Replica::StartStableSyncFlow(Context* cntx) { CHECK(sock_->IsOpen()); // sock_.reset(mythread->CreateSocket()); // RETURN_ON_ERR(sock_->Connect(master_context_.master_ep)); - sync_fb_ = fibers_ext::Fiber(&Replica::StableSyncDflyReadFb, this, cntx); + sync_fb_ = Fiber(&Replica::StableSyncDflyReadFb, this, cntx); if (use_multi_shard_exe_sync_) { - execution_fb_ = fibers_ext::Fiber(&Replica::StableSyncDflyExecFb, this, cntx); + execution_fb_ = Fiber(&Replica::StableSyncDflyExecFb, this, cntx); } return std::error_code{}; } -void Replica::FullSyncDflyFb(string eof_token, fibers_ext::BlockingCounter bc, Context* cntx) { +void Replica::FullSyncDflyFb(string eof_token, BlockingCounter bc, Context* cntx) { DCHECK(leftover_buf_); SocketSource ss{sock_.get()}; io::PrefixSource ps{leftover_buf_->InputBuffer(), &ss}; diff --git a/src/server/replica.h b/src/server/replica.h index 6311b159c..718b3e3f1 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -13,7 +13,6 @@ #include "server/common.h" #include "server/journal/types.h" #include "util/fiber_socket_base.h" -#include "util/fibers/fibers_ext.h" namespace facade { class ReqSerializer; @@ -81,12 +80,12 @@ class Replica { // Coorindator for multi shard execution. struct MultiShardExecution { - util::fibers_ext::Mutex map_mu; + Mutex map_mu; struct TxExecutionSync { - util::fibers_ext::Barrier barrier; + Barrier barrier; std::atomic_uint32_t counter; - util::fibers_ext::BlockingCounter block; + BlockingCounter block; TxExecutionSync(uint32_t counter) : barrier(counter), counter(counter), block(counter) { } @@ -141,14 +140,13 @@ class Replica { std::shared_ptr shared_exe_data); // Start replica initialized as dfly flow. - std::error_code StartFullSyncFlow(util::fibers_ext::BlockingCounter block, Context* cntx); + std::error_code StartFullSyncFlow(BlockingCounter block, Context* cntx); // Transition into stable state mode as dfly flow. std::error_code StartStableSyncFlow(Context* cntx); // Single flow full sync fiber spawned by StartFullSyncFlow. - void FullSyncDflyFb(std::string eof_token, util::fibers_ext::BlockingCounter block, - Context* cntx); + void FullSyncDflyFb(std::string eof_token, BlockingCounter block, Context* cntx); // Single flow stable state sync fiber spawned by StartStableSyncFlow. void StableSyncDflyReadFb(Context* cntx); @@ -225,7 +223,7 @@ class Replica { std::queue> trans_data_queue_; static constexpr size_t kYieldAfterItemsInQueue = 50; - ::util::fibers_ext::EventCount waker_; // waker for trans_data_queue_ + EventCount waker_; // waker for trans_data_queue_ bool use_multi_shard_exe_sync_; std::unique_ptr executor_; @@ -233,13 +231,13 @@ class Replica { std::atomic_uint64_t journal_rec_executed_ = 0; // MainReplicationFb in standalone mode, FullSyncDflyFb in flow mode. - ::util::fibers_ext::Fiber sync_fb_; - ::util::fibers_ext::Fiber execution_fb_; + Fiber sync_fb_; + Fiber execution_fb_; std::vector> shard_flows_; // Guard operations where flows might be in a mixed state (transition/setup) - util::fibers_ext::Mutex flows_op_mu_; + Mutex flows_op_mu_; std::optional leftover_buf_; std::unique_ptr parser_; diff --git a/src/server/script_mgr.cc b/src/server/script_mgr.cc index 120a4523b..14a6f0282 100644 --- a/src/server/script_mgr.cc +++ b/src/server/script_mgr.cc @@ -156,7 +156,7 @@ void ScriptMgr::ListCmd(ConnectionContext* cntx) const { void ScriptMgr::LatencyCmd(ConnectionContext* cntx) const { absl::flat_hash_map result; - fibers_ext::Mutex mu; + Mutex mu; shard_set->pool()->AwaitFiberOnAll([&](auto* pb) { auto* ss = ServerState::tlocal(); diff --git a/src/server/script_mgr.h b/src/server/script_mgr.h index 091175ea1..3ce6d71ff 100644 --- a/src/server/script_mgr.h +++ b/src/server/script_mgr.h @@ -70,7 +70,7 @@ class ScriptMgr { ScriptParams default_params_; absl::flat_hash_map db_; - mutable util::fibers_ext::Mutex mu_; + mutable Mutex mu_; }; } // namespace dfly diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 8c27357a5..99093d16c 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -471,7 +471,7 @@ void ServerFamily::Shutdown() { // Load starts as many fibers as there are files to load each one separately. // It starts one more fiber that waits for all load fibers to finish and returns the first // error (if any occured) with a future. -fibers_ext::Future ServerFamily::Load(const std::string& load_path) { +Future ServerFamily::Load(const std::string& load_path) { CHECK(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, "summary.dfs")); vector paths{{load_path}}; @@ -482,7 +482,7 @@ fibers_ext::Future ServerFamily::Load(const std::string& load_p io::Result files = io::StatFiles(glob); if (files && files->size() == 0) { - fibers_ext::Promise ec_promise; + Promise ec_promise; ec_promise.set_value(make_error_code(errc::no_such_file_or_directory)); return ec_promise.get_future(); } @@ -498,7 +498,7 @@ fibers_ext::Future ServerFamily::Load(const std::string& load_p (void)fs::canonical(path, ec); if (ec) { LOG(ERROR) << "Error loading " << load_path << " " << ec.message(); - fibers_ext::Promise ec_promise; + Promise ec_promise; ec_promise.set_value(ec); return ec_promise.get_future(); } @@ -524,7 +524,7 @@ fibers_ext::Future ServerFamily::Load(const std::string& load_p auto& pool = service_.proactor_pool(); - vector load_fibers; + vector load_fibers; load_fibers.reserve(paths.size()); auto first_error = std::make_shared(); @@ -545,8 +545,8 @@ fibers_ext::Future ServerFamily::Load(const std::string& load_p load_fibers.push_back(proactor->LaunchFiber(std::move(load_fiber))); } - boost::fibers::promise ec_promise; - boost::fibers::future ec_future = ec_promise.get_future(); + Promise ec_promise; + Future ec_future = ec_promise.get_future(); // Run fiber that empties the channel and sets ec_promise. auto load_join_fiber = [this, first_error, load_fibers = std::move(load_fibers), @@ -914,7 +914,7 @@ GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) { vector> snapshots; absl::flat_hash_map rdb_name_map; - fibers_ext::Mutex mu; // guards rdb_name_map + Mutex mu; // guards rdb_name_map auto save_cb = [&](unsigned index) { auto& snapshot = snapshots[index]; @@ -1391,7 +1391,7 @@ static void MergeInto(const DbSlice::Stats& src, Metrics* dest) { Metrics ServerFamily::GetMetrics() const { Metrics result; - fibers_ext::Mutex mu; + Mutex mu; auto cb = [&](ProactorBase* pb) { EngineShard* shard = EngineShard::tlocal(); diff --git a/src/server/server_family.h b/src/server/server_family.h index e09cb1b74..b160c2d1d 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -101,7 +101,7 @@ class ServerFamily { // Load snapshot from file (.rdb file or summary.dfs file) and return // future with error_code. - util::fibers_ext::Future Load(const std::string& file_name); + Future Load(const std::string& file_name); // used within tests. bool IsSaving() const { @@ -163,8 +163,8 @@ class ServerFamily { void SnapshotScheduling(const SnapshotSpec& time); - util::fibers_ext::Fiber snapshot_fiber_; - util::fibers_ext::Future load_result_; + Fiber snapshot_fiber_; + Future load_result_; uint32_t stats_caching_task_ = 0; Service& service_; @@ -173,7 +173,7 @@ class ServerFamily { util::ListenerInterface* main_listener_ = nullptr; util::ProactorBase* pb_task_ = nullptr; - mutable util::fibers_ext::Mutex replicaof_mu_, save_mu_; + mutable Mutex replicaof_mu_, save_mu_; std::shared_ptr replica_; // protected by replica_of_mu_ std::unique_ptr script_mgr_; @@ -188,7 +188,7 @@ class ServerFamily { std::shared_ptr last_save_info_; // protected by save_mu_; std::atomic_bool is_saving_{false}; - util::fibers_ext::Done is_snapshot_done_; + Done is_snapshot_done_; std::unique_ptr fq_threadpool_; }; diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 1ca4dc379..204f94f02 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -137,7 +137,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) { if (stats_.loop_serialized >= last_yield + 100) { DVLOG(2) << "Before sleep " << ThisFiber::GetName(); - fibers_ext::Yield(); + ThisFiber::Yield(); DVLOG(2) << "After sleep"; last_yield = stats_.loop_serialized; diff --git a/src/server/snapshot.h b/src/server/snapshot.h index 810e8261f..b1666ef4e 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -130,8 +130,8 @@ class SliceSnapshot { std::unique_ptr serializer_; - util::fibers_ext::Mutex mu_; - ::util::fibers_ext::Fiber snapshot_fb_; // IterateEntriesFb + Mutex mu_; + Fiber snapshot_fb_; // IterateEntriesFb CompressionMode compression_mode_; RdbTypeFreqMap type_freq_map_; diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index a43ed98ba..4dc1bff51 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -171,7 +171,7 @@ void BaseFamilyTest::WaitUntilLocked(DbIndex db_index, string_view key, double t auto timeout_micro = chrono::duration_cast(1000ms * timeout); int64_t steps = timeout_micro.count() / step.count(); do { - fibers_ext::SleepFor(step); + ThisFiber::SleepFor(step); } while (!IsLocked(db_index, key) && --steps > 0); CHECK(IsLocked(db_index, key)); } diff --git a/src/server/test_utils.h b/src/server/test_utils.h index ae00ebeeb..ea89c986d 100644 --- a/src/server/test_utils.h +++ b/src/server/test_utils.h @@ -95,7 +95,7 @@ class BaseFamilyTest : public ::testing::Test { unsigned num_threads_ = 3; absl::flat_hash_map> connections_; - util::fibers_ext::Mutex mu_; + Mutex mu_; ConnectionContext::DebugInfo last_cmd_dbg_info_; std::vector resp_vec_; diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h index 0f54e4e25..91485eab4 100644 --- a/src/server/tiered_storage.h +++ b/src/server/tiered_storage.h @@ -61,7 +61,7 @@ class TieredStorage { size_t submitted_io_write_size_ = 0; uint32_t num_active_requests_ = 0; - util::fibers_ext::EventCount active_req_sem_; + EventCount active_req_sem_; struct PerDb; std::vector db_arr_; diff --git a/src/server/transaction.h b/src/server/transaction.h index 707a35f1e..b410af533 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -19,7 +19,6 @@ #include "server/common.h" #include "server/journal/types.h" #include "server/table.h" -#include "util/fibers/fibers_ext.h" namespace dfly { @@ -494,8 +493,8 @@ class Transaction { uint32_t unique_shard_cnt_{0}; // Number of unique shards active ShardId unique_shard_id_{kInvalidSid}; // Set if unique_shard_cnt_ = 1 - util::fibers_ext::EventCount blocking_ec_; // Used to wake blocking transactions. - util::fibers_ext::EventCount run_ec_; // Used to wait for shard callbacks + EventCount blocking_ec_; // Used to wake blocking transactions. + EventCount run_ec_; // Used to wait for shard callbacks // Transaction coordinator state, written and read by coordinator thread. // Can be read by shard threads as long as we respect ordering rules, i.e. when