From 1a5eacca87fa2a08995594df47877f2f77bd84be Mon Sep 17 00:00:00 2001
From: Vladislav
Date: Mon, 22 Apr 2024 20:18:10 +0300
Subject: [PATCH] chore: Pull helio with new future (#2944)

---
 helio                                  |  2 +-
 src/server/blocking_controller_test.cc |  2 +-
 src/server/channel_store.cc            |  2 +-
 src/server/cluster/cluster_family.cc   |  2 +-
 src/server/common.cc                   |  2 +-
 src/server/conn_context.cc             |  4 +-
 src/server/debugcmd.cc                 |  5 +-
 src/server/dflycmd.cc                  |  2 +-
 src/server/main_service.cc             |  4 +-
 src/server/memory_cmd.cc               | 12 ++---
 src/server/script_mgr.cc               |  2 +-
 src/server/server_family.cc            | 71 +++++++++++++-------------
 src/server/server_family.h             |  4 +-
 src/server/string_family.cc            |  2 +-
 src/server/test_utils.cc               |  5 +-
 src/server/tiering/op_manager.cc       |  4 +-
 src/server/tiering/op_manager.h        |  2 +-
 src/server/tiering/op_manager_test.cc  |  6 +--
 18 files changed, 67 insertions(+), 66 deletions(-)

diff --git a/helio b/helio
index 499a3f573..8095758cd 160000
--- a/helio
+++ b/helio
@@ -1 +1 @@
-Subproject commit 499a3f5736935ea11a0c531bb10c60dc6c101657
+Subproject commit 8095758cd5ed11bd40b76bbf20c852bde6180c95
diff --git a/src/server/blocking_controller_test.cc b/src/server/blocking_controller_test.cc
index 6d1e746fc..2079f244a 100644
--- a/src/server/blocking_controller_test.cc
+++ b/src/server/blocking_controller_test.cc
@@ -44,7 +44,7 @@ class BlockingControllerTest : public Test {
 void BlockingControllerTest::SetUp() {
   pp_.reset(fb2::Pool::Epoll(kNumThreads));
   pp_->Run();
-  pp_->Await([](unsigned index, ProactorBase* p) {
+  pp_->AwaitBrief([](unsigned index, ProactorBase* p) {
     ServerState::Init(index, kNumThreads, nullptr);
     if (facade::tl_facade_stats == nullptr) {
       facade::tl_facade_stats = new facade::FacadeStats;
diff --git a/src/server/channel_store.cc b/src/server/channel_store.cc
index 736c4e672..822f82d9d 100644
--- a/src/server/channel_store.cc
+++ b/src/server/channel_store.cc
@@ -218,7 +218,7 @@ void ChannelStoreUpdater::Apply() {
   // structs. This means that the other thread can safely update the channel store at any point.
   // Regardless of whether we need to replace, we dispatch to make sure all
   // queued SubscribeMaps in the freelist are no longer in use.
-  shard_set->pool()->Await([](unsigned idx, util::ProactorBase*) {
+  shard_set->pool()->AwaitBrief([](unsigned idx, util::ProactorBase*) {
     ServerState::tlocal()->UpdateChannelStore(
         ChannelStore::control_block.most_recent.load(memory_order_relaxed));
   });
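Note on the Await -> AwaitBrief rename that runs through this patch: the call sites above and below all follow the same pattern of mutating thread-local state on every proactor thread and waiting for completion. As a mental model only (an illustrative sketch, not helio's implementation, which also passes the proactor pointer and runs the callback inline on each event loop rather than on fresh threads), the semantics are:

    #include <functional>
    #include <thread>
    #include <vector>

    // Toy stand-in for a proactor pool: run a short callback once per pool
    // thread and return only after every thread has finished.
    class ToyPool {
     public:
      explicit ToyPool(unsigned n) : size_(n) {
      }

      void AwaitBrief(const std::function<void(unsigned)>& cb) {
        std::vector<std::thread> threads;
        for (unsigned i = 0; i < size_; ++i)
          threads.emplace_back([&cb, i] { cb(i); });
        for (auto& t : threads)
          t.join();  // The caller resumes only after all callbacks ran.
      }

     private:
      unsigned size_;
    };

    int main() {
      ToyPool pool(4);
      thread_local int tl_value = 0;
      // Same shape as the ServerState::Init call above: set thread-local
      // state on every thread, then continue.
      pool.AwaitBrief([](unsigned index) { tl_value = static_cast<int>(index); });
      return 0;
    }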
diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc
index ecd5796d6..cc907d65d 100644
--- a/src/server/cluster/cluster_family.cc
+++ b/src/server/cluster/cluster_family.cc
@@ -658,7 +658,7 @@ static string_view StateToStr(MigrationState state) {
 static uint64_t GetKeyCount(const SlotRanges& slots) {
   atomic_uint64_t keys = 0;
-  shard_set->pool()->Await([&](auto*) {
+  shard_set->pool()->AwaitFiberOnAll([&](auto*) {
     EngineShard* shard = EngineShard::tlocal();
     if (shard == nullptr)
       return;
diff --git a/src/server/common.cc b/src/server/common.cc
index 4f6084261..358093457 100644
--- a/src/server/common.cc
+++ b/src/server/common.cc
@@ -61,7 +61,7 @@ thread_local std::optional<LockTagOptions> locktag_lock_options;
 void TEST_InvalidateLockTagOptions() {
   locktag_lock_options = nullopt;  // For test main thread
   CHECK(shard_set != nullptr);
-  shard_set->pool()->Await(
+  shard_set->pool()->AwaitBrief(
       [](ShardId shard, ProactorBase* proactor) { locktag_lock_options = nullopt; });
 }
diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc
index 55152c75b..011793636 100644
--- a/src/server/conn_context.cc
+++ b/src/server/conn_context.cc
@@ -114,8 +114,8 @@ void ConnectionContext::ChangeMonitor(bool start) {
     my_monitors.Remove(conn());
   }
   // Tell other threads about the change in the number of connections that we monitor
-  shard_set->pool()->Await(
-      [start](auto*) { ServerState::tlocal()->Monitors().NotifyChangeCount(start); });
+  shard_set->pool()->AwaitBrief(
+      [start](unsigned, auto*) { ServerState::tlocal()->Monitors().NotifyChangeCount(start); });
   EnableMonitoring(start);
 }
diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc
index 4465bc886..2b66913d9 100644
--- a/src/server/debugcmd.cc
+++ b/src/server/debugcmd.cc
@@ -560,9 +560,8 @@ void DebugCmd::Load(string_view filename) {
     path = dir_path;
   }

-  auto fut_ec = sf_.Load(path.generic_string());
-  if (fut_ec.valid()) {
-    GenericError ec = fut_ec.get();
+  if (auto fut_ec = sf_.Load(path.generic_string()); fut_ec) {
+    GenericError ec = fut_ec->Get();
     if (ec) {
       string msg = ec.Format();
       LOG(WARNING) << "Could not load file " << msg;
diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc
index 86c255de5..0e5f5db81 100644
--- a/src/server/dflycmd.cc
+++ b/src/server/dflycmd.cc
@@ -400,7 +400,7 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) {
   // We need to wait for all dispatches to finish: otherwise a transaction might be scheduled
   // after this function exits but before the actual shutdown.
   facade::DispatchTracker tracker{sf_->GetNonPriviligedListeners(), cntx->conn()};
-  shard_set->pool()->Await([&](unsigned index, auto* pb) {
+  shard_set->pool()->AwaitBrief([&](unsigned index, auto* pb) {
     sf_->CancelBlockingOnThread();
     tracker.TrackOnThread();
   });
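Of all the call sites in this patch, only GetKeyCount in cluster_family.cc moves to AwaitFiberOnAll: its callback walks shard data rather than flipping a thread-local flag. A plausible reading, suggested by the names but not stated in the patch, is that AwaitBrief callbacks must stay short and never suspend, while AwaitFiberOnAll runs each callback inside a fiber that is allowed to block. A toy contrast under that assumption, with plain threads standing in for proactors and fibers:

    #include <atomic>
    #include <chrono>
    #include <cstdio>
    #include <thread>
    #include <vector>

    // Runs cb(index) on n workers and joins them, like the pool calls above.
    template <typename F> void RunOnAll(unsigned n, F&& cb) {
      std::vector<std::thread> workers;
      for (unsigned i = 0; i < n; ++i)
        workers.emplace_back(cb, i);
      for (auto& w : workers)
        w.join();
    }

    int main() {
      std::atomic_uint64_t keys{0};

      // "Brief" style: only cheap, non-blocking work on each thread.
      RunOnAll(4, [&](unsigned) { keys.fetch_add(1, std::memory_order_relaxed); });

      // "Fiber" style: the callback may wait, modeled here by a sleep
      // (cf. GetKeyCount, which aggregates per-shard key counts).
      RunOnAll(4, [&](unsigned) {
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
        keys.fetch_add(10, std::memory_order_relaxed);
      });

      std::printf("keys = %llu\n", static_cast<unsigned long long>(keys.load()));
      return 0;
    }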
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index b35783800..f768f3426 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -855,7 +855,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
   }

   // Must initialize before the shard_set because EngineShard::Init references ServerState.
-  pp_.Await([&](uint32_t index, ProactorBase* pb) {
+  pp_.AwaitBrief([&](uint32_t index, ProactorBase* pb) {
     tl_facade_stats = new FacadeStats;
     ServerState::Init(index, shard_num, &user_registry_);
   });
@@ -873,7 +873,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
   server_family_.Init(acceptor, std::move(listeners));

   ChannelStore* cs = new ChannelStore{};
-  pp_.Await(
+  pp_.AwaitBrief(
       [cs](uint32_t index, ProactorBase* pb) { ServerState::tlocal()->UpdateChannelStore(cs); });
 }
diff --git a/src/server/memory_cmd.cc b/src/server/memory_cmd.cc
index 03d0c6f00..87faadd84 100644
--- a/src/server/memory_cmd.cc
+++ b/src/server/memory_cmd.cc
@@ -138,7 +138,7 @@ void MemoryCmd::Run(CmdArgList args) {
   }

   if (sub_cmd == "DECOMMIT") {
-    shard_set->pool()->Await([](auto* pb) {
+    shard_set->pool()->AwaitBrief([](unsigned, auto* pb) {
       ServerState::tlocal()->DecommitMemory(ServerState::kDataHeap | ServerState::kBackingHeap |
                                             ServerState::kGlibcmalloc);
     });
@@ -196,7 +196,7 @@ ConnectionMemoryUsage GetConnectionMemoryUsage(ServerFamily* server) {
     });
   }

-  shard_set->pool()->Await([&](unsigned index, auto*) {
+  shard_set->pool()->AwaitBrief([&](unsigned index, auto*) {
     mems[index].pipelined_bytes += tl_facade_stats->conn_stats.pipeline_cmd_cache_bytes;
     mems[index].pipelined_bytes += tl_facade_stats->conn_stats.dispatch_queue_bytes;
   });
@@ -372,7 +372,7 @@ void MemoryCmd::Track(CmdArgList args) {
     }

     atomic_bool error{false};
-    shard_set->pool()->Await([&](unsigned index, auto*) {
+    shard_set->pool()->AwaitBrief([&](unsigned index, auto*) {
       if (!AllocationTracker::Get().Add(tracking_info)) {
         error.store(true);
       }
@@ -392,7 +392,7 @@ void MemoryCmd::Track(CmdArgList args) {
     }

     atomic_bool error{false};
-    shard_set->pool()->Await([&, lo = lower_bound, hi = upper_bound](unsigned index, auto*) {
+    shard_set->pool()->AwaitBrief([&, lo = lower_bound, hi = upper_bound](unsigned index, auto*) {
       if (!AllocationTracker::Get().Remove(lo, hi)) {
         error.store(true);
       }
@@ -406,7 +406,7 @@ void MemoryCmd::Track(CmdArgList args) {
   }

   if (sub_cmd == "CLEAR") {
-    shard_set->pool()->Await([&](unsigned index, auto*) { AllocationTracker::Get().Clear(); });
+    shard_set->pool()->AwaitBrief([&](unsigned index, auto*) { AllocationTracker::Get().Clear(); });
     return cntx_->SendOk();
   }
@@ -433,7 +433,7 @@ void MemoryCmd::Track(CmdArgList args) {
   }

   atomic_bool found{false};
-  shard_set->pool()->Await([&](unsigned index, auto*) {
+  shard_set->pool()->AwaitBrief([&](unsigned index, auto*) {
     if (mi_heap_check_owned(mi_heap_get_backing(), (void*)ptr)) {
       found.store(true);
     }
diff --git a/src/server/script_mgr.cc b/src/server/script_mgr.cc
index 214d9d354..02820abf9 100644
--- a/src/server/script_mgr.cc
+++ b/src/server/script_mgr.cc
@@ -336,7 +336,7 @@ vector<pair<string, ScriptMgr::ScriptData>> ScriptMgr::GetAll() const {
 }

 void ScriptMgr::UpdateScriptCaches(ScriptKey sha, ScriptParams params) const {
-  shard_set->pool()->Await([&sha, &params](auto index, auto* pb) {
+  shard_set->pool()->AwaitBrief([&sha, &params](auto index, auto* pb) {
     ServerState::tlocal()->SetScriptParams(sha, params);
   });
 }
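Several lambdas in these hunks gain an explicit leading index parameter ([](auto* pb) becomes [](unsigned, auto* pb)), which suggests AwaitBrief expects the (index, proactor) shape where the old Await also accepted a single-argument callback. A pool could support both arities with compile-time detection; a hypothetical sketch of that dispatch, not helio's actual code:

    #include <type_traits>
    #include <utility>

    struct ProactorBase {};

    // Hypothetical dispatch: accept either cb(proactor) or cb(index, proactor).
    template <typename F> void InvokePerThread(unsigned index, ProactorBase* pb, F&& cb) {
      if constexpr (std::is_invocable_v<F, unsigned, ProactorBase*>) {
        std::forward<F>(cb)(index, pb);  // New two-argument form.
      } else {
        static_assert(std::is_invocable_v<F, ProactorBase*>, "unsupported callback");
        std::forward<F>(cb)(pb);  // Legacy one-argument form.
      }
    }

    int main() {
      ProactorBase pb;
      InvokePerThread(0, &pb, [](ProactorBase*) {});            // old Await shape
      InvokePerThread(0, &pb, [](unsigned, ProactorBase*) {});  // AwaitBrief shape
      return 0;
    }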
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index f485d3a3f..4f61b30ad 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -277,8 +277,8 @@ template <typename T> void UpdateMax(T* maxv, T current) {
 }

 void SetMasterFlagOnAllThreads(bool is_master) {
-  auto cb = [is_master](auto* pb) { ServerState::tlocal()->is_master = is_master; };
-  shard_set->pool()->Await(cb);
+  auto cb = [is_master](unsigned, auto*) { ServerState::tlocal()->is_master = is_master; };
+  shard_set->pool()->AwaitBrief(cb);
 }

 std::optional<cron::cronexpr> InferSnapshotCronExpr() {
@@ -616,7 +616,7 @@ std::optional<fb2::Fiber> Pause(std::vector<facade::Listener*> listeners, facade
   // command that did not pause on the new state yet we will pause after waking up.
   DispatchTracker tracker{std::move(listeners), conn, true /* ignore paused commands */,
                           true /*ignore blocking*/};
-  shard_set->pool()->Await([&tracker, pause_state](util::ProactorBase* pb) {
+  shard_set->pool()->AwaitBrief([&tracker, pause_state](unsigned, util::ProactorBase*) {
     // Commands don't suspend before checking the pause state, so
     // it's impossible to deadlock on waiting for a command that will be paused.
     tracker.TrackOnThread();
@@ -628,7 +628,7 @@ std::optional<fb2::Fiber> Pause(std::vector<facade::Listener*> listeners, facade
   const absl::Duration kDispatchTimeout = absl::Seconds(1);
   if (!tracker.Wait(kDispatchTimeout)) {
     LOG(WARNING) << "Couldn't wait for commands to finish dispatching in " << kDispatchTimeout;
-    shard_set->pool()->Await([pause_state](util::ProactorBase* pb) {
+    shard_set->pool()->AwaitBrief([pause_state](unsigned, util::ProactorBase*) {
       ServerState::tlocal()->SetPauseState(pause_state, false);
     });
     return std::nullopt;
@@ -821,8 +821,9 @@ void ServerFamily::JoinSnapshotSchedule() {
 void ServerFamily::Shutdown() {
   VLOG(1) << "ServerFamily::Shutdown";

-  if (load_result_.valid())
-    load_result_.wait();
+  if (load_result_) {
+    std::exchange(load_result_, std::nullopt)->Get();
+  }

   JoinSnapshotSchedule();
@@ -864,14 +865,14 @@ struct AggregateLoadResult {
 // Load starts as many fibers as there are files, loading each one separately.
 // It starts one more fiber that waits for all load fibers to finish and returns the first
 // error (if any occurred) with a future.
-fb2::Future<GenericError> ServerFamily::Load(const std::string& load_path) {
+std::optional<fb2::Future<GenericError>> ServerFamily::Load(const std::string& load_path) {
   auto paths_result = snapshot_storage_->LoadPaths(load_path);
   if (!paths_result) {
     LOG(ERROR) << "Failed to load snapshot: " << paths_result.error().Format();
-    fb2::Promise<GenericError> ec_promise;
-    ec_promise.set_value(paths_result.error());
-    return ec_promise.get_future();
+    fb2::Future<GenericError> future;
+    future.Resolve(paths_result.error());
+    return future;
   }

   std::vector<std::string> paths = *paths_result;
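The "new future" of the subject line shows up here: instead of pairing fb2::Promise<T> (set_value) with fb2::Future<T> (get), the code keeps a single copyable future object that the producer Resolve()s and the consumer Get()s. A minimal thread-based model of those single-shot semantics (helio's type is fiber-aware, so the details surely differ):

    #include <condition_variable>
    #include <memory>
    #include <mutex>
    #include <optional>

    template <typename T>
    class ToyFuture {
     public:
      ToyFuture() : state_(std::make_shared<State>()) {
      }

      void Resolve(T value) {  // Producer side: fulfill once.
        std::lock_guard lk(state_->mu);
        state_->value = std::move(value);
        state_->cv.notify_all();
      }

      T Get() {  // Consumer side: block until resolved.
        std::unique_lock lk(state_->mu);
        state_->cv.wait(lk, [&] { return state_->value.has_value(); });
        return *state_->value;
      }

     private:
      struct State {
        std::mutex mu;
        std::condition_variable cv;
        std::optional<T> value;
      };
      std::shared_ptr<State> state_;  // Copies share one resolution slot.
    };

    int main() {
      ToyFuture<int> f;
      ToyFuture<int> copy = f;  // Producer and consumer can hold copies.
      f.Resolve(42);
      return copy.Get() == 42 ? 0 : 1;
    }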
@@ -913,12 +914,11 @@ fb2::Future<GenericError> ServerFamily::Load(const std::string& load_path) {
     load_fibers.push_back(proactor->LaunchFiber(std::move(load_fiber)));
   }

-  fb2::Promise<GenericError> ec_promise;
-  fb2::Future<GenericError> ec_future = ec_promise.get_future();
+  fb2::Future<GenericError> future;

-  // Run fiber that empties the channel and sets ec_promise.
+  // Run fiber that empties the channel and resolves the future.
   auto load_join_fiber = [this, aggregated_result, load_fibers = std::move(load_fibers),
-                          ec_promise = std::move(ec_promise)]() mutable {
+                          future]() mutable {
     for (auto& fiber : load_fibers) {
       fiber.Join();
     }
@@ -932,11 +932,11 @@ fb2::Future<GenericError> ServerFamily::Load(const std::string& load_path) {
     LOG(INFO) << "Load finished, num keys read: " << aggregated_result->keys_read;
     service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);

-    ec_promise.set_value(*(aggregated_result->first_error));
+    future.Resolve(*(aggregated_result->first_error));
   };

   pool.GetNextProactor()->Dispatch(std::move(load_join_fiber));
-  return ec_future;
+  return future;
 }

 void ServerFamily::SnapshotScheduling() {
@@ -1784,29 +1784,30 @@ static void MergeDbSliceStats(const DbSlice::Stats& src, Metrics* dest) {
 }

 void ServerFamily::ResetStat() {
-  shard_set->pool()->Await([registry = service_.mutable_registry(), this](unsigned index, auto*) {
-    registry->ResetCallStats(index);
-    SinkReplyBuilder::ResetThreadLocalStats();
-    auto& stats = tl_facade_stats->conn_stats;
-    stats.command_cnt = 0;
-    stats.pipelined_cmd_cnt = 0;
+  shard_set->pool()->AwaitBrief(
+      [registry = service_.mutable_registry(), this](unsigned index, auto*) {
+        registry->ResetCallStats(index);
+        SinkReplyBuilder::ResetThreadLocalStats();
+        auto& stats = tl_facade_stats->conn_stats;
+        stats.command_cnt = 0;
+        stats.pipelined_cmd_cnt = 0;

-    EngineShard* shard = EngineShard::tlocal();
-    shard->db_slice().ResetEvents();
-    tl_facade_stats->conn_stats.conn_received_cnt = 0;
-    tl_facade_stats->conn_stats.pipelined_cmd_cnt = 0;
-    tl_facade_stats->conn_stats.command_cnt = 0;
-    tl_facade_stats->conn_stats.io_read_cnt = 0;
-    tl_facade_stats->conn_stats.io_read_bytes = 0;
+        EngineShard* shard = EngineShard::tlocal();
+        shard->db_slice().ResetEvents();
+        tl_facade_stats->conn_stats.conn_received_cnt = 0;
+        tl_facade_stats->conn_stats.pipelined_cmd_cnt = 0;
+        tl_facade_stats->conn_stats.command_cnt = 0;
+        tl_facade_stats->conn_stats.io_read_cnt = 0;
+        tl_facade_stats->conn_stats.io_read_bytes = 0;

-    tl_facade_stats->reply_stats.io_write_bytes = 0;
-    tl_facade_stats->reply_stats.io_write_cnt = 0;
-    tl_facade_stats->reply_stats.send_stats = {};
-    tl_facade_stats->reply_stats.script_error_count = 0;
-    tl_facade_stats->reply_stats.err_count.clear();
+        tl_facade_stats->reply_stats.io_write_bytes = 0;
+        tl_facade_stats->reply_stats.io_write_cnt = 0;
+        tl_facade_stats->reply_stats.send_stats = {};
+        tl_facade_stats->reply_stats.script_error_count = 0;
+        tl_facade_stats->reply_stats.err_count.clear();

-    service_.mutable_registry()->ResetCallStats(index);
-  });
+        service_.mutable_registry()->ResetCallStats(index);
+      });
 }

 Metrics ServerFamily::GetMetrics() const {
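With the signature change below, Load returns std::nullopt when there is nothing to wait for, and a future that will carry the first error otherwise; DebugCmd::Load earlier in the patch consumes exactly this shape. A self-contained sketch of the calling pattern, with a hypothetical stand-in for util::fb2::Future<GenericError>:

    #include <cstdio>
    #include <optional>
    #include <string>

    struct ToyFuture {
      std::string error;  // Empty string models "no error".
      std::string Get() { return error; }
    };

    std::optional<ToyFuture> Load(const std::string& path) {
      if (path.empty())
        return std::nullopt;  // Nothing to load: no future at all.
      return ToyFuture{};     // Load started: resolved by the join fiber.
    }

    int main() {
      if (auto fut = Load("dump.rdb"); fut) {  // Same shape as debugcmd.cc.
        std::string ec = fut->Get();
        if (!ec.empty())
          std::fprintf(stderr, "Could not load file: %s\n", ec.c_str());
      }
      return 0;
    }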
diff --git a/src/server/server_family.h b/src/server/server_family.h
index f17f3fbb4..912d71647 100644
--- a/src/server/server_family.h
+++ b/src/server/server_family.h
@@ -185,7 +185,7 @@ class ServerFamily {

   // Load snapshot from file (.rdb file or summary.dfs file) and return
   // a future with the error_code.
-  util::fb2::Future<GenericError> Load(const std::string& file_name);
+  std::optional<util::fb2::Future<GenericError>> Load(const std::string& file_name);

   bool TEST_IsSaving() const;

@@ -288,7 +288,7 @@ class ServerFamily {
   void StopAllClusterReplicas();

   util::fb2::Fiber snapshot_schedule_fb_;
-  util::fb2::Future<GenericError> load_result_;
+  std::optional<util::fb2::Future<GenericError>> load_result_;
   uint32_t stats_caching_task_ = 0;
   Service& service_;
diff --git a/src/server/string_family.cc b/src/server/string_family.cc
index 40d42a2f2..dd7cb4975 100644
--- a/src/server/string_family.cc
+++ b/src/server/string_family.cc
@@ -505,7 +505,7 @@ string StringValue::Get() && {
   auto prev = exchange(v_, monostate{});
   if (holds_alternative<string>(prev))
     return std::move(std::get<string>(prev));
-  return std::get<fb2::Future<std::string>>(prev).get();
+  return std::get<fb2::Future<std::string>>(prev).Get();
 }

 bool StringValue::IsEmpty() const {
diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc
index 3b6ad3b3d..682e4bcf7 100644
--- a/src/server/test_utils.cc
+++ b/src/server/test_utils.cc
@@ -304,8 +304,9 @@ unsigned BaseFamilyTest::NumLocked() {
 }

 void BaseFamilyTest::ClearMetrics() {
-  shard_set->pool()->Await(
-      [](auto*) { ServerState::tlocal()->stats = ServerState::Stats(shard_set->size()); });
+  shard_set->pool()->AwaitBrief([](unsigned, auto*) {
+    ServerState::tlocal()->stats = ServerState::Stats(shard_set->size());
+  });
 }

 void BaseFamilyTest::WaitUntilLocked(DbIndex db_index, string_view key, double timeout) {
diff --git a/src/server/tiering/op_manager.cc b/src/server/tiering/op_manager.cc
index 021ec6cd9..2cb4fb99d 100644
--- a/src/server/tiering/op_manager.cc
+++ b/src/server/tiering/op_manager.cc
@@ -35,7 +35,7 @@ void OpManager::Close() {

 util::fb2::Future<std::string> OpManager::Read(EntryId id, DiskSegment segment) {
   // Fill pages for prepared read as it has no penalty and potentially covers more small segments
-  return PrepareRead(segment.FillPages()).ForId(id, segment).futures.emplace_back().get_future();
+  return PrepareRead(segment.FillPages()).ForId(id, segment).futures.emplace_back();
 }

 void OpManager::Delete(EntryId id) {
@@ -98,7 +98,7 @@ void OpManager::ProcessRead(size_t offset, std::string_view value) {
     auto key_value = value.substr(ko.segment.offset - info->segment.offset, ko.segment.length);
     for (auto& fut : ko.futures)
-      fut.set_value(std::string{key_value});
+      fut.Resolve(std::string{key_value});

     ReportFetched(Borrowed(ko.id), key_value, ko.segment);
   }
diff --git a/src/server/tiering/op_manager.h b/src/server/tiering/op_manager.h
index 26d39f653..c2f4cbc08 100644
--- a/src/server/tiering/op_manager.h
+++ b/src/server/tiering/op_manager.h
@@ -58,7 +58,7 @@ class OpManager {
     OwnedEntryId id;
     DiskSegment segment;

-    absl::InlinedVector<util::fb2::Promise<std::string>, 1> futures;
+    absl::InlinedVector<util::fb2::Future<std::string>, 1> futures;
   };

   // Describes an ongoing read operation for a fixed segment
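In OpManager above, each queued reader now holds an fb2::Future<std::string> directly in the entry's InlinedVector, and ProcessRead resolves every one of them with the fetched bytes, which the tests below consume via Get(). The fan-out is modeled here with the standard library's shared state for brevity; the patch resolves each future individually:

    #include <cassert>
    #include <future>
    #include <string>
    #include <vector>

    int main() {
      // One pending disk read, several waiters (ko.futures in ProcessRead).
      std::promise<std::string> producer;
      std::shared_future<std::string> shared = producer.get_future().share();
      std::vector<std::shared_future<std::string>> waiters(3, shared);

      producer.set_value("VALUE0real");  // Plays the role of fut.Resolve(...).

      for (auto& fut : waiters)
        assert(fut.get() == "VALUE0real");  // Plays the role of fut.Get().
      return 0;
    }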
diff --git a/src/server/tiering/op_manager_test.cc b/src/server/tiering/op_manager_test.cc
index 52674d22c..9e41ed0b0 100644
--- a/src/server/tiering/op_manager_test.cc
+++ b/src/server/tiering/op_manager_test.cc
@@ -58,7 +58,7 @@ TEST_F(OpManagerTest, SimpleStashesWithReads) {
   for (unsigned i = 0; i < 100; i++) {
     EXPECT_GE(stashed_[i].offset, i > 0);
     EXPECT_EQ(stashed_[i].length, 10 + (i > 9));
-    EXPECT_EQ(Read(i, stashed_[i]).get(), absl::StrCat("VALUE", i, "real"));
+    EXPECT_EQ(Read(i, stashed_[i]).Get(), absl::StrCat("VALUE", i, "real"));
     EXPECT_EQ(fetched_.extract(i).mapped(), absl::StrCat("VALUE", i, "real"));
   }
@@ -80,7 +80,7 @@ TEST_F(OpManagerTest, DeleteAfterReads) {
   Delete(stashed_[0u]);

   for (auto& fut : reads)
-    EXPECT_EQ(fut.get(), "DATA");
+    EXPECT_EQ(fut.Get(), "DATA");

   Close();
 });
@@ -111,7 +111,7 @@ TEST_F(OpManagerTest, ReadSamePageDifferentOffsets) {
     futures.emplace_back(Read(absl::StrCat("k", i), number_segments[i]));

   for (size_t i = 0; i < 100; i++)
-    EXPECT_EQ(futures[i].get(), std::to_string(i));
+    EXPECT_EQ(futures[i].Get(), std::to_string(i));

   Close();
 });