feat(tiering): Registered buffers (#2967)

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2024-04-29 10:00:07 +03:00 committed by GitHub
parent bbe6c8579a
commit 1abd7e5fd3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 170 additions and 158 deletions

2
helio

@ -1 +1 @@
Subproject commit 8095758cd5ed11bd40b76bbf20c852bde6180c95
Subproject commit 3545a85fcaff3e36a99db6fbc91c6e51e0280f14

View file

@ -41,7 +41,7 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {
}
// Find entry by key in db_slice and store external segment in place of original value
void SetExternal(std::string_view key, tiering::DiskSegment segment) {
void SetExternal(string_view key, tiering::DiskSegment segment) {
if (auto pv = Find(key); pv) {
pv->SetIoPending(false);
pv->SetExternal(segment.offset, segment.length); // TODO: Handle memory stats
@ -50,14 +50,27 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {
}
}
void ClearIoPending(std::string_view key) {
// Find bin by id and call SetExternal for all contained entries
void SetExternal(tiering::SmallBins::BinId id, tiering::DiskSegment segment) {
  // Ask the bin registry which entries were packed into bin `id`; each element is unpacked into
  // an entry key and the segment assigned to that entry.
  auto&& stashed_entries = ts_->bins_->ReportStashed(id, segment);
  for (const auto& [entry_key, entry_segment] : stashed_entries) {
    // Delegate to the per-key overload to update each entry individually.
    SetExternal(string_view{entry_key}, entry_segment);
  }
}
// Clear IO pending flag for entry
void ClearIoPending(string_view key) {
  // Find() yields nullptr when no valid entry exists for the key; nothing to clear in that case.
  PrimeValue* entry = Find(key);
  if (entry != nullptr) {
    entry->SetIoPending(false);
  }
}
// Clear IO pending flag for all contained entries of bin
void ClearIoPending(tiering::SmallBins::BinId id) {
  // ReportStashAborted(id) yields the keys that were part of the bin; every one of them gets its
  // IO pending flag cleared through the per-key overload.
  auto&& aborted_keys = ts_->bins_->ReportStashAborted(id);
  for (const string& aborted_key : aborted_keys) {
    ClearIoPending(string_view{aborted_key});
  }
}
// Find entry by key and store its up-to-date value in place of external segment.
// Returns false if the value is outdated, true otherwise
bool SetInMemory(std::string_view key, std::string_view value, tiering::DiskSegment segment) {
bool SetInMemory(string_view key, string_view value, tiering::DiskSegment segment) {
if (auto pv = Find(key); pv && pv->IsExternal() && segment == pv->GetExternalSlice()) {
pv->Reset(); // TODO: account for memory
pv->SetString(value);
@ -68,17 +81,16 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {
return false;
}
void ReportStashed(EntryId id, tiering::DiskSegment segment) override {
if (holds_alternative<string_view>(id)) {
SetExternal(get<string_view>(id), segment);
void ReportStashed(EntryId id, tiering::DiskSegment segment, error_code ec) override {
if (ec) {
VLOG(1) << "Stash failed " << ec.message();
visit([this](auto id) { ClearIoPending(id); }, id);
} else {
for (const auto& [sub_key, sub_segment] :
ts_->bins_->ReportStashed(get<tiering::SmallBins::BinId>(id), segment))
SetExternal(string_view{sub_key}, sub_segment);
visit([this, segment](auto id) { SetExternal(id, segment); }, id);
}
}
void ReportFetched(EntryId id, std::string_view value, tiering::DiskSegment segment,
void ReportFetched(EntryId id, string_view value, tiering::DiskSegment segment,
bool modified) override {
DCHECK(holds_alternative<string_view>(id)); // we never issue reads for bins
@ -104,7 +116,7 @@ class TieredStorageV2::ShardOpManager : public tiering::OpManager {
}
private:
PrimeValue* Find(std::string_view key) {
PrimeValue* Find(string_view key) {
  // Look up `key` in the db slice and return a pointer to its value, or nullptr when the lookup
  // does not produce a valid iterator.
  // TODO: Get DbContext for transaction for correct dbid and time
  auto lookup = db_slice_->FindMutable(DbContext{}, key);
  if (!IsValid(lookup.it)) {
    return nullptr;
  }
  return &lookup.it->second;
}
@ -126,7 +138,7 @@ TieredStorageV2::TieredStorageV2(DbSlice* db_slice)
// Out-of-line destructor with no custom teardown logic; members are destroyed implicitly.
TieredStorageV2::~TieredStorageV2() = default;
std::error_code TieredStorageV2::Open(string_view path) {
// Opens the tiered storage at `path` by forwarding to the op manager; its result is returned
// unchanged.
error_code TieredStorageV2::Open(string_view path) {
  error_code open_ec = op_manager_->Open(path);
  return open_ec;
}
@ -134,10 +146,10 @@ void TieredStorageV2::Close() {
op_manager_->Close();
}
util::fb2::Future<std::string> TieredStorageV2::Read(string_view key, const PrimeValue& value) {
util::fb2::Future<string> TieredStorageV2::Read(string_view key, const PrimeValue& value) {
DCHECK(value.IsExternal());
util::fb2::Future<std::string> future;
op_manager_->Enqueue(key, value.GetExternalSlice(), [future](std::string* value) mutable {
util::fb2::Future<string> future;
op_manager_->Enqueue(key, value.GetExternalSlice(), [future](string* value) mutable {
future.Resolve(*value);
return false;
});
@ -167,14 +179,19 @@ void TieredStorageV2::Stash(string_view key, PrimeValue* value) {
string_view value_sv = value->GetSlice(&buf);
value->SetIoPending(true);
tiering::OpManager::EntryId id;
error_code ec;
if (value->Size() >= kMinOccupancySize) {
if (auto ec = op_manager_->Stash(key, value_sv); ec)
value->SetIoPending(false);
id = key;
ec = op_manager_->Stash(key, value_sv);
} else if (auto bin = bins_->Stash(key, value_sv); bin) {
if (auto ec = op_manager_->Stash(bin->first, bin->second); ec) {
for (const string& key : bins_->ReportStashAborted(bin->first))
op_manager_->ClearIoPending(key); // clear IO_PENDING flag
}
id = bin->first;
ec = op_manager_->Stash(bin->first, bin->second);
}
if (ec) {
VLOG(1) << "Stash failed immediately" << ec.message();
visit([this](auto id) { op_manager_->ClearIoPending(id); }, id);
}
}
void TieredStorageV2::Delete(string_view key, PrimeValue* value) {

View file

@ -26,6 +26,7 @@ ABSL_DECLARE_FLAG(bool, force_epoll);
ABSL_DECLARE_FLAG(string, tiered_prefix);
ABSL_DECLARE_FLAG(string, tiered_prefix_v2);
ABSL_DECLARE_FLAG(bool, tiered_storage_v2_cache_fetched);
ABSL_DECLARE_FLAG(bool, backing_file_direct);
namespace dfly {
@ -44,6 +45,7 @@ class TieredStorageV2Test : public BaseFamilyTest {
absl::SetFlag(&FLAGS_tiered_prefix, "");
absl::SetFlag(&FLAGS_tiered_prefix_v2, "/tmp/tiered_storage_test");
absl::SetFlag(&FLAGS_tiered_storage_v2_cache_fetched, true);
absl::SetFlag(&FLAGS_backing_file_direct, true);
BaseFamilyTest::SetUp();
}

View file

@ -4,16 +4,70 @@
#include "server/tiering/disk_storage.h"
#include "base/flags.h"
#include "base/io_buf.h"
#include "base/logging.h"
#include "server/error.h"
#include "server/tiering/common.h"
#include "util/fibers/uring_proactor.h"
using namespace ::dfly::tiering::literals;
ABSL_FLAG(uint64_t, registered_buffer_size, 512_KB,
"Size of registered buffer for IoUring fixed read/writes");
namespace dfly::tiering {
using namespace ::util::fb2;
namespace {
// Allocates a temporary (non-registered) buffer. The requested size is rounded up to a whole
// number of pages and the memory is aligned to kPageSize. The returned UringBuf carries no buffer
// index (std::nullopt), which is how DestroyTmpBuf/ReturnBuf recognize it.
UringBuf AllocateTmpBuf(size_t size) {
  const size_t page_count = (size + kPageSize - 1) / kPageSize;
  const size_t aligned_size = page_count * kPageSize;
  VLOG(1) << "Fallback to temporary allocation: " << aligned_size;

  // Must be released with the matching aligned operator delete[] (see DestroyTmpBuf).
  auto* storage = new (std::align_val_t(kPageSize)) uint8_t[aligned_size];
  return UringBuf{{storage, aligned_size}, std::nullopt};
}
// Frees a buffer produced by AllocateTmpBuf. Only buffers without a buffer index may be passed
// here (checked in debug builds).
void DestroyTmpBuf(UringBuf buf) {
  DCHECK(!buf.buf_idx);
  auto* storage = buf.bytes.data();
  // Mirrors the aligned array allocation in AllocateTmpBuf.
  ::operator delete[](storage, std::align_val_t(kPageSize));
}
// Obtains a buffer for an IO operation of `size` bytes on the current proactor thread.
// First asks the UringProactor for a buffer via RequestBuffer(size); if it does not hand one out,
// falls back to a temporary page-aligned allocation (AllocateTmpBuf).
// The result must be handed back with ReturnBuf once the IO operation has completed.
// Must be called on an io_uring proactor (checked in debug builds).
UringBuf PrepareBuf(size_t size) {
  DCHECK_EQ(ProactorBase::me()->GetKind(), ProactorBase::IOURING);
  auto* up = static_cast<UringProactor*>(ProactorBase::me());

  if (auto borrowed = up->RequestBuffer(size); borrowed)
    return *borrowed;

  return AllocateTmpBuf(size);
}
// Releases a buffer obtained from PrepareBuf. Buffers that carry a buffer index are given back to
// the current UringProactor; buffers without one were temporary allocations and are freed.
void ReturnBuf(UringBuf buf) {
  DCHECK_EQ(ProactorBase::me()->GetKind(), ProactorBase::IOURING);
  auto* uring = static_cast<UringProactor*>(ProactorBase::me());

  if (!buf.buf_idx) {
    DestroyTmpBuf(buf);
    return;
  }
  uring->ReturnBuffer(buf);
}
} // anonymous namespace
// Opens the backing file at `path`, hands its current span to the allocator and registers
// buffers of --registered_buffer_size bytes with the current io_uring proactor.
// Returns the error of the failing step, or an empty error code on success.
// NOTE(review): if buffer registration fails, the backing file opened above is not closed in this
// function - confirm the caller cleans up.
std::error_code DiskStorage::Open(std::string_view path) {
  RETURN_ON_ERR(io_mgr_.Open(path));
  alloc_.AddStorage(0, io_mgr_.Span());

  DCHECK_EQ(ProactorBase::me()->GetKind(), ProactorBase::IOURING);
  auto* uring = static_cast<UringProactor*>(ProactorBase::me());

  // A negative result is a negated errno value.
  const int register_res = uring->RegisterBuffers(absl::GetFlag(FLAGS_registered_buffer_size));
  if (register_res < 0) {
    return std::error_code{-register_res, std::system_category()};
  }
  return {};
}
@ -25,14 +79,15 @@ void DiskStorage::Read(DiskSegment segment, ReadCb cb) {
DCHECK_GT(segment.length, 0u);
DCHECK_EQ(segment.offset % kPageSize, 0u);
// TODO: use registered buffers (UringProactor::RegisterBuffers)
// TODO: Make it error safe, don't leak if cb isn't called
uint8_t* buf = new uint8_t[segment.length];
auto io_cb = [cb, buf, segment](int res) {
cb(std::string_view{reinterpret_cast<char*>(buf), segment.length});
delete[] buf; // because std::function needs to be copyable, unique_ptr can't be used
UringBuf buf = PrepareBuf(segment.length);
auto io_cb = [cb = std::move(cb), buf, segment](int io_res) {
if (io_res < 0)
cb("", std::error_code{-io_res, std::system_category()});
else
cb(std::string_view{reinterpret_cast<char*>(buf.bytes.data()), segment.length}, {});
ReturnBuf(buf);
};
io_mgr_.ReadAsync(segment.offset, {buf, segment.length}, std::move(io_cb));
io_mgr_.ReadAsync(segment.offset, buf, std::move(io_cb));
}
void DiskStorage::MarkAsFree(DiskSegment segment) {
@ -60,16 +115,22 @@ std::error_code DiskStorage::Stash(io::Bytes bytes, StashCb cb) {
return std::make_error_code(std::errc::file_too_large);
}
auto io_cb = [this, cb, offset, len = bytes.size()](int io_res) {
UringBuf buf = PrepareBuf(bytes.size());
memcpy(buf.bytes.data(), bytes.data(), bytes.length());
auto io_cb = [this, cb, offset, buf, len = bytes.size()](int io_res) {
VLOG(0) << "IoRes " << io_res << " " << len;
if (io_res < 0) {
MarkAsFree({size_t(offset), len});
cb({});
cb({}, std::error_code{-io_res, std::system_category()});
} else {
cb({size_t(offset), len});
cb({size_t(offset), len}, {});
}
ReturnBuf(buf);
};
return io_mgr_.WriteAsync(offset, io::View(bytes), std::move(io_cb));
io_mgr_.WriteAsync(offset, buf, std::move(io_cb));
return {};
}
DiskStorage::Stats DiskStorage::GetStats() const {

View file

@ -21,8 +21,8 @@ class DiskStorage {
size_t allocated_bytes = 0;
};
using ReadCb = std::function<void(std::string_view)>;
using StashCb = std::function<void(DiskSegment)>;
using ReadCb = std::function<void(std::string_view, std::error_code)>;
using StashCb = std::function<void(DiskSegment, std::error_code)>;
std::error_code Open(std::string_view path);
void Close();
@ -36,6 +36,7 @@ class DiskStorage {
// Request bytes to be stored; cb will be called with the assigned segment on completion. Can
// block to grow the backing file. Returns an error code if the operation failed immediately
// (most likely it failed to grow the backing file), or passes an empty segment together with an
// error code to cb if the final write operation failed.
// Bytes are copied and can be dropped before cb is resolved.
std::error_code Stash(io::Bytes bytes, StashCb cb);
Stats GetStats() const;

View file

@ -36,7 +36,9 @@ struct DiskStorageTest : public PoolTestBase {
void Stash(size_t index, string value) {
pending_ops_++;
auto buf = make_shared<string>(value);
storage_->Stash(io::Buffer(*buf), [this, index, buf](DiskSegment segment) {
storage_->Stash(io::Buffer(*buf), [this, index, buf](DiskSegment segment, std::error_code ec) {
EXPECT_FALSE(ec);
EXPECT_GT(segment.length, 0u);
segments_[index] = segment;
pending_ops_--;
});
@ -44,7 +46,8 @@ struct DiskStorageTest : public PoolTestBase {
void Read(size_t index) {
pending_ops_++;
storage_->Read(segments_[index], [this, index](string_view value) {
storage_->Read(segments_[index], [this, index](string_view value, std::error_code ec) {
EXPECT_FALSE(ec);
last_reads_[index] = value;
pending_ops_--;
});

View file

@ -88,92 +88,31 @@ error_code IoMgr::Grow(size_t len) {
}
}
error_code IoMgr::GrowAsync(size_t len, GrowCb cb) {
DCHECK_EQ(0u, len % (1 << 20));
if (exchange(grow_progress_, true)) {
return make_error_code(errc::operation_in_progress);
}
Proactor* proactor = (Proactor*)ProactorBase::me();
SubmitEntry entry = proactor->GetSubmitEntry(
[this, len, cb = std::move(cb)](auto*, Proactor::IoResult res, uint32_t) {
this->grow_progress_ = false;
sz_ += (res == 0 ? len : 0);
cb(res);
},
0);
entry.PrepFallocate(backing_file_->fd(), 0, sz_, len);
return error_code{};
}
error_code IoMgr::WriteAsync(size_t offset, string_view blob, WriteCb cb) {
DCHECK(!blob.empty());
VLOG(1) << "WriteAsync " << offset << "/" << blob.size();
Proactor* proactor = (Proactor*)ProactorBase::me();
void IoMgr::WriteAsync(size_t offset, util::fb2::UringBuf buf, WriteCb cb) {
DCHECK(!buf.bytes.empty());
auto* proactor = static_cast<Proactor*>(ProactorBase::me());
auto ring_cb = [cb = std::move(cb)](auto*, Proactor::IoResult res, uint32_t flags) { cb(res); };
SubmitEntry se = proactor->GetSubmitEntry(std::move(ring_cb), 0);
se.PrepWrite(backing_file_->fd(), blob.data(), blob.size(), offset);
return error_code{};
if (buf.buf_idx)
se.PrepWriteFixed(backing_file_->fd(), buf.bytes.data(), buf.bytes.size(), offset,
*buf.buf_idx);
else
se.PrepWrite(backing_file_->fd(), buf.bytes.data(), buf.bytes.size(), offset);
}
error_code IoMgr::Read(size_t offset, io::MutableBytes dest) {
DCHECK(!dest.empty());
if (absl::GetFlag(FLAGS_backing_file_direct)) {
size_t read_offs = offset & ~(kPageSize - 1);
size_t end_range = alignup(offset + dest.size(), kPageSize);
size_t space_needed = end_range - read_offs;
DCHECK_EQ(0u, space_needed % kPageSize);
uint8_t* space = (uint8_t*)mi_malloc_aligned(space_needed, kPageSize);
iovec v{.iov_base = space, .iov_len = space_needed};
uint64_t from_ts = ProactorBase::GetMonotonicTimeNs();
error_code ec = backing_file_->Read(&v, 1, read_offs, 0);
uint64_t end_ts = ProactorBase::GetMonotonicTimeNs();
stats_.read_delay_usec += (end_ts - from_ts) / 1000;
++stats_.read_total;
if (ec) {
mi_free(space);
return ec;
}
memcpy(dest.data(), space + offset - read_offs, dest.size());
mi_free_size_aligned(space, space_needed, kPageSize);
return ec;
}
iovec v{.iov_base = dest.data(), .iov_len = dest.size()};
uint64_t from_ts = ProactorBase::GetMonotonicTimeNs();
auto ec = backing_file_->Read(&v, 1, offset, 0);
uint64_t end_ts = ProactorBase::GetMonotonicTimeNs();
stats_.read_delay_usec += (end_ts - from_ts) / 1000;
++stats_.read_total;
return ec;
}
std::error_code IoMgr::ReadAsync(size_t offset, absl::Span<uint8_t> buffer, ReadCb cb) {
DCHECK(!buffer.empty());
VLOG(1) << "Read " << offset << "/" << buffer.size();
Proactor* proactor = (Proactor*)ProactorBase::me();
void IoMgr::ReadAsync(size_t offset, util::fb2::UringBuf buf, ReadCb cb) {
DCHECK(!buf.bytes.empty());
auto* proactor = static_cast<Proactor*>(ProactorBase::me());
auto ring_cb = [cb = std::move(cb)](auto*, Proactor::IoResult res, uint32_t flags) { cb(res); };
SubmitEntry se = proactor->GetSubmitEntry(std::move(ring_cb), 0);
se.PrepRead(backing_file_->fd(), buffer.data(), buffer.size(), offset);
return error_code{};
if (buf.buf_idx)
se.PrepReadFixed(backing_file_->fd(), buf.bytes.data(), buf.bytes.size(), offset, *buf.buf_idx);
else
se.PrepRead(backing_file_->fd(), buf.bytes.data(), buf.bytes.size(), offset);
}
void IoMgr::Shutdown() {

View file

@ -9,6 +9,7 @@
#include "server/common.h"
#include "util/fibers/uring_file.h"
#include "util/fibers/uring_proactor.h"
namespace dfly::tiering {
@ -19,9 +20,6 @@ class IoMgr {
// void(int)>;
using WriteCb = std::function<void(int)>;
// (io_res, )
using GrowCb = std::function<void(int)>;
using ReadCb = std::function<void(int)>;
// blocks until all the pending requests are finished.
@ -32,19 +30,13 @@ class IoMgr {
// Try growing file by that length. Return error if growth failed.
std::error_code Grow(size_t len);
// Grows file by that length. len must be divided by 1MB.
// passing other values will check-fail.
std::error_code GrowAsync(size_t len, GrowCb cb);
// Write into offset from src and call cb once done. The callback is guaranteed to be invoked in
// any error case for cleanup. The src buffer must outlive the call, until cb is resolved.
void WriteAsync(size_t offset, util::fb2::UringBuf src, WriteCb cb);
// Returns error if submission failed. Otherwise - returns the io result
// via cb. A caller must make sure that the blob exists until cb is called.
std::error_code WriteAsync(size_t offset, std::string_view blob, WriteCb cb);
// Read synchronously into dest
std::error_code Read(size_t offset, io::MutableBytes dest);
// Read into dest and call cb once read
std::error_code ReadAsync(size_t offset, io::MutableBytes dest, ReadCb cb);
// Read into dest and call cb once read. The callback is guaranteed to be invoked in any error
// case for cleanup. The dest buffer must outlive the call, until cb is resolved.
void ReadAsync(size_t offset, util::fb2::UringBuf dest, ReadCb cb);
// Total file span
size_t Span() const {

View file

@ -62,13 +62,11 @@ std::error_code OpManager::Stash(EntryId id_ref, std::string_view value) {
auto id = ToOwned(id_ref);
unsigned version = ++pending_stash_ver_[id];
std::shared_ptr<char[]> buf(new char[value.length()]);
memcpy(buf.get(), value.data(), value.length());
io::MutableBytes buf_view{reinterpret_cast<uint8_t*>(buf.get()), value.length()};
return storage_.Stash(buf_view, [this, version, id = std::move(id), buf](DiskSegment segment) {
ProcessStashed(Borrowed(id), version, segment);
});
io::Bytes buf_view{reinterpret_cast<const uint8_t*>(value.data()), value.length()};
auto io_cb = [this, version, id = std::move(id)](DiskSegment segment, std::error_code ec) {
ProcessStashed(Borrowed(id), version, segment, ec);
};
return storage_.Stash(buf_view, std::move(io_cb));
}
OpManager::ReadOp& OpManager::PrepareRead(DiskSegment aligned_segment) {
@ -77,18 +75,20 @@ OpManager::ReadOp& OpManager::PrepareRead(DiskSegment aligned_segment) {
auto [it, inserted] = pending_reads_.try_emplace(aligned_segment.offset, aligned_segment);
if (inserted) {
storage_.Read(aligned_segment, [this, aligned_segment](std::string_view value) {
auto io_cb = [this, aligned_segment](std::string_view value, std::error_code ec) {
ProcessRead(aligned_segment.offset, value);
});
};
storage_.Read(aligned_segment, io_cb);
}
return it->second;
}
void OpManager::ProcessStashed(EntryId id, unsigned version, DiskSegment segment) {
void OpManager::ProcessStashed(EntryId id, unsigned version, DiskSegment segment,
std::error_code ec) {
if (auto it = pending_stash_ver_.find(ToOwned(id));
it != pending_stash_ver_.end() && it->second == version) {
pending_stash_ver_.erase(it);
ReportStashed(id, segment);
ReportStashed(id, segment, ec);
} else {
storage_.MarkAsFree(segment);
}

View file

@ -48,8 +48,9 @@ class OpManager {
std::error_code Stash(EntryId id, std::string_view value);
protected:
// Report that a stash succeeded and the entry was stored at the provided segment
virtual void ReportStashed(EntryId id, DiskSegment segment) = 0;
// Report that a stash succeeded and the entry was stored at the provided segment, or that it
// failed with the given error
virtual void ReportStashed(EntryId id, DiskSegment segment, std::error_code ec) = 0;
// Report that an entry was successfully fetched.
// If modify is set, a modification was executed during the read and the stored value is outdated.
@ -88,7 +89,7 @@ class OpManager {
void ProcessRead(size_t offset, std::string_view value);
// Called once Stash finished
void ProcessStashed(EntryId id, unsigned version, DiskSegment segment);
void ProcessStashed(EntryId id, unsigned version, DiskSegment segment, std::error_code ec);
protected:
DiskStorage storage_;

View file

@ -39,7 +39,8 @@ struct OpManagerTest : PoolTestBase, OpManager {
return future;
}
void ReportStashed(EntryId id, DiskSegment segment) override {
void ReportStashed(EntryId id, DiskSegment segment, std::error_code ec) override {
EXPECT_FALSE(ec);
stashed_[id] = segment;
}

View file

@ -15,22 +15,17 @@ namespace dfly::tiering {
class PoolTestBase : public testing::Test {
protected:
static void SetUpTestSuite();
static void TearDownTestSuite();
void SetUp() override {
pp_.reset(util::fb2::Pool::IOUring(16, 2));
pp_->Run();
}
static std::unique_ptr<util::ProactorPool> pp_;
void TearDown() override {
pp_->Stop();
pp_.reset();
}
std::unique_ptr<util::ProactorPool> pp_;
};
std::unique_ptr<util::ProactorPool> PoolTestBase::pp_ = nullptr;
void PoolTestBase::SetUpTestSuite() {
pp_.reset(util::fb2::Pool::IOUring(16, 2));
pp_->Run();
}
void PoolTestBase::TearDownTestSuite() {
pp_->Stop();
pp_.reset();
}
} // namespace dfly::tiering