chore: Track db_slice table memory instantly (#3375)

We update table_memory upon each deletion and insertion of an element.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-07-24 14:13:08 +03:00 committed by GitHub
parent f73c7d0e42
commit 03b3f86aed
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 31 additions and 7 deletions

View file

@ -323,7 +323,7 @@ auto DbSlice::GetStats() const -> Stats {
stats.key_count = db_wrap.prime.size(); stats.key_count = db_wrap.prime.size();
stats.bucket_count = db_wrap.prime.bucket_count(); stats.bucket_count = db_wrap.prime.bucket_count();
stats.expire_count = db_wrap.expire.size(); stats.expire_count = db_wrap.expire.size();
stats.table_mem_usage = (db_wrap.prime.mem_usage() + db_wrap.expire.mem_usage()); stats.table_mem_usage = db_wrap.table_memory();
} }
s.small_string_bytes = CompactObj::GetStats().small_string_bytes; s.small_string_bytes = CompactObj::GetStats().small_string_bytes;
@ -599,6 +599,7 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
PrimeIterator it; PrimeIterator it;
// I try/catch just for sake of having a convenient place to set a breakpoint. // I try/catch just for sake of having a convenient place to set a breakpoint.
size_t table_before = db.prime.mem_usage();
try { try {
it = db.prime.InsertNew(std::move(co_key), PrimeValue{}, evp); it = db.prime.InsertNew(std::move(co_key), PrimeValue{}, evp);
} catch (bad_alloc& e) { } catch (bad_alloc& e) {
@ -607,6 +608,7 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
return OpStatus::OUT_OF_MEMORY; return OpStatus::OUT_OF_MEMORY;
} }
table_memory_ += (db.prime.mem_usage() - table_before);
size_t evicted_obj_bytes = 0; size_t evicted_obj_bytes = 0;
// We may still reach the state when our memory usage is above the limit even if we // We may still reach the state when our memory usage is above the limit even if we
@ -749,11 +751,11 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
ClearOffloadedEntries(indexes, db_arr_); ClearOffloadedEntries(indexes, db_arr_);
DbTableArray flush_db_arr(db_arr_.size()); DbTableArray flush_db_arr(db_arr_.size());
for (DbIndex index : indexes) { for (DbIndex index : indexes) {
auto& db = db_arr_[index]; table_memory_ -= db_arr_[index]->table_memory();
CHECK(db);
InvalidateDbWatches(index); InvalidateDbWatches(index);
flush_db_arr[index] = std::move(db); flush_db_arr[index] = std::move(db_arr_[index]);
CreateDb(index); CreateDb(index);
std::swap(db_arr_[index]->trans_locks, flush_db_arr[index]->trans_locks); std::swap(db_arr_[index]->trans_locks, flush_db_arr[index]->trans_locks);
@ -792,14 +794,20 @@ void DbSlice::FlushDb(DbIndex db_ind) {
void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) { void DbSlice::AddExpire(DbIndex db_ind, Iterator main_it, uint64_t at) {
uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates. uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates.
CHECK(db_arr_[db_ind]->expire.Insert(main_it->first.AsRef(), ExpirePeriod(delta)).second); auto& db = *db_arr_[db_ind];
size_t table_before = db.expire.mem_usage();
CHECK(db.expire.Insert(main_it->first.AsRef(), ExpirePeriod(delta)).second);
table_memory_ += (db.expire.mem_usage() - table_before);
main_it->second.SetExpire(true); main_it->second.SetExpire(true);
} }
bool DbSlice::RemoveExpire(DbIndex db_ind, Iterator main_it) { bool DbSlice::RemoveExpire(DbIndex db_ind, Iterator main_it) {
if (main_it->second.HasExpire()) { if (main_it->second.HasExpire()) {
CHECK_EQ(1u, db_arr_[db_ind]->expire.Erase(main_it->first)); auto& db = *db_arr_[db_ind];
size_t table_before = db.expire.mem_usage();
CHECK_EQ(1u, db.expire.Erase(main_it->first));
main_it->second.SetExpire(false); main_it->second.SetExpire(false);
table_memory_ += (db.expire.mem_usage() - table_before);
return true; return true;
} }
return false; return false;
@ -933,8 +941,10 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrUpdateInternal(const Context& c
if (IsValid(res.exp_it) && force_update) { if (IsValid(res.exp_it) && force_update) {
res.exp_it->second = ExpirePeriod(delta); res.exp_it->second = ExpirePeriod(delta);
} else { } else {
size_t table_before = db.expire.mem_usage();
auto exp_it = db.expire.InsertNew(it->first.AsRef(), ExpirePeriod(delta)); auto exp_it = db.expire.InsertNew(it->first.AsRef(), ExpirePeriod(delta));
res.exp_it = ExpIterator(exp_it, StringOrView::FromView(key)); res.exp_it = ExpIterator(exp_it, StringOrView::FromView(key));
table_memory_ += (db.expire.mem_usage() - table_before);
} }
} }
@ -1296,6 +1306,7 @@ void DbSlice::CreateDb(DbIndex db_ind) {
auto& db = db_arr_[db_ind]; auto& db = db_arr_[db_ind];
if (!db) { if (!db) {
db.reset(new DbTable{owner_->memory_resource(), db_ind}); db.reset(new DbTable{owner_->memory_resource(), db_ind});
table_memory_ += db->table_memory();
} }
} }
@ -1498,6 +1509,7 @@ void DbSlice::PerformDeletion(PrimeIterator del_it, DbTable* table) {
} }
void DbSlice::PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* table) { void DbSlice::PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* table) {
size_t table_before = table->table_memory();
if (!exp_it.is_done()) { if (!exp_it.is_done()) {
table->expire.Erase(exp_it.GetInnerIt()); table->expire.Erase(exp_it.GetInnerIt());
} }
@ -1533,6 +1545,8 @@ void DbSlice::PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* tabl
} }
table->prime.Erase(del_it.GetInnerIt()); table->prime.Erase(del_it.GetInnerIt());
table_memory_ += (table->table_memory() - table_before);
SendInvalidationTrackingMessage(del_it.key()); SendInvalidationTrackingMessage(del_it.key());
} }

