feat: Implement rdb snapshot write directly into s3. (#1205)
1. The load flow has been reorganized: most of the logic now lives in the InferLoadFile function. S3 reads are not yet supported. 2. The write path is implemented. Specifically, you can use an undocumented (by design) option to save directly to S3: `SAVE rdb s3://bucket/path/file`. 3. When using `--dir=s3://bucket/path/`, the server also saves into S3 on shutdown. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
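For example, a hypothetical invocation could look like this (bucket name and prefix are made up; AWS credentials are assumed to be available to the server process):

    dragonfly --dir=s3://my-bucket/backups/          # snapshot is also written to S3 on shutdown
    redis-cli SAVE rdb s3://my-bucket/backups/dump   # explicit save directly into S3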
commit 4e96c56969
parent 39aed005b5
3 changed files with 171 additions and 42 deletions
@@ -25,8 +25,8 @@ add_library(dragonfly_lib channel_store.cc command_registry.cc
             top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc)
 
-cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib http_client_lib
-         absl::random_random TRDP::jsoncons zstd TRDP::lz4)
+cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib aws_lib strings_lib html_lib
+         http_client_lib absl::random_random TRDP::jsoncons zstd TRDP::lz4)
 
 add_library(dfly_test_lib test_utils.cc)
 cxx_link(dfly_test_lib dragonfly_lib facade_test gtest_main_ext)

@@ -9,6 +9,7 @@
 #include <absl/strings/match.h>
 #include <absl/strings/str_join.h>
 #include <absl/strings/str_replace.h>
+#include <absl/strings/strip.h>
 #include <sys/resource.h>
 
 #include <algorithm>
@@ -45,6 +46,8 @@ extern "C" {
 #include "server/version.h"
 #include "strings/human_readable.h"
 #include "util/accept_server.h"
+#include "util/cloud/aws.h"
+#include "util/cloud/s3.h"
 #include "util/fibers/fiber_file.h"
 #include "util/uring/uring_file.h"
 
@@ -82,8 +85,10 @@ using strings::HumanReadableNumBytes;
 namespace {
 
 const auto kRedisVersion = "6.2.11";
+constexpr string_view kS3Prefix = "s3://"sv;
 
 const auto kRdbWriteFlags = O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC | O_DIRECT;
+const size_t kBucketConnectMs = 2000;
 
 using EngineFunc = void (ServerFamily::*)(CmdArgList args, ConnectionContext* cntx);
 
@@ -114,12 +119,74 @@ void SubstituteFilenameTsPlaceholder(fs::path* filename, std::string_view replac
   *filename = absl::StrReplaceAll(filename->string(), {{"{timestamp}", replacement}});
 }
 
-string InferLoadFile(fs::path data_dir) {
+bool IsCloudPath(string_view path) {
+  return absl::StartsWith(path, kS3Prefix);
+}
+
+// Returns bucket_name, obj_path for an s3 path.
+optional<pair<string, string>> GetBucketPath(string_view path) {
+  string_view clean = absl::StripPrefix(path, kS3Prefix);
+
+  size_t pos = clean.find('/');
+  if (pos == string_view::npos)
+    return nullopt;
+
+  string bucket_name{clean.substr(0, pos)};
+  string obj_path{clean.substr(pos + 1)};
+  return make_pair(move(bucket_name), move(obj_path));
+}
+
+string InferLoadFile(string_view dir, cloud::AWS* aws) {
+  fs::path data_folder;
+  string_view bucket_name, obj_path;
+
+  if (dir.empty()) {
+    data_folder = fs::current_path();
+  } else {
+    if (IsCloudPath(dir)) {
+      CHECK(aws);
+      auto res = GetBucketPath(dir);
+      if (!res) {
+        LOG(ERROR) << "Invalid S3 path: " << dir;
+        return {};
+      }
+      data_folder = dir;
+      bucket_name = res->first;
+      obj_path = res->second;
+    } else {
+      error_code file_ec;
+      data_folder = fs::canonical(dir, file_ec);
+      if (file_ec) {
+        LOG(ERROR) << "Data directory error: " << file_ec.message() << " for dir " << dir;
+        return {};
+      }
+    }
+  }
+
+  LOG(INFO) << "Data directory is " << data_folder;
+
   const auto& dbname = GetFlag(FLAGS_dbfilename);
   if (dbname.empty())
     return string{};
 
-  fs::path fl_path = data_dir.append(dbname);
+  if (IsCloudPath(dir)) {
+    cloud::S3Bucket bucket(*aws, bucket_name);
+    ProactorBase* proactor = shard_set->pool()->GetNextProactor();
+    auto ec = proactor->Await([&] { return bucket.Connect(kBucketConnectMs); });
+    if (ec) {
+      LOG(ERROR) << "Couldn't connect to S3 bucket: " << ec.message();
+      return {};
+    }
+
+    fs::path fl_path{obj_path};
+    fl_path.append(dbname);
+
+    LOG(INFO) << "Loading from s3 path s3://" << bucket_name << "/" << fl_path;
+    // TODO: to load from S3 file.
+    return {};
+  }
+
+  fs::path fl_path = data_folder.append(dbname);
   if (fs::exists(fl_path))
     return fl_path.generic_string();
 
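As an illustration (not part of the diff, names made up): with the helpers above, `s3://my-bucket/backups` splits into bucket `my-bucket` and object path `backups`, while a path with no '/' after the bucket name yields nullopt and is rejected as an invalid S3 path. For a cloud --dir, InferLoadFile currently only logs the would-be load location and returns an empty string, since reading the snapshot back from S3 is still a TODO.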
@@ -204,9 +271,18 @@ class RdbSnapshot {
     return started_ || (saver_ && saver_->Mode() == SaveMode::SUMMARY);
   }
 
+  // Sets a pointer to global aws object that provides an auth key.
+  // The ownership stays with the caller.
+  void SetAWS(cloud::AWS* aws) {
+    aws_ = aws;
+  }
+
  private:
   bool started_ = false;
-  FiberQueueThreadPool* fq_tp_;
+  bool is_linux_file_ = false;
+  FiberQueueThreadPool* fq_tp_ = nullptr;
+  cloud::AWS* aws_ = nullptr;
 
   unique_ptr<io::Sink> io_sink_;
   unique_ptr<RdbSaver> saver_;
   RdbTypeFreqMap freq_map_;
 
@@ -226,6 +302,28 @@ io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {
 GenericError RdbSnapshot::Start(SaveMode save_mode, const std::string& path,
                                 const StringVec& lua_scripts) {
   bool is_direct = false;
   VLOG(1) << "Saving RDB " << path;
 
+  if (IsCloudPath(path)) {
+    DCHECK(aws_);
+
+    optional<pair<string_view, string_view>> bucket_path = GetBucketPath(path);
+    if (!bucket_path) {
+      return GenericError("Invalid S3 path");
+    }
+    auto [bucket_name, obj_path] = *bucket_path;
+
+    cloud::S3Bucket bucket(*aws_, bucket_name);
+    error_code ec = bucket.Connect(kBucketConnectMs);
+    if (ec) {
+      return GenericError(ec, "Couldn't connect to S3 bucket");
+    }
+    auto res = bucket.OpenWriteFile(obj_path);
+    if (!res) {
+      return GenericError(res.error(), "Couldn't open file for writing");
+    }
+    io_sink_.reset(*res);
+  } else {
     if (fq_tp_) {  // EPOLL
       auto res = util::OpenFiberWriteFile(path, fq_tp_);
       if (!res)
@@ -238,9 +336,11 @@ GenericError RdbSnapshot::Start(SaveMode save_mode, const std::string& path,
           res.error(),
           "Couldn't open file for writing (is direct I/O supported by the file system?)");
     }
+    is_linux_file_ = true;
     io_sink_.reset(new LinuxWriteWrapper(res->release()));
     is_direct = kRdbWriteFlags & O_DIRECT;
   }
+  }
 
   saver_.reset(new RdbSaver(io_sink_.get(), save_mode, is_direct));
 
@@ -252,11 +352,10 @@ error_code RdbSnapshot::SaveBody() {
 }
 
 error_code RdbSnapshot::Close() {
-  // TODO: to solve it in a more elegant way.
-  if (fq_tp_) {
-    return static_cast<io::WriteFile*>(io_sink_.get())->Close();
-  }
+  if (is_linux_file_) {
+    return static_cast<LinuxWriteWrapper*>(io_sink_.get())->Close();
+  }
   return static_cast<io::WriteFile*>(io_sink_.get())->Close();
 }
 
 void RdbSnapshot::StartInShard(EngineShard* shard) {
@@ -279,7 +378,9 @@ void ExtendDfsFilenameWithShard(int shard, fs::path* filename) {
 }
 
 GenericError ValidateFilename(const fs::path& filename, bool new_version) {
-  if (!filename.parent_path().empty()) {
+  bool is_cloud_path = IsCloudPath(filename.string());
+
+  if (!filename.parent_path().empty() && !is_cloud_path) {
     return {absl::StrCat("filename may not contain directory separators (Got \"", filename.c_str(),
                          "\")")};
   }
 
@@ -446,23 +547,18 @@ void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* m
   stats_caching_task_ =
       pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(period_ms, cache_cb); });
 
-  fs::path data_folder = fs::current_path();
-  const auto& dir = GetFlag(FLAGS_dir);
-
-  error_code file_ec;
-  if (!dir.empty()) {
-    data_folder = fs::canonical(dir, file_ec);
+  string flag_dir = GetFlag(FLAGS_dir);
+  if (IsCloudPath(flag_dir)) {
+    aws_ = make_unique<cloud::AWS>("s3");
+    if (auto ec = aws_->Init(); ec) {
+      LOG(FATAL) << "Failed to initialize AWS " << ec;
+    }
   }
 
-  if (!file_ec) {
-    LOG(INFO) << "Data directory is " << data_folder;
-    string load_path = InferLoadFile(data_folder);
+  string load_path = InferLoadFile(flag_dir, aws_.get());
   if (!load_path.empty()) {
     load_result_ = Load(load_path);
   }
-  } else {
-    LOG(ERROR) << "Data directory error: " << file_ec.message();
-  }
 
   string save_time = GetFlag(FLAGS_save_schedule);
   if (!save_time.empty()) {
@@ -918,7 +1014,7 @@ GenericError DoPartialSave(PartialSaveOpts opts, const dfly::StringVec& scripts,
 
   // Start rdb saving.
   SaveMode mode = shard == nullptr ? SaveMode::SUMMARY : SaveMode::SINGLE_SHARD;
-  GenericError local_ec = snapshot->Start(mode, full_path.generic_string(), scripts);
+  GenericError local_ec = snapshot->Start(mode, full_path.string(), scripts);
 
   if (!local_ec && mode == SaveMode::SINGLE_SHARD) {
     snapshot->StartInShard(shard);
@@ -933,10 +1029,10 @@ GenericError ServerFamily::DoSave() {
   boost::intrusive_ptr<Transaction> trans(
       new Transaction{cid, ServerState::tlocal()->thread_index()});
   trans->InitByArgs(0, {});
-  return DoSave(absl::GetFlag(FLAGS_df_snapshot_format), trans.get());
+  return DoSave(absl::GetFlag(FLAGS_df_snapshot_format), {}, trans.get());
 }
 
-GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) {
+GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transaction* trans) {
   fs::path dir_path(GetFlag(FLAGS_dir));
   AggregateGenericError ec;
 
@@ -959,7 +1055,13 @@ GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) {
 
   absl::Time start = absl::Now();
 
-  fs::path filename = GetFlag(FLAGS_dbfilename);
+  fs::path filename;
+
+  if (basename.empty())
+    filename = GetFlag(FLAGS_dbfilename);
+  else
+    filename = basename;
 
   if (auto ec = ValidateFilename(filename, new_version); ec) {
     return ec;
   }
 
@@ -1014,6 +1116,7 @@ GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) {
     auto& snapshot = snapshots[shard_set->size()];
     snapshot.reset(new RdbSnapshot(fq_threadpool_.get()));
 
+    snapshot->SetAWS(aws_.get());
     if (auto local_ec = DoPartialSave(file_opts, scripts, snapshot.get(), nullptr); local_ec) {
       ec = local_ec;
       snapshot.reset();
@@ -1024,6 +1127,7 @@ GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) {
     auto cb = [&](Transaction* t, EngineShard* shard) {
       auto& snapshot = snapshots[shard->shard_id()];
       snapshot.reset(new RdbSnapshot(fq_threadpool_.get()));
+      snapshot->SetAWS(aws_.get());
       if (auto local_ec = DoPartialSave(file_opts, {}, snapshot.get(), shard); local_ec) {
         ec = local_ec;
         snapshot.reset();
@@ -1042,8 +1146,21 @@ GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) {
 
     snapshots[0].reset(new RdbSnapshot(fq_threadpool_.get()));
     auto lua_scripts = get_scripts();
+    string path_str = path.string();
 
-    ec = snapshots[0]->Start(SaveMode::RDB, path.generic_string(), lua_scripts);
+    if (IsCloudPath(path_str)) {
+      if (!aws_) {
+        aws_ = make_unique<cloud::AWS>("s3");
+        if (auto ec = aws_->Init(); ec) {
+          LOG(ERROR) << "Failed to initialize AWS " << ec;
+          aws_.reset();
+          return GenericError(ec, "Couldn't initialize AWS");
+        }
+      }
+      snapshots[0]->SetAWS(aws_.get());
+    }
+
+    ec = snapshots[0]->Start(SaveMode::RDB, path.string(), lua_scripts);
 
     if (!ec) {
       auto cb = [&](Transaction* t, EngineShard* shard) {
@@ -1425,7 +1542,7 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
       return (*cntx)->SendError(kSyntaxErr);
     }
 
-  if (args.size() == 1) {
+  if (args.size() >= 1) {
     ToUpper(&args[0]);
     string_view sub_cmd = ArgS(args, 0);
     if (sub_cmd == "DF") {
@@ -1437,7 +1554,12 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
     }
   }
 
-  GenericError ec = DoSave(new_version, cntx->transaction);
+  string_view basename;
+  if (args.size() == 2) {
+    basename = ArgS(args, 1);
+  }
+
+  GenericError ec = DoSave(new_version, basename, cntx->transaction);
   if (ec) {
     (*cntx)->SendError(ec.Format());
   } else {

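In short (an illustrative reading of the hunk above, example names made up): `SAVE rdb` keeps its old behavior, `SAVE rdb mybackup` overrides --dbfilename for that single save, and because ValidateFilename now tolerates cloud paths the basename may itself be an S3 target, e.g. `SAVE rdb s3://my-bucket/backups/dump`.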
@@ -16,6 +16,11 @@ namespace util {
 class AcceptServer;
 class ListenerInterface;
 class HttpListenerBase;
+
+namespace cloud {
+class AWS;
+} // namespace cloud
 
 } // namespace util
 
 namespace dfly {
 
@@ -94,7 +99,8 @@ class ServerFamily {
   void StatsMC(std::string_view section, facade::ConnectionContext* cntx);
 
   // if new_version is true, saves DF specific, non redis compatible snapshot.
-  GenericError DoSave(bool new_version, Transaction* transaction);
+  // if basename is not empty it will override dbfilename flag.
+  GenericError DoSave(bool new_version, std::string_view basename, Transaction* transaction);
 
   // Calls DoSave with a default generated transaction and with the format
   // specified in --df_snapshot_format
@@ -203,6 +209,7 @@ class ServerFamily {
 
   Done schedule_done_;
   std::unique_ptr<FiberQueueThreadPool> fq_threadpool_;
+  std::unique_ptr<util::cloud::AWS> aws_;
 };
 
 } // namespace dfly