This commit is contained in:
kostas 2025-05-06 18:15:30 +03:00
parent 7ff2a81aa3
commit 9e77a8516a
No known key found for this signature in database
GPG key ID: 1860AC7B1177CACB
4 changed files with 19 additions and 14 deletions

View file

@ -164,10 +164,9 @@ void RdbSnapshot::StartInShard(EngineShard* shard) {
started_shards_.fetch_add(1, memory_order_relaxed);
}
SaveStagesController::SaveStagesController(SaveStagesInputs&& inputs, bool is_bg_save)
SaveStagesController::SaveStagesController(SaveStagesInputs&& inputs)
: SaveStagesInputs{std::move(inputs)} {
start_time_ = time(NULL);
is_bg_save_ = is_bg_save;
}
SaveStagesController::~SaveStagesController() {

View file

@ -35,6 +35,8 @@ struct SaveStagesInputs {
Service* service_;
util::fb2::FiberQueueThreadPool* fq_threadpool_;
std::shared_ptr<SnapshotStorage> snapshot_storage_;
// true if the command that triggered this flow is bgsave. false otherwise.
bool is_bg_save_;
};
class RdbSnapshot {
@ -77,7 +79,7 @@ class RdbSnapshot {
};
struct SaveStagesController : public SaveStagesInputs {
SaveStagesController(SaveStagesInputs&& input, bool is_bg_save = false);
explicit SaveStagesController(SaveStagesInputs&& input);
// Objects of this class are used concurrently. Call this function
// in a mutually exlusive context to avoid data races.
// Also call this function before any call to `WaitAllSnapshots`

View file

@ -1707,8 +1707,8 @@ GenericError ServerFamily::DoSave(bool ignore_state) {
}
GenericError ServerFamily::DoSaveCheckAndStart(const SaveCmdOptions& save_cmd_opts,
Transaction* trans, bool ignore_state,
bool bg_save) {
Transaction* trans, DoSaveCheckAndStartOpts opts) {
auto [ignore_state, bg_save] = opts;
auto state = ServerState::tlocal()->gstate();
// In some cases we want to create a snapshot even if server is not active, f.e in takeover
@ -1727,11 +1727,9 @@ GenericError ServerFamily::DoSaveCheckAndStart(const SaveCmdOptions& save_cmd_op
? snapshot_storage_
: CreateCloudSnapshotStorage(save_cmd_opts.cloud_uri);
save_controller_ = make_unique<SaveStagesController>(
detail::SaveStagesInputs{save_cmd_opts.new_version, save_cmd_opts.cloud_uri,
save_cmd_opts.basename, trans, &service_, fq_threadpool_.get(),
snapshot_storage},
true);
save_controller_ = make_unique<SaveStagesController>(detail::SaveStagesInputs{
save_cmd_opts.new_version, save_cmd_opts.cloud_uri, save_cmd_opts.basename, trans,
&service_, fq_threadpool_.get(), snapshot_storage, opts.bg_save});
auto res = save_controller_->InitResourcesAndStart();
@ -1780,7 +1778,8 @@ GenericError ServerFamily::WaitUntilSaveFinished(Transaction* trans, bool ignore
GenericError ServerFamily::DoSave(const SaveCmdOptions& save_cmd_opts, Transaction* trans,
bool ignore_state) {
if (auto ec = DoSaveCheckAndStart(save_cmd_opts, trans, ignore_state); ec) {
DoSaveCheckAndStartOpts opts{.ignore_state = ignore_state};
if (auto ec = DoSaveCheckAndStart(save_cmd_opts, trans, opts); ec) {
return ec;
}
@ -2195,7 +2194,8 @@ void ServerFamily::BgSave(CmdArgList args, const CommandContext& cmd_cntx) {
return;
}
if (auto ec = DoSaveCheckAndStart(*maybe_res, cmd_cntx.tx, false, true); ec) {
DoSaveCheckAndStartOpts opts{.bg_save = true};
if (auto ec = DoSaveCheckAndStart(*maybe_res, cmd_cntx.tx, opts); ec) {
cmd_cntx.rb->SendError(ec.Format());
return;
}

View file

@ -343,9 +343,13 @@ class ServerFamily {
void BgSaveFb(boost::intrusive_ptr<Transaction> trans);
struct DoSaveCheckAndStartOpts {
bool ignore_state = false;
bool bg_save = false;
};
GenericError DoSaveCheckAndStart(const SaveCmdOptions& save_cmd_opts, Transaction* trans,
bool ignore_state = false, bool bg_save = false)
ABSL_LOCKS_EXCLUDED(save_mu_);
DoSaveCheckAndStartOpts opts) ABSL_LOCKS_EXCLUDED(save_mu_);
GenericError WaitUntilSaveFinished(Transaction* trans,
bool ignore_state = false) ABSL_NO_THREAD_SAFETY_ANALYSIS;