Refactor code in tiered storage

This commit is contained in:
Roman Gershman 2022-05-06 13:36:09 +03:00
parent 2d6251ca83
commit 3a4c36c1f2
2 changed files with 37 additions and 31 deletions

View file

@ -14,8 +14,7 @@ extern "C" {
#include "server/db_slice.h"
#include "util/proactor_base.h"
DEFINE_uint32(tiered_storage_max_pending_writes, 512,
"Maximal number of pending writes per thread");
DEFINE_uint32(tiered_storage_max_pending_writes, 32, "Maximal number of pending writes per thread");
namespace dfly {
using namespace std;
@ -53,6 +52,8 @@ struct EntryHash {
struct TieredStorage::ActiveIoRequest {
size_t file_offset;
size_t batch_size;
size_t batch_offs;
char* block_ptr;
// entry -> offset
@ -60,7 +61,8 @@ struct TieredStorage::ActiveIoRequest {
mi_stl_allocator<std::pair<const IndexKey, size_t>>>*/
absl::flat_hash_map<IndexKey, size_t, EntryHash, std::equal_to<>> entries;
ActiveIoRequest(size_t file_offs, size_t sz) : file_offset(file_offs) {
ActiveIoRequest(size_t file_offs, size_t sz)
: file_offset(file_offs), batch_size(sz), batch_offs(0) {
DCHECK_EQ(0u, sz % 4096);
block_ptr = (char*)mi_malloc_aligned(sz, 4096);
DCHECK_EQ(0, intptr_t(block_ptr) % 4096);
@ -69,13 +71,31 @@ struct TieredStorage::ActiveIoRequest {
~ActiveIoRequest() {
mi_free(block_ptr);
}
bool CanAccomodate(size_t length) const {
return batch_offs + length <= batch_size;
}
void Serialize(IndexKey ikey, const CompactObj& co);
};
void TieredStorage::SendIoRequest(size_t req_size, ActiveIoRequest* req) {
void TieredStorage::ActiveIoRequest::Serialize(IndexKey ikey, const CompactObj& co) {
DCHECK(!co.HasIoPending());
size_t item_size = co.Size();
DCHECK_LE(item_size + batch_offs, batch_size);
co.GetString(block_ptr + batch_offs);
bool added = entries.emplace(move(ikey), file_offset + batch_offs).second;
CHECK(added);
batch_offs += item_size; // saved into opened block.
}
void TieredStorage::SendIoRequest(ActiveIoRequest* req) {
#if 1
// static string tmp(4096, 'x');
// string_view sv{tmp};
string_view sv{req->block_ptr, req_size};
string_view sv{req->block_ptr, req->batch_size};
active_req_sem_.await(
[this] { return num_active_requests_ <= FLAGS_tiered_storage_max_pending_writes; });
@ -232,8 +252,6 @@ void TieredStorage::FlushPending() {
}
};
size_t active_batch_size = 0;
size_t batch_offset = 0;
ActiveIoRequest* active_req = nullptr;
for (size_t i = 0; i < canonic_req.size(); ++i) {
@ -247,49 +265,37 @@ void TieredStorage::FlushPending() {
size_t item_size = it->second.Size();
DCHECK_GT(item_size, 0u);
if (item_size + batch_offset > active_batch_size) {
if (active_batch_size > 0) { // need to close
if (!active_req || !active_req->CanAccomodate(item_size)) {
if (active_req) { // need to close
// save the block asynchronously.
++submitted_io_writes_;
submitted_io_write_size_ += active_batch_size;
submitted_io_write_size_ += active_req->batch_size;
SendIoRequest(active_batch_size, active_req);
active_batch_size = 0;
SendIoRequest(active_req);
active_req = nullptr;
}
DCHECK_EQ(0u, active_batch_size);
int64_t res = alloc_.Malloc(item_size);
if (res < 0) {
InitiateGrow(-res);
return;
}
active_batch_size = ExternalAllocator::GoodSize(item_size);
active_req = new ActiveIoRequest(res, active_batch_size);
stats_.storage_reserved += active_batch_size;
size_t batch_size = ExternalAllocator::GoodSize(item_size);
active_req = new ActiveIoRequest(res, batch_size);
stats_.storage_reserved += batch_size;
batch_offset = 0;
++num_active_requests_;
}
DCHECK_LE(item_size + batch_offset, active_batch_size);
it->second.GetString(active_req->block_ptr + batch_offset);
DCHECK(!it->second.HasIoPending());
active_req->Serialize(IndexKey{db_ind, it->first.AsRef()}, it->second);
it->second.SetIoPending(true);
IndexKey key(db_ind, it->first.AsRef());
bool added =
active_req->entries.emplace(move(key), active_req->file_offset + batch_offset).second;
CHECK(added);
batch_offset += item_size; // saved into opened block.
}
batch_len = 0;
}
if (active_batch_size > 0) {
SendIoRequest(active_batch_size, active_req);
if (active_req) {
SendIoRequest(active_req);
}
}

View file

@ -42,7 +42,7 @@ class TieredStorage {
void FlushPending();
void InitiateGrow(size_t size);
void SendIoRequest(size_t req_size, ActiveIoRequest* req);
void SendIoRequest(ActiveIoRequest* req);
void FinishIoRequest(int io_res, ActiveIoRequest* req);
DbSlice& db_slice_;