mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
chore: introduce GcsSnapshotStorage (#4004)
It's not been used yet so no functional changes. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
c2710604de
commit
b9c1aaec90
4 changed files with 148 additions and 7 deletions
2
helio
2
helio
|
@ -1 +1 @@
|
|||
Subproject commit 9dd56595b6b4cff71338b8c728eb12a8017c6b97
|
||||
Subproject commit f102aa0371c1e157c3cbeb3e59e196b05ea39a88
|
|
@ -84,7 +84,7 @@ endif()
|
|||
|
||||
cxx_link(dfly_transaction dfly_core strings_lib TRDP::fast_float)
|
||||
cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib ${AWS_LIB} jsonpath
|
||||
strings_lib html_lib
|
||||
strings_lib html_lib gcp_lib
|
||||
http_client_lib absl::random_random TRDP::jsoncons ${ZSTD_LIB} TRDP::lz4
|
||||
TRDP::croncpp TRDP::flatbuffers)
|
||||
|
||||
|
|
|
@ -24,6 +24,7 @@
|
|||
#include "base/logging.h"
|
||||
#include "io/file_util.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "util/cloud/gcp/gcs_file.h"
|
||||
#include "util/fibers/fiber_file.h"
|
||||
|
||||
namespace dfly {
|
||||
|
@ -32,9 +33,19 @@ namespace detail {
|
|||
using namespace util;
|
||||
using namespace std;
|
||||
|
||||
// Returns bucket_name, obj_path for an s3 path.
|
||||
optional<pair<string, string>> GetBucketPath(string_view path) {
|
||||
std::string_view clean = absl::StripPrefix(path, kS3Prefix);
|
||||
// Returns true when `path` begins with the GCS scheme prefix (kGCSPrefix).
inline bool IsGcsPath(string_view path) {
  // substr() clamps the length, so paths shorter than the prefix simply compare unequal.
  const string_view head = path.substr(0, kGCSPrefix.size());
  return head == kGCSPrefix;
}
|
||||
|
||||
constexpr string_view kSummarySuffix = "summary.dfs"sv;
|
||||
|
||||
pair<string, string> GetBucketPath(string_view path) {
|
||||
string_view clean = path;
|
||||
if (absl::StartsWith(clean, kS3Prefix)) {
|
||||
clean = absl::StripPrefix(clean, kS3Prefix);
|
||||
} else {
|
||||
clean = absl::StripPrefix(clean, kGCSPrefix);
|
||||
}
|
||||
|
||||
size_t pos = clean.find('/');
|
||||
if (pos == string_view::npos) {
|
||||
|
@ -43,7 +54,8 @@ optional<pair<string, string>> GetBucketPath(string_view path) {
|
|||
|
||||
string bucket_name{clean.substr(0, pos)};
|
||||
string obj_path{clean.substr(pos + 1)};
|
||||
return std::make_pair(std::move(bucket_name), std::move(obj_path));
|
||||
|
||||
return make_pair(std::move(bucket_name), std::move(obj_path));
|
||||
}
|
||||
|
||||
#ifdef __linux__
|
||||
|
@ -156,7 +168,7 @@ io::Result<std::string, GenericError> FileSnapshotStorage::LoadPath(std::string_
|
|||
return std::difftime(l.last_modified, r.last_modified) < 0;
|
||||
});
|
||||
auto it = std::find_if(short_vec->rbegin(), short_vec->rend(), [](const auto& stat) {
|
||||
return absl::EndsWith(stat.name, ".rdb") || absl::EndsWith(stat.name, "summary.dfs");
|
||||
return absl::EndsWith(stat.name, ".rdb") || absl::EndsWith(stat.name, kSummarySuffix);
|
||||
});
|
||||
if (it != short_vec->rend())
|
||||
return it->name;
|
||||
|
@ -192,6 +204,109 @@ error_code FileSnapshotStorage::CheckPath(const string& path) {
|
|||
return ec;
|
||||
}
|
||||
|
||||
// Releases the SSL context that Init() creates via TlsClient::CreateSslContext().
// ctx_ stays null when Init() was never called or failed before creating the context,
// in which case there is nothing to free.
GcsSnapshotStorage::~GcsSnapshotStorage() {
  if (ctx_) {
    util::http::TlsClient::FreeContext(ctx_);
  }
}
|
||||
|
||||
// Initializes the GCP credentials provider on the proactor returned by
// ProactorBase::me() and, on success, creates the SSL context used by later requests.
//
// connect_ms is forwarded unchanged to the credentials provider.
// Returns the error reported by the credentials provider; ctx_ is left untouched on failure.
error_code GcsSnapshotStorage::Init(unsigned connect_ms) {
  fb2::ProactorBase* current = fb2::ProactorBase::me();
  CHECK(current);

  error_code init_ec = creds_provider_.Init(connect_ms, current);
  if (!init_ec) {
    ctx_ = util::http::TlsClient::CreateSslContext();
  }

  return init_ec;
}
|
||||
|
||||
// Opens the GCS object addressed by `path` (gs://<bucket>/<object>) for writing.
//
// Requires that Init() has succeeded (ctx_ must be set).
// Returns the opened sink paired with FileType::CLOUD, or a GenericError when the path is
// not a GCS path or the object could not be opened.
io::Result<std::pair<io::Sink*, uint8_t>, GenericError> GcsSnapshotStorage::OpenWriteFile(
    const std::string& path) {
  CHECK(ctx_);

  // Validate the scheme up front, as OpenReadFile and ExpandFromPath do. Without this check
  // GetBucketPath would derive a bucket name from an arbitrary, non-GCS string.
  if (!IsGcsPath(path)) {
    return nonstd::make_unexpected(
        GenericError(std::make_error_code(std::errc::invalid_argument), "Invalid GCS path"));
  }

  pair<string, string> bucket_path = GetBucketPath(path);
  fb2::ProactorBase* proactor = fb2::ProactorBase::me();
  unique_ptr<http::ClientPool> conn_pool = cloud::GCS::CreateApiConnectionPool(ctx_, proactor);

  cloud::GcsWriteFileOptions opts;
  opts.creds_provider = &creds_provider_;
  // Ownership of the pool is handed over through the raw pointer + pool_owned flag.
  // NOTE(review): confirm OpenWriteGcsFile releases an owned pool when it fails.
  opts.pool = conn_pool.release();
  opts.pool_owned = true;

  io::Result<io::WriteFile*> dest_res =
      cloud::OpenWriteGcsFile(bucket_path.first, bucket_path.second, opts);
  if (!dest_res) {
    return nonstd::make_unexpected(GenericError(dest_res.error(), "Could not open file"));
  }

  return std::pair(*dest_res, FileType::CLOUD);
}
|
||||
|
||||
// Opens the GCS object addressed by `path` (gs://<bucket>/<object>) for reading.
//
// Returns an error for paths that do not carry the GCS prefix; otherwise returns whatever
// cloud::OpenReadGcsFile yields for the resolved bucket and object key.
io::ReadonlyFileOrError GcsSnapshotStorage::OpenReadFile(const std::string& path) {
  if (!IsGcsPath(path)) {
    return nonstd::make_unexpected(GenericError("Invalid GCS path"));
  }

  const pair<string, string> location = GetBucketPath(path);

  // A dedicated connection pool is created per file on the current proactor; the read
  // options take it over as a raw pointer flagged as owned.
  unique_ptr<http::ClientPool> pool =
      cloud::GCS::CreateApiConnectionPool(ctx_, fb2::ProactorBase::me());

  cloud::GcsReadFileOptions read_opts;
  read_opts.creds_provider = &creds_provider_;
  read_opts.pool = pool.release();
  read_opts.pool_owned = true;

  return cloud::OpenReadGcsFile(location.first, location.second, read_opts);
}
|
||||
|
||||
// Snapshot discovery is not implemented for GCS: logs the requested location and always
// returns a "Not supported" error.
io::Result<std::string, GenericError> GcsSnapshotStorage::LoadPath(std::string_view dir,
                                                                   std::string_view dbfilename) {
  LOG(ERROR) << "Load snapshot: Searching for snapshot in GCS path: " << dir << "/" << dbfilename;

  GenericError unsupported("Not supported");
  return nonstd::make_unexpected(std::move(unsupported));
}
|
||||
|
||||
// Expands a DFS summary path (gs://<bucket>/<prefix>summary.dfs) into the list of shard
// files (gs://<bucket>/<prefix>NNNN.dfs) found in the bucket.
//
// Returns:
//   - an invalid_argument error when load_path is not a GCS path;
//   - an empty vector when load_path does not end with the summary suffix;
//   - the error reported by the bucket listing when the listing itself fails;
//   - no_such_file_or_directory when the listing succeeds but matches no shard files;
//   - otherwise the matching object paths, each prefixed with the GCS scheme and bucket.
io::Result<vector<string>, GenericError> GcsSnapshotStorage::ExpandFromPath(
    const string& load_path) {
  if (!IsGcsPath(load_path))
    return nonstd::make_unexpected(
        GenericError(make_error_code(errc::invalid_argument), "Invalid GCS path"));

  if (!absl::EndsWith(load_path, kSummarySuffix))
    return vector<string>{};

  // Plain named references instead of structured bindings: the lambda below captures them,
  // and capturing structured bindings is only well-formed starting with C++20.
  const pair<string, string> bucket_path = GetBucketPath(load_path);
  const string& bucket_name = bucket_path.first;
  const string& obj_path = bucket_path.second;

  // NOTE(review): obj_path is used as a regex without escaping, and every "summary"
  // occurrence is substituted, not only the trailing one — confirm this is acceptable
  // for the object names in use.
  regex re(absl::StrReplaceAll(obj_path, {{"summary", "[0-9]{4}"}}));
  string_view prefix = absl::StripSuffix(obj_path, kSummarySuffix);

  // Find snapshot shard files if we're loading DFS.
  fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor();
  auto paths = proactor->Await([&]() -> io::Result<vector<string>, GenericError> {
    vector<string> res;
    cloud::GCS gcs(&creds_provider_, ctx_, proactor);

    error_code ec = gcs.List(bucket_name, prefix, false, [&](const cloud::GCS::ObjectItem& item) {
      string key{item.key};
      if (std::regex_match(key, re)) {
        res.push_back(absl::StrCat(kGCSPrefix, bucket_name, "/", item.key));
      }
    });

    if (ec) {
      return nonstd::make_unexpected(ec);
    }

    return res;
  });

  // Surface listing failures (credentials, network, ...) as they are instead of
  // reporting them as a missing snapshot.
  if (!paths) {
    return nonstd::make_unexpected(paths.error());
  }

  if (paths->empty()) {
    return nonstd::make_unexpected(
        GenericError{std::make_error_code(std::errc::no_such_file_or_directory),
                     "Could not find DFS snapshot shard files"});
  }

  return *paths;
}
|
||||
|
||||
// No validation is performed for GCS paths: every path is accepted and a
// default-constructed (success) error_code is returned.
error_code GcsSnapshotStorage::CheckPath(const std::string& path) {
  return error_code{};
}
|
||||
|
||||
#ifdef WITH_AWS
|
||||
AwsS3SnapshotStorage::AwsS3SnapshotStorage(const std::string& endpoint, bool https,
|
||||
bool ec2_metadata, bool sign_payload) {
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
|
||||
#include "io/io.h"
|
||||
#include "server/common.h"
|
||||
#include "util/cloud/gcp/gcp_creds_provider.h"
|
||||
#include "util/cloud/gcp/gcs.h"
|
||||
#include "util/fibers/fiberqueue_threadpool.h"
|
||||
#include "util/fibers/uring_file.h"
|
||||
|
||||
|
@ -23,6 +25,7 @@ namespace detail {
|
|||
namespace fs = std::filesystem;
|
||||
|
||||
constexpr std::string_view kS3Prefix = "s3://";
|
||||
constexpr std::string_view kGCSPrefix = "gs://";
|
||||
|
||||
const size_t kBucketConnectMs = 2000;
|
||||
|
||||
|
@ -77,6 +80,29 @@ class FileSnapshotStorage : public SnapshotStorage {
|
|||
util::fb2::FiberQueueThreadPool* fq_threadpool_;
|
||||
};
|
||||
|
||||
class GcsSnapshotStorage : public SnapshotStorage {
|
||||
public:
|
||||
~GcsSnapshotStorage();
|
||||
|
||||
std::error_code Init(unsigned connect_ms);
|
||||
|
||||
io::Result<std::pair<io::Sink*, uint8_t>, GenericError> OpenWriteFile(
|
||||
const std::string& path) override;
|
||||
|
||||
io::ReadonlyFileOrError OpenReadFile(const std::string& path) override;
|
||||
|
||||
io::Result<std::string, GenericError> LoadPath(std::string_view dir,
|
||||
std::string_view dbfilename) override;
|
||||
|
||||
private:
|
||||
io::Result<std::vector<std::string>, GenericError> ExpandFromPath(const std::string& path) final;
|
||||
|
||||
std::error_code CheckPath(const std::string& path) final;
|
||||
|
||||
util::cloud::GCPCredsProvider creds_provider_;
|
||||
SSL_CTX* ctx_ = NULL;
|
||||
};
|
||||
|
||||
#ifdef WITH_AWS
|
||||
class AwsS3SnapshotStorage : public SnapshotStorage {
|
||||
public:
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue