From 7b612685335fb95f0797544a069d8c6330a38901 Mon Sep 17 00:00:00 2001 From: Borys Date: Thu, 11 Jan 2024 12:36:43 +0200 Subject: [PATCH] feat(info): add new persistence section fields (#2396) * feat(info): add new persistence section fields implement #2386 added fields: 1) last_failed_save 2) last_error 3) last_failed_save_duration_sec 4) saving 5) current_save_duration_sec --- src/server/cluster/cluster_family_test.cc | 2 +- src/server/debugcmd.cc | 2 +- src/server/detail/save_stages_controller.cc | 29 +++++++------ src/server/detail/save_stages_controller.h | 2 +- src/server/generic_family_test.cc | 2 +- src/server/rdb_test.cc | 12 +++--- src/server/server_family.cc | 45 ++++++++++++++------- src/server/server_family.h | 14 +++++-- 8 files changed, 68 insertions(+), 40 deletions(-) diff --git a/src/server/cluster/cluster_family_test.cc b/src/server/cluster/cluster_family_test.cc index ad5c4c3b3..25c1160ec 100644 --- a/src/server/cluster/cluster_family_test.cc +++ b/src/server/cluster/cluster_family_test.cc @@ -694,7 +694,7 @@ TEST_F(ClusterFamilyTest, ClusterFirstConfigCallDropsEntriesNotOwnedByNode) { EXPECT_EQ(Run({"save", "df"}), "OK"); auto save_info = service_->server_family().GetLastSaveInfo(); - EXPECT_EQ(Run({"debug", "load", save_info->file_name}), "OK"); + EXPECT_EQ(Run({"debug", "load", save_info.file_name}), "OK"); EXPECT_EQ(CheckedInt({"dbsize"}), 50000); EXPECT_EQ(RunPrivileged({"dflycluster", "config", R"json( diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index f3c1fb0ca..f90be5e79 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -350,7 +350,7 @@ void DebugCmd::Reload(CmdArgList args) { } } - string last_save_file = sf_.GetLastSaveInfo()->file_name; + string last_save_file = sf_.GetLastSaveInfo().file_name; Load(last_save_file); } diff --git a/src/server/detail/save_stages_controller.cc b/src/server/detail/save_stages_controller.cc index 3a021e2c4..18ce620fe 100644 --- a/src/server/detail/save_stages_controller.cc +++ b/src/server/detail/save_stages_controller.cc @@ -178,8 +178,7 @@ GenericError SaveStagesController::Save() { FinalizeFileMovement(); - if (!shared_err_) - UpdateSaveInfo(); + UpdateSaveInfo(); return *shared_err_; } @@ -266,26 +265,32 @@ void SaveStagesController::SaveRdb() { } void SaveStagesController::UpdateSaveInfo() { + auto seconds = (absl::Now() - start_time_) / absl::Seconds(1); + if (shared_err_) { + lock_guard lk{*save_mu_}; + last_save_info_->last_error = *shared_err_; + last_save_info_->last_error_time = absl::ToUnixSeconds(start_time_); + last_save_info_->failed_duration_sec = seconds; + return; + } + fs::path resulting_path = full_path_; if (use_dfs_format_) SetExtension("summary", ".dfs", &resulting_path); else resulting_path.replace_extension(); // remove .tmp - double seconds = double(absl::ToInt64Milliseconds(absl::Now() - start_time_)) / 1000; LOG(INFO) << "Saving " << resulting_path << " finished after " << strings::HumanReadableElapsedTime(seconds); - auto save_info = make_shared(); - for (const auto& k_v : rdb_name_map_) { - save_info->freq_map.emplace_back(k_v); - } - save_info->save_time = absl::ToUnixSeconds(start_time_); - save_info->file_name = resulting_path.generic_string(); - save_info->duration_sec = uint32_t(seconds); - lock_guard lk{*save_mu_}; - last_save_info_->swap(save_info); // swap - to deallocate the old version outstide of the lock. + last_save_info_->freq_map.clear(); + for (const auto& k_v : rdb_name_map_) { + last_save_info_->freq_map.emplace_back(k_v); + } + last_save_info_->save_time = absl::ToUnixSeconds(start_time_); + last_save_info_->file_name = resulting_path.generic_string(); + last_save_info_->success_duration_sec = seconds; } GenericError SaveStagesController::InitResources() { diff --git a/src/server/detail/save_stages_controller.h b/src/server/detail/save_stages_controller.h index af219c61a..7855b39d0 100644 --- a/src/server/detail/save_stages_controller.h +++ b/src/server/detail/save_stages_controller.h @@ -26,7 +26,7 @@ struct SaveStagesInputs { Service* service_; std::atomic_bool* is_saving_; util::fb2::FiberQueueThreadPool* fq_threadpool_; - std::shared_ptr* last_save_info_; + LastSaveInfo* last_save_info_ ABSL_GUARDED_BY(save_mu_); util::fb2::Mutex* save_mu_; std::function* save_bytes_cb_; std::shared_ptr snapshot_storage_; diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index d271faca2..96f1e9638 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -636,7 +636,7 @@ TEST_F(GenericFamilyTest, Info) { InitWithDbFilename(); // Needed for `save` auto get_rdb_changes_since_last_save = [](const string& str) -> size_t { - const string matcher = "rdb_changes_since_last_save:"; + const string matcher = "rdb_changes_since_last_success_save:"; const auto pos = str.find(matcher) + matcher.size(); const auto sub = str.substr(pos, 1); return atoi(sub.c_str()); diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index 6755a7649..d41a77de5 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -166,7 +166,7 @@ TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) { ASSERT_EQ(resp, "OK"); auto save_info = service_->server_family().GetLastSaveInfo(); - resp = Run({"debug", "load", save_info->file_name}); + resp = Run({"debug", "load", save_info.file_name}); ASSERT_EQ(resp, "OK"); ASSERT_EQ(50000, CheckedInt({"dbsize"})); } @@ -181,7 +181,7 @@ TEST_F(RdbTest, RdbLoaderOnReadCompressedDataShouldNotEnterEnsureReadFlow) { ASSERT_EQ(resp, "OK"); auto save_info = service_->server_family().GetLastSaveInfo(); - resp = Run({"debug", "load", save_info->file_name}); + resp = Run({"debug", "load", save_info.file_name}); ASSERT_EQ(resp, "OK"); } @@ -323,8 +323,8 @@ TEST_F(RdbTest, SaveFlush) { Run({"flushdb"}); save_fb.Join(); auto save_info = service_->server_family().GetLastSaveInfo(); - ASSERT_EQ(1, save_info->freq_map.size()); - auto& k_v = save_info->freq_map.front(); + ASSERT_EQ(1, save_info.freq_map.size()); + auto& k_v = save_info.freq_map.front(); EXPECT_EQ("string", k_v.first); EXPECT_EQ(500000, k_v.second); } @@ -360,8 +360,8 @@ TEST_F(RdbTest, SaveManyDbs) { save_fb.Join(); auto save_info = service_->server_family().GetLastSaveInfo(); - ASSERT_EQ(1, save_info->freq_map.size()); - auto& k_v = save_info->freq_map.front(); + ASSERT_EQ(1, save_info.freq_map.size()); + auto& k_v = save_info.freq_map.front(); EXPECT_EQ("string", k_v.first); EXPECT_EQ(60000, k_v.second); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 9b5e089e3..196957ce4 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -521,8 +521,7 @@ std::string_view GetOSString() { ServerFamily::ServerFamily(Service* service) : service_(*service) { start_time_ = time(NULL); - last_save_info_ = make_shared(); - last_save_info_->save_time = start_time_; + last_save_info_.save_time = start_time_; script_mgr_.reset(new ScriptMgr()); journal_.reset(new journal::Journal()); @@ -1227,10 +1226,18 @@ GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transa StrCat(GlobalStateName(new_state), " - can not save database")}; } } + { + std::lock_guard lck(save_mu_); + start_save_time_ = absl::Now(); + } SaveStagesController sc{detail::SaveStagesInputs{ new_version, basename, trans, &service_, &is_saving_, fq_threadpool_.get(), &last_save_info_, &save_mu_, &save_bytes_cb_, snapshot_storage_}}; auto res = sc.Save(); + { + std::lock_guard lck(save_mu_); + start_save_time_.reset(); + } if (!ignore_state) service_.SwitchState(GlobalState::SAVING, GlobalState::ACTIVE); return res; @@ -1251,7 +1258,7 @@ error_code ServerFamily::Drakarys(Transaction* transaction, DbIndex db_ind) { return error_code{}; } -shared_ptr ServerFamily::GetLastSaveInfo() const { +LastSaveInfo ServerFamily::GetLastSaveInfo() const { lock_guard lk(save_mu_); return last_save_info_; } @@ -1782,22 +1789,30 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { } if (should_enter("PERSISTENCE", true)) { - decltype(last_save_info_) save_info; - { - lock_guard lk(save_mu_); - save_info = last_save_info_; - } - // when when last save - append("last_save", save_info->save_time); - append("last_save_duration_sec", save_info->duration_sec); - append("last_save_file", save_info->file_name); + auto save_info = GetLastSaveInfo(); + + // when last success save + append("last_success_save", save_info.save_time); + append("last_saved_file", save_info.file_name); + append("last_success_save_duration_sec", save_info.success_duration_sec); + size_t is_loading = service_.GetGlobalState() == GlobalState::LOADING; append("loading", is_loading); - for (const auto& k_v : save_info->freq_map) { + auto curent_durration_sec = + start_save_time_ ? (absl::Now() - *start_save_time_) / absl::Seconds(1) : 0; + append("saving", curent_durration_sec != 0); + append("current_save_duration_sec", curent_durration_sec); + + for (const auto& k_v : save_info.freq_map) { append(StrCat("rdb_", k_v.first), k_v.second); } - append("rdb_changes_since_last_save", m.events.update); + append("rdb_changes_since_last_success_save", m.events.update); + + // when last failed save + append("last_failed_save", save_info.last_error_time); + append("last_error", save_info.last_error.Format()); + append("last_failed_save_duration_sec", save_info.failed_duration_sec); } if (should_enter("TRANSACTION", true)) { @@ -2289,7 +2304,7 @@ void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) { time_t save_time; { lock_guard lk(save_mu_); - save_time = last_save_info_->save_time; + save_time = last_save_info_.save_time; } cntx->SendLong(save_time); } diff --git a/src/server/server_family.h b/src/server/server_family.h index b5708d85f..01dcc43fe 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -102,10 +102,15 @@ struct Metrics { }; struct LastSaveInfo { + // last success save info time_t save_time = 0; // epoch time in seconds. - uint32_t duration_sec = 0; + uint32_t success_duration_sec = 0; std::string file_name; // std::vector> freq_map; // RDB_TYPE_xxx -> count mapping. + // last error save info + GenericError last_error; + time_t last_error_time = 0; // epoch time in seconds. + time_t failed_duration_sec = 0; // epoch time in seconds. }; struct SnapshotSpec { @@ -158,7 +163,7 @@ class ServerFamily { // if kDbAll is passed, burns all the databases to the ground. std::error_code Drakarys(Transaction* transaction, DbIndex db_ind); - std::shared_ptr GetLastSaveInfo() const; + LastSaveInfo GetLastSaveInfo() const; // Load snapshot from file (.rdb file or summary.dfs file) and return // future with error_code. @@ -272,8 +277,11 @@ class ServerFamily { time_t start_time_ = 0; // in seconds, epoch time. - std::shared_ptr last_save_info_; // protected by save_mu_; + LastSaveInfo last_save_info_ ABSL_GUARDED_BY(save_mu_); std::atomic_bool is_saving_{false}; + // this field duplicate SaveStagesController::start_save_time_ + // TODO make SaveStagesController as member of this class + std::optional start_save_time_; // If a save operation is currently in progress, calling this function will provide information // about the memory consumption during the save operation. std::function save_bytes_cb_ = nullptr;