From 51a78a3ad3aed89a3ab30b308b277eb2ca793f52 Mon Sep 17 00:00:00 2001 From: Boaz Sade Date: Sun, 1 Jan 2023 18:50:57 +0200 Subject: [PATCH] feat(server): using memory defrag with per shard mem info (#616) Signed-off-by: Boaz Sade --- src/core/compact_object_test.cc | 140 ++++++++++++++++++++++++++++++++ src/redis/zmalloc.h | 7 ++ src/redis/zmalloc_mi.c | 35 ++++++++ src/server/dragonfly_test.cc | 44 ++++------ src/server/engine_shard_set.cc | 80 +++++++++++------- src/server/engine_shard_set.h | 5 +- 6 files changed, 252 insertions(+), 59 deletions(-) diff --git a/src/core/compact_object_test.cc b/src/core/compact_object_test.cc index 74d6a507b..0b9b4e02c 100644 --- a/src/core/compact_object_test.cc +++ b/src/core/compact_object_test.cc @@ -108,6 +108,146 @@ class CompactObjectTest : public ::testing::Test { string tmp_; }; +TEST_F(CompactObjectTest, WastedMemoryDetection) { + mi_option_set(mi_option_decommit_delay, 0); + + size_t allocated = 0, commited = 0, wasted = 0; + // By setting the threshold to high value we are expecting + // To find locations where we have wasted memory + float ratio = 0.8; + zmalloc_get_allocator_wasted_blocks(ratio, &allocated, &commited, &wasted); + EXPECT_EQ(allocated, 0); + EXPECT_EQ(commited, 0); + EXPECT_EQ(wasted, (commited - allocated)); + + std::size_t allocated_mem = 64; + auto* myheap = mi_heap_get_backing(); + + void* p1 = mi_heap_malloc(myheap, 64); + + void* ptrs_end[50]; + for (size_t i = 0; i < 50; ++i) { + ptrs_end[i] = mi_heap_malloc(myheap, 128); + allocated_mem += 128; + } + + allocated = commited = wasted = 0; + zmalloc_get_allocator_wasted_blocks(ratio, &allocated, &commited, &wasted); + EXPECT_EQ(allocated, allocated_mem); + EXPECT_GT(commited, allocated_mem); + EXPECT_EQ(wasted, (commited - allocated)); + void* ptr[50]; + // allocate 50 + for (size_t i = 0; i < 50; ++i) { + ptr[i] = mi_heap_malloc(myheap, 256); + allocated_mem += 256; + } + + // At this point all the blocks has committed > 0 and used > 0 + // and since we expecting to find these locations, the size of + // wasted == commited memory - allocated memory. + allocated = commited = wasted = 0; + zmalloc_get_allocator_wasted_blocks(ratio, &allocated, &commited, &wasted); + EXPECT_EQ(allocated, allocated_mem); + EXPECT_GT(commited, allocated_mem); + EXPECT_EQ(wasted, (commited - allocated)); + + // free 50/50 - + for (size_t i = 0; i < 50; ++i) { + mi_free(ptr[i]); + allocated_mem -= 256; + } + + // After all the memory at block size 256 is free, we would have commited there + // but the used is expected to be 0, so the number now is different from the + // case above + allocated = commited = wasted = 0; + zmalloc_get_allocator_wasted_blocks(ratio, &allocated, &commited, &wasted); + EXPECT_EQ(allocated, allocated_mem); + EXPECT_GT(commited, allocated_mem); + // since we release all 256 memory block, it should not be counted + EXPECT_EQ(wasted, (commited - allocated)); + for (size_t i = 0; i < 50; ++i) { + mi_free(ptrs_end[i]); + } + mi_free(p1); + + // Now that its all freed, we are not expecting to have any wasted memory any more + allocated = commited = wasted = 0; + zmalloc_get_allocator_wasted_blocks(ratio, &allocated, &commited, &wasted); + EXPECT_EQ(allocated, 0); + EXPECT_GT(commited, allocated); + EXPECT_EQ(wasted, (commited - allocated)); + + mi_collect(false); +} + +TEST_F(CompactObjectTest, WastedMemoryDontCount) { + // The commited memory per blocks are: + // 64bit => 4K + // 128bit => 8k + // 256 => 16k + // and so on, which mean every n * sizeof(ptr) ^ 2 == 2^11*2*(n-1) (where n starts with 1) + constexpr std::size_t kExpectedFor256MemWasted = 0x4000; // memory block 256 + mi_option_set(mi_option_decommit_delay, 0); + auto* myheap = mi_heap_get_backing(); + + size_t allocated = 0, commited = 0, wasted = 0; + // By setting the threshold to a very low number + // we don't expect to find and locations where memory is wasted + float ratio = 0.01; + zmalloc_get_allocator_wasted_blocks(ratio, &allocated, &commited, &wasted); + EXPECT_EQ(allocated, 0); + EXPECT_EQ(commited, 0); + EXPECT_EQ(wasted, (commited - allocated)); + + std::size_t allocated_mem = 64; + + void* p1 = mi_heap_malloc(myheap, 64); + + void* ptrs_end[50]; + for (size_t i = 0; i < 50; ++i) { + ptrs_end[i] = mi_heap_malloc(myheap, 128); + (void)p1; + allocated_mem += 128; + } + + void* ptr[50]; + + // allocate 50 + for (size_t i = 0; i < 50; ++i) { + ptr[i] = mi_heap_malloc(myheap, 256); + allocated_mem += 256; + } + allocated = commited = wasted = 0; + zmalloc_get_allocator_wasted_blocks(ratio, &allocated, &commited, &wasted); + // Threshold is low so we are not expecting any wasted memory to be found. + EXPECT_EQ(allocated, allocated_mem); + EXPECT_GT(commited, allocated_mem); + EXPECT_EQ(wasted, 0); + + // free 50/50 - + for (size_t i = 0; i < 50; ++i) { + mi_free(ptr[i]); + allocated_mem -= 256; + } + allocated = commited = wasted = 0; + zmalloc_get_allocator_wasted_blocks(ratio, &allocated, &commited, &wasted); + + EXPECT_EQ(allocated, allocated_mem); + EXPECT_GT(commited, allocated_mem); + // We will detect only wasted memory for block size of + // 256 - and all of it is wasted. + EXPECT_EQ(wasted, kExpectedFor256MemWasted); + // Threshold is low so we are not expecting any wasted memory to be found. + for (size_t i = 0; i < 50; ++i) { + mi_free(ptrs_end[i]); + } + mi_free(p1); + + mi_collect(false); +} + TEST_F(CompactObjectTest, Basic) { robj* rv = createRawStringObject("foo", 3); cobj_.ImportRObj(rv); diff --git a/src/redis/zmalloc.h b/src/redis/zmalloc.h index 01a876918..742530a77 100644 --- a/src/redis/zmalloc.h +++ b/src/redis/zmalloc.h @@ -111,6 +111,13 @@ size_t zmalloc_get_smap_bytes_by_field(char *field, long pid); size_t zmalloc_get_memory_size(void); size_t zmalloc_usable_size(const void* p); +/* get the memory usage + the number of wasted locations of memory +Based on a given threshold (ratio < 1). +Note that if a block is not used, it would not counted as wasted +*/ +int zmalloc_get_allocator_wasted_blocks(float ratio, size_t* allocated, size_t* commited, + size_t* wasted); + /* * checks whether a page that the pointer ptr located at is underutilized. * This uses the current local thread heap. diff --git a/src/redis/zmalloc_mi.c b/src/redis/zmalloc_mi.c index 8f9e5140c..073c50444 100644 --- a/src/redis/zmalloc_mi.c +++ b/src/redis/zmalloc_mi.c @@ -108,6 +108,13 @@ typedef struct Sum_s { size_t comitted; } Sum_t; +typedef struct { + size_t allocated; + size_t comitted; + size_t wasted; + float ratio; +} MemUtilized_t; + bool heap_visit_cb(const mi_heap_t* heap, const mi_heap_area_t* area, void* block, size_t block_size, void* arg) { assert(area->used < (1u << 31)); @@ -117,7 +124,23 @@ bool heap_visit_cb(const mi_heap_t* heap, const mi_heap_area_t* area, void* bloc // mimalloc mistakenly exports used in blocks instead of bytes. sum->allocated += block_size * area->used; sum->comitted += area->committed; + return true; // continue iteration +}; +bool heap_count_wasted_blocks(const mi_heap_t* heap, const mi_heap_area_t* area, void* block, + size_t block_size, void* arg) { + assert(area->used < (1u << 31)); + + MemUtilized_t* sum = (MemUtilized_t*)arg; + + // mimalloc mistakenly exports used in blocks instead of bytes. + size_t used = block_size * area->used; + sum->allocated += used; + sum->comitted += area->committed; + + if (used < area->committed * sum->ratio) { + sum->wasted += (area->committed - used); + } return true; // continue iteration }; @@ -132,6 +155,18 @@ int zmalloc_get_allocator_info(size_t* allocated, size_t* active, size_t* reside return 1; } +int zmalloc_get_allocator_wasted_blocks(float ratio, size_t* allocated, size_t* commited, + size_t* wasted) { + MemUtilized_t sum = {.allocated = 0, .comitted = 0, .wasted = 0, .ratio = ratio}; + + mi_heap_visit_blocks(zmalloc_heap, false /* visit all blocks*/, heap_count_wasted_blocks, &sum); + *allocated = sum.allocated; + *commited = sum.comitted; + *wasted = sum.wasted; + + return 1; +} + void init_zmalloc_threadlocal(void* heap) { if (zmalloc_heap) return; diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 5f58abb19..3d2d2c4ed 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -20,7 +20,7 @@ extern "C" { #include "server/main_service.h" #include "server/test_utils.h" -ABSL_DECLARE_FLAG(float, commit_use_threshold); +ABSL_DECLARE_FLAG(float, mem_defrag_threshold); namespace dfly { @@ -788,32 +788,22 @@ TEST_F(DflyEngineTest, Issue706) { } TEST_F(DefragDflyEngineTest, TestDefragOption) { - absl::SetFlag(&FLAGS_commit_use_threshold, 1.1); - // Fill data into dragonfly and then check if we have - // any location in memory to defrag. See issue #448 for details about this. + absl::SetFlag(&FLAGS_mem_defrag_threshold, 0.02); + // Fill data into dragonfly and then check if we have + // any location in memory to defrag. See issue #448 for details about this. constexpr size_t kMaxMemoryForTest = 1'100'000; constexpr int kNumberOfKeys = 1'000; // this fill the memory constexpr int kKeySize = 637; - constexpr int kMaxDefragTriesForTests = 10; - constexpr int kFactor = 10; - constexpr int kMaxNumKeysToDelete = 100; + constexpr int kMaxDefragTriesForTests = 30; + constexpr int kFactor = 4; max_memory_limit = kMaxMemoryForTest; // control memory size so no need for too many keys - shard_set->TEST_EnableHeartBeat(); // enable use memory update (used_mem_current) - std::vector keys2delete; keys2delete.push_back("del"); - // Generate a list of keys that would be deleted - // The keys that we will delete are all in the form of "key-name:1" - // This is because we are populating keys that has this format, but we don't want - // to delete all keys, only some random keys so we deleting those that start with 1 - int current_step = kFactor; - for (int i = 1; i < kMaxNumKeysToDelete; current_step *= kFactor) { - for (; i < current_step; i++) { - int j = i - 1 + current_step; - keys2delete.push_back("key-name:" + std::to_string(j)); - } + // create keys that we would like to remove, try to make it none adjusting locations + for (int i = 0; i < kNumberOfKeys; i += kFactor) { + keys2delete.push_back("key-name:" + std::to_string(i)); } std::vector keys(keys2delete.begin(), keys2delete.end()); @@ -829,20 +819,21 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) { EngineShard* shard = EngineShard::tlocal(); ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty! fibers_ext::SleepFor(100ms); - auto mem_used = 0; + // make sure that the task that collect memory usage from all shard ran // for at least once, and that no defrag was done yet. - for (int i = 0; i < 3 && mem_used == 0; i++) { + auto stats = shard->stats(); + for (int i = 0; i < 3; i++) { fibers_ext::SleepFor(100ms); - EXPECT_EQ(shard->stats().defrag_realloc_total, 0); - mem_used = used_mem_current.load(memory_order_relaxed); + EXPECT_EQ(stats.defrag_realloc_total, 0); } }); ArgSlice delete_cmd(keys); r = CheckedInt(delete_cmd); + LOG(WARNING) << "finish deleting memory entries " << r; // the first element in this is the command del so size is one less - ASSERT_EQ(r, kMaxNumKeysToDelete - 1); + ASSERT_EQ(r, keys2delete.size() - 1); // At this point we need to see whether we did running the task and whether the task did something shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) { EngineShard* shard = EngineShard::tlocal(); @@ -850,11 +841,8 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) { // a "busy wait" to ensure that memory defragmentations was successful: // the task ran and did it work auto stats = shard->stats(); - for (int i = 0; i < kMaxDefragTriesForTests; i++) { + for (int i = 0; i < kMaxDefragTriesForTests && stats.defrag_realloc_total == 0; i++) { stats = shard->stats(); - if (stats.defrag_realloc_total > 0) { - break; - } fibers_ext::SleepFor(220ms); } // make sure that we successfully found places to defrag in memory diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 4d6f85d54..1fdd3ea5b 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -35,10 +35,10 @@ ABSL_FLAG(float, mem_defrag_threshold, 0.7, "Minimum percentage of used memory relative to maxmemory cap before running " "defragmentation"); -ABSL_FLAG(float, commit_use_threshold, 1.3, - "The ratio of commited/used memory above which we run defragmentation"); +ABSL_FLAG(float, mem_defrag_waste_threshold, 0.2, + "The ratio of wasted/commited memory above which we run defragmentation"); -ABSL_FLAG(float, mem_utilization_threshold, 0.8, +ABSL_FLAG(float, mem_defrag_page_utilization_threshold, 0.8, "memory page under utilization threshold. Ratio between used and commited size, below " "this, memory in this page will defragmented"); namespace dfly { @@ -54,6 +54,24 @@ constexpr uint64_t kCursorDoneState = 0u; vector cached_stats; // initialized in EngineShardSet::Init +struct ShardMemUsage { + std::size_t commited = 0; + std::size_t used = 0; + std::size_t wasted_mem = 0; +}; + +std::ostream& operator<<(std::ostream& os, const ShardMemUsage& mem) { + return os << "commited: " << mem.commited << " vs used " << mem.used << ", wasted memory " + << mem.wasted_mem; +} + +ShardMemUsage ReadShardMemUsage(float wasted_ratio) { + ShardMemUsage usage; + zmalloc_get_allocator_wasted_blocks(wasted_ratio, &usage.used, &usage.commited, + &usage.wasted_mem); + return usage; +} + } // namespace constexpr size_t kQueueLen = 64; @@ -72,37 +90,39 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) return *this; } +void EngineShard::DefragTaskState::UpdateScanState(uint64_t cursor_val) { + cursor = cursor_val; + underutilized_found = false; +} + // This function checks 3 things: // 1. Don't try memory fragmentation if we don't use "enough" memory (control by // mem_defrag_threshold flag) -// 2. That we had change in memory usage - to prevent endless loop - out of scope for now +// 2. We have memory blocks that can be better utilized (there is a "wasted memory" in them). // 3. in case the above is OK, make sure that we have a "gap" between usage and commited memory -// (control by commit_use_threshold flag) -bool EngineShard::DefragTaskState::IsRequired() const { - const int64_t threshold_mem = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold); - const double commit_use_threshold = GetFlag(FLAGS_commit_use_threshold); - - if (cursor > kCursorDoneState) { +// (control by mem_defrag_waste_threshold flag) +bool EngineShard::DefragTaskState::CheckRequired() { + if (cursor > kCursorDoneState || underutilized_found) { + VLOG(1) << "Already found memory utilization issue - cursor: " << cursor + << " and underutilized_found " << underutilized_found; return true; } + const std::size_t memory_per_shard = max_memory_limit / shard_set->size(); - // can be negative due to weird accounting of mimalloc. - int64_t commited = GetMallocCurrentCommitted(); + const std::size_t threshold_mem = memory_per_shard * GetFlag(FLAGS_mem_defrag_threshold); + const double waste_threshold = GetFlag(FLAGS_mem_defrag_waste_threshold); - uint64_t mem_in_use = used_mem_current.load(memory_order_relaxed); + ShardMemUsage usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold)); - // we want to make sure that we are not running this to many times - i.e. - // if there was no change to the memory, don't run this - if (threshold_mem < commited && mem_in_use > 0 && - (uint64_t(mem_in_use * commit_use_threshold) < uint64_t(commited))) { - // we have way more commited then actual usage - return true; + if (threshold_mem < usage.commited && + usage.wasted_mem > (uint64_t(usage.commited * waste_threshold))) { + VLOG(1) << "memory issue found for memory " << usage; + underutilized_found = true; } return false; } -// for now this does nothing bool EngineShard::DoDefrag() { // -------------------------------------------------------------------------- // NOTE: This task is running with exclusive access to the shard. @@ -111,8 +131,8 @@ bool EngineShard::DoDefrag() { // context of the controlling thread will access this shard! // -------------------------------------------------------------------------- - constexpr size_t kMaxTraverses = 50; - const float threshold = GetFlag(FLAGS_mem_utilization_threshold); + constexpr size_t kMaxTraverses = 40; + const float threshold = GetFlag(FLAGS_mem_defrag_page_utilization_threshold); auto& slice = db_slice(); DCHECK(slice.IsDbValid(kDefaultDbIndex)); @@ -135,15 +155,15 @@ bool EngineShard::DoDefrag() { traverses_count++; } while (traverses_count < kMaxTraverses && cur); - defrag_state_.cursor = cur.value(); + defrag_state_.UpdateScanState(cur.value()); if (reallocations > 0) { VLOG(1) << "shard " << slice.shard_id() << ": successfully defrag " << reallocations << " times, did it in " << traverses_count << " cursor is at the " - << (defrag_state_.cursor == 0 ? "end" : "in progress"); + << (defrag_state_.cursor == kCursorDoneState ? "end" : "in progress"); } else { VLOG(1) << "shard " << slice.shard_id() << ": run the defrag " << traverses_count << " times out of maximum " << kMaxTraverses << ", with cursor at " - << (defrag_state_.cursor == 0 ? "end" : "in progress") + << (defrag_state_.cursor == kCursorDoneState ? "end" : "in progress") << " but no location for defrag were found"; } stats_.defrag_realloc_total += reallocations; @@ -155,8 +175,7 @@ bool EngineShard::DoDefrag() { // the memory defragmentation task is as follow: // 1. Check if memory usage is high enough // 2. Check if diff between commited and used memory is high enough -// 3. Check if we have memory changes (to ensure that we not running endlessly). - TODO -// 4. if all the above pass -> scan this shard and try to find whether we can move pointer to +// 3. if all the above pass -> scan this shard and try to find whether we can move pointer to // underutilized pages values // if the cursor returned from scan is not in done state, schedule the task to run at high // priority. @@ -164,14 +183,15 @@ bool EngineShard::DoDefrag() { uint32_t EngineShard::DefragTask() { constexpr uint32_t kRunAtLowPriority = 0u; const auto shard_id = db_slice().shard_id(); - if (defrag_state_.IsRequired()) { - VLOG(1) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor; + + if (defrag_state_.CheckRequired()) { + VLOG(1) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor + << ", underutilzation found: " << defrag_state_.underutilized_found; if (DoDefrag()) { // we didn't finish the scan return util::ProactorBase::kOnIdleMaxLevel; } } - return kRunAtLowPriority; } diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 804d83bdf..16acb37c3 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -150,10 +150,13 @@ class EngineShard { struct DefragTaskState { // we will add more data members later uint64_t cursor = 0u; + bool underutilized_found = false; // check the current threshold and return true if // we need to do the de-fermentation - bool IsRequired() const; + bool CheckRequired(); + + void UpdateScanState(uint64_t cursor_val); }; EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap);