From 753c25e37c71419da542cb10a846692593b5146c Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Sun, 16 Mar 2025 12:03:42 +0200 Subject: [PATCH] fix: proper memory accounting for objects loaded via streaming (#4774) The bug: during the loading when appending to the existing object, ItAndUpdater scope did not account for the appended data, and as a result `object_used_memory` and its variation did not account for streamed objects. The fix: to extend the scope of the ItAndUpdater object to cover appends. Added a sanity DCHECK that ensures that object_used_memory is at least as the memory used by a single object. This dcheck fails pre-fix. Fixes #4773 Signed-off-by: Roman Gershman --- src/server/db_slice.cc | 5 +++++ src/server/rdb_load.cc | 12 ++++++++---- src/server/rdb_test.cc | 8 ++++++++ 3 files changed, 21 insertions(+), 4 deletions(-) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 023ab2515..1113df694 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -62,6 +62,9 @@ static_assert(kExpireSegmentSize == 23528); void AccountObjectMemory(string_view key, unsigned type, int64_t size, DbTable* db) { DCHECK_NE(db, nullptr); + if (size == 0) + return; + DbTableStats& stats = db->stats; DCHECK_GE(static_cast(stats.obj_memory_usage) + size, 0) << "Can't decrease " << size << " from " << stats.obj_memory_usage; @@ -500,6 +503,8 @@ OpResult DbSlice::FindMutableInternal(const Context& cntx PreUpdateBlocking(cntx.db_index, it, key); // PreUpdate() might have caused a deletion of `it` if (res->it.IsOccupied()) { + DCHECK_GE(db_arr_[cntx.db_index]->stats.obj_memory_usage, res->it->second.MallocUsed()); + return {{it, exp_it, AutoUpdater({.action = AutoUpdater::DestructorAction::kRun, .db_slice = this, diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 42dcbf924..b90ddd1a1 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -2544,11 +2544,15 @@ void RdbLoader::CreateObjectOnShard(const DbContext& db_cntx, const Item* item, item->val.rdb_type); }; + // The scope is important here, as we need to ensure that the object memory is properly + // accounted for. + DbSlice::ItAndUpdater append_res; + // If we're appending the item to an existing key, first load the // object. if (item->load_config.append) { - auto res = db_slice->FindMutable(db_cntx, item->key); - if (!IsValid(res.it)) { + append_res = db_slice->FindMutable(db_cntx, item->key); + if (!IsValid(append_res.it)) { // If the item has expired we may not find the key. Note if the key // is found, but expired since we started loading, we still append to // avoid an inconsistent state where only part of the key is loaded. @@ -2557,7 +2561,7 @@ void RdbLoader::CreateObjectOnShard(const DbContext& db_cntx, const Item* item, } return; } - pv_ptr = &res.it->second; + pv_ptr = &append_res.it->second; } if (ec_ = FromOpaque(item->val, item->load_config, pv_ptr); ec_) { @@ -2598,7 +2602,7 @@ void RdbLoader::CreateObjectOnShard(const DbContext& db_cntx, const Item* item, return; } - auto& res = *op_res; + DbSlice::ItAndUpdater& res = *op_res; res.it->first.SetSticky(item->is_sticky); if (item->has_mc_flags) { res.it->second.SetFlag(true); diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index 747549c06..82bc5cd48 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -582,6 +582,8 @@ TEST_F(RdbTest, LoadHugeSet) { ASSERT_EQ(100000, CheckedInt({"scard", "test:0"})); ASSERT_EQ(100000, CheckedInt({"scard", "test:1"})); + auto metrics = GetMetrics(); + EXPECT_GT(metrics.db_stats[0].obj_memory_usage, 24'000'000u); } // Tests loading a huge hmap, where the map is loaded in multiple partial @@ -602,6 +604,8 @@ TEST_F(RdbTest, LoadHugeHMap) { ASSERT_EQ(100000, CheckedInt({"hlen", "test:0"})); ASSERT_EQ(100000, CheckedInt({"hlen", "test:1"})); + auto metrics = GetMetrics(); + EXPECT_GT(metrics.db_stats[0].obj_memory_usage, 29'000'000u); } // Tests loading a huge zset, where the zset is loaded in multiple partial @@ -622,6 +626,8 @@ TEST_F(RdbTest, LoadHugeZSet) { ASSERT_EQ(100000, CheckedInt({"zcard", "test:0"})); ASSERT_EQ(100000, CheckedInt({"zcard", "test:1"})); + auto metrics = GetMetrics(); + EXPECT_GT(metrics.db_stats[0].obj_memory_usage, 26'000'000u); } // Tests loading a huge list, where the list is loaded in multiple partial @@ -642,6 +648,8 @@ TEST_F(RdbTest, LoadHugeList) { ASSERT_EQ(100000, CheckedInt({"llen", "test:0"})); ASSERT_EQ(100000, CheckedInt({"llen", "test:1"})); + auto metrics = GetMetrics(); + EXPECT_GT(metrics.db_stats[0].obj_memory_usage, 20'000'000u); } // Tests loading a huge stream, where the stream is loaded in multiple partial