chore: Update helio dependency (#553)

Switch to using fibers_ext::Fiber.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2022-12-09 16:43:41 +02:00 committed by GitHub
parent c34270c02d
commit 23c902d8e9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 74 additions and 79 deletions

2
helio

@ -1 +1 @@
Subproject commit 8d0f0cc37908623705125128d3c64d35d410fb0f Subproject commit 0c0afcbc85a5ac3347b07a76bb06c0fc98d0fc79

View file

@ -17,6 +17,7 @@
#include "facade/redis_parser.h" #include "facade/redis_parser.h"
#include "facade/service_interface.h" #include "facade/service_interface.h"
#include "util/fiber_sched_algo.h" #include "util/fiber_sched_algo.h"
#include "util/fibers/fiber.h"
#ifdef DFLY_USE_SSL #ifdef DFLY_USE_SSL
#include "util/tls/tls_socket.h" #include "util/tls/tls_socket.h"
@ -29,7 +30,7 @@ ABSL_FLAG(bool, http_admin_console, true, "If true allows accessing http console
using namespace util; using namespace util;
using namespace std; using namespace std;
using nonstd::make_unexpected; using nonstd::make_unexpected;
namespace this_fiber = boost::this_fiber;
namespace fibers = boost::fibers; namespace fibers = boost::fibers;
namespace facade { namespace facade {
@ -267,7 +268,7 @@ void Connection::UnregisterShutdownHook(ShutdownHandle id) {
} }
void Connection::HandleRequests() { void Connection::HandleRequests() {
this_fiber::properties<FiberProps>().set_name("DflyConnection"); FiberProps::SetName("DflyConnection");
LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get()); LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());
@ -508,7 +509,7 @@ auto Connection::ParseRedis() -> ParserStatus {
if (dispatch_q_.size() == 1) { if (dispatch_q_.size() == 1) {
evc_.notify(); evc_.notify();
} else if (dispatch_q_.size() > 10) { } else if (dispatch_q_.size() > 10) {
this_fiber::yield(); fibers_ext::Yield();
} }
} }
} }
@ -715,7 +716,7 @@ void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) {
// InputLoop. Note: in some cases, InputLoop may decide to dispatch directly and bypass the // InputLoop. Note: in some cases, InputLoop may decide to dispatch directly and bypass the
// DispatchFiber. // DispatchFiber.
void Connection::DispatchFiber(util::FiberSocketBase* peer) { void Connection::DispatchFiber(util::FiberSocketBase* peer) {
this_fiber::properties<FiberProps>().set_name("DispatchFiber"); FiberProps::SetName("DispatchFiber");
SinkReplyBuilder* builder = cc_->reply_builder(); SinkReplyBuilder* builder = cc_->reply_builder();
DispatchOperations dispatch_op{builder, this}; DispatchOperations dispatch_op{builder, this};

View file

@ -21,6 +21,7 @@
#include "server/string_family.h" #include "server/string_family.h"
#include "server/transaction.h" #include "server/transaction.h"
#include "util/fiber_sched_algo.h" #include "util/fiber_sched_algo.h"
#include "util/fibers/fiber.h"
using namespace std; using namespace std;
@ -30,7 +31,6 @@ ABSL_DECLARE_FLAG(string, dbfilename);
namespace dfly { namespace dfly {
using namespace util; using namespace util;
namespace this_fiber = ::boost::this_fiber;
using boost::intrusive_ptr; using boost::intrusive_ptr;
using boost::fibers::fiber; using boost::fibers::fiber;
using namespace facade; using namespace facade;
@ -275,7 +275,7 @@ void DebugCmd::Populate(CmdArgList args) {
} }
ranges.emplace_back(from, total_count - from); ranges.emplace_back(from, total_count - from);
vector<fiber> fb_arr(ranges.size()); vector<fibers_ext::Fiber> fb_arr(ranges.size());
for (size_t i = 0; i < ranges.size(); ++i) { for (size_t i = 0; i < ranges.size(); ++i) {
auto range = ranges[i]; auto range = ranges[i];
@ -285,14 +285,14 @@ void DebugCmd::Populate(CmdArgList args) {
}); });
} }
for (auto& fb : fb_arr) for (auto& fb : fb_arr)
fb.join(); fb.Join();
(*cntx_)->SendOk(); (*cntx_)->SendOk();
} }
void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view prefix, void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view prefix,
unsigned value_len, bool populate_random_values) { unsigned value_len, bool populate_random_values) {
this_fiber::properties<FiberProps>().set_name("populate_range"); FiberProps::SetName("populate_range");
VLOG(1) << "PopulateRange: " << from << "-" << (from + len - 1); VLOG(1) << "PopulateRange: " << from << "-" << (from + len - 1);
string key = absl::StrCat(prefix, ":"); string key = absl::StrCat(prefix, ":");
@ -313,7 +313,7 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view
ess.Add(sid, [=] { ess.Add(sid, [=] {
DoPopulateBatch(prefix, value_len, populate_random_values, params, shard_batch); DoPopulateBatch(prefix, value_len, populate_random_values, params, shard_batch);
if (i % 50 == 0) { if (i % 50 == 0) {
this_fiber::yield(); fibers_ext::Yield();
} }
}); });

View file

@ -27,7 +27,6 @@ using absl::StrCat;
using ::io::Result; using ::io::Result;
using testing::ElementsAre; using testing::ElementsAre;
using testing::HasSubstr; using testing::HasSubstr;
namespace this_fiber = boost::this_fiber;
namespace { namespace {
@ -178,7 +177,7 @@ TEST_F(DflyEngineTest, MultiConsistent) {
auto fb = pp_->at(1)->LaunchFiber([&] { auto fb = pp_->at(1)->LaunchFiber([&] {
RespExpr resp = Run({"multi"}); RespExpr resp = Run({"multi"});
ASSERT_EQ(resp, "OK"); ASSERT_EQ(resp, "OK");
this_fiber::sleep_for(1ms); fibers_ext::SleepFor(1ms);
resp = Run({"get", kKey1}); resp = Run({"get", kKey1});
ASSERT_EQ(resp, "QUEUED"); ASSERT_EQ(resp, "QUEUED");
@ -201,8 +200,8 @@ TEST_F(DflyEngineTest, MultiConsistent) {
EXPECT_EQ(sub_arr[0].GetBuf(), resp_arr[0].GetBuf()); EXPECT_EQ(sub_arr[0].GetBuf(), resp_arr[0].GetBuf());
}); });
mset_fb.join(); mset_fb.Join();
fb.join(); fb.Join();
ASSERT_FALSE(service_->IsLocked(0, kKey1)); ASSERT_FALSE(service_->IsLocked(0, kKey1));
ASSERT_FALSE(service_->IsLocked(0, kKey4)); ASSERT_FALSE(service_->IsLocked(0, kKey4));
ASSERT_FALSE(service_->IsShardSetLocked()); ASSERT_FALSE(service_->IsShardSetLocked());
@ -271,8 +270,8 @@ TEST_F(DflyEngineTest, MultiHop) {
} }
}); });
p1_fb.join(); p1_fb.Join();
p2_fb.join(); p2_fb.Join();
} }
TEST_F(DflyEngineTest, FlushDb) { TEST_F(DflyEngineTest, FlushDb) {
@ -294,7 +293,7 @@ TEST_F(DflyEngineTest, FlushDb) {
} }
}); });
fb0.join(); fb0.Join();
ASSERT_FALSE(service_->IsLocked(0, kKey1)); ASSERT_FALSE(service_->IsLocked(0, kKey1));
ASSERT_FALSE(service_->IsLocked(0, kKey4)); ASSERT_FALSE(service_->IsLocked(0, kKey4));
@ -472,12 +471,12 @@ TEST_F(DflyEngineTest, FlushAll) {
for (size_t i = 1; i < 100; ++i) { for (size_t i = 1; i < 100; ++i) {
RespExpr resp = Run({"set", "foo", "bar"}); RespExpr resp = Run({"set", "foo", "bar"});
ASSERT_EQ(resp, "OK"); ASSERT_EQ(resp, "OK");
this_fiber::yield(); fibers_ext::Yield();
} }
}); });
fb0.join(); fb0.Join();
fb1.join(); fb1.Join();
} }
TEST_F(DflyEngineTest, OOM) { TEST_F(DflyEngineTest, OOM) {
@ -791,7 +790,7 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) {
shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) { shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
EngineShard* shard = EngineShard::tlocal(); EngineShard* shard = EngineShard::tlocal();
ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty! ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty!
this_fiber::sleep_for(100ms); fibers_ext::SleepFor(100ms);
EXPECT_EQ(shard->stats().defrag_realloc_total, 0); EXPECT_EQ(shard->stats().defrag_realloc_total, 0);
// we are expecting to have at least one try by now // we are expecting to have at least one try by now
EXPECT_GT(shard->stats().defrag_task_invocation_total, 0); EXPECT_GT(shard->stats().defrag_task_invocation_total, 0);
@ -814,7 +813,7 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) {
if (stats.defrag_realloc_total > 0) { if (stats.defrag_realloc_total > 0) {
break; break;
} }
this_fiber::sleep_for(220ms); fibers_ext::SleepFor(220ms);
} }
// make sure that we successfully found places to defrag in memory // make sure that we successfully found places to defrag in memory
EXPECT_GT(stats.defrag_realloc_total, 0); EXPECT_GT(stats.defrag_realloc_total, 0);

View file

@ -45,7 +45,6 @@ ABSL_FLAG(float, mem_utilization_threshold, 0.8,
namespace dfly { namespace dfly {
using namespace util; using namespace util;
namespace this_fiber = ::boost::this_fiber;
namespace fibers = ::boost::fibers; namespace fibers = ::boost::fibers;
using absl::GetFlag; using absl::GetFlag;
@ -179,7 +178,7 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t*
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap), : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap),
db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) { db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) {
fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] { fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] {
this_fiber::properties<FiberProps>().set_name(absl::StrCat("shard_queue", index)); FiberProps::SetName(absl::StrCat("shard_queue", index));
queue_.Run(); queue_.Run();
}); });

View file

@ -93,8 +93,8 @@ TEST_F(GenericFamilyTest, Del) {
} }
}); });
exist_fb.join(); exist_fb.Join();
del_fb.join(); del_fb.Join();
} }
TEST_F(GenericFamilyTest, TTL) { TEST_F(GenericFamilyTest, TTL) {
@ -160,8 +160,8 @@ TEST_F(GenericFamilyTest, Rename) {
} }
}); });
exist_fb.join(); exist_fb.Join();
ren_fb.join(); ren_fb.Join();
} }
TEST_F(GenericFamilyTest, RenameNonString) { TEST_F(GenericFamilyTest, RenameNonString) {
@ -275,7 +275,7 @@ TEST_F(GenericFamilyTest, Move) {
Run({"move", "l", "1"}); Run({"move", "l", "1"});
}); });
fb_blpop.join(); fb_blpop.Join();
} }
using testing::AnyOf; using testing::AnyOf;

View file

@ -21,7 +21,6 @@ using namespace util;
using namespace facade; using namespace facade;
using uring::FiberCall; using uring::FiberCall;
using uring::Proactor; using uring::Proactor;
namespace this_fiber = ::boost::this_fiber;
namespace { namespace {
@ -137,7 +136,7 @@ error_code IoMgr::Read(size_t offset, io::MutableBytes dest) {
void IoMgr::Shutdown() { void IoMgr::Shutdown() {
while (flags_val) { while (flags_val) {
this_fiber::sleep_for(200us); // TODO: hacky for now. fibers_ext::SleepFor(200us); // TODO: hacky for now.
} }
} }

View file

@ -19,7 +19,6 @@
using namespace testing; using namespace testing;
using namespace std; using namespace std;
using namespace util; using namespace util;
namespace this_fiber = ::boost::this_fiber;
namespace fibers = ::boost::fibers; namespace fibers = ::boost::fibers;
namespace dfly { namespace dfly {
@ -94,17 +93,17 @@ TEST_F(ListFamilyTest, BLPopBlocking) {
LOG(INFO) << "pop0"; LOG(INFO) << "pop0";
}); });
this_fiber::sleep_for(50us); fibers_ext::SleepFor(50us);
auto fb1 = pp_->at(1)->LaunchFiber([&] { auto fb1 = pp_->at(1)->LaunchFiber([&] {
resp1 = Run({"blpop", "x", "0"}); resp1 = Run({"blpop", "x", "0"});
LOG(INFO) << "pop1"; LOG(INFO) << "pop1";
}); });
this_fiber::sleep_for(30us); fibers_ext::SleepFor(30us);
pp_->at(1)->Await([&] { Run("B1", {"lpush", "x", "2", "1"}); }); pp_->at(1)->Await([&] { Run("B1", {"lpush", "x", "2", "1"}); });
fb0.join(); fb0.Join();
fb1.join(); fb1.Join();
// fb0 should start first and be the first transaction blocked. Therefore, it should pop '1'. // fb0 should start first and be the first transaction blocked. Therefore, it should pop '1'.
// sometimes order is switched, need to think how to fix it. // sometimes order is switched, need to think how to fix it.
@ -131,7 +130,7 @@ TEST_F(ListFamilyTest, BLPopMultiple) {
}); });
pp_->at(1)->Await([&] { Run({"lpush", kKey1, "1", "2", "3"}); }); pp_->at(1)->Await([&] { Run({"lpush", kKey1, "1", "2", "3"}); });
fb1.join(); fb1.Join();
ASSERT_THAT(resp0, ArrLen(2)); ASSERT_THAT(resp0, ArrLen(2));
EXPECT_THAT(resp0.GetVec(), ElementsAre(kKey1, "3")); EXPECT_THAT(resp0.GetVec(), ElementsAre(kKey1, "3"));
@ -202,10 +201,10 @@ TEST_F(ListFamilyTest, BLPopMultiPush) {
ASSERT_THAT(resp, ArrLen(6)); ASSERT_THAT(resp, ArrLen(6));
}); });
p1_fb.join(); p1_fb.Join();
p2_fb.join(); p2_fb.Join();
pop_fb.join(); pop_fb.Join();
ASSERT_THAT(blpop_resp, ArrLen(2)); ASSERT_THAT(blpop_resp, ArrLen(2));
auto resp_arr = blpop_resp.GetVec(); auto resp_arr = blpop_resp.GetVec();
@ -267,10 +266,10 @@ TEST_F(ListFamilyTest, BLPopSerialize) {
LOG(INFO) << "push2 ts: " << cl2; LOG(INFO) << "push2 ts: " << cl2;
}); });
p1_fb.join(); p1_fb.Join();
p2_fb.join(); p2_fb.Join();
pop_fb.join(); pop_fb.Join();
ASSERT_THAT(blpop_resp, ArrLen(2)); ASSERT_THAT(blpop_resp, ArrLen(2));
auto resp_arr = blpop_resp.GetVec(); auto resp_arr = blpop_resp.GetVec();
EXPECT_THAT(resp_arr, ElementsAre(kKey1, ArgType(RespExpr::STRING))); EXPECT_THAT(resp_arr, ElementsAre(kKey1, ArgType(RespExpr::STRING)));
@ -303,8 +302,8 @@ TEST_F(ListFamilyTest, WrongTypeDoesNotWake) {
Run({"lpush", kKey1, "B"}); Run({"lpush", kKey1, "B"});
}); });
p1_fb.join(); p1_fb.Join();
pop_fb.join(); pop_fb.Join();
ASSERT_THAT(blpop_resp, ArrLen(2)); ASSERT_THAT(blpop_resp, ArrLen(2));
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "B")); EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "B"));
} }
@ -321,7 +320,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
WaitUntilLocked(0, kKey1); WaitUntilLocked(0, kKey1);
pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"})); }); pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"})); });
pop_fb.join(); pop_fb.Join();
ASSERT_THAT(blpop_resp, ArrLen(2)); ASSERT_THAT(blpop_resp, ArrLen(2));
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "bar")); EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "bar"));
@ -333,7 +332,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
WaitUntilLocked(0, kKey1); WaitUntilLocked(0, kKey1);
pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey2, "bar"})); }); pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey2, "bar"})); });
pop_fb.join(); pop_fb.Join();
ASSERT_THAT(blpop_resp, ArrLen(2)); ASSERT_THAT(blpop_resp, ArrLen(2));
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey2, "bar")); EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey2, "bar"));
@ -355,7 +354,7 @@ TEST_F(ListFamilyTest, BPopTwoKeysSameShard) {
WaitUntilLocked(0, "x"); WaitUntilLocked(0, "x");
pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", "x", "bar"})); }); pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", "x", "bar"})); });
pop_fb.join(); pop_fb.Join();
ASSERT_THAT(blpop_resp, ArrLen(2)); ASSERT_THAT(blpop_resp, ArrLen(2));
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre("x", "bar")); EXPECT_THAT(blpop_resp.GetVec(), ElementsAre("x", "bar"));
@ -377,7 +376,7 @@ TEST_F(ListFamilyTest, BPopRename) {
EXPECT_EQ(1, CheckedInt({"lpush", "a", "bar"})); EXPECT_EQ(1, CheckedInt({"lpush", "a", "bar"}));
Run({"rename", "a", kKey1}); Run({"rename", "a", kKey1});
}); });
pop_fb.join(); pop_fb.Join();
ASSERT_THAT(blpop_resp, ArrLen(2)); ASSERT_THAT(blpop_resp, ArrLen(2));
EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "bar")); EXPECT_THAT(blpop_resp.GetVec(), ElementsAre(kKey1, "bar"));
@ -395,7 +394,7 @@ TEST_F(ListFamilyTest, BPopFlush) {
Run({"flushdb"}); Run({"flushdb"});
EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"})); EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"}));
}); });
pop_fb.join(); pop_fb.Join();
} }
TEST_F(ListFamilyTest, LRem) { TEST_F(ListFamilyTest, LRem) {
@ -641,24 +640,23 @@ TEST_F(ListFamilyTest, TwoQueueBug451) {
for (int i = 0; i < 1000; i++) { for (int i = 0; i < 1000; i++) {
Run(id, {"rpush", "a", "DATA"}); Run(id, {"rpush", "a", "DATA"});
} }
::boost::this_fiber::sleep_for(100ms); fibers_ext::SleepFor(100ms);
running = false; running = false;
}; };
vector<boost::fibers::fiber> fbs; vector<fibers_ext::Fiber> fbs;
// more likely to reproduce the bug if we start pop_fiber first. // more likely to reproduce the bug if we start pop_fiber first.
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
fbs.push_back(pp_->at(i)->LaunchFiber(pop_fiber)); fbs.push_back(pp_->at(i)->LaunchFiber(pop_fiber));
} }
for (int i = 0; i < 2; i++) { for (int i = 0; i < 2; i++) {
fbs.push_back(pp_->at(i)->LaunchFiber(push_fiber)); fbs.push_back(pp_->at(i)->LaunchFiber(push_fiber));
} }
for (auto& f : fbs) for (auto& f : fbs)
f.join(); f.Join();
} }
} // namespace dfly } // namespace dfly

