From 3a4c36c1f25935835861a61a913fe9c04e6e1ddc Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Fri, 6 May 2022 13:36:09 +0300 Subject: [PATCH] Refactor code in tiered storage --- src/server/tiered_storage.cc | 66 ++++++++++++++++++++---------------- src/server/tiered_storage.h | 2 +- 2 files changed, 37 insertions(+), 31 deletions(-) diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index 18a465936..68534900a 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -14,8 +14,7 @@ extern "C" { #include "server/db_slice.h" #include "util/proactor_base.h" -DEFINE_uint32(tiered_storage_max_pending_writes, 512, - "Maximal number of pending writes per thread"); +DEFINE_uint32(tiered_storage_max_pending_writes, 32, "Maximal number of pending writes per thread"); namespace dfly { using namespace std; @@ -53,6 +52,8 @@ struct EntryHash { struct TieredStorage::ActiveIoRequest { size_t file_offset; + size_t batch_size; + size_t batch_offs; char* block_ptr; // entry -> offset @@ -60,7 +61,8 @@ struct TieredStorage::ActiveIoRequest { mi_stl_allocator>>*/ absl::flat_hash_map> entries; - ActiveIoRequest(size_t file_offs, size_t sz) : file_offset(file_offs) { + ActiveIoRequest(size_t file_offs, size_t sz) + : file_offset(file_offs), batch_size(sz), batch_offs(0) { DCHECK_EQ(0u, sz % 4096); block_ptr = (char*)mi_malloc_aligned(sz, 4096); DCHECK_EQ(0, intptr_t(block_ptr) % 4096); @@ -69,13 +71,31 @@ struct TieredStorage::ActiveIoRequest { ~ActiveIoRequest() { mi_free(block_ptr); } + + bool CanAccomodate(size_t length) const { + return batch_offs + length <= batch_size; + } + + void Serialize(IndexKey ikey, const CompactObj& co); }; -void TieredStorage::SendIoRequest(size_t req_size, ActiveIoRequest* req) { +void TieredStorage::ActiveIoRequest::Serialize(IndexKey ikey, const CompactObj& co) { + DCHECK(!co.HasIoPending()); + + size_t item_size = co.Size(); + DCHECK_LE(item_size + batch_offs, batch_size); + co.GetString(block_ptr + batch_offs); + + bool added = entries.emplace(move(ikey), file_offset + batch_offs).second; + CHECK(added); + batch_offs += item_size; // saved into opened block. +} + +void TieredStorage::SendIoRequest(ActiveIoRequest* req) { #if 1 // static string tmp(4096, 'x'); // string_view sv{tmp}; - string_view sv{req->block_ptr, req_size}; + string_view sv{req->block_ptr, req->batch_size}; active_req_sem_.await( [this] { return num_active_requests_ <= FLAGS_tiered_storage_max_pending_writes; }); @@ -232,8 +252,6 @@ void TieredStorage::FlushPending() { } }; - size_t active_batch_size = 0; - size_t batch_offset = 0; ActiveIoRequest* active_req = nullptr; for (size_t i = 0; i < canonic_req.size(); ++i) { @@ -247,49 +265,37 @@ void TieredStorage::FlushPending() { size_t item_size = it->second.Size(); DCHECK_GT(item_size, 0u); - if (item_size + batch_offset > active_batch_size) { - if (active_batch_size > 0) { // need to close + if (!active_req || !active_req->CanAccomodate(item_size)) { + if (active_req) { // need to close // save the block asynchronously. ++submitted_io_writes_; - submitted_io_write_size_ += active_batch_size; + submitted_io_write_size_ += active_req->batch_size; - SendIoRequest(active_batch_size, active_req); - active_batch_size = 0; + SendIoRequest(active_req); + active_req = nullptr; } - DCHECK_EQ(0u, active_batch_size); int64_t res = alloc_.Malloc(item_size); if (res < 0) { InitiateGrow(-res); return; } - active_batch_size = ExternalAllocator::GoodSize(item_size); - active_req = new ActiveIoRequest(res, active_batch_size); - stats_.storage_reserved += active_batch_size; + size_t batch_size = ExternalAllocator::GoodSize(item_size); + active_req = new ActiveIoRequest(res, batch_size); + stats_.storage_reserved += batch_size; - batch_offset = 0; ++num_active_requests_; } - DCHECK_LE(item_size + batch_offset, active_batch_size); - - it->second.GetString(active_req->block_ptr + batch_offset); - - DCHECK(!it->second.HasIoPending()); + active_req->Serialize(IndexKey{db_ind, it->first.AsRef()}, it->second); it->second.SetIoPending(true); - - IndexKey key(db_ind, it->first.AsRef()); - bool added = - active_req->entries.emplace(move(key), active_req->file_offset + batch_offset).second; - CHECK(added); - batch_offset += item_size; // saved into opened block. } batch_len = 0; } - if (active_batch_size > 0) { - SendIoRequest(active_batch_size, active_req); + if (active_req) { + SendIoRequest(active_req); } } diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h index 3357772d4..d885f60ed 100644 --- a/src/server/tiered_storage.h +++ b/src/server/tiered_storage.h @@ -42,7 +42,7 @@ class TieredStorage { void FlushPending(); void InitiateGrow(size_t size); - void SendIoRequest(size_t req_size, ActiveIoRequest* req); + void SendIoRequest(ActiveIoRequest* req); void FinishIoRequest(int io_res, ActiveIoRequest* req); DbSlice& db_slice_;