chore: clean up dbslice (#4769)

* merge AddOrFindResult with ItAndUpdater
* remove unused functions
* add missing FiberAtomicGuards on atomic member functions
This commit is contained in:
Kostas Kyrimis 2025-03-15 10:38:26 +02:00 committed by GitHub
parent 464dd0522e
commit 8acf849e9d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 41 additions and 71 deletions

View file

@ -261,7 +261,7 @@ class ElementAccess {
EngineShard* shard_ = nullptr;
mutable DbSlice::AutoUpdater post_updater_;
void SetFields(EngineShard* shard, DbSlice::AddOrFindResult res);
void SetFields(EngineShard* shard, DbSlice::ItAndUpdater res);
public:
ElementAccess(string_view key, const OpArgs& args) : key_{key}, context_{args.db_cntx} {
@ -298,7 +298,7 @@ std::optional<bool> ElementAccess::Exists(EngineShard* shard) {
return res.status() != OpStatus::KEY_NOTFOUND;
}
void ElementAccess::SetFields(EngineShard* shard, DbSlice::AddOrFindResult res) {
void ElementAccess::SetFields(EngineShard* shard, DbSlice::ItAndUpdater res) {
element_iter_ = res.it;
added_ = res.is_new;
shard_ = shard;

View file

@ -479,14 +479,6 @@ DbSlice::AutoUpdater::AutoUpdater(const Fields& fields) : fields_(fields) {
fields_.orig_heap_size = fields.it->second.MallocUsed();
}
DbSlice::AddOrFindResult& DbSlice::AddOrFindResult::operator=(ItAndUpdater&& o) {
it = o.it;
exp_it = o.exp_it;
is_new = false;
post_updater = std::move(o).post_updater;
return *this;
}
DbSlice::ItAndUpdater DbSlice::FindMutable(const Context& cntx, string_view key) {
return std::move(FindMutableInternal(cntx, key, std::nullopt).value());
}
@ -626,12 +618,11 @@ auto DbSlice::FindInternal(const Context& cntx, string_view key, optional<unsign
return res;
}
OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFind(const Context& cntx, string_view key) {
OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFind(const Context& cntx, string_view key) {
return AddOrFindInternal(cntx, key);
}
OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cntx,
string_view key) {
OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrFindInternal(const Context& cntx, string_view key) {
DCHECK(IsDbValid(cntx.db_index));
DbTable& db = *db_arr_[cntx.db_index];
@ -644,15 +635,15 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
// PreUpdate() might have caused a deletion of `it`
if (res->it.IsOccupied()) {
return DbSlice::AddOrFindResult{
return ItAndUpdater{
.it = it,
.exp_it = exp_it,
.is_new = false,
.post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
.db_slice = this,
.db_ind = cntx.db_index,
.it = it,
.key = key})};
.key = key}),
.is_new = false};
} else {
res = OpStatus::KEY_NOTFOUND;
}
@ -760,15 +751,14 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
db.slots_stats[sid].key_count += 1;
}
return DbSlice::AddOrFindResult{
.it = Iterator(it, StringOrView::FromView(key)),
.exp_it = ExpIterator{},
.is_new = true,
.post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
.db_slice = this,
.db_ind = cntx.db_index,
.it = Iterator(it, StringOrView::FromView(key)),
.key = key})};
return ItAndUpdater{.it = Iterator(it, StringOrView::FromView(key)),
.exp_it = ExpIterator{},
.post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
.db_slice = this,
.db_ind = cntx.db_index,
.it = Iterator(it, StringOrView::FromView(key)),
.key = key}),
.is_new = true};
}
void DbSlice::ActivateDb(DbIndex db_ind) {
@ -1049,11 +1039,10 @@ OpResult<int64_t> DbSlice::UpdateExpire(const Context& cntx, Iterator prime_it,
}
}
OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrUpdateInternal(const Context& cntx,
std::string_view key,
PrimeValue obj,
uint64_t expire_at_ms,
bool force_update) {
OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrUpdateInternal(const Context& cntx,
std::string_view key, PrimeValue obj,
uint64_t expire_at_ms,
bool force_update) {
DCHECK(!obj.IsRef());
auto op_result = AddOrFind(cntx, key);
@ -1084,8 +1073,8 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrUpdateInternal(const Context& c
return op_result;
}
OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrUpdate(const Context& cntx, string_view key,
PrimeValue obj, uint64_t expire_at_ms) {
OpResult<DbSlice::ItAndUpdater> DbSlice::AddOrUpdate(const Context& cntx, string_view key,
PrimeValue obj, uint64_t expire_at_ms) {
return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, true);
}
@ -1545,6 +1534,7 @@ void DbSlice::SetNotifyKeyspaceEvents(std::string_view notify_keyspace_events) {
}
void DbSlice::QueueInvalidationTrackingMessageAtomic(std::string_view key) {
FiberAtomicGuard guard;
auto it = client_tracking_map_.find(key);
if (it == client_tracking_map_.end()) {
return;
@ -1651,10 +1641,6 @@ size_t DbSlice::StopSampleKeys(DbIndex db_ind) {
return count;
}
void DbSlice::PerformDeletion(PrimeIterator del_it, DbTable* table) {
return PerformDeletion(Iterator::FromPrime(del_it), table);
}
void DbSlice::PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable* table) {
FiberAtomicGuard guard;
size_t table_before = table->table_memory();

View file

@ -285,10 +285,7 @@ class DbSlice {
Iterator it;
ExpIterator exp_it;
AutoUpdater post_updater;
bool IsValid() const {
return !it.is_done();
}
bool is_new = false;
};
ItAndUpdater FindMutable(const Context& cntx, std::string_view key);
@ -304,20 +301,11 @@ class DbSlice {
OpResult<ConstIterator> FindReadOnly(const Context& cntx, std::string_view key,
unsigned req_obj_type) const;
struct AddOrFindResult {
Iterator it;
ExpIterator exp_it;
bool is_new = false;
AutoUpdater post_updater;
AddOrFindResult& operator=(ItAndUpdater&& o);
};
OpResult<AddOrFindResult> AddOrFind(const Context& cntx, std::string_view key);
OpResult<ItAndUpdater> AddOrFind(const Context& cntx, std::string_view key);
// Same as AddOrSkip, but overwrites in case entry exists.
OpResult<AddOrFindResult> AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms);
OpResult<ItAndUpdater> AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms);
// Adds a new entry. Requires: key does not exist in this slice.
// Returns the iterator to the newly added entry.
@ -552,9 +540,9 @@ class DbSlice {
bool DelEmptyPrimeValue(const Context& cntx, Iterator it);
OpResult<AddOrFindResult> AddOrUpdateInternal(const Context& cntx, std::string_view key,
PrimeValue obj, uint64_t expire_at_ms,
bool force_update);
OpResult<ItAndUpdater> AddOrUpdateInternal(const Context& cntx, std::string_view key,
PrimeValue obj, uint64_t expire_at_ms,
bool force_update);
void FlushSlotsFb(const cluster::SlotSet& slot_ids);
void FlushDbIndexes(const std::vector<DbIndex>& indexes);
@ -568,9 +556,7 @@ class DbSlice {
// Clear tiered storage entries for the specified indices.
void ClearOffloadedEntries(absl::Span<const DbIndex> indices, const DbTableArray& db_arr);
//
void PerformDeletionAtomic(Iterator del_it, ExpIterator exp_it, DbTable* table);
void PerformDeletion(PrimeIterator del_it, DbTable* table);
// Queues invalidation message to the clients that are tracking the change to a key.
void QueueInvalidationTrackingMessageAtomic(std::string_view key);
@ -590,7 +576,7 @@ class DbSlice {
PrimeItAndExp ExpireIfNeeded(const Context& cntx, PrimeIterator it) const;
OpResult<AddOrFindResult> AddOrFindInternal(const Context& cntx, std::string_view key);
OpResult<ItAndUpdater> AddOrFindInternal(const Context& cntx, std::string_view key);
OpResult<PrimeItAndExp> FindInternal(const Context& cntx, std::string_view key,
std::optional<unsigned> req_obj_type,

View file

@ -157,9 +157,8 @@ class RdbRestoreValue : protected RdbLoaderBase {
rdb_version_ = rdb_version;
}
OpResult<DbSlice::AddOrFindResult> Add(string_view key, string_view payload,
const DbContext& cntx, const RestoreArgs& args,
DbSlice* db_slice);
OpResult<DbSlice::ItAndUpdater> Add(string_view key, string_view payload, const DbContext& cntx,
const RestoreArgs& args, DbSlice* db_slice);
private:
std::optional<OpaqueObj> Parse(io::Source* source);
@ -190,10 +189,9 @@ std::optional<RdbLoaderBase::OpaqueObj> RdbRestoreValue::Parse(io::Source* sourc
return std::optional<OpaqueObj>(std::move(obj));
}
OpResult<DbSlice::AddOrFindResult> RdbRestoreValue::Add(string_view key, string_view data,
const DbContext& cntx,
const RestoreArgs& args,
DbSlice* db_slice) {
OpResult<DbSlice::ItAndUpdater> RdbRestoreValue::Add(string_view key, string_view data,
const DbContext& cntx, const RestoreArgs& args,
DbSlice* db_slice) {
InMemSource data_src(data);
PrimeValue pv;
bool first_parse = true;
@ -715,7 +713,7 @@ OpStatus OpExpire(const OpArgs& op_args, string_view key, const DbSlice::ExpireP
OpResult<vector<long>> OpFieldExpire(const OpArgs& op_args, string_view key, uint32_t ttl_sec,
CmdArgList values) {
auto& db_slice = op_args.GetDbSlice();
auto [it, expire_it, auto_updater] = db_slice.FindMutable(op_args.db_cntx, key);
auto [it, expire_it, auto_updater, is_new] = db_slice.FindMutable(op_args.db_cntx, key);
if (!IsValid(it) || (it->second.ObjType() != OBJ_SET && it->second.ObjType() != OBJ_HASH)) {
std::vector<long> res(values.size(), -2);

View file

@ -328,8 +328,8 @@ std::optional<JsonType> ShardJsonFromString(std::string_view input) {
return dfly::JsonFromString(input, CompactObj::memory_resource());
}
OpResult<DbSlice::AddOrFindResult> SetJson(const OpArgs& op_args, string_view key,
string_view json_str) {
OpResult<DbSlice::ItAndUpdater> SetJson(const OpArgs& op_args, string_view key,
string_view json_str) {
auto& db_slice = op_args.GetDbSlice();
auto op_res = db_slice.AddOrFind(op_args.db_cntx, key);
@ -1330,7 +1330,7 @@ OpResult<bool> OpSet(const OpArgs& op_args, string_view key, string_view path,
}
JsonMemTracker mem_tracker;
OpResult<DbSlice::AddOrFindResult> st = SetJson(op_args, key, json_str);
auto st = SetJson(op_args, key, json_str);
RETURN_ON_BAD_STATUS(st);
mem_tracker.SetJsonSize(st->it->second, st->is_new);
return true;

View file

@ -326,7 +326,7 @@ OpResult<string> Peek(const OpArgs& op_args, string_view key, ListDir dir, bool
OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
bool skip_notexist, facade::ArgRange vals, bool journal_rewrite) {
EngineShard* es = op_args.shard;
DbSlice::AddOrFindResult res;
DbSlice::ItAndUpdater res;
if (skip_notexist) {
auto tmp_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_LIST);

View file

@ -673,7 +673,7 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const AddOpts&
auto& db_slice = op_args.GetDbSlice();
DbSlice::AddOrFindResult add_res;
DbSlice::ItAndUpdater add_res;
if (opts.no_mkstream) {
auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM);
RETURN_ON_BAD_STATUS(res_it);