mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 02:15:45 +02:00
fix: proper memory accounting for objects loaded via streaming (#4774)
The bug: during the loading when appending to the existing object, ItAndUpdater scope did not account for the appended data, and as a result `object_used_memory` and its variation did not account for streamed objects. The fix: to extend the scope of the ItAndUpdater object to cover appends. Added a sanity DCHECK that ensures that object_used_memory is at least as the memory used by a single object. This dcheck fails pre-fix. Fixes #4773 Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
8acf849e9d
commit
753c25e37c
3 changed files with 21 additions and 4 deletions
|
@ -62,6 +62,9 @@ static_assert(kExpireSegmentSize == 23528);
|
|||
|
||||
void AccountObjectMemory(string_view key, unsigned type, int64_t size, DbTable* db) {
|
||||
DCHECK_NE(db, nullptr);
|
||||
if (size == 0)
|
||||
return;
|
||||
|
||||
DbTableStats& stats = db->stats;
|
||||
DCHECK_GE(static_cast<int64_t>(stats.obj_memory_usage) + size, 0)
|
||||
<< "Can't decrease " << size << " from " << stats.obj_memory_usage;
|
||||
|
@ -500,6 +503,8 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutableInternal(const Context& cntx
|
|||
PreUpdateBlocking(cntx.db_index, it, key);
|
||||
// PreUpdate() might have caused a deletion of `it`
|
||||
if (res->it.IsOccupied()) {
|
||||
DCHECK_GE(db_arr_[cntx.db_index]->stats.obj_memory_usage, res->it->second.MallocUsed());
|
||||
|
||||
return {{it, exp_it,
|
||||
AutoUpdater({.action = AutoUpdater::DestructorAction::kRun,
|
||||
.db_slice = this,
|
||||
|
|
|
@ -2544,11 +2544,15 @@ void RdbLoader::CreateObjectOnShard(const DbContext& db_cntx, const Item* item,
|
|||
item->val.rdb_type);
|
||||
};
|
||||
|
||||
// The scope is important here, as we need to ensure that the object memory is properly
|
||||
// accounted for.
|
||||
DbSlice::ItAndUpdater append_res;
|
||||
|
||||
// If we're appending the item to an existing key, first load the
|
||||
// object.
|
||||
if (item->load_config.append) {
|
||||
auto res = db_slice->FindMutable(db_cntx, item->key);
|
||||
if (!IsValid(res.it)) {
|
||||
append_res = db_slice->FindMutable(db_cntx, item->key);
|
||||
if (!IsValid(append_res.it)) {
|
||||
// If the item has expired we may not find the key. Note if the key
|
||||
// is found, but expired since we started loading, we still append to
|
||||
// avoid an inconsistent state where only part of the key is loaded.
|
||||
|
@ -2557,7 +2561,7 @@ void RdbLoader::CreateObjectOnShard(const DbContext& db_cntx, const Item* item,
|
|||
}
|
||||
return;
|
||||
}
|
||||
pv_ptr = &res.it->second;
|
||||
pv_ptr = &append_res.it->second;
|
||||
}
|
||||
|
||||
if (ec_ = FromOpaque(item->val, item->load_config, pv_ptr); ec_) {
|
||||
|
@ -2598,7 +2602,7 @@ void RdbLoader::CreateObjectOnShard(const DbContext& db_cntx, const Item* item,
|
|||
return;
|
||||
}
|
||||
|
||||
auto& res = *op_res;
|
||||
DbSlice::ItAndUpdater& res = *op_res;
|
||||
res.it->first.SetSticky(item->is_sticky);
|
||||
if (item->has_mc_flags) {
|
||||
res.it->second.SetFlag(true);
|
||||
|
|
|
@ -582,6 +582,8 @@ TEST_F(RdbTest, LoadHugeSet) {
|
|||
|
||||
ASSERT_EQ(100000, CheckedInt({"scard", "test:0"}));
|
||||
ASSERT_EQ(100000, CheckedInt({"scard", "test:1"}));
|
||||
auto metrics = GetMetrics();
|
||||
EXPECT_GT(metrics.db_stats[0].obj_memory_usage, 24'000'000u);
|
||||
}
|
||||
|
||||
// Tests loading a huge hmap, where the map is loaded in multiple partial
|
||||
|
@ -602,6 +604,8 @@ TEST_F(RdbTest, LoadHugeHMap) {
|
|||
|
||||
ASSERT_EQ(100000, CheckedInt({"hlen", "test:0"}));
|
||||
ASSERT_EQ(100000, CheckedInt({"hlen", "test:1"}));
|
||||
auto metrics = GetMetrics();
|
||||
EXPECT_GT(metrics.db_stats[0].obj_memory_usage, 29'000'000u);
|
||||
}
|
||||
|
||||
// Tests loading a huge zset, where the zset is loaded in multiple partial
|
||||
|
@ -622,6 +626,8 @@ TEST_F(RdbTest, LoadHugeZSet) {
|
|||
|
||||
ASSERT_EQ(100000, CheckedInt({"zcard", "test:0"}));
|
||||
ASSERT_EQ(100000, CheckedInt({"zcard", "test:1"}));
|
||||
auto metrics = GetMetrics();
|
||||
EXPECT_GT(metrics.db_stats[0].obj_memory_usage, 26'000'000u);
|
||||
}
|
||||
|
||||
// Tests loading a huge list, where the list is loaded in multiple partial
|
||||
|
@ -642,6 +648,8 @@ TEST_F(RdbTest, LoadHugeList) {
|
|||
|
||||
ASSERT_EQ(100000, CheckedInt({"llen", "test:0"}));
|
||||
ASSERT_EQ(100000, CheckedInt({"llen", "test:1"}));
|
||||
auto metrics = GetMetrics();
|
||||
EXPECT_GT(metrics.db_stats[0].obj_memory_usage, 20'000'000u);
|
||||
}
|
||||
|
||||
// Tests loading a huge stream, where the stream is loaded in multiple partial
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue