feat(server): using memory defrag with per shard mem info (#616)

Signed-off-by: Boaz Sade <boaz@dragonflydb.io>
This commit is contained in:
Boaz Sade 2023-01-01 18:50:57 +02:00 committed by GitHub
parent 1286bac238
commit 51a78a3ad3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 252 additions and 59 deletions

View file

@ -108,6 +108,146 @@ class CompactObjectTest : public ::testing::Test {
string tmp_;
};
// Verifies that zmalloc_get_allocator_wasted_blocks() detects wasted
// (committed-but-not-allocated) memory when the utilization threshold is
// high, across allocate/free cycles on the backing mimalloc heap.
TEST_F(CompactObjectTest, WastedMemoryDetection) {
// Disable mimalloc's delayed decommit so the committed-page accounting
// below is immediate and deterministic.
mi_option_set(mi_option_decommit_delay, 0);
size_t allocated = 0, commited = 0, wasted = 0;
// By setting the threshold to a high value we are expecting
// to find locations where we have wasted memory.
float ratio = 0.8;
// Baseline: nothing was allocated on this heap yet.
zmalloc_get_allocator_wasted_blocks(ratio, &allocated, &commited, &wasted);
EXPECT_EQ(allocated, 0);
EXPECT_EQ(commited, 0);
EXPECT_EQ(wasted, (commited - allocated));
std::size_t allocated_mem = 64;
auto* myheap = mi_heap_get_backing();
void* p1 = mi_heap_malloc(myheap, 64);
void* ptrs_end[50];
// allocate 50 blocks of 128 bytes, tracking the total in allocated_mem
for (size_t i = 0; i < 50; ++i) {
ptrs_end[i] = mi_heap_malloc(myheap, 128);
allocated_mem += 128;
}
allocated = commited = wasted = 0;
zmalloc_get_allocator_wasted_blocks(ratio, &allocated, &commited, &wasted);
EXPECT_EQ(allocated, allocated_mem);
EXPECT_GT(commited, allocated_mem);
EXPECT_EQ(wasted, (commited - allocated));
void* ptr[50];
// allocate 50 blocks of 256 bytes
for (size_t i = 0; i < 50; ++i) {
ptr[i] = mi_heap_malloc(myheap, 256);
allocated_mem += 256;
}
// At this point all the blocks have committed > 0 and used > 0,
// and since we are expecting to find these locations, the size of
// wasted == commited memory - allocated memory.
allocated = commited = wasted = 0;
zmalloc_get_allocator_wasted_blocks(ratio, &allocated, &commited, &wasted);
EXPECT_EQ(allocated, allocated_mem);
EXPECT_GT(commited, allocated_mem);
EXPECT_EQ(wasted, (commited - allocated));
// free all 50 of the 256-byte allocations
for (size_t i = 0; i < 50; ++i) {
mi_free(ptr[i]);
allocated_mem -= 256;
}
// After all the memory at block size 256 is freed, those blocks still have
// committed memory but their used count is expected to be 0, so the numbers
// now differ from the case above.
allocated = commited = wasted = 0;
zmalloc_get_allocator_wasted_blocks(ratio, &allocated, &commited, &wasted);
EXPECT_EQ(allocated, allocated_mem);
EXPECT_GT(commited, allocated_mem);
// since we released all 256-byte memory blocks, they should not be counted
EXPECT_EQ(wasted, (commited - allocated));
// release the remaining allocations as well
for (size_t i = 0; i < 50; ++i) {
mi_free(ptrs_end[i]);
}
mi_free(p1);
// Now that it is all freed, we are not expecting to have any wasted memory any more.
allocated = commited = wasted = 0;
zmalloc_get_allocator_wasted_blocks(ratio, &allocated, &commited, &wasted);
EXPECT_EQ(allocated, 0);
EXPECT_GT(commited, allocated);
EXPECT_EQ(wasted, (commited - allocated));
// force mimalloc to reclaim freed memory
mi_collect(false);
}
// Verifies that with a very low utilization threshold,
// zmalloc_get_allocator_wasted_blocks() reports as wasted only blocks that
// are committed but completely unused, not merely under-utilized ones.
TEST_F(CompactObjectTest, WastedMemoryDontCount) {
// The committed memory per block-size class is:
// 64 bytes => 4K
// 128 bytes => 8K
// 256 bytes => 16K
// i.e. the committed size doubles with each size class (4K * 2^(n-1), n >= 1).
constexpr std::size_t kExpectedFor256MemWasted = 0x4000; // memory block 256
// Disable delayed decommit so committed-page accounting is deterministic.
mi_option_set(mi_option_decommit_delay, 0);
auto* myheap = mi_heap_get_backing();
size_t allocated = 0, commited = 0, wasted = 0;
// By setting the threshold to a very low number
// we don't expect to find any locations where memory is wasted.
float ratio = 0.01;
// Baseline: nothing was allocated on this heap yet.
zmalloc_get_allocator_wasted_blocks(ratio, &allocated, &commited, &wasted);
EXPECT_EQ(allocated, 0);
EXPECT_EQ(commited, 0);
EXPECT_EQ(wasted, (commited - allocated));
std::size_t allocated_mem = 64;
void* p1 = mi_heap_malloc(myheap, 64);
void* ptrs_end[50];
// allocate 50 blocks of 128 bytes
for (size_t i = 0; i < 50; ++i) {
ptrs_end[i] = mi_heap_malloc(myheap, 128);
// no-op; presumably a leftover from an earlier revision - p1 is freed below
(void)p1;
allocated_mem += 128;
}
void* ptr[50];
// allocate 50 blocks of 256 bytes
for (size_t i = 0; i < 50; ++i) {
ptr[i] = mi_heap_malloc(myheap, 256);
allocated_mem += 256;
}
allocated = commited = wasted = 0;
zmalloc_get_allocator_wasted_blocks(ratio, &allocated, &commited, &wasted);
// Threshold is low so we are not expecting any wasted memory to be found.
EXPECT_EQ(allocated, allocated_mem);
EXPECT_GT(commited, allocated_mem);
EXPECT_EQ(wasted, 0);
// free all 50 of the 256-byte allocations
for (size_t i = 0; i < 50; ++i) {
mi_free(ptr[i]);
allocated_mem -= 256;
}
allocated = commited = wasted = 0;
zmalloc_get_allocator_wasted_blocks(ratio, &allocated, &commited, &wasted);
EXPECT_EQ(allocated, allocated_mem);
EXPECT_GT(commited, allocated_mem);
// We will detect only wasted memory for block size of
// 256 - and all of it is wasted, since those blocks are now fully unused.
EXPECT_EQ(wasted, kExpectedFor256MemWasted);
// clean up the remaining allocations
for (size_t i = 0; i < 50; ++i) {
mi_free(ptrs_end[i]);
}
mi_free(p1);
mi_collect(false);
}
TEST_F(CompactObjectTest, Basic) {
robj* rv = createRawStringObject("foo", 3);
cobj_.ImportRObj(rv);

View file

@ -111,6 +111,13 @@ size_t zmalloc_get_smap_bytes_by_field(char *field, long pid);
size_t zmalloc_get_memory_size(void);
size_t zmalloc_usable_size(const void* p);
/* get the memory usage + the number of wasted locations of memory,
based on a given threshold (ratio < 1).
Note that if a block is not used at all, it would not be counted as wasted
*/
int zmalloc_get_allocator_wasted_blocks(float ratio, size_t* allocated, size_t* commited,
size_t* wasted);
/*
* checks whether a page that the pointer ptr located at is underutilized.
* This uses the current local thread heap.

View file

@ -108,6 +108,13 @@ typedef struct Sum_s {
size_t comitted;
} Sum_t;
// Aggregated heap statistics collected by heap_count_wasted_blocks(), along
// with the utilization ratio used to decide whether an area counts as wasted.
typedef struct {
size_t allocated;  // total bytes in use (block_size * used-block count)
size_t comitted;   // total bytes committed by the allocator [sic: "comitted"]
size_t wasted;     // committed-minus-used bytes of under-utilized areas
float ratio;       // threshold: an area with used < committed * ratio is wasted
} MemUtilized_t;
bool heap_visit_cb(const mi_heap_t* heap, const mi_heap_area_t* area, void* block,
size_t block_size, void* arg) {
assert(area->used < (1u << 31));
@ -117,7 +124,23 @@ bool heap_visit_cb(const mi_heap_t* heap, const mi_heap_area_t* area, void* bloc
// mimalloc mistakenly exports used in blocks instead of bytes.
sum->allocated += block_size * area->used;
sum->comitted += area->committed;
return true; // continue iteration
};
// mi_heap_visit_blocks() callback: accumulates per-area allocated/committed
// byte counts into the MemUtilized_t passed via `arg`, and adds the area's
// slack (committed - used) to `wasted` when its utilization falls below the
// configured ratio.
bool heap_count_wasted_blocks(const mi_heap_t* heap, const mi_heap_area_t* area, void* block,
                              size_t block_size, void* arg) {
  assert(area->used < (1u << 31));

  MemUtilized_t* stats = (MemUtilized_t*)arg;

  // mimalloc mistakenly exports `used` in blocks instead of bytes,
  // so convert to bytes here.
  const size_t used_bytes = block_size * area->used;
  const size_t committed_bytes = area->committed;

  stats->allocated += used_bytes;
  stats->comitted += committed_bytes;

  // Under-utilized area: count its unused-but-committed bytes as wasted.
  if (used_bytes < committed_bytes * stats->ratio) {
    stats->wasted += committed_bytes - used_bytes;
  }
  return true;  // continue iteration
}
@ -132,6 +155,18 @@ int zmalloc_get_allocator_info(size_t* allocated, size_t* active, size_t* reside
return 1;
}
int zmalloc_get_allocator_wasted_blocks(float ratio, size_t* allocated, size_t* commited,
size_t* wasted) {
MemUtilized_t sum = {.allocated = 0, .comitted = 0, .wasted = 0, .ratio = ratio};
mi_heap_visit_blocks(zmalloc_heap, false /* visit all blocks*/, heap_count_wasted_blocks, &sum);
*allocated = sum.allocated;
*commited = sum.comitted;
*wasted = sum.wasted;
return 1;
}
void init_zmalloc_threadlocal(void* heap) {
if (zmalloc_heap)
return;

View file

@ -20,7 +20,7 @@ extern "C" {
#include "server/main_service.h"
#include "server/test_utils.h"
ABSL_DECLARE_FLAG(float, commit_use_threshold);
ABSL_DECLARE_FLAG(float, mem_defrag_threshold);
namespace dfly {
@ -788,32 +788,22 @@ TEST_F(DflyEngineTest, Issue706) {
}
TEST_F(DefragDflyEngineTest, TestDefragOption) {
absl::SetFlag(&FLAGS_commit_use_threshold, 1.1);
// Fill data into dragonfly and then check if we have
// any location in memory to defrag. See issue #448 for details about this.
absl::SetFlag(&FLAGS_mem_defrag_threshold, 0.02);
// Fill data into dragonfly and then check if we have
// any location in memory to defrag. See issue #448 for details about this.
constexpr size_t kMaxMemoryForTest = 1'100'000;
constexpr int kNumberOfKeys = 1'000; // this fill the memory
constexpr int kKeySize = 637;
constexpr int kMaxDefragTriesForTests = 10;
constexpr int kFactor = 10;
constexpr int kMaxNumKeysToDelete = 100;
constexpr int kMaxDefragTriesForTests = 30;
constexpr int kFactor = 4;
max_memory_limit = kMaxMemoryForTest; // control memory size so no need for too many keys
shard_set->TEST_EnableHeartBeat(); // enable use memory update (used_mem_current)
std::vector<std::string> keys2delete;
keys2delete.push_back("del");
// Generate a list of keys that would be deleted
// The keys that we will delete are all in the form of "key-name:1<other digits>"
// This is because we are populating keys that has this format, but we don't want
// to delete all keys, only some random keys so we deleting those that start with 1
int current_step = kFactor;
for (int i = 1; i < kMaxNumKeysToDelete; current_step *= kFactor) {
for (; i < current_step; i++) {
int j = i - 1 + current_step;
keys2delete.push_back("key-name:" + std::to_string(j));
}
// create keys that we would like to remove, trying to place them at non-adjacent locations
for (int i = 0; i < kNumberOfKeys; i += kFactor) {
keys2delete.push_back("key-name:" + std::to_string(i));
}
std::vector<std::string_view> keys(keys2delete.begin(), keys2delete.end());
@ -829,20 +819,21 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) {
EngineShard* shard = EngineShard::tlocal();
ASSERT_FALSE(shard == nullptr); // we only have one and its should not be empty!
fibers_ext::SleepFor(100ms);
auto mem_used = 0;
// make sure that the task that collect memory usage from all shard ran
// for at least once, and that no defrag was done yet.
for (int i = 0; i < 3 && mem_used == 0; i++) {
auto stats = shard->stats();
for (int i = 0; i < 3; i++) {
fibers_ext::SleepFor(100ms);
EXPECT_EQ(shard->stats().defrag_realloc_total, 0);
mem_used = used_mem_current.load(memory_order_relaxed);
EXPECT_EQ(stats.defrag_realloc_total, 0);
}
});
ArgSlice delete_cmd(keys);
r = CheckedInt(delete_cmd);
LOG(WARNING) << "finish deleting memory entries " << r;
// the first element in this is the command del so size is one less
ASSERT_EQ(r, kMaxNumKeysToDelete - 1);
ASSERT_EQ(r, keys2delete.size() - 1);
// At this point we need to see whether we did running the task and whether the task did something
shard_set->pool()->AwaitFiberOnAll([&](unsigned index, ProactorBase* base) {
EngineShard* shard = EngineShard::tlocal();
@ -850,11 +841,8 @@ TEST_F(DefragDflyEngineTest, TestDefragOption) {
// a "busy wait" to ensure that memory defragmentations was successful:
// the task ran and did it work
auto stats = shard->stats();
for (int i = 0; i < kMaxDefragTriesForTests; i++) {
for (int i = 0; i < kMaxDefragTriesForTests && stats.defrag_realloc_total == 0; i++) {
stats = shard->stats();
if (stats.defrag_realloc_total > 0) {
break;
}
fibers_ext::SleepFor(220ms);
}
// make sure that we successfully found places to defrag in memory

View file

@ -35,10 +35,10 @@ ABSL_FLAG(float, mem_defrag_threshold, 0.7,
"Minimum percentage of used memory relative to maxmemory cap before running "
"defragmentation");
ABSL_FLAG(float, commit_use_threshold, 1.3,
"The ratio of commited/used memory above which we run defragmentation");
ABSL_FLAG(float, mem_defrag_waste_threshold, 0.2,
"The ratio of wasted/commited memory above which we run defragmentation");
ABSL_FLAG(float, mem_utilization_threshold, 0.8,
ABSL_FLAG(float, mem_defrag_page_utilization_threshold, 0.8,
"memory page under utilization threshold. Ratio between used and commited size, below "
"this, memory in this page will defragmented");
namespace dfly {
@ -54,6 +54,24 @@ constexpr uint64_t kCursorDoneState = 0u;
vector<EngineShardSet::CachedStats> cached_stats; // initialized in EngineShardSet::Init
// Snapshot of one shard's allocator memory accounting.
struct ShardMemUsage {
  std::size_t commited = 0;    // bytes committed by the allocator
  std::size_t used = 0;        // bytes actually allocated
  std::size_t wasted_mem = 0;  // committed-but-unused bytes in under-utilized blocks
};

// Human-readable form used in defrag-related log messages.
std::ostream& operator<<(std::ostream& os, const ShardMemUsage& mem) {
  os << "commited: " << mem.commited << " vs used " << mem.used << ", wasted memory "
     << mem.wasted_mem;
  return os;
}
// Samples the current thread's (shard's) allocator statistics: used and
// committed bytes plus the bytes considered wasted in blocks whose
// utilization falls below wasted_ratio (see zmalloc_get_allocator_wasted_blocks).
ShardMemUsage ReadShardMemUsage(float wasted_ratio) {
ShardMemUsage usage;
zmalloc_get_allocator_wasted_blocks(wasted_ratio, &usage.used, &usage.commited,
&usage.wasted_mem);
return usage;
}
} // namespace
constexpr size_t kQueueLen = 64;
@ -72,37 +90,39 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o)
return *this;
}
// Records the new scan cursor and clears the under-utilization flag, so the
// next defrag check re-evaluates memory state from scratch.
void EngineShard::DefragTaskState::UpdateScanState(uint64_t cursor_val) {
cursor = cursor_val;
underutilized_found = false;
}
// This function checks 3 things:
// 1. Don't try memory fragmentation if we don't use "enough" memory (control by
// mem_defrag_threshold flag)
// 2. That we had change in memory usage - to prevent endless loop - out of scope for now
// 2. We have memory blocks that can be better utilized (there is a "wasted memory" in them).
// 3. in case the above is OK, make sure that we have a "gap" between usage and commited memory
// (control by commit_use_threshold flag)
bool EngineShard::DefragTaskState::IsRequired() const {
const int64_t threshold_mem = max_memory_limit * GetFlag(FLAGS_mem_defrag_threshold);
const double commit_use_threshold = GetFlag(FLAGS_commit_use_threshold);
if (cursor > kCursorDoneState) {
// (control by mem_defrag_waste_threshold flag)
bool EngineShard::DefragTaskState::CheckRequired() {
if (cursor > kCursorDoneState || underutilized_found) {
VLOG(1) << "Already found memory utilization issue - cursor: " << cursor
<< " and underutilized_found " << underutilized_found;
return true;
}
const std::size_t memory_per_shard = max_memory_limit / shard_set->size();
// can be negative due to weird accounting of mimalloc.
int64_t commited = GetMallocCurrentCommitted();
const std::size_t threshold_mem = memory_per_shard * GetFlag(FLAGS_mem_defrag_threshold);
const double waste_threshold = GetFlag(FLAGS_mem_defrag_waste_threshold);
uint64_t mem_in_use = used_mem_current.load(memory_order_relaxed);
ShardMemUsage usage = ReadShardMemUsage(GetFlag(FLAGS_mem_defrag_page_utilization_threshold));
// we want to make sure that we are not running this too many times - i.e.
// if there was no change to the memory, don't run this
if (threshold_mem < commited && mem_in_use > 0 &&
(uint64_t(mem_in_use * commit_use_threshold) < uint64_t(commited))) {
// we have way more commited then actual usage
return true;
if (threshold_mem < usage.commited &&
usage.wasted_mem > (uint64_t(usage.commited * waste_threshold))) {
VLOG(1) << "memory issue found for memory " << usage;
underutilized_found = true;
}
return false;
}
// for now this does nothing
bool EngineShard::DoDefrag() {
// --------------------------------------------------------------------------
// NOTE: This task is running with exclusive access to the shard.
@ -111,8 +131,8 @@ bool EngineShard::DoDefrag() {
// context of the controlling thread will access this shard!
// --------------------------------------------------------------------------
constexpr size_t kMaxTraverses = 50;
const float threshold = GetFlag(FLAGS_mem_utilization_threshold);
constexpr size_t kMaxTraverses = 40;
const float threshold = GetFlag(FLAGS_mem_defrag_page_utilization_threshold);
auto& slice = db_slice();
DCHECK(slice.IsDbValid(kDefaultDbIndex));
@ -135,15 +155,15 @@ bool EngineShard::DoDefrag() {
traverses_count++;
} while (traverses_count < kMaxTraverses && cur);
defrag_state_.cursor = cur.value();
defrag_state_.UpdateScanState(cur.value());
if (reallocations > 0) {
VLOG(1) << "shard " << slice.shard_id() << ": successfully defrag " << reallocations
<< " times, did it in " << traverses_count << " cursor is at the "
<< (defrag_state_.cursor == 0 ? "end" : "in progress");
<< (defrag_state_.cursor == kCursorDoneState ? "end" : "in progress");
} else {
VLOG(1) << "shard " << slice.shard_id() << ": run the defrag " << traverses_count
<< " times out of maximum " << kMaxTraverses << ", with cursor at "
<< (defrag_state_.cursor == 0 ? "end" : "in progress")
<< (defrag_state_.cursor == kCursorDoneState ? "end" : "in progress")
<< " but no location for defrag were found";
}
stats_.defrag_realloc_total += reallocations;
@ -155,8 +175,7 @@ bool EngineShard::DoDefrag() {
// the memory defragmentation task is as follow:
// 1. Check if memory usage is high enough
// 2. Check if diff between commited and used memory is high enough
// 3. Check if we have memory changes (to ensure that we not running endlessly). - TODO
// 4. if all the above pass -> scan this shard and try to find whether we can move pointer to
// 3. if all the above pass -> scan this shard and try to find whether we can move pointer to
// underutilized pages values
// if the cursor returned from scan is not in done state, schedule the task to run at high
// priority.
@ -164,14 +183,15 @@ bool EngineShard::DoDefrag() {
uint32_t EngineShard::DefragTask() {
constexpr uint32_t kRunAtLowPriority = 0u;
const auto shard_id = db_slice().shard_id();
if (defrag_state_.IsRequired()) {
VLOG(1) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor;
if (defrag_state_.CheckRequired()) {
VLOG(1) << shard_id << ": need to run defrag memory cursor state: " << defrag_state_.cursor
<< ", underutilzation found: " << defrag_state_.underutilized_found;
if (DoDefrag()) {
// we didn't finish the scan
return util::ProactorBase::kOnIdleMaxLevel;
}
}
return kRunAtLowPriority;
}

View file

@ -150,10 +150,13 @@ class EngineShard {
struct DefragTaskState {
// we will add more data members later
uint64_t cursor = 0u;
bool underutilized_found = false;
// check the current threshold and return true if
// we need to do the defragmentation
bool IsRequired() const;
bool CheckRequired();
void UpdateScanState(uint64_t cursor_val);
};
EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap);