From 7a6852802277f6cb2fa74b28bb6ff74668b67c9f Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 2 Jan 2025 11:35:55 +0200 Subject: [PATCH] chore: minor refactorings around dense_set deletions (#4390) chore: refactorings around deletions Done in preparation for introducing asynchronous deletions for sets/zsets/hmaps. 1. Restrict the interface around DbSlice::Del. It now requires the iterator to be valid, and callers must check validity explicitly before the call. Most callers already provide a valid iterator. 2. Some minor refactoring in compact_object_test. 3. Expose DenseSet::ClearStep to allow iterative deletions of elements. Signed-off-by: Roman Gershman --- src/core/compact_object.cc | 4 +- src/core/compact_object_test.cc | 39 ++++++++++++------- src/core/dense_set.cc | 5 ++- src/core/dense_set.h | 11 +++--- src/server/db_slice.cc | 10 ++--- src/server/db_slice.h | 3 +- src/server/engine_shard.cc | 7 ++-- src/server/generic_family.cc | 69 +++++++++++++++++++-------------- src/server/generic_family.h | 1 + src/server/hset_family.cc | 19 +++++---- src/server/json_family.cc | 7 +++- src/server/list_family.cc | 10 ++--- src/server/set_family.cc | 12 +++--- src/server/zset_family.cc | 12 +++--- 14 files changed, 116 insertions(+), 93 deletions(-) diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index d30f6f44c..261a7f4bd 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -791,8 +791,8 @@ uint64_t CompactObj::HashCode() const { } if (encoded) { - GetString(&tl.tmp_str); - return XXH3_64bits_withSeed(tl.tmp_str.data(), tl.tmp_str.size(), kHashSeed); + string_view sv = GetSlice(&tl.tmp_str); + return XXH3_64bits_withSeed(sv.data(), sv.size(), kHashSeed); } switch (taglen_) { diff --git a/src/core/compact_object_test.cc b/src/core/compact_object_test.cc index 207e67f1c..eedce4fee 100644 --- a/src/core/compact_object_test.cc +++ b/src/core/compact_object_test.cc @@ -76,29 +76,38 @@ void DeallocateAtRandom(size_t steps, 
std::vector* ptrs) { } } +static void InitThreadStructs() { + auto* tlh = mi_heap_get_backing(); + init_zmalloc_threadlocal(tlh); + SmallString::InitThreadLocal(tlh); + thread_local MiMemoryResource mi_resource(tlh); + CompactObj::InitThreadLocal(&mi_resource); +}; + +static void CheckEverythingDeallocated() { + mi_heap_collect(mi_heap_get_backing(), true); + + auto cb_visit = [](const mi_heap_t* heap, const mi_heap_area_t* area, void* block, + size_t block_size, void* arg) { + LOG(ERROR) << "Unfreed allocations: block_size " << block_size + << ", allocated: " << area->used * block_size; + return true; + }; + + mi_heap_visit_blocks(mi_heap_get_backing(), false /* do not visit all blocks*/, cb_visit, + nullptr); +} + class CompactObjectTest : public ::testing::Test { protected: static void SetUpTestSuite() { InitRedisTables(); // to initialize server struct. - auto* tlh = mi_heap_get_backing(); - init_zmalloc_threadlocal(tlh); - SmallString::InitThreadLocal(tlh); - CompactObj::InitThreadLocal(PMR_NS::get_default_resource()); + InitThreadStructs(); } static void TearDownTestSuite() { - mi_heap_collect(mi_heap_get_backing(), true); - - auto cb_visit = [](const mi_heap_t* heap, const mi_heap_area_t* area, void* block, - size_t block_size, void* arg) { - LOG(ERROR) << "Unfreed allocations: block_size " << block_size - << ", allocated: " << area->used * block_size; - return true; - }; - - mi_heap_visit_blocks(mi_heap_get_backing(), false /* do not visit all blocks*/, cb_visit, - nullptr); + CheckEverythingDeallocated(); } CompactObj cobj_; diff --git a/src/core/dense_set.cc b/src/core/dense_set.cc index 0b8b7479d..9750ded2a 100644 --- a/src/core/dense_set.cc +++ b/src/core/dense_set.cc @@ -168,14 +168,14 @@ auto DenseSet::PopPtrFront(DenseSet::ChainVectorIterator it) -> DensePtr { return front; } -uint32_t DenseSet::ClearInternal(uint32_t start, uint32_t count) { +uint32_t DenseSet::ClearStep(uint32_t start, uint32_t count) { constexpr unsigned kArrLen = 32; ClearItem 
arr[kArrLen]; unsigned len = 0; size_t end = min(entries_.size(), start + count); for (size_t i = start; i < end; ++i) { - DensePtr ptr = entries_[i]; + DensePtr& ptr = entries_[i]; if (ptr.IsEmpty()) continue; @@ -190,6 +190,7 @@ uint32_t DenseSet::ClearInternal(uint32_t start, uint32_t count) { dest.ptr = ptr; dest.obj = nullptr; } + ptr.Reset(); if (len == kArrLen) { ClearBatch(kArrLen, arr); len = 0; diff --git a/src/core/dense_set.h b/src/core/dense_set.h index 1cc5ac135..0ac73a47b 100644 --- a/src/core/dense_set.h +++ b/src/core/dense_set.h @@ -215,9 +215,13 @@ class DenseSet { virtual ~DenseSet(); void Clear() { - ClearInternal(0, entries_.size()); + ClearStep(0, entries_.size()); } + // Returns the next bucket index that should be cleared. + // Returns BucketCount when all objects are erased. + uint32_t ClearStep(uint32_t start, uint32_t count); + // Returns the number of elements in the map. Note that it might be that some of these elements // have expired and can't be accessed. size_t UpperBoundSize() const { @@ -303,11 +307,6 @@ class DenseSet { void* PopInternal(); - // Note this does not free any dynamic allocations done by derived classes, that a DensePtr - // in the set may point to. This function only frees the allocated DenseLinkKeys created by - // DenseSet. 
All data allocated by a derived class should be freed before calling this - uint32_t ClearInternal(uint32_t start, uint32_t count); - void IncreaseMallocUsed(size_t delta) { obj_malloc_used_ += delta; } diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 88f18696b..90c10a54f 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -669,10 +669,8 @@ void DbSlice::ActivateDb(DbIndex db_ind) { CreateDb(db_ind); } -bool DbSlice::Del(Context cntx, Iterator it) { - if (!IsValid(it)) { - return false; - } +void DbSlice::Del(Context cntx, Iterator it) { + CHECK(IsValid(it)); auto& db = db_arr_[cntx.db_index]; auto obj_type = it->second.ObjType(); @@ -683,8 +681,6 @@ bool DbSlice::Del(Context cntx, Iterator it) { doc_del_cb_(key, cntx, it->second); } PerformDeletion(it, db.get()); - - return true; } void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) { @@ -917,7 +913,7 @@ OpResult DbSlice::UpdateExpire(const Context& cntx, Iterator prime_it, } if (rel_msec <= 0) { // implicit - don't persist - CHECK(Del(cntx, prime_it)); + Del(cntx, prime_it); return -1; } else if (IsValid(expire_it) && !params.persist) { auto current = ExpireTime(expire_it); diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 7b56ee8ca..263fa26b3 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -346,7 +346,8 @@ class DbSlice { // Delete a key referred by its iterator. void PerformDeletion(Iterator del_it, DbTable* table); - bool Del(Context cntx, Iterator it); + // Deletes the iterator. The iterator must be valid. + void Del(Context cntx, Iterator it); constexpr static DbIndex kDbAll = 0xFFFF; diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 5396b8f6f..f637f1a11 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -426,11 +426,10 @@ bool EngineShard::DoDefrag() { // priority. 
// otherwise lower the task priority so that it would not use the CPU when not required uint32_t EngineShard::DefragTask() { - if (!namespaces) { - return util::ProactorBase::kOnIdleMaxLevel; - } - constexpr uint32_t kRunAtLowPriority = 0u; + if (!namespaces) { + return kRunAtLowPriority; + } if (defrag_state_.CheckRequired()) { VLOG(2) << shard_id_ << ": need to run defrag memory cursor state: " << defrag_state_.cursor; diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 94a0319a0..1e0cb7508 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -440,7 +440,7 @@ OpStatus Renamer::DelSrc(Transaction* t, EngineShard* shard) { DVLOG(1) << "Rename: removing the key '" << src_key_; res.post_updater.Run(); - CHECK(db_slice.Del(t->GetDbContext(), it)); + db_slice.Del(t->GetDbContext(), it); if (shard->journal()) { RecordJournal(t->GetOpArgs(shard), "DEL"sv, ArgSlice{src_key_}, 2); } @@ -462,7 +462,7 @@ OpStatus Renamer::DeserializeDest(Transaction* t, EngineShard* shard) { if (dest_found_) { DVLOG(1) << "Rename: deleting the destiny key '" << dest_key_; dest_res.post_updater.Run(); - CHECK(db_slice.Del(op_args.db_cntx, dest_res.it)); + db_slice.Del(op_args.db_cntx, dest_res.it); } if (restore_args.Expired()) { @@ -554,7 +554,7 @@ OpResult OpRestore(const OpArgs& op_args, std::string_view key, std::strin VLOG(1) << "restore command is running with replace, found old key '" << key << "' and removing it"; res.post_updater.Run(); - CHECK(db_slice.Del(op_args.db_cntx, res.it)); + db_slice.Del(op_args.db_cntx, res.it); } else { // we are not allowed to replace it. return OpStatus::KEY_EXISTS; @@ -812,7 +812,7 @@ OpStatus OpMove(const OpArgs& op_args, string_view key, DbIndex target_db) { // Restore expire flag after std::move. 
from_res.it->second.SetExpire(IsValid(from_res.exp_it)); - CHECK(db_slice.Del(op_args.db_cntx, from_res.it)); + db_slice.Del(op_args.db_cntx, from_res.it); auto op_result = db_slice.AddNew(target_cntx, key, std::move(from_obj), exp_ts); RETURN_ON_BAD_STATUS(op_result); auto& add_res = *op_result; @@ -868,13 +868,13 @@ OpResult OpRen(const OpArgs& op_args, string_view from_key, string_view to to_res.post_updater.Run(); from_res.post_updater.Run(); - CHECK(db_slice.Del(op_args.db_cntx, from_res.it)); + db_slice.Del(op_args.db_cntx, from_res.it); } else { // Here we first delete from_it because AddNew below could invalidate from_it. // On the other hand, AddNew does not rely on the iterators - this is why we keep // the value in `from_obj`. from_res.post_updater.Run(); - CHECK(db_slice.Del(op_args.db_cntx, from_res.it)); + db_slice.Del(op_args.db_cntx, from_res.it); auto op_result = db_slice.AddNew(op_args.db_cntx, to_key, std::move(from_obj), exp_ts); RETURN_ON_BAD_STATUS(op_result); to_res = std::move(*op_result); @@ -995,35 +995,14 @@ std::optional ParseExpireOptionsOrReply(const CmdArgList args, SinkRepl return flags; } -} // namespace - -OpResult GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys) { - DVLOG(1) << "Del: " << keys.Front(); - auto& db_slice = op_args.GetDbSlice(); - - uint32_t res = 0; - - for (string_view key : keys) { - auto fres = db_slice.FindMutable(op_args.db_cntx, key); - if (!IsValid(fres.it)) - continue; - fres.post_updater.Run(); - res += int(db_slice.Del(op_args.db_cntx, fres.it)); - } - - return res; -} - -void GenericFamily::Del(CmdArgList args, const CommandContext& cmd_cntx) { - VLOG(1) << "Del " << ArgS(args, 0); - +void DeleteGeneric(CmdArgList args, const CommandContext& cmd_cntx) { atomic_uint32_t result{0}; auto* builder = cmd_cntx.rb; bool is_mc = (builder->GetProtocol() == Protocol::MEMCACHE); auto cb = [&result](const Transaction* t, EngineShard* shard) { ShardArgs args = t->GetShardArgs(shard->shard_id()); - 
auto res = OpDel(t->GetOpArgs(shard), args); + auto res = GenericFamily::OpDel(t->GetOpArgs(shard), args); result.fetch_add(res.value_or(0), memory_order_relaxed); return OpStatus::OK; @@ -1049,6 +1028,36 @@ void GenericFamily::Del(CmdArgList args, const CommandContext& cmd_cntx) { } } +} // namespace + +OpResult GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys) { + DVLOG(1) << "Del: " << keys.Front(); + auto& db_slice = op_args.GetDbSlice(); + + uint32_t res = 0; + + for (string_view key : keys) { + auto it = db_slice.FindMutable(op_args.db_cntx, key).it; // post_updater will run immediately + if (!IsValid(it)) + continue; + + db_slice.Del(op_args.db_cntx, it); + ++res; + } + + return res; +} + +void GenericFamily::Del(CmdArgList args, const CommandContext& cmd_cntx) { + VLOG(1) << "Del " << ArgS(args, 0); + + DeleteGeneric(args, cmd_cntx); +} + +void GenericFamily::Unlink(CmdArgList args, const CommandContext& cmd_cntx) { + DeleteGeneric(args, cmd_cntx); +} + void GenericFamily::Ping(CmdArgList args, const CommandContext& cmd_cntx) { auto* rb = static_cast(cmd_cntx.rb); if (args.size() > 1) { @@ -1886,7 +1895,7 @@ void GenericFamily::Register(CommandRegistry* registry) { << CI{"TIME", CO::LOADING | CO::FAST, 1, 0, 0, acl::kTime}.HFUNC(Time) << CI{"TYPE", CO::READONLY | CO::FAST | CO::LOADING, 2, 1, 1, acl::kType}.HFUNC(Type) << CI{"DUMP", CO::READONLY, 2, 1, 1, acl::kDump}.HFUNC(Dump) - << CI{"UNLINK", CO::WRITE, -2, 1, -1, acl::kUnlink}.HFUNC(Del) + << CI{"UNLINK", CO::WRITE, -2, 1, -1, acl::kUnlink}.HFUNC(Unlink) << CI{"STICK", CO::WRITE, -2, 1, -1, acl::kStick}.HFUNC(Stick) << CI{"SORT", CO::READONLY, -2, 1, 1, acl::kSort}.HFUNC(Sort) << CI{"MOVE", CO::WRITE | CO::GLOBAL_TRANS | CO::NO_AUTOJOURNAL, 3, 1, 1, acl::kMove}.HFUNC( diff --git a/src/server/generic_family.h b/src/server/generic_family.h index 40af99bed..0fb231ec8 100644 --- a/src/server/generic_family.h +++ b/src/server/generic_family.h @@ -35,6 +35,7 @@ class GenericFamily { using 
SinkReplyBuilder = facade::SinkReplyBuilder; static void Del(CmdArgList args, const CommandContext& cmd_cntx); + static void Unlink(CmdArgList args, const CommandContext& cmd_cntx); static void Ping(CmdArgList args, const CommandContext& cmd_cntx); static void Exists(CmdArgList args, const CommandContext& cmd_cntx); static void Expire(CmdArgList args, const CommandContext& cmd_cntx); diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 62f001603..8361a514a 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -585,10 +585,10 @@ OpResult> OpGetAll(const OpArgs& op_args, string_view key, uint8_ // and the enconding is guaranteed to be a DenseSet since we only support expiring // value with that enconding. if (res.empty()) { - auto mutable_res = db_slice.FindMutable(op_args.db_cntx, key, OBJ_HASH); - // Run postupdater, it means that we deleted the keys - mutable_res->post_updater.Run(); - db_slice.Del(op_args.db_cntx, mutable_res->it); + // post_updater will run immediately + auto it = db_slice.FindMutable(op_args.db_cntx, key).it; + + db_slice.Del(op_args.db_cntx, it); } return res; @@ -1169,10 +1169,10 @@ void HSetFamily::HRandField(CmdArgList args, const CommandContext& cmd_cntx) { } } - if (string_map->Empty()) { - auto it_mutable = db_slice.FindMutable(db_context, key, OBJ_HASH); - it_mutable->post_updater.Run(); - db_slice.Del(db_context, it_mutable->it); + if (string_map->Empty()) { // Can happen if we use a TTL on hash members. 
+ // post_updater will run immediately + auto it = db_slice.FindMutable(db_context, key).it; + db_slice.Del(db_context, it); return facade::OpStatus::KEY_NOTFOUND; } } else if (pv.Encoding() == kEncodingListPack) { @@ -1207,8 +1207,7 @@ void HSetFamily::HRandField(CmdArgList args, const CommandContext& cmd_cntx) { } } } else { - LOG(ERROR) << "Invalid encoding " << pv.Encoding(); - return OpStatus::INVALID_VALUE; + LOG(FATAL) << "Invalid encoding " << pv.Encoding(); } return str_vec; }; diff --git a/src/server/json_family.cc b/src/server/json_family.cc index 45e91396c..4b070c236 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -913,8 +913,13 @@ OpResult OpDel(const OpArgs& op_args, string_view key, string_view path, if (json_path.RefersToRootElement()) { auto& db_slice = op_args.GetDbSlice(); auto it = db_slice.FindMutable(op_args.db_cntx, key).it; // post_updater will run immediately - return static_cast(db_slice.Del(op_args.db_cntx, it)); + if (IsValid(it)) { + db_slice.Del(op_args.db_cntx, it); + return 1; + } + return 0; } + JsonMemTracker tracker; // FindMutable because we need to run the AutoUpdater at the end which will account // the deltas calculated from the MemoryTracker diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 984e9f31d..ab16b0e63 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -180,7 +180,7 @@ std::string OpBPop(Transaction* t, EngineShard* shard, std::string_view key, Lis OpArgs op_args = t->GetOpArgs(shard); if (len == 0) { DVLOG(1) << "deleting key " << key << " " << t->DebugId(); - CHECK(op_args.GetDbSlice().Del(op_args.db_cntx, it)); + op_args.GetDbSlice().Del(op_args.db_cntx, it); } if (op_args.shard->journal()) { @@ -276,7 +276,7 @@ OpResult OpMoveSingleShard(const OpArgs& op_args, string_view src, strin dest_res.post_updater.Run(); if (prev_len == 1) { - CHECK(db_slice.Del(op_args.db_cntx, src_it)); + db_slice.Del(op_args.db_cntx, src_it); } return val; @@ -452,7 
+452,7 @@ OpResult OpPop(const OpArgs& op_args, string_view key, ListDir dir, u it_res->post_updater.Run(); if (count == prev_len) { - CHECK(db_slice.Del(op_args.db_cntx, it)); + db_slice.Del(op_args.db_cntx, it); } if (op_args.shard->journal() && journal_rewrite) { @@ -765,7 +765,7 @@ OpResult OpRem(const OpArgs& op_args, string_view key, string_view ele it_res->post_updater.Run(); if (len == 0) { - CHECK(db_slice.Del(op_args.db_cntx, it)); + db_slice.Del(op_args.db_cntx, it); } return removed; @@ -840,7 +840,7 @@ OpStatus OpTrim(const OpArgs& op_args, string_view key, long start, long end) { it_res->post_updater.Run(); if (it->second.Size() == 0) { - CHECK(db_slice.Del(op_args.db_cntx, it)); + db_slice.Del(op_args.db_cntx, it); } return OpStatus::OK; } diff --git a/src/server/set_family.cc b/src/server/set_family.cc index a2ab2cbcf..f2fd67d0a 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -443,9 +443,11 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, const NewE // key if it exists. 
if (overwrite && (vals_it.begin() == vals_it.end())) { auto it = db_slice.FindMutable(op_args.db_cntx, key).it; // post_updater will run immediately - db_slice.Del(op_args.db_cntx, it); - if (journal_update && op_args.shard->journal()) { - RecordJournal(op_args, "DEL"sv, ArgSlice{key}); + if (IsValid(it)) { + db_slice.Del(op_args.db_cntx, it); + if (journal_update && op_args.shard->journal()) { + RecordJournal(op_args, "DEL"sv, ArgSlice{key}); + } } return 0; } @@ -561,7 +563,7 @@ OpResult OpRem(const OpArgs& op_args, string_view key, facade::ArgRang find_res->post_updater.Run(); if (isempty) { - CHECK(db_slice.Del(op_args.db_cntx, find_res->it)); + db_slice.Del(op_args.db_cntx, find_res->it); } if (journal_rewrite && op_args.shard->journal()) { vector mapped(vals.Size() + 1); @@ -886,7 +888,7 @@ OpResult OpPop(const OpArgs& op_args, string_view key, unsigned count // Delete the set as it is now empty find_res->post_updater.Run(); - CHECK(db_slice.Del(op_args.db_cntx, find_res->it)); + db_slice.Del(op_args.db_cntx, find_res->it); // Replicate as DEL. 
if (op_args.shard->journal()) { diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index fc440ad92..60eda900f 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -964,7 +964,9 @@ OpResult OpAdd(const OpArgs& op_args, const ZParams& zparams, string_ if (zparams.override && members.empty()) { auto it = db_slice.FindMutable(op_args.db_cntx, key).it; // post_updater will run immediately - db_slice.Del(op_args.db_cntx, it); + if (IsValid(it)) { + db_slice.Del(op_args.db_cntx, it); + } return OpStatus::OK; } @@ -1219,7 +1221,7 @@ ScoredArray OpBZPop(Transaction* t, EngineShard* shard, std::string_view key, bo auto zlen = pv.Size(); if (zlen == 0) { DVLOG(1) << "deleting key " << key << " " << t->DebugId(); - CHECK(db_slice.Del(t->GetDbContext(), it_res->it)); + db_slice.Del(t->GetDbContext(), it_res->it); } OpArgs op_args = t->GetOpArgs(shard); @@ -1330,7 +1332,7 @@ auto OpPopCount(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args, auto zlen = pv.Size(); if (zlen == 0) { - CHECK(op_args.GetDbSlice().Del(op_args.db_cntx, res_it->it)); + op_args.GetDbSlice().Del(op_args.db_cntx, res_it->it); } return iv.PopResult(); @@ -1384,7 +1386,7 @@ OpResult OpRemRange(const OpArgs& op_args, string_view key, auto zlen = pv.Size(); if (zlen == 0) { - CHECK(op_args.GetDbSlice().Del(op_args.db_cntx, res_it->it)); + op_args.GetDbSlice().Del(op_args.db_cntx, res_it->it); } return iv.removed(); @@ -1563,7 +1565,7 @@ OpResult OpRem(const OpArgs& op_args, string_view key, facade::ArgRang res_it->post_updater.Run(); if (zlen == 0) { - CHECK(op_args.GetDbSlice().Del(op_args.db_cntx, res_it->it)); + op_args.GetDbSlice().Del(op_args.db_cntx, res_it->it); } return deleted;