chore: import fiber related primitives under dfly namespace (#1012)

This change removes most mentions of boost::fibers or util::fibers_ext.
Instead it introduces a "core/fibers.h" header that gathers most of
the fiber primitives under the dfly namespace. This is done in
preparation for switching from Boost.Fibers to helio's native fibers.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
40 changed files with 160 additions and 139 deletions

src/core/fibers.h (new file)

@ -0,0 +1,27 @@
// Copyright 2023, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
// An import header that centralizes all fiber-related imports from the helio project.
#include "util/fibers/event_count.h"
#include "util/fibers/fiber.h"
#include "util/fibers/fiberqueue_threadpool.h"
#include "util/fibers/fibers_ext.h"
namespace dfly {
using util::fibers_ext::Barrier;
using util::fibers_ext::BlockingCounter;
using util::fibers_ext::Done;
using util::fibers_ext::EventCount;
using util::fibers_ext::Fiber;
using util::fibers_ext::FiberQueue;
using util::fibers_ext::Future;
using util::fibers_ext::Launch;
using util::fibers_ext::Mutex;
using util::fibers_ext::Promise;
using CondVar = ::boost::fibers::condition_variable;
} // namespace dfly
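
With these aliases in place, downstream code only includes "core/fibers.h" and names the primitives directly. A minimal usage sketch (Example is illustrative, not part of the commit; it assumes Fiber exposes Join(), the counterpart of the IsJoinable() call seen later in this diff):

#include <mutex>

#include "core/fibers.h"

namespace dfly {

void Example() {
  Mutex mu;              // resolves to util::fibers_ext::Mutex
  unsigned counter = 0;

  // Construct-and-run a fiber, mirroring the Fiber{...} assignments below.
  Fiber fb([&] {
    std::lock_guard lk(mu);
    ++counter;
  });
  fb.Join();  // join before the Fiber object goes out of scope
}

}  // namespace dfly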


@ -8,7 +8,7 @@
#include <string_view>
#include "core/core_types.h"
#include "util/fibers/event_count.h"
#include "core/fibers.h"
typedef struct lua_State lua_State;
@ -124,7 +124,7 @@ class InterpreterManager {
void Return(Interpreter*);
private:
::util::fibers_ext::EventCount waker_;
EventCount waker_;
std::vector<Interpreter*> available_;
std::vector<Interpreter> storage_;
};


@ -359,17 +359,17 @@ void Connection::OnShutdown() {
void Connection::OnPreMigrateThread() {
// If we are migrating to another io_uring, we should cancel any pending requests we have.
if (break_poll_id_ != kuint32max) {
if (break_poll_id_ != UINT32_MAX) {
auto* ls = static_cast<LinuxSocketBase*>(socket_.get());
ls->CancelPoll(break_poll_id_);
break_poll_id_ = kuint32max;
break_poll_id_ = UINT32_MAX;
}
}
void Connection::OnPostMigrateThread() {
// Once we have migrated, we should rearm the OnBreakCb callback.
if (breaker_cb_) {
DCHECK_EQ(kuint32max, break_poll_id_);
DCHECK_EQ(UINT32_MAX, break_poll_id_);
auto* ls = static_cast<LinuxSocketBase*>(socket_.get());
break_poll_id_ =
@ -448,7 +448,7 @@ void Connection::HandleRequests() {
ConnectionFlow(peer);
if (break_poll_id_ != kuint32max) {
if (break_poll_id_ != UINT32_MAX) {
us->CancelPoll(break_poll_id_);
}
@ -513,7 +513,7 @@ string Connection::GetClientInfo(unsigned thread_id) const {
return res;
}
uint32 Connection::GetClientId() const {
uint32_t Connection::GetClientId() const {
return id_;
}
@ -558,7 +558,7 @@ io::Result<bool> Connection::CheckForHttpProto(FiberSocketBase* peer) {
void Connection::ConnectionFlow(FiberSocketBase* peer) {
stats_ = service_->GetThreadLocalConnectionStats();
auto dispatch_fb = MakeFiber(fibers_ext::Launch::dispatch, [&] { DispatchFiber(peer); });
auto dispatch_fb = MakeFiber(dfly::Launch::dispatch, [&] { DispatchFiber(peer); });
++stats_->num_conns;
++stats_->conn_received_cnt;
@ -683,7 +683,7 @@ auto Connection::ParseRedis() -> ParserStatus {
if (dispatch_q_.size() == 1) {
evc_.notify();
} else if (dispatch_q_.size() > 10) {
fibers_ext::Yield();
ThisFiber::Yield();
}
}
}
@ -762,7 +762,7 @@ void Connection::OnBreakCb(int32_t mask) {
VLOG(1) << "Got event " << mask;
CHECK(cc_);
cc_->conn_closing = true;
break_poll_id_ = kuint32max; // do not attempt to cancel it.
break_poll_id_ = UINT32_MAX; // do not attempt to cancel it.
breaker_cb_(mask);
evc_.notify(); // Notify dispatch fiber.
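
The evc_.notify() calls above pair with an EventCount wait on the dispatch-fiber side: the producer wakes the consumer only on the empty-to-non-empty transition and yields when the queue grows too long. A schematic sketch of that handshake, assuming EventCount's await(Condition) API from helio (the queue type is illustrative):

#include <deque>

#include "core/fibers.h"

using namespace util;  // makes ThisFiber visible, as in the files above

struct DispatchQueue {
  std::deque<int> items;
  dfly::EventCount evc;

  void Push(int v) {  // producer side, runs in the connection fiber
    items.push_back(v);
    if (items.size() == 1)
      evc.notify();        // wake the consumer on empty -> non-empty
    else if (items.size() > 10)
      ThisFiber::Yield();  // backpressure: let the consumer catch up
  }

  int Pop() {  // consumer side, runs in the dispatch fiber
    evc.await([this] { return !items.empty(); });  // sleep until there is work
    int v = items.front();
    items.pop_front();
    return v;
  }
};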


@ -10,11 +10,13 @@
#include <variant>
#include "base/io_buf.h"
#include "util/connection.h"
#include "util/http/http_handler.h"
//
#include "core/fibers.h"
#include "facade/facade_types.h"
#include "facade/resp_expr.h"
#include "util/connection.h"
#include "util/fibers/fibers_ext.h"
#include "util/http/http_handler.h"
typedef struct ssl_ctx_st SSL_CTX;
typedef struct mi_heap_s mi_heap_t;
@ -92,7 +94,7 @@ class Connection : public util::Connection {
std::string RemoteEndpointStr() const;
std::string RemoteEndpointAddress() const;
std::string LocalBindAddress() const;
uint32 GetClientId() const;
uint32_t GetClientId() const;
void ShutdownSelf();
@ -153,7 +155,7 @@ class Connection : public util::Connection {
uint32_t pipeline_msg_cnt_ = 0;
static thread_local std::vector<RequestPtr> free_req_pool_;
util::fibers_ext::EventCount evc_;
dfly::EventCount evc_;
RespVec parse_args_;
CmdArgVec cmd_vec_;


@ -5,7 +5,6 @@
#include <absl/container/flat_hash_map.h>
#include <boost/fiber/mutex.hpp>
#include <string_view>
#include "server/conn_context.h"
@ -54,7 +53,7 @@ class ChannelStore {
static bool ByThread(const Subscriber& lhs, const Subscriber& rhs);
ConnectionContext* conn_cntx;
util::fibers_ext::BlockingCounter borrow_token; // to keep connection alive
BlockingCounter borrow_token; // to keep connection alive
uint32_t thread_id;
std::string pattern; // non-empty if registered via psubscribe
};
@ -107,7 +106,7 @@ class ChannelStore {
// Centralized controller to prevent overlapping updates.
struct ControlBlock {
std::atomic<ChannelStore*> most_recent;
::boost::fibers::mutex update_mu; // locked during updates.
Mutex update_mu; // locked during updates.
};
private:


@ -311,7 +311,7 @@ GenericError Context::ReportErrorInternal(GenericError&& err) {
CHECK(!err_handler_fb_.IsJoinable());
if (err_handler_)
err_handler_fb_ = util::fibers_ext::Fiber{err_handler_, err_};
err_handler_fb_ = Fiber{err_handler_, err_};
Cancellation::Cancel();
return err_;
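
Like the underlying boost::fibers::fiber, the Fiber alias must not be overwritten while still joinable, which the CHECK above enforces. A hedged sketch of that reuse pattern (Worker is illustrative, using the Join()/IsJoinable() pair as in this commit):

#include "core/fibers.h"

class Worker {
 public:
  void Restart() {
    if (fb_.IsJoinable())
      fb_.Join();  // finish the previous run before reassigning
    fb_ = dfly::Fiber([] { /* do work */ });
  }

  ~Worker() {
    if (fb_.IsJoinable())
      fb_.Join();  // a joinable fiber must never be destroyed
  }

 private:
  dfly::Fiber fb_;
};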


@ -12,9 +12,9 @@
#include <string_view>
#include <vector>
#include "core/fibers.h"
#include "facade/facade_types.h"
#include "facade/op_status.h"
#include "util/fibers/fiber.h"
namespace dfly {
@ -215,7 +215,7 @@ template <typename T> struct AggregateValue {
}
private:
util::fibers_ext::Mutex mu_{};
Mutex mu_{};
T current_{};
};
@ -319,10 +319,10 @@ class Context : protected Cancellation {
private:
GenericError err_;
util::fibers_ext::Mutex mu_;
Mutex mu_;
ErrHandler err_handler_;
::util::fibers_ext::Fiber err_handler_fb_;
Fiber err_handler_fb_;
};
struct ScanOpts {


@ -6,9 +6,9 @@
#include <absl/container/flat_hash_set.h>
#include "core/fibers.h"
#include "facade/conn_context.h"
#include "server/common.h"
#include "util/fibers/fibers_ext.h"
namespace dfly {
@ -82,7 +82,7 @@ struct ConnectionState {
absl::flat_hash_set<std::string> channels;
absl::flat_hash_set<std::string> patterns;
util::fibers_ext::BlockingCounter borrow_token{0};
BlockingCounter borrow_token{0};
};
struct ReplicationInfo {


@ -297,7 +297,7 @@ void DebugCmd::Populate(CmdArgList args) {
}
ranges.emplace_back(from, total_count - from);
vector<fibers_ext::Fiber> fb_arr(ranges.size());
vector<Fiber> fb_arr(ranges.size());
for (size_t i = 0; i < ranges.size(); ++i) {
auto range = ranges[i];
@ -335,7 +335,7 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view
ess.Add(sid, [=] {
DoPopulateBatch(prefix, value_len, populate_random_values, params, shard_batch);
if (i % 50 == 0) {
fibers_ext::Yield();
ThisFiber::Yield();
}
});
@ -417,7 +417,7 @@ void DebugCmd::Inspect(string_view key) {
}
void DebugCmd::Watched() {
util::fibers_ext::Mutex mu;
Mutex mu;
vector<string> watched_keys;
vector<string> awaked_trans;


@ -176,8 +176,8 @@ std::optional<std::string> GetRemoteVersion(ProactorBase* proactor, SSL_CTX* ssl
}
struct VersionMonitor {
fibers_ext::Fiber version_fiber_;
fibers_ext::Done monitor_ver_done_;
Fiber version_fiber_;
Done monitor_ver_done_;
void Run(ProactorPool* proactor_pool);


@ -383,7 +383,7 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
flow->saver->StartSnapshotInShard(true, cntx->GetCancellation(), shard);
}
flow->full_sync_fb = fibers_ext::Fiber(&DflyCmd::FullSyncFb, this, flow, cntx);
flow->full_sync_fb = Fiber(&DflyCmd::FullSyncFb, this, flow, cntx);
return OpStatus::OK;
}


@ -1,4 +1,4 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
@ -85,7 +85,7 @@ class DflyCmd {
facade::Connection* conn;
util::fibers_ext::Fiber full_sync_fb; // Full sync fiber.
Fiber full_sync_fb; // Full sync fiber.
std::unique_ptr<RdbSaver> saver; // Saver used by the full sync phase.
std::unique_ptr<JournalStreamer> streamer;
std::string eof_token;
@ -108,7 +108,7 @@ class DflyCmd {
uint32_t listening_port;
std::vector<FlowInfo> flows;
util::fibers_ext::Mutex mu; // See top of header for locking levels.
Mutex mu; // See top of header for locking levels.
};
struct ReplicaRoleInfo {
@ -204,7 +204,7 @@ class DflyCmd {
using ReplicaInfoMap = absl::btree_map<uint32_t, std::shared_ptr<ReplicaInfo>>;
ReplicaInfoMap replica_infos_;
util::fibers_ext::Mutex mu_; // Guard global operations. See header top for locking levels.
Mutex mu_; // Guard global operations. See header top for locking levels.
};
} // namespace dfly


@ -154,7 +154,7 @@ TEST_F(DflyEngineTest, EvalBug713) {
// A
auto fb0 = pp_->at(1)->LaunchFiber([&] {
fibers_ext::Yield();
ThisFiber::Yield();
for (unsigned i = 0; i < 50; ++i) {
Run({"eval", script, "3", kKeySid0, kKeySid1, kKeySid2});
}
@ -179,7 +179,7 @@ TEST_F(DflyEngineTest, EvalBug713b) {
const char* script = "return redis.call('get', KEYS[1])";
const uint32_t kNumFibers = 20;
fibers_ext::Fiber fibers[kNumFibers];
Fiber fibers[kNumFibers];
for (unsigned j = 0; j < kNumFibers; ++j) {
fibers[j] = pp_->at(1)->LaunchFiber([j, script, this] {
@ -293,7 +293,7 @@ TEST_F(DflyEngineTest, FlushAll) {
for (size_t i = 1; i < 100; ++i) {
RespExpr resp = Run({"set", "foo", "bar"});
ASSERT_EQ(resp, "OK");
fibers_ext::Yield();
ThisFiber::Yield();
}
});
@ -553,13 +553,13 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) {
shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
EngineShard* shard = EngineShard::tlocal();
ASSERT_FALSE(shard == nullptr);  // we only have one shard and it should not be empty!
fibers_ext::SleepFor(100ms);
ThisFiber::SleepFor(100ms);
// make sure that the task that collects memory usage from all shards ran
// at least once, and that no defrag was done yet.
auto stats = shard->stats();
for (int i = 0; i < 3; i++) {
fibers_ext::SleepFor(100ms);
ThisFiber::SleepFor(100ms);
EXPECT_EQ(stats.defrag_realloc_total, 0);
}
});
@ -578,7 +578,7 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) {
auto stats = shard->stats();
for (int i = 0; i < kMaxDefragTriesForTests && stats.defrag_realloc_total == 0; i++) {
stats = shard->stats();
fibers_ext::SleepFor(220ms);
ThisFiber::SleepFor(220ms);
}
// make sure that we successfully found places to defrag in memory
EXPECT_GT(stats.defrag_realloc_total, 0);


@ -13,14 +13,15 @@ extern "C" {
#include <xxhash.h>
#include "base/string_view_sso.h"
#include "util/proactor_pool.h"
#include "util/sliding_counter.h"
//
#include "core/external_alloc.h"
#include "core/fibers.h"
#include "core/mi_memory_resource.h"
#include "core/tx_queue.h"
#include "server/db_slice.h"
#include "util/fibers/fiberqueue_threadpool.h"
#include "util/fibers/fibers_ext.h"
#include "util/proactor_pool.h"
#include "util/sliding_counter.h"
namespace dfly {
@ -72,7 +73,7 @@ class EngineShard {
return &mi_resource_;
}
::util::fibers_ext::FiberQueue* GetFiberQueue() {
FiberQueue* GetFiberQueue() {
return &queue_;
}
@ -188,8 +189,8 @@ class EngineShard {
// return true if we did not complete the shard scan
bool DoDefrag();
::util::fibers_ext::FiberQueue queue_;
util::fibers_ext::Fiber fiber_q_;
FiberQueue queue_;
Fiber fiber_q_;
TxQueue txq_;
MiMemoryResource mi_resource_;
@ -207,8 +208,8 @@ class EngineShard {
IntentLock shard_lock_;
uint32_t defrag_task_ = 0;
::util::fibers_ext::Fiber fiber_periodic_;
::util::fibers_ext::Done fiber_periodic_done_;
Fiber fiber_periodic_;
Done fiber_periodic_done_;
DefragTaskState defrag_state_;
std::unique_ptr<TieredStorage> tiered_storage_;
@ -275,7 +276,7 @@ class EngineShardSet {
// The functions running inside the shard queue run atomically (sequentially)
// with respect to each other on the same shard.
template <typename U> void AwaitRunningOnShardQueue(U&& func) {
util::fibers_ext::BlockingCounter bc{unsigned(shard_queue_.size())};
BlockingCounter bc{unsigned(shard_queue_.size())};
for (size_t i = 0; i < shard_queue_.size(); ++i) {
Add(i, [&func, bc]() mutable {
func(EngineShard::tlocal());
@ -294,12 +295,12 @@ class EngineShardSet {
void InitThreadLocal(util::ProactorBase* pb, bool update_db_time);
util::ProactorPool* pp_;
std::vector<util::fibers_ext::FiberQueue*> shard_queue_;
std::vector<FiberQueue*> shard_queue_;
};
template <typename U, typename P>
void EngineShardSet::RunBriefInParallel(U&& func, P&& pred) const {
util::fibers_ext::BlockingCounter bc{0};
BlockingCounter bc{0};
for (uint32_t i = 0; i < size(); ++i) {
if (!pred(i))
@ -316,7 +317,7 @@ void EngineShardSet::RunBriefInParallel(U&& func, P&& pred) const {
}
template <typename U> void EngineShardSet::RunBlockingInParallel(U&& func) {
util::fibers_ext::BlockingCounter bc{size()};
BlockingCounter bc{size()};
for (uint32_t i = 0; i < size(); ++i) {
util::ProactorBase* dest = pp_->at(i);
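
All three Run* helpers rely on the same BlockingCounter idiom: size the counter to the number of tasks, copy the counter handle into each callback, decrement on completion, and wait for zero. Schematically (run_on_thread is a hypothetical dispatch helper; Dec()/Wait() follow helio's BlockingCounter API):

#include "core/fibers.h"

template <typename F, typename Dispatch>
void FanOutAndWait(unsigned n, F&& fn, Dispatch&& run_on_thread) {
  dfly::BlockingCounter bc{n};
  for (unsigned i = 0; i < n; ++i) {
    // bc is a shared handle, so each task holds its own copy.
    run_on_thread(i, [i, &fn, bc]() mutable {
      fn(i);
      bc.Dec();  // signal this task's completion
    });
  }
  bc.Wait();  // returns once all n tasks called Dec()
}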


@ -276,7 +276,7 @@ TEST_F(GenericFamilyTest, Move) {
ASSERT_THAT(Run({"get", "a"}), "test");
// Check MOVE awakes blocking operations
auto fb_blpop = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
auto fb_blpop = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
Run({"select", "1"});
auto resp = Run({"blpop", "l", "0"});
ASSERT_THAT(resp, ArgType(RespExpr::ARRAY));


@ -136,7 +136,7 @@ error_code IoMgr::Read(size_t offset, io::MutableBytes dest) {
void IoMgr::Shutdown() {
while (flags_val) {
fibers_ext::SleepFor(200us); // TODO: hacky for now.
ThisFiber::SleepFor(200us); // TODO: hacky for now.
}
}


@ -62,7 +62,7 @@ class BufferedStreamerBase : public io::Sink {
protected:
bool producer_done_ = false; // whether producer is done
unsigned buffered_ = 0; // how many entries are buffered
::util::fibers_ext::EventCount waker_{}; // two sided waker
EventCount waker_{}; // two sided waker
const Cancellation* cll_; // global cancellation


@ -17,7 +17,6 @@ namespace journal {
namespace fs = std::filesystem;
using namespace std;
using namespace util;
namespace fibers = boost::fibers;
namespace {
@ -55,7 +54,7 @@ error_code Journal::Close() {
VLOG(1) << "Journal::Close";
fibers_ext::Mutex ec_mu;
Mutex ec_mu;
error_code res;
lock_guard lk(state_mu_);


@ -53,7 +53,7 @@ class Journal {
bool await);
private:
mutable util::fibers_ext::Mutex state_mu_;
mutable Mutex state_mu_;
std::atomic_bool lameduck_{false};
};


@ -11,13 +11,11 @@
#include <filesystem>
#include "base/logging.h"
#include "util/fibers/fibers_ext.h"
namespace dfly {
namespace journal {
using namespace std;
using namespace util;
namespace fibers = boost::fibers;
namespace fs = std::filesystem;
namespace {


@ -10,7 +10,6 @@
#include "base/ring_buffer.h"
#include "server/common.h"
#include "server/journal/types.h"
#include "util/fibers/fibers_ext.h"
#include "util/uring/uring_file.h"
namespace dfly {


@ -7,7 +7,7 @@
namespace dfly {
void JournalStreamer::Start(io::Sink* dest) {
write_fb_ = util::fibers_ext::Fiber(&JournalStreamer::WriterFb, this, dest);
write_fb_ = Fiber(&JournalStreamer::WriterFb, this, dest);
journal_cb_id_ =
journal_->RegisterOnChange([this](const journal::Entry& entry, bool allow_await) {
if (entry.opcode == journal::Op::NOOP) {


@ -41,7 +41,7 @@ class JournalStreamer : protected BufferedStreamerBase {
uint32_t journal_cb_id_{0};
journal::Journal* journal_;
util::fibers_ext::Fiber write_fb_{};
Fiber write_fb_{};
JournalWriter writer_{this};
std::atomic_uint64_t record_cnt_{0};


@ -111,17 +111,17 @@ TEST_F(ListFamilyTest, BLPopBlocking) {
RespExpr resp0, resp1;
// Run the fiber at creation.
auto fb0 = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
auto fb0 = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
resp0 = Run({"blpop", "x", "0"});
LOG(INFO) << "pop0";
});
fibers_ext::SleepFor(50us);
ThisFiber::SleepFor(50us);
auto fb1 = pp_->at(1)->LaunchFiber([&] {
resp1 = Run({"blpop", "x", "0"});
LOG(INFO) << "pop1";
});
fibers_ext::SleepFor(30us);
ThisFiber::SleepFor(30us);
RespExpr resp = pp_->at(1)->Await([&] { return Run("B1", {"lpush", "x", "2", "1"}); });
ASSERT_THAT(resp, IntArg(2));
@ -150,7 +150,7 @@ TEST_F(ListFamilyTest, BLPopMultiple) {
ASSERT_FALSE(IsLocked(0, kKey1));
ASSERT_FALSE(IsLocked(0, kKey2));
auto fb1 = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
auto fb1 = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
resp0 = Run({"blpop", kKey1, kKey2, "0"});
});
@ -203,7 +203,7 @@ TEST_F(ListFamilyTest, BLPopMultiPush) {
Run({"exists", kKey1, kKey2, kKey3});
ASSERT_EQ(3, GetDebugInfo().shards_count);
RespExpr blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
auto pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"});
});
@ -242,7 +242,7 @@ TEST_F(ListFamilyTest, BLPopMultiPush) {
TEST_F(ListFamilyTest, BLPopSerialize) {
RespExpr blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
auto pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"});
});
@ -312,7 +312,7 @@ TEST_F(ListFamilyTest, BLPopSerialize) {
TEST_F(ListFamilyTest, WrongTypeDoesNotWake) {
RespExpr blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
auto pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, "0"});
});
@ -339,7 +339,7 @@ TEST_F(ListFamilyTest, WrongTypeDoesNotWake) {
TEST_F(ListFamilyTest, BPopSameKeyTwice) {
RespExpr blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
auto pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"});
EXPECT_EQ(0, NumWatched());
});
@ -352,7 +352,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
ASSERT_THAT(blpop_resp, ArrLen(2));
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "bar"));
pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"});
});
@ -370,7 +370,7 @@ TEST_F(ListFamilyTest, BPopTwoKeysSameShard) {
ASSERT_EQ(1, GetDebugInfo().shards_count);
RespExpr blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
auto pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
blpop_resp = Run({"blpop", "x", "y", "0"});
EXPECT_FALSE(IsLocked(0, "y"));
ASSERT_EQ(0, NumWatched());
@ -391,7 +391,7 @@ TEST_F(ListFamilyTest, BPopRename) {
Run({"exists", kKey1, kKey2});
ASSERT_EQ(2, GetDebugInfo().shards_count);
auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
auto pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, "0"});
});
@ -409,7 +409,7 @@ TEST_F(ListFamilyTest, BPopRename) {
TEST_F(ListFamilyTest, BPopFlush) {
RespExpr blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
auto pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, "0"});
});
@ -665,11 +665,11 @@ TEST_F(ListFamilyTest, TwoQueueBug451) {
for (int i = 0; i < 300; i++) {
Run(id, {"rpush", "a", "DATA"});
}
fibers_ext::SleepFor(50ms);
ThisFiber::SleepFor(50ms);
running = false;
};
vector<fibers_ext::Fiber> fbs;
vector<Fiber> fbs;
// more likely to reproduce the bug if we start pop_fiber first.
for (int i = 0; i < 2; i++) {
@ -715,10 +715,10 @@ TEST_F(ListFamilyTest, BRPopLPushSingleShardBlocking) {
RespExpr resp;
// Run the fiber at creation.
auto fb0 = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
auto fb0 = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
resp = Run({"brpoplpush", "x", "y", "0"});
});
fibers_ext::SleepFor(30us);
ThisFiber::SleepFor(30us);
pp_->at(1)->Await([&] { Run("B1", {"lpush", "y", "2"}); });
pp_->at(1)->Await([&] { Run("B1", {"lpush", "x", "1"}); });
@ -735,9 +735,9 @@ TEST_F(ListFamilyTest, BRPopContended) {
constexpr auto kNumFibers = 4;
// Run the fiber at creation.
fibers_ext::Fiber fb[kNumFibers];
Fiber fb[kNumFibers];
for (int i = 0; i < kNumFibers; i++) {
fb[i] = pp_->at(1)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
fb[i] = pp_->at(1)->LaunchFiber(Launch::dispatch, [&] {
string id = StrCat("id", i);
while (!done) {
Run(id, {"brpop", "k0", "k1", "k2", "k3", "k4", "0.1"});
@ -772,11 +772,11 @@ TEST_F(ListFamilyTest, BRPopLPushTwoShards) {
ASSERT_EQ(0, NumWatched());
// Run the fiber at creation.
auto fb0 = pp_->at(0)->LaunchFiber(fibers_ext::Launch::dispatch, [&] {
auto fb0 = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
resp = Run({"brpoplpush", "x", "z", "0"});
});
fibers_ext::SleepFor(30us);
ThisFiber::SleepFor(30us);
RespExpr resp_push = pp_->at(1)->Await([&] { return Run("B1", {"lpush", "z", "val2"}); });
ASSERT_THAT(resp_push, IntArg(1));
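
These tests pass Launch::dispatch so that, following Boost.Fibers' launch semantics, the new fiber is entered immediately and the blocking command is already in flight when LaunchFiber returns. A hedged sketch of the difference (the proactor header path and post being the default policy are assumptions):

#include "core/fibers.h"
#include "util/proactor_base.h"

void LaunchSemantics(util::ProactorBase* proactor) {
  // dispatch: the lambda starts running before LaunchFiber returns,
  // so the blocking call below has been issued by then.
  auto fb0 = proactor->LaunchFiber(dfly::Launch::dispatch, [] {
    // e.g. issue a blocking command such as blpop
  });

  // default (typically post): the lambda is only scheduled; it runs at
  // the next suspension point, e.g. after ThisFiber::SleepFor or Yield.
  auto fb1 = proactor->LaunchFiber([] {
    // runs later
  });

  fb0.Join();
  fb1.Join();
}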


@ -57,11 +57,10 @@ namespace dfly {
#endif
using namespace util;
using base::VarzValue;
using ::boost::intrusive_ptr;
namespace fibers = ::boost::fibers;
using absl::GetFlag;
using absl::StrCat;
using base::VarzValue;
using ::boost::intrusive_ptr;
using namespace facade;
namespace h2 = boost::beast::http;
@ -555,7 +554,7 @@ void Service::Shutdown() {
pp_.Await([](ProactorBase* pb) { ServerState::tlocal()->Destroy(); });
// wait for all the pending callbacks to stop.
fibers_ext::SleepFor(10ms);
ThisFiber::SleepFor(10ms);
}
static void MultiSetError(ConnectionContext* cntx) {


@ -128,7 +128,7 @@ class Service : public facade::ServiceInterface {
CommandRegistry registry_;
absl::flat_hash_map<std::string, unsigned> unknown_cmds_;
mutable util::fibers_ext::Mutex mu_;
mutable Mutex mu_;
GlobalState global_state_ = GlobalState::ACTIVE; // protected by mu_;
};


@ -208,7 +208,7 @@ TEST_F(MultiTest, MultiConsistent) {
auto fb = pp_->at(1)->LaunchFiber([&] {
RespExpr resp = Run({"multi"});
ASSERT_EQ(resp, "OK");
fibers_ext::SleepFor(1ms);
ThisFiber::SleepFor(1ms);
resp = Run({"get", kKey1});
ASSERT_EQ(resp, "QUEUED");


@ -1851,7 +1851,7 @@ error_code RdbLoader::Load(io::Source* src) {
}
void RdbLoader::FinishLoad(absl::Time start_time, size_t* keys_loaded) {
fibers_ext::BlockingCounter bc(shard_set->size());
BlockingCounter bc(shard_set->size());
for (unsigned i = 0; i < shard_set->size(); ++i) {
// Flush the remaining items.
FlushShardAsync(i);


@ -160,7 +160,7 @@ bool Replica::Start(ConnectionContext* cntx) {
cntx_.Reset(absl::bind_front(&Replica::DefaultErrorHandler, this));
// 5. Spawn main coordination fiber.
sync_fb_ = fibers_ext::Fiber(&Replica::MainReplicationFb, this);
sync_fb_ = Fiber(&Replica::MainReplicationFb, this);
(*cntx)->SendOk();
return true;
@ -196,7 +196,7 @@ void Replica::MainReplicationFb() {
// 1. Connect socket.
if ((state_mask_ & R_TCP_CONNECTED) == 0) {
fibers_ext::SleepFor(500ms);
ThisFiber::SleepFor(500ms);
if (is_paused_)
continue;
@ -488,7 +488,7 @@ error_code Replica::InitiatePSync() {
// There is a data race condition in the Redis master code, where the "ACK 0" handler may be
// triggered before Redis is ready to transition to the streaming state, and it silently ignores
// "ACK 0". We reduce the chance that this happens with this delay.
fibers_ext::SleepFor(50ms);
ThisFiber::SleepFor(50ms);
return error_code{};
}
@ -514,7 +514,7 @@ error_code Replica::InitiateDflySync() {
}
// Counter that is blocked on until all flows reach the full sync cut.
fibers_ext::BlockingCounter sync_block{num_df_flows_};
BlockingCounter sync_block{num_df_flows_};
// Switch to new error handler that closes flow sockets.
auto err_handler = [this, sync_block](const auto& ge) mutable {
@ -734,7 +734,7 @@ error_code Replica::SendNextPhaseRequest(bool stable) {
return std::error_code{};
}
error_code Replica::StartFullSyncFlow(fibers_ext::BlockingCounter sb, Context* cntx) {
error_code Replica::StartFullSyncFlow(BlockingCounter sb, Context* cntx) {
DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty());
RETURN_ON_ERR(ConnectAndAuth());
@ -771,7 +771,7 @@ error_code Replica::StartFullSyncFlow(fibers_ext::BlockingCounter sb, Context* c
// We can not discard io_buf because it may contain data
// besides the response we parsed. Therefore we pass it further to ReplicateDFFb.
sync_fb_ = fibers_ext::Fiber(&Replica::FullSyncDflyFb, this, move(eof_token), sb, cntx);
sync_fb_ = Fiber(&Replica::FullSyncDflyFb, this, move(eof_token), sb, cntx);
return error_code{};
}
@ -784,15 +784,15 @@ error_code Replica::StartStableSyncFlow(Context* cntx) {
CHECK(sock_->IsOpen());
// sock_.reset(mythread->CreateSocket());
// RETURN_ON_ERR(sock_->Connect(master_context_.master_ep));
sync_fb_ = fibers_ext::Fiber(&Replica::StableSyncDflyReadFb, this, cntx);
sync_fb_ = Fiber(&Replica::StableSyncDflyReadFb, this, cntx);
if (use_multi_shard_exe_sync_) {
execution_fb_ = fibers_ext::Fiber(&Replica::StableSyncDflyExecFb, this, cntx);
execution_fb_ = Fiber(&Replica::StableSyncDflyExecFb, this, cntx);
}
return std::error_code{};
}
void Replica::FullSyncDflyFb(string eof_token, fibers_ext::BlockingCounter bc, Context* cntx) {
void Replica::FullSyncDflyFb(string eof_token, BlockingCounter bc, Context* cntx) {
DCHECK(leftover_buf_);
SocketSource ss{sock_.get()};
io::PrefixSource ps{leftover_buf_->InputBuffer(), &ss};


@ -13,7 +13,6 @@
#include "server/common.h"
#include "server/journal/types.h"
#include "util/fiber_socket_base.h"
#include "util/fibers/fibers_ext.h"
namespace facade {
class ReqSerializer;
@ -81,12 +80,12 @@ class Replica {
// Coordinator for multi-shard execution.
struct MultiShardExecution {
util::fibers_ext::Mutex map_mu;
Mutex map_mu;
struct TxExecutionSync {
util::fibers_ext::Barrier barrier;
Barrier barrier;
std::atomic_uint32_t counter;
util::fibers_ext::BlockingCounter block;
BlockingCounter block;
TxExecutionSync(uint32_t counter) : barrier(counter), counter(counter), block(counter) {
}
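
TxExecutionSync combines three primitives: a Barrier to line up all participating flows, an atomic counter, and a BlockingCounter for completion. A minimal Barrier sketch (assuming a Wait()-style API mirroring boost::fibers::barrier; the Rendezvous helper is hypothetical):

#include <vector>

#include "core/fibers.h"

// n fibers meet at the barrier before any of them proceeds.
void Rendezvous(unsigned n) {
  dfly::Barrier barrier(n);
  std::vector<dfly::Fiber> fibers;
  for (unsigned i = 0; i < n; ++i) {
    fibers.emplace_back([&barrier] {
      // ... per-fiber setup ...
      barrier.Wait();  // no fiber passes until all n arrive
      // ... phase that requires everyone to be ready ...
    });
  }
  for (auto& fb : fibers)
    fb.Join();
}
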
@ -141,14 +140,13 @@ class Replica {
std::shared_ptr<MultiShardExecution> shared_exe_data);
// Start replica initialized as dfly flow.
std::error_code StartFullSyncFlow(util::fibers_ext::BlockingCounter block, Context* cntx);
std::error_code StartFullSyncFlow(BlockingCounter block, Context* cntx);
// Transition into stable state mode as dfly flow.
std::error_code StartStableSyncFlow(Context* cntx);
// Single flow full sync fiber spawned by StartFullSyncFlow.
void FullSyncDflyFb(std::string eof_token, util::fibers_ext::BlockingCounter block,
Context* cntx);
void FullSyncDflyFb(std::string eof_token, BlockingCounter block, Context* cntx);
// Single flow stable state sync fiber spawned by StartStableSyncFlow.
void StableSyncDflyReadFb(Context* cntx);
@ -225,7 +223,7 @@ class Replica {
std::queue<std::pair<TransactionData, bool>> trans_data_queue_;
static constexpr size_t kYieldAfterItemsInQueue = 50;
::util::fibers_ext::EventCount waker_; // waker for trans_data_queue_
EventCount waker_; // waker for trans_data_queue_
bool use_multi_shard_exe_sync_;
std::unique_ptr<JournalExecutor> executor_;
@ -233,13 +231,13 @@ class Replica {
std::atomic_uint64_t journal_rec_executed_ = 0;
// MainReplicationFb in standalone mode, FullSyncDflyFb in flow mode.
::util::fibers_ext::Fiber sync_fb_;
::util::fibers_ext::Fiber execution_fb_;
Fiber sync_fb_;
Fiber execution_fb_;
std::vector<std::unique_ptr<Replica>> shard_flows_;
// Guard operations where flows might be in a mixed state (transition/setup)
util::fibers_ext::Mutex flows_op_mu_;
Mutex flows_op_mu_;
std::optional<base::IoBuf> leftover_buf_;
std::unique_ptr<facade::RedisParser> parser_;


@ -156,7 +156,7 @@ void ScriptMgr::ListCmd(ConnectionContext* cntx) const {
void ScriptMgr::LatencyCmd(ConnectionContext* cntx) const {
absl::flat_hash_map<std::string, base::Histogram> result;
fibers_ext::Mutex mu;
Mutex mu;
shard_set->pool()->AwaitFiberOnAll([&](auto* pb) {
auto* ss = ServerState::tlocal();


@ -70,7 +70,7 @@ class ScriptMgr {
ScriptParams default_params_;
absl::flat_hash_map<ScriptKey, InternalScriptData> db_;
mutable util::fibers_ext::Mutex mu_;
mutable Mutex mu_;
};
} // namespace dfly


@ -471,7 +471,7 @@ void ServerFamily::Shutdown() {
// Load starts as many fibers as there are files to load, loading each file separately.
// It starts one more fiber that waits for all load fibers to finish and returns the first
// error (if any occurred) via a future.
fibers_ext::Future<std::error_code> ServerFamily::Load(const std::string& load_path) {
Future<std::error_code> ServerFamily::Load(const std::string& load_path) {
CHECK(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, "summary.dfs"));
vector<std::string> paths{{load_path}};
@ -482,7 +482,7 @@ fibers_ext::Future<std::error_code> ServerFamily::Load(const std::string& load_p
io::Result<io::StatShortVec> files = io::StatFiles(glob);
if (files && files->size() == 0) {
fibers_ext::Promise<std::error_code> ec_promise;
Promise<std::error_code> ec_promise;
ec_promise.set_value(make_error_code(errc::no_such_file_or_directory));
return ec_promise.get_future();
}
@ -498,7 +498,7 @@ fibers_ext::Future<std::error_code> ServerFamily::Load(const std::string& load_p
(void)fs::canonical(path, ec);
if (ec) {
LOG(ERROR) << "Error loading " << load_path << " " << ec.message();
fibers_ext::Promise<std::error_code> ec_promise;
Promise<std::error_code> ec_promise;
ec_promise.set_value(ec);
return ec_promise.get_future();
}
@ -524,7 +524,7 @@ fibers_ext::Future<std::error_code> ServerFamily::Load(const std::string& load_p
auto& pool = service_.proactor_pool();
vector<util::fibers_ext::Fiber> load_fibers;
vector<Fiber> load_fibers;
load_fibers.reserve(paths.size());
auto first_error = std::make_shared<AggregateError>();
@ -545,8 +545,8 @@ fibers_ext::Future<std::error_code> ServerFamily::Load(const std::string& load_p
load_fibers.push_back(proactor->LaunchFiber(std::move(load_fiber)));
}
boost::fibers::promise<std::error_code> ec_promise;
boost::fibers::future<std::error_code> ec_future = ec_promise.get_future();
Promise<std::error_code> ec_promise;
Future<std::error_code> ec_future = ec_promise.get_future();
// Run fiber that empties the channel and sets ec_promise.
auto load_join_fiber = [this, first_error, load_fibers = std::move(load_fibers),
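
The Promise/Future pair used here is the fiber-aware analogue of std::promise/std::future: whichever fiber finishes last fulfills the promise, and a caller blocking on get() suspends only its own fiber. A condensed sketch of the pattern, using the get_future()/set_value() calls visible in this diff (StartWork and the Detach-on-a-temporary are illustrative):

#include <system_error>

#include "core/fibers.h"

dfly::Future<std::error_code> StartWork() {
  dfly::Promise<std::error_code> promise;
  dfly::Future<std::error_code> future = promise.get_future();

  dfly::Fiber([promise = std::move(promise)]() mutable {
    // ... perform the actual work ...
    promise.set_value(std::error_code{});  // fulfill exactly once
  }).Detach();

  return future;  // caller may block on future.get() from any fiber
}
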
@ -914,7 +914,7 @@ GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) {
vector<unique_ptr<RdbSnapshot>> snapshots;
absl::flat_hash_map<string_view, size_t> rdb_name_map;
fibers_ext::Mutex mu; // guards rdb_name_map
Mutex mu; // guards rdb_name_map
auto save_cb = [&](unsigned index) {
auto& snapshot = snapshots[index];
@ -1391,7 +1391,7 @@ static void MergeInto(const DbSlice::Stats& src, Metrics* dest) {
Metrics ServerFamily::GetMetrics() const {
Metrics result;
fibers_ext::Mutex mu;
Mutex mu;
auto cb = [&](ProactorBase* pb) {
EngineShard* shard = EngineShard::tlocal();


@ -101,7 +101,7 @@ class ServerFamily {
// Load snapshot from file (.rdb file or summary.dfs file) and return
// future with error_code.
util::fibers_ext::Future<std::error_code> Load(const std::string& file_name);
Future<std::error_code> Load(const std::string& file_name);
// used within tests.
bool IsSaving() const {
@ -163,8 +163,8 @@ class ServerFamily {
void SnapshotScheduling(const SnapshotSpec& time);
util::fibers_ext::Fiber snapshot_fiber_;
util::fibers_ext::Future<std::error_code> load_result_;
Fiber snapshot_fiber_;
Future<std::error_code> load_result_;
uint32_t stats_caching_task_ = 0;
Service& service_;
@ -173,7 +173,7 @@ class ServerFamily {
util::ListenerInterface* main_listener_ = nullptr;
util::ProactorBase* pb_task_ = nullptr;
mutable util::fibers_ext::Mutex replicaof_mu_, save_mu_;
mutable Mutex replicaof_mu_, save_mu_;
std::shared_ptr<Replica> replica_; // protected by replicaof_mu_
std::unique_ptr<ScriptMgr> script_mgr_;
@ -188,7 +188,7 @@ class ServerFamily {
std::shared_ptr<LastSaveInfo> last_save_info_; // protected by save_mu_;
std::atomic_bool is_saving_{false};
util::fibers_ext::Done is_snapshot_done_;
Done is_snapshot_done_;
std::unique_ptr<util::fibers_ext::FiberQueueThreadPool> fq_threadpool_;
};


@ -137,7 +137,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
if (stats_.loop_serialized >= last_yield + 100) {
DVLOG(2) << "Before sleep " << ThisFiber::GetName();
fibers_ext::Yield();
ThisFiber::Yield();
DVLOG(2) << "After sleep";
last_yield = stats_.loop_serialized;


@ -130,8 +130,8 @@ class SliceSnapshot {
std::unique_ptr<RdbSerializer> serializer_;
util::fibers_ext::Mutex mu_;
::util::fibers_ext::Fiber snapshot_fb_; // IterateEntriesFb
Mutex mu_;
Fiber snapshot_fb_; // IterateEntriesFb
CompressionMode compression_mode_;
RdbTypeFreqMap type_freq_map_;


@ -171,7 +171,7 @@ void BaseFamilyTest::WaitUntilLocked(DbIndex db_index, string_view key, double t
auto timeout_micro = chrono::duration_cast<chrono::microseconds>(1000ms * timeout);
int64_t steps = timeout_micro.count() / step.count();
do {
fibers_ext::SleepFor(step);
ThisFiber::SleepFor(step);
} while (!IsLocked(db_index, key) && --steps > 0);
CHECK(IsLocked(db_index, key));
}


@ -95,7 +95,7 @@ class BaseFamilyTest : public ::testing::Test {
unsigned num_threads_ = 3;
absl::flat_hash_map<std::string, std::unique_ptr<TestConnWrapper>> connections_;
util::fibers_ext::Mutex mu_;
Mutex mu_;
ConnectionContext::DebugInfo last_cmd_dbg_info_;
std::vector<RespVec*> resp_vec_;


@ -61,7 +61,7 @@ class TieredStorage {
size_t submitted_io_write_size_ = 0;
uint32_t num_active_requests_ = 0;
util::fibers_ext::EventCount active_req_sem_;
EventCount active_req_sem_;
struct PerDb;
std::vector<PerDb*> db_arr_;


@ -19,7 +19,6 @@
#include "server/common.h"
#include "server/journal/types.h"
#include "server/table.h"
#include "util/fibers/fibers_ext.h"
namespace dfly {
@ -494,8 +493,8 @@ class Transaction {
uint32_t unique_shard_cnt_{0}; // Number of unique shards active
ShardId unique_shard_id_{kInvalidSid}; // Set if unique_shard_cnt_ = 1
util::fibers_ext::EventCount blocking_ec_; // Used to wake blocking transactions.
util::fibers_ext::EventCount run_ec_; // Used to wait for shard callbacks
EventCount blocking_ec_; // Used to wake blocking transactions.
EventCount run_ec_; // Used to wait for shard callbacks
// Transaction coordinator state, written and read by coordinator thread.
// Can be read by shard threads as long as we respect ordering rules, i.e. when