fix(tiering): Async delete for small bins (#3068)

* fix(tiering): Async delete for small bins

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Vladislav, 2024-05-28 12:08:59 +03:00, committed by GitHub
parent b2213b05d1
commit 68d1a8680c
9 changed files with 74 additions and 47 deletions


@@ -1439,7 +1439,7 @@ void DbSlice::PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* tabl
   const PrimeValue& pv = del_it->second;
   if (pv.IsExternal() && shard_owner()->tiered_storage()) {
-    shard_owner()->tiered_storage()->Delete(&del_it->second);
+    shard_owner()->tiered_storage()->Delete(table->index, &del_it->second);
   }
   size_t value_heap_size = pv.MallocUsed();

@@ -620,7 +620,7 @@ OpStatus SetCmd::SetExisting(const SetParams& params, DbSlice::Iterator it,
   // If value is external, mark it as deleted
   if (prime_value.IsExternal()) {
-    shard->tiered_storage()->Delete(&prime_value);
+    shard->tiered_storage()->Delete(op_args_.db_cntx.db_index, &prime_value);
   }
   // overwrite existing entry.
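The two hunks above change the call sites so that the database index travels with the delete: DbSlice passes table->index and SetCmd passes op_args_.db_cntx.db_index. The hunks below then charge stats to MutableStats(dbid) instead of always to database 0. A minimal self-contained sketch of the idea (toy types, not the real Dragonfly classes), assuming per-database stat slots indexed by the dbid the caller threads through:

// Toy sketch: per-database tiering stats, charged to the dbid passed by the caller.
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

using DbIndex = uint16_t;

struct DbTieringStats {
  uint64_t tiered_entries = 0;
  uint64_t tiered_used_bytes = 0;
};

struct ToyShard {
  std::vector<DbTieringStats> stats{16};  // one slot per logical database

  void RecordAdded(DbIndex dbid, size_t bytes) {    // called when a value is offloaded
    stats[dbid].tiered_entries++;
    stats[dbid].tiered_used_bytes += bytes;
  }
  void RecordDeleted(DbIndex dbid, size_t bytes) {  // called when an offloaded value is removed
    stats[dbid].tiered_entries--;
    stats[dbid].tiered_used_bytes -= bytes;
  }
};

int main() {
  ToyShard shard;
  shard.RecordAdded(/*dbid=*/3, 4096);    // entry offloaded from database 3
  shard.RecordDeleted(/*dbid=*/3, 4096);  // and deleted again
  std::cout << shard.stats[3].tiered_entries << '\n';  // prints 0; db 0 was never touched
}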


@@ -74,7 +74,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
   // Update memory stats
   void SetExternal(OpManager::KeyRef key, tiering::DiskSegment segment) {
     if (auto pv = Find(key); pv) {
-      RecordAdded(db_slice_->MutableStats(0), *pv, segment);
+      RecordAdded(db_slice_->MutableStats(key.first), *pv, segment);
       pv->SetIoPending(false);
       pv->SetExternal(segment.offset, segment.length);
@@ -104,12 +104,12 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
   }
   // Set value to be an in-memory type again, either empty or with a value. Update memory stats
-  void SetInMemory(PrimeValue* pv, string_view value, tiering::DiskSegment segment) {
+  void SetInMemory(PrimeValue* pv, DbIndex dbid, string_view value, tiering::DiskSegment segment) {
     pv->Reset();
     if (!value.empty())
      pv->SetString(value);
-    RecordDeleted(db_slice_->MutableStats(0), *pv, segment);
+    RecordDeleted(db_slice_->MutableStats(dbid), *pv, segment);
     (value.empty() ? stats_.total_deletes : stats_.total_fetches)++;
   }
@@ -118,7 +118,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
   // Returns false if the value is outdated, true otherwise
   bool SetInMemory(OpManager::KeyRef key, string_view value, tiering::DiskSegment segment) {
     if (auto pv = Find(key); pv && pv->IsExternal() && segment == pv->GetExternalSlice()) {
-      SetInMemory(pv, value, segment);
+      SetInMemory(pv, key.first, value, segment);
       return true;
     }
     return false;
@@ -133,23 +133,18 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
     }
   }
-  void ReportFetched(EntryId id, string_view value, tiering::DiskSegment segment,
+  bool ReportFetched(EntryId id, string_view value, tiering::DiskSegment segment,
                      bool modified) override {
     DCHECK(holds_alternative<OpManager::KeyRef>(id));  // we never issue reads for bins
     // Modified values are always cached and deleted from disk
     if (!modified && !cache_fetched_)
-      return;
+      return false;
     SetInMemory(get<OpManager::KeyRef>(id), value, segment);
+    return true;
+  }
-    // Delete value
-    if (OccupiesWholePages(segment.length)) {
-      Delete(segment);
-    } else {
-      if (auto bin_segment = ts_->bins_->Delete(segment); bin_segment)
-        Delete(*bin_segment);
-    }
+  bool ReportDelete(tiering::DiskSegment segment) override {
+    return OccupiesWholePages(segment.length) || ts_->bins_->Delete(segment);
   }
  private:
@@ -250,15 +245,11 @@ void TieredStorage::Stash(DbIndex dbid, string_view key, PrimeValue* value) {
     visit([this](auto id) { op_manager_->ClearIoPending(id); }, id);
   }
 }
-void TieredStorage::Delete(PrimeValue* value) {
+void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) {
   DCHECK(value->IsExternal());
   tiering::DiskSegment segment = value->GetExternalSlice();
-  if (OccupiesWholePages(segment.length)) {
-    op_manager_->Delete(segment);
-  } else if (auto bin = bins_->Delete(segment); bin) {
-    op_manager_->Delete(*bin);
-  }
-  op_manager_->SetInMemory(value, "", segment);
+  op_manager_->Delete(segment);
+  op_manager_->SetInMemory(value, dbid, "", segment);
 }
 void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) {
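With the hunk above, TieredStorage::Delete no longer special-cases small bins at the call site: it always resets the in-memory value and hands the raw segment to the OpManager, while the whole-page-versus-bin decision is concentrated in ShardOpManager::ReportDelete (shown earlier), which asks for the backing pages to be freed only when the value spans whole pages or its bin has become empty. A rough self-contained sketch of that single decision point; ToySmallBins and its one-counter-per-page rule are stand-ins, not Dragonfly's real SmallBins:

// Toy sketch of the ReportDelete decision: free pages for large values,
// and for small values only once their shared bin/page is empty.
#include <cstddef>
#include <iostream>
#include <map>

constexpr size_t kPageSize = 4096;

struct DiskSegment {
  size_t offset = 0, length = 0;
};

bool OccupiesWholePages(size_t length) {
  return length >= kPageSize;  // simplified: the real check looks at page multiples
}

// Stand-in for SmallBins: counts small values packed per page and reports
// true when a delete empties the page.
struct ToySmallBins {
  std::map<size_t, int> values_per_page;
  bool Delete(DiskSegment seg) {
    size_t page = seg.offset / kPageSize * kPageSize;
    return --values_per_page[page] == 0;
  }
};

// Mirrors the shape of the new hook: returns whether the backing pages can be freed.
bool ReportDelete(ToySmallBins& bins, DiskSegment segment) {
  return OccupiesWholePages(segment.length) || bins.Delete(segment);
}

int main() {
  ToySmallBins bins;
  bins.values_per_page[0] = 2;                       // two small values share page 0
  std::cout << ReportDelete(bins, {100, 64}) << ' '  // 0: the page still holds a value
            << ReportDelete(bins, {200, 64}) << ' '  // 1: bin now empty, free the page
            << ReportDelete(bins, {8192, 8192}) << '\n';  // 1: whole-page value
}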


@@ -59,7 +59,7 @@ class TieredStorage {
   void Stash(DbIndex dbid, std::string_view key, PrimeValue* value);
   // Delete value, must be offloaded (external type)
-  void Delete(PrimeValue* value);
+  void Delete(DbIndex dbid, PrimeValue* value);
   // Cancel pending stash for value, must have IO_PENDING flag set
   void CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value);


@@ -51,6 +51,8 @@ class TieredStorageTest : public BaseFamilyTest {
 // Perform simple series of SET, GETSET and GET
 TEST_F(TieredStorageTest, SimpleGetSet) {
+  absl::FlagSaver saver;
+  absl::SetFlag(&FLAGS_tiered_offload_threshold, 1.1f);  // disable offloading
   const int kMin = 256;
   const int kMax = tiering::kPageSize + 10;
@@ -82,6 +84,10 @@ TEST_F(TieredStorageTest, SimpleGetSet) {
     auto resp = Run({"GET", absl::StrCat("k", i)});
     ASSERT_EQ(resp, string(i, 'B')) << i;
   }
+  metrics = GetMetrics();
+  EXPECT_EQ(metrics.db_stats[0].tiered_entries, 0);
+  EXPECT_EQ(metrics.db_stats[0].tiered_used_bytes, 0);
 }
 TEST_F(TieredStorageTest, MGET) {
@@ -123,7 +129,9 @@ TEST_F(TieredStorageTest, MultiDb) {
   for (size_t i = 0; i < 10; i++) {
     Run({"SELECT", absl::StrCat(i)});
     EXPECT_EQ(GetMetrics().db_stats[i].tiered_entries, 1);
+    EXPECT_EQ(Run({"GET", absl::StrCat("k", i)}), string(3000, char('A' + i)));
+    EXPECT_EQ(GetMetrics().db_stats[i].tiered_entries, 0);
   }
 }


@@ -11,7 +11,6 @@
 #include "io/io.h"
 #include "server/tiering/common.h"
 #include "server/tiering/disk_storage.h"
-#include "util/fibers/future.h"
 namespace dfly::tiering {
@@ -44,7 +43,7 @@ void OpManager::Close() {
 void OpManager::Enqueue(EntryId id, DiskSegment segment, ReadCallback cb) {
   // Fill pages for prepared read as it has no penalty and potentially covers more small segments
-  PrepareRead(segment.FillPages()).ForId(id, segment).callbacks.emplace_back(std::move(cb));
+  PrepareRead(segment.FillPages()).ForSegment(segment, id).callbacks.emplace_back(std::move(cb));
 }
 void OpManager::Delete(EntryId id) {
@@ -54,19 +53,20 @@
 }
 void OpManager::Delete(DiskSegment segment) {
-  DCHECK_EQ(segment.offset % kPageSize, 0u);
-  if (auto it = pending_reads_.find(segment.offset); it != pending_reads_.end()) {
-    // If a read is pending, it will be deleted once the read finished
-    it->second.delete_requested = true;
-  } else {
-    // Otherwise, delete it immediately
-    storage_.MarkAsFree(segment);
+  EntryOps* pending_op = nullptr;
+  if (auto it = pending_reads_.find(segment.offset); it != pending_reads_.end())
+    pending_op = it->second.Find(segment);
+  if (pending_op) {
+    pending_op->deleting = true;
+  } else if (ReportDelete(segment)) {
+    storage_.MarkAsFree(segment.FillPages());
   }
 }
 std::error_code OpManager::Stash(EntryId id_ref, std::string_view value) {
   auto id = ToOwned(id_ref);
-  unsigned version = ++pending_stash_ver_[id];
+  unsigned version = pending_stash_ver_[id] = ++pending_stash_counter_;
   io::Bytes buf_view{reinterpret_cast<const uint8_t*>(value.data()), value.length()};
   auto io_cb = [this, version, id = std::move(id)](DiskSegment segment, std::error_code ec) {
@@ -104,6 +104,7 @@ void OpManager::ProcessStashed(EntryId id, unsigned version, DiskSegment segment
 void OpManager::ProcessRead(size_t offset, std::string_view value) {
   ReadOp* info = &pending_reads_.at(offset);
+  bool deleting_full = false;
   std::string key_value;
   for (auto& ko : info->key_ops) {
     key_value = value.substr(ko.segment.offset - info->segment.offset, ko.segment.length);
@@ -112,26 +113,40 @@
     for (auto& cb : ko.callbacks)
       modified |= cb(&key_value);
-    // Report item as fetched only after all action were executed, pass whether it was modified
-    ReportFetched(Borrowed(ko.id), key_value, ko.segment, modified);
+    // If the item is not being deleted, report it as fetched to potentially be cached.
+    // In case it's cached, we might need to delete it.
+    if (!ko.deleting)
+      ko.deleting |= ReportFetched(Borrowed(ko.id), key_value, ko.segment, modified);
+    // If the item is being deleted, check if the full page needs to be deleted.
+    if (ko.deleting)
+      deleting_full |= ReportDelete(ko.segment);
   }
-  if (info->delete_requested)
+  if (deleting_full)
     storage_.MarkAsFree(info->segment);
   pending_reads_.erase(offset);
 }
-OpManager::EntryOps& OpManager::ReadOp::ForId(EntryId id, DiskSegment key_segment) {
+OpManager::EntryOps& OpManager::ReadOp::ForSegment(DiskSegment key_segment, EntryId id) {
+  DCHECK_GE(key_segment.offset, segment.offset);
+  DCHECK_LE(key_segment.length, segment.length);
   for (auto& ops : key_ops) {
-    if (Borrowed(ops.id) == id)
+    if (ops.segment.offset == key_segment.offset)
       return ops;
   }
   return key_ops.emplace_back(ToOwned(id), key_segment);
 }
+OpManager::EntryOps* OpManager::ReadOp::Find(DiskSegment key_segment) {
+  for (auto& ops : key_ops) {
+    if (ops.segment.offset == key_segment.offset)
+      return &ops;
+  }
+  return nullptr;
+}
 OpManager::Stats OpManager::GetStats() const {
   return {.disk_stats = storage_.GetStats(),
           .pending_read_cnt = pending_reads_.size(),
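The net effect of the op_manager.cc changes above: deleting a segment that still has a read in flight no longer frees it immediately (the old per-read delete_requested flag is gone); instead the matching EntryOps is marked deleting, and ProcessRead finishes the job after the callbacks have run, consulting ReportDelete to decide whether the spanning pages can be freed. A compact self-contained model of that hand-off, using hypothetical toy names rather than the real OpManager API:

// Toy model of "delete while a read is pending": Delete() only marks the
// entry, and the read completion performs the actual free after callbacks ran.
#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

struct Segment { size_t offset = 0, length = 0; };

struct PendingEntry {
  Segment segment;
  std::vector<std::function<void(const std::string&)>> callbacks;
  bool deleting = false;  // set by Delete() while the read is still in flight
};

struct ToyOpManager {
  std::unordered_map<size_t, PendingEntry> pending_reads;  // keyed by page offset
  size_t freed_pages = 0;

  void Delete(Segment seg) {
    if (auto it = pending_reads.find(seg.offset); it != pending_reads.end()) {
      it->second.deleting = true;  // defer: the read completion will free it
    } else {
      freed_pages++;               // nothing in flight, free immediately
    }
  }

  void CompleteRead(size_t offset, const std::string& value) {
    auto& entry = pending_reads.at(offset);
    for (auto& cb : entry.callbacks)
      cb(value);                   // callbacks still see the value
    if (entry.deleting)
      freed_pages++;               // only now is the segment released
    pending_reads.erase(offset);
  }
};

int main() {
  ToyOpManager om;
  om.pending_reads[0] = {{0, 128}, {[](const std::string& v) { std::cout << v << '\n'; }}, false};
  om.Delete({0, 128});                   // read still pending: just mark it
  std::cout << om.freed_pages << '\n';   // 0
  om.CompleteRead(0, "payload");         // prints "payload", then frees
  std::cout << om.freed_pages << '\n';   // 1
}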


@@ -65,20 +65,24 @@ class OpManager {
   // given error
   virtual void ReportStashed(EntryId id, DiskSegment segment, std::error_code ec) = 0;
-  // Report that an entry was successfully fetched.
-  // If modify is set, a modification was executed during the read and the stored value is outdated.
-  virtual void ReportFetched(EntryId id, std::string_view value, DiskSegment segment,
+  // Report that an entry was successfully fetched. Includes whether the entry was modified.
+  // Returns true if the value needs to be deleted.
+  virtual bool ReportFetched(EntryId id, std::string_view value, DiskSegment segment,
                              bool modified) = 0;
+  // Report a delete. Returns true if the filled segment needs to be marked as free.
+  virtual bool ReportDelete(DiskSegment segment) = 0;
 protected:
   // Describes pending futures for a single entry
   struct EntryOps {
-    EntryOps(OwnedEntryId id, DiskSegment segment) : id{std::move(id)}, segment{segment} {
+    EntryOps(OwnedEntryId id, DiskSegment segment) : id(std::move(id)), segment(segment) {
     }
     OwnedEntryId id;
     DiskSegment segment;
     absl::InlinedVector<ReadCallback, 1> callbacks;
+    bool deleting = false;
   };
   // Describes an ongoing read operation for a fixed segment
@@ -87,11 +91,13 @@
     }
     // Get ops for id or create new
-    EntryOps& ForId(EntryId id, DiskSegment segment);
+    EntryOps& ForSegment(DiskSegment segment, EntryId id);
+    // Find if there are operations for the given segment, return nullptr otherwise
+    EntryOps* Find(DiskSegment segment);
     DiskSegment segment;                       // spanning segment of whole read
     absl::InlinedVector<EntryOps, 1> key_ops;  // enqueued operations for different keys
-    bool delete_requested = false;             // whether to delete after reading the segment
   };
   // Prepare read operation for aligned segment or return pending if it exists.
@@ -109,6 +115,7 @@
   absl::flat_hash_map<size_t /* offset */, ReadOp> pending_reads_;
+  size_t pending_stash_counter_ = 0;
   // todo: allow heterogeneous lookups with non owned id
   absl::flat_hash_map<OwnedEntryId, unsigned /* version */> pending_stash_ver_;
 };


@@ -47,9 +47,14 @@ struct OpManagerTest : PoolTestBase, OpManager {
     stashed_[id] = segment;
   }
-  void ReportFetched(EntryId id, std::string_view value, DiskSegment segment,
+  bool ReportFetched(EntryId id, std::string_view value, DiskSegment segment,
                      bool modified) override {
     fetched_[id] = value;
+    return false;
+  }
+  bool ReportDelete(DiskSegment segment) override {
+    return true;
   }
   absl::flat_hash_map<EntryId, std::string> fetched_;


@@ -333,6 +333,7 @@ class Transaction {
   // entry.
   void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, uint32_t shard_cnt,
                          bool multi_commands, bool allow_await) const;
+  void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const;
   // Re-enable auto journal for commands marked as NO_AUTOJOURNAL. Call during setup.