View file

@ -56,7 +56,6 @@ using namespace util;
using base::VarzValue; using base::VarzValue;
using ::boost::intrusive_ptr; using ::boost::intrusive_ptr;
namespace fibers = ::boost::fibers; namespace fibers = ::boost::fibers;
namespace this_fiber = ::boost::this_fiber;
using absl::GetFlag; using absl::GetFlag;
using absl::StrCat; using absl::StrCat;
using namespace facade; using namespace facade;
@ -510,7 +509,7 @@ void Service::Shutdown() {
shard_set->Shutdown(); shard_set->Shutdown();
// wait for all the pending callbacks to stop. // wait for all the pending callbacks to stop.
boost::this_fiber::sleep_for(10ms); fibers_ext::SleepFor(10ms);
} }
static void MultiSetError(ConnectionContext* cntx) { static void MultiSetError(ConnectionContext* cntx) {

View file

@ -226,7 +226,7 @@ TEST_F(RdbTest, SaveFlush) {
} while (!service_->server_family().IsSaving()); } while (!service_->server_family().IsSaving());
Run({"flushdb"}); Run({"flushdb"});
save_fb.join(); save_fb.Join();
auto save_info = service_->server_family().GetLastSaveInfo(); auto save_info = service_->server_family().GetLastSaveInfo();
ASSERT_EQ(1, save_info->freq_map.size()); ASSERT_EQ(1, save_info->freq_map.size());
auto& k_v = save_info->freq_map.front(); auto& k_v = save_info->freq_map.front();
@ -261,7 +261,7 @@ TEST_F(RdbTest, SaveManyDbs) {
} }
}); });
save_fb.join(); save_fb.Join();
auto save_info = service_->server_family().GetLastSaveInfo(); auto save_info = service_->server_family().GetLastSaveInfo();
ASSERT_EQ(1, save_info->freq_map.size()); ASSERT_EQ(1, save_info->freq_map.size());

