diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index d20c9f0ff..8e17ed55b 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -46,11 +46,11 @@ static_assert(kPrimeSegmentSize == 32288); // 24576 static_assert(kExpireSegmentSize == 23528); -void AccountObjectMemory(const CompactObj& obj, DbTableStats* stats, int64_t multiplier) { - const int64_t value_heap_size = obj.MallocUsed() * multiplier; - - stats->obj_memory_usage += value_heap_size; - stats->AddTypeMemoryUsage(obj.ObjType(), value_heap_size); +void AccountObjectMemory(unsigned type, int64_t size, DbTableStats* stats) { + DCHECK_GE(static_cast<int64_t>(stats->obj_memory_usage) + size, 0) + << "Can't decrease " << size << " from " << stats->obj_memory_usage; + stats->obj_memory_usage += size; + stats->AddTypeMemoryUsage(type, size); } void PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, EngineShard* shard, @@ -82,8 +82,8 @@ void PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, EngineShard* s } stats.inline_keys -= del_it->first.IsInline(); - AccountObjectMemory(del_it->first, &stats, -1); // Key - AccountObjectMemory(del_it->second, &stats, -1); // Value + AccountObjectMemory(del_it->first.ObjType(), -del_it->first.MallocUsed(), &stats); // Key + AccountObjectMemory(del_it->second.ObjType(), -del_it->second.MallocUsed(), &stats); // Value if (pv.ObjType() == OBJ_HASH && pv.Encoding() == kEncodingListPack) { --stats.listpack_blob_cnt; @@ -377,7 +377,7 @@ void DbSlice::AutoUpdater::Run() { DCHECK(fields_.action == DestructorAction::kRun); CHECK_NE(fields_.db_slice, nullptr); - fields_.db_slice->PostUpdate(fields_.db_ind, fields_.it, fields_.key, fields_.key_existed); + fields_.db_slice->PostUpdate(fields_.db_ind, fields_.it, fields_.key, fields_.orig_heap_size); Cancel(); // Reset to not run again } @@ -387,8 +387,10 @@ void DbSlice::AutoUpdater::Cancel() { DbSlice::AutoUpdater::AutoUpdater(const Fields& fields) : fields_(fields) { DCHECK(fields_.action == 
DestructorAction::kRun); + DCHECK(IsValid(fields.it)); fields_.db_size = fields_.db_slice->DbSize(fields_.db_ind); fields_.deletion_count = fields_.db_slice->deletion_count_; + fields_.orig_heap_size = fields.it->second.MallocUsed(); } DbSlice::AddOrFindResult& DbSlice::AddOrFindResult::operator=(ItAndUpdater&& o) { @@ -405,7 +407,11 @@ DbSlice::ItAndUpdater DbSlice::FindMutable(const Context& cntx, string_view key) if (IsValid(it)) { PreUpdate(cntx.db_index, it); return {it, exp_it, - AutoUpdater({AutoUpdater::DestructorAction::kRun, this, cntx.db_index, it, key, true})}; + AutoUpdater({.action = AutoUpdater::DestructorAction::kRun, + .db_slice = this, + .db_ind = cntx.db_index, + .it = it, + .key = key})}; } else { return {it, exp_it, {}}; } @@ -425,7 +431,11 @@ OpResult DbSlice::FindMutable(const Context& cntx, string PreUpdate(cntx.db_index, it); return {{it, exp_it, - AutoUpdater({AutoUpdater::DestructorAction::kRun, this, cntx.db_index, it, key, true})}}; + AutoUpdater({.action = AutoUpdater::DestructorAction::kRun, + .db_slice = this, + .db_ind = cntx.db_index, + .it = it, + .key = key})}}; } DbSlice::ItAndExpConst DbSlice::FindReadOnly(const Context& cntx, std::string_view key) { @@ -534,8 +544,11 @@ DbSlice::AddOrFindResult DbSlice::AddOrFind(const Context& cntx, string_view key return {.it = res.it, .exp_it = res.exp_it, .is_new = false, - .post_updater = AutoUpdater( - {AutoUpdater::DestructorAction::kRun, this, cntx.db_index, res.it, key, true})}; + .post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun, + .db_slice = this, + .db_ind = cntx.db_index, + .it = res.it, + .key = key})}; } // It's a new entry. 
@@ -597,7 +610,7 @@ DbSlice::AddOrFindResult DbSlice::AddOrFind(const Context& cntx, string_view key } db.stats.inline_keys += it->first.IsInline(); - AccountObjectMemory(it->first, &db.stats, 1); // Account for key + AccountObjectMemory(it->first.ObjType(), it->first.MallocUsed(), &db.stats); // Account for key DCHECK_EQ(it->second.MallocUsed(), 0UL); // Make sure accounting is no-op it.SetVersion(NextVersion()); @@ -616,8 +629,11 @@ DbSlice::AddOrFindResult DbSlice::AddOrFind(const Context& cntx, string_view key return {.it = it, .exp_it = ExpireIterator{}, .is_new = true, - .post_updater = AutoUpdater( - {AutoUpdater::DestructorAction::kRun, this, cntx.db_index, it, key, false})}; + .post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun, + .db_slice = this, + .db_ind = cntx.db_index, + .it = it, + .key = key})}; } void DbSlice::ActivateDb(DbIndex db_ind) { @@ -999,10 +1015,7 @@ void DbSlice::PreUpdate(DbIndex db_ind, PrimeIterator it) { ccb.second(db_ind, ChangeReq{it}); } - // TODO(#2252): Remove and do accounting only in PostUpdate() auto* stats = MutableStats(db_ind); - AccountObjectMemory(it->second, stats, -1); - if (it->second.ObjType() == OBJ_STRING) { if (it->second.IsExternal()) { // We assume here that the operation code either loaded the entry into memory @@ -1026,10 +1039,11 @@ void DbSlice::PreUpdate(DbIndex db_ind, PrimeIterator it) { it.SetVersion(NextVersion()); } -void DbSlice::PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, bool existing) { +void DbSlice::PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, size_t orig_size) { DbTableStats* stats = MutableStats(db_ind); - AccountObjectMemory(it->second, stats, 1); + int64_t delta = static_cast<int64_t>(it->second.MallocUsed()) - static_cast<int64_t>(orig_size); + AccountObjectMemory(it->second.ObjType(), delta, stats); auto& db = *db_arr_[db_ind]; auto& watched_keys = db.watched_keys; diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 
e1b143af6..502bbf99d 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -90,11 +90,11 @@ class DbSlice { DbIndex db_ind = 0; PrimeIterator it; std::string_view key; - bool key_existed = false; + // The following fields are calculated at init time size_t db_size = 0; size_t deletion_count = 0; - // TODO(#2252): Add heap size here, and only update memory in d'tor + size_t orig_heap_size = 0; }; AutoUpdater(const Fields& fields); @@ -311,11 +311,6 @@ class DbSlice { size_t DbSize(DbIndex db_ind) const; // Callback functions called upon writing to the existing key. - // TODO(#2252): Remove these (or make them private) - void PreUpdate(DbIndex db_ind, PrimeIterator it); - void PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, - bool existing_entry = true); - DbTableStats* MutableStats(DbIndex db_ind) { return &db_arr_[db_ind]->stats; } @@ -397,6 +392,9 @@ class DbSlice { void PerformDeletion(PrimeIterator del_it, EngineShard* shard, DbTable* table); private: + void PreUpdate(DbIndex db_ind, PrimeIterator it); + void PostUpdate(DbIndex db_ind, PrimeIterator it, std::string_view key, size_t orig_size); + // Releases a single key. `key` must have been normalized by GetLockKey(). void ReleaseNormalized(IntentLock::Mode m, DbIndex db_index, std::string_view key, unsigned count); diff --git a/src/server/string_family.cc b/src/server/string_family.cc index f543fe4b3..c04189f36 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -277,7 +277,6 @@ OpResult OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr, CompactObj cobj; cobj.SetInt(incr); - // AddNew calls PostUpdate inside. try { res = db_slice.AddNew(op_args.db_cntx, key, std::move(cobj), 0); } catch (bad_alloc&) { @@ -449,7 +448,6 @@ OpResult> OpThrottle(const OpArgs& op_args, const string_view CompactObj cobj; cobj.SetInt(new_tat_ms); - // AddNew calls PostUpdate inside. 
try { db_slice.AddNew(op_args.db_cntx, key, std::move(cobj), new_tat_ms); } catch (bad_alloc&) { diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index a545ab921..8cffaedd8 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -1275,7 +1275,6 @@ ScoredArray OpBZPop(Transaction* t, EngineShard* shard, std::string_view key, bo range_spec.interval = ZSetFamily::TopNScored(1); DVLOG(2) << "popping from " << key << " " << t->DebugId(); - db_slice.PreUpdate(t->GetDbIndex(), it); PrimeValue& pv = it->second; IntervalVisitor iv{Action::POP, range_spec.params, &pv}; diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py index 4c556f7e0..9a3867fa8 100644 --- a/tests/dragonfly/snapshot_test.py +++ b/tests/dragonfly/snapshot_test.py @@ -254,13 +254,58 @@ class TestDflySnapshotOnShutdown(SnapshotTestBase): def setup(self, tmp_dir: Path): self.tmp_dir = tmp_dir + async def _get_info_memory_fields(self, client): + res = await client.execute_command("INFO MEMORY") + fields = {} + for line in res.decode("ascii").splitlines(): + if line.startswith("#"): + continue + k, v = line.split(":") + if k == "object_used_memory" or k.startswith("type_used_memory_"): + fields.update({k: int(v)}) + return fields + + async def _delete_all_keys(self, client): + # Delete all keys from all DBs + for i in range(0, SEEDER_ARGS["dbcount"]): + await client.select(i) + while True: + keys = await client.keys("*") + if len(keys) == 0: + break + await client.delete(*keys) + + @pytest.mark.asyncio + async def test_memory_counters(self, df_seeder_factory, df_server): + a_client = aioredis.Redis(port=df_server.port) + + memory_counters = await self._get_info_memory_fields(a_client) + assert memory_counters == {"object_used_memory": 0} + + seeder = df_seeder_factory.create(port=df_server.port, **SEEDER_ARGS) + await seeder.run(target_deviation=0.1) + + memory_counters = await self._get_info_memory_fields(a_client) + assert all(value > 0 for value 
in memory_counters.values()) + + await self._delete_all_keys(a_client) + memory_counters = await self._get_info_memory_fields(a_client) + assert memory_counters == {"object_used_memory": 0} + @pytest.mark.asyncio @pytest.mark.slow async def test_snapshot(self, df_seeder_factory, df_server): + """Checks that: + 1. After reloading the snapshot file the data is the same + 2. Memory counters after loading from the snapshot are similar to those before creating it + 3. Memory counters return to zero after deleting all keys loaded from the snapshot - this validates the memory + counting when loading from snapshot.""" seeder = df_seeder_factory.create(port=df_server.port, **SEEDER_ARGS) await seeder.run(target_deviation=0.1) start_capture = await seeder.capture() + a_client = aioredis.Redis(port=df_server.port) + memory_before = await self._get_info_memory_fields(a_client) df_server.stop() df_server.start() @@ -270,6 +315,16 @@ class TestDflySnapshotOnShutdown(SnapshotTestBase): await a_client.connection_pool.disconnect() assert await seeder.compare(start_capture, port=df_server.port) + memory_after = await self._get_info_memory_fields(a_client) + for counter, value in memory_before.items(): + # Unfortunately memory usage sometimes depends on order of insertion / deletion, so + # it's usually not exactly the same. For the test to be stable we check that it's + # at least 50% of the original value. + assert memory_after[counter] >= 0.5 * value + + await self._delete_all_keys(a_client) + memory_empty = await self._get_info_memory_fields(a_client) + assert memory_empty == {"object_used_memory": 0} @dfly_args({**BASIC_ARGS, "dbfilename": "test-info-persistence"})