diff --git a/src/server/detail/save_stages_controller.cc b/src/server/detail/save_stages_controller.cc index 8c6919a94..a53822c89 100644 --- a/src/server/detail/save_stages_controller.cc +++ b/src/server/detail/save_stages_controller.cc @@ -118,6 +118,10 @@ error_code RdbSnapshot::SaveBody() { return saver_->SaveBody(&cntx_, &freq_map_); } +size_t RdbSnapshot::GetSaveBuffersSize() { + return saver_->GetTotalBuffersSize(); +} + error_code RdbSnapshot::Close() { if (is_linux_file_) { return static_cast(io_sink_.get())->Close(); @@ -156,7 +160,17 @@ GenericError SaveStagesController::Save() { SaveRdb(); is_saving_->store(true, memory_order_relaxed); + { + lock_guard lk{*save_mu_}; + *save_bytes_cb_ = [this]() { return GetSaveBuffersSize(); }; + } + RunStage(&SaveStagesController::SaveCb); + { + lock_guard lk{*save_mu_}; + *save_bytes_cb_ = nullptr; + } + is_saving_->store(false, memory_order_relaxed); RunStage(&SaveStagesController::CloseCb); @@ -169,6 +183,22 @@ GenericError SaveStagesController::Save() { return *shared_err_; } +size_t SaveStagesController::GetSaveBuffersSize() { + std::atomic total_bytes{0}; + if (use_dfs_format_) { + auto cb = [this, &total_bytes](ShardId sid) { + total_bytes.fetch_add(snapshots_[sid].first->GetSaveBuffersSize(), memory_order_relaxed); + }; + shard_set->RunBriefInParallel([&](EngineShard* es) { cb(es->shard_id()); }); + + } else { + // When rdb format save is running, there is only one rdb saver instance, it is running on the + // connection thread that runs the save command. + total_bytes.store(snapshots_.front().first->GetSaveBuffersSize(), memory_order_relaxed); + } + return total_bytes.load(memory_order_relaxed); +} + // In the new version (.dfs) we store a file for every shard and one more summary file. // Summary file is always last in snapshots array. void SaveStagesController::SaveDfs() { diff --git a/src/server/detail/save_stages_controller.h b/src/server/detail/save_stages_controller.h index 80ebd9799..1d596155c 100644 --- a/src/server/detail/save_stages_controller.h +++ b/src/server/detail/save_stages_controller.h @@ -28,6 +28,7 @@ struct SaveStagesInputs { util::fb2::FiberQueueThreadPool* fq_threadpool_; std::shared_ptr* last_save_info_; util::fb2::Mutex* save_mu_; + std::function* save_bytes_cb_; std::shared_ptr snapshot_storage_; }; @@ -42,6 +43,7 @@ class RdbSnapshot { error_code SaveBody(); error_code Close(); + size_t GetSaveBuffersSize(); const RdbTypeFreqMap freq_map() const { return freq_map_; @@ -103,6 +105,8 @@ struct SaveStagesController : public SaveStagesInputs { RdbSaver::GlobalData GetGlobalData() const; + size_t GetSaveBuffersSize(); + private: absl::Time start_time_; std::filesystem::path full_path_; diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 6566ac2a3..71b6210d0 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -948,7 +948,7 @@ class RdbSaver::Impl { CompressionMode compression_mode_; struct Stats { - size_t pulled_bytes = 0; + std::atomic pulled_bytes{0}; } stats_; }; @@ -1039,7 +1039,7 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { continue; DVLOG(2) << "Pulled " << record->id; - stats_.pulled_bytes += record->value.size(); + stats_.pulled_bytes.fetch_add(record->value.size(), memory_order_relaxed); io_error = sink_->Write(io::Buffer(record->value)); if (io_error) { @@ -1056,7 +1056,8 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { DCHECK(!record.has_value() || !channel_.TryPop(*record)); - VLOG(1) << "Channel pulled bytes: " << stats_.pulled_bytes << " pushed bytes: " << pushed_bytes; + VLOG(1) << "Channel pulled bytes: " << stats_.pulled_bytes.load(memory_order_relaxed) + << " pushed bytes: " << pushed_bytes; return io_error; } @@ -1097,15 +1098,34 @@ void RdbSaver::Impl::Cancel() { snapshot->Join(); } +// This function is called from connection thread when info command is invoked. +// All accessed variableds must be thread safe, as they are fetched not from the rdb saver thread. size_t RdbSaver::Impl::GetTotalBuffersSize() const { - DCHECK_EQ(shard_snapshots_.size(), 1u) << "Only supported for dragonfly replication"; - auto& snapshot = shard_snapshots_.front(); + std::atomic pushed_bytes{0}; + std::atomic serializer_bytes{0}; + size_t pulled_bytes = stats_.pulled_bytes.load(memory_order_relaxed); - // Calculate number of enqueued bytes as difference between pushed and pulled - size_t enqueued_bytes = snapshot->pushed_bytes() - stats_.pulled_bytes; - size_t serializer_bytes = snapshot->GetTotalBufferCapacity(); + auto cb = [this, &pushed_bytes, &serializer_bytes](ShardId sid) { + auto& snapshot = shard_snapshots_[sid]; + pushed_bytes.fetch_add(snapshot->pushed_bytes(), memory_order_relaxed); + serializer_bytes.store(snapshot->GetTotalBufferCapacity(), memory_order_relaxed); + }; - return enqueued_bytes + serializer_bytes; + if (shard_snapshots_.size() == 1) { + cb(0); + } else { + shard_set->RunBriefInParallel([&](EngineShard* es) { cb(es->shard_id()); }); + // Note that pushed bytes and pulled bytes values are fetched at different times, as we need to + // calc the pushed bytes using RunBriefInParallel. + // pulled bytes might be higher untill we return here from RunBriefInParallel. + } + size_t total_bytes = pushed_bytes.load(memory_order_relaxed) + + serializer_bytes.load(memory_order_relaxed) - pulled_bytes; + VLOG(2) << "pushed_bytes:" << pushed_bytes.load(memory_order_relaxed) + << " serializer_bytes: " << serializer_bytes.load(memory_order_relaxed) + << " pulled_bytes: " << pulled_bytes << " total_bytes:" << total_bytes; + + return total_bytes; } void RdbSaver::Impl::FillFreqMap(RdbTypeFreqMap* dest) const { diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 255a90982..033d70b78 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -108,7 +108,6 @@ class RdbSaver { return save_mode_; } - // Can only be called for dragonfly replication. // Get total size of all rdb serializer buffers and items currently placed in channel size_t GetTotalBuffersSize() const; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 2049e0d34..56b4fb0fe 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1008,9 +1008,9 @@ GenericError ServerFamily::DoSave() { } GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transaction* trans) { - SaveStagesController sc{detail::SaveStagesInputs{new_version, basename, trans, &service_, - &is_saving_, fq_threadpool_.get(), - &last_save_info_, &save_mu_, snapshot_storage_}}; + SaveStagesController sc{detail::SaveStagesInputs{ + new_version, basename, trans, &service_, &is_saving_, fq_threadpool_.get(), &last_save_info_, + &save_mu_, &save_bytes_cb_, snapshot_storage_}}; return sc.Save(); } @@ -1491,6 +1491,13 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("replication_streaming_buffer_bytes", repl_mem.streamer_buf_capacity_bytes_); append("replication_full_sync_buffer_bytes", repl_mem.full_sync_buf_bytes_); } + + if (IsSaving()) { + lock_guard lk{save_mu_}; + if (save_bytes_cb_) { + append("save_buffer_bytes", save_bytes_cb_()); + } + } } if (should_enter("STATS")) { diff --git a/src/server/server_family.h b/src/server/server_family.h index 0e1c56714..a79c0e9e4 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -151,7 +151,6 @@ class ServerFamily { // future with error_code. Future Load(const std::string& file_name); - // used within tests. bool IsSaving() const { return is_saving_.load(std::memory_order_relaxed); } @@ -257,6 +256,9 @@ class ServerFamily { std::shared_ptr last_save_info_; // protected by save_mu_; std::atomic_bool is_saving_{false}; + // If a save operation is currently in progress, calling this function will provide information + // about the memory consumption during the save operation. + std::function save_bytes_cb_ = nullptr; // Used to override save on shutdown behavior that is usually set // be --dbfilename.