Support offloading of blobs longer than 2KB (#559)

feat: Support offloading of blobs longer than 2KB
This commit is contained in:
Roman Gershman 2022-12-13 04:30:03 +02:00 committed by GitHub
parent 0efa1c92c4
commit 444f7e3f03
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 87 additions and 8 deletions

View file

@ -1,19 +1,36 @@
#!/usr/bin/env python3
import argparse
import random
import random
from array import array
# We print in 64 bit words.
ALIGN = 1 << 10 # 1KB alignment
def print_small_bins(page_size=4096, align=16):
    """Print the distinct small-bin sizes obtained by splitting a page.

    For each divisor i in [2, 64] the candidate bin size is
    ``page_size // i``, rounded down to a multiple of ``align`` so that
    every bin stays align-byte aligned.  Consecutive divisors often
    produce the same rounded size; duplicates are collapsed and the
    distinct sizes are printed in ascending order on one
    comma-separated line.

    Args:
        page_size: size of the page being split into bins (default 4096).
        align: required alignment of each bin size in bytes (default 16).
    """
    prev_val = 0
    for i in range(64, 1, -1):
        val = page_size // i
        val = (val // align) * align  # round down to keep bins aligned
        if val != prev_val:  # consecutive divisors may round to the same size
            print(val, end=', ')
            prev_val = val
    print()
def main():
parser = argparse.ArgumentParser(description='')
parser.add_argument('-n', type=int, dest='num',
parser.add_argument('-n', type=int, dest='num',
help='number of quadruplets', default=9)
parser.add_argument('-small', action='store_true')
args = parser.parse_args()
if args.small:
print("small")
print_small_bins()
return
size = 512*4
print ('{512, 512*2, 512*3, ', end=' ')
@ -29,4 +46,4 @@ def main():
print('};')
if __name__ == "__main__":
main()
main()

View file

@ -424,12 +424,11 @@ OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value
if (params.memcache_flags)
db_slice.SetMCFlag(op_args_.db_cntx.db_index, it->first.AsRef(), params.memcache_flags);
if (shard->tiered_storage()) { // external storage enabled.
if (shard->tiered_storage() &&
TieredStorage::EligibleForOffload(value)) { // external storage enabled.
// TODO: we may have a bug if we block the fiber inside UnloadItem - "it" may be invalid
// afterwards.
if (value.size() >= kMinTieredLen) {
shard->tiered_storage()->UnloadItem(op_args_.db_cntx.db_index, it);
}
shard->tiered_storage()->UnloadItem(op_args_.db_cntx.db_index, it);
}
RecordJournal(op_args_, key, it->second);

View file

@ -334,10 +334,13 @@ void TieredStorage::FinishIoRequest(int io_res, ActiveIoRequest* req) {
error_code TieredStorage::UnloadItem(DbIndex db_index, PrimeIterator it) {
CHECK_EQ(OBJ_STRING, it->second.ObjType());
// Relevant only for OBJ_STRING, see CHECK above.
size_t blob_len = it->second.Size();
if (blob_len >= kBatchSize / 2 &&
num_active_requests_ < GetFlag(FLAGS_tiered_storage_max_pending_writes)) {
LOG(FATAL) << "TBD: schedule unload for large values that spawn 1+ 4k pages.";
WriteSingle(db_index, it, blob_len);
return error_code{};
}
if (db_arr_.size() <= db_index) {
@ -371,6 +374,60 @@ bool IsObjFitToUnload(const PrimeValue& pv) {
return pv.ObjType() == OBJ_STRING && !pv.IsExternal() && pv.Size() >= 64 && !pv.HasIoPending();
};
// Offloads a single string value to external (ssd) storage.
// Reserves space in the external allocator, copies the value into an
// aligned staging buffer and submits an async write; on completion the
// entry is marked external. If the allocator has no room, schedules a
// grow of the backing file and returns without writing.
void TieredStorage::WriteSingle(DbIndex db_index, PrimeIterator it, size_t blob_len) {
  DCHECK(!it->second.HasIoPending());

  // Negative result is the amount by which the backing storage must
  // grow before a blob of this size fits.
  int64_t res = alloc_.Malloc(blob_len);
  if (res < 0) {
    InitiateGrow(-res);
    return;
  }

  // The io manager writes in kPageAlignment units - round the IO size up.
  constexpr size_t kMask = kPageAlignment - 1;
  size_t page_size = (blob_len + kMask) & (~kMask);

  DCHECK_GE(page_size, blob_len);
  DCHECK_EQ(0u, page_size % kPageAlignment);

  // Everything the completion callback needs to finalize the offload.
  struct SingleRequest {
    char* block_ptr = nullptr;  // aligned staging buffer, owned by the request
    PrimeTable* pt = nullptr;   // table used to re-find the entry after the IO
    size_t blob_len = 0;        // exact (unpadded) value length
    off_t offset = 0;           // offset of the blob inside the backing storage
    string key;                 // key copy - the iterator may be invalidated meanwhile
  } req;

  char* block_ptr = (char*)mi_malloc_aligned(page_size, kPageAlignment);

  req.blob_len = blob_len;
  req.offset = res;
  req.key = it->first.ToString();
  req.pt = db_slice_.GetTables(db_index).first;
  req.block_ptr = block_ptr;

  it->second.GetString(block_ptr);
  it->second.SetIoPending(true);

  auto cb = [req = std::move(req)](int io_res) {
    PrimeIterator it = req.pt->Find(req.key);
    CHECK(!it.is_done());

    // TODO: what happens if the entry was deleted meanwhile
    // or it has been serialized again?
    CHECK(it->second.HasIoPending()) << "TBD: fix inconsistencies";
    it->second.SetIoPending(false);

    // The staging buffer is no longer needed whatever the IO outcome.
    // Freeing it unconditionally here fixes a leak on the error path,
    // where the early return below used to skip mi_free.
    mi_free(req.block_ptr);

    if (io_res < 0) {
      LOG(ERROR) << "Error writing to ssd storage " << util::detail::SafeErrorMessage(-io_res);
      // NOTE(review): the reserved range [offset, offset + blob_len) is not
      // returned to the external allocator here - verify whether the error
      // path should release it as well.
      return;
    }
    it->second.SetExternal(req.offset, req.blob_len);
  };

  io_mgr_.WriteAsync(res, string_view{block_ptr, page_size}, std::move(cb));
}
void TieredStorage::FlushPending(DbIndex db_index) {
PerDb* db = db_arr_[db_index];

View file

@ -30,6 +30,10 @@ class TieredStorage {
// Schedules unloading of the item, pointed by the iterator.
std::error_code UnloadItem(DbIndex db_index, PrimeIterator it);
// Returns true when `val` is long enough to be moved to external
// (tiered) storage; shorter values stay resident in RAM.
static bool EligibleForOffload(std::string_view val) {
  return !(val.size() < kMinBlobLen);
}
void Free(size_t offset, size_t len);
void Shutdown();
@ -58,6 +62,8 @@ class TieredStorage {
bool ShouldFlush() const;
};
void WriteSingle(DbIndex db_index, PrimeIterator it, size_t blob_len);
void FlushPending(DbIndex db_index);
void InitiateGrow(size_t size);
void SendIoRequest(ActiveIoRequest* req);