From 977fc18e252be3b01ba11fc3ff9e1b0da5f27828 Mon Sep 17 00:00:00 2001 From: Dor Avrahami Date: Mon, 31 Jul 2023 20:26:01 +0300 Subject: [PATCH] feat: support for cron expressions based snapshot. (#1599) Introducing a new flag `--snapshot_cron`, which enables users to use cron expressions to time snapshot saves. Signed-off-by: Dor Avrahami --- src/CMakeLists.txt | 11 +++++ src/server/CMakeLists.txt | 2 +- src/server/server_family.cc | 83 ++++++++++++++++++++------------ src/server/server_family.h | 2 +- tests/dragonfly/snapshot_test.py | 21 ++++++++ 5 files changed, 85 insertions(+), 34 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 671881bf1..94723b9ca 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -76,11 +76,22 @@ add_third_party( INSTALL_COMMAND ${DFLY_TOOLS_MAKE} install BUILD_SHARED=no PREFIX=${THIRD_PARTY_LIB_DIR}/lz4 ) +add_third_party( + croncpp + URL https://github.com/mariusbancila/croncpp/archive/refs/tags/v2023.03.30.tar.gz + LIB "none" +) + add_library(TRDP::jsoncons INTERFACE IMPORTED) add_dependencies(TRDP::jsoncons jsoncons_project) set_target_properties(TRDP::jsoncons PROPERTIES INTERFACE_INCLUDE_DIRECTORIES "${JSONCONS_INCLUDE_DIR}") +add_library(TRDP::croncpp INTERFACE IMPORTED) +add_dependencies(TRDP::croncpp croncpp_project) +set_target_properties(TRDP::croncpp PROPERTIES + INTERFACE_INCLUDE_DIRECTORIES "${CRONCPP_INCLUDE_DIR}") + Message(STATUS "THIRD_PARTY_LIB_DIR ${THIRD_PARTY_LIB_DIR}") diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 93aa065c5..ab0517f4c 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -29,7 +29,7 @@ add_library(dragonfly_lib channel_store.cc command_registry.cc cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib aws_lib strings_lib html_lib - http_client_lib absl::random_random TRDP::jsoncons zstd TRDP::lz4) + http_client_lib absl::random_random TRDP::jsoncons zstd TRDP::lz4 TRDP::croncpp) if (DF_USE_SSL) set(TLS_LIB tls_lib) 
diff --git a/src/server/server_family.cc b/src/server/server_family.cc index f7a2a13f6..acb68855f 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -23,6 +23,7 @@ extern "C" { #include "base/flags.h" #include "base/logging.h" +#include "croncpp.h" // cron::cronexpr #include "facade/dragonfly_connection.h" #include "io/file_util.h" #include "io/proc_reader.h" @@ -60,6 +61,7 @@ ABSL_FLAG(string, requirepass, "", "If empty can also be set with DFLY_PASSWORD environment variable."); ABSL_FLAG(string, save_schedule, "", "glob spec for the UTC time to save a snapshot which matches HH:MM 24h time"); +ABSL_FLAG(string, snapshot_cron, "", "cron expression for the time to save a snapshot"); ABSL_FLAG(bool, df_snapshot_format, true, "if true, save in dragonfly-specific snapshotting format"); ABSL_FLAG(int, epoll_file_threads, 0, @@ -500,6 +502,39 @@ bool DoesTimeMatchSpecifier(const SnapshotSpec& spec, time_t now) { DoesTimeNibbleMatchSpecifier(spec.minute_spec, min); } +std::optional<cron::cronexpr> InferSnapshotCronExpr() { /* Cron schedule built from --save_schedule or --snapshot_cron; nullopt if neither is set or parsing fails. */ + string save_time = GetFlag(FLAGS_save_schedule); + string snapshot_cron_exp = GetFlag(FLAGS_snapshot_cron); + + if (!snapshot_cron_exp.empty() && !save_time.empty()) { + LOG(ERROR) << "save_schedule and snapshot_cron flags should not be set simultaneously"; + quick_exit(1); /* conflicting flags: terminate the process */ + } + + string raw_cron_expr; + if (!save_time.empty()) { + std::optional spec = ParseSaveSchedule(save_time); + + if (spec) { + // Setting snapshot to HH:mm everyday, as specified by `save_schedule` flag + raw_cron_expr = "0 " + spec.value().minute_spec + " " + spec.value().hour_spec + " * * *"; + } else { + LOG(WARNING) << "Invalid snapshot time specifier " << save_time; + } + } else if (!snapshot_cron_exp.empty()) { + raw_cron_expr = snapshot_cron_exp; + } + + if (!raw_cron_expr.empty()) { + try { + return std::optional(cron::make_cron(raw_cron_expr)); + } catch (const cron::bad_cronexpr& ex) { + LOG(WARNING) << "Invalid cron expression: " << ex.what(); + } + } + return 
std::nullopt; +} + ServerFamily::ServerFamily(Service* service) : service_(*service) { start_time_ = time(NULL); last_save_info_ = make_shared(); @@ -570,16 +605,8 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector spec = ParseSaveSchedule(save_time); - if (spec) { - snapshot_schedule_fb_ = service_.proactor_pool().GetNextProactor()->LaunchFiber( - [save_spec = std::move(spec.value()), this] { SnapshotScheduling(save_spec); }); - } else { - LOG(WARNING) << "Invalid snapshot time specifier " << save_time; - } - } + snapshot_schedule_fb_ = + service_.proactor_pool().GetNextProactor()->LaunchFiber([this] { SnapshotScheduling(); }); } void ServerFamily::Shutdown() { @@ -720,30 +747,22 @@ Future ServerFamily::Load(const std::string& load_path) { return ec_future; } -void ServerFamily::SnapshotScheduling(const SnapshotSpec& spec) { - const auto loop_sleep_time = std::chrono::seconds(20); +void ServerFamily::SnapshotScheduling() { /* Fiber body: waits until each cron-scheduled time, then calls DoSave(). */ + const std::optional cron_expr = InferSnapshotCronExpr(); + if (!cron_expr) { + return; + } + + const auto loading_check_interval = std::chrono::seconds(10); + while (service_.GetGlobalState() == GlobalState::LOADING) { + if (schedule_done_.WaitFor(loading_check_interval)) return; /* notified while still loading: stop instead of re-polling without sleeping */ + } + while (true) { - if (schedule_done_.WaitFor(loop_sleep_time)) { - break; - } + const std::chrono::time_point now = std::chrono::system_clock::now(); + const std::chrono::time_point next = cron::cron_next(cron_expr.value(), now); - time_t now = std::time(NULL); - - if (!DoesTimeMatchSpecifier(spec, now)) { - continue; - } - - // if it matches check the last save time, if it is the same minute don't save another - // snapshot - time_t last_save; - { - lock_guard lk(save_mu_); - last_save = last_save_info_->save_time; - } - - if ((last_save / 60) == (now / 60)) { - continue; - } + if (schedule_done_.WaitFor(next - now)) break; /* notified (presumably on shutdown - confirm): leave the loop, as the replaced code did */ GenericError ec = DoSave(); if (ec) { diff --git a/src/server/server_family.h b/src/server/server_family.h index 5334e920c..edc3f6cca 100644 --- 
a/src/server/server_family.h +++ b/src/server/server_family.h @@ -195,7 +195,7 @@ class ServerFamily { // Returns the number of loaded keys if successfull. io::Result LoadRdb(const std::string& rdb_file); - void SnapshotScheduling(const SnapshotSpec& time); + void SnapshotScheduling(); Fiber snapshot_schedule_fb_; Future load_result_; diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py index 8135f34c1..d48294478 100644 --- a/tests/dragonfly/snapshot_test.py +++ b/tests/dragonfly/snapshot_test.py @@ -160,6 +160,27 @@ class TestPeriodicSnapshot(SnapshotTestBase): assert super().get_main_file("test-periodic-summary.dfs") +# save every 2 seconds +@dfly_args({**BASIC_ARGS, "dbfilename": "test-periodic", "snapshot_cron": "*/2 * * * * *"}) +class TestCronPeriodicSnapshot(SnapshotTestBase): + """Test periodic snapshotting scheduled through the snapshot_cron flag""" + + @pytest.fixture(autouse=True) + def setup(self, tmp_dir: Path): + super().setup(tmp_dir) + + @pytest.mark.asyncio + async def test_snapshot(self, df_seeder_factory, df_server): + seeder = df_seeder_factory.create( + port=df_server.port, keys=10, multi_transaction_probability=0 + ) + await seeder.run(target_deviation=0.5) + + time.sleep(60) + + assert super().get_main_file("test-periodic-summary.dfs") + + @dfly_args({**BASIC_ARGS}) class TestPathEscapes(SnapshotTestBase): """Test that we don't allow path escapes. We just check that df_server.start()