fix(tiering): fix crash when item was deleted before offloaded (#2225)

* fix(tiering): fix crash when an item was deleted before being offloaded

The bug: On insert, a key is added to the bin pending entries. If this key
was later deleted, a check would fail on not finding this item when trying
to flush the bin entries.

The fix: On every insert to the bin pending entries, erase pending entries
which were deleted or expired.

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2023-12-07 20:38:23 +02:00 committed by GitHub
parent 74cc58dc4c
commit be82c11428
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 279 additions and 97 deletions

View file

@ -69,6 +69,10 @@ void PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, EngineShard* s
TieredStorage* tiered = shard->tiered_storage();
tiered->Free(offset, size);
}
if (pv.HasIoPending()) {
TieredStorage* tiered = shard->tiered_storage();
tiered->CancelIo(table->index, del_it);
}
size_t value_heap_size = pv.MallocUsed();
stats.inline_keys -= del_it->first.IsInline();
@ -598,60 +602,56 @@ void DbSlice::FlushSlots(SlotSet slot_ids) {
}).Detach();
}
void DbSlice::FlushDb(DbIndex db_ind) {
void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
// TODO: to add preeemptiveness by yielding inside clear.
DbTableArray flush_db_arr(db_arr_.size());
for (DbIndex index : indexes) {
auto& db = db_arr_[index];
CHECK(db);
InvalidateDbWatches(index);
flush_db_arr[index] = std::move(db);
if (db_ind != kDbAll) {
auto& db = db_arr_[db_ind];
if (db) {
InvalidateDbWatches(db_ind);
CreateDb(index);
db_arr_[index]->trans_locks.swap(flush_db_arr[index]->trans_locks);
if (TieredStorage* tiered = shard_owner()->tiered_storage(); tiered) {
tiered->CancelAllIos(index);
}
}
auto db_ptr = std::move(db);
DCHECK(!db);
CreateDb(db_ind);
db_arr_[db_ind]->trans_locks.swap(db_ptr->trans_locks);
auto cb = [this, db_ptr = std::move(db_ptr)]() mutable {
if (db_ptr->stats.tiered_entries > 0) {
auto cb = [this, flush_db_arr = std::move(flush_db_arr)]() mutable {
for (auto& db_ptr : flush_db_arr) {
if (db_ptr && db_ptr->stats.tiered_entries > 0) {
for (auto it = db_ptr->prime.begin(); it != db_ptr->prime.end(); ++it) {
if (it->second.IsExternal()) {
PerformDeletion(it, shard_owner(), db_ptr.get());
}
}
DCHECK_EQ(0u, db_ptr->stats.tiered_entries);
db_ptr.reset();
}
}
mi_heap_collect(ServerState::tlocal()->data_heap(), true);
};
DCHECK_EQ(0u, db_ptr->stats.tiered_entries);
db_ptr.reset();
mi_heap_collect(ServerState::tlocal()->data_heap(), true);
};
fb2::Fiber("flush_db", std::move(cb)).Detach();
fb2::Fiber("flush_dbs", std::move(cb)).Detach();
}
void DbSlice::FlushDb(DbIndex db_ind) {
if (db_ind != kDbAll) {
// Flush a single database if a specific index is provided
FlushDbIndexes({db_ind});
return;
}
for (size_t i = 0; i < db_arr_.size(); i++) {
std::vector<DbIndex> indexes;
indexes.reserve(db_arr_.size());
for (DbIndex i = 0; i < db_arr_.size(); ++i) {
if (db_arr_[i]) {
InvalidateDbWatches(i);
indexes.push_back(i);
}
}
auto all_dbs = std::move(db_arr_);
db_arr_.resize(all_dbs.size());
for (size_t i = 0; i < db_arr_.size(); ++i) {
if (all_dbs[i]) {
CreateDb(i);
db_arr_[i]->trans_locks.swap(all_dbs[i]->trans_locks);
}
}
// Explicitly drop reference counted pointers in place.
// If snapshotting is currently in progress, they will keep alive until it finishes.
for (auto& db : all_dbs)
db.reset();
mi_heap_collect(ServerState::tlocal()->data_heap(), true);
FlushDbIndexes(indexes);
}
void DbSlice::AddExpire(DbIndex db_ind, PrimeIterator main_it, uint64_t at) {
@ -1203,7 +1203,7 @@ finish:
void DbSlice::CreateDb(DbIndex db_ind) {
auto& db = db_arr_[db_ind];
if (!db) {
db.reset(new DbTable{owner_->memory_resource()});
db.reset(new DbTable{owner_->memory_resource(), db_ind});
}
}

View file

@ -344,6 +344,7 @@ class DbSlice {
bool force_update) noexcept(false);
void FlushSlotsFb(const SlotSet& slot_ids);
void FlushDbIndexes(const std::vector<DbIndex>& indexes);
// Invalidate all watched keys in database. Used on FLUSH.
void InvalidateDbWatches(DbIndex db_indx);

View file

@ -42,10 +42,12 @@ SlotStats& SlotStats::operator+=(const SlotStats& o) {
return *this;
}
DbTable::DbTable(PMR_NS::memory_resource* mr)
DbTable::DbTable(PMR_NS::memory_resource* mr, DbIndex db_index)
: prime(kInitSegmentLog, detail::PrimeTablePolicy{}, mr),
expire(0, detail::ExpireTablePolicy{}, mr), mcflag(0, detail::ExpireTablePolicy{}, mr),
top_keys({.enabled = absl::GetFlag(FLAGS_enable_top_keys_tracking)}) {
expire(0, detail::ExpireTablePolicy{}, mr),
mcflag(0, detail::ExpireTablePolicy{}, mr),
top_keys({.enabled = absl::GetFlag(FLAGS_enable_top_keys_tracking)}),
index(db_index) {
if (ClusterConfig::IsEnabled()) {
slots_stats.resize(ClusterConfig::kMaxSlotNum + 1);
}

View file

@ -82,8 +82,9 @@ struct DbTable : boost::intrusive_ref_counter<DbTable, boost::thread_unsafe_coun
ExpireTable::Cursor expire_cursor;
TopKeys top_keys;
DbIndex index;
explicit DbTable(PMR_NS::memory_resource* mr);
explicit DbTable(PMR_NS::memory_resource* mr, DbIndex index);
~DbTable();
void Clear();

View file

@ -10,6 +10,7 @@ extern "C" {
#include <mimalloc.h>
#include "absl/cleanup/cleanup.h"
#include "base/flags.h"
#include "base/logging.h"
#include "server/db_slice.h"
@ -106,10 +107,28 @@ struct PrimeHasher {
}
};
struct SingleRequest {
SingleRequest(size_t blob_len, int64 offset, string key)
: blob_len(blob_len), offset(offset), key(std::move(key)) {
constexpr size_t kMask = kBlockAlignment - 1;
page_size = (blob_len + kMask) & (~kMask);
DCHECK_GE(page_size, blob_len);
DCHECK_EQ(0u, page_size % kBlockAlignment);
block_ptr = (char*)mi_malloc_aligned(page_size, kBlockAlignment);
}
char* block_ptr;
size_t blob_len;
size_t page_size;
off_t offset;
string key;
bool cancel = false;
};
struct TieredStorage::PerDb {
PerDb(const PerDb&) = delete;
PerDb& operator=(const PerDb&) = delete;
PerDb() = default;
void CancelAll();
using InflightMap = absl::flat_hash_map<string_view, InflightWriteRequest*>;
@ -120,10 +139,26 @@ struct TieredStorage::PerDb {
// Entries that were scheduled to write but have not completed yet.
InflightMap enqueued_entries;
};
// Big bin entries that were scheduled to write but have not completed yet.
absl::flat_hash_map<string_view, SingleRequest*> bigbin_enqueued_entries;
BinRecord bin_map[kSmallBinLen];
};
void TieredStorage::PerDb::CancelAll() {
for (size_t i = 0; i < kSmallBinLen; ++i) {
bin_map[i].pending_entries.clear();
// It is safe to clear enqueued_entries, because when we will finish writing to disk
// InflightWriteRequest::ExternalizeEntries will be executed and it will undo the externalize of
// the entries and free the allocated page.
bin_map[i].enqueued_entries.clear();
}
for (auto& req : bigbin_enqueued_entries) {
req.second->cancel = true;
}
bigbin_enqueued_entries.clear();
}
class TieredStorage::InflightWriteRequest {
public:
InflightWriteRequest(DbIndex db_index, unsigned bin_index, uint32_t page_index);
@ -251,6 +286,7 @@ unsigned TieredStorage::InflightWriteRequest::ExternalizeEntries(PerDb::BinRecor
CHECK(!pit.is_done()) << "TBD";
ExternalizeEntry(item_offset, stats, &pit->second);
VLOG(2) << "ExternalizeEntry: " << it->first;
bin_record->enqueued_entries.erase(it);
}
}
@ -268,7 +304,7 @@ void TieredStorage::InflightWriteRequest::Undo(PerDb::BinRecord* bin_record, DbS
// TODO: what happens when if the entry was deleted meanwhile
// or it has been serialized again?
CHECK(pit->second.HasIoPending()) << "TBD: fix inconsistencies";
VLOG(2) << "Undo key:" << pkey;
pit->second.SetIoPending(false);
bin_record->enqueued_entries.erase(it);
@ -312,7 +348,6 @@ void TieredStorage::Free(size_t offset, size_t len) {
auto it = page_refcnt_.find(offs_page);
CHECK(it != page_refcnt_.end()) << offs_page;
CHECK_GT(it->second, 0u);
if (--it->second == 0) {
alloc_.Free(offs_page * kBlockLen, kBlockLen);
page_refcnt_.erase(it);
@ -353,7 +388,6 @@ void TieredStorage::FinishIoRequest(int io_res, InflightWriteRequest* req) {
CHECK(res.second);
}
}
delete req;
--num_active_requests_;
VLOG_IF(2, num_active_requests_ == 0) << "Finished active requests";
@ -367,21 +401,22 @@ error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) {
// Relevant only for OBJ_STRING, see CHECK above.
size_t blob_len = it->second.Size();
if (blob_len > kMaxSmallBin) {
if (num_active_requests_ < GetFlag(FLAGS_tiered_storage_max_pending_writes)) {
WriteSingle(db_index, it, blob_len);
} // otherwise skip
return error_code{};
}
if (db_arr_.size() <= db_index) {
db_arr_.resize(db_index + 1);
}
if (db_arr_[db_index] == nullptr) {
db_arr_[db_index] = new PerDb;
}
if (blob_len > kMaxSmallBin) {
if (num_active_requests_ < GetFlag(FLAGS_tiered_storage_max_pending_writes)) {
WriteSingle(db_index, it, blob_len);
} else {
VLOG(2) << "Skip WriteSingle for: " << it->first.ToString();
}
return error_code{};
}
PerDb* db = db_arr_[db_index];
unsigned bin_index = SmallToBin(blob_len);
@ -394,7 +429,9 @@ error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) {
// TODO: we need to track in stats all the cases where we omit offloading attempt.
CHECK_LT(bin_record.pending_entries.size(), max_entries);
VLOG(2) << "ScheduleOffload:" << it->first.ToString();
bin_record.pending_entries.insert(it->first);
it->second.SetIoPending(true);
if (bin_record.pending_entries.size() < max_entries)
return error_code{}; // gather more.
@ -410,8 +447,10 @@ error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) {
}
if (!flush_succeeded) {
VLOG(2) << "flush failed remove entry: " << it->first.ToString();
// we could not flush because I/O is saturated, so lets remove the last item.
bin_record.pending_entries.erase(it->first.AsRef());
it->second.SetIoPending(false);
++stats_.flush_skip_cnt;
}
@ -420,32 +459,58 @@ error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) {
void TieredStorage::CancelIo(DbIndex db_index, PrimeIterator it) {
DCHECK_EQ(OBJ_STRING, it->second.ObjType());
VLOG(2) << "CancelIo: " << it->first.ToString();
auto& prime_value = it->second;
DCHECK(!prime_value.IsExternal());
DCHECK(prime_value.HasIoPending());
prime_value.SetIoPending(false); // remove io flag.
PerDb* db = db_arr_[db_index];
size_t blob_len = prime_value.Size();
PerDb* db = db_arr_[db_index];
if (blob_len > kMaxSmallBin) {
string key = it->first.ToString();
auto& enqueued_entries = db->bigbin_enqueued_entries;
auto entry_it = enqueued_entries.find(key);
CHECK(entry_it != enqueued_entries.end());
entry_it->second->cancel = true;
CHECK(enqueued_entries.erase(key));
return;
}
unsigned bin_index = SmallToBin(blob_len);
auto& bin_record = db->bin_map[bin_index];
auto pending_it = bin_record.pending_entries.find(it->first);
if (pending_it != bin_record.pending_entries.end()) {
VLOG(2) << "CancelIo from pending: " << it->first.ToString();
bin_record.pending_entries.erase(pending_it);
return;
}
string key = it->first.ToString();
VLOG(2) << "CancelIo from enqueue: " << key;
CHECK(bin_record.enqueued_entries.erase(key));
}
void TieredStorage::CancelAllIos(DbIndex db_index) {
VLOG(2) << "CancelAllIos " << db_index;
if (db_index >= db_arr_.size()) {
return;
}
PerDb* db = db_arr_[db_index];
if (db) {
VLOG(2) << "Clear db " << db_index;
db->CancelAll();
}
}
bool IsObjFitToUnload(const PrimeValue& pv) {
return pv.ObjType() == OBJ_STRING && !pv.IsExternal() && pv.Size() >= 64 && !pv.HasIoPending();
};
void TieredStorage::WriteSingle(DbIndex db_index, PrimeIterator it, size_t blob_len) {
VLOG(2) << "WriteSingle " << blob_len;
DCHECK(!it->second.HasIoPending());
int64_t res = alloc_.Malloc(blob_len);
@ -454,55 +519,61 @@ void TieredStorage::WriteSingle(DbIndex db_index, PrimeIterator it, size_t blob_
return;
}
constexpr size_t kMask = kBlockAlignment - 1;
size_t page_size = (blob_len + kMask) & (~kMask);
SingleRequest* req = new SingleRequest(blob_len, res, it->first.ToString());
DCHECK_GE(page_size, blob_len);
DCHECK_EQ(0u, page_size % kBlockAlignment);
auto& enqueued_entries = db_arr_[db_index]->bigbin_enqueued_entries;
auto emplace_res = enqueued_entries.emplace(req->key, req);
CHECK(emplace_res.second);
struct SingleRequest {
char* block_ptr = nullptr;
PrimeTable* pt = nullptr;
size_t blob_len = 0;
off_t offset = 0;
string key;
} req;
char* block_ptr = (char*)mi_malloc_aligned(page_size, kBlockAlignment);
req.blob_len = blob_len;
req.offset = res;
req.key = it->first.ToString();
req.pt = db_slice_.GetTables(db_index).first;
req.block_ptr = block_ptr;
it->second.GetString(block_ptr);
it->second.GetString(req->block_ptr);
it->second.SetIoPending(true);
auto cb = [req = std::move(req)](int io_res) {
PrimeIterator it = req.pt->Find(req.key);
CHECK(!it.is_done());
auto cb = [this, req, db_index](int io_res) {
PrimeTable* pt = db_slice_.GetTables(db_index).first;
// TODO: what happens when if the entry was deleted meanwhile
// or it has been serialized again?
CHECK(it->second.HasIoPending()) << "TBD: fix inconsistencies";
it->second.SetIoPending(false);
absl::Cleanup cleanup = [this, req]() {
mi_free(req->block_ptr);
delete req;
--num_active_requests_;
};
// In case entry was canceled free allocated.
if (req->cancel) {
alloc_.Free(req->offset, req->blob_len);
return;
}
PrimeIterator it = pt->Find(req->key);
CHECK(!it.is_done());
CHECK(it->second.HasIoPending());
auto& enqueued_entries = db_arr_[db_index]->bigbin_enqueued_entries;
auto req_it = enqueued_entries.find(req->key);
CHECK(req_it != enqueued_entries.end());
CHECK_EQ(req_it->second, req);
if (io_res < 0) {
LOG(ERROR) << "Error writing to ssd storage " << util::detail::SafeErrorMessage(-io_res);
it->second.SetIoPending(false);
alloc_.Free(req->offset, req->blob_len);
enqueued_entries.erase(req->key);
return;
}
it->second.SetExternal(req.offset, req.blob_len);
mi_free(req.block_ptr);
};
io_mgr_.WriteAsync(res, string_view{block_ptr, page_size}, std::move(cb));
enqueued_entries.erase(req->key);
ExternalizeEntry(req->offset, db_slice_.MutableStats(db_index), &it->second);
VLOG_IF(2, num_active_requests_ == 0) << "Finished active requests";
};
++num_active_requests_;
io_mgr_.WriteAsync(res, string_view{req->block_ptr, req->page_size}, std::move(cb));
}
bool TieredStorage::FlushPending(DbIndex db_index, unsigned bin_index) {
PerDb* db = db_arr_[db_index];
int64_t res = alloc_.Malloc(kBlockLen);
VLOG(2) << "FlushPending Malloc:" << res;
if (res < 0) {
InitiateGrow(-res);
return false;
@ -530,14 +601,14 @@ bool TieredStorage::FlushPending(DbIndex db_index, unsigned bin_index) {
for (auto key_view : bin_record.pending_entries) {
PrimeIterator it = pt->Find(key_view);
DCHECK(IsValid(it));
if (it->second.HasExpire()) {
auto [pit, exp_it] = db_slice_.ExpireIfNeeded(db_context, it);
CHECK(!pit.is_done()) << "TBD: should abort in case of expired keys";
}
req->Add(it->first, it->second);
it->second.SetIoPending(true);
VLOG(2) << "add to enqueued_entries: " << req->entries().back();
auto res = bin_record.enqueued_entries.emplace(req->entries().back(), req);
CHECK(res.second);
}
@ -545,7 +616,7 @@ bool TieredStorage::FlushPending(DbIndex db_index, unsigned bin_index) {
auto cb = [this, req](int io_res) { this->FinishIoRequest(io_res, req); };
++num_active_requests_;
io_mgr_.WriteAsync(file_offset, req->block(), move(cb));
io_mgr_.WriteAsync(file_offset, req->block(), std::move(cb));
++stats_.tiered_writes;
bin_record.pending_entries.clear();

View file

@ -43,6 +43,8 @@ class TieredStorage {
TieredStats GetStats() const;
void CancelAllIos(DbIndex db_index);
private:
class InflightWriteRequest;
@ -59,12 +61,8 @@ class TieredStorage {
IoMgr io_mgr_;
ExternalAllocator alloc_;
size_t submitted_io_writes_ = 0;
size_t submitted_io_write_size_ = 0;
uint32_t num_active_requests_ = 0;
EventCount active_req_sem_;
struct PerDb;
std::vector<PerDb*> db_arr_;

View file

@ -24,7 +24,7 @@ class TieredStorageTest : public BaseFamilyTest {
num_threads_ = 1;
}
void FillExternalKeys(unsigned count);
void FillExternalKeys(unsigned count, int val_size = 256);
static void SetUpTestSuite();
};
@ -41,8 +41,8 @@ void TieredStorageTest::SetUpTestSuite() {
}
}
void TieredStorageTest::FillExternalKeys(unsigned count) {
string val(256, 'a');
void TieredStorageTest::FillExternalKeys(unsigned count, int val_size) {
string val(val_size, 'a');
unsigned batch_cnt = count / 50;
for (unsigned i = 0; i < batch_cnt; ++i) {
@ -50,7 +50,7 @@ void TieredStorageTest::FillExternalKeys(unsigned count) {
cmd.push_back("mset");
for (unsigned j = 0; j < 50; ++j) {
string key = StrCat("k", i * 100 + j);
string key = StrCat("k", i * 50 + j);
cmd.push_back(key);
cmd.push_back(val);
}
@ -89,4 +89,113 @@ TEST_F(TieredStorageTest, Basic) {
EXPECT_EQ(m.db_stats[0].tiered_entries, tiered_entries - 1);
}
TEST_F(TieredStorageTest, DelBeforeOffload) {
FillExternalKeys(100);
EXPECT_EQ(100, CheckedInt({"dbsize"}));
usleep(20000); // 0.02 milliseconds
Metrics m = GetMetrics();
EXPECT_GT(m.db_stats[0].tiered_entries, 0u);
EXPECT_LT(m.db_stats[0].tiered_entries, 100);
for (unsigned i = 0; i < 100; ++i) {
Run({"del", StrCat("k", i)});
}
m = GetMetrics();
EXPECT_EQ(m.db_stats[0].tiered_entries, 0u);
FillExternalKeys(100);
usleep(20000); // 0.02 milliseconds
m = GetMetrics();
EXPECT_GT(m.db_stats[0].tiered_entries, 0u);
EXPECT_LT(m.db_stats[0].tiered_entries, 100);
}
TEST_F(TieredStorageTest, AddMultiDb) {
Run({"select", "1"});
FillExternalKeys(100);
EXPECT_EQ(100, CheckedInt({"dbsize"}));
Run({"select", "5"});
FillExternalKeys(100);
EXPECT_EQ(100, CheckedInt({"dbsize"}));
usleep(20000); // 0.02 milliseconds
Metrics m = GetMetrics();
EXPECT_GT(m.db_stats[1].tiered_entries, 0u);
EXPECT_LT(m.db_stats[1].tiered_entries, 100);
EXPECT_GT(m.db_stats[5].tiered_entries, 0u);
EXPECT_LT(m.db_stats[4].tiered_entries, 100);
}
TEST_F(TieredStorageTest, FlushDBAfterSet) {
Run({"select", "5"});
FillExternalKeys(100);
EXPECT_EQ(100, CheckedInt({"dbsize"}));
Run({"flushdb"});
Metrics m = GetMetrics();
EXPECT_EQ(m.db_stats[5].tiered_entries, 0u);
FillExternalKeys(100);
EXPECT_EQ(100, CheckedInt({"dbsize"}));
usleep(20000); // 0.02 milliseconds
m = GetMetrics();
EXPECT_GT(m.db_stats[5].tiered_entries, 0u);
EXPECT_LT(m.db_stats[5].tiered_entries, 100);
}
TEST_F(TieredStorageTest, FlushAllAfterSet) {
Run({"select", "5"});
FillExternalKeys(100);
EXPECT_EQ(100, CheckedInt({"dbsize"}));
Run({"flushall"});
Metrics m = GetMetrics();
EXPECT_EQ(m.db_stats[5].tiered_entries, 0u);
FillExternalKeys(100);
EXPECT_EQ(100, CheckedInt({"dbsize"}));
usleep(20000); // 0.02 milliseconds
m = GetMetrics();
EXPECT_GT(m.db_stats[5].tiered_entries, 0u);
EXPECT_LT(m.db_stats[5].tiered_entries, 100);
}
TEST_F(TieredStorageTest, AddBigValues) {
FillExternalKeys(100, 5000);
EXPECT_EQ(100, CheckedInt({"dbsize"}));
Run({"flushall"});
Metrics m = GetMetrics();
EXPECT_EQ(m.db_stats[0].tiered_entries, 0u);
FillExternalKeys(100, 5000);
EXPECT_EQ(100, CheckedInt({"dbsize"}));
usleep(20000); // 0.02 milliseconds
m = GetMetrics();
EXPECT_GT(m.db_stats[0].tiered_entries, 0u);
}
TEST_F(TieredStorageTest, DelBigValues) {
FillExternalKeys(100, 5000);
EXPECT_EQ(100, CheckedInt({"dbsize"}));
for (unsigned i = 0; i < 100; ++i) {
Run({"del", StrCat("k", i)});
}
Metrics m = GetMetrics();
EXPECT_EQ(m.db_stats[0].tiered_entries, 0u);
FillExternalKeys(100, 5000);
EXPECT_EQ(100, CheckedInt({"dbsize"}));
usleep(20000); // 0.02 milliseconds
m = GetMetrics();
EXPECT_GT(m.db_stats[0].tiered_entries, 0u);
}
} // namespace dfly