feat(info): add new persistence section fields (#2396)

* feat(info): add new persistence section fields
implement #2386
added fields:
1) last_failed_save
2) last_error
3) last_failed_save_duration_sec
4) saving
5) current_save_duration_sec
This commit is contained in:
Borys 2024-01-11 12:36:43 +02:00 committed by GitHub
parent 8d09478474
commit 7b61268533
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 68 additions and 40 deletions

View file

@ -694,7 +694,7 @@ TEST_F(ClusterFamilyTest, ClusterFirstConfigCallDropsEntriesNotOwnedByNode) {
EXPECT_EQ(Run({"save", "df"}), "OK"); EXPECT_EQ(Run({"save", "df"}), "OK");
auto save_info = service_->server_family().GetLastSaveInfo(); auto save_info = service_->server_family().GetLastSaveInfo();
EXPECT_EQ(Run({"debug", "load", save_info->file_name}), "OK"); EXPECT_EQ(Run({"debug", "load", save_info.file_name}), "OK");
EXPECT_EQ(CheckedInt({"dbsize"}), 50000); EXPECT_EQ(CheckedInt({"dbsize"}), 50000);
EXPECT_EQ(RunPrivileged({"dflycluster", "config", R"json( EXPECT_EQ(RunPrivileged({"dflycluster", "config", R"json(

View file

@ -350,7 +350,7 @@ void DebugCmd::Reload(CmdArgList args) {
} }
} }
string last_save_file = sf_.GetLastSaveInfo()->file_name; string last_save_file = sf_.GetLastSaveInfo().file_name;
Load(last_save_file); Load(last_save_file);
} }

View file

@ -178,8 +178,7 @@ GenericError SaveStagesController::Save() {
FinalizeFileMovement(); FinalizeFileMovement();
if (!shared_err_) UpdateSaveInfo();
UpdateSaveInfo();
return *shared_err_; return *shared_err_;
} }
@ -266,26 +265,32 @@ void SaveStagesController::SaveRdb() {
} }
void SaveStagesController::UpdateSaveInfo() { void SaveStagesController::UpdateSaveInfo() {
auto seconds = (absl::Now() - start_time_) / absl::Seconds(1);
if (shared_err_) {
lock_guard lk{*save_mu_};
last_save_info_->last_error = *shared_err_;
last_save_info_->last_error_time = absl::ToUnixSeconds(start_time_);
last_save_info_->failed_duration_sec = seconds;
return;
}
fs::path resulting_path = full_path_; fs::path resulting_path = full_path_;
if (use_dfs_format_) if (use_dfs_format_)
SetExtension("summary", ".dfs", &resulting_path); SetExtension("summary", ".dfs", &resulting_path);
else else
resulting_path.replace_extension(); // remove .tmp resulting_path.replace_extension(); // remove .tmp
double seconds = double(absl::ToInt64Milliseconds(absl::Now() - start_time_)) / 1000;
LOG(INFO) << "Saving " << resulting_path << " finished after " LOG(INFO) << "Saving " << resulting_path << " finished after "
<< strings::HumanReadableElapsedTime(seconds); << strings::HumanReadableElapsedTime(seconds);
auto save_info = make_shared<LastSaveInfo>();
for (const auto& k_v : rdb_name_map_) {
save_info->freq_map.emplace_back(k_v);
}
save_info->save_time = absl::ToUnixSeconds(start_time_);
save_info->file_name = resulting_path.generic_string();
save_info->duration_sec = uint32_t(seconds);
lock_guard lk{*save_mu_}; lock_guard lk{*save_mu_};
last_save_info_->swap(save_info); // swap - to deallocate the old version outstide of the lock. last_save_info_->freq_map.clear();
for (const auto& k_v : rdb_name_map_) {
last_save_info_->freq_map.emplace_back(k_v);
}
last_save_info_->save_time = absl::ToUnixSeconds(start_time_);
last_save_info_->file_name = resulting_path.generic_string();
last_save_info_->success_duration_sec = seconds;
} }
GenericError SaveStagesController::InitResources() { GenericError SaveStagesController::InitResources() {

View file

@ -26,7 +26,7 @@ struct SaveStagesInputs {
Service* service_; Service* service_;
std::atomic_bool* is_saving_; std::atomic_bool* is_saving_;
util::fb2::FiberQueueThreadPool* fq_threadpool_; util::fb2::FiberQueueThreadPool* fq_threadpool_;
std::shared_ptr<LastSaveInfo>* last_save_info_; LastSaveInfo* last_save_info_ ABSL_GUARDED_BY(save_mu_);
util::fb2::Mutex* save_mu_; util::fb2::Mutex* save_mu_;
std::function<size_t()>* save_bytes_cb_; std::function<size_t()>* save_bytes_cb_;
std::shared_ptr<SnapshotStorage> snapshot_storage_; std::shared_ptr<SnapshotStorage> snapshot_storage_;

View file

@ -636,7 +636,7 @@ TEST_F(GenericFamilyTest, Info) {
InitWithDbFilename(); // Needed for `save` InitWithDbFilename(); // Needed for `save`
auto get_rdb_changes_since_last_save = [](const string& str) -> size_t { auto get_rdb_changes_since_last_save = [](const string& str) -> size_t {
const string matcher = "rdb_changes_since_last_save:"; const string matcher = "rdb_changes_since_last_success_save:";
const auto pos = str.find(matcher) + matcher.size(); const auto pos = str.find(matcher) + matcher.size();
const auto sub = str.substr(pos, 1); const auto sub = str.substr(pos, 1);
return atoi(sub.c_str()); return atoi(sub.c_str());

View file

@ -166,7 +166,7 @@ TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) {
ASSERT_EQ(resp, "OK"); ASSERT_EQ(resp, "OK");
auto save_info = service_->server_family().GetLastSaveInfo(); auto save_info = service_->server_family().GetLastSaveInfo();
resp = Run({"debug", "load", save_info->file_name}); resp = Run({"debug", "load", save_info.file_name});
ASSERT_EQ(resp, "OK"); ASSERT_EQ(resp, "OK");
ASSERT_EQ(50000, CheckedInt({"dbsize"})); ASSERT_EQ(50000, CheckedInt({"dbsize"}));
} }
@ -181,7 +181,7 @@ TEST_F(RdbTest, RdbLoaderOnReadCompressedDataShouldNotEnterEnsureReadFlow) {
ASSERT_EQ(resp, "OK"); ASSERT_EQ(resp, "OK");
auto save_info = service_->server_family().GetLastSaveInfo(); auto save_info = service_->server_family().GetLastSaveInfo();
resp = Run({"debug", "load", save_info->file_name}); resp = Run({"debug", "load", save_info.file_name});
ASSERT_EQ(resp, "OK"); ASSERT_EQ(resp, "OK");
} }
@ -323,8 +323,8 @@ TEST_F(RdbTest, SaveFlush) {
Run({"flushdb"}); Run({"flushdb"});
save_fb.Join(); save_fb.Join();
auto save_info = service_->server_family().GetLastSaveInfo(); auto save_info = service_->server_family().GetLastSaveInfo();
ASSERT_EQ(1, save_info->freq_map.size()); ASSERT_EQ(1, save_info.freq_map.size());
auto& k_v = save_info->freq_map.front(); auto& k_v = save_info.freq_map.front();
EXPECT_EQ("string", k_v.first); EXPECT_EQ("string", k_v.first);
EXPECT_EQ(500000, k_v.second); EXPECT_EQ(500000, k_v.second);
} }
@ -360,8 +360,8 @@ TEST_F(RdbTest, SaveManyDbs) {
save_fb.Join(); save_fb.Join();
auto save_info = service_->server_family().GetLastSaveInfo(); auto save_info = service_->server_family().GetLastSaveInfo();
ASSERT_EQ(1, save_info->freq_map.size()); ASSERT_EQ(1, save_info.freq_map.size());
auto& k_v = save_info->freq_map.front(); auto& k_v = save_info.freq_map.front();
EXPECT_EQ("string", k_v.first); EXPECT_EQ("string", k_v.first);
EXPECT_EQ(60000, k_v.second); EXPECT_EQ(60000, k_v.second);

View file

@ -521,8 +521,7 @@ std::string_view GetOSString() {
ServerFamily::ServerFamily(Service* service) : service_(*service) { ServerFamily::ServerFamily(Service* service) : service_(*service) {
start_time_ = time(NULL); start_time_ = time(NULL);
last_save_info_ = make_shared<LastSaveInfo>(); last_save_info_.save_time = start_time_;
last_save_info_->save_time = start_time_;
script_mgr_.reset(new ScriptMgr()); script_mgr_.reset(new ScriptMgr());
journal_.reset(new journal::Journal()); journal_.reset(new journal::Journal());
@ -1227,10 +1226,18 @@ GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transa
StrCat(GlobalStateName(new_state), " - can not save database")}; StrCat(GlobalStateName(new_state), " - can not save database")};
} }
} }
{
std::lock_guard lck(save_mu_);
start_save_time_ = absl::Now();
}
SaveStagesController sc{detail::SaveStagesInputs{ SaveStagesController sc{detail::SaveStagesInputs{
new_version, basename, trans, &service_, &is_saving_, fq_threadpool_.get(), &last_save_info_, new_version, basename, trans, &service_, &is_saving_, fq_threadpool_.get(), &last_save_info_,
&save_mu_, &save_bytes_cb_, snapshot_storage_}}; &save_mu_, &save_bytes_cb_, snapshot_storage_}};
auto res = sc.Save(); auto res = sc.Save();
{
std::lock_guard lck(save_mu_);
start_save_time_.reset();
}
if (!ignore_state) if (!ignore_state)
service_.SwitchState(GlobalState::SAVING, GlobalState::ACTIVE); service_.SwitchState(GlobalState::SAVING, GlobalState::ACTIVE);
return res; return res;
@ -1251,7 +1258,7 @@ error_code ServerFamily::Drakarys(Transaction* transaction, DbIndex db_ind) {
return error_code{}; return error_code{};
} }
shared_ptr<const LastSaveInfo> ServerFamily::GetLastSaveInfo() const { LastSaveInfo ServerFamily::GetLastSaveInfo() const {
lock_guard lk(save_mu_); lock_guard lk(save_mu_);
return last_save_info_; return last_save_info_;
} }
@ -1782,22 +1789,30 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
} }
if (should_enter("PERSISTENCE", true)) { if (should_enter("PERSISTENCE", true)) {
decltype(last_save_info_) save_info; auto save_info = GetLastSaveInfo();
{
lock_guard lk(save_mu_); // when last success save
save_info = last_save_info_; append("last_success_save", save_info.save_time);
} append("last_saved_file", save_info.file_name);
// when when last save append("last_success_save_duration_sec", save_info.success_duration_sec);
append("last_save", save_info->save_time);
append("last_save_duration_sec", save_info->duration_sec);
append("last_save_file", save_info->file_name);
size_t is_loading = service_.GetGlobalState() == GlobalState::LOADING; size_t is_loading = service_.GetGlobalState() == GlobalState::LOADING;
append("loading", is_loading); append("loading", is_loading);
for (const auto& k_v : save_info->freq_map) { auto curent_durration_sec =
start_save_time_ ? (absl::Now() - *start_save_time_) / absl::Seconds(1) : 0;
append("saving", curent_durration_sec != 0);
append("current_save_duration_sec", curent_durration_sec);
for (const auto& k_v : save_info.freq_map) {
append(StrCat("rdb_", k_v.first), k_v.second); append(StrCat("rdb_", k_v.first), k_v.second);
} }
append("rdb_changes_since_last_save", m.events.update); append("rdb_changes_since_last_success_save", m.events.update);
// when last failed save
append("last_failed_save", save_info.last_error_time);
append("last_error", save_info.last_error.Format());
append("last_failed_save_duration_sec", save_info.failed_duration_sec);
} }
if (should_enter("TRANSACTION", true)) { if (should_enter("TRANSACTION", true)) {
@ -2289,7 +2304,7 @@ void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) {
time_t save_time; time_t save_time;
{ {
lock_guard lk(save_mu_); lock_guard lk(save_mu_);
save_time = last_save_info_->save_time; save_time = last_save_info_.save_time;
} }
cntx->SendLong(save_time); cntx->SendLong(save_time);
} }

