chore: cluster related clean ups (#4683)

Adding comments plus some missing include headers.
No functional changes.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2025-03-03 14:32:41 +02:00 committed by GitHub
parent 77b9a8f699
commit cf3eb8f05f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 30 additions and 18 deletions

View file

@ -15,6 +15,7 @@ extern "C" {
#include "core/top_keys.h"
#include "search/doc_index.h"
#include "server/channel_store.h"
#include "server/cluster/slot_set.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
@ -67,7 +68,7 @@ void AccountObjectMemory(string_view key, unsigned type, int64_t size, DbTable*
stats.AddTypeMemoryUsage(type, size);
if (IsClusterEnabled()) {
if (db->slots_stats) {
db->slots_stats[KeySlot(key)].memory_bytes += size;
}
}
@ -157,6 +158,7 @@ unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& e
if (db_slice_->WillBlockOnJournalWrite()) {
return res;
}
// Disable flush journal changes to prevent preemtion in GarbageCollect.
journal::JournalFlushGuard journal_flush_guard(db_slice_->shard_owner()->journal());
@ -187,6 +189,7 @@ unsigned PrimeEvictionPolicy::GarbageCollect(const PrimeTable::HotspotBuckets& e
unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeTable* me) {
if (!can_evict_ || db_slice_->WillBlockOnJournalWrite())
return 0;
// Disable flush journal changes to prevent preemtion in evict.
journal::JournalFlushGuard journal_flush_guard(db_slice_->shard_owner()->journal());
@ -594,7 +597,7 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
break;
case UpdateStatsMode::kReadStats:
events_.hits++;
if (IsClusterEnabled()) {
if (db.slots_stats) {
db.slots_stats[KeySlot(key)].total_reads++;
}
if (res.it->second.IsExternal()) {
@ -756,7 +759,7 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
events_.stash_unloaded = db.prime.stash_unloaded();
events_.evicted_keys += evp.evicted();
events_.garbage_checked += evp.checked();
if (IsClusterEnabled()) {
if (db.slots_stats) {
SlotId sid = KeySlot(key);
db.slots_stats[sid].key_count += 1;
}
@ -855,7 +858,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
etl.DecommitMemory(ServerState::kDataHeap);
}
void DbSlice::FlushSlots(cluster::SlotRanges slot_ranges) {
void DbSlice::FlushSlots(const cluster::SlotRanges& slot_ranges) {
cluster::SlotSet slot_set(slot_ranges);
InvalidateSlotWatches(slot_set);
fb2::Fiber("flush_slots", [this, slot_set = std::move(slot_set)]() mutable {
@ -1179,7 +1182,7 @@ void DbSlice::PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size
++events_.update;
if (IsClusterEnabled()) {
if (db.slots_stats) {
db.slots_stats[KeySlot(key)].total_writes += 1;
}
@ -1320,8 +1323,8 @@ auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteEx
if (ttl <= 0) {
auto prime_it = db.prime.Find(it->first);
if (prime_it.is_done()) { // A workaround for the case our tables are inconsistent.
LOG(DFATAL) << "Expired key " << key << " not found in prime table, expire_done: "
<< it.is_done();
LOG(DFATAL) << "Expired key " << key
<< " not found in prime table, expire_done: " << it.is_done();
if (!it.is_done()) {
db.expire.Erase(it->first);
}
@ -1682,7 +1685,7 @@ void DbSlice::PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* tabl
}
} // del_it->first.IsAsyncDelete()
if (IsClusterEnabled()) {
if (table->slots_stats) {
SlotId sid = KeySlot(del_it.key());
table->slots_stats[sid].key_count -= 1;
}

View file

@ -8,7 +8,6 @@
#include "core/string_or_view.h"
#include "facade/dragonfly_connection.h"
#include "facade/op_status.h"
#include "server/cluster/slot_set.h"
#include "server/common.h"
#include "server/conn_context.h"
#include "server/table.h"
@ -17,6 +16,11 @@
namespace dfly {
namespace cluster {
class SlotRanges;
class SlotSet;
} // namespace cluster
using facade::OpResult;
struct DbStats : public DbTableStats {
@ -355,7 +359,7 @@ class DbSlice {
void FlushDb(DbIndex db_ind);
// Flushes the data of given slot ranges.
void FlushSlots(cluster::SlotRanges slot_ranges);
void FlushSlots(const cluster::SlotRanges& slot_ranges);
EngineShard* shard_owner() const {
return owner_;

View file

@ -187,6 +187,9 @@ void JournalSlice::AddLogRecord(const Entry& entry) {
}
void JournalSlice::CallOnChange(const JournalItem& item) {
// This lock is never blocking because it contends with UnregisterOnChange, which is cpu only.
// Hence this lock prevents the UnregisterOnChange to start running in the middle of CallOnChange.
// CallOnChange is atomic iff JournalSlice::SetFlushMode(false) is called before.
std::shared_lock lk(cb_mu_);
const size_t size = change_cb_arr_.size();

View file

@ -6,6 +6,7 @@
#include <deque>
#include "server/cluster/slot_set.h"
#include "server/common.h"
#include "server/db_slice.h"
#include "server/journal/journal.h"

View file

@ -16,6 +16,7 @@ extern "C" {
#include "base/logging.h"
#include "core/qlist.h"
#include "server/blocking_controller.h"
#include "server/cluster/cluster_defs.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/container_utils.h"

View file

@ -111,7 +111,7 @@ class SliceSnapshot {
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
// Journal listener
void OnJournalEntry(const journal::JournalItem& item, bool unused_await_arg);
void OnJournalEntry(const journal::JournalItem& item, bool allow_await);
// Push serializer's internal buffer.
// Push regardless of buffer size if force is true.

View file

@ -6,7 +6,6 @@
#include "base/flags.h"
#include "base/logging.h"
#include "core/top_keys.h"
#include "server/cluster_support.h"
#include "server/server_state.h"
@ -85,7 +84,7 @@ DbTable::DbTable(PMR_NS::memory_resource* mr, DbIndex db_index)
mcflag(0, detail::ExpireTablePolicy{}, mr),
index(db_index) {
if (IsClusterEnabled()) {
slots_stats.resize(kMaxSlotNum + 1);
slots_stats.reset(new SlotStats[kMaxSlotNum + 1]);
}
thread_index = ServerState::tlocal()->thread_index();
}

View file

@ -126,7 +126,7 @@ struct DbTable : boost::intrusive_ref_counter<DbTable, boost::thread_unsafe_coun
mutable std::vector<std::string> expired_keys_events_;
mutable DbTableStats stats;
std::vector<SlotStats> slots_stats;
std::unique_ptr<SlotStats[]> slots_stats;
ExpireTable::Cursor expire_cursor;
TopKeys* top_keys = nullptr;

View file

@ -18,6 +18,7 @@
#include "core/intent_lock.h"
#include "core/tx_queue.h"
#include "facade/op_status.h"
#include "server/cluster_support.h"
#include "server/common.h"
#include "server/journal/types.h"
#include "server/namespaces.h"

View file

@ -222,14 +222,13 @@ void RecordJournal(const OpArgs& op_args, std::string_view cmd, const ShardArgs&
void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args,
uint32_t shard_cnt = 1);
// Record expiry in journal with independent transaction. Must be called from shard thread holding
// key.
// Record expiry in journal with independent transaction.
// Must be called from shard thread owning key.
// Might block the calling fiber unless Journal::SetFlushMode(false) is called.
void RecordExpiry(DbIndex dbid, std::string_view key);
// Trigger journal write to sink, no journal record will be added to journal.
// Must be called from shard thread of journal to sink.
void TriggerJournalWriteToSink();
// std::ostream& operator<<(std::ostream& os, ArgSlice list);
} // namespace dfly

View file

@ -23,6 +23,7 @@ extern "C" {
#include "facade/cmd_arg_parser.h"
#include "facade/error.h"
#include "server/blocking_controller.h"
#include "server/cluster/cluster_defs.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/container_utils.h"