refactor: reduce slot calculations

This commit is contained in:
Borys 2025-05-05 15:39:28 +03:00
parent 4d07d7d053
commit 7f3ea4acbf
No known key found for this signature in database
GPG key ID: 753496F020ABFD14
6 changed files with 64 additions and 44 deletions

View file

@ -79,6 +79,10 @@ SlotId KeySlot(std::string_view key) {
return crc16(tag.data(), tag.length()) & kMaxSlotNum;
}
SlotId KeySlotOr(std::string_view key, SlotId default_value) {
return IsClusterEnabledOrEmulated() ? KeySlot(key) : default_value;
}
bool IsClusterShardedByTag() {
return IsClusterEnabledOrEmulated() || LockTagOptions::instance().enabled;
}

View file

@ -55,6 +55,9 @@ class UniqueSlotChecker {
SlotId KeySlot(std::string_view key);
// if !IsClusterEnabled() returns default_value
SlotId KeySlotOr(std::string_view key, SlotId default_value);
void InitializeCluster();
inline bool IsClusterEnabled() {

View file

@ -60,7 +60,7 @@ static_assert(kPrimeSegmentSize == 32288);
// 24576
static_assert(kExpireSegmentSize == 23528);
void AccountObjectMemory(string_view key, unsigned type, int64_t size, DbTable* db) {
void AccountObjectMemory(SlotId slot, unsigned type, int64_t size, DbTable* db) {
DCHECK_NE(db, nullptr);
if (size == 0)
return;
@ -71,9 +71,7 @@ void AccountObjectMemory(string_view key, unsigned type, int64_t size, DbTable*
stats.AddTypeMemoryUsage(type, size);
if (db->slots_stats) {
db->slots_stats[KeySlot(key)].memory_bytes += size;
}
db->GetSlotStats(slot).memory_bytes += size;
}
class PrimeEvictionPolicy {
@ -416,7 +414,7 @@ auto DbSlice::GetStats() const -> Stats {
SlotStats DbSlice::GetSlotStats(SlotId sid) const {
CHECK(db_arr_[0]);
return db_arr_[0]->slots_stats[sid];
return db_arr_[0]->GetSlotStats(sid);
}
void DbSlice::Reserve(DbIndex db_ind, size_t key_size) {
@ -459,7 +457,8 @@ void DbSlice::AutoUpdater::Run() {
DCHECK(fields_.action == DestructorAction::kRun);
CHECK_NE(fields_.db_slice, nullptr);
fields_.db_slice->PostUpdate(fields_.db_ind, fields_.it, fields_.key, fields_.orig_heap_size);
fields_.db_slice->PostUpdate(fields_.db_ind, fields_.it, fields_.key, fields_.orig_heap_size,
fields_.slot);
Cancel(); // Reset to not run again
}
@ -484,7 +483,8 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutable(const Context& cntx, string
OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutableInternal(const Context& cntx, string_view key,
std::optional<unsigned> req_obj_type) {
auto res = FindInternal(cntx, key, req_obj_type, UpdateStatsMode::kMutableStats);
SlotId key_slot = KeySlotOr(key, 0);
auto res = FindInternal(cntx, key, req_obj_type, UpdateStatsMode::kMutableStats, key_slot);
if (!res.ok()) {
return res.status();
}
@ -498,6 +498,7 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutableInternal(const Context& cntx
return {{it, exp_it,
AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
.slot = key_slot,
.db_slice = this,
.db_ind = cntx.db_index,
.it = it,
@ -508,14 +509,14 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutableInternal(const Context& cntx
}
DbSlice::ItAndExpConst DbSlice::FindReadOnly(const Context& cntx, std::string_view key) const {
auto res = FindInternal(cntx, key, std::nullopt, UpdateStatsMode::kReadStats);
auto res = FindInternal(cntx, key, std::nullopt, UpdateStatsMode::kReadStats, KeySlotOr(key, 0));
return {ConstIterator(res->it, StringOrView::FromView(key)),
ExpConstIterator(res->exp_it, StringOrView::FromView(key))};
}
OpResult<DbSlice::ConstIterator> DbSlice::FindReadOnly(const Context& cntx, string_view key,
unsigned req_obj_type) const {
auto res = FindInternal(cntx, key, req_obj_type, UpdateStatsMode::kReadStats);
auto res = FindInternal(cntx, key, req_obj_type, UpdateStatsMode::kReadStats, KeySlotOr(key, 0));
if (res.ok()) {
return ConstIterator(res->it, StringOrView::FromView(key));
}
@ -523,7 +524,8 @@ OpResult<DbSlice::ConstIterator> DbSlice::FindReadOnly(const Context& cntx, stri
}
auto DbSlice::FindInternal(const Context& cntx, string_view key, optional<unsigned> req_obj_type,
UpdateStatsMode stats_mode) const -> OpResult<PrimeItAndExp> {
UpdateStatsMode stats_mode, SlotId slot) const
-> OpResult<PrimeItAndExp> {
if (!IsDbValid(cntx.db_index)) { // Can it even happen?
LOG(DFATAL) << "Invalid db index " << cntx.db_index;
return OpStatus::KEY_NOTFOUND;
@ -567,9 +569,7 @@ auto DbSlice::FindInternal(const Context& cntx, string_view key, optional<unsign
break;
case UpdateStatsMode::kReadStats:
events_.hits++;
if (db.slots_stats) {
db.slots_stats[KeySlot(key)].total_reads++;
}
db.GetSlotStats(slot).total_reads++;
if (res.it->second.IsExternal()) {
if (res.it->second.IsCool())
events_.ram_cool_hits++;
@ -610,7 +610,9 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx,
DCHECK(IsDbValid(cntx.db_index));
DbTable& db = *db_arr_[cntx.db_index];
auto res = FindInternal(cntx, key, std::nullopt, UpdateStatsMode::kMutableStats);
SlotId key_slot = KeySlotOr(key, 0);
auto res = FindInternal(cntx, key, std::nullopt, UpdateStatsMode::kMutableStats, key_slot);
if (res.ok()) {
Iterator it(res->it, StringOrView::FromView(key));
@ -623,6 +625,7 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx,
.it = it,
.exp_it = exp_it,
.post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
.slot = key_slot,
.db_slice = this,
.db_ind = cntx.db_index,
.it = it,
@ -718,7 +721,8 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx,
entries_count_++;
db.stats.inline_keys += it->first.IsInline();
AccountObjectMemory(key, it->first.ObjType(), it->first.MallocUsed(), &db); // Account for key
AccountObjectMemory(key_slot, it->first.ObjType(), it->first.MallocUsed(),
&db); // Account for key
DCHECK_EQ(it->second.MallocUsed(), 0UL); // Make sure accounting is no-op
it.SetVersion(NextVersion());
@ -730,14 +734,12 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx,
events_.stash_unloaded = db.prime.stash_unloaded();
events_.evicted_keys += evp.evicted();
events_.garbage_checked += evp.checked();
if (db.slots_stats) {
SlotId sid = KeySlot(key);
db.slots_stats[sid].key_count += 1;
}
db.GetSlotStats(key_slot).key_count++;
return ItAndUpdater{.it = Iterator(it, StringOrView::FromView(key)),
.exp_it = ExpIterator{},
.post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
.slot = key_slot,
.db_slice = this,
.db_ind = cntx.db_index,
.it = Iterator(it, StringOrView::FromView(key)),
@ -780,7 +782,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
std::string_view key = it->first.GetSlice(&tmp);
SlotId sid = KeySlot(key);
if (slot_ids.Contains(sid) && it.GetVersion() < next_version) {
PerformDeletion(Iterator::FromPrime(it), db_arr_[0].get());
PerformDeletion(Iterator::FromPrime(it), db_arr_[0].get(), sid);
++del_count;
}
++it;
@ -1127,9 +1129,10 @@ void DbSlice::PreUpdateBlocking(DbIndex db_ind, Iterator it) {
it.GetInnerIt().SetVersion(NextVersion());
}
void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size) {
void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size,
SlotId slot) {
int64_t delta = static_cast<int64_t>(it->second.MallocUsed()) - static_cast<int64_t>(orig_size);
AccountObjectMemory(key, it->second.ObjType(), delta, GetDBTable(db_ind));
AccountObjectMemory(slot, it->second.ObjType(), delta, GetDBTable(db_ind));
auto& db = *db_arr_[db_ind];
auto& watched_keys = db.watched_keys;
@ -1146,9 +1149,7 @@ void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size
++events_.update;
if (db.slots_stats) {
db.slots_stats[KeySlot(key)].total_writes += 1;
}
db.GetSlotStats(slot).total_writes++;
if (!client_tracking_map_.empty()) {
QueueInvalidationTrackingMessageAtomic(key);
@ -1636,7 +1637,8 @@ size_t DbSlice::StopSampleKeys(DbIndex db_ind) {
return count;
}
void DbSlice::PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable* table) {
void DbSlice::PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable* table,
optional<SlotId> slot) {
FiberAtomicGuard guard;
size_t table_before = table->table_memory();
if (!exp_it.is_done()) {
@ -1663,9 +1665,10 @@ void DbSlice::PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable
ssize_t value_heap_size = pv.MallocUsed(), key_size_used = del_it->first.MallocUsed();
stats.inline_keys -= del_it->first.IsInline();
AccountObjectMemory(del_it.key(), del_it->first.ObjType(), -key_size_used,
table); // Key
AccountObjectMemory(del_it.key(), pv.ObjType(), -value_heap_size, table); // Value
slot = slot ? slot : KeySlotOr(del_it.key(), 0);
AccountObjectMemory(*slot, del_it->first.ObjType(), -key_size_used, table); // Key
AccountObjectMemory(*slot, pv.ObjType(), -value_heap_size, table); // Value
if (del_it->first.IsAsyncDelete() && pv.ObjType() == OBJ_SET &&
pv.Encoding() == kEncodingStrMap2) {
@ -1681,10 +1684,7 @@ void DbSlice::PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable
}
} // del_it->first.IsAsyncDelete()
if (table->slots_stats) {
SlotId sid = KeySlot(del_it.key());
table->slots_stats[sid].key_count -= 1;
}
table->GetSlotStats(*slot).key_count--;
table->prime.Erase(del_it.GetInnerIt());
@ -1701,14 +1701,14 @@ void DbSlice::PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable
}
}
void DbSlice::PerformDeletion(Iterator del_it, DbTable* table) {
void DbSlice::PerformDeletion(Iterator del_it, DbTable* table, optional<SlotId> slot) {
ExpIterator exp_it;
if (del_it->second.HasExpire()) {
exp_it = ExpIterator::FromPrime(table->expire.Find(del_it->first));
DCHECK(!exp_it.is_done());
}
PerformDeletionAtomic(del_it, exp_it, table);
PerformDeletionAtomic(del_it, exp_it, table, slot);
}
void DbSlice::OnCbFinishBlocking() {

View file

@ -166,6 +166,7 @@ class DbSlice {
// Wrap members in a struct to auto generate operator=
struct Fields {
DestructorAction action = DestructorAction::kDoNothing;
SlotId slot = 0;
DbSlice* db_slice = nullptr;
DbIndex db_ind = 0;
@ -338,8 +339,8 @@ class DbSlice {
// Creates a database with index `db_ind`. If such database exists does nothing.
void ActivateDb(DbIndex db_ind);
// Delete a key referred by its iterator.
void PerformDeletion(Iterator del_it, DbTable* table);
// Delete a key referred by its iterator. If slot is nullopt it will be calculated automatically
void PerformDeletion(Iterator del_it, DbTable* table, std::optional<SlotId> slot = std::nullopt);
// Deletes the iterator. The iterator must be valid.
void Del(Context cntx, Iterator it);
@ -540,7 +541,7 @@ class DbSlice {
private:
void PreUpdateBlocking(DbIndex db_ind, Iterator it);
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size, SlotId slot);
bool DelEmptyPrimeValue(const Context& cntx, Iterator it);
@ -560,7 +561,8 @@ class DbSlice {
// Clear tiered storage entries for the specified indices.
void ClearOffloadedEntries(absl::Span<const DbIndex> indices, const DbTableArray& db_arr);
void PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable* table);
void PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable* table,
std::optional<SlotId> slot = std::nullopt);
// Queues invalidation message to the clients that are tracking the change to a key.
void QueueInvalidationTrackingMessageAtomic(std::string_view key);
@ -585,7 +587,7 @@ class DbSlice {
OpResult<PrimeItAndExp> FindInternal(const Context& cntx, std::string_view key,
std::optional<unsigned> req_obj_type,
UpdateStatsMode stats_mode) const;
UpdateStatsMode stats_mode, SlotId slot) const;
OpResult<ItAndUpdater> FindMutableInternal(const Context& cntx, std::string_view key,
std::optional<unsigned> req_obj_type);

View file

@ -83,9 +83,9 @@ DbTable::DbTable(PMR_NS::memory_resource* mr, DbIndex db_index)
expire(0, detail::ExpireTablePolicy{}, mr),
mcflag(0, detail::ExpireTablePolicy{}, mr),
index(db_index) {
if (IsClusterEnabled()) {
slots_stats.reset(new SlotStats[kMaxSlotNum + 1]);
}
// if cluster is not enabled we assume that we have only one slot to make code simpler
auto slots_stats_size = IsClusterEnabled() ? kMaxSlotNum + 1 : 1;
slots_stats_.reset(new SlotStats[slots_stats_size]);
thread_index = ServerState::tlocal()->thread_index();
}
@ -110,4 +110,8 @@ PrimeIterator DbTable::Launder(PrimeIterator it, string_view key) {
return it;
}
SlotStats& DbTable::GetSlotStats(SlotId slot) {
return IsClusterEnabled() ? slots_stats_[slot] : slots_stats_[0];
}
} // namespace dfly

View file

@ -126,7 +126,7 @@ struct DbTable : boost::intrusive_ref_counter<DbTable, boost::thread_unsafe_coun
mutable std::vector<std::string> expired_keys_events_;
mutable DbTableStats stats;
std::unique_ptr<SlotStats[]> slots_stats;
ExpireTable::Cursor expire_cursor;
TopKeys* top_keys = nullptr;
@ -144,6 +144,13 @@ struct DbTable : boost::intrusive_ref_counter<DbTable, boost::thread_unsafe_coun
size_t table_memory() const {
return expire.mem_usage() + prime.mem_usage();
}
// implement null-object pattern approach
SlotStats& GetSlotStats(SlotId slot);
private:
// without cluster we assume that we have only one slot
std::unique_ptr<SlotStats[]> slots_stats_;
};
// We use reference counting semantics of DbTable when doing snapshotting.