feat(tiering): DiskStorage (#2770)

* feat(tiering): DiskStorage

Simple DiskStorage for future tiering purposes

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2024-03-27 21:31:35 +03:00 committed by GitHub
parent cd20c4003d
commit ad13cc6b9c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 304 additions and 20 deletions

View file

@ -49,7 +49,8 @@ add_library(dragonfly_lib engine_shard_set.cc channel_store.cc
cluster/cluster_family.cc cluster/cluster_slot_migration.cc
cluster/cluster_shard_migration.cc cluster/outgoing_slot_migration.cc
acl/user.cc acl/user_registry.cc acl/acl_family.cc
acl/validator.cc acl/helpers.cc)
acl/validator.cc acl/helpers.cc
tiering/disk_storage.cc)
if (DF_ENABLE_MEMORY_TRACKING)
target_compile_definitions(dragonfly_lib PRIVATE DFLY_ENABLE_MEMORY_TRACKING)
@ -102,6 +103,7 @@ cxx_test(acl/user_registry_test dfly_test_lib LABELS DFLY)
cxx_test(acl/acl_family_test dfly_test_lib LABELS DFLY)
cxx_test(engine_shard_set_test dfly_test_lib LABELS DFLY)
cxx_test(search/search_family_test dfly_test_lib LABELS DFLY)
cxx_test(tiering/disk_storage_test dfly_test_lib LABELS DFLY)
if (WITH_ASAN OR WITH_USAN)
target_compile_definitions(stream_family_test PRIVATE SANITIZERS)
target_compile_definitions(multi_test PRIVATE SANITIZERS)

View file

