Optional feature (POC) - store values on the external storage (SSD).

1. Add ExternalAllocator that provides files ranges to write to.
2. Add IoMgr that wraps files and allows sending asynchronous requests.
3. Add POC that writes string values when a new entry is added.
   The original values are still kept in memory.
This commit is contained in:
Roman Gershman 2022-04-13 22:11:18 +03:00
parent 6e5de7ac59
commit ad3bdbf499
13 changed files with 825 additions and 5 deletions

View file

@ -1,9 +1,10 @@
add_library(dfly_core compact_object.cc dragonfly_core.cc interpreter.cc
add_library(dfly_core compact_object.cc dragonfly_core.cc external_alloc.cc interpreter.cc
segment_allocator.cc small_string.cc tx_queue.cc)
cxx_link(dfly_core base absl::flat_hash_map absl::str_format redis_lib TRDP::lua
Boost::fiber crypto)
cxx_test(dfly_core_test dfly_core LABELS DFLY)
cxx_test(compact_object_test dfly_core LABELS DFLY)
cxx_test(external_alloc_test dfly_core LABELS DFLY)
cxx_test(dash_test dfly_core LABELS DFLY)
cxx_test(interpreter_test dfly_core LABELS DFLY)

433
src/core/external_alloc.cc Normal file
View file

@ -0,0 +1,433 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "core/external_alloc.h"
#include <mimalloc.h>
#include <bitset>
#include <cstring>
#include "base/logging.h"
namespace dfly {
using namespace std;
using detail::PageClass;
using BinIdx = uint8_t;
namespace {
// User-defined literal: converts a number of mebibytes into bytes (x * 2^20).
constexpr inline unsigned long long operator""_MB(unsigned long long mebibytes) {
  return mebibytes * (1ULL << 20U);
}
// User-defined literal: converts a number of kibibytes into bytes (x * 2^10).
constexpr inline unsigned long long operator""_KB(unsigned long long kibibytes) {
  return kibibytes * (1ULL << 10U);
}
constexpr size_t kMediumObjSize = 1_MB;
constexpr size_t kSmallPageShift = 20;
constexpr size_t kMediumPageShift = 23;
constexpr size_t kSegmentAlignment = 256_MB;
constexpr size_t kSegmentDefaultSize = 256_MB;
// Converts a size in bytes into the number of machine words (sizeof(uintptr_t) bytes each)
// needed to hold it, rounding up.
constexpr inline size_t wsize_from_size(size_t size) {
  constexpr size_t kWordBytes = sizeof(uintptr_t);
  return (size + kWordBytes - 1) / kWordBytes;
}
// Block length of every size bin, expressed in words: ToBlockSize() multiplies an entry
// by 8 to obtain the block size in bytes, so the smallest block is 512 * 8 = 4KB.
// Entries grow by a quarter of the enclosing power of two (512, 640, 768, 896, 1024, ...).
// ToBinIdx() returns 1 for the smallest sizes, so it never produces bin 0.
// The last entry (detail::kLargeSizeBin) is a sentinel for blocks larger than any bin.
// TODO: we may want to round it up to the nearest 512 multiplier so that all the allocated
// blocks will be multipliers of 4kb.
constexpr size_t kBinLens[detail::kNumSizeBins] = {
512, 512, 640, 768, 896, 1024, 1280, 1536, 1792, 2048, 2560, 3072,
3584, 4096, 5120, 6144, 7168, 8192, 10240, 12288, 14336, 16384, 20480, 24576,
28672, 32768, 40960, 49152, 57344, 65536, 81920, 98304, 114688, 131072, UINT64_MAX};
static_assert(kBinLens[detail::kLargeSizeBin] == UINT64_MAX);
constexpr inline BinIdx ToBinIdx(size_t size) {
size_t wsize = wsize_from_size(size);
if (wsize <= 512) {
return 1;
}
if (wsize > kMediumObjSize) {
return detail::kLargeSizeBin;
}
// to correct rounding up of size to words that the last word will be within the range.
--wsize;
// find the highest bit
uint8_t b = 63 - __builtin_clzl(wsize);
return (b << 2) + ((wsize >> (b - 2)) & 3) - 34;
}
static_assert(ToBinIdx(4096) == 1);
static_assert(ToBinIdx(4097) == 2);
static_assert(ToBinIdx(5120) == 2);
static_assert(ToBinIdx(5121) == 3);
static_assert(ToBinIdx(6144) == 3);
static_assert(ToBinIdx(6145) == 4);
// Selects the page class that serves blocks of `sz` bytes:
// up to 128KB - small pages, up to 1MB - medium pages, anything bigger - large pages.
PageClass ClassFromBlockSize(size_t sz) {
  const bool fits_small = sz <= 128_KB;
  const bool fits_medium = sz <= 1_MB;
  return fits_small ? PageClass::SMALL_P
                    : (fits_medium ? PageClass::MEDIUM_P : PageClass::LARGE_P);
}
// Returns the block size in bytes of bin `idx`: the kBinLens entry scaled by 8.
size_t ToBlockSize(BinIdx idx) {
  const size_t bin_len = kBinLens[idx];
  return bin_len * 8;
}
// Returns how many pages of class `pc` fit into one default-sized segment.
// Large pages are not implemented: debug builds abort, otherwise 0 is returned.
unsigned NumPagesInClass(PageClass pc) {
  if (pc == PageClass::SMALL_P) {
    return kSegmentDefaultSize >> kSmallPageShift;
  }
  if (pc == PageClass::MEDIUM_P) {
    return kSegmentDefaultSize >> kMediumPageShift;
  }
  if (pc == PageClass::LARGE_P) {
    DLOG(FATAL) << "TBD";
  }
  return 0;
}
} // namespace
/*
block 4Kb or more, page - 1MB (256 blocks) or bigger.
Block sizes grow exponentially - by factor ~1.25. See MI_PAGE_QUEUES_EMPTY definition
for sizes example.
*/
namespace detail {
// Bookkeeping record for a single page of the external storage.
// The field order matters: the constructor zeroes everything starting from `id` using
// offsetof, and the size of the struct is pinned to 40 bytes by a static_assert.
struct Page {
std::bitset<256> free_blocks; // bitmask of free blocks (32 bytes).
uint8_t id; // index inside the Segment.pages array.
// need some mapping function to map from block_size to real_block_size given Page class.
BinIdx block_size_bin;
uint8_t segment_inuse : 1; // true if segment allocated this page.
uint8_t reserved1;
// number of available free blocks. Note: we could get rid of this field and use
// free_blocks.count instead.
uint16_t available;
uint8_t reserved2[2];
// Zeroes all the fields from `id` to the end of the struct. free_blocks is not covered
// by the memset; it is default-constructed by std::bitset.
Page() {
memset(&id, 0, sizeof(Page) - offsetof(Page, id));
static_assert(sizeof(Page) == 40);
}
// Prepares a page that was just handed out by its segment to serve blocks of bin `bin_id`.
// The page size is derived from `pc` (small-page shift for SMALL_P, medium-page shift
// otherwise); `available` becomes page_size / block_size and exactly that many leading
// bits of free_blocks are set.
// Expects (debug-checked) that the page has no available blocks and is marked in use.
void Init(PageClass pc, BinIdx bin_id) {
DCHECK_EQ(available, 0);
DCHECK(segment_inuse);
size_t page_size = 1UL << (pc == PageClass::SMALL_P ? kSmallPageShift : kMediumPageShift);
block_size_bin = bin_id;
available = page_size / ToBlockSize(bin_id);
free_blocks.reset();
for (unsigned i = 0; i < available; ++i) {
free_blocks.set(i, true);
}
}
};
static_assert(sizeof(std::bitset<256>) == 32);
} // namespace detail
//
/**
 * SegmentDescr denotes a 256MB segment on external storage -
 * holds up to 256 pages (in case of small pages).
 * Each segment has pages of the same type, but each page can host blocks of
 * different sizes up to the maximal block size for that page class.
 * SegmentDescr points to the range within external storage space.
 * By using the page.id together with segment->page_shift and segment->offset
 * one can know where the page is located in the storage.
 * Opposite direction: by giving an offset to the file, segment_id = offset / 256MB.
 * Moreover (offset % 256MB) >> segment.page_shift gives us the page id and subsequently
 * page_start. segment.pages[page_id].block_size_bin gives us the block size and that in
 * turn gives us the block id within the page. We can also know block_size if the originally
 * allocated size is provided, by using the round_up function that was used to allocate
 * the block.
 * SegmentDescr must be aligned to 16KB boundaries - ToSegDescr relies on that.
 */
class ExternalAllocator::SegmentDescr {
SegmentDescr(const SegmentDescr&) = delete;
void operator=(const SegmentDescr&) = delete;
friend class ExternalAllocator;
public:
// pc - page class of all the pages in this segment, offs - offset of the segment in the
// backing storage, capacity - number of pages in the trailing pages_ array.
explicit SegmentDescr(PageClass pc, size_t offs, uint16_t capacity);
// Returns the first page that is not in use after marking it as used,
// or nullptr if there is no such page.
Page* FindPageSegment();
// Returns the i-th page descriptor. No bounds check is performed.
Page* GetPage(unsigned i) {
return pages_ + i;
}
// Returns the offset in the backing storage of block `blockpos` of `page`:
// segment offset + page start + block start.
size_t BlockOffset(const Page* page, unsigned blockpos) {
return offset_ + page->id * (1 << page_shift_) + ToBlockSize(page->block_size_bin) * blockpos;
}
// True if fewer than capacity_ pages are currently counted as used.
bool HasFreePages() const {
return capacity_ > used_;
}
unsigned capacity() const {
return capacity_;
}
unsigned used() const {
return used_;
}
unsigned page_shift() const {
return page_shift_;
}
PageClass page_class() const {
return page_class_;
}
// Links of the circular doubly-linked queue of segments (see ExternalAllocator::sq_).
// A segment that is not linked with others points to itself.
SegmentDescr *next, *prev;
private:
uint64_t offset_; // offset of this segment in the backing storage.
uint16_t capacity_, used_; // total number of pages / number of pages handed out.
PageClass page_class_;
uint8_t page_shift_; // log2 of the page size in bytes.
Page pages_[1]; // must be the last field. Can be 1-256 pages.
};
// Builds a descriptor for a segment that starts at `offs` in the backing storage and
// consists of `capacity` pages of class `pc` (SMALL_P or MEDIUM_P only).
// The caller must have allocated enough memory after the object for `capacity` pages.
ExternalAllocator::SegmentDescr::SegmentDescr(PageClass pc, size_t offs, uint16_t capacity)
    : offset_(offs), capacity_(capacity), used_(0), page_class_(pc), page_shift_(kSmallPageShift) {
  next = prev = this;
  DCHECK(pc != PageClass::LARGE_P);
  if (pc == PageClass::MEDIUM_P)
    page_shift_ = kMediumPageShift;

  // Only pages_[0] is constructed as a regular member. The remaining pages live in the
  // raw (uninitialized) memory that follows the object, so each of them must be
  // constructed explicitly - otherwise segment_inuse/available/free_blocks hold garbage
  // and FindPageSegment() may skip free pages or hand out corrupted ones.
  for (unsigned i = 0; i < capacity; ++i) {
    Page* page = new (pages_ + i) Page;
    page->id = i;
  }
}
// Scans the pages in index order and claims the first one that is not in use:
// marks it as used, bumps the used counter and returns it.
// Returns nullptr when every page is already in use.
auto ExternalAllocator::SegmentDescr::FindPageSegment() -> Page* {
  Page* const pages_end = pages_ + capacity_;
  for (Page* candidate = pages_; candidate != pages_end; ++candidate) {
    if (candidate->segment_inuse) {
      continue;
    }
    candidate->segment_inuse = 1;
    ++used_;
    return candidate;
  }
  return nullptr;
}
static detail::Page empty_page;
// Starts with empty segment queues; every bin points at the shared empty_page sentinel
// (which has no available blocks), so the first Malloc of each bin takes the slow path.
ExternalAllocator::ExternalAllocator() {
  for (SegmentDescr*& queue_head : sq_) {
    queue_head = nullptr;
  }
  for (Page*& bin_page : free_pages_) {
    bin_page = &empty_page;
  }
}
int64_t ExternalAllocator::Malloc(size_t sz) {
uint8_t bin_idx = ToBinIdx(sz);
Page* page = free_pages_[bin_idx];
if (page->available == 0) { // empty page.
PageClass pc = ClassFromBlockSize(sz);
CHECK_NE(pc, PageClass::LARGE_P) << "not supported, TBD";
size_t seg_size = 0;
page = FindPage(pc, &seg_size);
if (!page)
return -int64_t(seg_size);
page->Init(pc, bin_idx);
free_pages_[bin_idx] = page;
}
DCHECK(page->available);
size_t pos = page->free_blocks._Find_first();
page->free_blocks.flip(pos);
--page->available;
SegmentDescr* seg = ToSegDescr(page);
return seg->BlockOffset(page, pos);
}
void ExternalAllocator::Free(size_t offset, size_t sz) {
size_t idx = offset / 256_MB;
size_t delta = offset % 256_MB;
CHECK_LT(idx, segments_.size());
CHECK(segments_[idx]);
SegmentDescr* seg = segments_[idx];
unsigned page_id = delta >> seg->page_shift();
CHECK_LT(page_id, seg->capacity());
Page* page = seg->GetPage(page_id);
unsigned page_size = (1 << seg->page_shift());
unsigned block_offs = delta % page_size;
unsigned block_size = ToBlockSize(page->block_size_bin);
unsigned block_id = block_offs / block_size;
unsigned blocks_num = page_size / block_size;
CHECK_LE(sz, block_size);
DCHECK_LT(block_id, blocks_num);
DCHECK(!page->free_blocks[block_id]);
page->free_blocks.set(block_id);
++page->available;
DCHECK_EQ(page->available, page->free_blocks.count());
if (page->available == blocks_num) {
FreePage(page, seg, block_size);
}
}
// Registers the storage range [offset, offset + size) for later use.
// Only whole, 256MB-aligned segments that lie beyond the already instantiated segments
// are accepted; the range must be new and must not overlap its neighbours among the
// pending ranges. Violations check-fail.
void ExternalAllocator::AddStorage(size_t offset, size_t size) {
  CHECK_EQ(256_MB, size);
  CHECK_EQ(0u, offset % 256_MB);

  const size_t seg_idx = offset / 256_MB;
  CHECK_LE(segments_.size(), seg_idx);

  auto insertion = added_segs_.emplace(offset, size);
  CHECK(insertion.second);
  auto pos = insertion.first;

  if (pos != added_segs_.begin()) {
    auto before = pos;
    --before;
    CHECK_LE(before->first + before->second, offset);
  }

  auto after = pos;
  ++after;
  if (after != added_segs_.end()) {
    CHECK_LE(offset + size, after->first);
  }
}
size_t ExternalAllocator::GoogSize(size_t sz) {
uint8_t bin_idx = ToBinIdx(sz);
return ToBlockSize(bin_idx);
}
/**
*
_____ _ _ __ _ _
| __ \ (_) | | / _| | | (_)
| |__) | __ ___ ____ _| |_ ___ | |_ _ _ _ __ ___| |_ _ ___ _ __ ___
| ___/ '__| \ \ / / _` | __/ _ \ | _| | | | '_ \ / __| __| |/ _ \| '_ \/ __|
| | | | | |\ V / (_| | || __/ | | | |_| | | | | (__| |_| | (_) | | | \__ \
|_| |_| |_| \_/ \__,_|\__\___| |_| \__,_|_| |_|\___|\__|_|\___/|_| |_|___/
src: https://patorjk.com/software/taag/#f=Big
*/
// private functions
// Finds a free page of class `pc` (SMALL_P or MEDIUM_P).
//  1. Walks the circular queue of segments of that class, unlinking fully booked heads,
//     and returns a page from the first segment that has free pages.
//  2. Otherwise instantiates a segment from the pending storage ranges (see AddStorage)
//     and returns its first page.
//  3. Otherwise stores the required storage size in *seg_size and returns nullptr.
auto ExternalAllocator::FindPage(PageClass pc, size_t* seg_size) -> Page* {
  DCHECK_NE(pc, PageClass::LARGE_P);
  SegmentDescr* seg = sq_[pc];
  if (seg) {
    while (true) {
      if (seg->HasFreePages()) {
        return seg->FindPageSegment();
      }

      // remove head.
      SegmentDescr* next = seg->next;

      // The head is the only element when it points to itself. Comparing `next` with
      // `seg->prev` is not enough: in a two-element ring next == prev as well, and the
      // second segment would be dropped from the queue even if it has free pages.
      if (next == seg) {
        sq_[pc] = nullptr;
        break;
      }

      sq_[pc] = next;
      next->prev = seg->prev;
      seg->prev->next = next;
      seg->next = seg->prev = seg;  // the unlinked segment points to itself.
      seg = next;
    }
  }

  if (!added_segs_.empty()) {
    unsigned num_pages = NumPagesInClass(pc);
    auto it = added_segs_.begin();
    size_t seg_idx = it->first / kSegmentAlignment;
    CHECK_LE(segments_.size(), seg_idx);
    segments_.resize(seg_idx + 1);

    // SegmentDescr already contains one Page, hence (num_pages - 1) extra pages.
    // The 16KB alignment is what allows ToSegDescr to map a page back to its segment.
    void* ptr = mi_malloc_aligned(sizeof(SegmentDescr) + (num_pages - 1) * sizeof(Page), 16_KB);
    CHECK(ptr);  // placement new on a null pointer is undefined behavior.
    SegmentDescr* new_seg = new (ptr) SegmentDescr(pc, it->first, num_pages);
    segments_[seg_idx] = new_seg;
    added_segs_.erase(it);

    DCHECK(sq_[pc] == nullptr);
    DCHECK(new_seg->next == new_seg->prev && new_seg == new_seg->next);
    sq_[pc] = new_seg;
    return new_seg->FindPageSegment();
  }

  // No storage left: tell the caller how much must be added.
  *seg_size = kSegmentDefaultSize;
  return nullptr;
}
// Returns a completely free `page` to its segment `owner`.
// `block_size` is the size of the blocks the page was serving; it identifies the bin
// whose fast-path reference may have to be dropped.
void ExternalAllocator::FreePage(Page* page, SegmentDescr* owner, size_t block_size) {
  // page is fully free. Return it to the segment even if it's
  // referenced via free_pages_. This allows more elasticity by potentially reassigning
  // it to other bin sizes.
  BinIdx bidx = ToBinIdx(block_size);

  // Remove fast allocation reference.
  if (free_pages_[bidx] == page) {
    free_pages_[bidx] = &empty_page;
  }

  page->segment_inuse = 0;
  page->available = 0;

  // Must be evaluated before used_ is updated below: it detects the "was full" state.
  if (!owner->HasFreePages()) {
    // Segment was fully booked but now it has a free page.
    // Add it to the tail of segment queue.
    DCHECK(owner->next == owner->prev);
    auto& sq = sq_[owner->page_class()];
    if (sq == nullptr) {
      sq = owner;
    } else {
      SegmentDescr* last = sq->prev;
      last->next = owner;
      owner->prev = last;
      owner->next = sq;
      sq->prev = owner;
    }
  }

  // The page went back to the segment, so the segment has one used page less.
  // (Incrementing here would make the segment look fully booked while it still has
  // free pages.)
  DCHECK_GT(owner->used_, 0u);
  --owner->used_;
}
// Maps a page back to the segment descriptor that embeds it by rounding the page address
// down to a 16KB boundary.
inline auto ExternalAllocator::ToSegDescr(Page* page) -> SegmentDescr* {
  constexpr uintptr_t kAlignMask = ~uintptr_t(16_KB - 1);  // align to 16KB boundary.
  const uintptr_t descr_addr = reinterpret_cast<uintptr_t>(page) & kAlignMask;
  SegmentDescr* descr = reinterpret_cast<SegmentDescr*>(descr_addr);
  DCHECK(descr->GetPage(page->id) == page);
  return descr;
}
} // namespace dfly

93
src/core/external_alloc.h Normal file
View file

@ -0,0 +1,93 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <absl/container/btree_map.h>
#include <cstddef>
#include <cstdint>
#include <vector>
namespace dfly {
/**
*
* An external allocator inspired by mimalloc. Its goal is to maintain a state machine for
* bookkeeping the allocations of different sizes that are backed up by a separate
* storage. It could be a disk, SSD or another memory allocator. This class serves
* as a state machine that either returns an offset to the backing storage or the indication
* of the resource that is missing. The advantage of such design is that we can use it in
* asynchronous callbacks without blocking on any IO requests.
* The allocator uses dynamic memory internally. Should be used in a single thread.
*
*/
namespace detail {
// Defined in external_alloc.cc as a struct; the forward declaration uses the same
// class-key to avoid mismatched-tag warnings.
struct Page;

// Index of the bin that represents blocks too large for the regular bins.
constexpr unsigned kLargeSizeBin = 34;
// Total number of size bins, including the large one.
constexpr unsigned kNumSizeBins = kLargeSizeBin + 1;

/**
 * Page classes can be SMALL, MEDIUM or LARGE. SMALL (1MB) for block sizes up to 128KB.
 * MEDIUM (8MB) for block sizes up to 1MB. LARGE - blocks larger than 1MB.
 *
 */
enum PageClass : uint8_t {
  SMALL_P = 0,
  MEDIUM_P = 1,
  LARGE_P = 2,
};
}  // namespace detail
// Bookkeeping-only allocator over an external storage: it hands out offsets, it never
// touches the storage itself. Non-copyable.
class ExternalAllocator {
ExternalAllocator(const ExternalAllocator&) = delete;
void operator=(const ExternalAllocator&) = delete;
public:
static constexpr size_t kExtAlignment = 1ULL << 28; // 256 MB
ExternalAllocator();
// If a negative result - backing storage is required of size=-result. See AddStorage
// on how to add more storage.
// For results >= 0 Returns offset to the backing storage where we may write the data of
// size sz.
int64_t Malloc(size_t sz);
// Releases the block at `offset` that was allocated for `sz` bytes.
void Free(size_t offset, size_t sz);
/// Adds backing storage to the allocator.
/// offset must be aligned to kExtAlignment boundaries.
/// It is expected that storage is added in a linear fashion, without skipping ranges.
/// So if [0, 256MB) is added, then next time [256MB, 512MB) is added etc.
void AddStorage(size_t offset, size_t size);
// Similar to mi_good_size, returns the size of the underlying block as if
// were returned by Malloc. Guaranteed that the result not less than sz.
// No allocation is done.
static size_t GoogSize(size_t sz);
private:
class SegmentDescr;
using Page = detail::Page;
// Returns a free page of class `sc`, or nullptr after storing the amount of storage
// that is missing in *seg_size.
Page* FindPage(detail::PageClass sc, size_t* seg_size);
// NOTE(review): no definition of GetNewSegment is visible in external_alloc.cc -
// confirm whether this declaration is still needed.
SegmentDescr* GetNewSegment(detail::PageClass sc);
// Returns a completely free page to its owning segment.
void FreePage(Page* page, SegmentDescr* owner, size_t block_size);
// Maps a page to the segment descriptor that embeds it.
static SegmentDescr* ToSegDescr(Page*);
SegmentDescr* sq_[3]; // map: PageClass -> free Segment.
// map: bin index -> page used for fast allocation from that bin.
Page* free_pages_[detail::kNumSizeBins];
// A segment for each 256MB range. To get a segment id from the offset, shift right by 28.
std::vector<SegmentDescr*> segments_;
// weird queue to support AddStorage interface. We can not instantiate segment
// until we know its class and that we know only when a page is demanded.
absl::btree_map<size_t, size_t> added_segs_;
};
} // namespace dfly

View file

@ -0,0 +1,70 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "core/external_alloc.h"
#include "base/gtest.h"
namespace dfly {
using namespace std;
// Fixture that gives every test case its own, initially empty allocator.
class ExternalAllocatorTest : public ::testing::Test {
 protected:
  ExternalAllocator ext_alloc_;

  // No suite-wide state is required; both hooks are intentionally empty.
  static void SetUpTestSuite() {}
  static void TearDownTestSuite() {}
};
constexpr int64_t kSegSize = 1 << 28;
// Walks through the basic allocation protocol. The statements depend on the allocator
// state left by the previous ones, so their order matters.
TEST_F(ExternalAllocatorTest, Basic) {
// Without storage Malloc reports (as a negative number) how much storage is missing.
int64_t res = ext_alloc_.Malloc(128);
EXPECT_EQ(-kSegSize, res);
ext_alloc_.AddStorage(0, kSegSize);
// Both sizes fall into the same 4KB bin: consecutive blocks of the first page.
EXPECT_EQ(0, ext_alloc_.Malloc(4000));
EXPECT_EQ(4096, ext_alloc_.Malloc(4096));
EXPECT_EQ(1048576, ext_alloc_.Malloc(8192)); // another page.
ext_alloc_.Free(1048576, 8192); // should return the page to the segment.
EXPECT_EQ(1048576, ext_alloc_.Malloc(1 << 14)); // the freed page is reused by another bin.
// Releasing both 4KB blocks frees the first page, which then serves the next bin.
ext_alloc_.Free(0, 4000);
ext_alloc_.Free(4096, 4096);
EXPECT_EQ(0, ext_alloc_.Malloc(4097));
}
// Fills a single segment with allocations of 4000, 8000, 12000 and 16000 bytes until the
// allocator asks for more storage, then verifies that the requested ranges are unique,
// do not overlap and cover more than half of the segment.
TEST_F(ExternalAllocatorTest, Invariants) {
  ext_alloc_.AddStorage(0, kSegSize);

  std::map<int64_t, size_t> ranges;  // offset -> requested size.
  size_t sum = 0;

  for (bool out_of_storage = false; !out_of_storage;) {
    for (unsigned j = 1; j < 5; ++j) {
      const size_t sz = 4000 * j;
      const int64_t offs = ext_alloc_.Malloc(sz);
      if (offs < 0) {
        out_of_storage = true;
        break;
      }
      const bool added = ranges.emplace(offs, sz).second;
      ASSERT_TRUE(added);
      sum += sz;
    }
  }

  EXPECT_GT(sum, kSegSize / 2);

  // The map is ordered by offset: every range must start at or after the end of the
  // previous one.
  off_t last = 0;
  for (const auto& [start, len] : ranges) {
    ASSERT_GE(start, last);
    last = start + len;
  }
}
} // namespace dfly

26
src/core/generate_bin_sizes.py Executable file
View file

@ -0,0 +1,26 @@
#!/usr/bin/env python3
import argparse
import random
from array import array
def main():
    """Print a C-style initializer list of bin sizes to stdout.

    The list starts with an extra 512 and is followed by `-n` groups (default 9) of four
    sizes each. Within a group the sizes grow by a quarter of the group's first size, so
    every group starts at twice the previous group's start. A line break is emitted after
    every second group.
    """
    parser = argparse.ArgumentParser(description='')
    parser.add_argument('-n', type=int, dest='num',
                        help='number of numbers', default=9)
    group_count = parser.parse_args().num

    def emit(text):
        # Every item is followed by a single space instead of a newline.
        print(text, end=' ')

    emit('{512, ')
    size = 512
    for group in range(group_count):
        step = size // 4
        for value in [size + k * step for k in range(4)]:
            emit(f'{value}, ')
        size += 4 * step
        if group % 2 == 1:
            print('')
    print('};')
if __name__ == "__main__":
main()

View file

@ -3,7 +3,7 @@ cxx_link(dragonfly base dragonfly_lib)
add_library(dragonfly_lib channel_slice.cc command_registry.cc common.cc config_flags.cc
conn_context.cc db_slice.cc debugcmd.cc
engine_shard_set.cc generic_family.cc hset_family.cc
engine_shard_set.cc generic_family.cc hset_family.cc io_mgr.cc
list_family.cc main_service.cc rdb_load.cc rdb_save.cc replica.cc
snapshot.cc script_mgr.cc server_family.cc
set_family.cc

View file

@ -159,6 +159,10 @@ class DbSlice {
*/
size_t FlushDb(DbIndex db_ind);
// Returns the owner_ pointer of this slice.
EngineShard* shard_owner() {
return owner_;
}
// Returns the shard_id_ of this slice.
ShardId shard_id() const {
return shard_id_;
}

View file

@ -10,10 +10,13 @@ extern "C" {
}
#include "base/logging.h"
#include "server/io_mgr.h"
#include "server/transaction.h"
#include "util/fiber_sched_algo.h"
#include "util/varz.h"
DEFINE_string(backing_prefix, "", "");
namespace dfly {
using namespace std;
@ -86,10 +89,17 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t*
}
EngineShard::~EngineShard() {
queue_.Shutdown();
fiber_q_.join();
sdsfree(tmp_str1);
sdsfree(tmp_str2);
}
void EngineShard::Shutdown() {
queue_.Shutdown();
fiber_q_.join();
if (io_mgr_) {
io_mgr_->Shutdown();
}
if (periodic_task_) {
ProactorBase::me()->CancelPeriodic(periodic_task_);
@ -108,6 +118,14 @@ void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
CompactObj::InitThreadLocal(shard_->memory_resource());
SmallString::InitThreadLocal(tlh);
if (!FLAGS_backing_prefix.empty()) {
string fn = absl::StrCat(FLAGS_backing_prefix, "-", absl::Dec(pb->GetIndex(), absl::kZeroPad4),
".back");
shard_->io_mgr_.reset(new IoMgr);
error_code ec = shard_->io_mgr_->Open(fn);
CHECK(!ec) << ec.message(); // TODO
}
}
void EngineShard::DestroyThreadLocal() {
@ -116,6 +134,9 @@ void EngineShard::DestroyThreadLocal() {
uint32_t index = shard_->db_slice_.shard_id();
mi_heap_t* tlh = shard_->mi_resource_.heap();
shard_->Shutdown();
shard_->~EngineShard();
mi_free(shard_);
shard_ = nullptr;

View file

@ -13,6 +13,7 @@ extern "C" {
#include <xxhash.h>
#include "base/string_view_sso.h"
#include "core/external_alloc.h"
#include "core/mi_memory_resource.h"
#include "core/tx_queue.h"
#include "server/channel_slice.h"
@ -23,6 +24,8 @@ extern "C" {
namespace dfly {
class IoMgr;
class EngineShard {
public:
struct Stats {
@ -126,12 +129,23 @@ class EngineShard {
// Returns used memory for this shard.
size_t UsedMemory() const;
// Returns the allocator embedded in this shard (never null).
ExternalAllocator* external_allocator() {
return &ext_alloc_;
}
// Returns the IoMgr held by io_mgr_, or nullptr when io_mgr_ is empty.
IoMgr* io_mgr() {
return io_mgr_.get();
}
// for everyone to use for string transformations during atomic cpu sequences.
sds tmp_str1, tmp_str2;
private:
EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap);
// blocks the calling fiber.
void Shutdown(); // called before destructing EngineShard.
struct WatchQueue;
void OnTxFinish();
@ -179,6 +193,8 @@ class EngineShard {
uint32_t periodic_task_ = 0;
uint64_t task_iters_ = 0;
std::unique_ptr<IoMgr> io_mgr_;
ExternalAllocator ext_alloc_;
static thread_local EngineShard* shard_;
};

78
src/server/io_mgr.cc Normal file
View file

@ -0,0 +1,78 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/io_mgr.h"
#include <fcntl.h>
#include "base/logging.h"
#include "facade/facade_types.h"
#include "util/uring/proactor.h"
namespace dfly {
using namespace std;
using namespace util;
using namespace facade;
using uring::FiberCall;
using uring::Proactor;
namespace this_fiber = ::boost::this_fiber;
// Clears all the state flags at once through the byte that overlays them.
IoMgr::IoMgr() {
  flags_val = uint8_t{0};
}
// Creates (or truncates) the backing file at `path` and opens it for writing.
// Must be called at most once per instance (check-fails otherwise).
// Returns the error reported by OpenLinux, or a default error_code on success.
error_code IoMgr::Open(const string& path) {
  CHECK(!backing_file_);

  constexpr int kOpenFlags = O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC;
  auto open_res = uring::OpenLinux(path, kOpenFlags, 0666);
  if (open_res) {
    backing_file_ = move(open_res.value());
    return error_code{};
  }
  return open_res.error();
}
// Submits an asynchronous fallocate request that extends the backing file by `len` bytes
// starting at the current size. `len` is debug-checked to be a multiple of 1MB.
// Returns operation_in_progress if another grow request is pending. Otherwise `cb` is
// invoked with the io result once the request completes; the tracked size grows by `len`
// only if the result is 0.
error_code IoMgr::GrowAsync(size_t len, GrowCb cb) {
  DCHECK_EQ(0u, len % (1 << 20));

  if (flags.grow_progress) {
    return make_error_code(errc::operation_in_progress);
  }

  // `len` travels with the request as its payload and comes back as `arg`.
  auto on_complete = [this, cb = move(cb)](Proactor::IoResult res, uint32_t, int64_t arg) {
    this->flags.grow_progress = 0;
    if (res == 0) {
      sz_ += arg;
    }
    cb(res);
  };

  Proactor* proactor = (Proactor*)ProactorBase::me();
  uring::SubmitEntry entry = proactor->GetSubmitEntry(move(on_complete), len);
  entry.PrepFallocate(backing_file_->fd(), 0, sz_, len);
  flags.grow_progress = 1;

  return error_code{};
}
// Not implemented yet: ignores all its arguments, never invokes `cb` and always returns
// a default-constructed error_code. The commented-out code below is an unfinished draft.
error_code IoMgr::GetBlockAsync(string_view buf, int64_t arg, CbType cb) {
/*uring::Proactor* proactor = (uring::Proactor*)ProactorBase::me();
auto mgr_cb = [cb = move(cb)](uring::Proactor::IoResult res, uint32_t flags, int64_t payload) {
cb(res, 4096);
};
uring::SubmitEntry se = proactor->GetSubmitEntry(move(mgr_cb), 0);
se.PrepWrite(backing_file_->fd(), str_.data(), str_.size(), 4096);
*/
return error_code{};
}
// Suspends the calling fiber until no state flag is set anymore, polling every 20us.
void IoMgr::Shutdown() {
  while (flags_val != 0) {
    this_fiber::sleep_for(20us);  // TODO: hacky for now.
  }
}
} // namespace dfly

56
src/server/io_mgr.h Normal file
View file

@ -0,0 +1,56 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <functional>
#include <string>
#include "util/uring/uring_file.h"
namespace dfly {
// Wraps the backing file of a shard and tracks its size (sz_) as it is grown.
class IoMgr {
public:
// first arg - io result.
// second arg - an offset to the buffer in the backing file.
using CbType = std::function<void(int, uint64_t)>;
// (io_res, )
using GrowCb = std::function<void(int)>;
IoMgr();
// blocks until all the pending requests are finished.
void Shutdown();
// Opens the backing file at `path`. Must precede any other operation on the file.
std::error_code Open(const std::string& path);
// Grows file by that length. len must be divided by 1MB.
// passing other values will check-fail.
std::error_code GrowAsync(size_t len, GrowCb cb);
// Writes `blob` at `offset` of the backing file and returns the result of the
// underlying file Write call. Requires an opened backing file.
std::error_code Write(size_t offset, std::string_view blob) {
return backing_file_->Write(io::Buffer(blob), offset, 0);
}
// Returns error if submission failed. Otherwise - returns the error code
// via cb. if no error is returned - buf must live until cb is called.
std::error_code GetBlockAsync(std::string_view buf, int64_t arg, CbType cb);
// Returns the size accumulated by the successfully completed grow requests.
size_t Size() const { return sz_; }
private:
std::unique_ptr<util::uring::LinuxFile> backing_file_;
size_t sz_ = 0;
// flags_val overlays all the flag bits so that they can be reset and tested at once.
union {
uint8_t flags_val;
struct {
uint8_t grow_progress : 1; // set while a grow request is pending.
} flags;
};
};
} // namespace dfly

View file

@ -311,7 +311,7 @@ void Service::Init(util::AcceptServer* acceptor, const InitOpts& opts) {
uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
shard_set_.Init(shard_num);
pp_.Await([&](uint32_t index, ProactorBase* pb) {
pp_.AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) {
ServerState::tlocal()->Init();
if (index < shard_count()) {

View file

@ -15,6 +15,7 @@ extern "C" {
#include "server/conn_context.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/io_mgr.h"
#include "server/transaction.h"
#include "util/varz.h"
@ -78,6 +79,7 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
db_slice_->SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags);
}
// overwrite existing entry.
prime_value.SetString(value);
db_slice_->PostUpdate(params.db_index, it);
@ -92,6 +94,26 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
tvalue.SetFlag(params.memcache_flags != 0);
it = db_slice_->AddNew(params.db_index, key, std::move(tvalue), at_ms);
EngineShard* shard = db_slice_->shard_owner();
IoMgr* io_mgr = shard->io_mgr();
if (io_mgr) { // external storage enabled.
ExternalAllocator* ext_alloc = shard->external_allocator();
int64_t res = ext_alloc->Malloc(value.size());
if (res < 0) {
size_t start = io_mgr->Size();
io_mgr->GrowAsync(-res, [start, len = -res, ext_alloc](int io_res) {
if (io_res == 0) {
ext_alloc->AddStorage(start, len);
} else {
LOG_FIRST_N(ERROR, 10) << "Error enlarging storage " << io_res;
}
});
} else {
io_mgr->Write(res, value);
}
}
if (params.memcache_flags)
db_slice_->SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags);