From 6857209fe27495e742c4e6c1788a56fec2b2b3ec Mon Sep 17 00:00:00 2001 From: Stepan Bagritsevich Date: Tue, 6 May 2025 09:37:19 +0200 Subject: [PATCH 1/4] feat(memory_cmd): Add WITHOUTKEY option for the MEMORY USAGE command Signed-off-by: Stepan Bagritsevich --- src/server/memory_cmd.cc | 55 +++++++++++++++++++++------------------- src/server/memory_cmd.h | 2 +- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/src/server/memory_cmd.cc b/src/server/memory_cmd.cc index 00d1cb88d..d29941907 100644 --- a/src/server/memory_cmd.cc +++ b/src/server/memory_cmd.cc @@ -82,8 +82,8 @@ std::string MallocStatsCb(bool backing, unsigned tid) { return str; } -size_t MemoryUsage(PrimeIterator it) { - size_t key_size = it->first.MallocUsed(); +size_t MemoryUsage(PrimeIterator it, bool account_key_memory_usage) { + size_t key_size = account_key_memory_usage ? it->first.MallocUsed() : 0; return key_size + it->second.MallocUsed(true); } @@ -95,9 +95,9 @@ MemoryCmd::MemoryCmd(ServerFamily* owner, facade::SinkReplyBuilder* builder, } void MemoryCmd::Run(CmdArgList args) { - string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); + CmdArgParser parser(args); - if (sub_cmd == "HELP") { + if (parser.Check("HELP")) { string_view help_arr[] = { "MEMORY [ ...]. Subcommands are:", "STATS", @@ -110,8 +110,9 @@ void MemoryCmd::Run(CmdArgList args) { "ARENA SHOW", " Prints the arena summary report for the entire process.", " Requires MIMALLOC_VERBOSE=1 environment to be set. The output goes to stdout", - "USAGE ", + "USAGE [WITHOUTKEY]", " Show memory usage of a key.", + " If WITHOUTKEY is specified, the key itself is not accounted.", "DECOMMIT", " Force decommit the memory freed by the server back to OS.", "TRACK", @@ -137,35 +138,36 @@ void MemoryCmd::Run(CmdArgList args) { return rb->SendSimpleStrArr(help_arr); }; - if (sub_cmd == "STATS") { + if (parser.Check("STATS")) { return Stats(); } - if (sub_cmd == "USAGE" && args.size() > 1) { - string_view key = ArgS(args, 1); - return Usage(key); + if (parser.Check("USAGE") && args.size() > 1) { + string_view key = parser.Next(); + bool account_key_memory_usage = !parser.Check("WITHOUTKEY"); + return Usage(key, account_key_memory_usage); } - if (sub_cmd == "DECOMMIT") { + if (parser.Check("DECOMMIT")) { shard_set->pool()->AwaitBrief( [](unsigned, auto* pb) { ServerState::tlocal()->DecommitMemory(ServerState::kAllMemory); }); return builder_->SendSimpleString("OK"); } - if (sub_cmd == "MALLOC-STATS") { + if (parser.Check("MALLOC-STATS")) { return MallocStats(); } - if (sub_cmd == "ARENA") { + if (parser.Check("ARENA")) { return ArenaStats(args); } - if (sub_cmd == "TRACK") { + if (parser.Check("TRACK")) { args.remove_prefix(1); return Track(args); } - if (sub_cmd == "DEFRAGMENT") { + if (parser.Check("DEFRAGMENT")) { shard_set->pool()->DispatchOnAll([](util::ProactorBase*) { if (auto* shard = EngineShard::tlocal(); shard) shard->ForceDefrag(); @@ -173,7 +175,7 @@ void MemoryCmd::Run(CmdArgList args) { return builder_->SendSimpleString("OK"); } - string err = UnknownSubCmd(sub_cmd, "MEMORY"); + string err = UnknownSubCmd(parser.Next(), "MEMORY"); return builder_->SendError(err, kSyntaxErrType); } @@ -346,18 +348,19 @@ void MemoryCmd::ArenaStats(CmdArgList args) { return rb->SendVerbatimString(mi_malloc_info); } -void MemoryCmd::Usage(std::string_view key) { +void MemoryCmd::Usage(std::string_view key, bool account_key_memory_usage) { ShardId sid = Shard(key, shard_set->size()); - ssize_t memory_usage = shard_set->pool()->at(sid)->AwaitBrief([key, this, sid]() -> ssize_t { - auto& db_slice = cntx_->ns->GetDbSlice(sid); - auto [pt, exp_t] = db_slice.GetTables(cntx_->db_index()); - PrimeIterator it = pt->Find(key); - if (IsValid(it)) { - return MemoryUsage(it); - } else { - return -1; - } - }); + ssize_t memory_usage = shard_set->pool()->at(sid)->AwaitBrief( + [key, account_key_memory_usage, this, sid]() -> ssize_t { + auto& db_slice = cntx_->ns->GetDbSlice(sid); + auto [pt, exp_t] = db_slice.GetTables(cntx_->db_index()); + PrimeIterator it = pt->Find(key); + if (IsValid(it)) { + return MemoryUsage(it, account_key_memory_usage); + } else { + return -1; + } + }); auto* rb = static_cast(builder_); if (memory_usage < 0) diff --git a/src/server/memory_cmd.h b/src/server/memory_cmd.h index 6986e0cae..5850d17d2 100644 --- a/src/server/memory_cmd.h +++ b/src/server/memory_cmd.h @@ -20,7 +20,7 @@ class MemoryCmd { void Stats(); void MallocStats(); void ArenaStats(CmdArgList args); - void Usage(std::string_view key); + void Usage(std::string_view key, bool account_key_memory_usage); void Track(CmdArgList args); ConnectionContext* cntx_; From 2b4b2a4ac63f2145c1f61b0ed785e43bb44692de Mon Sep 17 00:00:00 2001 From: Stepan Bagritsevich Date: Tue, 6 May 2025 09:39:51 +0200 Subject: [PATCH 2/4] fix(json_family): Fix memory tracking for the JSON.SET command fixes dragonflydb#5054 Signed-off-by: Stepan Bagritsevich --- src/server/CMakeLists.txt | 5 +- src/server/json_family.cc | 315 +++++++++++++++--------- src/server/json_family_memory_test.cc | 108 ++++++++ src/server/search/search_family_test.cc | 14 ++ 4 files changed, 326 insertions(+), 116 deletions(-) create mode 100644 src/server/json_family_memory_test.cc diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index f1e9b6ae0..27236153d 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -118,6 +118,7 @@ cxx_test(zset_family_test dfly_test_lib LABELS DFLY) cxx_test(geo_family_test dfly_test_lib LABELS DFLY) cxx_test(blocking_controller_test dfly_test_lib LABELS DFLY) cxx_test(json_family_test dfly_test_lib LABELS DFLY) +cxx_test(json_family_memory_test dfly_test_lib LABELS DFLY) cxx_test(journal/journal_test dfly_test_lib LABELS DFLY) cxx_test(hll_family_test dfly_test_lib LABELS DFLY) cxx_test(bloom_family_test dfly_test_lib LABELS DFLY) @@ -131,6 +132,7 @@ if (WITH_ASAN OR WITH_USAN) target_compile_definitions(multi_test PRIVATE SANITIZERS) target_compile_definitions(search_family_test PRIVATE SANITIZERS) target_compile_definitions(json_family_test PRIVATE SANITIZERS) + target_compile_definitions(json_family_memory_test PRIVATE SANITIZERS) target_compile_definitions(dragonfly_test PRIVATE SANITIZERS) endif() cxx_test(search/aggregator_test dfly_test_lib LABELS DFLY) @@ -142,4 +144,5 @@ add_dependencies(check_dfly dragonfly_test json_family_test list_family_test generic_family_test memcache_parser_test rdb_test journal_test redis_parser_test stream_family_test string_family_test bitops_family_test set_family_test zset_family_test geo_family_test - hll_family_test cluster_config_test cluster_family_test acl_family_test) + hll_family_test cluster_config_test cluster_family_test acl_family_test + json_family_memory_test) diff --git a/src/server/json_family.cc b/src/server/json_family.cc index ac9f2271e..e237615e9 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -82,6 +82,80 @@ class JsonMemTracker { size_t start_size_{0}; }; +/* Helper class which must be initialized before any mutate operations on json. + It will track the memory usage of the json object and update the size in the CompactObj. + It also contains indexes updates, post update operations on the iterator. */ +class JsonAutoUpdater { + public: + JsonAutoUpdater(const OpArgs& op_args, string_view key, DbSlice::ItAndUpdater it, + bool update_on_delete = false) + : op_args_(op_args), key_(key), it_(std::move(it)), update_on_delete_(update_on_delete) { + op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, it.it->second); + + /* We need to initialize start memory usage after RemoveDoc because internally RemoveDoc has + static cache that can allocate/deallocate memory. Because of this, we will + overestimate/underestimate memory usage for json object. */ + start_size_ = GetMemoryUsage(); + } + + JsonAutoUpdater(const JsonAutoUpdater&) = delete; + JsonAutoUpdater& operator=(const JsonAutoUpdater&) = delete; + + JsonAutoUpdater(JsonAutoUpdater&&) = default; + JsonAutoUpdater& operator=(JsonAutoUpdater&&) = delete; + + void SetJsonSize() { + set_size_was_called_ = true; + + const size_t current = GetMemoryUsage(); + int64_t diff = static_cast(current) - static_cast(start_size_); + + GetPrimeValue().SetJsonSize(diff); + + // Under any flow we must not end up with this special value. + DCHECK(GetPrimeValue().MallocUsed() != 0); + } + + ~JsonAutoUpdater() { + if (update_on_delete_ && !set_size_was_called_) { + SetJsonSize(); + } else if (!set_size_was_called_) { + LOG(WARNING) << "JsonAutoUpdater destructor called without SetJsonSize() being called. This " + "may lead to memory tracking issues."; + } + + it_.post_updater.Run(); + + /* We need to call AddDoc after SetJsonSize because internally AddDoc has static cache that can + allocate/deallocate memory. Because of this, we will overestimate/underestimate memory usage for + json object. */ + op_args_.shard->search_indices()->AddDoc(key_, op_args_.db_cntx, GetPrimeValue()); + } + + PrimeValue& GetPrimeValue() { + return it_.it->second; + } + + JsonType* GetJson() { + return GetPrimeValue().GetJson(); + } + + private: + size_t GetMemoryUsage() const { + return static_cast(CompactObj::memory_resource())->used(); + } + + private: + const OpArgs& op_args_; + string_view key_; + DbSlice::ItAndUpdater it_; + + // Used to track the memory usage of the json object + size_t start_size_{0}; + bool set_size_was_called_{false}; + bool update_on_delete_; +}; + template using ParseResult = io::Result; ParseResult ParseJsonPathAsExpression(std::string_view path) { @@ -318,28 +392,63 @@ bool JsonAreEquals(const JsonType& lhs, const JsonType& rhs) { } } +/* Converts a JSONPath to a JSONPointer. + E.g. $[a][b][0] -> /a/b/0. + V1 JSONPath is not supported. */ +std::optional ConvertJsonPathToJsonPointer(string_view json_path) { + auto parsed_path = json::ParsePath(json_path); + + if (!parsed_path) { + VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path + << ". Invalid JSONPath."; + return std::nullopt; + } + + std::string pointer; + const auto& path = parsed_path.value(); + for (const auto& node : path) { + const auto& type = node.type(); + if (type == json::SegmentType::IDENTIFIER) { + pointer += '/' + node.identifier(); + } else if (type == json::SegmentType::INDEX) { + const auto& index = node.index(); + + if (index.first != index.second) { + VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path + << ". Index range is not supported."; + return std::nullopt; + } + + pointer += '/' + std::to_string(node.index().first); + } else { + VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path + << ". Unsupported segment type."; + return std::nullopt; + } + } + + return pointer; +} + // Use this method on the coordinator thread std::optional JsonFromString(std::string_view input) { return dfly::JsonFromString(input, PMR_NS::get_default_resource()); } -// Use this method on the shard thread +/* Use this method on the shard thread + + If you do memory tracking, make sure to initialize it before calling this method, and reset the + result before invoking SetJsonSize. Note that even after calling std::move on an optional, it may + still hold the JSON value, which can lead to incorrect memory tracking. */ std::optional ShardJsonFromString(std::string_view input) { return dfly::JsonFromString(input, CompactObj::memory_resource()); } -OpResult SetJson(const OpArgs& op_args, string_view key, - string_view json_str) { - auto& db_slice = op_args.GetDbSlice(); +OpStatus SetFullJson(const OpArgs& op_args, string_view key, string_view json_str) { + auto it_res = op_args.GetDbSlice().AddOrFind(op_args.db_cntx, key); + RETURN_ON_BAD_STATUS(it_res); - auto op_res = db_slice.AddOrFind(op_args.db_cntx, key); - RETURN_ON_BAD_STATUS(op_res); - - auto& res = *op_res; - - op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, res.it->second); - - JsonMemTracker tracker; + JsonAutoUpdater updater(op_args, key, *std::move(it_res)); std::optional parsed_json = ShardJsonFromString(json_str); if (!parsed_json) { @@ -352,15 +461,83 @@ OpResult SetJson(const OpArgs& op_args, string_view key, json::FromJsonType(*parsed_json, &fbb); fbb.Finish(); const auto& buf = fbb.GetBuffer(); - res.it->second.SetJson(buf.data(), buf.size()); + updater.GetPrimeValue().SetJson(buf.data(), buf.size()); } else { - res.it->second.SetJson(std::move(*parsed_json)); + updater.GetPrimeValue().SetJson(std::move(*parsed_json)); } - tracker.SetJsonSize(res.it->second, res.is_new); - op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, res.it->second); + // We should do reset before setting the size of the json, because + // std::optional still holds the value and it will be deallocated + parsed_json.reset(); + updater.SetJsonSize(); - return std::move(res); + return OpStatus::OK; +} + +/* Sets a partial JSON value at the specified path. + True means that the value was set, false means that the value was not set. */ +OpResult SetPartialJson(const OpArgs& op_args, string_view key, + const WrappedJsonPath& json_path, string_view json_str, + bool is_nx_condition, bool is_xx_condition) { + auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_JSON); + RETURN_ON_BAD_STATUS(it_res); + + JsonAutoUpdater updater(op_args, key, *std::move(it_res)); + + /* This method would use copy for parsed_json and not move! + The reason being, that we are applying this multiple times for each match we found. + So for example if we have an array that this expression will match each entry in it then the + assign here is called N times. */ + std::optional parsed_json = ShardJsonFromString(json_str); + if (!parsed_json) { + VLOG(1) << "got invalid JSON string '" << json_str << "' cannot be saved"; + return OpStatus::INVALID_JSON; + } + + bool path_exists = false; + bool value_was_set = false; + + // If the path exists, this callback will be called + auto mutate_cb = [&](std::optional, JsonType* val) -> MutateCallbackResult<> { + path_exists = true; + if (!is_nx_condition) { + value_was_set = true; + *val = JsonType(parsed_json.value(), + std::pmr::polymorphic_allocator{CompactObj::memory_resource()}); + } + return {}; + }; + + auto mutate_res = json_path.ExecuteMutateCallback( + updater.GetJson(), mutate_cb, CallbackResultOptions::DefaultMutateOptions()); + + // Set a new value if the path doesn't exist and the xx condition is not set. + if (mutate_res && !path_exists && !is_xx_condition) { + auto pointer = ConvertJsonPathToJsonPointer(json_path.Path()); + if (!pointer) { + return OpStatus::SYNTAX_ERR; + } + + std::error_code ec; + jsoncons::jsonpointer::add(*updater.GetJson(), pointer.value(), std::move(parsed_json).value(), + ec); + if (ec) { + VLOG(1) << "Failed to add a JSON value to the following path: " << json_str + << " with the error: " << ec.message(); + return OpStatus::SYNTAX_ERR; + } + + value_was_set = true; + } + + if (value_was_set) { + // We should do reset before setting the size of the json, because + // std::optional still holds the value and it will be deallocated + parsed_json.reset(); + updater.SetJsonSize(); + } + + return value_was_set; } size_t NormalizeNegativeIndex(int index, size_t size) { @@ -517,44 +694,6 @@ string ConvertToJsonPointer(string_view json_path) { return result; } -/* Converts a JSONPath to a JSONPointer. - E.g. $[a][b][0] -> /a/b/0. - V1 JSONPath is not supported. */ -std::optional ConvertJsonPathToJsonPointer(string_view json_path) { - auto parsed_path = json::ParsePath(json_path); - - if (!parsed_path) { - VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path - << ". Invalid JSONPath."; - return std::nullopt; - } - - std::string pointer; - const auto& path = parsed_path.value(); - for (const auto& node : path) { - const auto& type = node.type(); - if (type == json::SegmentType::IDENTIFIER) { - pointer += '/' + node.identifier(); - } else if (type == json::SegmentType::INDEX) { - const auto& index = node.index(); - - if (index.first != index.second) { - VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path - << ". Index range is not supported."; - return std::nullopt; - } - - pointer += '/' + std::to_string(node.index().first); - } else { - VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path - << ". Unsupported segment type."; - return std::nullopt; - } - } - - return pointer; -} - size_t CountJsonFields(const JsonType& j) { size_t res = 0; json_type type = j.type(); @@ -1331,68 +1470,14 @@ OpResult OpSet(const OpArgs& op_args, string_view key, string_view path, } } - auto st = SetJson(op_args, key, json_str); - RETURN_ON_BAD_STATUS(st); - return true; + OpStatus result = SetFullJson(op_args, key, json_str); + if (result == OpStatus::OK) { + return true; + } + return result; } - // Note that this operation would use copy and not move! - // The reason being, that we are applying this multiple times - // For each match we found. So for example if we have - // an array that this expression will match each entry in it - // then the assign here is called N times, where N == array.size(). - bool path_exists = false; - bool operation_result = false; - - optional parsed_json = ShardJsonFromString(json_str); - if (!parsed_json) { - VLOG(1) << "got invalid JSON string '" << json_str << "' cannot be saved"; - return OpStatus::INVALID_JSON; - } - const JsonType& new_json = parsed_json.value(); - - // If the path exists, this callback will be called - auto mutate_cb = [&](std::optional, JsonType* val) -> MutateCallbackResult<> { - path_exists = true; - if (!is_nx_condition) { - operation_result = true; - *val = - JsonType(new_json, std::pmr::polymorphic_allocator{CompactObj::memory_resource()}); - } - return {}; - }; - - // If the path doesn't exist, this callback will be called - auto insert_cb = [&](JsonType* json) { - // Set a new value if the path doesn't exist and the xx condition is not set. - if (!path_exists && !is_xx_condition) { - auto pointer = ConvertJsonPathToJsonPointer(json_path.Path()); - if (!pointer) { - return OpStatus::SYNTAX_ERR; - } - - std::error_code ec; - jsoncons::jsonpointer::add(*json, pointer.value(), new_json, ec); - if (ec) { - VLOG(1) << "Failed to add a JSON value to the following path: " << path - << " with the error: " << ec.message(); - return OpStatus::SYNTAX_ERR; - } - - operation_result = true; - } - - return OpStatus::OK; - }; - - // JsonMutateOperation uses it's own JsonMemTracker. It will work, because updates to already - // existing json keys use copy assign, so we don't really need to account for the memory - // allocated by ShardJsonFromString above since it's not being moved here at all. - auto res = JsonMutateOperation(op_args, key, json_path, std::move(mutate_cb), - MutateOperationOptions{std::move(insert_cb)}); - RETURN_ON_BAD_STATUS(res); - - return operation_result; + return SetPartialJson(op_args, key, json_path, json_str, is_nx_condition, is_xx_condition); } OpResult OpSet(const OpArgs& op_args, string_view key, string_view path, diff --git a/src/server/json_family_memory_test.cc b/src/server/json_family_memory_test.cc new file mode 100644 index 000000000..e414a3fd9 --- /dev/null +++ b/src/server/json_family_memory_test.cc @@ -0,0 +1,108 @@ +// Copyright 2025, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "base/gtest.h" +#include "base/logging.h" +#include "facade/facade_test.h" +#include "server/command_registry.h" +#include "server/json_family.h" +#include "server/test_utils.h" + +using namespace testing; +using namespace std; +using namespace util; + +ABSL_DECLARE_FLAG(bool, jsonpathv2); + +namespace dfly { + +class JsonFamilyMemoryTest : public BaseFamilyTest { + public: + static dfly::MiMemoryResource* GetMemoryResource() { + static thread_local mi_heap_t* heap = mi_heap_new(); + static thread_local dfly::MiMemoryResource memory_resource{heap}; + return &memory_resource; + } + + protected: + auto GetJsonMemoryUsageFromDb(std::string_view key) { + return Run({"MEMORY", "USAGE", key, "WITHOUTKEY"}); + } +}; + +size_t GetMemoryUsage() { + return static_cast(JsonFamilyMemoryTest::GetMemoryResource())->used(); +} + +size_t GetJsonMemoryUsageFromString(std::string_view json_str) { + size_t start = GetMemoryUsage(); + auto json = dfly::JsonFromString(json_str, JsonFamilyMemoryTest::GetMemoryResource()); + if (!json) { + return 0; + } + + // The same behaviour as in CompactObj + void* ptr = + JsonFamilyMemoryTest::GetMemoryResource()->allocate(sizeof(JsonType), alignof(JsonType)); + JsonType* json_on_heap = new (ptr) JsonType(std::move(json).value()); + DCHECK(json_on_heap); + + size_t result = GetMemoryUsage() - start; + + // Free the memory + json_on_heap->~JsonType(); + JsonFamilyMemoryTest::GetMemoryResource()->deallocate(json_on_heap, sizeof(JsonType), + alignof(JsonType)); + return result; +} + +TEST_F(JsonFamilyMemoryTest, SimpleSet) { + std::string_view big_json = R"({"a":"some big string asdkasdkasdfkkasjdkfjka"})"; + size_t start_size = GetJsonMemoryUsageFromString(big_json); + + auto resp = Run({"JSON.SET", "j1", "$", big_json}); + EXPECT_EQ(resp, "OK"); + resp = GetJsonMemoryUsageFromDb("j1"); + EXPECT_THAT(resp, IntArg(start_size)); + + std::string_view small_json = R"({"a":" "})"; + size_t next_size = GetJsonMemoryUsageFromString(small_json); + + resp = Run({"JSON.SET", "j1", "$", small_json}); + EXPECT_EQ(resp, "OK"); + resp = GetJsonMemoryUsageFromDb("j1"); + EXPECT_THAT(resp, IntArg(next_size)); + + // Again set big json + resp = Run({"JSON.SET", "j1", "$", big_json}); + EXPECT_EQ(resp, "OK"); + resp = GetJsonMemoryUsageFromDb("j1"); + EXPECT_THAT(resp, IntArg(start_size)); +} + +TEST_F(JsonFamilyMemoryTest, PartialSet) { + std::string_view start_json = R"({"a":"some text", "b":" "})"; + size_t start_size = GetJsonMemoryUsageFromString(start_json); + + auto resp = Run({"JSON.SET", "j1", "$", start_json}); + EXPECT_EQ(resp, "OK"); + resp = GetJsonMemoryUsageFromDb("j1"); + EXPECT_THAT(resp, IntArg(start_size)); + + std::string_view json_after_set = R"({"a":"some text", "b":"some another text"})"; + size_t size_after_set = GetJsonMemoryUsageFromString(json_after_set); + + resp = Run({"JSON.SET", "j1", "$.b", "\"some another text\""}); + EXPECT_EQ(resp, "OK"); + resp = GetJsonMemoryUsageFromDb("j1"); + EXPECT_THAT(resp, IntArg(size_after_set)); + + // Again set start json + resp = Run({"JSON.SET", "j1", "$.b", "\" \""}); + EXPECT_EQ(resp, "OK"); + resp = GetJsonMemoryUsageFromDb("j1"); + EXPECT_THAT(resp, IntArg(start_size)); +} + +} // namespace dfly diff --git a/src/server/search/search_family_test.cc b/src/server/search/search_family_test.cc index 38dc0a9bd..7411a2ea0 100644 --- a/src/server/search/search_family_test.cc +++ b/src/server/search/search_family_test.cc @@ -2753,4 +2753,18 @@ TEST_F(SearchFamilyTest, RenameDocumentBetweenIndices) { EXPECT_EQ(Run({"rename", "idx2:{doc}1", "idx1:{doc}1"}), "OK"); } +TEST_F(SearchFamilyTest, JsonSetIndexesBug) { + auto resp = Run({"JSON.SET", "j1", "$", R"({"text":"some text"})"}); + EXPECT_THAT(resp, "OK"); + + resp = Run( + {"FT.CREATE", "index", "ON", "json", "SCHEMA", "$.text", "AS", "text", "TEXT", "SORTABLE"}); + EXPECT_THAT(resp, "OK"); + + resp = Run({"JSON.SET", "j1", "$", R"({"asd}"})"}); + EXPECT_THAT(resp, ErrArg("ERR failed to parse JSON")); + + resp = Run({"FT.AGGREGATE", "index", "*", "GROUPBY", "1", "@text"}); + EXPECT_THAT(resp, IsUnordArrayWithSize(IsMap("text", "some text"))); +} } // namespace dfly From 91ef2af1695bc303b530b524197dcc491ba30a59 Mon Sep 17 00:00:00 2001 From: Stepan Bagritsevich Date: Tue, 6 May 2025 09:58:09 +0200 Subject: [PATCH 3/4] fix(json_family): Fix memory tracking for JSON mutate operations Signed-off-by: Stepan Bagritsevich --- src/server/json_family.cc | 44 +++++++-------------------------------- 1 file changed, 8 insertions(+), 36 deletions(-) diff --git a/src/server/json_family.cc b/src/server/json_family.cc index e237615e9..a2f0552ec 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -748,47 +748,19 @@ OpResult> JsonReadOnlyOperation(const OpArgs& op_args, std return json_path.ExecuteReadOnlyCallback(json_val, cb, options.cb_result_options); } -struct MutateOperationOptions { - using PostMutateCallback = absl::FunctionRef; - - std::optional post_mutate_cb; - CallbackResultOptions cb_result_options = CallbackResultOptions::DefaultMutateOptions(); -}; - template -OpResult>> JsonMutateOperation(const OpArgs& op_args, - std::string_view key, - const WrappedJsonPath& json_path, - JsonPathMutateCallback cb, - MutateOperationOptions options = {}) { +OpResult>> JsonMutateOperation( + const OpArgs& op_args, std::string_view key, const WrappedJsonPath& json_path, + JsonPathMutateCallback cb, + CallbackResultOptions cb_result_options = CallbackResultOptions::DefaultMutateOptions()) { auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_JSON); RETURN_ON_BAD_STATUS(it_res); - JsonMemTracker mem_tracker; + JsonAutoUpdater updater(op_args, key, *std::move(it_res)); - PrimeValue& pv = it_res->it->second; + auto mutate_res = json_path.ExecuteMutateCallback(updater.GetJson(), cb, cb_result_options); - JsonType* json_val = pv.GetJson(); - DCHECK(json_val) << "should have a valid JSON object for key '" << key << "' the type for it is '" - << pv.ObjType() << "'"; - - op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, pv); - - auto mutate_res = json_path.ExecuteMutateCallback(json_val, cb, options.cb_result_options); - - // Call post mutate callback - if (mutate_res && options.post_mutate_cb) { - auto res = options.post_mutate_cb.value()(json_val); - // We can not return result here, because we need to update the size - if (res != OpStatus::OK) { - mutate_res = res; - } - } - - // we need to manually run this before the PostUpdater run - mem_tracker.SetJsonSize(pv, false); - it_res->post_updater.Run(); - op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv); + updater.SetJsonSize(); return mutate_res; } @@ -1223,7 +1195,7 @@ auto OpArrPop(const OpArgs& op_args, string_view key, WrappedJsonPath& path, int return {false, std::move(str)}; }; return JsonMutateOperation(op_args, key, path, std::move(cb), - {{}, CallbackResultOptions{OnEmpty::kSendNil}}); + CallbackResultOptions{OnEmpty::kSendNil}); } // Returns numeric vector that represents the new length of the array at each path. From e9976f1df6e5331649a253f29173d25f4b3b2ffc Mon Sep 17 00:00:00 2001 From: Stepan Bagritsevich Date: Tue, 6 May 2025 17:31:32 +0200 Subject: [PATCH 4/4] fix(json_family): Fix memory tracking for the JSON.DEL command fixes dragonflydb#5055 Signed-off-by: Stepan Bagritsevich --- src/server/json_family.cc | 67 ++++++--------- src/server/json_family_memory_test.cc | 104 ++++++++++++++++++++++++ src/server/search/search_family_test.cc | 15 ++++ 3 files changed, 144 insertions(+), 42 deletions(-) diff --git a/src/server/json_family.cc b/src/server/json_family.cc index a2f0552ec..5723b8de5 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -54,34 +54,6 @@ using CI = CommandId; namespace { -class JsonMemTracker { - public: - JsonMemTracker() { - start_size_ = static_cast(CompactObj::memory_resource())->used(); - } - - void SetJsonSize(PrimeValue& pv, bool is_op_set) { - const size_t current = static_cast(CompactObj::memory_resource())->used(); - int64_t diff = static_cast(current) - static_cast(start_size_); - // If the diff is 0 it means the object use the same memory as before. No action needed. - if (diff == 0) { - return; - } - // If op_set_ it means we JSON.SET or JSON.MSET was called. This is a blind update, - // and because the operation sets the size to 0 we also need to include the size of - // the pointer. - if (is_op_set) { - diff += static_cast(mi_usable_size(pv.GetJson())); - } - pv.SetJsonSize(diff); - // Under any flow we must not end up with this special value. - DCHECK(pv.MallocUsed() != 0); - } - - private: - size_t start_size_{0}; -}; - /* Helper class which must be initialized before any mutate operations on json. It will track the memory usage of the json object and update the size in the CompactObj. It also contains indexes updates, post update operations on the iterator. */ @@ -107,6 +79,8 @@ class JsonAutoUpdater { void SetJsonSize() { set_size_was_called_ = true; + ShrinkJsonIfNeeded(); + const size_t current = GetMemoryUsage(); int64_t diff = static_cast(current) - static_cast(start_size_); @@ -145,6 +119,16 @@ class JsonAutoUpdater { return static_cast(CompactObj::memory_resource())->used(); } + /* Shrinks the json object to fit its current size. + Sometimes after mutating the json object, it may have more capacity than needed. + This method will reduce the capacity to fit the current size. */ + void ShrinkJsonIfNeeded() { + auto json = GetJson(); + if (json->size() * 2 < json->capacity()) { + json->shrink_to_fit(); + } + } + private: const OpArgs& op_args_; string_view key_; @@ -1060,29 +1044,24 @@ OpResult OpDel(const OpArgs& op_args, string_view key, string_view path, return 0; } - PrimeValue& pv = it_res->it->second; - JsonType* json_val = pv.GetJson(); - - JsonMemTracker tracker; - absl::Cleanup update_size_on_exit([tracker, &pv]() mutable { tracker.SetJsonSize(pv, false); }); - if (json_path.HoldsJsonPath()) { + JsonAutoUpdater updater(op_args, key, *std::move(it_res), true); const json::Path& path = json_path.AsJsonPath(); long deletions = json::MutatePath( - path, [](optional, JsonType* val) { return true; }, json_val); + path, [](optional, JsonType* val) { return true; }, updater.GetJson()); return deletions; } + // Allocates memory for the deletion_items. + // So we need to initialize JsonAutoUpdater after this callback vector deletion_items; - auto cb = [&](std::optional path, JsonType* val) -> MutateCallbackResult<> { - deletion_items.emplace_back(*path); + auto cb = [&deletion_items](string_view path, const JsonType& val) -> Nothing { + deletion_items.emplace_back(path); return {}; }; - auto res = json_path.ExecuteMutateCallback( - json_val, std::move(cb), CallbackResultOptions::DefaultMutateOptions()); - RETURN_ON_BAD_STATUS(res); - + auto res = json_path.ExecuteReadOnlyCallback( + it_res->it->second.GetJson(), cb, CallbackResultOptions::DefaultReadOnlyOptions()); if (deletion_items.empty()) { return 0; } @@ -1097,13 +1076,17 @@ OpResult OpDel(const OpArgs& op_args, string_view key, string_view path, patch.emplace_back(patch_item); } + JsonAutoUpdater updater(op_args, key, *std::move(it_res)); + std::error_code ec; - jsoncons::jsonpatch::apply_patch(*json_val, patch, ec); + jsoncons::jsonpatch::apply_patch(*updater.GetJson(), patch, ec); if (ec) { VLOG(1) << "Failed to apply patch on json with error: " << ec.message(); return 0; } + updater.SetJsonSize(); + // SetString(op_args, key, j.as_string()); return total_deletions; } diff --git a/src/server/json_family_memory_test.cc b/src/server/json_family_memory_test.cc index e414a3fd9..95e6d3ad2 100644 --- a/src/server/json_family_memory_test.cc +++ b/src/server/json_family_memory_test.cc @@ -105,4 +105,108 @@ TEST_F(JsonFamilyMemoryTest, PartialSet) { EXPECT_THAT(resp, IntArg(start_size)); } +/* Tests how works memory usage after deleting json object in jsoncons */ +TEST_F(JsonFamilyMemoryTest, JsonConsDelTest) { + std::string_view start_json = R"({"a":"some text", "b":" "})"; + + size_t start = GetMemoryUsage(); + + auto json = dfly::JsonFromString(start_json, JsonFamilyMemoryTest::GetMemoryResource()); + void* ptr = + JsonFamilyMemoryTest::GetMemoryResource()->allocate(sizeof(JsonType), alignof(JsonType)); + JsonType* json_on_heap = new (ptr) JsonType(std::move(json).value()); + + size_t memory_usage_before_erase = GetMemoryUsage() - start; + + json_on_heap->erase("a"); + /* To deallocate memory we should use shrink_to_fit */ + json_on_heap->shrink_to_fit(); + + size_t memory_usage_after_erase = GetMemoryUsage() - start; + + EXPECT_GT(memory_usage_before_erase, memory_usage_after_erase); + EXPECT_EQ(memory_usage_after_erase, GetJsonMemoryUsageFromString(R"({"b":" "})")); +} + +TEST_F(JsonFamilyMemoryTest, SimpleDel) { + std::string_view start_json = R"({"a":"some text", "b":" "})"; + size_t start_size = GetJsonMemoryUsageFromString(start_json); + + auto resp = Run({"JSON.SET", "j1", "$", start_json}); + EXPECT_EQ(resp, "OK"); + resp = GetJsonMemoryUsageFromDb("j1"); + EXPECT_THAT(resp, IntArg(start_size)); + + std::string_view json_after_del = R"({"b":" "})"; + size_t size_after_del = GetJsonMemoryUsageFromString(json_after_del); + + // Test that raw memory usage is correct + resp = Run({"JSON.SET", "j2", "$", json_after_del}); + EXPECT_EQ(resp, "OK"); + resp = GetJsonMemoryUsageFromDb("j2"); + EXPECT_THAT(resp, IntArg(size_after_del)); + + // Test that after deletion memory usage is correct + resp = Run({"JSON.DEL", "j1", "$.a"}); + EXPECT_THAT(resp, IntArg(1)); + resp = Run({"JSON.GET", "j1"}); + EXPECT_EQ(resp, json_after_del); + resp = GetJsonMemoryUsageFromDb("j1"); + + /* We still expect the initial size here, because after deletion we do not call shrink_to_fit on + the JSON object. As a result, the memory will not be deallocated. Check + JsonFamilyMemoryTest::JsonConsDelTest for example. */ + EXPECT_THAT(resp, IntArg(start_size)); + + // Again set start json + resp = Run({"JSON.SET", "j1", "$.a", "\"some text\""}); + EXPECT_EQ(resp, "OK"); + resp = GetJsonMemoryUsageFromDb("j1"); + EXPECT_THAT(resp, IntArg(start_size)); +} + +TEST_F(JsonFamilyMemoryTest, JsonShrinking) { + std::string_view start_json = R"({"a":"some text","b":"some another text","c":" "})"; + size_t start_size = GetJsonMemoryUsageFromString(start_json); + + auto resp = Run({"JSON.SET", "j1", "$", start_json}); + EXPECT_EQ(resp, "OK"); + resp = GetJsonMemoryUsageFromDb("j1"); + EXPECT_THAT(resp, IntArg(start_size)); + + std::string_view json_after_del = R"({"c":" "})"; + size_t size_after_del = GetJsonMemoryUsageFromString(json_after_del); + + // Test that raw memory usage is correct + resp = Run({"JSON.SET", "j2", "$", json_after_del}); + EXPECT_EQ(resp, "OK"); + resp = GetJsonMemoryUsageFromDb("j2"); + EXPECT_THAT(resp, IntArg(size_after_del)); + + // Test that after deletion memory usage decreases + resp = Run({"JSON.DEL", "j1", "$.a"}); + EXPECT_THAT(resp, IntArg(1)); + resp = Run({"JSON.DEL", "j1", "$.b"}); + EXPECT_THAT(resp, IntArg(1)); + resp = Run({"JSON.GET", "j1"}); + EXPECT_EQ(resp, json_after_del); + resp = GetJsonMemoryUsageFromDb("j1"); + // Now we expect the size to be smaller, because shrink_to_fit was called + EXPECT_THAT(resp, IntArg(size_after_del)); + + // Again set start json + resp = Run({"JSON.SET", "j1", "$.a", "\"some text\""}); + EXPECT_EQ(resp, "OK"); + resp = Run({"JSON.SET", "j1", "$.b", "\"some another text\""}); + EXPECT_EQ(resp, "OK"); + resp = Run({"JSON.GET", "j1"}); + EXPECT_EQ(resp, start_json); + resp = GetJsonMemoryUsageFromDb("j1"); + + // Jsoncons will allocate more memory for the new json that needed. + // This is totally fine, because we will not call shrink_to_fit. + EXPECT_THAT(resp, IntArg(368)); + EXPECT_GT(368, start_size); +} + } // namespace dfly diff --git a/src/server/search/search_family_test.cc b/src/server/search/search_family_test.cc index 7411a2ea0..9c6dce2a7 100644 --- a/src/server/search/search_family_test.cc +++ b/src/server/search/search_family_test.cc @@ -2767,4 +2767,19 @@ TEST_F(SearchFamilyTest, JsonSetIndexesBug) { resp = Run({"FT.AGGREGATE", "index", "*", "GROUPBY", "1", "@text"}); EXPECT_THAT(resp, IsUnordArrayWithSize(IsMap("text", "some text"))); } + +TEST_F(SearchFamilyTest, JsonDelIndexesBug) { + auto resp = Run({"JSON.SET", "j1", "$", R"({"text":"some text"})"}); + EXPECT_THAT(resp, "OK"); + + resp = Run( + {"FT.CREATE", "index", "ON", "json", "SCHEMA", "$.text", "AS", "text", "TEXT", "SORTABLE"}); + EXPECT_THAT(resp, "OK"); + + resp = Run({"JSON.DEL", "j1", "$.text"}); + EXPECT_THAT(resp, IntArg(1)); + + resp = Run({"FT.AGGREGATE", "index", "*", "GROUPBY", "1", "@text"}); + EXPECT_THAT(resp, IsUnordArrayWithSize(IsMap("text", ArgType(RespExpr::NIL)))); +} } // namespace dfly