diff --git a/src/server/cluster_support.cc b/src/server/cluster_support.cc index 47754287d..36e798501 100644 --- a/src/server/cluster_support.cc +++ b/src/server/cluster_support.cc @@ -75,10 +75,17 @@ void InitializeCluster() { } SlotId KeySlot(std::string_view key) { + if (!IsClusterEnabledOrEmulated()) + return kNoSlotId; + string_view tag = LockTagOptions::instance().Tag(key); return crc16(tag.data(), tag.length()) & kMaxSlotNum; } +SlotId KeySlotIfNoSlotId(std::string_view key, SlotId slot) { + return slot == kNoSlotId ? KeySlot(key) : slot; +} + bool IsClusterShardedByTag() { return IsClusterEnabledOrEmulated() || LockTagOptions::instance().enabled; } diff --git a/src/server/cluster_support.h b/src/server/cluster_support.h index 01a230531..e2d660b2f 100644 --- a/src/server/cluster_support.h +++ b/src/server/cluster_support.h @@ -26,6 +26,8 @@ extern bool cluster_shard_by_slot; using SlotId = std::uint16_t; constexpr SlotId kMaxSlotNum = 0x3FFF; +// kNoSlotId - if slot wasn't set at all +constexpr SlotId kNoSlotId = kMaxSlotNum + 1; // A simple utility class that "aggregates" SlotId-s and can tell whether all inputs were the same. // Only works when cluster is enabled. @@ -45,8 +47,6 @@ class UniqueSlotChecker { } private: - // kNoSlotId - if slot wasn't set at all - static constexpr SlotId kNoSlotId = kMaxSlotNum + 1; // kCrossSlot - if several different slots were set static constexpr SlotId kCrossSlot = kNoSlotId + 1; @@ -55,6 +55,9 @@ class UniqueSlotChecker { SlotId KeySlot(std::string_view key); +// calculate slot if slot == kNoSlotId +SlotId KeySlotIfNoSlotId(std::string_view key, SlotId slot); + void InitializeCluster(); inline bool IsClusterEnabled() { diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index cb286dc93..74c12f29d 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -60,7 +60,7 @@ static_assert(kPrimeSegmentSize <= 32288); // 24576 static_assert(kExpireSegmentSize == 23528); -void AccountObjectMemory(string_view key, unsigned type, int64_t size, DbTable* db) { +void AccountObjectMemory(SlotId slot, unsigned type, int64_t size, DbTable* db) { DCHECK_NE(db, nullptr); if (size == 0) return; @@ -71,9 +71,7 @@ void AccountObjectMemory(string_view key, unsigned type, int64_t size, DbTable* stats.AddTypeMemoryUsage(type, size); - if (db->slots_stats) { - db->slots_stats[KeySlot(key)].memory_bytes += size; - } + db->GetSlotStats(slot).memory_bytes += size; } class PrimeEvictionPolicy { @@ -416,7 +414,7 @@ auto DbSlice::GetStats() const -> Stats { SlotStats DbSlice::GetSlotStats(SlotId sid) const { CHECK(db_arr_[0]); - return db_arr_[0]->slots_stats[sid]; + return db_arr_[0]->GetSlotStats(sid); } void DbSlice::Reserve(DbIndex db_ind, size_t key_size) { @@ -459,7 +457,8 @@ void DbSlice::AutoUpdater::Run() { DCHECK(fields_.action == DestructorAction::kRun); CHECK_NE(fields_.db_slice, nullptr); - fields_.db_slice->PostUpdate(fields_.db_ind, fields_.it, fields_.key, fields_.orig_heap_size); + fields_.db_slice->PostUpdate(fields_.db_ind, fields_.it, fields_.key, fields_.orig_heap_size, + fields_.slot); Cancel(); // Reset to not run again } @@ -484,7 +483,8 @@ OpResult DbSlice::FindMutable(const Context& cntx, string OpResult DbSlice::FindMutableInternal(const Context& cntx, string_view key, std::optional req_obj_type) { - auto res = FindInternal(cntx, key, req_obj_type, UpdateStatsMode::kMutableStats); + SlotId key_slot = KeySlot(key); + auto res = FindInternal(cntx, key, req_obj_type, UpdateStatsMode::kMutableStats, key_slot); if (!res.ok()) { return res.status(); } @@ -498,6 +498,7 @@ OpResult DbSlice::FindMutableInternal(const Context& cntx return {{it, exp_it, AutoUpdater({.action = AutoUpdater::DestructorAction::kRun, + .slot = key_slot, .db_slice = this, .db_ind = cntx.db_index, .it = it, @@ -523,7 +524,8 @@ OpResult DbSlice::FindReadOnly(const Context& cntx, stri } auto DbSlice::FindInternal(const Context& cntx, string_view key, optional req_obj_type, - UpdateStatsMode stats_mode) const -> OpResult { + UpdateStatsMode stats_mode, SlotId slot) const + -> OpResult { if (!IsDbValid(cntx.db_index)) { // Can it even happen? LOG(DFATAL) << "Invalid db index " << cntx.db_index; return OpStatus::KEY_NOTFOUND; @@ -567,9 +569,7 @@ auto DbSlice::FindInternal(const Context& cntx, string_view key, optionalsecond.IsExternal()) { if (res.it->second.IsCool()) events_.ram_cool_hits++; @@ -610,7 +610,9 @@ OpResult DbSlice::AddOrFindInternal(const Context& cntx, DCHECK(IsDbValid(cntx.db_index)); DbTable& db = *db_arr_[cntx.db_index]; - auto res = FindInternal(cntx, key, std::nullopt, UpdateStatsMode::kMutableStats); + + SlotId key_slot = KeySlot(key); + auto res = FindInternal(cntx, key, std::nullopt, UpdateStatsMode::kMutableStats, key_slot); if (res.ok()) { Iterator it(res->it, StringOrView::FromView(key)); @@ -623,6 +625,7 @@ OpResult DbSlice::AddOrFindInternal(const Context& cntx, .it = it, .exp_it = exp_it, .post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun, + .slot = key_slot, .db_slice = this, .db_ind = cntx.db_index, .it = it, @@ -718,7 +721,8 @@ OpResult DbSlice::AddOrFindInternal(const Context& cntx, entries_count_++; db.stats.inline_keys += it->first.IsInline(); - AccountObjectMemory(key, it->first.ObjType(), it->first.MallocUsed(), &db); // Account for key + AccountObjectMemory(key_slot, it->first.ObjType(), it->first.MallocUsed(), + &db); // Account for key DCHECK_EQ(it->second.MallocUsed(), 0UL); // Make sure accounting is no-op it.SetVersion(NextVersion()); @@ -730,14 +734,12 @@ OpResult DbSlice::AddOrFindInternal(const Context& cntx, events_.stash_unloaded = db.prime.stash_unloaded(); events_.evicted_keys += evp.evicted(); events_.garbage_checked += evp.checked(); - if (db.slots_stats) { - SlotId sid = KeySlot(key); - db.slots_stats[sid].key_count += 1; - } + db.GetSlotStats(key_slot).key_count++; return ItAndUpdater{.it = Iterator(it, StringOrView::FromView(key)), .exp_it = ExpIterator{}, .post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun, + .slot = key_slot, .db_slice = this, .db_ind = cntx.db_index, .it = Iterator(it, StringOrView::FromView(key)), @@ -780,7 +782,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) { std::string_view key = it->first.GetSlice(&tmp); SlotId sid = KeySlot(key); if (slot_ids.Contains(sid) && it.GetVersion() < next_version) { - PerformDeletion(Iterator::FromPrime(it), db_arr_[0].get()); + PerformDeletion(Iterator::FromPrime(it), db_arr_[0].get(), sid); ++del_count; } ++it; @@ -1127,9 +1129,10 @@ void DbSlice::PreUpdateBlocking(DbIndex db_ind, Iterator it) { it.GetInnerIt().SetVersion(NextVersion()); } -void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size) { +void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size, + SlotId slot) { int64_t delta = static_cast(it->second.MallocUsed()) - static_cast(orig_size); - AccountObjectMemory(key, it->second.ObjType(), delta, GetDBTable(db_ind)); + AccountObjectMemory(slot, it->second.ObjType(), delta, GetDBTable(db_ind)); auto& db = *db_arr_[db_ind]; auto& watched_keys = db.watched_keys; @@ -1146,9 +1149,7 @@ void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size ++events_.update; - if (db.slots_stats) { - db.slots_stats[KeySlot(key)].total_writes += 1; - } + db.GetSlotStats(slot).total_writes++; if (!client_tracking_map_.empty()) { QueueInvalidationTrackingMessageAtomic(key); @@ -1636,7 +1637,8 @@ size_t DbSlice::StopSampleKeys(DbIndex db_ind) { return count; } -void DbSlice::PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable* table) { +void DbSlice::PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable* table, + SlotId slot) { FiberAtomicGuard guard; size_t table_before = table->table_memory(); if (!exp_it.is_done()) { @@ -1663,9 +1665,10 @@ void DbSlice::PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable ssize_t value_heap_size = pv.MallocUsed(), key_size_used = del_it->first.MallocUsed(); stats.inline_keys -= del_it->first.IsInline(); - AccountObjectMemory(del_it.key(), del_it->first.ObjType(), -key_size_used, - table); // Key - AccountObjectMemory(del_it.key(), pv.ObjType(), -value_heap_size, table); // Value + + slot = KeySlotIfNoSlotId(del_it.key(), slot); + AccountObjectMemory(slot, del_it->first.ObjType(), -key_size_used, table); // Key + AccountObjectMemory(slot, pv.ObjType(), -value_heap_size, table); // Value if (del_it->first.IsAsyncDelete() && pv.ObjType() == OBJ_SET && pv.Encoding() == kEncodingStrMap2) { @@ -1681,10 +1684,7 @@ void DbSlice::PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable } } // del_it->first.IsAsyncDelete() - if (table->slots_stats) { - SlotId sid = KeySlot(del_it.key()); - table->slots_stats[sid].key_count -= 1; - } + table->GetSlotStats(slot).key_count--; table->prime.Erase(del_it.GetInnerIt()); @@ -1701,14 +1701,14 @@ void DbSlice::PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable } } -void DbSlice::PerformDeletion(Iterator del_it, DbTable* table) { +void DbSlice::PerformDeletion(Iterator del_it, DbTable* table, SlotId slot) { ExpIterator exp_it; if (del_it->second.HasExpire()) { exp_it = ExpIterator::FromPrime(table->expire.Find(del_it->first)); DCHECK(!exp_it.is_done()); } - PerformDeletionAtomic(del_it, exp_it, table); + PerformDeletionAtomic(del_it, exp_it, table, slot); } void DbSlice::OnCbFinishBlocking() { diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 493d7e008..b0499f5b7 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -8,6 +8,7 @@ #include "core/string_or_view.h" #include "facade/dragonfly_connection.h" #include "facade/op_status.h" +#include "server/cluster_support.h" #include "server/common.h" #include "server/conn_context.h" #include "server/table.h" @@ -166,6 +167,7 @@ class DbSlice { // Wrap members in a struct to auto generate operator= struct Fields { DestructorAction action = DestructorAction::kDoNothing; + SlotId slot = 0; DbSlice* db_slice = nullptr; DbIndex db_ind = 0; @@ -339,7 +341,7 @@ class DbSlice { void ActivateDb(DbIndex db_ind); // Delete a key referred by its iterator. - void PerformDeletion(Iterator del_it, DbTable* table); + void PerformDeletion(Iterator del_it, DbTable* table, SlotId slot_hint = kNoSlotId); // Deletes the iterator. The iterator must be valid. void Del(Context cntx, Iterator it); @@ -540,7 +542,7 @@ class DbSlice { private: void PreUpdateBlocking(DbIndex db_ind, Iterator it); - void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size); + void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size, SlotId slot); bool DelEmptyPrimeValue(const Context& cntx, Iterator it); @@ -560,7 +562,8 @@ class DbSlice { // Clear tiered storage entries for the specified indices. void ClearOffloadedEntries(absl::Span indices, const DbTableArray& db_arr); - void PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable* table); + void PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable* table, + SlotId slot_hint = kNoSlotId); // Queues invalidation message to the clients that are tracking the change to a key. void QueueInvalidationTrackingMessageAtomic(std::string_view key); @@ -585,7 +588,8 @@ class DbSlice { OpResult FindInternal(const Context& cntx, std::string_view key, std::optional req_obj_type, - UpdateStatsMode stats_mode) const; + UpdateStatsMode stats_mode, + SlotId slot_hint = kNoSlotId) const; OpResult FindMutableInternal(const Context& cntx, std::string_view key, std::optional req_obj_type); diff --git a/src/server/table.cc b/src/server/table.cc index 7ad08abf9..c43f67b25 100644 --- a/src/server/table.cc +++ b/src/server/table.cc @@ -83,9 +83,9 @@ DbTable::DbTable(PMR_NS::memory_resource* mr, DbIndex db_index) expire(0, detail::ExpireTablePolicy{}, mr), mcflag(0, detail::ExpireTablePolicy{}, mr), index(db_index) { - if (IsClusterEnabled()) { - slots_stats.reset(new SlotStats[kMaxSlotNum + 1]); - } + // if cluster is not enabled we assume that we have only one slot to make code simpler + auto slots_stats_size = IsClusterEnabled() ? kMaxSlotNum + 1 : 1; + slots_stats_.reset(new SlotStats[slots_stats_size]); thread_index = ServerState::tlocal()->thread_index(); } @@ -110,4 +110,8 @@ PrimeIterator DbTable::Launder(PrimeIterator it, string_view key) { return it; } +SlotStats& DbTable::GetSlotStats(SlotId slot) { + return IsClusterEnabled() ? slots_stats_[slot] : slots_stats_[0]; +} + } // namespace dfly diff --git a/src/server/table.h b/src/server/table.h index 2cda63f6c..9dce59620 100644 --- a/src/server/table.h +++ b/src/server/table.h @@ -126,7 +126,7 @@ struct DbTable : boost::intrusive_ref_counter expired_keys_events_; mutable DbTableStats stats; - std::unique_ptr slots_stats; + ExpireTable::Cursor expire_cursor; TopKeys* top_keys = nullptr; @@ -144,6 +144,13 @@ struct DbTable : boost::intrusive_ref_counter slots_stats_; }; // We use reference counting semantics of DbTable when doing snapshotting.