feat: support for cron-expression-based snapshots (#1599)

Introduce a new flag, `--snapshot_cron`, which lets users schedule snapshot
saves with cron expressions. It cannot be combined with the existing
`--save_schedule` flag; if both are set, the server logs an error and exits.

Signed-off-by: Dor Avrahami <da19965@gmail.com>
Author: Dor Avrahami, committed 2023-07-31 20:26:01 +03:00 via GitHub
commit 977fc18e25 (parent 7673e027b6)
5 changed files with 85 additions and 34 deletions
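For context on the expression format: this change vendors the croncpp library, whose expressions use six fields with a leading seconds field (the new test below passes `*/2 * * * * *`, i.e. every two seconds, and the `save_schedule` fallback is rewritten to `0 MM HH * * *`). The following standalone sketch, not part of the diff, shows the croncpp calls the new code relies on:

```cpp
// Standalone sketch (not Dragonfly code): parse a six-field cron expression
// with croncpp and compute the next time it fires.
#include <ctime>
#include <iostream>

#include "croncpp.h"

int main() {
  try {
    // "0 30 3 * * *" = second 0, minute 30, hour 3, any day -> fires daily at 03:30:00.
    cron::cronexpr expr = cron::make_cron("0 30 3 * * *");
    std::time_t now = std::time(nullptr);
    std::time_t next = cron::cron_next(expr, now);  // next matching timestamp after `now`
    std::cout << "next snapshot at " << std::asctime(std::localtime(&next));
  } catch (const cron::bad_cronexpr& ex) {
    // Thrown for malformed expressions; the server code below logs a warning in this case.
    std::cerr << "invalid cron expression: " << ex.what() << '\n';
  }
  return 0;
}
```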

@@ -76,11 +76,22 @@ add_third_party(
INSTALL_COMMAND ${DFLY_TOOLS_MAKE} install BUILD_SHARED=no PREFIX=${THIRD_PARTY_LIB_DIR}/lz4
)
add_third_party(
croncpp
URL https://github.com/mariusbancila/croncpp/archive/refs/tags/v2023.03.30.tar.gz
LIB "none"
)
add_library(TRDP::jsoncons INTERFACE IMPORTED)
add_dependencies(TRDP::jsoncons jsoncons_project)
set_target_properties(TRDP::jsoncons PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES "${JSONCONS_INCLUDE_DIR}")
add_library(TRDP::croncpp INTERFACE IMPORTED)
add_dependencies(TRDP::croncpp croncpp_project)
set_target_properties(TRDP::croncpp PROPERTIES
INTERFACE_INCLUDE_DIRECTORIES "${CRONCPP_INCLUDE_DIR}")
Message(STATUS "THIRD_PARTY_LIB_DIR ${THIRD_PARTY_LIB_DIR}")

@@ -29,7 +29,7 @@ add_library(dragonfly_lib channel_store.cc command_registry.cc
cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib aws_lib strings_lib html_lib
http_client_lib absl::random_random TRDP::jsoncons zstd TRDP::lz4)
http_client_lib absl::random_random TRDP::jsoncons zstd TRDP::lz4 TRDP::croncpp)
if (DF_USE_SSL)
set(TLS_LIB tls_lib)

@@ -23,6 +23,7 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "croncpp.h" // cron::cronexpr
#include "facade/dragonfly_connection.h"
#include "io/file_util.h"
#include "io/proc_reader.h"
@@ -60,6 +61,7 @@ ABSL_FLAG(string, requirepass, "",
"If empty can also be set with DFLY_PASSWORD environment variable.");
ABSL_FLAG(string, save_schedule, "",
"glob spec for the UTC time to save a snapshot which matches HH:MM 24h time");
ABSL_FLAG(string, snapshot_cron, "", "cron expression for the time to save a snapshot");
ABSL_FLAG(bool, df_snapshot_format, true,
"if true, save in dragonfly-specific snapshotting format");
ABSL_FLAG(int, epoll_file_threads, 0,
@@ -500,6 +502,39 @@ bool DoesTimeMatchSpecifier(const SnapshotSpec& spec, time_t now) {
DoesTimeNibbleMatchSpecifier(spec.minute_spec, min);
}
std::optional<cron::cronexpr> InferSnapshotCronExpr() {
string save_time = GetFlag(FLAGS_save_schedule);
string snapshot_cron_exp = GetFlag(FLAGS_snapshot_cron);
if (!snapshot_cron_exp.empty() && !save_time.empty()) {
LOG(ERROR) << "save_time and cron_exp flags should not be set simultaneously";
quick_exit(1);
}
string raw_cron_expr;
if (!save_time.empty()) {
std::optional<SnapshotSpec> spec = ParseSaveSchedule(save_time);
if (spec) {
// Setting snapshot to HH:MM every day, as specified by the `save_schedule` flag
raw_cron_expr = "0 " + spec.value().minute_spec + " " + spec.value().hour_spec + " * * *";
} else {
LOG(WARNING) << "Invalid snapshot time specifier " << save_time;
}
} else if (!snapshot_cron_exp.empty()) {
raw_cron_expr = snapshot_cron_exp;
}
if (!raw_cron_expr.empty()) {
try {
return std::optional<cron::cronexpr>(cron::make_cron(raw_cron_expr));
} catch (const cron::bad_cronexpr& ex) {
LOG(WARNING) << "Invalid cron expression: " << ex.what();
}
}
return std::nullopt;
}
ServerFamily::ServerFamily(Service* service) : service_(*service) {
start_time_ = time(NULL);
last_save_info_ = make_shared<LastSaveInfo>();
@@ -570,16 +605,8 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
load_result_ = Load(load_path);
}
string save_time = GetFlag(FLAGS_save_schedule);
if (!save_time.empty()) {
std::optional<SnapshotSpec> spec = ParseSaveSchedule(save_time);
if (spec) {
snapshot_schedule_fb_ = service_.proactor_pool().GetNextProactor()->LaunchFiber(
[save_spec = std::move(spec.value()), this] { SnapshotScheduling(save_spec); });
} else {
LOG(WARNING) << "Invalid snapshot time specifier " << save_time;
}
}
snapshot_schedule_fb_ =
service_.proactor_pool().GetNextProactor()->LaunchFiber([this] { SnapshotScheduling(); });
}
void ServerFamily::Shutdown() {
@@ -720,30 +747,22 @@ Future<std::error_code> ServerFamily::Load(const std::string& load_path) {
return ec_future;
}
void ServerFamily::SnapshotScheduling(const SnapshotSpec& spec) {
const auto loop_sleep_time = std::chrono::seconds(20);
void ServerFamily::SnapshotScheduling() {
const std::optional<cron::cronexpr> cron_expr = InferSnapshotCronExpr();
if (!cron_expr) {
return;
}
const auto loading_check_interval = std::chrono::seconds(10);
while (service_.GetGlobalState() == GlobalState::LOADING) {
schedule_done_.WaitFor(loading_check_interval);
}
while (true) {
if (schedule_done_.WaitFor(loop_sleep_time)) {
break;
}
const std::chrono::time_point now = std::chrono::system_clock::now();
const std::chrono::time_point next = cron::cron_next(cron_expr.value(), now);
time_t now = std::time(NULL);
if (!DoesTimeMatchSpecifier(spec, now)) {
continue;
}
// if it matches check the last save time, if it is the same minute don't save another
// snapshot
time_t last_save;
{
lock_guard lk(save_mu_);
last_save = last_save_info_->save_time;
}
if ((last_save / 60) == (now / 60)) {
continue;
}
schedule_done_.WaitFor(next - now);
GenericError ec = DoSave();
if (ec) {
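The rewritten `SnapshotScheduling` above replaces the old 20-second polling loop (wake up, compare against the HH:MM spec, skip if a save already happened this minute) with an exact wait until the next cron fire time. A minimal sketch of that pattern outside Dragonfly, assuming a plain thread sleep in place of the fiber-aware `schedule_done_.WaitFor` and a stub in place of `DoSave()`:

```cpp
// Sketch of the "sleep until the next cron fire, then save" loop. Uses
// std::this_thread::sleep_for instead of Dragonfly's interruptible WaitFor.
#include <chrono>
#include <ctime>
#include <thread>

#include "croncpp.h"

void DoSaveStub() {
  // Placeholder for ServerFamily::DoSave().
}

void ScheduleLoop(const cron::cronexpr& expr) {
  while (true) {
    std::time_t now = std::time(nullptr);
    std::time_t next = cron::cron_next(expr, now);  // next matching timestamp
    std::this_thread::sleep_for(std::chrono::seconds(next - now));
    DoSaveStub();
  }
}
```

The real loop waits on `schedule_done_` rather than sleeping unconditionally, so the fiber can be woken early, e.g. during shutdown.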

@@ -195,7 +195,7 @@ class ServerFamily {
// Returns the number of loaded keys if successful.
io::Result<size_t> LoadRdb(const std::string& rdb_file);
void SnapshotScheduling(const SnapshotSpec& time);
void SnapshotScheduling();
Fiber snapshot_schedule_fb_;
Future<std::error_code> load_result_;

@@ -160,6 +160,27 @@ class TestPeriodicSnapshot(SnapshotTestBase):
assert super().get_main_file("test-periodic-summary.dfs")
# save every 2 seconds
@dfly_args({**BASIC_ARGS, "dbfilename": "test-periodic", "snapshot_cron": "*/2 * * * * *"})
class TestCronPeriodicSnapshot(SnapshotTestBase):
"""Test periodic snapshotting"""
@pytest.fixture(autouse=True)
def setup(self, tmp_dir: Path):
super().setup(tmp_dir)
@pytest.mark.asyncio
async def test_snapshot(self, df_seeder_factory, df_server):
seeder = df_seeder_factory.create(
port=df_server.port, keys=10, multi_transaction_probability=0
)
await seeder.run(target_deviation=0.5)
time.sleep(60)
assert super().get_main_file("test-periodic-summary.dfs")
@dfly_args({**BASIC_ARGS})
class TestPathEscapes(SnapshotTestBase):
"""Test that we don't allow path escapes. We just check that df_server.start()