From 30cf9541c2fdd72c8a3002e9293fd963b451c8f8 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Tue, 17 May 2022 14:21:28 +0300 Subject: [PATCH] Add reference counting to DbTable. The will help us to implement flushing the tables in parallel with snapshotting --- src/core/dash.h | 14 ++-- src/server/CMakeLists.txt | 2 +- src/server/db_slice.cc | 156 ++++++++++++++--------------------- src/server/db_slice.h | 59 ++++--------- src/server/dragonfly_test.cc | 19 +++++ src/server/hset_family.cc | 6 +- src/server/rdb_test.cc | 5 ++ src/server/table.cc | 56 +++++++++++++ src/server/table.h | 45 +++++++++- 9 files changed, 213 insertions(+), 149 deletions(-) create mode 100644 src/server/table.cc diff --git a/src/core/dash.h b/src/core/dash.h index 480ce3c02..690981ce5 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -249,7 +249,9 @@ class DashTable : public detail::DashTableBase { void IncreaseDepth(unsigned new_depth); void Split(uint32_t seg_id); - template void IterateUnique(Cb&& cb); + // Segment directory contains multiple segment pointers, some of them pointing to + // the same object. IterateDistinct goes over all distinct segments in the table. + template void IterateDistinct(Cb&& cb); size_t NextSeg(size_t sid) const { size_t delta = (1u << (global_depth_ - segment_[sid]->local_depth())); @@ -453,7 +455,7 @@ DashTable<_Key, _Value, Policy>::~DashTable() { std::pmr::polymorphic_allocator pa(resource); using alloc_traits = std::allocator_traits; - IterateUnique([&](SegmentType* seg) { + IterateDistinct([&](SegmentType* seg) { alloc_traits::destroy(pa, seg); alloc_traits::deallocate(pa, seg, 1); return false; @@ -517,10 +519,10 @@ void DashTable<_Key, _Value, Policy>::Clear() { return false; }; - IterateUnique(cb); + IterateDistinct(cb); size_ = 0; - // Consider the following case: table with 8 segments overall, 4 unique. + // Consider the following case: table with 8 segments overall, 4 distinct. // S1, S1, S1, S1, S2, S3, S4, S4 /* This corresponds to the tree: R @@ -531,7 +533,7 @@ void DashTable<_Key, _Value, Policy>::Clear() { We want to collapse this tree into, say, 2 segment directory. That means we need to keep S1, S2 but delete S3, S4. That means, we need to move representative segments until we reached the desired size - and the erase all other unique segments. + and then erase all other distinct segments. **********/ if (global_depth_ > initial_depth_) { std::pmr::polymorphic_allocator pa(segment_.get_allocator()); @@ -582,7 +584,7 @@ bool DashTable<_Key, _Value, Policy>::ShiftRight(bucket_iterator it) { template template -void DashTable<_Key, _Value, Policy>::IterateUnique(Cb&& cb) { +void DashTable<_Key, _Value, Policy>::IterateDistinct(Cb&& cb) { size_t i = 0; while (i < segment_.size()) { auto* seg = segment_[i]; diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 34ee337d2..48aa90254 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -7,7 +7,7 @@ add_library(dragonfly_lib blocking_controller.cc channel_slice.cc command_regist engine_shard_set.cc generic_family.cc hset_family.cc io_mgr.cc list_family.cc main_service.cc rdb_load.cc rdb_save.cc replica.cc snapshot.cc script_mgr.cc server_family.cc - set_family.cc string_family.cc tiered_storage.cc + set_family.cc string_family.cc table.cc tiered_storage.cc transaction.cc zset_family.cc) cxx_link(dragonfly_lib dfly_core dfly_facade redis_lib strings_lib) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 0c3c61b3e..384e69636 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -35,7 +35,7 @@ static_assert(kPrimeSegmentSize == 32288); // 24576 static_assert(kExpireSegmentSize == 23528); -void UpdateStatsOnDeletion(PrimeIterator it, InternalDbStats* stats) { +void UpdateStatsOnDeletion(PrimeIterator it, DbTableStats* stats) { size_t value_heap_size = it->second.MallocUsed(); stats->inline_keys -= it->first.IsInline(); stats->obj_memory_usage -= (it->first.MallocUsed() + value_heap_size); @@ -108,7 +108,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT // choose "randomly" a stash bucket to evict an item. auto bucket_it = eb.stash_buckets[eb.key_hash % kNumStashBuckets]; auto last_slot_it = bucket_it; - last_slot_it += (PrimeTable::kBucketWidth -1); + last_slot_it += (PrimeTable::kBucketWidth - 1); if (!last_slot_it.is_done()) { UpdateStatsOnDeletion(last_slot_it, db_slice_->MutableStats(db_indx_)); } @@ -122,26 +122,11 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT #define ADD(x) (x) += o.x -InternalDbStats& InternalDbStats::operator+=(const InternalDbStats& o) { - constexpr size_t kDbSz = sizeof(InternalDbStats); - static_assert(kDbSz == 56); - - ADD(inline_keys); - ADD(obj_memory_usage); - ADD(strval_memory_usage); - ADD(listpack_blob_cnt); - ADD(listpack_bytes); - ADD(external_entries); - ADD(external_size); - - return *this; -} - DbStats& DbStats::operator+=(const DbStats& o) { constexpr size_t kDbSz = sizeof(DbStats); static_assert(kDbSz == 88); - InternalDbStats::operator+=(o); + DbTableStats::operator+=(o); ADD(key_count); ADD(expire_count); @@ -165,12 +150,6 @@ SliceEvents& SliceEvents::operator+=(const SliceEvents& o) { #undef ADD -DbSlice::DbWrapper::DbWrapper(std::pmr::memory_resource* mr) - : prime_table(4, detail::PrimeTablePolicy{}, mr), - expire_table(0, detail::ExpireTablePolicy{}, mr), - mcflag_table(0, detail::ExpireTablePolicy{}, mr) { -} - DbSlice::DbSlice(uint32_t index, bool caching_mode, EngineShard* owner) : shard_id_(index), caching_mode_(caching_mode), owner_(owner) { db_arr_.emplace_back(); @@ -200,10 +179,10 @@ auto DbSlice::GetStats() const -> Stats { const auto& db_wrap = *db_arr_[i]; DbStats& stats = s.db_stats[i]; stats = db_wrap.stats; - stats.key_count = db_wrap.prime_table.size(); - stats.bucket_count = db_wrap.prime_table.bucket_count(); - stats.expire_count = db_wrap.expire_table.size(); - stats.table_mem_usage = (db_wrap.prime_table.mem_usage() + db_wrap.expire_table.mem_usage()); + stats.key_count = db_wrap.prime.size(); + stats.bucket_count = db_wrap.prime.bucket_count(); + stats.expire_count = db_wrap.expire.size(); + stats.table_mem_usage = (db_wrap.prime.mem_usage() + db_wrap.expire.mem_usage()); } s.small_string_bytes = CompactObj::GetStats().small_string_bytes; @@ -216,7 +195,7 @@ void DbSlice::Reserve(DbIndex db_ind, size_t key_size) { auto& db = db_arr_[db_ind]; DCHECK(db); - db->prime_table.Reserve(key_size); + db->prime.Reserve(key_size); } auto DbSlice::Find(DbIndex db_index, string_view key, unsigned req_obj_type) const @@ -234,18 +213,20 @@ auto DbSlice::Find(DbIndex db_index, string_view key, unsigned req_obj_type) con } pair DbSlice::FindExt(DbIndex db_ind, string_view key) const { - DCHECK(IsDbValid(db_ind)); + pair res; + + if (!IsDbValid(db_ind)) + return res; auto& db = *db_arr_[db_ind]; - PrimeIterator it = db.prime_table.Find(key); - pair res(it, ExpireIterator{}); + res.first = db.prime.Find(key); - if (!IsValid(it)) { + if (!IsValid(res.first)) { return res; } - if (it->second.HasExpire()) { // check expiry state - res = ExpireIfNeeded(db_ind, it); + if (res.first->second.HasExpire()) { // check expiry state + res = ExpireIfNeeded(db_ind, res.first); } if (caching_mode_ && IsValid(res.first)) { @@ -256,10 +237,10 @@ pair DbSlice::FindExt(DbIndex db_ind, string_view } }; - db.prime_table.CVCUponBump(change_cb_.front().first, res.first, bump_cb); + db.prime.CVCUponBump(change_cb_.front().first, res.first, bump_cb); } - res.first = db.prime_table.BumpUp(res.first); + res.first = db.prime.BumpUp(res.first); ++events_.bumpups; } @@ -308,13 +289,13 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pairprime_table.Insert(std::move(co_key), PrimeValue{}, evp); + auto [it, inserted] = db->prime.Insert(std::move(co_key), PrimeValue{}, evp); if (inserted) { // new entry db->stats.inline_keys += it->first.IsInline(); db->stats.obj_memory_usage += it->first.MallocUsed(); - events_.garbage_collected += db->prime_table.garbage_collected(); - events_.stash_unloaded = db->prime_table.stash_unloaded(); + events_.garbage_collected += db->prime.garbage_collected(); + events_.stash_unloaded = db->prime.stash_unloaded(); events_.evicted_keys += evp.evicted(); it.SetVersion(NextVersion()); @@ -328,7 +309,7 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pairsecond.HasExpire()) { - auto expire_it = db->expire_table.Find(existing->first); + auto expire_it = db->expire.Find(existing->first); CHECK(IsValid(expire_it)); // TODO: to implement the incremental update of expiry values using multi-generation @@ -336,10 +317,10 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pairsecond.duration_ms() <= delta_ms) { - db->expire_table.Erase(expire_it); + db->expire.Erase(expire_it); if (existing->second.HasFlag()) { - db->mcflag_table.Erase(existing->first); + db->mcflag.Erase(existing->first); } // Keep the entry but reset the object. @@ -367,7 +348,7 @@ void DbSlice::ActivateDb(DbIndex db_ind) { void DbSlice::CreateDb(DbIndex index) { auto& db = db_arr_[index]; if (!db) { - db.reset(new DbWrapper{owner_->memory_resource()}); + db.reset(new DbTable{owner_->memory_resource()}); } } @@ -378,54 +359,53 @@ bool DbSlice::Del(DbIndex db_ind, PrimeIterator it) { auto& db = db_arr_[db_ind]; if (it->second.HasExpire()) { - CHECK_EQ(1u, db->expire_table.Erase(it->first)); + CHECK_EQ(1u, db->expire.Erase(it->first)); } if (it->second.HasFlag()) { - CHECK_EQ(1u, db->mcflag_table.Erase(it->first)); + CHECK_EQ(1u, db->mcflag.Erase(it->first)); } UpdateStatsOnDeletion(it, &db->stats); - db->prime_table.Erase(it); + db->prime.Erase(it); return true; } -size_t DbSlice::FlushDb(DbIndex db_ind) { - auto flush_single = [this](DbIndex id) { - auto& db = db_arr_[id]; - - CHECK(db); - - size_t removed = db->prime_table.size(); - db->prime_table.Clear(); - db->expire_table.Clear(); - db->mcflag_table.Clear(); - db->stats = InternalDbStats{}; - - return removed; - }; +void DbSlice::FlushDb(DbIndex db_ind) { + // TODO: to add preeemptiveness by yielding inside clear. if (db_ind != kDbAll) { - CHECK_LT(db_ind, db_arr_.size()); + auto& db = db_arr_[db_ind]; + auto db_ptr = std::move(db); + DCHECK(!db); + CreateDb(db_ind); - return flush_single(db_ind); + boost::fibers::fiber([db_ptr = std::move(db_ptr)]() mutable { db_ptr.reset(); }).detach(); + + return; } - size_t removed = 0; + auto all_dbs = std::move(db_arr_); + db_arr_.resize(all_dbs.size()); for (size_t i = 0; i < db_arr_.size(); ++i) { - if (db_arr_[i]) { - removed += flush_single(i); + if (all_dbs[i]) { + CreateDb(i); } } - return removed; + + boost::fibers::fiber([all_dbs = std::move(all_dbs)]() mutable { + for (auto& db : all_dbs) { + db.reset(); + } + }).detach(); } // Returns true if a state has changed, false otherwise. bool DbSlice::Expire(DbIndex db_ind, PrimeIterator it, uint64_t at) { auto& db = *db_arr_[db_ind]; if (at == 0 && it->second.HasExpire()) { - CHECK_EQ(1u, db.expire_table.Erase(it->first)); + CHECK_EQ(1u, db.expire.Erase(it->first)); it->second.SetExpire(false); return true; @@ -434,7 +414,7 @@ bool DbSlice::Expire(DbIndex db_ind, PrimeIterator it, uint64_t at) { if (!it->second.HasExpire() && at) { uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates. - CHECK(db.expire_table.Insert(it->first.AsRef(), ExpirePeriod(delta)).second); + CHECK(db.expire.Insert(it->first.AsRef(), ExpirePeriod(delta)).second); it->second.SetExpire(true); return true; @@ -446,9 +426,9 @@ bool DbSlice::Expire(DbIndex db_ind, PrimeIterator it, uint64_t at) { void DbSlice::SetMCFlag(DbIndex db_ind, PrimeKey key, uint32_t flag) { auto& db = *db_arr_[db_ind]; if (flag == 0) { - db.mcflag_table.Erase(key); + db.mcflag.Erase(key); } else { - auto [it, inserted] = db.mcflag_table.Insert(std::move(key), flag); + auto [it, inserted] = db.mcflag.Insert(std::move(key), flag); if (!inserted) it->second = flag; } @@ -456,7 +436,7 @@ void DbSlice::SetMCFlag(DbIndex db_ind, PrimeKey key, uint32_t flag) { uint32_t DbSlice::GetMCFlag(DbIndex db_ind, const PrimeKey& key) const { auto& db = *db_arr_[db_ind]; - auto it = db.mcflag_table.Find(key); + auto it = db.mcflag.Find(key); return it.is_done() ? 0 : it->second; } @@ -490,7 +470,7 @@ pair DbSlice::AddOrFind(DbIndex db_ind, string_view key, Pr if (expire_at_ms) { new_it->second.SetExpire(true); uint64_t delta = expire_at_ms - expire_base_[0]; - CHECK(db.expire_table.Insert(new_it->first.AsRef(), ExpirePeriod(delta)).second); + CHECK(db.expire.Insert(new_it->first.AsRef(), ExpirePeriod(delta)).second); } return res; @@ -500,7 +480,7 @@ size_t DbSlice::DbSize(DbIndex db_ind) const { DCHECK_LT(db_ind, db_array_size()); if (IsDbValid(db_ind)) { - return db_arr_[db_ind]->prime_table.size(); + return db_arr_[db_ind]->prime.size(); } return 0; } @@ -508,7 +488,7 @@ size_t DbSlice::DbSize(DbIndex db_ind) const { bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) { DCHECK(!lock_args.args.empty()); - auto& lt = db_arr_[lock_args.db_index]->lock_table; + auto& lt = db_arr_[lock_args.db_index]->trans_locks; bool lock_acquired = true; if (lock_args.args.size() == 1) { @@ -538,7 +518,7 @@ void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) { if (lock_args.args.size() == 1) { Release(mode, lock_args.db_index, lock_args.args.front(), 1); } else { - auto& lt = db_arr_[lock_args.db_index]->lock_table; + auto& lt = db_arr_[lock_args.db_index]->trans_locks; uniq_keys_.clear(); for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { auto s = lock_args.args[i]; @@ -554,22 +534,10 @@ void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) { } } -void DbSlice::Release(IntentLock::Mode mode, DbIndex db_index, string_view key, unsigned count) { - DVLOG(1) << "Release " << IntentLock::ModeName(mode) << " " << count << " for " << key; - - auto& lt = db_arr_[db_index]->lock_table; - auto it = lt.find(key); - CHECK(it != lt.end()) << key; - it->second.Release(mode, count); - if (it->second.IsFree()) { - lt.erase(it); - } -} - bool DbSlice::CheckLock(IntentLock::Mode mode, const KeyLockArgs& lock_args) const { DCHECK(!lock_args.args.empty()); - const auto& lt = db_arr_[lock_args.db_index]->lock_table; + const auto& lt = db_arr_[lock_args.db_index]->trans_locks; for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { auto s = lock_args.args[i]; auto it = lt.find(s); @@ -614,7 +582,7 @@ pair DbSlice::ExpireIfNeeded(DbIndex db_ind, DCHECK(it->second.HasExpire()); auto& db = db_arr_[db_ind]; - auto expire_it = db->expire_table.Find(it->first); + auto expire_it = db->expire.Find(it->first); CHECK(IsValid(expire_it)); @@ -624,9 +592,9 @@ pair DbSlice::ExpireIfNeeded(DbIndex db_ind, if (now_ms_ < expire_time) return make_pair(it, expire_it); - db->expire_table.Erase(expire_it); + db->expire.Erase(expire_it); UpdateStatsOnDeletion(it, &db->stats); - db->prime_table.Erase(it); + db->prime.Erase(it); ++events_.expired_keys; return make_pair(PrimeIterator{}, ExpireIterator{}); @@ -656,7 +624,7 @@ pair DbSlice::DeleteExpired(DbIndex db_ind) { auto cb = [&](ExpireIterator it) { candidates++; if (ExpireTime(it) <= Now()) { - auto prime_it = db.prime_table.Find(it->first); + auto prime_it = db.prime.Find(it->first); CHECK(!prime_it.is_done()); ExpireIfNeeded(db_ind, prime_it); ++deleted; @@ -664,7 +632,7 @@ pair DbSlice::DeleteExpired(DbIndex db_ind) { }; for (unsigned i = 0; i < 10; ++i) { - db.expire_cursor = db.expire_table.Traverse(db.expire_cursor, cb); + db.expire_cursor = db.expire.Traverse(db.expire_cursor, cb); if (deleted) break; } diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 5d5565ef9..b749cc6bb 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -4,10 +4,8 @@ #pragma once -#include #include -#include "core/intent_lock.h" #include "facade/op_status.h" #include "server/common.h" #include "server/table.h" @@ -20,23 +18,7 @@ namespace dfly { using facade::OpResult; -struct InternalDbStats { - // Number of inline keys. - uint64_t inline_keys = 0; - - // Object memory usage besides hash-table capacity. - // Applies for any non-inline objects. - size_t obj_memory_usage = 0; - size_t strval_memory_usage = 0; - size_t listpack_blob_cnt = 0; - size_t listpack_bytes = 0; - size_t external_entries = 0; - size_t external_size = 0; - - InternalDbStats& operator+=(const InternalDbStats& o); -}; - -struct DbStats : public InternalDbStats { +struct DbStats : public DbTableStats { // number of active keys. size_t key_count = 0; @@ -49,8 +31,8 @@ struct DbStats : public InternalDbStats { // Memory used by dictionaries. size_t table_mem_usage = 0; - using InternalDbStats::operator+=; - using InternalDbStats::operator=; + using DbTableStats::operator+=; + using DbTableStats::operator=; DbStats& operator+=(const DbStats& o); }; @@ -84,8 +66,10 @@ class DbSlice { // Otherwise (string_view is set) then it's a new key that is going to be added to the table. std::variant change; - ChangeReq(PrimeTable::bucket_iterator it) : change(it) {} - ChangeReq(std::string_view key) : change(key) {} + ChangeReq(PrimeTable::bucket_iterator it) : change(it) { + } + ChangeReq(std::string_view key) : change(key) { + } const PrimeTable::bucket_iterator* update() const { return std::get_if(&change); @@ -171,7 +155,7 @@ class DbSlice { * databases. * */ - size_t FlushDb(DbIndex db_ind); + void FlushDb(DbIndex db_ind); EngineShard* shard_owner() { return owner_; @@ -184,7 +168,10 @@ class DbSlice { bool Acquire(IntentLock::Mode m, const KeyLockArgs& lock_args); void Release(IntentLock::Mode m, const KeyLockArgs& lock_args); - void Release(IntentLock::Mode m, DbIndex db_index, std::string_view key, unsigned count); + + void Release(IntentLock::Mode m, DbIndex db_index, std::string_view key, unsigned count) { + db_arr_[db_index]->Release(m, key, count); + } // Returns true if all keys can be locked under m. Does not lock them though. bool CheckLock(IntentLock::Mode m, const KeyLockArgs& lock_args) const; @@ -198,8 +185,7 @@ class DbSlice { } std::pair GetTables(DbIndex id) { - return std::pair(&db_arr_[id]->prime_table, - &db_arr_[id]->expire_table); + return std::pair(&db_arr_[id]->prime, &db_arr_[id]->expire); } // Returns existing keys count in the db. @@ -209,7 +195,7 @@ class DbSlice { void PreUpdate(DbIndex db_ind, PrimeIterator it); void PostUpdate(DbIndex db_ind, PrimeIterator it); - InternalDbStats* MutableStats(DbIndex db_ind) { + DbTableStats* MutableStats(DbIndex db_ind) { return &db_arr_[db_ind]->stats; } @@ -258,22 +244,7 @@ class DbSlice { mutable SliceEvents events_; // we may change this even for const operations. - using LockTable = absl::flat_hash_map; - - struct DbWrapper { - PrimeTable prime_table; - ExpireTable expire_table; - DashTable mcflag_table; - - LockTable lock_table; - - mutable InternalDbStats stats; - ExpireTable::cursor expire_cursor; - - explicit DbWrapper(std::pmr::memory_resource* mr); - }; - - std::vector> db_arr_; + std::vector> db_arr_; // Used in temporary computations in Acquire/Release. absl::flat_hash_set uniq_keys_; diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 9e86fb961..befc95741 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -356,6 +356,25 @@ TEST_F(DflyEngineTest, LimitMemory) { } } +TEST_F(DflyEngineTest, FlushAll) { + auto fb0 = pp_->at(0)->LaunchFiber([&] { + Run({"flushall"}); + }); + + auto fb1 = pp_->at(1)->LaunchFiber([&] { + Run({"select", "2"}); + + for (size_t i = 1; i < 100; ++i) { + RespExpr resp = Run({"set", "foo", "bar"}); + ASSERT_EQ(resp, "OK"); + this_fiber::yield(); + } + }); + + fb0.join(); + fb1.join(); +} + // TODO: to test transactions with a single shard since then all transactions become local. // To consider having a parameter in dragonfly engine controlling number of shards // unconditionally from number of cpus. TO TEST BLPOP under multi for single/multi argument case. diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 7661dbc2a..3cef67c2d 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -440,7 +440,7 @@ OpResult HSetFamily::OpSet(const OpArgs& op_args, string_view key, Cmd auto& db_slice = op_args.shard->db_slice(); const auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); - InternalDbStats* stats = db_slice.MutableStats(op_args.db_ind); + DbTableStats* stats = db_slice.MutableStats(op_args.db_ind); robj* hset = nullptr; uint8_t* lp = nullptr; @@ -525,7 +525,7 @@ OpResult HSetFamily::OpDel(const OpArgs& op_args, string_view key, Cmd robj* hset = co.AsRObj(); unsigned deleted = 0; bool key_remove = false; - InternalDbStats* stats = db_slice.MutableStats(op_args.db_ind); + DbTableStats* stats = db_slice.MutableStats(op_args.db_ind); if (hset->encoding == OBJ_ENCODING_LISTPACK) { stats->listpack_bytes -= lpBytes((uint8_t*)hset->ptr); @@ -755,7 +755,7 @@ OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_vie auto& db_slice = op_args.shard->db_slice(); const auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); - InternalDbStats* stats = db_slice.MutableStats(op_args.db_ind); + DbTableStats* stats = db_slice.MutableStats(op_args.db_ind); robj* hset = nullptr; size_t lpb = 0; diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index 3b5ba5c67..719fc8703 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -145,4 +145,9 @@ TEST_F(RdbTest, ReloadTtl) { EXPECT_LT(990, CheckedInt({"ttl", "key"})); } +TEST_F(RdbTest, SaveFlush) { + Run({"debug", "populate", "1000000"}); + +} + } // namespace dfly diff --git a/src/server/table.cc b/src/server/table.cc new file mode 100644 index 000000000..a92f8c132 --- /dev/null +++ b/src/server/table.cc @@ -0,0 +1,56 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/table.h" + +#include "base/logging.h" + +namespace dfly { + +#define ADD(x) (x) += o.x + +DbTableStats& DbTableStats::operator+=(const DbTableStats& o) { + constexpr size_t kDbSz = sizeof(DbTableStats); + static_assert(kDbSz == 56); + + ADD(inline_keys); + ADD(obj_memory_usage); + ADD(strval_memory_usage); + ADD(listpack_blob_cnt); + ADD(listpack_bytes); + ADD(external_entries); + ADD(external_size); + + return *this; +} + +DbTable::DbTable(std::pmr::memory_resource* mr) + : prime(4, detail::PrimeTablePolicy{}, mr), + expire(0, detail::ExpireTablePolicy{}, mr), + mcflag(0, detail::ExpireTablePolicy{}, mr) { +} + +DbTable::~DbTable() { +} + +void DbTable::Clear() { + prime.size(); + prime.Clear(); + expire.Clear(); + mcflag.Clear(); + stats = DbTableStats{}; +} + +void DbTable::Release(IntentLock::Mode mode, std::string_view key, unsigned count) { + DVLOG(1) << "Release " << IntentLock::ModeName(mode) << " " << count << " for " << key; + + auto it = trans_locks.find(key); + CHECK(it != trans_locks.end()) << key; + it->second.Release(mode, count); + if (it->second.IsFree()) { + trans_locks.erase(it); + } +} + +} // namespace dfly diff --git a/src/server/table.h b/src/server/table.h index c786ee08c..64c7e472d 100644 --- a/src/server/table.h +++ b/src/server/table.h @@ -4,8 +4,14 @@ #pragma once -#include "server/detail/table.h" +#include + +#include +#include + #include "core/expire_period.h" +#include "core/intent_lock.h" +#include "server/detail/table.h" namespace dfly { @@ -28,4 +34,41 @@ inline bool IsValid(ExpireIterator it) { return !it.is_done(); } +struct DbTableStats { + // Number of inline keys. + uint64_t inline_keys = 0; + + // Object memory usage besides hash-table capacity. + // Applies for any non-inline objects. + size_t obj_memory_usage = 0; + size_t strval_memory_usage = 0; + size_t listpack_blob_cnt = 0; + size_t listpack_bytes = 0; + size_t external_entries = 0; + size_t external_size = 0; + + DbTableStats& operator+=(const DbTableStats& o); +}; + +using LockTable = absl::flat_hash_map; + +// A single Db table that represents a table that can be chosen with "SELECT" command. +struct DbTable : boost::intrusive_ref_counter { + PrimeTable prime; + ExpireTable expire; + DashTable mcflag; + + // Contains transaction locks + LockTable trans_locks; + + mutable DbTableStats stats; + ExpireTable::cursor expire_cursor; + + explicit DbTable(std::pmr::memory_resource* mr); + ~DbTable(); + + void Clear(); + void Release(IntentLock::Mode mode, std::string_view key, unsigned count); +}; + } // namespace dfly