chore: support load/save from GCS (#4006)

Not everything is supported yet, but manual load and save are supported.

1. Run dragonfly with `--dir gs://bucket/path`
2. In redis-cli:
   a. SET foo bar
   b. SAVE DF gsdump
   c. DFLY LOAD gs://bucket/path/gsdump-summary.dfs

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-10-30 13:57:58 +02:00 committed by GitHub
parent daf8604175
commit d10e76b408
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 40 additions and 16 deletions

View file

@ -297,13 +297,14 @@ void SinkReplyBuilder2::Send() {
uint64_t before_ns = util::fb2::ProactorBase::GetMonotonicTimeNs();
reply_stats.io_write_cnt++;
reply_stats.io_write_bytes += total_size_;
DVLOG(2) << "Writing " << total_size_ << " bytes";
if (auto ec = sink_->Write(vecs_.data(), vecs_.size()); ec)
ec_ = ec;
uint64_t after_ns = util::fb2::ProactorBase::GetMonotonicTimeNs();
reply_stats.send_stats.count++;
reply_stats.send_stats.total_duration += (after_ns - before_ns) / 1'000;
DVLOG(2) << "Finished writing " << total_size_ << " bytes";
send_active_ = false;
}

View file

@ -36,7 +36,7 @@ namespace fs = std::filesystem;
namespace {
// Reports whether `path` begins with a cloud-storage prefix (kS3Prefix / kGCSPrefix).
// NOTE(review): this listing appears to be a rendered diff, so the first return is
// presumably the pre-change line (S3 only) and the second its replacement
// (S3 or GCS) — confirm against the actual source file.
bool IsCloudPath(string_view path) {
return absl::StartsWith(path, kS3Prefix);
return absl::StartsWith(path, kS3Prefix) || absl::StartsWith(path, kGCSPrefix);
}
// Create a directory and all its parents if they don't exist.
@ -240,7 +240,7 @@ RdbSaver::SnapshotStats SaveStagesController::GetCurrentSnapshotProgress() const
// Summary file is always last in snapshots array.
void SaveStagesController::SaveDfs() {
// Extend all filenames with -{sid} or -summary and append .dfs.tmp
const string_view ext = is_cloud_ ? ".dfs" : ".dfs.tmp";
const string_view ext = snapshot_storage_->IsCloud() ? ".dfs" : ".dfs.tmp";
ShardId sid = 0;
for (auto& [_, filename] : snapshots_) {
filename = full_path_;
@ -286,7 +286,7 @@ void SaveStagesController::SaveRdb() {
filename = full_path_;
if (!filename.has_extension())
filename += ".rdb";
if (!is_cloud_)
if (!snapshot_storage_->IsCloud())
filename += ".tmp";
if (auto err = snapshot->Start(SaveMode::RDB, filename, RdbSaver::GetGlobalData(service_)); err) {
@ -342,7 +342,7 @@ void SaveStagesController::InitResources() {
// Remove .tmp extension or delete files in case of error
GenericError SaveStagesController::FinalizeFileMovement() {
if (is_cloud_)
if (snapshot_storage_->IsCloud())
return {};
DVLOG(1) << "FinalizeFileMovement start";
@ -391,7 +391,7 @@ GenericError SaveStagesController::BuildFullPath() {
dest_buf.resize(len);
full_path_ = dir_path / dest_buf;
is_cloud_ = IsCloudPath(full_path_.string());
return {};
}

View file

@ -124,7 +124,6 @@ struct SaveStagesController : public SaveStagesInputs {
private:
time_t start_time_;
std::filesystem::path full_path_;
bool is_cloud_;
AggregateGenericError shared_err_;
std::vector<std::pair<std::unique_ptr<RdbSnapshot>, std::filesystem::path>> snapshots_;

View file

@ -41,11 +41,8 @@ constexpr string_view kSummarySuffix = "summary.dfs"sv;
pair<string, string> GetBucketPath(string_view path) {
string_view clean = path;
if (absl::StartsWith(clean, kS3Prefix)) {
clean = absl::StripPrefix(clean, kS3Prefix);
} else {
clean = absl::StripPrefix(clean, kGCSPrefix);
}
auto prefix = absl::StartsWith(clean, kS3Prefix) ? kS3Prefix : kGCSPrefix;
clean = absl::StripPrefix(clean, prefix);
size_t pos = clean.find('/');
if (pos == string_view::npos) {

View file

@ -54,6 +54,10 @@ class SnapshotStorage {
// Searches for all the relevant snapshot files given the RDB file or DFS summary file path.
io::Result<std::vector<std::string>, GenericError> ExpandSnapshot(const std::string& load_path);
// Tells callers whether this storage backend is a cloud one.
// The base implementation answers false; GcsSnapshotStorage and
// AwsS3SnapshotStorage override it to answer true.
virtual bool IsCloud() const {
return false;
}
protected:
virtual io::Result<std::vector<std::string>, GenericError> ExpandFromPath(
const std::string& path) = 0;
@ -94,6 +98,10 @@ class GcsSnapshotStorage : public SnapshotStorage {
io::Result<std::string, GenericError> LoadPath(std::string_view dir,
std::string_view dbfilename) override;
// The GCS storage always identifies itself as a cloud backend.
bool IsCloud() const final {
return true;
}
private:
io::Result<std::vector<std::string>, GenericError> ExpandFromPath(const std::string& path) final;
@ -117,6 +125,10 @@ class AwsS3SnapshotStorage : public SnapshotStorage {
io::Result<std::string, GenericError> LoadPath(std::string_view dir,
std::string_view dbfilename) override;
// The S3 storage always identifies itself as a cloud backend.
bool IsCloud() const final {
return true;
}
private:
io::Result<std::vector<std::string>, GenericError> ExpandFromPath(const std::string& path) final;

View file

@ -228,7 +228,6 @@ using strings::HumanReadableNumBytes;
namespace {
const auto kRedisVersion = "6.2.11";
constexpr string_view kS3Prefix = "s3://"sv;
using EngineFunc = void (ServerFamily::*)(CmdArgList args, Transaction* tx,
SinkReplyBuilder* builder, ConnectionContext* cntx);
@ -252,8 +251,12 @@ string UnknownCmd(string cmd, CmdArgList args) {
absl::StrJoin(args.begin(), args.end(), ", ", CmdArgListFormatter()));
}
bool IsCloudPath(string_view path) {
return absl::StartsWith(path, kS3Prefix);
// Returns true when `path` starts with detail::kS3Prefix.
// ServerFamily::Init uses this on the --dir flag value to pick the S3 snapshot storage.
bool IsS3Path(string_view path) {
return absl::StartsWith(path, detail::kS3Prefix);
}
// Returns true when `path` starts with detail::kGCSPrefix.
// ServerFamily::Init uses this on the --dir flag value to pick the GCS snapshot storage.
bool IsGCSPath(string_view path) {
return absl::StartsWith(path, detail::kGCSPrefix);
}
// Check that if TLS is used at least one form of client authentication is
@ -866,7 +869,7 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
}
string flag_dir = GetFlag(FLAGS_dir);
if (IsCloudPath(flag_dir)) {
if (IsS3Path(flag_dir)) {
#ifdef WITH_AWS
shard_set->pool()->GetNextProactor()->Await([&] { util::aws::Init(); });
snapshot_storage_ = std::make_shared<detail::AwsS3SnapshotStorage>(
@ -875,6 +878,14 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
#else
LOG(ERROR) << "Compiled without AWS support";
#endif
} else if (IsGCSPath(flag_dir)) {
auto gcs = std::make_shared<detail::GcsSnapshotStorage>();
auto ec = shard_set->pool()->GetNextProactor()->Await([&] { return gcs->Init(3000); });
if (ec) {
LOG(ERROR) << "Failed to initialize GCS snapshot storage: " << ec.message();
exit(1);
}
snapshot_storage_ = std::move(gcs);
} else if (fq_threadpool_) {
snapshot_storage_ = std::make_shared<detail::FileSnapshotStorage>(fq_threadpool_.get());
} else {
@ -1174,6 +1185,8 @@ void ServerFamily::SnapshotScheduling() {
io::Result<size_t> ServerFamily::LoadRdb(const std::string& rdb_file,
LoadExistingKeys existing_keys) {
VLOG(1) << "Loading data from " << rdb_file;
error_code ec;
io::ReadonlyFileOrError res = snapshot_storage_->OpenReadFile(rdb_file);
if (res) {
@ -1635,6 +1648,8 @@ GenericError ServerFamily::DoSaveCheckAndStart(bool new_version, string_view bas
"SAVING - can not save database"};
}
VLOG(1) << "Saving snapshot to " << basename;
save_controller_ = make_unique<SaveStagesController>(detail::SaveStagesInputs{
new_version, basename, trans, &service_, fq_threadpool_.get(), snapshot_storage_});