@ -33,13 +33,9 @@ constexpr inline size_t alignup(size_t num, size_t align) {
} // namespace
IoMgr::IoMgr() {
flags_val = 0;
}
constexpr size_t kInitialSize = 1UL << 28; // 256MB
error_code IoMgr::Open(const string& path) {
error_code IoMgr::Open(std::string_view path) {
CHECK(!backing_file_);
int kFlags = O_CREAT | O_RDWR | O_TRUNC | O_CLOEXEC;
@ -71,10 +67,30 @@ error_code IoMgr::Open(const string& path) {
return error_code{};
}
error_code IoMgr::Grow(size_t len) {
Proactor* proactor = (Proactor*)ProactorBase::me();
if (exchange(grow_progress_, true))
return make_error_code(errc::operation_in_progress);
fb2::FiberCall fc(proactor);
fc->PrepFallocate(backing_file_->fd(), 0, sz_, len);
Proactor::IoResult res = fc.Get();
grow_progress_ = false;
if (res == 0) {
sz_ += len;
return {};
} else {
return std::error_code(-res, std::iostream_category());
}
}
error_code IoMgr::GrowAsync(size_t len, GrowCb cb) {
DCHECK_EQ(0u, len % (1 << 20));
if (flags.grow_progress) {
if (exchange(grow_progress_, true)) {
return make_error_code(errc::operation_in_progress);
}
@ -82,14 +98,13 @@ error_code IoMgr::GrowAsync(size_t len, GrowCb cb) {
SubmitEntry entry = proactor->GetSubmitEntry(
[this, len, cb = std::move(cb)](auto*, Proactor::IoResult res, uint32_t) {
this->flags.grow_progress = 0;
this->grow_progress_ = false;
sz_ += (res == 0 ? len : 0);
cb(res);
},
0);
entry.PrepFallocate(backing_file_->fd(), 0, sz_, len);
flags.grow_progress = 1;
return error_code{};
}
@ -146,8 +161,22 @@ error_code IoMgr::Read(size_t offset, io::MutableBytes dest) {
return ec;
}
std::error_code IoMgr::ReadAsync(size_t offset, absl::Span<uint8_t> buffer, ReadCb cb) {
DCHECK(!buffer.empty());
VLOG(1) << "Read " << offset << "/" << buffer.size();
Proactor* proactor = (Proactor*)ProactorBase::me();
auto ring_cb = [cb = std::move(cb)](auto*, Proactor::IoResult res, uint32_t flags) { cb(res); };
SubmitEntry se = proactor->GetSubmitEntry(std::move(ring_cb), 0);
se.PrepRead(backing_file_->fd(), buffer.data(), buffer.size(), offset);
return error_code{};
}
void IoMgr::Shutdown() {
while (flags_val) {
while (grow_progress_) {
ThisFiber::SleepFor(200us); // TODO: hacky for now.
}
}

View file

@ -22,12 +22,15 @@ class IoMgr {
// (io_res, )
using GrowCb = std::function<void(int)>;
IoMgr();
using ReadCb = std::function<void(int)>;
// blocks until all the pending requests are finished.
void Shutdown();
std::error_code Open(const std::string& path);
std::error_code Open(std::string_view path);
// Try growing file by that length. Return error if growth failed.
std::error_code Grow(size_t len);
// Grows file by that length. len must be divided by 1MB.
// passing other values will check-fail.
@ -36,15 +39,20 @@ class IoMgr {
// Returns error if submission failed. Otherwise - returns the io result
// via cb. A caller must make sure that the blob exists until cb is called.
std::error_code WriteAsync(size_t offset, std::string_view blob, WriteCb cb);
// Read synchronously into dest
std::error_code Read(size_t offset, io::MutableBytes dest);
// Read into dest and call cb once read
std::error_code ReadAsync(size_t offset, io::MutableBytes dest, ReadCb cb);
// Total file span
size_t Span() const {
return sz_;
}
bool grow_pending() const {
return flags.grow_progress;
return grow_progress_;
}
const IoMgrStats& GetStats() const {
@ -55,13 +63,7 @@ class IoMgr {
std::unique_ptr<util::fb2::LinuxFile> backing_file_;
size_t sz_ = 0;
union {
uint8_t flags_val;
struct {
uint8_t grow_progress : 1;
} flags;
};
bool grow_progress_ = false;
IoMgrStats stats_;
};

View file

@ -0,0 +1,73 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/tiering/disk_storage.h"
#include "base/io_buf.h"
#include "base/logging.h"
#include "server/error.h"
namespace dfly::tiering {
std::error_code DiskStorage::Open(std::string_view path) {
RETURN_ON_ERR(io_mgr_.Open(path));
alloc_.AddStorage(0, io_mgr_.Span());
return {};
}
void DiskStorage::Close() {
io_mgr_.Shutdown();
}
void DiskStorage::Read(DiskSegment segment, ReadCb cb) {
DCHECK_GT(segment.length, 0u);
DCHECK_EQ(segment.offset % 4096, 0u);
// TODO: use registered buffers (UringProactor::RegisterBuffers)
uint8_t* buf = new uint8_t[segment.length];
auto io_cb = [cb, buf, segment](int res) {
cb(std::string_view{reinterpret_cast<char*>(buf), segment.length});
delete buf; // because std::function needs to be copyable, unique_ptr can't be used
};
io_mgr_.ReadAsync(segment.offset, {buf, segment.length}, std::move(io_cb));
}
void DiskStorage::MarkAsFree(DiskSegment segment) {
DCHECK_GT(segment.length, 0u);
DCHECK_EQ(segment.offset % 4096, 0u);
alloc_.Free(segment.offset, segment.length);
}
std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
DCHECK_GT(bytes.length(), 0u);
int64_t offset = alloc_.Malloc(bytes.size());
// If we've run out of space, block and grow as much as needed
if (offset < 0) {
size_t start = io_mgr_.Span();
size_t grow_size = -offset;
RETURN_ON_ERR(io_mgr_.Grow(grow_size));
alloc_.AddStorage(start, grow_size);
offset = alloc_.Malloc(bytes.size());
if (offset < 0) // we can't fit it even after resizing
return std::make_error_code(std::errc::file_too_large);
}
auto io_cb = [this, cb, offset, bytes, len = bytes.size()](int io_res) {
if (io_res < 0) {
MarkAsFree({size_t(offset), len});
cb({});
} else {
cb({size_t(offset), len});
}
};
return io_mgr_.WriteAsync(offset, io::View(bytes), std::move(io_cb));
}
} // namespace dfly::tiering

View file

@ -0,0 +1,46 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <string>
#include <system_error>
#include "core/external_alloc.h"
#include "io/io.h"
#include "server/io_mgr.h"
namespace dfly::tiering {
struct DiskSegment {
// Mesured in bytes, offset should be aligned to page boundaries (4kb)
size_t offset, length;
};
// Disk storage controlled by asynchronous operations.
class DiskStorage {
public:
using ReadCb = std::function<void(std::string_view)>;
using StashCb = std::function<void(DiskSegment)>;
std::error_code Open(std::string_view path);
void Close();
// Request read for segment, cb will be called on completion with read value
void Read(DiskSegment segment, ReadCb cb);
// Mark segment as free, performed immediately
void MarkAsFree(DiskSegment segment);
// Request bytes to be stored, cb will be called with assigned segment on completion. Can block to
// grow backing file. Returns error code if operation failed immediately (most likely it failed
// to grow the backing file) or passes an empty segment if the final write operation failed.
std::error_code Stash(io::Bytes bytes, StashCb cb);
private:
IoMgr io_mgr_;
ExternalAllocator alloc_;
};
}; // namespace dfly::tiering

View file

@ -0,0 +1,132 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/tiering/disk_storage.h"
#include <memory>
#include "base/gtest.h"
#include "base/logging.h"
#include "util/fibers/fibers.h"
#include "util/fibers/pool.h"
namespace dfly::tiering {
using namespace std;
using namespace std::string_literals;
class PoolTestBase : public testing::Test {
protected:
static void SetUpTestSuite();
static void TearDownTestSuite();
static unique_ptr<util::ProactorPool> pp_;
};
unique_ptr<util::ProactorPool> PoolTestBase::pp_ = nullptr;
void PoolTestBase::SetUpTestSuite() {
pp_.reset(util::fb2::Pool::IOUring(16, 2));
pp_->Run();
}
void PoolTestBase::TearDownTestSuite() {
pp_->Stop();
pp_.reset();
}
struct DiskStorageTest : public PoolTestBase {
~DiskStorageTest() {
EXPECT_EQ(pending_ops_, 0);
}
void Open() {
storage_ = make_unique<DiskStorage>();
storage_->Open("disk_storage_test_backing");
}
void Close() {
storage_->Close();
storage_.reset();
unlink("disk_storage_test_backing");
}
void Stash(size_t index, string value) {
pending_ops_++;
auto buf = make_shared<string>(value);
storage_->Stash(io::Buffer(*buf), [this, index, buf](DiskSegment segment) {
segments_[index] = segment;
pending_ops_--;
});
}
void Read(size_t index) {
pending_ops_++;
storage_->Read(segments_[index], [this, index](string_view value) {
last_reads_[index] = value;
pending_ops_--;
});
}
void Delete(size_t index) {
storage_->MarkAsFree(segments_[index]);
segments_.erase(index);
last_reads_.erase(index);
}
void Wait() {
while (pending_ops_ > 0) {
::util::ThisFiber::SleepFor(1ms);
}
}
protected:
int pending_ops_ = 0;
std::unordered_map<size_t, string> last_reads_;
std::unordered_map<size_t, DiskSegment> segments_;
std::unique_ptr<DiskStorage> storage_;
};
TEST_F(DiskStorageTest, Basic) {
pp_->at(0)->Await([this] {
// Write 100 values
Open();
for (size_t i = 0; i < 100; i++)
Stash(i, absl::StrCat("value", i));
Wait();
EXPECT_EQ(segments_.size(), 100);
// Read all 100 values
for (size_t i = 0; i < 100; i++)
Read(i);
Wait();
// Expect them to be equal to written
for (size_t i = 0; i < 100; i++)
EXPECT_EQ(last_reads_[i], absl::StrCat("value", i));
Close();
});
}
TEST_F(DiskStorageTest, ReUse) {
pp_->at(0)->Await([this] {
Open();
Stash(0, "value1");
Wait();
EXPECT_EQ(segments_[0].offset, 0u);
Delete(0);
Stash(1, "value2");
Wait();
EXPECT_EQ(segments_[1].offset, 0u);
Close();
});
}
} // namespace dfly::tiering