More work on tiered storage.

1. Reads from external storage now support O_DIRECT mode.
2. Simplify write unloading logic. Make pending buffer a ring buffer with
   a predefined capacity.
3. Add more tiered stats to info command
This commit is contained in:
Roman Gershman 2022-05-05 12:05:05 +03:00
parent 7f06e223f7
commit 5568205b34
15 changed files with 287 additions and 191 deletions

View file

@ -47,6 +47,8 @@ jobs:
with: with:
context: . context: .
platforms: linux/amd64,linux/arm64 platforms: linux/amd64,linux/arm64
build-args: |
QEMU_CPU='help'
push: ${{ github.event_name != 'pull_request' }} push: ${{ github.event_name != 'pull_request' }}
tags: | tags: |
ghcr.io/${{ github.actor }}/dragonfly-ubuntu:latest ghcr.io/${{ github.actor }}/dragonfly-ubuntu:latest

View file

@ -15,7 +15,6 @@ set(CMAKE_CXX_STANDARD 17)
set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/helio/cmake" ${CMAKE_MODULE_PATH}) set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/helio/cmake" ${CMAKE_MODULE_PATH})
option(BUILD_SHARED_LIBS "Build shared libraries" OFF) option(BUILD_SHARED_LIBS "Build shared libraries" OFF)
set(Boost_USE_STATIC_LIBS ON)
include(third_party) include(third_party)
include(internal) include(internal)

2
helio

@ -1 +1 @@
Subproject commit a024151f24180d493b51909b6853cfd16ae6367d Subproject commit cd13f66ea2d6b98cfa3f5441fd151f41f9dc6966

View file

@ -19,6 +19,7 @@ thread_local ServerState ServerState::state_;
atomic_uint64_t used_mem_peak(0); atomic_uint64_t used_mem_peak(0);
atomic_uint64_t used_mem_current(0); atomic_uint64_t used_mem_current(0);
unsigned kernel_version = 0; unsigned kernel_version = 0;
size_t max_memory_limit = 0;
ServerState::ServerState() { ServerState::ServerState() {
} }
@ -114,12 +115,12 @@ bool ParseHumanReadableBytes(std::string_view str, int64_t* num_bytes) {
#define ADD(x) (x) += o.x #define ADD(x) (x) += o.x
TieredStats& TieredStats::operator+=(const TieredStats& o) { TieredStats& TieredStats::operator+=(const TieredStats& o) {
static_assert(sizeof(TieredStats) == 24); static_assert(sizeof(TieredStats) == 32);
ADD(external_reads); ADD(external_reads);
ADD(external_writes); ADD(external_writes);
ADD(storage_capacity); ADD(storage_capacity);
ADD(storage_reserved);
return *this; return *this;
} }

View file

@ -75,6 +75,9 @@ struct TieredStats {
size_t external_writes = 0; size_t external_writes = 0;
size_t storage_capacity = 0; size_t storage_capacity = 0;
// how much was reserved by actively stored items.
size_t storage_reserved = 0;
TieredStats& operator+=(const TieredStats&); TieredStats& operator+=(const TieredStats&);
}; };
@ -96,6 +99,7 @@ bool ParseHumanReadableBytes(std::string_view str, int64_t* num_bytes);
// Cached values, updated frequently to represent the correct state of the system. // Cached values, updated frequently to represent the correct state of the system.
extern std::atomic_uint64_t used_mem_peak; extern std::atomic_uint64_t used_mem_peak;
extern std::atomic_uint64_t used_mem_current; extern std::atomic_uint64_t used_mem_current;
extern size_t max_memory_limit;
// version 5.11 maps to 511 etc. // version 5.11 maps to 511 etc.
// set upon server start. // set upon server start.

View file

@ -86,20 +86,32 @@ class PrimeEvictionPolicy {
#define ADD(x) (x) += o.x #define ADD(x) (x) += o.x
PerDbStats& PerDbStats::operator+=(const PerDbStats& o) {
constexpr size_t kDbSz = sizeof(PerDbStats);
static_assert(kDbSz == 56);
ADD(inline_keys);
ADD(obj_memory_usage);
ADD(strval_memory_usage);
ADD(listpack_blob_cnt);
ADD(listpack_bytes);
ADD(external_entries);
ADD(external_size);
return *this;
}
DbStats& DbStats::operator+=(const DbStats& o) { DbStats& DbStats::operator+=(const DbStats& o) {
static_assert(sizeof(DbStats) == 80); constexpr size_t kDbSz = sizeof(DbStats);
static_assert(kDbSz == 96);
PerDbStats::operator+=(o);
ADD(key_count); ADD(key_count);
ADD(expire_count); ADD(expire_count);
ADD(bucket_count); ADD(bucket_count);
ADD(inline_keys);
ADD(obj_memory_usage);
ADD(table_mem_usage); ADD(table_mem_usage);
ADD(small_string_bytes); ADD(small_string_bytes);
ADD(listpack_blob_cnt);
ADD(listpack_bytes);
ADD(external_entries);
return *this; return *this;
} }
@ -150,12 +162,8 @@ auto DbSlice::GetStats() const -> Stats {
s.db.key_count += db->prime_table.size(); s.db.key_count += db->prime_table.size();
s.db.bucket_count += db->prime_table.bucket_count(); s.db.bucket_count += db->prime_table.bucket_count();
s.db.expire_count += db->expire_table.size(); s.db.expire_count += db->expire_table.size();
s.db.obj_memory_usage += db->stats.obj_memory_usage;
s.db.inline_keys += db->stats.inline_keys;
s.db.table_mem_usage += (db->prime_table.mem_usage() + db->expire_table.mem_usage()); s.db.table_mem_usage += (db->prime_table.mem_usage() + db->expire_table.mem_usage());
s.db.listpack_blob_cnt += db->stats.listpack_blob_cnt; s.db += db->stats;
s.db.listpack_bytes += db->stats.listpack_bytes;
s.db.external_entries += db->stats.external_entries;
} }
s.db.small_string_bytes = CompactObj::GetStats().small_string_bytes; s.db.small_string_bytes = CompactObj::GetStats().small_string_bytes;
@ -275,7 +283,11 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<PrimeIterator
} }
// Keep the entry but reset the object. // Keep the entry but reset the object.
db->stats.obj_memory_usage -= existing->second.MallocUsed(); size_t value_heap_size = existing->second.MallocUsed();
db->stats.obj_memory_usage -= value_heap_size;
if (existing->second.ObjType() == OBJ_STRING)
db->stats.obj_memory_usage -= value_heap_size;
existing->second.Reset(); existing->second.Reset();
events_.expired_keys++; events_.expired_keys++;
@ -313,8 +325,12 @@ bool DbSlice::Del(DbIndex db_ind, PrimeIterator it) {
CHECK_EQ(1u, db->mcflag_table.Erase(it->first)); CHECK_EQ(1u, db->mcflag_table.Erase(it->first));
} }
size_t value_heap_size = it->second.MallocUsed();
db->stats.inline_keys -= it->first.IsInline(); db->stats.inline_keys -= it->first.IsInline();
db->stats.obj_memory_usage -= (it->first.MallocUsed() + it->second.MallocUsed()); db->stats.obj_memory_usage -= (it->first.MallocUsed() + value_heap_size);
if (it->second.ObjType() == OBJ_STRING)
db->stats.obj_memory_usage -= value_heap_size;
db->prime_table.Erase(it); db->prime_table.Erase(it);
return true; return true;
@ -413,7 +429,11 @@ pair<PrimeIterator, bool> DbSlice::AddOrFind(DbIndex db_ind, string_view key, Pr
auto& db = *db_arr_[db_ind]; auto& db = *db_arr_[db_ind];
auto& new_it = res.first; auto& new_it = res.first;
db.stats.obj_memory_usage += obj.MallocUsed(); size_t value_heap_size = obj.MallocUsed();
db.stats.obj_memory_usage += value_heap_size;
if (obj.ObjType() == OBJ_STRING)
db.stats.strval_memory_usage += value_heap_size;
new_it->second = std::move(obj); new_it->second = std::move(obj);
if (expire_at_ms) { if (expire_at_ms) {
@ -514,13 +534,20 @@ void DbSlice::PreUpdate(DbIndex db_ind, PrimeIterator it) {
for (const auto& ccb : change_cb_) { for (const auto& ccb : change_cb_) {
ccb.second(db_ind, ChangeReq{it}); ccb.second(db_ind, ChangeReq{it});
} }
db->stats.obj_memory_usage -= it->second.MallocUsed(); size_t value_heap_size = it->second.MallocUsed();
db->stats.obj_memory_usage -= value_heap_size;
if (it->second.ObjType() == OBJ_STRING)
db->stats.strval_memory_usage -= value_heap_size;
it.SetVersion(NextVersion()); it.SetVersion(NextVersion());
} }
void DbSlice::PostUpdate(DbIndex db_ind, PrimeIterator it) { void DbSlice::PostUpdate(DbIndex db_ind, PrimeIterator it) {
auto& db = db_arr_[db_ind]; auto& db = db_arr_[db_ind];
db->stats.obj_memory_usage += it->second.MallocUsed(); size_t value_heap_size = it->second.MallocUsed();
db->stats.obj_memory_usage += value_heap_size;
if (it->second.ObjType() == OBJ_STRING)
db->stats.strval_memory_usage += value_heap_size;
} }
pair<PrimeIterator, ExpireIterator> DbSlice::ExpireIfNeeded(DbIndex db_ind, pair<PrimeIterator, ExpireIterator> DbSlice::ExpireIfNeeded(DbIndex db_ind,
@ -541,7 +568,10 @@ pair<PrimeIterator, ExpireIterator> DbSlice::ExpireIfNeeded(DbIndex db_ind,
db->expire_table.Erase(expire_it); db->expire_table.Erase(expire_it);
db->stats.inline_keys -= it->first.IsInline(); db->stats.inline_keys -= it->first.IsInline();
db->stats.obj_memory_usage -= (it->first.MallocUsed() + it->second.MallocUsed()); size_t value_heap_size = it->second.MallocUsed();
db->stats.obj_memory_usage -= (it->first.MallocUsed() + value_heap_size);
if (it->second.ObjType() == OBJ_STRING)
db->stats.strval_memory_usage -= value_heap_size;
db->prime_table.Erase(it); db->prime_table.Erase(it);
++events_.expired_keys; ++events_.expired_keys;

View file

@ -20,7 +20,23 @@ namespace dfly {
using facade::OpResult; using facade::OpResult;
struct DbStats { struct PerDbStats {
// Number of inline keys.
uint64_t inline_keys = 0;
// Object memory usage besides hash-table capacity.
// Applies for any non-inline objects.
size_t obj_memory_usage = 0;
size_t strval_memory_usage = 0;
size_t listpack_blob_cnt = 0;
size_t listpack_bytes = 0;
size_t external_entries = 0;
size_t external_size = 0;
PerDbStats& operator+=(const PerDbStats& o);
};
struct DbStats : public PerDbStats {
// number of active keys. // number of active keys.
size_t key_count = 0; size_t key_count = 0;
@ -30,23 +46,12 @@ struct DbStats {
// number of buckets in dictionary (key capacity) // number of buckets in dictionary (key capacity)
size_t bucket_count = 0; size_t bucket_count = 0;
// Number of inline keys.
size_t inline_keys = 0;
// Object memory usage besides hash-table capacity.
// Applies for any non-inline objects.
size_t obj_memory_usage = 0;
// Memory used by dictionaries. // Memory used by dictionaries.
size_t table_mem_usage = 0; size_t table_mem_usage = 0;
size_t small_string_bytes = 0; size_t small_string_bytes = 0;
size_t listpack_blob_cnt = 0; using PerDbStats::operator+=;
size_t listpack_bytes = 0;
size_t external_entries = 0;
DbStats& operator+=(const DbStats& o); DbStats& operator+=(const DbStats& o);
}; };
@ -65,24 +70,13 @@ class DbSlice {
void operator=(const DbSlice&) = delete; void operator=(const DbSlice&) = delete;
public: public:
using PerDbStats = ::dfly::PerDbStats;
struct Stats { struct Stats {
DbStats db; DbStats db;
SliceEvents events; SliceEvents events;
}; };
struct PerDbStats {
// Number of inline keys.
uint64_t inline_keys = 0;
// Object memory usage besides hash-table capacity.
// Applies for any non-inline objects.
size_t obj_memory_usage = 0;
size_t listpack_blob_cnt = 0;
size_t listpack_bytes = 0;
size_t external_entries = 0;
};
DbSlice(uint32_t index, EngineShard* owner); DbSlice(uint32_t index, EngineShard* owner);
~DbSlice(); ~DbSlice();

View file

@ -89,8 +89,12 @@ int main(int argc, char* argv[]) {
LOG(INFO) << "Found " << HumanReadableNumBytes(available) LOG(INFO) << "Found " << HumanReadableNumBytes(available)
<< " available memory. Setting maxmemory to " << HumanReadableNumBytes(maxmemory); << " available memory. Setting maxmemory to " << HumanReadableNumBytes(maxmemory);
FLAGS_maxmemory = maxmemory; FLAGS_maxmemory = maxmemory;
} else {
LOG(INFO) << "Max memory limit is: " << HumanReadableNumBytes(FLAGS_maxmemory);
} }
dfly::max_memory_limit = FLAGS_maxmemory;
if (FLAGS_use_large_pages) { if (FLAGS_use_large_pages) {
mi_option_enable(mi_option_large_os_pages); mi_option_enable(mi_option_large_os_pages);
} }

View file

@ -5,6 +5,7 @@
#include "server/io_mgr.h" #include "server/io_mgr.h"
#include <fcntl.h> #include <fcntl.h>
#include <mimalloc.h>
#include "base/logging.h" #include "base/logging.h"
#include "facade/facade_types.h" #include "facade/facade_types.h"
@ -21,6 +22,15 @@ using uring::FiberCall;
using uring::Proactor; using uring::Proactor;
namespace this_fiber = ::boost::this_fiber; namespace this_fiber = ::boost::this_fiber;
namespace {
constexpr inline size_t alignup(size_t num, size_t align) {
size_t amask = align - 1;
return (num + amask) & (~amask);
}
} // namespace
IoMgr::IoMgr() { IoMgr::IoMgr() {
flags_val = 0; flags_val = 0;
} }
@ -99,6 +109,27 @@ error_code IoMgr::WriteAsync(size_t offset, string_view blob, WriteCb cb) {
} }
error_code IoMgr::Read(size_t offset, io::MutableBytes dest) { error_code IoMgr::Read(size_t offset, io::MutableBytes dest) {
DCHECK(!dest.empty());
if (FLAGS_backing_file_direct) {
size_t read_offs = offset & ~4095ULL;
size_t end_range = alignup(offset + dest.size(), 4096);
size_t space_needed = end_range - read_offs;
DCHECK_EQ(0u, space_needed % 4096);
uint8_t* space = (uint8_t*)mi_malloc_aligned(space_needed, 4096);
iovec v{.iov_base = space, .iov_len = space_needed};
error_code ec = backing_file_->Read(&v, 1, read_offs, 0);
if (ec) {
mi_free(space);
return ec;
}
memcpy(dest.data(), space + offset - read_offs, dest.size());
mi_free_size_aligned(space, space_needed, 4096);
return ec;
}
iovec v{.iov_base = dest.data(), .iov_len = dest.size()}; iovec v{.iov_base = dest.data(), .iov_len = dest.size()};
return backing_file_->Read(&v, 1, offset, 0); return backing_file_->Read(&v, 1, offset, 0);
} }

View file

@ -36,7 +36,8 @@ class IoMgr {
std::error_code WriteAsync(size_t offset, std::string_view blob, WriteCb cb); std::error_code WriteAsync(size_t offset, std::string_view blob, WriteCb cb);
std::error_code Read(size_t offset, io::MutableBytes dest); std::error_code Read(size_t offset, io::MutableBytes dest);
size_t Size() const { // Total file span
size_t Span() const {
return sz_; return sz_;
} }

View file

@ -515,6 +515,7 @@ tcp_port:)";
append("num_entries:", m.db.key_count); append("num_entries:", m.db.key_count);
append("inline_keys:", m.db.inline_keys); append("inline_keys:", m.db.inline_keys);
append("small_string_bytes:", m.db.small_string_bytes); append("small_string_bytes:", m.db.small_string_bytes);
append("strval_bytes:", m.db.strval_memory_usage);
append("listpack_blobs:", m.db.listpack_blob_cnt); append("listpack_blobs:", m.db.listpack_blob_cnt);
append("listpack_bytes:", m.db.listpack_bytes); append("listpack_bytes:", m.db.listpack_bytes);
} }
@ -540,10 +541,12 @@ tcp_port:)";
} }
if (should_enter("TIERED", true)) { if (should_enter("TIERED", true)) {
ADD_HEADER("# TIERED_STORAGE"); ADD_HEADER("# TIERED");
append("external_entries:", m.db.external_entries); append("external_entries:", m.db.external_entries);
append("external_bytes:", m.db.external_size);
append("external_reads:", m.tiered_stats.external_reads); append("external_reads:", m.tiered_stats.external_reads);
append("external_writes:", m.tiered_stats.external_writes); append("external_writes:", m.tiered_stats.external_writes);
append("external_reserved:", m.tiered_stats.storage_reserved);
append("external_capacity:", m.tiered_stats.storage_capacity); append("external_capacity:", m.tiered_stats.storage_capacity);
} }

View file

@ -136,7 +136,7 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
EngineShard* shard = db_slice_.shard_owner(); EngineShard* shard = db_slice_.shard_owner();
if (shard->tiered_storage()) { // external storage enabled. if (shard->tiered_storage()) { // external storage enabled.
if (value.size() >= 64 && value.size() < 2_MB) { if (value.size() >= 64) {
shard->tiered_storage()->UnloadItem(params.db_index, it); shard->tiered_storage()->UnloadItem(params.db_index, it);
} }
} }
@ -869,7 +869,6 @@ OpResult<string> StringFamily::OpGet(const OpArgs& op_args, string_view key) {
auto [offset, size] = pv.GetExternalPtr(); auto [offset, size] = pv.GetExternalPtr();
val.resize(size); val.resize(size);
// TODO: can not work with O_DIRECT
error_code ec = tiered->Read(offset, size, val.data()); error_code ec = tiered->Read(offset, size, val.data());
CHECK(!ec) << "TBD: " << ec; CHECK(!ec) << "TBD: " << ec;
} else { } else {

View file

@ -52,6 +52,7 @@ struct EntryHash {
}; };
struct TieredStorage::ActiveIoRequest { struct TieredStorage::ActiveIoRequest {
size_t file_offset;
char* block_ptr; char* block_ptr;
// entry -> offset // entry -> offset
@ -59,7 +60,7 @@ struct TieredStorage::ActiveIoRequest {
mi_stl_allocator<std::pair<const IndexKey, size_t>>>*/ mi_stl_allocator<std::pair<const IndexKey, size_t>>>*/
absl::flat_hash_map<IndexKey, size_t, EntryHash, std::equal_to<>> entries; absl::flat_hash_map<IndexKey, size_t, EntryHash, std::equal_to<>> entries;
ActiveIoRequest(size_t sz) { ActiveIoRequest(size_t file_offs, size_t sz) : file_offset(file_offs) {
DCHECK_EQ(0u, sz % 4096); DCHECK_EQ(0u, sz % 4096);
block_ptr = (char*)mi_malloc_aligned(sz, 4096); block_ptr = (char*)mi_malloc_aligned(sz, 4096);
DCHECK_EQ(0, intptr_t(block_ptr) % 4096); DCHECK_EQ(0, intptr_t(block_ptr) % 4096);
@ -70,7 +71,7 @@ struct TieredStorage::ActiveIoRequest {
} }
}; };
void TieredStorage::SendIoRequest(size_t offset, size_t req_size, ActiveIoRequest* req) { void TieredStorage::SendIoRequest(size_t req_size, ActiveIoRequest* req) {
#if 1 #if 1
// static string tmp(4096, 'x'); // static string tmp(4096, 'x');
// string_view sv{tmp}; // string_view sv{tmp};
@ -80,7 +81,7 @@ void TieredStorage::SendIoRequest(size_t offset, size_t req_size, ActiveIoReques
[this] { return num_active_requests_ <= FLAGS_tiered_storage_max_pending_writes; }); [this] { return num_active_requests_ <= FLAGS_tiered_storage_max_pending_writes; });
auto cb = [this, req](int res) { FinishIoRequest(res, req); }; auto cb = [this, req](int res) { FinishIoRequest(res, req); };
io_mgr_.WriteAsync(offset, sv, move(cb)); io_mgr_.WriteAsync(req->file_offset, sv, move(cb));
++stats_.external_writes; ++stats_.external_writes;
#else #else
@ -104,9 +105,18 @@ void TieredStorage::FinishIoRequest(int io_res, ActiveIoRequest* req) {
it->second.SetIoPending(false); it->second.SetIoPending(false);
if (success) { if (success) {
auto* stats = db_slice_.MutableStats(ikey.db_indx);
size_t heap_size = it->second.MallocUsed();
size_t item_size = it->second.Size(); size_t item_size = it->second.Size();
stats->obj_memory_usage -= heap_size;
stats->strval_memory_usage -= heap_size;
it->second.SetExternal(k_v.second, item_size); it->second.SetExternal(k_v.second, item_size);
++db_slice_.MutableStats(ikey.db_indx)->external_entries;
stats->external_entries += 1;
stats->external_size += item_size;
} }
} }
delete req; delete req;
@ -118,7 +128,7 @@ void TieredStorage::FinishIoRequest(int io_res, ActiveIoRequest* req) {
VLOG_IF(1, num_active_requests_ == 0) << "Finished active requests"; VLOG_IF(1, num_active_requests_ == 0) << "Finished active requests";
} }
TieredStorage::TieredStorage(DbSlice* db_slice) : db_slice_(*db_slice) { TieredStorage::TieredStorage(DbSlice* db_slice) : db_slice_(*db_slice), pending_req_(256) {
} }
TieredStorage::~TieredStorage() { TieredStorage::~TieredStorage() {
@ -129,8 +139,9 @@ TieredStorage::~TieredStorage() {
error_code TieredStorage::Open(const string& path) { error_code TieredStorage::Open(const string& path) {
error_code ec = io_mgr_.Open(path); error_code ec = io_mgr_.Open(path);
if (!ec) { if (!ec) {
if (io_mgr_.Size()) { // Add initial storage. if (io_mgr_.Span()) { // Add initial storage.
alloc_.AddStorage(0, io_mgr_.Size()); alloc_.AddStorage(0, io_mgr_.Span());
stats_.storage_capacity = io_mgr_.Span();
} }
} }
return ec; return ec;
@ -146,6 +157,13 @@ void TieredStorage::Shutdown() {
io_mgr_.Shutdown(); io_mgr_.Shutdown();
} }
bool TieredStorage::ShouldFlush() {
if (num_active_requests_ >= FLAGS_tiered_storage_max_pending_writes)
return false;
return pending_req_.size() > pending_req_.capacity() / 2;
}
error_code TieredStorage::UnloadItem(DbIndex db_index, PrimeIterator it) { error_code TieredStorage::UnloadItem(DbIndex db_index, PrimeIterator it) {
CHECK_EQ(OBJ_STRING, it->second.ObjType()); CHECK_EQ(OBJ_STRING, it->second.ObjType());
@ -153,148 +171,146 @@ error_code TieredStorage::UnloadItem(DbIndex db_index, PrimeIterator it) {
error_code ec; error_code ec;
pending_unload_bytes_ += blob_len; pending_unload_bytes_ += blob_len;
if (db_index >= db_arr_.size()) { /*if (db_index >= db_arr_.size()) {
db_arr_.resize(db_index + 1); db_arr_.resize(db_index + 1);
} }
if (db_arr_[db_index] == nullptr) { if (db_arr_[db_index] == nullptr) {
db_arr_[db_index] = new PerDb; db_arr_[db_index] = new PerDb;
}*/
// PerDb* db = db_arr_[db_index];
pending_req_.EmplaceOrOverride(PendingReq{it.bucket_cursor().value(), db_index});
// db->pending_upload[it.bucket_cursor().value()] += blob_len;
// size_t grow_size = 0;
if (ShouldFlush()) {
FlushPending();
} }
PerDb* db = db_arr_[db_index]; // if we reached high utilization of the file range - try to grow the file.
db->pending_upload[it.bucket_cursor().value()] += blob_len; if (alloc_.allocated_bytes() > size_t(alloc_.capacity() * 0.85)) {
InitiateGrow(1ULL << 28);
size_t grow_size = 0;
if (!io_mgr_.grow_pending() && pending_unload_bytes_ >= ExternalAllocator::kMinBlockSize) {
grow_size = SerializePendingItems();
}
if (grow_size == 0 && alloc_.allocated_bytes() > size_t(alloc_.capacity() * 0.85)) {
grow_size = 1ULL << 28;
}
if (grow_size && !io_mgr_.grow_pending()) {
size_t start = io_mgr_.Size();
auto cb = [start, grow_size, this](int io_res) {
if (io_res == 0) {
alloc_.AddStorage(start, grow_size);
stats_.storage_capacity += grow_size;
} else {
LOG_FIRST_N(ERROR, 10) << "Error enlarging storage " << io_res;
}
};
ec = io_mgr_.GrowAsync(grow_size, move(cb));
} }
return ec; return ec;
} }
size_t TieredStorage::SerializePendingItems() { bool IsObjFitToUnload(const PrimeValue& pv) {
DCHECK(!io_mgr_.grow_pending()); return pv.ObjType() == OBJ_STRING && !pv.IsExternal() && pv.Size() >= 64 && !pv.HasIoPending();
};
vector<pair<size_t, uint64_t>> sorted_cursors; void TieredStorage::FlushPending() {
constexpr size_t kArrLen = 64; DCHECK(!io_mgr_.grow_pending() && !pending_req_.empty());
PrimeTable::iterator iters[kArrLen]; vector<pair<DbIndex, uint64_t>> canonic_req;
unsigned iter_count = 0; canonic_req.reserve(pending_req_.size());
bool break_early = false;
auto is_good = [](const PrimeValue& pv) { for (size_t i = 0; i < pending_req_.size(); ++i) {
return pv.ObjType() == OBJ_STRING && !pv.IsExternal() && pv.Size() >= 64 && !pv.HasIoPending(); const PendingReq* req = pending_req_.GetItem(i);
}; canonic_req.emplace_back(req->db_indx, req->cursor);
}
auto tr_cb = [&](PrimeTable::iterator it) { pending_req_.ConsumeHead(pending_req_.size());
if (is_good(it->second)) { // remove duplicates and sort.
CHECK_LT(iter_count, kArrLen); {
iters[iter_count++] = it; sort(canonic_req.begin(), canonic_req.end());
} auto it = unique(canonic_req.begin(), canonic_req.end());
}; canonic_req.resize(it - canonic_req.begin());
size_t open_block_size = 0;
size_t file_offset = 0;
size_t block_offset = 0;
ActiveIoRequest* active_req = nullptr;
for (size_t i = 0; i < db_arr_.size(); ++i) {
PerDb* db = db_arr_[i];
if (db == nullptr || db->pending_upload.empty())
continue;
sorted_cursors.resize(db->pending_upload.size());
size_t index = 0;
for (const auto& k_v : db->pending_upload) {
sorted_cursors[index++] = {k_v.second, k_v.first};
}
sort(sorted_cursors.begin(), sorted_cursors.end(), std::greater<>());
DbIndex db_ind = i;
for (const auto& pair : sorted_cursors) {
uint64_t cursor_val = pair.second;
PrimeTable::cursor curs(cursor_val);
db_slice_.GetTables(db_ind).first->Traverse(curs, tr_cb);
for (unsigned j = 0; j < iter_count; ++j) {
PrimeIterator it = iters[j];
size_t item_size = it->second.Size();
DCHECK_GT(item_size, 0u);
if (item_size + block_offset > open_block_size) {
if (open_block_size > 0) { // need to close
// save the block asynchronously.
++submitted_io_writes_;
submitted_io_write_size_ += open_block_size;
SendIoRequest(file_offset, open_block_size, active_req);
open_block_size = 0;
}
if (pending_unload_bytes_ < unsigned(0.8 * ExternalAllocator::kMinBlockSize)) {
break_early = true;
break;
}
DCHECK_EQ(0u, open_block_size);
int64_t res = alloc_.Malloc(item_size);
if (res < 0) {
return -res;
}
file_offset = res;
open_block_size = ExternalAllocator::GoodSize(item_size);
block_offset = 0;
++num_active_requests_;
active_req = new ActiveIoRequest(open_block_size);
}
DCHECK_LE(item_size + block_offset, open_block_size);
it->second.GetString(active_req->block_ptr + block_offset);
DCHECK(!it->second.HasIoPending());
it->second.SetIoPending(true);
IndexKey key(db_ind, it->first.AsRef());
bool added = active_req->entries.emplace(move(key), file_offset + block_offset).second;
CHECK(added);
block_offset += item_size; // saved into opened block.
pending_unload_bytes_ -= item_size;
}
iter_count = 0;
db->pending_upload.erase(cursor_val);
} // sorted_cursors
if (break_early)
break;
DCHECK(db->pending_upload.empty());
} // db_arr
if (open_block_size > 0) {
SendIoRequest(file_offset, open_block_size, active_req);
} }
return 0; // TODO: we could add item size and sort from largest to smallest before
// the aggregation.
constexpr size_t kMaxBatchLen = 64;
PrimeTable::iterator single_batch[kMaxBatchLen];
unsigned batch_len = 0;
auto tr_cb = [&](PrimeTable::iterator it) {
if (IsObjFitToUnload(it->second)) {
CHECK_LT(batch_len, kMaxBatchLen);
single_batch[batch_len++] = it;
}
};
size_t active_batch_size = 0;
size_t batch_offset = 0;
ActiveIoRequest* active_req = nullptr;
for (size_t i = 0; i < canonic_req.size(); ++i) {
DbIndex db_ind = canonic_req[i].first;
uint64_t cursor_val = canonic_req[i].second;
PrimeTable::cursor curs(cursor_val);
db_slice_.GetTables(db_ind).first->Traverse(curs, tr_cb);
for (unsigned j = 0; j < batch_len; ++j) {
PrimeIterator it = single_batch[j];
size_t item_size = it->second.Size();
DCHECK_GT(item_size, 0u);
if (item_size + batch_offset > active_batch_size) {
if (active_batch_size > 0) { // need to close
// save the block asynchronously.
++submitted_io_writes_;
submitted_io_write_size_ += active_batch_size;
SendIoRequest(active_batch_size, active_req);
active_batch_size = 0;
}
DCHECK_EQ(0u, active_batch_size);
int64_t res = alloc_.Malloc(item_size);
if (res < 0) {
InitiateGrow(-res);
return;
}
active_batch_size = ExternalAllocator::GoodSize(item_size);
active_req = new ActiveIoRequest(res, active_batch_size);
stats_.storage_reserved += active_batch_size;
batch_offset = 0;
++num_active_requests_;
}
DCHECK_LE(item_size + batch_offset, active_batch_size);
it->second.GetString(active_req->block_ptr + batch_offset);
DCHECK(!it->second.HasIoPending());
it->second.SetIoPending(true);
IndexKey key(db_ind, it->first.AsRef());
bool added =
active_req->entries.emplace(move(key), active_req->file_offset + batch_offset).second;
CHECK(added);
batch_offset += item_size; // saved into opened block.
}
batch_len = 0;
}
if (active_batch_size > 0) {
SendIoRequest(active_batch_size, active_req);
}
}
void TieredStorage::InitiateGrow(size_t grow_size) {
if (io_mgr_.grow_pending())
return;
DCHECK_GT(grow_size, 0u);
size_t start = io_mgr_.Span();
auto cb = [start, grow_size, this](int io_res) {
if (io_res == 0) {
alloc_.AddStorage(start, grow_size);
stats_.storage_capacity += grow_size;
} else {
LOG_FIRST_N(ERROR, 10) << "Error enlarging storage " << io_res;
}
};
error_code ec = io_mgr_.GrowAsync(grow_size, move(cb));
CHECK(!ec) << "TBD"; // TODO
} }
} // namespace dfly } // namespace dfly

View file

@ -5,6 +5,8 @@
#include <absl/container/flat_hash_map.h> #include <absl/container/flat_hash_map.h>
#include "base/ring_buffer.h"
#include "core/external_alloc.h" #include "core/external_alloc.h"
#include "server/common.h" #include "server/common.h"
#include "server/io_mgr.h" #include "server/io_mgr.h"
@ -36,10 +38,11 @@ class TieredStorage {
private: private:
struct ActiveIoRequest; struct ActiveIoRequest;
// return 0 if everything was sent. bool ShouldFlush();
// if more storage is needed returns requested size in bytes.
size_t SerializePendingItems(); void FlushPending();
void SendIoRequest(size_t offset, size_t req_size, ActiveIoRequest* req); void InitiateGrow(size_t size);
void SendIoRequest(size_t req_size, ActiveIoRequest* req);
void FinishIoRequest(int io_res, ActiveIoRequest* req); void FinishIoRequest(int io_res, ActiveIoRequest* req);
DbSlice& db_slice_; DbSlice& db_slice_;
@ -59,12 +62,21 @@ class TieredStorage {
}; };
struct PerDb { struct PerDb {
// map of cursor -> pending size
absl::flat_hash_map<uint64_t, size_t> pending_upload;
absl::flat_hash_map<PrimeKey, ActiveIoRequest*, Hasher> active_requests; absl::flat_hash_map<PrimeKey, ActiveIoRequest*, Hasher> active_requests;
}; };
std::vector<PerDb*> db_arr_; std::vector<PerDb*> db_arr_;
struct PendingReq {
uint64_t cursor;
DbIndex db_indx = kInvalidDbId;
};
base::RingBuffer<PendingReq> pending_req_;
// map of cursor -> pending size
// absl::flat_hash_map<uint64_t, size_t> pending_upload;
TieredStats stats_; TieredStats stats_;
}; };

View file

@ -6,7 +6,7 @@ COPY src/ ./src/
COPY helio/ ./helio/ COPY helio/ ./helio/
COPY patches/ ./patches/ COPY patches/ ./patches/
COPY CMakeLists.txt ./ COPY CMakeLists.txt ./
RUN ./helio/blaze.sh -release RUN ./helio/blaze.sh -release -DBoost_USE_STATIC_LIBS=ON
WORKDIR build-opt WORKDIR build-opt
RUN ninja dragonfly RUN ninja dragonfly