feat(server): add memory stats for snapshot save (#2072)

* feat(server): add memory stats for snapshot save

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2023-10-25 17:36:10 +03:00 committed by GitHub
parent 5eed2bfe11
commit 254fb2dd5a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 76 additions and 14 deletions

View file

@ -118,6 +118,10 @@ error_code RdbSnapshot::SaveBody() {
return saver_->SaveBody(&cntx_, &freq_map_);
}
// Reports the buffer size of this snapshot by forwarding to saver_->GetTotalBuffersSize().
size_t RdbSnapshot::GetSaveBuffersSize() {
  const size_t buffers_size = saver_->GetTotalBuffersSize();
  return buffers_size;
}
error_code RdbSnapshot::Close() {
if (is_linux_file_) {
return static_cast<LinuxWriteWrapper*>(io_sink_.get())->Close();
@ -156,7 +160,17 @@ GenericError SaveStagesController::Save() {
SaveRdb();
is_saving_->store(true, memory_order_relaxed);
{
lock_guard lk{*save_mu_};
*save_bytes_cb_ = [this]() { return GetSaveBuffersSize(); };
}
RunStage(&SaveStagesController::SaveCb);
{
lock_guard lk{*save_mu_};
*save_bytes_cb_ = nullptr;
}
is_saving_->store(false, memory_order_relaxed);
RunStage(&SaveStagesController::CloseCb);
@ -169,6 +183,22 @@ GenericError SaveStagesController::Save() {
return *shared_err_;
}
size_t SaveStagesController::GetSaveBuffersSize() {
std::atomic<size_t> total_bytes{0};
if (use_dfs_format_) {
auto cb = [this, &total_bytes](ShardId sid) {
total_bytes.fetch_add(snapshots_[sid].first->GetSaveBuffersSize(), memory_order_relaxed);
};
shard_set->RunBriefInParallel([&](EngineShard* es) { cb(es->shard_id()); });
} else {
// When rdb format save is running, there is only one rdb saver instance, it is running on the
// connection thread that runs the save command.
total_bytes.store(snapshots_.front().first->GetSaveBuffersSize(), memory_order_relaxed);
}
return total_bytes.load(memory_order_relaxed);
}
// In the new version (.dfs) we store a file for every shard and one more summary file.
// Summary file is always last in snapshots array.
void SaveStagesController::SaveDfs() {

View file

@ -28,6 +28,7 @@ struct SaveStagesInputs {
util::fb2::FiberQueueThreadPool* fq_threadpool_;
std::shared_ptr<LastSaveInfo>* last_save_info_;
util::fb2::Mutex* save_mu_;
std::function<size_t()>* save_bytes_cb_;
std::shared_ptr<SnapshotStorage> snapshot_storage_;
};
@ -42,6 +43,7 @@ class RdbSnapshot {
error_code SaveBody();
error_code Close();
size_t GetSaveBuffersSize();
const RdbTypeFreqMap freq_map() const {
return freq_map_;
@ -103,6 +105,8 @@ struct SaveStagesController : public SaveStagesInputs {
RdbSaver::GlobalData GetGlobalData() const;
size_t GetSaveBuffersSize();
private:
absl::Time start_time_;
std::filesystem::path full_path_;

View file

@ -948,7 +948,7 @@ class RdbSaver::Impl {
CompressionMode compression_mode_;
struct Stats {
  // Number of bytes pulled from the channel so far. Atomic because it is incremented with
  // fetch_add while consuming the channel and read with load from GetTotalBuffersSize(),
  // which is documented as being called from a different (connection) thread.
  std::atomic<size_t> pulled_bytes{0};
} stats_;
};
@ -1039,7 +1039,7 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
continue;
DVLOG(2) << "Pulled " << record->id;
stats_.pulled_bytes += record->value.size();
stats_.pulled_bytes.fetch_add(record->value.size(), memory_order_relaxed);
io_error = sink_->Write(io::Buffer(record->value));
if (io_error) {
@ -1056,7 +1056,8 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
DCHECK(!record.has_value() || !channel_.TryPop(*record));
VLOG(1) << "Channel pulled bytes: " << stats_.pulled_bytes << " pushed bytes: " << pushed_bytes;
VLOG(1) << "Channel pulled bytes: " << stats_.pulled_bytes.load(memory_order_relaxed)
<< " pushed bytes: " << pushed_bytes;
return io_error;
}
@ -1097,15 +1098,34 @@ void RdbSaver::Impl::Cancel() {
snapshot->Join();
}
// This function is called from connection thread when info command is invoked.
// All accessed variables must be thread safe, as they are not fetched from the rdb saver thread.
//
// Returns (sum of pushed bytes over all shard snapshots) + (sum of serializer buffer capacities
// over all shard snapshots) - (bytes already pulled from the channel).
size_t RdbSaver::Impl::GetTotalBuffersSize() const {
  std::atomic<size_t> pushed_bytes{0};
  std::atomic<size_t> serializer_bytes{0};

  // Sampled before the per-shard pushed counters are collected below.
  const size_t pulled_bytes = stats_.pulled_bytes.load(memory_order_relaxed);

  auto cb = [this, &pushed_bytes, &serializer_bytes](ShardId sid) {
    auto& snapshot = shard_snapshots_[sid];
    pushed_bytes.fetch_add(snapshot->pushed_bytes(), memory_order_relaxed);
    // Accumulate instead of store: with several shard snapshots a plain store would keep only
    // the capacity written by whichever shard ran last.
    serializer_bytes.fetch_add(snapshot->GetTotalBufferCapacity(), memory_order_relaxed);
  };

  if (shard_snapshots_.size() == 1) {
    cb(0);
  } else {
    shard_set->RunBriefInParallel([&](EngineShard* es) { cb(es->shard_id()); });
    // Note that pushed bytes and pulled bytes values are fetched at different times, as we need to
    // calc the pushed bytes using RunBriefInParallel.
    // The real pulled bytes value might already be higher by the time we return here from
    // RunBriefInParallel, so the result is an approximation.
  }

  // Channel occupancy is the difference between pushed and pulled bytes; serializer buffers are
  // added on top of it.
  const size_t total_pushed = pushed_bytes.load(memory_order_relaxed);
  const size_t total_serializer = serializer_bytes.load(memory_order_relaxed);
  const size_t total_bytes = total_pushed + total_serializer - pulled_bytes;

  VLOG(2) << "pushed_bytes:" << total_pushed << " serializer_bytes: " << total_serializer
          << " pulled_bytes: " << pulled_bytes << " total_bytes:" << total_bytes;
  return total_bytes;
}
void RdbSaver::Impl::FillFreqMap(RdbTypeFreqMap* dest) const {

View file

@ -108,7 +108,6 @@ class RdbSaver {
return save_mode_;
}
// Can only be called for dragonfly replication.
// Get total size of all rdb serializer buffers and items currently placed in channel
size_t GetTotalBuffersSize() const;

View file

@ -1008,9 +1008,9 @@ GenericError ServerFamily::DoSave() {
}
// Builds a SaveStagesController from this server's state and runs the save, returning the
// controller's result.
GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transaction* trans) {
  // &save_bytes_cb_ is handed over together with &save_mu_ so the controller can install and
  // clear the buffer-size callback that the INFO command reads under the same mutex.
  SaveStagesController sc{detail::SaveStagesInputs{
      new_version, basename, trans, &service_, &is_saving_, fq_threadpool_.get(), &last_save_info_,
      &save_mu_, &save_bytes_cb_, snapshot_storage_}};
  return sc.Save();
}
@ -1491,6 +1491,13 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("replication_streaming_buffer_bytes", repl_mem.streamer_buf_capacity_bytes_);
append("replication_full_sync_buffer_bytes", repl_mem.full_sync_buf_bytes_);
}
if (IsSaving()) {
lock_guard lk{save_mu_};
if (save_bytes_cb_) {
append("save_buffer_bytes", save_bytes_cb_());
}
}
}
if (should_enter("STATS")) {

View file

@ -151,7 +151,6 @@ class ServerFamily {
// future with error_code.
Future<GenericError> Load(const std::string& file_name);
// Returns the current value of the is_saving_ flag (relaxed load).
// Used within tests and by the INFO command.
bool IsSaving() const {
  const bool saving = is_saving_.load(std::memory_order_relaxed);
  return saving;
}
@ -257,6 +256,9 @@ class ServerFamily {
std::shared_ptr<LastSaveInfo> last_save_info_; // protected by save_mu_;
std::atomic_bool is_saving_{false};
// If a save operation is currently in progress, calling this function will provide information
// about the memory consumption during the save operation.
std::function<size_t()> save_bytes_cb_ = nullptr;
// Used to override save on shutdown behavior that is usually set
// by --dbfilename.