mirror of https://github.com/dragonflydb/dragonfly.git (synced 2025-05-10 18:05:44 +02:00)

fix: stream memory counting during snapshot loading (#4346)

* fix: stream memory counting during snapshot loading
parent 9fbb301988
commit 5b9c7e415a

5 changed files with 73 additions and 32 deletions
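The patch turns the stream memory tracker into a shared class so the RDB loader can apply the same delta-tracking pattern during snapshot loading: record the thread-local allocation counter when a stream object starts loading, then charge the signed difference to the value once the stream (or an appended chunk of it) has been built. Below is a minimal, standalone sketch of that pattern under simplified assumptions; g_used_memory, FakeValue, Tracker, and LoadChunk are hypothetical stand-ins, not Dragonfly's zmalloc_used_memory_tl, PrimeValue, or StreamMemTracker, and the allocation bookkeeping is simulated rather than hooked into a real allocator.

// Minimal sketch of the delta-tracking pattern; all names are illustrative stand-ins.
#include <cstddef>
#include <cstdint>
#include <initializer_list>
#include <iostream>

static thread_local size_t g_used_memory = 0;  // stand-in for a thread-local allocator byte counter

struct FakeValue {
  int64_t stream_bytes = 0;
  void AddStreamSize(int64_t delta) { stream_bytes += delta; }  // signed: frees subtract
};

class Tracker {
 public:
  Tracker() : start_size_(g_used_memory) {}  // snapshot the counter up front

  void UpdateStreamSize(FakeValue& v) const {
    const size_t current = g_used_memory;
    v.AddStreamSize(static_cast<int64_t>(current) - static_cast<int64_t>(start_size_));
  }

 private:
  size_t start_size_;
};

// Simulates one loader call that builds or appends to a stream: the tracker
// scopes exactly the allocations made by this call.
void LoadChunk(FakeValue& v, size_t alloc_bytes) {
  Tracker tracker;
  g_used_memory += alloc_bytes;  // pretend this call allocated alloc_bytes for stream nodes
  tracker.UpdateStreamSize(v);   // attribute only this call's delta to the value
}

int main() {
  FakeValue stream_value;
  for (size_t chunk : {128, 256, 64})  // e.g. chunked/append loads from a snapshot
    LoadChunk(stream_value, chunk);
  std::cout << stream_value.stream_bytes << '\n';  // 448: per-call deltas accumulate
  return 0;
}

Because the delta is signed and accumulated onto the value, repeated append-mode calls add up to one total, which is the same shape as the CreateStream change in the diff below.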

@@ -802,6 +802,25 @@ TEST_F(DflyEngineTest, DebugObject) {
   EXPECT_THAT(resp.GetString(), HasSubstr("encoding:listpack"));
 }

+TEST_F(DflyEngineTest, StreamMemInfo) {
+  for (int i = 1; i < 2; ++i) {
+    Run({"XADD", "test", std::to_string(i), "var", "val" + std::to_string(i)});
+  }
+
+  int64_t stream_mem_first = GetMetrics().db_stats[0].memory_usage_by_type[OBJ_STREAM];
+  EXPECT_GT(stream_mem_first, 0);
+
+  auto dump = Run({"dump", "test"});
+  Run({"del", "test"});
+  Run({"restore", "test", "0", facade::ToSV(dump.GetBuf())});
+
+  int64_t stream_mem_second = GetMetrics().db_stats[0].memory_usage_by_type[OBJ_STREAM];
+
+  // stream_mem_first != stream_mem_second due to a preallocation in XADD command (see
+  // STREAM_LISTPACK_MAX_PRE_ALLOCATE)
+  EXPECT_GT(stream_mem_second, 0);
+}
+
 // TODO: to test transactions with a single shard since then all transactions become local.
 // To consider having a parameter in dragonfly engine controlling number of shards
 // unconditionally from number of cpus. TO TEST BLPOP under multi for single/multi argument case.

@@ -50,6 +50,7 @@ extern "C" {
 #include "server/serializer_commons.h"
 #include "server/server_state.h"
 #include "server/set_family.h"
+#include "server/stream_family.h"
 #include "server/transaction.h"
 #include "strings/human_readable.h"

@@ -703,6 +704,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {

 void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
   stream* s;
+  StreamMemTracker mem_tracker;
   if (config_.append) {
     if (!EnsureObjEncoding(OBJ_STREAM, OBJ_ENCODING_STREAM)) {
       return;

@@ -848,6 +850,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
   if (!config_.append) {
     pv_->InitRobj(OBJ_STREAM, OBJ_ENCODING_STREAM, s);
   }
+  mem_tracker.UpdateStreamSize(*pv_);
 }

 void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {

@@ -27,6 +27,18 @@ namespace dfly {
 using namespace facade;
 using namespace std;

+StreamMemTracker::StreamMemTracker() {
+  start_size_ = zmalloc_used_memory_tl;
+}
+
+void StreamMemTracker::UpdateStreamSize(PrimeValue& pv) const {
+  const size_t current = zmalloc_used_memory_tl;
+  int64_t diff = static_cast<int64_t>(current) - static_cast<int64_t>(start_size_);
+  pv.AddStreamSize(diff);
+  // Under any flow we must not end up with this special value.
+  DCHECK(pv.MallocUsed() != 0);
+}
+
 namespace {

 struct Record {

@@ -612,24 +624,6 @@ int StreamTrim(const AddTrimOpts& opts, stream* s) {
   return 0;
 }

-class StreamMemTracker {
- public:
-  StreamMemTracker() {
-    start_size_ = zmalloc_used_memory_tl;
-  }
-
-  void UpdateStreamSize(PrimeValue& pv) const {
-    const size_t current = zmalloc_used_memory_tl;
-    int64_t diff = static_cast<int64_t>(current) - static_cast<int64_t>(start_size_);
-    pv.AddStreamSize(diff);
-    // Under any flow we must not end up with this special value.
-    DCHECK(pv.MallocUsed() != 0);
-  }
-
- private:
-  size_t start_size_{0};
-};
-
 OpResult<streamID> OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgList args) {
   DCHECK(!args.empty() && args.size() % 2 == 0);
   auto& db_slice = op_args.GetDbSlice();

@@ -15,6 +15,19 @@ namespace dfly {
 class CommandRegistry;
 struct CommandContext;

+class CompactObj;
+using PrimeValue = CompactObj;
+
+class StreamMemTracker {
+ public:
+  StreamMemTracker();
+
+  void UpdateStreamSize(PrimeValue& pv) const;
+
+ private:
+  size_t start_size_{0};
+};
+
 class StreamFamily {
  public:
   static void Register(CommandRegistry* registry);

@@ -6,6 +6,7 @@ from . import dfly_args
 from .instance import DflyInstance, DflyInstanceFactory


+@pytest.mark.slow
 @pytest.mark.opt_only
 @pytest.mark.parametrize(
     "type, keys, val_size, elements",

@@ -23,7 +24,10 @@ from .instance import DflyInstance, DflyInstanceFactory
 # memory it might force the gh runner to run out of memory (since OOM killer might not even
 # get a chance to run).
 @dfly_args({"proactor_threads": 4, "maxmemory": "5gb"})
-async def test_rss_used_mem_gap(df_server: DflyInstance, type, keys, val_size, elements):
+async def test_rss_used_mem_gap(df_factory, type, keys, val_size, elements):
+    dbfilename = f"dump_{tmp_file_name()}"
+    instance = df_factory.create(dbfilename=dbfilename)
+    instance.start()
     # Create a Dragonfly and fill it up with `type` until it reaches `min_rss`, then make sure that
     # the gap between used_memory and rss is no more than `max_unaccounted_ratio`.
     min_rss = 3 * 1024 * 1024 * 1024  # 3gb

@@ -35,7 +39,7 @@ async def test_rss_used_mem_gap(df_server: DflyInstance, type, keys, val_size, elements):
     if type == "STREAM":
         max_unaccounted = max_unaccounted * 3

-    client = df_server.client()
+    client = instance.client()
     await asyncio.sleep(1)  # Wait for another RSS heartbeat update in Dragonfly

     cmd = f"DEBUG POPULATE {keys} k {val_size} RAND TYPE {type} ELEMENTS {elements}"

@@ -44,6 +48,7 @@ async def test_rss_used_mem_gap(df_server: DflyInstance, type, keys, val_size, elements):

     await asyncio.sleep(2)  # Wait for another RSS heartbeat update in Dragonfly

+    async def check_memory():
         info = await client.info("memory")
         logging.info(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}')
         assert info["used_memory"] > min_rss, "Weak testcase: too little used memory"

@@ -58,6 +63,13 @@ async def test_rss_used_mem_gap(df_server: DflyInstance, type, keys, val_size, elements):
         assert info["object_used_memory"] > keys * elements * val_size
         assert info["used_memory"] > info["object_used_memory"]

+    await check_memory()
+
+    await client.execute_command("SAVE", "DF")
+    await client.execute_command("DFLY", "LOAD", f"{dbfilename}-summary.dfs")
+
+    await check_memory()
+

 @pytest.mark.asyncio
 @dfly_args(