This commit is contained in:
Borys 2025-05-09 22:07:23 +08:00 committed by GitHub
commit 0d57f0170e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 68 additions and 43 deletions

View file

@ -75,10 +75,17 @@ void InitializeCluster() {
}
// Computes the slot id for `key`.
// When the cluster is neither enabled nor emulated there is nothing to compute and kNoSlotId is
// returned. Otherwise the slot is the crc16 of the key's lock tag (as extracted by
// LockTagOptions), masked with kMaxSlotNum.
SlotId KeySlot(std::string_view key) {
  if (IsClusterEnabledOrEmulated()) {
    // Hash only the tag portion of the key rather than the whole key.
    const std::string_view hashed_part = LockTagOptions::instance().Tag(key);
    return crc16(hashed_part.data(), hashed_part.length()) & kMaxSlotNum;
  }
  return kNoSlotId;
}
// Returns `slot` unchanged when the caller already supplied one; only when it equals the
// kNoSlotId sentinel is the slot derived from `key` via KeySlot().
SlotId KeySlotIfNoSlotId(std::string_view key, SlotId slot) {
  if (slot != kNoSlotId) {
    return slot;
  }
  return KeySlot(key);
}
// True when the cluster is enabled/emulated, or when lock-tag options are enabled.
bool IsClusterShardedByTag() {
  if (IsClusterEnabledOrEmulated()) {
    return true;
  }
  // Lock-tag options are consulted only when the cluster check above is false.
  return LockTagOptions::instance().enabled;
}

View file

@ -26,6 +26,8 @@ extern bool cluster_shard_by_slot;
using SlotId = std::uint16_t;
constexpr SlotId kMaxSlotNum = 0x3FFF;
// kNoSlotId - sentinel meaning that no slot was set; it is one past the largest valid slot id.
constexpr SlotId kNoSlotId = kMaxSlotNum + 1;
// A simple utility class that "aggregates" SlotId-s and can tell whether all inputs were the same.
// Only works when cluster is enabled.
@ -45,8 +47,6 @@ class UniqueSlotChecker {
}
private:
// kNoSlotId - if slot wasn't set at all
static constexpr SlotId kNoSlotId = kMaxSlotNum + 1;
// kCrossSlot - if several different slots were set
static constexpr SlotId kCrossSlot = kNoSlotId + 1;
@ -55,6 +55,9 @@ class UniqueSlotChecker {
SlotId KeySlot(std::string_view key);
// Returns `slot` as is, unless it equals kNoSlotId, in which case the slot is calculated from `key`.
SlotId KeySlotIfNoSlotId(std::string_view key, SlotId slot);
void InitializeCluster();
inline bool IsClusterEnabled() {

View file

@ -60,7 +60,7 @@ static_assert(kPrimeSegmentSize <= 32288);
// 24576
static_assert(kExpireSegmentSize == 23528);
void AccountObjectMemory(string_view key, unsigned type, int64_t size, DbTable* db) {
void AccountObjectMemory(SlotId slot, unsigned type, int64_t size, DbTable* db) {
DCHECK_NE(db, nullptr);
if (size == 0)
return;
@ -71,9 +71,7 @@ void AccountObjectMemory(string_view key, unsigned type, int64_t size, DbTable*
stats.AddTypeMemoryUsage(type, size);
if (db->slots_stats) {
db->slots_stats[KeySlot(key)].memory_bytes += size;
}
db->GetSlotStats(slot).memory_bytes += size;
}
class PrimeEvictionPolicy {
@ -416,7 +414,7 @@ auto DbSlice::GetStats() const -> Stats {
SlotStats DbSlice::GetSlotStats(SlotId sid) const {
CHECK(db_arr_[0]);
return db_arr_[0]->slots_stats[sid];
return db_arr_[0]->GetSlotStats(sid);
}
void DbSlice::Reserve(DbIndex db_ind, size_t key_size) {
@ -459,7 +457,8 @@ void DbSlice::AutoUpdater::Run() {
DCHECK(fields_.action == DestructorAction::kRun);
CHECK_NE(fields_.db_slice, nullptr);
fields_.db_slice->PostUpdate(fields_.db_ind, fields_.it, fields_.key, fields_.orig_heap_size);
fields_.db_slice->PostUpdate(fields_.db_ind, fields_.it, fields_.key, fields_.orig_heap_size,
fields_.slot);
Cancel(); // Reset to not run again
}
@ -484,7 +483,8 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutable(const Context& cntx, string
OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutableInternal(const Context& cntx, string_view key,
std::optional<unsigned> req_obj_type) {
auto res = FindInternal(cntx, key, req_obj_type, UpdateStatsMode::kMutableStats);
SlotId key_slot = KeySlot(key);
auto res = FindInternal(cntx, key, req_obj_type, UpdateStatsMode::kMutableStats, key_slot);
if (!res.ok()) {
return res.status();
}
@ -498,6 +498,7 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutableInternal(const Context& cntx
return {{it, exp_it,
AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
.slot = key_slot,
.db_slice = this,
.db_ind = cntx.db_index,
.it = it,
@ -523,7 +524,8 @@ OpResult<DbSlice::ConstIterator> DbSlice::FindReadOnly(const Context& cntx, stri
}
auto DbSlice::FindInternal(const Context& cntx, string_view key, optional<unsigned> req_obj_type,
UpdateStatsMode stats_mode) const -> OpResult<PrimeItAndExp> {
UpdateStatsMode stats_mode, SlotId slot) const
-> OpResult<PrimeItAndExp> {
if (!IsDbValid(cntx.db_index)) { // Can it even happen?
LOG(DFATAL) << "Invalid db index " << cntx.db_index;
return OpStatus::KEY_NOTFOUND;
@ -567,9 +569,7 @@ auto DbSlice::FindInternal(const Context& cntx, string_view key, optional<unsign
break;
case UpdateStatsMode::kReadStats:
events_.hits++;
if (db.slots_stats) {
db.slots_stats[KeySlot(key)].total_reads++;
}
db.GetSlotStats(KeySlotIfNoSlotId(key, slot)).total_reads++;
if (res.it->second.IsExternal()) {
if (res.it->second.IsCool())
events_.ram_cool_hits++;
@ -610,7 +610,9 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx,
DCHECK(IsDbValid(cntx.db_index));
DbTable& db = *db_arr_[cntx.db_index];
auto res = FindInternal(cntx, key, std::nullopt, UpdateStatsMode::kMutableStats);
SlotId key_slot = KeySlot(key);
auto res = FindInternal(cntx, key, std::nullopt, UpdateStatsMode::kMutableStats, key_slot);
if (res.ok()) {
Iterator it(res->it, StringOrView::FromView(key));
@ -623,6 +625,7 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx,
.it = it,
.exp_it = exp_it,
.post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
.slot = key_slot,
.db_slice = this,
.db_ind = cntx.db_index,
.it = it,
@ -718,7 +721,8 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx,
entries_count_++;
db.stats.inline_keys += it->first.IsInline();
AccountObjectMemory(key, it->first.ObjType(), it->first.MallocUsed(), &db); // Account for key
AccountObjectMemory(key_slot, it->first.ObjType(), it->first.MallocUsed(),
&db); // Account for key
DCHECK_EQ(it->second.MallocUsed(), 0UL); // Make sure accounting is no-op
it.SetVersion(NextVersion());
@ -730,14 +734,12 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx,
events_.stash_unloaded = db.prime.stash_unloaded();
events_.evicted_keys += evp.evicted();
events_.garbage_checked += evp.checked();
if (db.slots_stats) {
SlotId sid = KeySlot(key);
db.slots_stats[sid].key_count += 1;
}
db.GetSlotStats(key_slot).key_count++;
return ItAndUpdater{.it = Iterator(it, StringOrView::FromView(key)),
.exp_it = ExpIterator{},
.post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
.slot = key_slot,
.db_slice = this,
.db_ind = cntx.db_index,
.it = Iterator(it, StringOrView::FromView(key)),
@ -780,7 +782,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
std::string_view key = it->first.GetSlice(&tmp);
SlotId sid = KeySlot(key);
if (slot_ids.Contains(sid) && it.GetVersion() < next_version) {
PerformDeletion(Iterator::FromPrime(it), db_arr_[0].get());
PerformDeletion(Iterator::FromPrime(it), db_arr_[0].get(), sid);
++del_count;
}
++it;
@ -1127,9 +1129,10 @@ void DbSlice::PreUpdateBlocking(DbIndex db_ind, Iterator it) {
it.GetInnerIt().SetVersion(NextVersion());
}
void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size) {
void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size,
SlotId slot) {
int64_t delta = static_cast<int64_t>(it->second.MallocUsed()) - static_cast<int64_t>(orig_size);
AccountObjectMemory(key, it->second.ObjType(), delta, GetDBTable(db_ind));
AccountObjectMemory(slot, it->second.ObjType(), delta, GetDBTable(db_ind));
auto& db = *db_arr_[db_ind];
auto& watched_keys = db.watched_keys;
@ -1146,9 +1149,7 @@ void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size
++events_.update;
if (db.slots_stats) {
db.slots_stats[KeySlot(key)].total_writes += 1;
}
db.GetSlotStats(slot).total_writes++;
if (!client_tracking_map_.empty()) {
QueueInvalidationTrackingMessageAtomic(key);
@ -1636,7 +1637,8 @@ size_t DbSlice::StopSampleKeys(DbIndex db_ind) {
return count;
}
void DbSlice::PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable* table) {
void DbSlice::PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable* table,
SlotId slot) {
FiberAtomicGuard guard;
size_t table_before = table->table_memory();
if (!exp_it.is_done()) {
@ -1663,9 +1665,10 @@ void DbSlice::PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable
ssize_t value_heap_size = pv.MallocUsed(), key_size_used = del_it->first.MallocUsed();
stats.inline_keys -= del_it->first.IsInline();
AccountObjectMemory(del_it.key(), del_it->first.ObjType(), -key_size_used,
table); // Key
AccountObjectMemory(del_it.key(), pv.ObjType(), -value_heap_size, table); // Value
slot = KeySlotIfNoSlotId(del_it.key(), slot);
AccountObjectMemory(slot, del_it->first.ObjType(), -key_size_used, table); // Key
AccountObjectMemory(slot, pv.ObjType(), -value_heap_size, table); // Value
if (del_it->first.IsAsyncDelete() && pv.ObjType() == OBJ_SET &&
pv.Encoding() == kEncodingStrMap2) {
@ -1681,10 +1684,7 @@ void DbSlice::PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable
}
} // del_it->first.IsAsyncDelete()
if (table->slots_stats) {
SlotId sid = KeySlot(del_it.key());
table->slots_stats[sid].key_count -= 1;
}
table->GetSlotStats(slot).key_count--;
table->prime.Erase(del_it.GetInnerIt());
@ -1701,14 +1701,14 @@ void DbSlice::PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable
}
}
void DbSlice::PerformDeletion(Iterator del_it, DbTable* table) {
void DbSlice::PerformDeletion(Iterator del_it, DbTable* table, SlotId slot) {
ExpIterator exp_it;
if (del_it->second.HasExpire()) {
exp_it = ExpIterator::FromPrime(table->expire.Find(del_it->first));
DCHECK(!exp_it.is_done());
}
PerformDeletionAtomic(del_it, exp_it, table);
PerformDeletionAtomic(del_it, exp_it, table, slot);
}
void DbSlice::OnCbFinishBlocking() {

View file

@ -8,6 +8,7 @@
#include "core/string_or_view.h"
#include "facade/dragonfly_connection.h"
#include "facade/op_status.h"
#include "server/cluster_support.h"
#include "server/common.h"
#include "server/conn_context.h"
#include "server/table.h"
@ -166,6 +167,7 @@ class DbSlice {
// Wrap members in a struct to auto generate operator=
struct Fields {
DestructorAction action = DestructorAction::kDoNothing;
SlotId slot = 0;
DbSlice* db_slice = nullptr;
DbIndex db_ind = 0;
@ -339,7 +341,7 @@ class DbSlice {
void ActivateDb(DbIndex db_ind);
// Delete a key referred by its iterator.
void PerformDeletion(Iterator del_it, DbTable* table);
void PerformDeletion(Iterator del_it, DbTable* table, SlotId slot_hint = kNoSlotId);
// Deletes the iterator. The iterator must be valid.
void Del(Context cntx, Iterator it);
@ -540,7 +542,7 @@ class DbSlice {
private:
void PreUpdateBlocking(DbIndex db_ind, Iterator it);
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size, SlotId slot);
bool DelEmptyPrimeValue(const Context& cntx, Iterator it);
@ -560,7 +562,8 @@ class DbSlice {
// Clear tiered storage entries for the specified indices.
void ClearOffloadedEntries(absl::Span<const DbIndex> indices, const DbTableArray& db_arr);
void PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable* table);
void PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable* table,
SlotId slot_hint = kNoSlotId);
// Queues invalidation message to the clients that are tracking the change to a key.
void QueueInvalidationTrackingMessageAtomic(std::string_view key);
@ -585,7 +588,8 @@ class DbSlice {
OpResult<PrimeItAndExp> FindInternal(const Context& cntx, std::string_view key,
std::optional<unsigned> req_obj_type,
UpdateStatsMode stats_mode) const;
UpdateStatsMode stats_mode,
SlotId slot_hint = kNoSlotId) const;
OpResult<ItAndUpdater> FindMutableInternal(const Context& cntx, std::string_view key,
std::optional<unsigned> req_obj_type);

View file

@ -83,9 +83,9 @@ DbTable::DbTable(PMR_NS::memory_resource* mr, DbIndex db_index)
expire(0, detail::ExpireTablePolicy{}, mr),
mcflag(0, detail::ExpireTablePolicy{}, mr),
index(db_index) {
if (IsClusterEnabled()) {
slots_stats.reset(new SlotStats[kMaxSlotNum + 1]);
}
// if cluster is not enabled we assume that we have only one slot to make code simpler
auto slots_stats_size = IsClusterEnabled() ? kMaxSlotNum + 1 : 1;
slots_stats_.reset(new SlotStats[slots_stats_size]);
thread_index = ServerState::tlocal()->thread_index();
}
@ -110,4 +110,8 @@ PrimeIterator DbTable::Launder(PrimeIterator it, string_view key) {
return it;
}
// Returns a mutable reference to the statistics entry for `slot`.
// When cluster mode is disabled every slot maps to the single entry at index 0, so callers can
// update stats unconditionally.
// NOTE(review): when cluster mode is enabled `slot` is used as an index as is - presumably callers
// never pass a value above kMaxSlotNum (e.g. kNoSlotId); confirm against callers.
SlotStats& DbTable::GetSlotStats(SlotId slot) {
  if (!IsClusterEnabled()) {
    return slots_stats_[0];
  }
  return slots_stats_[slot];
}
} // namespace dfly

View file

@ -126,7 +126,7 @@ struct DbTable : boost::intrusive_ref_counter<DbTable, boost::thread_unsafe_coun
mutable std::vector<std::string> expired_keys_events_;
mutable DbTableStats stats;
std::unique_ptr<SlotStats[]> slots_stats;
ExpireTable::Cursor expire_cursor;
TopKeys* top_keys = nullptr;
@ -144,6 +144,13 @@ struct DbTable : boost::intrusive_ref_counter<DbTable, boost::thread_unsafe_coun
size_t table_memory() const {
return expire.mem_usage() + prime.mem_usage();
}
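// Returns the stats entry for `slot`. When cluster mode is disabled, a single shared entry is
// returned instead (null-object pattern), so callers do not need to check for missing stats.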
SlotStats& GetSlotStats(SlotId slot);
private:
// without cluster we assume that we have only one slot
std::unique_ptr<SlotStats[]> slots_stats_;
};
// We use reference counting semantics of DbTable when doing snapshotting.