View file

@ -102,10 +102,15 @@ struct Metrics {
}; };
struct LastSaveInfo { struct LastSaveInfo {
// last success save info
time_t save_time = 0; // epoch time in seconds. time_t save_time = 0; // epoch time in seconds.
uint32_t duration_sec = 0; uint32_t success_duration_sec = 0;
std::string file_name; // std::string file_name; //
std::vector<std::pair<std::string_view, size_t>> freq_map; // RDB_TYPE_xxx -> count mapping. std::vector<std::pair<std::string_view, size_t>> freq_map; // RDB_TYPE_xxx -> count mapping.
// last error save info
GenericError last_error;
time_t last_error_time = 0; // epoch time in seconds.
time_t failed_duration_sec = 0; // epoch time in seconds.
}; };
struct SnapshotSpec { struct SnapshotSpec {
@ -158,7 +163,7 @@ class ServerFamily {
// if kDbAll is passed, burns all the databases to the ground. // if kDbAll is passed, burns all the databases to the ground.
std::error_code Drakarys(Transaction* transaction, DbIndex db_ind); std::error_code Drakarys(Transaction* transaction, DbIndex db_ind);
std::shared_ptr<const LastSaveInfo> GetLastSaveInfo() const; LastSaveInfo GetLastSaveInfo() const;
// Load snapshot from file (.rdb file or summary.dfs file) and return // Load snapshot from file (.rdb file or summary.dfs file) and return
// future with error_code. // future with error_code.
@ -272,8 +277,11 @@ class ServerFamily {
time_t start_time_ = 0; // in seconds, epoch time. time_t start_time_ = 0; // in seconds, epoch time.
std::shared_ptr<LastSaveInfo> last_save_info_; // protected by save_mu_; LastSaveInfo last_save_info_ ABSL_GUARDED_BY(save_mu_);
std::atomic_bool is_saving_{false}; std::atomic_bool is_saving_{false};
// this field duplicate SaveStagesController::start_save_time_
// TODO make SaveStagesController as member of this class
std::optional<absl::Time> start_save_time_;
// If a save operation is currently in progress, calling this function will provide information // If a save operation is currently in progress, calling this function will provide information
// about the memory consumption during the save operation. // about the memory consumption during the save operation.
std::function<size_t()> save_bytes_cb_ = nullptr; std::function<size_t()> save_bytes_cb_ = nullptr;