View file

@ -412,6 +412,10 @@ class DbSlice {
return version_; return version_;
} }
size_t table_memory() const {
return table_memory_;
}
using ChangeCallback = std::function<void(DbIndex, const ChangeReq&)>; using ChangeCallback = std::function<void(DbIndex, const ChangeReq&)>;
//! Registers the callback to be called for each change. //! Registers the callback to be called for each change.
@ -575,6 +579,7 @@ class DbSlice {
ssize_t memory_budget_ = SSIZE_MAX; ssize_t memory_budget_ = SSIZE_MAX;
size_t bytes_per_object_ = 0; size_t bytes_per_object_ = 0;
size_t soft_budget_limit_ = 0; size_t soft_budget_limit_ = 0;
size_t table_memory_ = 0;
mutable SliceEvents events_; // we may change this even for const operations. mutable SliceEvents events_; // we may change this even for const operations.

View file

@ -715,9 +715,10 @@ void EngineShard::CacheStats() {
DbTable* table = db_slice.GetDBTable(i); DbTable* table = db_slice.GetDBTable(i);
if (table) { if (table) {
entries += table->prime.size(); entries += table->prime.size();
table_memory += (table->prime.mem_usage() + table->expire.mem_usage()); table_memory += table->table_memory();
} }
} }
DCHECK_EQ(table_memory, db_slice.table_memory());
size_t obj_memory = table_memory <= used_mem ? used_mem - table_memory : 0; size_t obj_memory = table_memory <= used_mem ? used_mem - table_memory : 0;
size_t bytes_per_obj = entries > 0 ? obj_memory / entries : 0; size_t bytes_per_obj = entries > 0 ? obj_memory / entries : 0;

View file

@ -139,6 +139,10 @@ struct DbTable : boost::intrusive_ref_counter<DbTable, boost::thread_unsafe_coun
void Clear(); void Clear();
PrimeIterator Launder(PrimeIterator it, std::string_view key); PrimeIterator Launder(PrimeIterator it, std::string_view key);
size_t table_memory() const {
return expire.mem_usage() + prime.mem_usage();
}
}; };
// We use reference counting semantics of DbTable when doing snapshotting. // We use reference counting semantics of DbTable when doing snapshotting.