View file

@ -28,7 +28,6 @@ using namespace util;
using namespace boost::asio; using namespace boost::asio;
using namespace facade; using namespace facade;
using absl::StrCat; using absl::StrCat;
namespace this_fiber = ::boost::this_fiber;
namespace { namespace {
@ -171,7 +170,7 @@ void Replica::MainReplicationFb() {
while (state_mask_ & R_ENABLED) { while (state_mask_ & R_ENABLED) {
// 1. Connect socket. // 1. Connect socket.
if ((state_mask_ & R_TCP_CONNECTED) == 0) { if ((state_mask_ & R_TCP_CONNECTED) == 0) {
this_fiber::sleep_for(500ms); fibers_ext::SleepFor(500ms);
if (is_paused_) if (is_paused_)
continue; continue;
@ -210,7 +209,7 @@ void Replica::MainReplicationFb() {
// triggered // triggered
// before Redis is ready to transition to the streaming state and it silenty ignores "ACK // before Redis is ready to transition to the streaming state and it silenty ignores "ACK
// 0". We reduce the chance it happens with this delay. // 0". We reduce the chance it happens with this delay.
this_fiber::sleep_for(50ms); fibers_ext::SleepFor(50ms);
} }
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE); service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);

View file

@ -403,8 +403,8 @@ void ServerFamily::Shutdown() {
load_result_.wait(); load_result_.wait();
is_snapshot_done_.Notify(); is_snapshot_done_.Notify();
if (snapshot_fiber_.joinable()) { if (snapshot_fiber_.IsJoinable()) {
snapshot_fiber_.join(); snapshot_fiber_.Join();
} }
pb_task_->Await([this] { pb_task_->Await([this] {
@ -479,7 +479,7 @@ fibers::future<std::error_code> ServerFamily::Load(const std::string& load_path)
auto& pool = service_.proactor_pool(); auto& pool = service_.proactor_pool();
std::vector<::boost::fibers::fiber> load_fibers; vector<util::fibers_ext::Fiber> load_fibers;
load_fibers.reserve(paths.size()); load_fibers.reserve(paths.size());
auto first_error = std::make_shared<AggregateError>(); auto first_error = std::make_shared<AggregateError>();
@ -507,7 +507,7 @@ fibers::future<std::error_code> ServerFamily::Load(const std::string& load_path)
auto load_join_fiber = [this, first_error, load_fibers = std::move(load_fibers), auto load_join_fiber = [this, first_error, load_fibers = std::move(load_fibers),
ec_promise = std::move(ec_promise)]() mutable { ec_promise = std::move(ec_promise)]() mutable {
for (auto& fiber : load_fibers) { for (auto& fiber : load_fibers) {
fiber.join(); fiber.Join();
} }
VLOG(1) << "Load finished"; VLOG(1) << "Load finished";

View file

@ -9,6 +9,7 @@
#include "facade/conn_context.h" #include "facade/conn_context.h"
#include "facade/redis_parser.h" #include "facade/redis_parser.h"
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
#include "util/fibers/fiber.h"
#include "util/proactor_pool.h" #include "util/proactor_pool.h"
namespace util { namespace util {
@ -148,7 +149,7 @@ class ServerFamily {
void SnapshotScheduling(const SnapshotSpec& time); void SnapshotScheduling(const SnapshotSpec& time);
boost::fibers::fiber snapshot_fiber_; util::fibers_ext::Fiber snapshot_fiber_;
boost::fibers::future<std::error_code> load_result_; boost::fibers::future<std::error_code> load_result_;
uint32_t stats_caching_task_ = 0; uint32_t stats_caching_task_ = 0;

View file

@ -108,7 +108,7 @@ void SliceSnapshot::Join() {
void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) { void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
{ {
auto fiber_name = absl::StrCat("SliceSnapshot-", ProactorBase::GetIndex()); auto fiber_name = absl::StrCat("SliceSnapshot-", ProactorBase::GetIndex());
this_fiber::properties<FiberProps>().set_name(std::move(fiber_name)); FiberProps::SetName(std::move(fiber_name));
} }
PrimeTable::Cursor cursor; PrimeTable::Cursor cursor;
@ -138,7 +138,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
if (stats_.serialized >= last_yield + 100) { if (stats_.serialized >= last_yield + 100) {
DVLOG(2) << "Before sleep " << this_fiber::properties<FiberProps>().name(); DVLOG(2) << "Before sleep " << this_fiber::properties<FiberProps>().name();
this_fiber::yield(); fibers_ext::Yield();
DVLOG(2) << "After sleep"; DVLOG(2) << "After sleep";
last_yield = stats_.serialized; last_yield = stats_.serialized;

View file

@ -223,8 +223,8 @@ TEST_F(StringFamilyTest, MGetSet) {
} }
}); });
mget_fb.join(); mget_fb.Join();
set_fb.join(); set_fb.Join();
} }
TEST_F(StringFamilyTest, MSetGet) { TEST_F(StringFamilyTest, MSetGet) {
@ -260,8 +260,8 @@ TEST_F(StringFamilyTest, MSetGet) {
} }
}); });
mset_fb.join(); mset_fb.Join();
get_fb.join(); get_fb.Join();
} }
TEST_F(StringFamilyTest, MSetDel) { TEST_F(StringFamilyTest, MSetDel) {
@ -277,8 +277,8 @@ TEST_F(StringFamilyTest, MSetDel) {
} }
}); });
mset_fb.join(); mset_fb.Join();
del_fb.join(); del_fb.Join();
} }
TEST_F(StringFamilyTest, IntKey) { TEST_F(StringFamilyTest, IntKey) {
@ -310,8 +310,8 @@ TEST_F(StringFamilyTest, SingleShard) {
Run({"mget", "x", "b", "y"}); Run({"mget", "x", "b", "y"});
} }
}); });
mset_fb.join(); mset_fb.Join();
mget_fb.join(); mget_fb.Join();
} }
TEST_F(StringFamilyTest, MSetIncr) { TEST_F(StringFamilyTest, MSetIncr) {
@ -352,8 +352,8 @@ TEST_F(StringFamilyTest, MSetIncr) {
ASSERT_LE(a, c); ASSERT_LE(a, c);
} }
}); });
mset_fb.join(); mset_fb.Join();
get_fb.join(); get_fb.Join();
} }
TEST_F(StringFamilyTest, SetEx) { TEST_F(StringFamilyTest, SetEx) {

View file

@ -166,7 +166,7 @@ void BaseFamilyTest::WaitUntilLocked(DbIndex db_index, string_view key, double t
auto timeout_micro = chrono::duration_cast<chrono::microseconds>(1000ms * timeout); auto timeout_micro = chrono::duration_cast<chrono::microseconds>(1000ms * timeout);
int64_t steps = timeout_micro.count() / step.count(); int64_t steps = timeout_micro.count() / step.count();
do { do {
::boost::this_fiber::sleep_for(step); fibers_ext::SleepFor(step);
} while (!IsLocked(db_index, key) && --steps > 0); } while (!IsLocked(db_index, key) && --steps > 0);
CHECK(IsLocked(db_index, key)); CHECK(IsLocked(db_index, key));
} }