diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index f587f8bdc..106918c2f 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -153,7 +153,7 @@ jobs:
           dfly-executable: dragonfly
           run-only-on-ubuntu-latest: true
           build-folder-name: build
-          filter: "not slow"
+          filter: ${{ matrix.build-type == 'Release' && 'not slow' || '(not slow) and (not opt_only)' }}
       - name: Upload logs on failure
         if: failure()
         uses: actions/upload-artifact@v3
diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc
index 9afbe47a1..0d3ffcf46 100644
--- a/src/server/debugcmd.cc
+++ b/src/server/debugcmd.cc
@@ -18,6 +18,7 @@
 #include "server/engine_shard_set.h"
 #include "server/error.h"
 #include "server/main_service.h"
+#include "server/multi_command_squasher.h"
 #include "server/rdb_load.h"
 #include "server/server_state.h"
 #include "server/string_family.h"
@@ -68,32 +69,99 @@ struct ValueCompressInfo {
   size_t compressed_size = 0;
 };
 
-void DoPopulateBatch(std::string_view prefix, size_t val_size, bool random_value_str,
-                     const SetCmd::SetParams& params, const PopulateBatch& batch) {
-  DbContext db_cntx{batch.dbid, 0};
-  OpArgs op_args(EngineShard::tlocal(), 0, db_cntx);
-  SetCmd sg(op_args, false);
+std::string GenerateValue(size_t val_size, bool random_value, absl::InsecureBitGen* gen) {
+  if (random_value) {
+    return GetRandomHex(*gen, val_size);
+  } else {
+    return string(val_size, 'x');
+  }
+}
+
+tuple<const CommandId*, absl::InlinedVector<string, 5>> GeneratePopulateCommand(
+    string_view type, std::string key, size_t val_size, bool random_value, uint32_t elements,
+    const CommandRegistry& registry, absl::InsecureBitGen* gen) {
+  absl::InlinedVector<string, 5> args;
+  args.push_back(std::move(key));
+
+  const CommandId* cid = nullptr;
+  if (type == "STRING") {
+    cid = registry.Find("SET");
+    args.push_back(GenerateValue(val_size, random_value, gen));
+  } else if (type == "LIST") {
+    cid = registry.Find("LPUSH");
+    for (uint32_t i = 0; i < elements; ++i) {
+      args.push_back(GenerateValue(val_size, random_value, gen));
+    }
+  } else if (type == "SET") {
+    cid = registry.Find("SADD");
+    for (size_t i = 0; i < elements; ++i) {
+      args.push_back(GenerateValue(val_size, random_value, gen));
+    }
+  } else if (type == "HSET") {
+    cid = registry.Find("HSET");
+    for (size_t i = 0; i < elements; ++i) {
+      args.push_back(GenerateValue(val_size / 2, random_value, gen));
+      args.push_back(GenerateValue(val_size / 2, random_value, gen));
+    }
+  } else if (type == "ZSET") {
+    cid = registry.Find("ZADD");
+    for (size_t i = 0; i < elements; ++i) {
+      args.push_back(absl::StrCat((*gen)() % val_size));
+      args.push_back(GenerateValue(val_size, random_value, gen));
+    }
+  } else if (type == "JSON") {
+    cid = registry.Find("JSON.SET");
+    args.push_back("$");
+
+    string json = "{";
+    for (size_t i = 0; i < elements; ++i) {
+      absl::StrAppend(&json, "\"", i, "\":\"", GenerateValue(val_size, random_value, gen), "\",");
+    }
+    json[json.size() - 1] = '}';  // Replace last ',' with '}'
+    args.push_back(json);
+  }
+  return {cid, args};
+}
+
+void DoPopulateBatch(string_view type, string_view prefix, size_t val_size, bool random_value,
+                     int32_t elements, const PopulateBatch& batch, ServerFamily* sf,
+                     ConnectionContext* cntx) {
+  boost::intrusive_ptr<Transaction> local_tx =
+      new Transaction{sf->service().mutable_registry()->Find("EXEC")};
+  local_tx->StartMultiNonAtomic();
+  boost::intrusive_ptr<Transaction> stub_tx =
+      new Transaction{local_tx.get(), EngineShard::tlocal()->shard_id(), nullopt};
+  absl::InlinedVector<MutableSlice, 5> args_view;
+  facade::CapturingReplyBuilder crb;
+  ConnectionContext local_cntx{cntx, stub_tx.get(), &crb};
 
   absl::InsecureBitGen gen;
   for (unsigned i = 0; i < batch.sz; ++i) {
     string key = absl::StrCat(prefix, ":", batch.index[i]);
-    string val;
-    if (random_value_str) {
-      val = GetRandomHex(gen, val_size);
-    } else {
-      val = absl::StrCat("value:", batch.index[i]);
-      if (val.size() < val_size) {
-        val.resize(val_size, 'x');
-      }
+    auto [cid, args] = GeneratePopulateCommand(type, std::move(key), val_size, random_value,
+                                               elements, *sf->service().mutable_registry(), &gen);
+    if (!cid) {
+      LOG_EVERY_N(WARNING, 10'000) << "Unable to find command, was it renamed?";
+      break;
     }
 
-    auto res = sg.Set(params, key, val);
-    if (!res) {
-      LOG_EVERY_N(WARNING, 10'000) << "Debug populate failed to set value. Status:" << res.status();
-      return;
+    args_view.clear();
+    for (auto& arg : args) {
+      args_view.push_back(absl::MakeSpan(arg));
     }
+    auto args_span = absl::MakeSpan(args_view);
+
+    stub_tx->MultiSwitchCmd(cid);
+    local_cntx.cid = cid;
+    crb.SetReplyMode(ReplyMode::NONE);
+    stub_tx->InitByArgs(local_cntx.conn_state.db_index, args_span);
+
+    sf->service().InvokeCmd(cid, args_span, &local_cntx);
   }
+
+  local_cntx.Inject(nullptr);
+  local_tx->UnlockMulti();
 }
 
 struct ObjHist {
@@ -315,13 +383,15 @@ void DebugCmd::Run(CmdArgList args) {
         "    Return sync id and array of number of journal commands executed for each replica flow",
         "WATCHED",
         "    Shows the watched keys as a result of BLPOP and similar operations.",
-        "POPULATE <count> [<prefix>] [<size>] [RAND] [SLOTS start end]",
+        "POPULATE <count> [prefix] [size] [RAND] [SLOTS start end] [TYPE type] [ELEMENTS elements]",
        "    Create <count> string keys named key:<num> with value value:<num>.",
        "    If <prefix> is specified then it is used instead of the 'key' prefix.",
        "    If <size> is specified then X character is concatenated multiple times to value:<num>",
        "    to meet value size.",
        "    If RAND is specified then value will be set to random hex string in specified size.",
        "    If SLOTS is specified then create keys only in given slots range.",
+        "    TYPE specifies data type (must be STRING/LIST/SET/HSET/ZSET/JSON), default STRING.",
+        "    ELEMENTS specifies how many sub elements if relevant (like entries in a list / set).",
        "OBJHIST",
        "    Prints histogram of object sizes.",
        "STACKTRACE",
@@ -498,7 +568,7 @@ void DebugCmd::Load(string_view filename) {
 }
 
 optional<DebugCmd::PopulateOptions> DebugCmd::ParsePopulateArgs(CmdArgList args) {
-  if (args.size() < 2 || args.size() > 8) {
+  if (args.size() < 2) {
     cntx_->SendError(UnknownSubCmd("populate", "DEBUG"));
     return nullopt;
   }
@@ -525,6 +595,22 @@ optional<DebugCmd::PopulateOptions> DebugCmd::ParsePopulateArgs(CmdArgList args)
     std::string_view str = ArgS(args, index);
     if (str == "RAND") {
       options.populate_random_values = true;
+    } else if (str == "TYPE") {
+      if (args.size() < index + 2) {
+        cntx_->SendError(kSyntaxErr);
+        return nullopt;
+      }
+      ToUpper(&args[++index]);
+      options.type = ArgS(args, index);
+    } else if (str == "ELEMENTS") {
+      if (args.size() < index + 2) {
+        cntx_->SendError(kSyntaxErr);
+        return nullopt;
+      }
+      if (!absl::SimpleAtoi(ArgS(args, ++index), &options.elements)) {
+        cntx_->SendError(kSyntaxErr);
+        return nullopt;
+      }
     } else if (str == "SLOTS") {
       if (args.size() < index + 3) {
         cntx_->SendError(kSyntaxErr);
@@ -605,7 +691,6 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t num_of_keys,
   DbIndex db_indx = cntx_->db_index();
   EngineShardSet& ess = *shard_set;
   std::vector<PopulateBatch> ps(ess.size(), PopulateBatch{db_indx});
-  SetCmd::SetParams params;
 
   uint64_t index = from;
   uint64_t to = from + num_of_keys;
@@ -640,9 +725,9 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t num_of_keys,
     ++index;
 
     if (shard_batch.sz == 32) {
-      ess.Add(sid, [=] {
-        DoPopulateBatch(options.prefix, options.val_size, options.populate_random_values, params,
-                        shard_batch);
+      ess.Add(sid, [this, index, options, shard_batch] {
+        DoPopulateBatch(options.type, options.prefix, options.val_size,
+                        options.populate_random_values, options.elements, shard_batch, &sf_, cntx_);
         if (index % 50 == 0) {
           ThisFiber::Yield();
         }
@@ -654,8 +739,8 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t num_of_keys,
   }
 
   ess.AwaitRunningOnShardQueue([&](EngineShard* shard) {
-    DoPopulateBatch(options.prefix, options.val_size, options.populate_random_values, params,
-                    ps[shard->shard_id()]);
+    DoPopulateBatch(options.type, options.prefix, options.val_size, options.populate_random_values,
+                    options.elements, ps[shard->shard_id()], &sf_, cntx_);
     // Debug populate does not use transaction framework therefore we call OnCbFinish manually
     // after running the callback
     // Note that running debug populate while running flushall/db can cause dcheck fail because the
diff --git a/src/server/debugcmd.h b/src/server/debugcmd.h
index 8aa3b8252..8dbfef3bd 100644
--- a/src/server/debugcmd.h
+++ b/src/server/debugcmd.h
@@ -19,6 +19,8 @@ class DebugCmd {
     std::string_view prefix{"key"};
     uint32_t val_size = 0;
     bool populate_random_values = false;
+    std::string_view type{"STRING"};
+    uint32_t elements = 1;
     std::optional<SlotRange> slot_range;
   };
 
diff --git a/tests/dragonfly/instance.py b/tests/dragonfly/instance.py
index a8f80ab14..800803c88 100644
--- a/tests/dragonfly/instance.py
+++ b/tests/dragonfly/instance.py
@@ -168,7 +168,7 @@ class DflyInstance:
                 proc.kill()
             else:
                 proc.terminate()
-            proc.communicate(timeout=15)
+            proc.communicate(timeout=30)
         except subprocess.TimeoutExpired:
             # We need to send SIGUSR1 to DF such that it prints the stacktrace
             proc.send_signal(signal.SIGUSR1)
diff --git a/tests/dragonfly/memory_test.py b/tests/dragonfly/memory_test.py
new file mode 100644
index 000000000..9c25d36a2
--- /dev/null
+++ b/tests/dragonfly/memory_test.py
@@ -0,0 +1,40 @@
+import pytest
+from redis import asyncio as aioredis
+from .utility import *
+
+
+@pytest.mark.opt_only
+@pytest.mark.parametrize(
+    "type, keys, val_size, elements",
+    [
+        ("JSON", 300_000, 100, 100),
+        ("SET", 500_000, 100, 100),
+        ("HSET", 500_000, 100, 100),
+        ("ZSET", 400_000, 100, 100),
+        ("LIST", 500_000, 100, 100),
+        ("STRING", 10_000_000, 1000, 1),
+    ],
+)
+async def test_rss_used_mem_gap(df_local_factory, type, keys, val_size, elements):
+    # Create a Dragonfly and fill it up with `type` until it reaches `min_rss`, then make sure that
+    # the gap between used_memory and rss is no more than `max_unaccounted_ratio`.
+    min_rss = 5 * 1024 * 1024 * 1024  # 5gb
+    max_unaccounted = 200 * 1024 * 1024  # 200mb
+
+    df_server = df_local_factory.create()
+    df_local_factory.start_all([df_server])
+    client = df_server.client()
+    await asyncio.sleep(1)  # Wait for another RSS heartbeat update in Dragonfly
+
+    cmd = f"DEBUG POPULATE {keys} {type} {val_size} RAND TYPE {type} ELEMENTS {elements}"
+    print(f"Running {cmd}")
+    await client.execute_command(cmd)
+
+    await asyncio.sleep(1)  # Wait for another RSS heartbeat update in Dragonfly
+
+    info = await client.info("memory")
+    print(f'Used memory {info["used_memory"]}, rss {info["used_memory_rss"]}')
+    assert info["used_memory"] > min_rss, "Weak testcase: too little used memory"
+    assert info["used_memory_rss"] - info["used_memory"] < max_unaccounted
+
+    await disconnect_clients(client)
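For reference, the snippet below is not part of the patch; it is a minimal sketch of how the extended DEBUG POPULATE syntax introduced above (TYPE and ELEMENTS, on top of the existing count/prefix/size/RAND options) might be driven by hand against a running server. The endpoint localhost:6379, the key prefixes, and the counts are illustrative assumptions, and it presumes redis-py 5.x for Redis.aclose().

import asyncio

from redis import asyncio as aioredis


async def populate_samples() -> None:
    # Assumed endpoint; point this at whichever instance you are testing.
    client = aioredis.Redis(host="localhost", port=6379, decode_responses=True)

    # 1000 keys per type, 100-byte random values, 10 sub-elements per
    # container key (ELEMENTS is ignored for STRING).
    for type_ in ("STRING", "LIST", "SET", "HSET", "ZSET", "JSON"):
        cmd = f"DEBUG POPULATE 1000 sample:{type_.lower()} 100 RAND TYPE {type_} ELEMENTS 10"
        await client.execute_command(cmd)

    # The same memory fields the new test asserts on.
    info = await client.info("memory")
    print(f'used_memory={info["used_memory"]}, used_memory_rss={info["used_memory_rss"]}')

    await client.aclose()


if __name__ == "__main__":
    asyncio.run(populate_samples())

Since DoPopulateBatch now routes every generated key through the regular command path (SET, LPUSH, SADD, HSET, ZADD or JSON.SET) via a stub transaction instead of calling SetCmd directly, the same command string works for every supported type.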