feat(server): Redesign and simplify tiered storage module. (#589)

1. Allow offloading blobs larger than 2KB.
2. Totally redesign the offloading algorithm for blobs smaller than 2KB.
3. Fix bugs around IO request cancelations.
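At a high level, the new write path routes each blob by size. A minimal standalone sketch of that decision (my illustration, reusing the constants defined in tiered_storage.cc further down in this diff, not code from the commit itself):

```cpp
#include <cstddef>

// Illustration only: kBlockLen / kMaxSmallBin mirror tiered_storage.cc below.
constexpr std::size_t kBlockLen = 4096;     // one SSD page
constexpr std::size_t kMaxSmallBin = 2032;  // largest "small" blob

enum class OffloadPath { kSingleWrite, kSmallBinBatch };

constexpr OffloadPath Route(std::size_t blob_len) {
  // Blobs above kMaxSmallBin get their own page-aligned write (point 1);
  // smaller blobs are grouped into same-size bins and flushed one full
  // 4KB page at a time (point 2).
  return blob_len > kMaxSmallBin ? OffloadPath::kSingleWrite
                                 : OffloadPath::kSmallBinBatch;
}

static_assert(Route(8192) == OffloadPath::kSingleWrite);
static_assert(Route(100) == OffloadPath::kSmallBinBatch);
```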

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

Roman Gershman 2022-12-22 03:58:58 +02:00, committed by GitHub
parent b5f6629d55
commit adb3266825
11 changed files with 422 additions and 371 deletions


@@ -89,8 +89,8 @@ class CompactObj {
enum MaskBit {
REF_BIT = 1,
EXPIRE_BIT = 2,
FLAG_BIT = 4,
EXPIRE_BIT = 2, // Mark objects that have expiry timestamp assigned.
FLAG_BIT = 4, // Used to mark keys that have memcache flags assigned.
// ASCII encoding is not an injective function: it compresses 8 bytes to 7, but also 7 bytes to 7.
// Therefore, in order to know the original length we introduce 2 flags that
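A standalone illustration of why the mapping collides (my own example, not code from this file): packing n 7-bit ASCII characters takes ceil(7n/8) bytes, so 7- and 8-character strings both occupy 7 bytes.

```cpp
#include <cstddef>

// ceil(7 * n / 8): bytes needed to pack n 7-bit ASCII characters.
constexpr std::size_t PackedLen(std::size_t n) {
  return (n * 7 + 7) / 8;
}

static_assert(PackedLen(7) == 7);  // 49 bits -> 7 bytes
static_assert(PackedLen(8) == 7);  // 56 bits -> 7 bytes: same packed length
static_assert(PackedLen(9) == 8);
```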
@@ -266,6 +266,7 @@ class CompactObj {
bool IsExternal() const {
return taglen_ == EXTERNAL_TAG;
}
void SetExternal(size_t offset, size_t sz);
std::pair<size_t, size_t> GetExternalPtr() const;
@@ -273,7 +274,7 @@ class CompactObj {
// for that blob. Otherwise returns 0.
size_t MallocUsed() const;
// Resets the object to empty state.
// Resets the object to empty state (string).
void Reset();
bool IsInline() const {
@@ -346,9 +347,9 @@ class CompactObj {
//
static_assert(sizeof(u_) == 16, "");
// Maybe it's possible to merge those 2 together and gain another byte
// but let's postpone it to 2023.
mutable uint8_t mask_ = 0;
// We currently reserve 5 bits for tags and 3 bits for extending the mask; the latter are currently reserved.
uint8_t taglen_ = 0;
};
@@ -362,4 +363,45 @@ inline bool CompactObj::operator==(std::string_view sv) const {
return EqualNonInline(sv);
}
class CompactObjectView {
public:
CompactObjectView(const CompactObj& src) : obj_(src.AsRef()) {
}
CompactObjectView(const CompactObjectView& o) : obj_(o.obj_.AsRef()) {
}
CompactObjectView(CompactObjectView&& o) = default;
operator CompactObj() const {
return obj_.AsRef();
}
const CompactObj* operator->() const {
return &obj_;
}
bool operator==(const CompactObjectView& o) const {
return obj_ == o.obj_;
}
uint64_t Hash() const {
return obj_.HashCode();
}
CompactObjectView& operator=(const CompactObjectView& o) {
obj_ = o.obj_.AsRef();
return *this;
}
bool defined() const {
return obj_.IsRef();
}
void Reset() {
obj_.Reset();
}
private:
CompactObj obj_;
};
} // namespace dfly
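CompactObjectView exists so that hash containers can index PrimeTable entries without copying key storage; later in this diff it is used as `absl::flat_hash_set<CompactObjectView, PrimeHasher>`. A standalone analogue of the non-owning-view-in-a-hash-set pattern (illustration only, toy types):

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <unordered_set>

// Toy stand-in for CompactObjectView: a non-owning view that hashes and
// compares like the object it references, so a set of views can index
// entries without duplicating their storage.
struct StringRefView {
  const std::string* ref;
  bool operator==(const StringRefView& o) const { return *ref == *o.ref; }
};

struct ViewHasher {
  size_t operator()(const StringRefView& v) const {
    return std::hash<std::string>{}(*v.ref);
  }
};

int main() {
  std::string key = "user:1000";
  std::unordered_set<StringRefView, ViewHasher> pending;
  pending.insert(StringRefView{&key});        // no copy of the key bytes
  assert(pending.count(StringRefView{&key})); // found via the referenced value
}
```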


@@ -10,13 +10,12 @@ ALIGN = 1 << 10 # 1KB alignment
def print_small_bins():
prev_val = 0
for i in range(64, 1, -1):
val = 4096 // i
val = (val // 16)*16 # make it 16 bytes aligned
if val != prev_val:
print(val, end=', ')
prev_val = val
for i in range(56, 1, -1):
len = (4096 - i*8) // i # reduce by size of hashes, then divide between the i entries
len = (len // 8)*8 # make it 8 bytes aligned
if len != prev_val:
print(i, len)
prev_val = len
print()
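For cross-checking, a C++ mirror of the second loop (my sketch; assumes the division by i shown above): for n entries per 4KB page, each entry also needs an 8-byte hash, so the usable size is floor((4096 - 8n)/n) rounded down to 8-byte alignment. The distinct sizes it prints line up with the kSmallBins table in tiered_storage.cc.

```cpp
#include <cstdio>

// Sketch mirroring print_small_bins(): bin size for n entries per 4KB page,
// after subtracting n 8-byte hashes and aligning down to 8 bytes.
constexpr unsigned BinSizeFor(unsigned n) {
  unsigned len = (4096 - n * 8) / n;
  return (len / 8) * 8;
}

int main() {
  unsigned prev = 0;
  for (unsigned n = 56; n > 1; --n) {
    unsigned len = BinSizeFor(n);
    if (len != prev) {
      std::printf("%u %u\n", n, len);  // e.g. "51 72" -- cf. kSmallBins
      prev = len;
    }
  }
}
```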


@@ -193,12 +193,13 @@ bool ParseDouble(string_view src, double* value) {
#define ADD(x) (x) += o.x
TieredStats& TieredStats::operator+=(const TieredStats& o) {
static_assert(sizeof(TieredStats) == 32);
static_assert(sizeof(TieredStats) == 40);
ADD(external_reads);
ADD(external_writes);
ADD(tiered_reads);
ADD(tiered_writes);
ADD(storage_capacity);
ADD(storage_reserved);
ADD(aborted_offloads);
return *this;
}


@@ -91,13 +91,14 @@ struct OpArgs {
};
struct TieredStats {
size_t external_reads = 0;
size_t external_writes = 0;
size_t tiered_reads = 0;
size_t tiered_writes = 0;
size_t storage_capacity = 0;
// how much was reserved by actively stored items.
size_t storage_reserved = 0;
size_t aborted_offloads = 0;
TieredStats& operator+=(const TieredStats&);
};


@@ -703,8 +703,11 @@ void DbSlice::PreUpdate(DbIndex db_ind, PrimeIterator it) {
tiered->Free(offset, size);
it->second.Reset();
stats->external_entries -= 1;
stats->external_size -= size;
stats->tiered_entries -= 1;
stats->tiered_size -= size;
} else if (it->second.HasIoPending()) {
TieredStorage* tiered = shard_owner()->tiered_storage();
tiered->CancelIo(db_ind, it);
}
}


@@ -1313,12 +1313,13 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
if (should_enter("TIERED", true)) {
ADD_HEADER("# TIERED");
append("external_entries", total.external_entries);
append("external_bytes", total.external_size);
append("external_reads", m.tiered_stats.external_reads);
append("external_writes", m.tiered_stats.external_writes);
append("external_reserved", m.tiered_stats.storage_reserved);
append("external_capacity", m.tiered_stats.storage_capacity);
append("tiered_entries", total.tiered_entries);
append("tiered_bytes", total.tiered_size);
append("tiered_reads", m.tiered_stats.tiered_reads);
append("tiered_writes", m.tiered_stats.tiered_writes);
append("tiered_reserved", m.tiered_stats.storage_reserved);
append("tiered_capacity", m.tiered_stats.storage_capacity);
append("tiered_aborted_writes", m.tiered_stats.aborted_offloads);
}
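With these renames, the TIERED section of INFO would read roughly as follows (illustrative values only; the field names are the ones appended above):

```
# TIERED
tiered_entries:1024
tiered_bytes:183000
tiered_reads:77
tiered_writes:132
tiered_reserved:1048576
tiered_capacity:268435456
tiered_aborted_writes:3
```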
if (should_enter("PERSISTENCE", true)) {


@@ -413,7 +413,7 @@ OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value
TieredStorage::EligibleForOffload(value)) { // external storage enabled.
// TODO: we may have a bug if we block the fiber inside UnloadItem - "it" may be invalid
// afterwards.
shard->tiered_storage()->UnloadItem(op_args_.db_cntx.db_index, it);
shard->tiered_storage()->ScheduleOffload(op_args_.db_cntx.db_index, it);
}
return OpStatus::OK;
@@ -458,14 +458,14 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt
// overwrite existing entry.
prime_value.SetString(value);
DCHECK(!prime_value.HasIoPending());
if (value.size() >= kMinTieredLen) { // external storage enabled.
// TODO: if ScheduleOffload can block the calling fiber, then we have a bug, because "it"
// may be invalid after the function returns and the code that follows may access an
// invalid entry.
if (shard->tiered_storage()) {
shard->tiered_storage()->UnloadItem(op_args_.db_cntx.db_index, it);
shard->tiered_storage()->ScheduleOffload(op_args_.db_cntx.db_index, it);
}
}


@@ -23,16 +23,15 @@ DbTableStats& DbTableStats::operator+=(const DbTableStats& o) {
ADD(update_value_amount);
ADD(listpack_blob_cnt);
ADD(listpack_bytes);
ADD(external_entries);
ADD(external_size);
ADD(tiered_entries);
ADD(tiered_size);
return *this;
}
DbTable::DbTable(std::pmr::memory_resource* mr)
: prime(kInitSegmentLog, detail::PrimeTablePolicy{}, mr),
expire(0, detail::ExpireTablePolicy{}, mr),
mcflag(0, detail::ExpireTablePolicy{}, mr) {
expire(0, detail::ExpireTablePolicy{}, mr), mcflag(0, detail::ExpireTablePolicy{}, mr) {
}
DbTable::~DbTable() {


@@ -48,8 +48,8 @@ struct DbTableStats {
ssize_t update_value_amount = 0;
size_t listpack_blob_cnt = 0;
size_t listpack_bytes = 0;
size_t external_entries = 0;
size_t external_size = 0;
size_t tiered_entries = 0;
size_t tiered_size = 0;
DbTableStats& operator+=(const DbTableStats& o);
};


@@ -13,6 +13,7 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "server/db_slice.h"
#include "server/engine_shard_set.h"
#include "util/proactor_base.h"
ABSL_FLAG(uint32_t, tiered_storage_max_pending_writes, 32,
@@ -23,213 +24,257 @@ namespace dfly {
using namespace std;
using absl::GetFlag;
string BackingFileName(string_view base, unsigned index) {
constexpr size_t kBlockLen = 4096;
constexpr size_t kBlockAlignment = 4096;
constexpr unsigned kSmallBinLen = 34;
constexpr unsigned kMaxSmallBin = 2032;
constexpr unsigned kSmallBins[kSmallBinLen] = {
72, 80, 88, 96, 104, 112, 120, 128, 136, 144, 152, 160, 168, 176, 184, 192, 200,
216, 232, 248, 264, 280, 304, 328, 360, 400, 440, 504, 576, 672, 808, 1016, 1352, 2040,
};
constexpr unsigned SmallToBin(unsigned len) {
unsigned indx = (len + 7) / 8;
if (indx <= 9)
return 0;
indx -= 9;
if (indx < 18)
return indx;
unsigned rev_indx = (kBlockLen / len) - 1;
indx = kSmallBinLen - rev_indx;
if (kSmallBins[indx] < len)
++indx;
return indx;
}
// Compile-time tests for SmallToBin.
constexpr bool CheckBins() {
for (unsigned i = 64; i <= 2032; ++i) {
unsigned indx = SmallToBin(i);
if (kSmallBins[indx] < i)
return false;
if (indx > 0 && kSmallBins[indx - 1] > i)
return false;
}
for (unsigned j = 0; j < kSmallBinLen; ++j) {
if (SmallToBin(kSmallBins[j]) != j)
return false;
}
return true;
}
static_assert(CheckBins());
static_assert(SmallToBin(kMaxSmallBin) == kSmallBinLen - 1);
constexpr unsigned NumEntriesInSmallBin(unsigned bin_size) {
return kBlockLen / (bin_size + 8); // 8 for the hash value.
}
static_assert(NumEntriesInSmallBin(72) == 51);
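A worked example of the bin math (standalone sketch): a 72-byte slot plus its 8-byte hash costs 80 bytes, so 4096 / 80 = 51 entries fit in a page, consuming 51*8 = 408 header bytes and 51*72 = 3672 payload bytes (4080 of 4096).

```cpp
// Standalone check of the small-bin math (illustration only).
constexpr unsigned kPage = 4096;

constexpr unsigned EntriesFor(unsigned bin_size) {
  return kPage / (bin_size + 8);  // each entry also stores an 8-byte hash
}

static_assert(EntriesFor(72) == 51);
static_assert(51 * 8 + 51 * 72 == 4080);  // 16 bytes of slack per page
static_assert(EntriesFor(2040) == 2);     // the largest small bin
```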
static string BackingFileName(string_view base, unsigned index) {
return absl::StrCat(base, "-", absl::Dec(index, absl::kZeroPad4), ".ssd");
}
#if 0
struct IndexKey {
  DbIndex db_indx;
  PrimeKey key;

  IndexKey() {
  }

  // We define here a weird copy constructor because map uses pair<const PrimeKey,..>
  // and "const" prevents moving IndexKey.
  IndexKey(const IndexKey& o) : db_indx(o.db_indx), key(o.key.AsRef()) {
  }

  IndexKey(IndexKey&&) = default;

  IndexKey(DbIndex i, PrimeKey k) : db_indx(i), key(std::move(k)) {
  }

  bool operator==(const IndexKey& ik) const {
    return ik.db_indx == db_indx && ik.key == key;
  }

  // IndexKey& operator=(IndexKey&&) {}
  // IndexKey& operator=(const IndexKey&) =delete;
};

struct EntryHash {
  size_t operator()(const IndexKey& ik) const {
    return ik.key.HashCode() ^ (size_t(ik.db_indx) << 16);
  }
};
#endif

static size_t ExternalizeEntry(size_t item_offset, DbTableStats* stats, PrimeValue* entry) {
  CHECK(entry->HasIoPending());

  entry->SetIoPending(false);

  size_t heap_size = entry->MallocUsed();
  size_t item_size = entry->Size();

  stats->obj_memory_usage -= heap_size;
  if (entry->ObjType() == OBJ_STRING)
    stats->strval_memory_usage -= heap_size;

  entry->SetExternal(item_offset, item_size);

  stats->tiered_entries += 1;
  stats->tiered_size += item_size;

  return item_size;
}

struct PrimeHasher {
  size_t operator()(const PrimeKey& o) const {
    return o.HashCode();
  }
};
const size_t kBatchSize = 4096;
const size_t kPageAlignment = 4096;
// We must support defragmentation of small entries.
// This is similar to in-memory external defragmentation:
// some of the values are deleted but the page is still used.
//
// In order to allow moving entries to other pages, we keep the hash id of each
// serialized entry (8 bytes) as a "back reference" to its PrimeTable key.
// DashTable can uniquely determine the segment id and the home bucket id based
// on the hash value, but in the end it cannot uniquely identify the entry based
// on its hash value alone (problematic use-case: different keys with the same
// hash). That is fine, because in that case we can check each candidate for
// whether it points back to the hosted entry in the page.
//
// Each 4k batch will contain at most 56 entries (56*64 + 56*8 = 4032).
// Our header will be:
//   1 byte for the number of items N (1-56)
//   N*8 - hash values of the items
//
// To allow serializing a dynamic number of entries we serialize them backwards,
// from the end of the page, and this is why batch_offs_ starts from kBatchSize.
// We will need at most 1+56*8=449 bytes for the header.
// constexpr size_t kMaxHeaderSize = 448;

class TieredStorage::ActiveIoRequest {
  static constexpr unsigned kMaxEntriesCount = 56;

 public:
  explicit ActiveIoRequest(DbIndex db_index, size_t file_offs)
      : db_index_(db_index), file_offset_(file_offs), batch_offs_(kBatchSize) {
    block_ptr_ = (char*)mi_malloc_aligned(kBatchSize, kPageAlignment);
    DCHECK_EQ(0u, intptr_t(block_ptr_) % kPageAlignment);
  }

  ~ActiveIoRequest() {
    mi_free(block_ptr_);
  }

  ActiveIoRequest(const ActiveIoRequest&) = delete;
  ActiveIoRequest& operator=(const ActiveIoRequest&) = delete;

  bool CanAccommodate(size_t length) const {
    return batch_offs_ >= length + 8 + HeaderLength();
  }

  void Serialize(PrimeKey pkey, const PrimeValue& co);
  void WriteAsync(IoMgr* iomgr, std::function<void(int)> cb);
  void Undo(DbSlice* db_slice);

  // Returns total number of bytes being offloaded by externalized values.
  unsigned ExternalizeEntries(DbSlice* db_slice);

  const auto& entries() const {
    return entries_;
  }

  size_t page_index() const {
    return file_offset_ / kBatchSize;
  }

  DbIndex db_index() const {
    return db_index_;
  }

  size_t serialized_len() const {
    return kBatchSize - batch_offs_;
  }

 private:
  size_t HeaderLength() const {
    return 1 + 8 * entries_.size();
  }

  DbIndex db_index_;
  uint16_t used_size_ = 0;
  size_t file_offset_;
  size_t batch_offs_;
  char* block_ptr_;

  uint64_t hash_values_[kMaxEntriesCount];

  // key -> offset
  absl::flat_hash_map<string, size_t> entries_;
};

struct TieredStorage::PerDb {
  PerDb(const PerDb&) = delete;
  PerDb& operator=(const PerDb&) = delete;
  PerDb() = default;

  using InflightMap = absl::flat_hash_map<string_view, InflightWriteRequest*>;

  struct BinRecord {
    // Those that wait to be serialized. Must be less than NumEntriesInSmallBin for each bin.
    absl::flat_hash_set<CompactObjectView, PrimeHasher> pending_entries;

    // Entries that were scheduled to write but have not been completed yet.
    InflightMap enqueued_entries;
  };

  BinRecord bin_map[kSmallBinLen];
};

class TieredStorage::InflightWriteRequest {
 public:
  InflightWriteRequest(DbIndex db_index, unsigned bin_index, uint32_t page_index);
  ~InflightWriteRequest();

  InflightWriteRequest(const InflightWriteRequest&) = delete;
  InflightWriteRequest& operator=(const InflightWriteRequest&) = delete;

  void Add(const PrimeKey& pk, const PrimeValue& pv);

  // Returns how many entries were offloaded.
  unsigned ExternalizeEntries(PerDb::BinRecord* bin_record, DbSlice* db_slice);

  void Undo(PerDb::BinRecord* bin_record, DbSlice* db_slice);

  string_view block() const {
    return string_view{block_start_, kBlockLen};
  }

  uint32_t page_index() const {
    return page_index_;
  }

  DbIndex db_index() const {
    return db_index_;
  }

  unsigned bin_index() const {
    return bin_index_;
  }

  const vector<string_view>& entries() const {
    return entries_;
  }

  void SetKeyBlob(size_t len) {
    key_blob_.resize(len);
    next_key_ = key_blob_.data();
  }

 private:
  DbIndex db_index_;
  uint32_t bin_index_;
  uint32_t page_index_;

  char* block_start_;
  char* next_key_ = nullptr;
  std::vector<char> key_blob_;

  vector<string_view> entries_;
};
void TieredStorage::ActiveIoRequest::Serialize(PrimeKey pkey, const PrimeValue& co) {
  DCHECK(!co.HasIoPending());
  DCHECK_LT(entries_.size(), ABSL_ARRAYSIZE(hash_values_));

  size_t item_size = co.Size();
  DCHECK_LE(item_size + HeaderLength(), batch_offs_);

  used_size_ += item_size;
  batch_offs_ -= item_size;                // advance backwards
  co.GetString(block_ptr_ + batch_offs_);  // serialize the object

  string keystr;
  pkey.GetString(&keystr);
  uint64_t keyhash = CompactObj::HashCode(keystr);
  hash_values_[entries_.size()] = keyhash;

  bool added = entries_.emplace(std::move(keystr), file_offset_ + batch_offs_).second;
  CHECK(added);
}

void TieredStorage::ActiveIoRequest::WriteAsync(IoMgr* io_mgr, std::function<void(int)> cb) {
  DCHECK_LE(HeaderLength(), batch_offs_);
  DCHECK_LE(entries_.size(), kMaxEntriesCount);

  block_ptr_[0] = entries_.size();
  for (unsigned i = 0; i < entries_.size(); ++i) {
    absl::little_endian::Store64(block_ptr_ + 1 + i * 8, hash_values_[i]);
  }
  string_view sv{block_ptr_, kBatchSize};

  io_mgr->WriteAsync(file_offset_, sv, move(cb));
}

TieredStorage::InflightWriteRequest::InflightWriteRequest(DbIndex db_index, unsigned bin_index,
                                                          uint32_t page_index)
    : db_index_(db_index), bin_index_(bin_index), page_index_(page_index) {
  block_start_ = (char*)mi_malloc_aligned(kBlockLen, kBlockAlignment);
  DCHECK_EQ(0u, intptr_t(block_start_) % kBlockAlignment);
}

TieredStorage::InflightWriteRequest::~InflightWriteRequest() {
  mi_free(block_start_);
}
void TieredStorage::ActiveIoRequest::Undo(DbSlice* db_slice) {
  PrimeTable* pt = db_slice->GetTables(db_index_).first;
  for (const auto& [pkey, _] : entries_) {
    PrimeIterator it = pt->Find(pkey);

    // TODO: what happens if the entry was deleted meanwhile,
    // or has been serialized again?
    CHECK(it->second.HasIoPending()) << "TBD: fix inconsistencies";

    it->second.SetIoPending(false);
  }
}

void TieredStorage::InflightWriteRequest::Add(const PrimeKey& pk, const PrimeValue& pv) {
  DCHECK(!pv.IsExternal());

  unsigned bin_size = kSmallBins[bin_index_];
  unsigned max_entries = NumEntriesInSmallBin(bin_size);

  char* next_hash = block_start_ + 8 * entries_.size();  // each hash slot is 8 bytes
  char* next_data = block_start_ + max_entries * 8 + entries_.size() * bin_size;

  DCHECK_LE(pv.Size(), bin_size);
  DCHECK_LE(next_data + bin_size, block_start_ + kBlockLen);

  uint64_t hash = pk.HashCode();
  absl::little_endian::Store64(next_hash, hash);
  pv.GetString(next_data);

  size_t key_size = pk.Size();
  char* end = key_blob_.data() + key_blob_.size();
  DCHECK_LE(next_key_ + key_size, end);
  pk.GetString(next_key_);

  // Preserves the insertion order.
  entries_.push_back(string_view{next_key_, key_size});
  next_key_ += key_size;
}
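My reading of the page layout that Add() produces, as offset math (sketch): the page starts with one 8-byte hash per slot, followed by fixed-width value slots of bin_size bytes each.

```cpp
#include <cstddef>

// Sketch of the 4KB page written by InflightWriteRequest::Add (illustration):
// [hash_0 .. hash_{max-1}] [slot_0 .. slot_{max-1}]
constexpr std::size_t kPage = 4096;

constexpr std::size_t HashOffset(unsigned i) {
  return 8u * i;  // 8-byte little-endian key hash per entry
}

constexpr std::size_t SlotOffset(unsigned max_entries, unsigned bin_size, unsigned i) {
  return 8u * max_entries + std::size_t{bin_size} * i;
}

// 72-byte bin, 51 entries: the hash area ends where slot 0 begins,
// and the last slot still fits inside the page.
static_assert(HashOffset(50) + 8 <= SlotOffset(51, 72, 0));
static_assert(SlotOffset(51, 72, 50) + 72 <= kPage);
```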
unsigned TieredStorage::ActiveIoRequest::ExternalizeEntries(DbSlice* db_slice) {
  PrimeTable* pt = db_slice->GetTables(db_index_).first;
  DbTableStats* stats = db_slice->MutableStats(db_index_);

  unsigned total_used = 0;

  for (const auto& k_v : entries_) {
    const auto& pkey = k_v.first;
    size_t item_offset = k_v.second;

    PrimeIterator it = pt->Find(pkey);

    // TODO: the key may be deleted or overridden; the latter is especially dangerous.
    // We should update the active pending request with any change we make to the entry.
    // It should not be a problem, since the HasIoPending tag means we must
    // update the inflight request (or mark the entry as cancelled).
    CHECK(!it.is_done()) << "TBD";
    CHECK(it->second.HasIoPending());
    it->second.SetIoPending(false);

    PrimeValue& pv = it->second;
    size_t heap_size = pv.MallocUsed();
    size_t item_size = pv.Size();
    total_used += item_size;

    stats->obj_memory_usage -= heap_size;
    if (pv.ObjType() == OBJ_STRING)
      stats->strval_memory_usage -= heap_size;

    VLOG(2) << "SetExternal: " << pkey << " " << item_offset;
    pv.SetExternal(item_offset, item_size);

    stats->external_entries += 1;
    stats->external_size += item_size;
  }

  return total_used;
}

unsigned TieredStorage::InflightWriteRequest::ExternalizeEntries(PerDb::BinRecord* bin_record,
                                                                 DbSlice* db_slice) {
  PrimeTable* pt = db_slice->GetTables(db_index_).first;
  DbTableStats* stats = db_slice->MutableStats(db_index_);

  unsigned externalized = 0;

  unsigned bin_size = kSmallBins[bin_index_];
  unsigned max_entries = NumEntriesInSmallBin(bin_size);
  size_t offset = max_entries * 8;

  for (size_t i = 0; i < entries_.size(); ++i) {
    string_view pkey = entries_[i];
    auto it = bin_record->enqueued_entries.find(pkey);
    if (it != bin_record->enqueued_entries.end() && it->second == this) {
      ++externalized;
    }
  }

  if (externalized <= entries_.size() / 2) {
    Undo(bin_record, db_slice);
    return 0;
  }

  for (size_t i = 0; i < entries_.size(); ++i) {
    string_view pkey = entries_[i];
    auto it = bin_record->enqueued_entries.find(pkey);

    if (it != bin_record->enqueued_entries.end() && it->second == this) {
      PrimeIterator pit = pt->Find(pkey);
      size_t item_offset = page_index_ * 4096 + offset + i * bin_size;

      // TODO: the key may be deleted or overridden; the latter is especially dangerous.
      // We should update the active pending request with any change we make to the entry.
      // It should not be a problem, since the HasIoPending tag means we must
      // update the inflight request (or mark the entry as cancelled).
      CHECK(!pit.is_done()) << "TBD";

      ExternalizeEntry(item_offset, stats, &pit->second);
      bin_record->enqueued_entries.erase(it);
    }
  }

  return externalized;
}
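The keep-or-undo rule above, stated in isolation (illustration): the page is committed only if a strict majority of its entries are still wanted by the time the write completes; otherwise the whole write is undone and the page is released.

```cpp
// Standalone statement of ExternalizeEntries' keep-or-undo rule.
constexpr bool KeepPage(unsigned still_enqueued, unsigned total) {
  return still_enqueued > total / 2;  // strict majority commits the page
}

static_assert(!KeepPage(25, 51));  // half or less: undo and free the page
static_assert(KeepPage(26, 51));   // majority still valid: externalize
```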
bool TieredStorage::PerDb::ShouldFlush() const {
  return bucket_cursors.size() > bucket_cursors.capacity() / 2;
}

void TieredStorage::InflightWriteRequest::Undo(PerDb::BinRecord* bin_record, DbSlice* db_slice) {
  PrimeTable* pt = db_slice->GetTables(db_index_).first;
  for (const auto& pkey : entries_) {
    auto it = bin_record->enqueued_entries.find(pkey);
    if (it != bin_record->enqueued_entries.end() && it->second == this) {
      PrimeIterator pit = pt->Find(pkey);

      // TODO: what happens if the entry was deleted meanwhile,
      // or has been serialized again?
      CHECK(pit->second.HasIoPending()) << "TBD: fix inconsistencies";

      pit->second.SetIoPending(false);
      bin_record->enqueued_entries.erase(it);
    }
  }
}
TieredStorage::TieredStorage(DbSlice* db_slice) : db_slice_(*db_slice) {
@@ -254,26 +299,24 @@ error_code TieredStorage::Open(const string& base) {
}
std::error_code TieredStorage::Read(size_t offset, size_t len, char* dest) {
stats_.external_reads++;
stats_.tiered_reads++;
DVLOG(1) << "Read " << offset << " " << len;
return io_mgr_.Read(offset, io::MutableBytes{reinterpret_cast<uint8_t*>(dest), len});
}
void TieredStorage::Free(size_t offset, size_t len) {
if (offset % 4096 == 0) {
if (offset % kBlockLen == 0) {
alloc_.Free(offset, len);
} else {
size_t offs_page = offset / 4096;
auto it = multi_cnt_.find(offs_page);
CHECK(it != multi_cnt_.end()) << offs_page;
MultiBatch& mb = it->second;
CHECK_GE(mb.used, len);
mb.used -= len;
if (mb.used == 0) {
alloc_.Free(offs_page * 4096, ExternalAllocator::kMinBlockSize);
VLOG(1) << "multi_cnt_ erase " << it->first;
multi_cnt_.erase(it);
uint32_t offs_page = offset / kBlockLen;
auto it = page_refcnt_.find(offs_page);
CHECK(it != page_refcnt_.end()) << offs_page;
CHECK_GT(it->second, 0u);
if (--it->second == 0) {
alloc_.Free(offs_page * kBlockLen, kBlockLen);
page_refcnt_.erase(it);
}
}
}
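A minimal model of the refcounting in Free() (standalone sketch): small entries do not return bytes to the allocator individually; the page is released once its last entry is freed.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>

// Illustration of page_refcnt_ semantics: value = live entries on that page.
int main() {
  std::unordered_map<uint32_t, uint8_t> page_refcnt;
  page_refcnt.emplace(7, 2);  // page 7 was written with 2 externalized entries

  auto free_entry = [&](uint32_t page) {
    auto it = page_refcnt.find(page);
    assert(it != page_refcnt.end() && it->second > 0);
    if (--it->second == 0) {
      // here the real code calls alloc_.Free(page * kBlockLen, kBlockLen)
      page_refcnt.erase(it);
    }
  };

  free_entry(7);
  assert(page_refcnt.count(7) == 1);  // one entry still lives on the page
  free_entry(7);
  assert(page_refcnt.count(7) == 0);  // last entry freed -> page released
}
```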
@@ -290,56 +333,45 @@ TieredStats TieredStorage::GetStats() const {
return res;
}
void TieredStorage::SendIoRequest(ActiveIoRequest* req) {
#if 1
// static string tmp(4096, 'x');
// string_view sv{tmp};
active_req_sem_.await(
[this] { return num_active_requests_ <= GetFlag(FLAGS_tiered_storage_max_pending_writes); });
auto cb = [this, req](int res) { FinishIoRequest(res, req); };
++num_active_requests_;
req->WriteAsync(&io_mgr_, move(cb));
++stats_.external_writes;
#else
FinishIoRequest(0, req);
#endif
}
void TieredStorage::FinishIoRequest(int io_res, ActiveIoRequest* req) {
  if (io_res < 0) {
    LOG(ERROR) << "Error writing into ssd file: " << util::detail::SafeErrorMessage(-io_res);
    req->Undo(&db_slice_);
  } else {
    uint16_t used_total = req->ExternalizeEntries(&db_slice_);
    CHECK_GT(req->entries().size(), 1u);  // multi-item batch

    MultiBatch mb{used_total};
    VLOG(1) << "multi_cnt_ emplace " << req->page_index();
    multi_cnt_.emplace(req->page_index(), mb);
  }

  delete req;
  --num_active_requests_;
  if (num_active_requests_ == GetFlag(FLAGS_tiered_storage_max_pending_writes)) {
    active_req_sem_.notifyAll();
  }
  VLOG_IF(1, num_active_requests_ == 0) << "Finished active requests";
}

void TieredStorage::FinishIoRequest(int io_res, InflightWriteRequest* req) {
  PerDb* db = db_arr_[req->db_index()];
  auto& bin_record = db->bin_map[req->bin_index()];

  if (io_res < 0) {
    LOG(ERROR) << "Error writing into ssd file: " << util::detail::SafeErrorMessage(-io_res);
    alloc_.Free(req->page_index() * kBlockLen, kBlockLen);
    req->Undo(&bin_record, &db_slice_);
    ++stats_.aborted_offloads;
  } else {
    // Also removes the entries from bin_record.
    uint16_t entries_serialized = req->ExternalizeEntries(&bin_record, &db_slice_);

    if (entries_serialized == 0) {  // aborted
      ++stats_.aborted_offloads;
      alloc_.Free(req->page_index() * kBlockLen, kBlockLen);
    } else {  // succeeded.
      VLOG(2) << "page_refcnt emplace " << req->page_index();
      auto res = page_refcnt_.emplace(req->page_index(), entries_serialized);
      CHECK(res.second);
    }
  }

  delete req;
  --num_active_requests_;
  VLOG_IF(2, num_active_requests_ == 0) << "Finished active requests";
}
error_code TieredStorage::UnloadItem(DbIndex db_index, PrimeIterator it) {
error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) {
CHECK_EQ(OBJ_STRING, it->second.ObjType());
DCHECK(!it->second.IsExternal());
DCHECK(!it->second.HasIoPending());
// Relevant only for OBJ_STRING, see CHECK above.
size_t blob_len = it->second.Size();
if (blob_len >= kBatchSize / 2 &&
num_active_requests_ < GetFlag(FLAGS_tiered_storage_max_pending_writes)) {
WriteSingle(db_index, it, blob_len);
if (blob_len > kMaxSmallBin) {
if (num_active_requests_ < GetFlag(FLAGS_tiered_storage_max_pending_writes)) {
WriteSingle(db_index, it, blob_len);
} // otherwise skip
return error_code{};
}
@@ -352,24 +384,63 @@ error_code TieredStorage::UnloadItem(DbIndex db_index, PrimeIterator it) {
}

PerDb* db = db_arr_[db_index];

db->bucket_cursors.EmplaceOrOverride(it.bucket_cursor().value());
// db->pending_upload[it.bucket_cursor().value()] += blob_len;

// size_t grow_size = 0;
if (db->ShouldFlush()) {
  if (num_active_requests_ < GetFlag(FLAGS_tiered_storage_max_pending_writes)) {
    FlushPending(db_index);

    // if we reached high utilization of the file range - try to grow the file.
    if (alloc_.allocated_bytes() > size_t(alloc_.capacity() * 0.85)) {
      InitiateGrow(1ULL << 28);
    }
  }
}

unsigned bin_index = SmallToBin(blob_len);
DCHECK_LT(bin_index, kSmallBinLen);

unsigned max_entries = NumEntriesInSmallBin(kSmallBins[bin_index]);
auto& bin_record = db->bin_map[bin_index];

// TODO: we need to track in stats all the cases where we omit the offloading attempt.
CHECK_LT(bin_record.pending_entries.size(), max_entries);
bin_record.pending_entries.insert(it->first);

if (bin_record.pending_entries.size() < max_entries)
  return error_code{};  // gather more.

bool flush_succeeded = false;
if (num_active_requests_ < GetFlag(FLAGS_tiered_storage_max_pending_writes)) {
  flush_succeeded = FlushPending(db_index, bin_index);

  // if we reached high utilization of the file range - try to grow the file.
  if (alloc_.allocated_bytes() > size_t(alloc_.capacity() * 0.85)) {
    InitiateGrow(1ULL << 28);
  }
}

if (!flush_succeeded) {
  // We could not flush because I/O is saturated, so let's remove the last item.
  bin_record.pending_entries.erase(it->first.AsRef());
}

return error_code{};
}
}
void TieredStorage::CancelIo(DbIndex db_index, PrimeIterator it) {
DCHECK_EQ(OBJ_STRING, it->second.ObjType());
auto& prime_value = it->second;
DCHECK(!prime_value.IsExternal());
DCHECK(prime_value.HasIoPending());
prime_value.SetIoPending(false); // remove io flag.
PerDb* db = db_arr_[db_index];
size_t blob_len = prime_value.Size();
unsigned bin_index = SmallToBin(blob_len);
auto& bin_record = db->bin_map[bin_index];
auto pending_it = bin_record.pending_entries.find(it->first);
if (pending_it != bin_record.pending_entries.end()) {
bin_record.pending_entries.erase(pending_it);
return;
}
string key = it->first.ToString();
CHECK(bin_record.enqueued_entries.erase(key));
}
bool IsObjFitToUnload(const PrimeValue& pv) {
return pv.ObjType() == OBJ_STRING && !pv.IsExternal() && pv.Size() >= 64 && !pv.HasIoPending();
};
@@ -383,11 +454,11 @@ void TieredStorage::WriteSingle(DbIndex db_index, PrimeIterator it, size_t blob_
return;
}
constexpr size_t kMask = kPageAlignment - 1;
constexpr size_t kMask = kBlockAlignment - 1;
size_t page_size = (blob_len + kMask) & (~kMask);
DCHECK_GE(page_size, blob_len);
DCHECK_EQ(0u, page_size % kPageAlignment);
DCHECK_EQ(0u, page_size % kBlockAlignment);
struct SingleRequest {
char* block_ptr = nullptr;
@@ -397,7 +468,7 @@ void TieredStorage::WriteSingle(DbIndex db_index, PrimeIterator it, size_t blob_
string key;
} req;
char* block_ptr = (char*)mi_malloc_aligned(page_size, kPageAlignment);
char* block_ptr = (char*)mi_malloc_aligned(page_size, kBlockAlignment);
req.blob_len = blob_len;
req.offset = res;
@@ -428,90 +499,58 @@ void TieredStorage::WriteSingle(DbIndex db_index, PrimeIterator it, size_t blob_
io_mgr_.WriteAsync(res, string_view{block_ptr, page_size}, std::move(cb));
}
void TieredStorage::FlushPending(DbIndex db_index) {
  PerDb* db = db_arr_[db_index];

  DCHECK(!io_mgr_.grow_pending() && !db->bucket_cursors.empty());

  vector<uint64_t> canonic_req;
  canonic_req.reserve(db->bucket_cursors.size());
  for (size_t i = 0; i < db->bucket_cursors.size(); ++i) {
    canonic_req.push_back(*db->bucket_cursors.GetItem(i));
  }
  db->bucket_cursors.ConsumeHead(canonic_req.size());

  // remove duplicates and sort.
  {
    sort(canonic_req.begin(), canonic_req.end());
    auto it = unique(canonic_req.begin(), canonic_req.end());
    canonic_req.resize(it - canonic_req.begin());
  }

  // TODO: we could add item size and sort from largest to smallest before
  // the aggregation.
  constexpr size_t kMaxBatchLen = 64;
  PrimeTable::iterator single_batch[kMaxBatchLen];
  unsigned batch_len = 0;

  auto tr_cb = [&](PrimeTable::iterator it) {
    if (IsObjFitToUnload(it->second)) {
      CHECK_LT(batch_len, kMaxBatchLen);
      single_batch[batch_len++] = it;
    }
  };

  ActiveIoRequest* active_req = nullptr;

  for (size_t i = 0; i < canonic_req.size(); ++i) {
    uint64_t cursor_val = canonic_req[i];
    PrimeTable::Cursor curs(cursor_val);
    db_slice_.GetTables(db_index).first->Traverse(curs, tr_cb);

    for (unsigned j = 0; j < batch_len; ++j) {
      PrimeIterator it = single_batch[j];
      size_t item_size = it->second.Size();
      DCHECK_GT(item_size, 0u);

      if (!active_req || !active_req->CanAccommodate(item_size)) {
        if (active_req) {  // need to close
          // save the block asynchronously.
          ++submitted_io_writes_;
          submitted_io_write_size_ += kBatchSize;
          SendIoRequest(active_req);
          active_req = nullptr;
        }

        int64_t res = alloc_.Malloc(item_size);
        if (res < 0) {
          InitiateGrow(-res);
          return;
        }

        size_t batch_size = ExternalAllocator::GoodSize(item_size);
        DCHECK_EQ(batch_size, ExternalAllocator::GoodSize(batch_size));

        active_req = new ActiveIoRequest(db_index, res);
      }

      active_req->Serialize(it->first.AsRef(), it->second);
      it->second.SetIoPending(true);
    }
    batch_len = 0;
  }

  // flush or undo the pending request.
  if (active_req) {
    if (active_req->serialized_len() >= kBatchSize / 2) {
      SendIoRequest(active_req);
    } else {
      // data is too small. rollback active_req.
      active_req->Undo(&db_slice_);
      // TODO: we could enqueue those back to pending_req.
      delete active_req;
    }
  }
}

bool TieredStorage::FlushPending(DbIndex db_index, unsigned bin_index) {
  PerDb* db = db_arr_[db_index];

  int64_t res = alloc_.Malloc(kBlockLen);
  if (res < 0) {
    InitiateGrow(-res);
    return false;
  }

  DCHECK_EQ(res % kBlockLen, 0u);

  off64_t file_offset = res;
  PrimeTable* pt = db_slice_.GetTables(db_index).first;
  auto& bin_record = db->bin_map[bin_index];

  DCHECK_EQ(bin_record.pending_entries.size(), NumEntriesInSmallBin(kSmallBins[bin_index]));
  DbSlice::Context db_context{db_index, GetCurrentTimeMs()};
  DCHECK_LT(bin_record.pending_entries.size(), 60u);

  InflightWriteRequest* req = new InflightWriteRequest(db_index, bin_index, res / kBlockLen);

  size_t keys_size = 0;
  for (auto key_view : bin_record.pending_entries) {
    keys_size += key_view->Size();
  }
  req->SetKeyBlob(keys_size);

  for (auto key_view : bin_record.pending_entries) {
    PrimeIterator it = pt->Find(key_view);
    DCHECK(IsValid(it));

    if (it->second.HasExpire()) {
      auto [pit, exp_it] = db_slice_.ExpireIfNeeded(db_context, it);
      CHECK(!pit.is_done()) << "TBD: should abort in case of expired keys";
    }

    req->Add(it->first, it->second);
    it->second.SetIoPending(true);

    auto res = bin_record.enqueued_entries.emplace(req->entries().back(), req);
    CHECK(res.second);
  }

  auto cb = [this, req](int io_res) { this->FinishIoRequest(io_res, req); };

  ++num_active_requests_;
  io_mgr_.WriteAsync(file_offset, req->block(), move(cb));
  ++stats_.tiered_writes;

  bin_record.pending_entries.clear();

  return true;
}
void TieredStorage::InitiateGrow(size_t grow_size) {


@@ -5,7 +5,6 @@
#include <absl/container/flat_hash_map.h>
#include "base/ring_buffer.h"
#include "core/external_alloc.h"
#include "server/common.h"
#include "server/io_mgr.h"
@@ -28,7 +27,9 @@ class TieredStorage {
std::error_code Read(size_t offset, size_t len, char* dest);
// Schedules offloading of the item pointed to by the iterator.
std::error_code UnloadItem(DbIndex db_index, PrimeIterator it);
std::error_code ScheduleOffload(DbIndex db_index, PrimeIterator it);
void CancelIo(DbIndex db_index, PrimeIterator it);
static bool EligibleForOffload(std::string_view val) {
return val.size() >= kMinBlobLen;
@@ -41,33 +42,15 @@ class TieredStorage {
TieredStats GetStats() const;
private:
class ActiveIoRequest;
struct Hasher {
size_t operator()(const PrimeKey& o) const {
return o.HashCode();
}
};
struct PerDb {
base::RingBuffer<uint64_t> bucket_cursors; // buckets cursors pending for unloading.
absl::flat_hash_map<PrimeKey, ActiveIoRequest*, Hasher> active_requests;
PerDb(const PerDb&) = delete;
PerDb& operator=(const PerDb&) = delete;
PerDb() : bucket_cursors(256) {
}
bool ShouldFlush() const;
};
class InflightWriteRequest;
void WriteSingle(DbIndex db_index, PrimeIterator it, size_t blob_len);
void FlushPending(DbIndex db_index);
bool FlushPending(DbIndex db_index, unsigned bin_index);
void InitiateGrow(size_t size);
void SendIoRequest(ActiveIoRequest* req);
void FinishIoRequest(int io_res, ActiveIoRequest* req);
void FinishIoRequest(int io_res, InflightWriteRequest* req);
void SetExternal(DbIndex db_index, size_t item_offset, PrimeValue* dest);
DbSlice& db_slice_;
@@ -77,30 +60,13 @@ class TieredStorage {
size_t submitted_io_writes_ = 0;
size_t submitted_io_write_size_ = 0;
uint32_t num_active_requests_ = 0;
util::fibers_ext::EventCount active_req_sem_;
struct PerDb;
std::vector<PerDb*> db_arr_;
/*struct PendingReq {
uint64_t cursor;
DbIndex db_indx = kInvalidDbId;
};*/
// map of cursor -> pending size
// absl::flat_hash_map<uint64_t, size_t> pending_upload;
// multi_cnt_ - counts how many unloaded items exist in the batch at the specified page offset.
// Here multi_cnt_.first is the file offset (in 4k pages) and
// multi_cnt_.second is a MultiBatch object storing the number of allocated records in the batch
// and its capacity (in 4k pages).
struct MultiBatch {
uint16_t used; // number of used bytes
uint16_t reserved; // in 4k pages.
MultiBatch(uint16_t mem_used) : used(mem_used) {
}
};
absl::flat_hash_map<uint32_t, MultiBatch> multi_cnt_;
absl::flat_hash_map<uint32_t, uint8_t> page_refcnt_;
TieredStats stats_;
};