From 3d79664a19f27b43be921d5acfeba977a37fdc40 Mon Sep 17 00:00:00 2001 From: Stepan Bagritsevich <43710058+BagritsevichStepan@users.noreply.github.com> Date: Wed, 7 May 2025 08:56:43 +0200 Subject: [PATCH] fix(json_family): Fix memory tracking for the JSON.SET command. THIRD PR (#5069) * fix(json_family): Fix memory tracking for the JSON.SET command fixes dragonflydb#5054 Signed-off-by: Stepan Bagritsevich * refactor: address comments Signed-off-by: Stepan Bagritsevich --------- Signed-off-by: Stepan Bagritsevich --- src/server/CMakeLists.txt | 5 +- src/server/json_family.cc | 315 +++++++++++++++--------- src/server/json_family_memory_test.cc | 108 ++++++++ src/server/search/search_family_test.cc | 14 ++ 4 files changed, 326 insertions(+), 116 deletions(-) create mode 100644 src/server/json_family_memory_test.cc diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index f1e9b6ae0..27236153d 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -118,6 +118,7 @@ cxx_test(zset_family_test dfly_test_lib LABELS DFLY) cxx_test(geo_family_test dfly_test_lib LABELS DFLY) cxx_test(blocking_controller_test dfly_test_lib LABELS DFLY) cxx_test(json_family_test dfly_test_lib LABELS DFLY) +cxx_test(json_family_memory_test dfly_test_lib LABELS DFLY) cxx_test(journal/journal_test dfly_test_lib LABELS DFLY) cxx_test(hll_family_test dfly_test_lib LABELS DFLY) cxx_test(bloom_family_test dfly_test_lib LABELS DFLY) @@ -131,6 +132,7 @@ if (WITH_ASAN OR WITH_USAN) target_compile_definitions(multi_test PRIVATE SANITIZERS) target_compile_definitions(search_family_test PRIVATE SANITIZERS) target_compile_definitions(json_family_test PRIVATE SANITIZERS) + target_compile_definitions(json_family_memory_test PRIVATE SANITIZERS) target_compile_definitions(dragonfly_test PRIVATE SANITIZERS) endif() cxx_test(search/aggregator_test dfly_test_lib LABELS DFLY) @@ -142,4 +144,5 @@ add_dependencies(check_dfly dragonfly_test json_family_test list_family_test generic_family_test memcache_parser_test rdb_test journal_test redis_parser_test stream_family_test string_family_test bitops_family_test set_family_test zset_family_test geo_family_test - hll_family_test cluster_config_test cluster_family_test acl_family_test) + hll_family_test cluster_config_test cluster_family_test acl_family_test + json_family_memory_test) diff --git a/src/server/json_family.cc b/src/server/json_family.cc index ac9f2271e..bee4e64a5 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -82,6 +82,80 @@ class JsonMemTracker { size_t start_size_{0}; }; +/* Helper class which must be initialized before any mutate operations on json. + It will track the memory usage of the json object and update the size in the CompactObj. + It also contains indexes updates, post update operations on the iterator. */ +class JsonAutoUpdater { + public: + JsonAutoUpdater(const OpArgs& op_args, string_view key, DbSlice::ItAndUpdater it, + bool update_on_delete = false) + : op_args_(op_args), key_(key), it_(std::move(it)), update_on_delete_(update_on_delete) { + op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, it.it->second); + + /* We need to initialize start memory usage after RemoveDoc because internally RemoveDoc has + static cache that can allocate/deallocate memory. Because of this, we will + overestimate/underestimate memory usage for json object. 
*/ + start_size_ = GetMemoryUsage(); + } + + JsonAutoUpdater(const JsonAutoUpdater&) = delete; + JsonAutoUpdater& operator=(const JsonAutoUpdater&) = delete; + + JsonAutoUpdater(JsonAutoUpdater&&) = default; + JsonAutoUpdater& operator=(JsonAutoUpdater&&) = delete; + + void SetJsonSize() { + set_size_was_called_ = true; + + const size_t current = GetMemoryUsage(); + int64_t diff = static_cast(current) - static_cast(start_size_); + + GetPrimeValue().SetJsonSize(diff); + + // Under any flow we must not end up with this special value. + DCHECK(GetPrimeValue().MallocUsed() != 0); + } + + ~JsonAutoUpdater() { + if (update_on_delete_ && !set_size_was_called_) { + SetJsonSize(); + } else if (!set_size_was_called_) { + LOG(WARNING) << "JsonAutoUpdater destructor called without SetJsonSize() being called. This " + "may lead to memory tracking issues."; + } + + it_.post_updater.Run(); + + /* We need to call AddDoc after SetJsonSize because internally AddDoc has static cache that can + allocate/deallocate memory. Because of this, we will overestimate/underestimate memory usage for + json object. */ + op_args_.shard->search_indices()->AddDoc(key_, op_args_.db_cntx, GetPrimeValue()); + } + + PrimeValue& GetPrimeValue() { + return it_.it->second; + } + + JsonType* GetJson() { + return GetPrimeValue().GetJson(); + } + + private: + size_t GetMemoryUsage() const { + return static_cast(CompactObj::memory_resource())->used(); + } + + private: + const OpArgs& op_args_; + string_view key_; + DbSlice::ItAndUpdater it_; + + // Used to track the memory usage of the json object + size_t start_size_{0}; + bool set_size_was_called_{false}; + bool update_on_delete_; +}; + template using ParseResult = io::Result; ParseResult ParseJsonPathAsExpression(std::string_view path) { @@ -318,28 +392,63 @@ bool JsonAreEquals(const JsonType& lhs, const JsonType& rhs) { } } +/* Converts a JSONPath to a JSONPointer. + E.g. $[a][b][0] -> /a/b/0. + V1 JSONPath is not supported. */ +std::optional ConvertJsonPathToJsonPointer(string_view json_path) { + auto parsed_path = json::ParsePath(json_path); + + if (!parsed_path) { + VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path + << ". Invalid JSONPath."; + return std::nullopt; + } + + std::string pointer; + const auto& path = parsed_path.value(); + for (const auto& node : path) { + const auto& type = node.type(); + if (type == json::SegmentType::IDENTIFIER) { + absl::StrAppend(&pointer, "/"sv, node.identifier()); + } else if (type == json::SegmentType::INDEX) { + const auto& index = node.index(); + + if (index.first != index.second) { + VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path + << ". Index range is not supported."; + return std::nullopt; + } + + absl::StrAppend(&pointer, "/"sv, node.index().first); + } else { + VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path + << ". Unsupported segment type."; + return std::nullopt; + } + } + + return pointer; +} + // Use this method on the coordinator thread std::optional JsonFromString(std::string_view input) { return dfly::JsonFromString(input, PMR_NS::get_default_resource()); } -// Use this method on the shard thread +/* Use this method on the shard thread + + If you do memory tracking, make sure to initialize it before calling this method, and reset the + result before invoking SetJsonSize. Note that even after calling std::move on an optional, it may + still hold the JSON value, which can lead to incorrect memory tracking. 
*/ std::optional ShardJsonFromString(std::string_view input) { return dfly::JsonFromString(input, CompactObj::memory_resource()); } -OpResult SetJson(const OpArgs& op_args, string_view key, - string_view json_str) { - auto& db_slice = op_args.GetDbSlice(); +OpStatus SetFullJson(const OpArgs& op_args, string_view key, string_view json_str) { + auto it_res = op_args.GetDbSlice().AddOrFind(op_args.db_cntx, key); + RETURN_ON_BAD_STATUS(it_res); - auto op_res = db_slice.AddOrFind(op_args.db_cntx, key); - RETURN_ON_BAD_STATUS(op_res); - - auto& res = *op_res; - - op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, res.it->second); - - JsonMemTracker tracker; + JsonAutoUpdater updater(op_args, key, *std::move(it_res)); std::optional parsed_json = ShardJsonFromString(json_str); if (!parsed_json) { @@ -352,15 +461,83 @@ OpResult SetJson(const OpArgs& op_args, string_view key, json::FromJsonType(*parsed_json, &fbb); fbb.Finish(); const auto& buf = fbb.GetBuffer(); - res.it->second.SetJson(buf.data(), buf.size()); + updater.GetPrimeValue().SetJson(buf.data(), buf.size()); } else { - res.it->second.SetJson(std::move(*parsed_json)); + updater.GetPrimeValue().SetJson(std::move(*parsed_json)); } - tracker.SetJsonSize(res.it->second, res.is_new); - op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, res.it->second); + // We should do reset before setting the size of the json, because + // std::optional still holds the value and it will be deallocated + parsed_json.reset(); + updater.SetJsonSize(); - return std::move(res); + return OpStatus::OK; +} + +/* Sets a partial JSON value at the specified path. + True means that the value was set, false means that the value was not set. */ +OpResult SetPartialJson(const OpArgs& op_args, string_view key, + const WrappedJsonPath& json_path, string_view json_str, + bool is_nx_condition, bool is_xx_condition) { + auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_JSON); + RETURN_ON_BAD_STATUS(it_res); + + JsonAutoUpdater updater(op_args, key, *std::move(it_res)); + + /* This method would use copy for parsed_json and not move! + The reason being, that we are applying this multiple times for each match we found. + So for example if we have an array that this expression will match each entry in it then the + assign here is called N times. */ + std::optional parsed_json = ShardJsonFromString(json_str); + if (!parsed_json) { + VLOG(1) << "got invalid JSON string '" << json_str << "' cannot be saved"; + return OpStatus::INVALID_JSON; + } + + bool path_exists = false; + bool value_was_set = false; + + // If the path exists, this callback will be called + auto mutate_cb = [&](std::optional, JsonType* val) -> MutateCallbackResult<> { + path_exists = true; + if (!is_nx_condition) { + value_was_set = true; + *val = JsonType(parsed_json.value(), + std::pmr::polymorphic_allocator{CompactObj::memory_resource()}); + } + return {}; + }; + + auto mutate_res = json_path.ExecuteMutateCallback( + updater.GetJson(), mutate_cb, CallbackResultOptions::DefaultMutateOptions()); + + // Set a new value if the path doesn't exist and the xx condition is not set. 
+ if (mutate_res && !path_exists && !is_xx_condition) { + auto pointer = ConvertJsonPathToJsonPointer(json_path.Path()); + if (!pointer) { + return OpStatus::SYNTAX_ERR; + } + + std::error_code ec; + jsoncons::jsonpointer::add(*updater.GetJson(), pointer.value(), std::move(parsed_json).value(), + ec); + if (ec) { + VLOG(1) << "Failed to add a JSON value to the following path: " << json_str + << " with the error: " << ec.message(); + return OpStatus::SYNTAX_ERR; + } + + value_was_set = true; + } + + if (value_was_set) { + // We should do reset before setting the size of the json, because + // std::optional still holds the value and it will be deallocated + parsed_json.reset(); + updater.SetJsonSize(); + } + + return value_was_set; } size_t NormalizeNegativeIndex(int index, size_t size) { @@ -517,44 +694,6 @@ string ConvertToJsonPointer(string_view json_path) { return result; } -/* Converts a JSONPath to a JSONPointer. - E.g. $[a][b][0] -> /a/b/0. - V1 JSONPath is not supported. */ -std::optional ConvertJsonPathToJsonPointer(string_view json_path) { - auto parsed_path = json::ParsePath(json_path); - - if (!parsed_path) { - VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path - << ". Invalid JSONPath."; - return std::nullopt; - } - - std::string pointer; - const auto& path = parsed_path.value(); - for (const auto& node : path) { - const auto& type = node.type(); - if (type == json::SegmentType::IDENTIFIER) { - pointer += '/' + node.identifier(); - } else if (type == json::SegmentType::INDEX) { - const auto& index = node.index(); - - if (index.first != index.second) { - VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path - << ". Index range is not supported."; - return std::nullopt; - } - - pointer += '/' + std::to_string(node.index().first); - } else { - VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path - << ". Unsupported segment type."; - return std::nullopt; - } - } - - return pointer; -} - size_t CountJsonFields(const JsonType& j) { size_t res = 0; json_type type = j.type(); @@ -1331,68 +1470,14 @@ OpResult OpSet(const OpArgs& op_args, string_view key, string_view path, } } - auto st = SetJson(op_args, key, json_str); - RETURN_ON_BAD_STATUS(st); - return true; + OpStatus result = SetFullJson(op_args, key, json_str); + if (result == OpStatus::OK) { + return true; + } + return result; } - // Note that this operation would use copy and not move! - // The reason being, that we are applying this multiple times - // For each match we found. So for example if we have - // an array that this expression will match each entry in it - // then the assign here is called N times, where N == array.size(). - bool path_exists = false; - bool operation_result = false; - - optional parsed_json = ShardJsonFromString(json_str); - if (!parsed_json) { - VLOG(1) << "got invalid JSON string '" << json_str << "' cannot be saved"; - return OpStatus::INVALID_JSON; - } - const JsonType& new_json = parsed_json.value(); - - // If the path exists, this callback will be called - auto mutate_cb = [&](std::optional, JsonType* val) -> MutateCallbackResult<> { - path_exists = true; - if (!is_nx_condition) { - operation_result = true; - *val = - JsonType(new_json, std::pmr::polymorphic_allocator{CompactObj::memory_resource()}); - } - return {}; - }; - - // If the path doesn't exist, this callback will be called - auto insert_cb = [&](JsonType* json) { - // Set a new value if the path doesn't exist and the xx condition is not set. 
- if (!path_exists && !is_xx_condition) { - auto pointer = ConvertJsonPathToJsonPointer(json_path.Path()); - if (!pointer) { - return OpStatus::SYNTAX_ERR; - } - - std::error_code ec; - jsoncons::jsonpointer::add(*json, pointer.value(), new_json, ec); - if (ec) { - VLOG(1) << "Failed to add a JSON value to the following path: " << path - << " with the error: " << ec.message(); - return OpStatus::SYNTAX_ERR; - } - - operation_result = true; - } - - return OpStatus::OK; - }; - - // JsonMutateOperation uses it's own JsonMemTracker. It will work, because updates to already - // existing json keys use copy assign, so we don't really need to account for the memory - // allocated by ShardJsonFromString above since it's not being moved here at all. - auto res = JsonMutateOperation(op_args, key, json_path, std::move(mutate_cb), - MutateOperationOptions{std::move(insert_cb)}); - RETURN_ON_BAD_STATUS(res); - - return operation_result; + return SetPartialJson(op_args, key, json_path, json_str, is_nx_condition, is_xx_condition); } OpResult OpSet(const OpArgs& op_args, string_view key, string_view path, diff --git a/src/server/json_family_memory_test.cc b/src/server/json_family_memory_test.cc new file mode 100644 index 000000000..e414a3fd9 --- /dev/null +++ b/src/server/json_family_memory_test.cc @@ -0,0 +1,108 @@ +// Copyright 2025, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "base/gtest.h" +#include "base/logging.h" +#include "facade/facade_test.h" +#include "server/command_registry.h" +#include "server/json_family.h" +#include "server/test_utils.h" + +using namespace testing; +using namespace std; +using namespace util; + +ABSL_DECLARE_FLAG(bool, jsonpathv2); + +namespace dfly { + +class JsonFamilyMemoryTest : public BaseFamilyTest { + public: + static dfly::MiMemoryResource* GetMemoryResource() { + static thread_local mi_heap_t* heap = mi_heap_new(); + static thread_local dfly::MiMemoryResource memory_resource{heap}; + return &memory_resource; + } + + protected: + auto GetJsonMemoryUsageFromDb(std::string_view key) { + return Run({"MEMORY", "USAGE", key, "WITHOUTKEY"}); + } +}; + +size_t GetMemoryUsage() { + return static_cast(JsonFamilyMemoryTest::GetMemoryResource())->used(); +} + +size_t GetJsonMemoryUsageFromString(std::string_view json_str) { + size_t start = GetMemoryUsage(); + auto json = dfly::JsonFromString(json_str, JsonFamilyMemoryTest::GetMemoryResource()); + if (!json) { + return 0; + } + + // The same behaviour as in CompactObj + void* ptr = + JsonFamilyMemoryTest::GetMemoryResource()->allocate(sizeof(JsonType), alignof(JsonType)); + JsonType* json_on_heap = new (ptr) JsonType(std::move(json).value()); + DCHECK(json_on_heap); + + size_t result = GetMemoryUsage() - start; + + // Free the memory + json_on_heap->~JsonType(); + JsonFamilyMemoryTest::GetMemoryResource()->deallocate(json_on_heap, sizeof(JsonType), + alignof(JsonType)); + return result; +} + +TEST_F(JsonFamilyMemoryTest, SimpleSet) { + std::string_view big_json = R"({"a":"some big string asdkasdkasdfkkasjdkfjka"})"; + size_t start_size = GetJsonMemoryUsageFromString(big_json); + + auto resp = Run({"JSON.SET", "j1", "$", big_json}); + EXPECT_EQ(resp, "OK"); + resp = GetJsonMemoryUsageFromDb("j1"); + EXPECT_THAT(resp, IntArg(start_size)); + + std::string_view small_json = R"({"a":" "})"; + size_t next_size = GetJsonMemoryUsageFromString(small_json); + + resp = Run({"JSON.SET", "j1", "$", small_json}); + EXPECT_EQ(resp, "OK"); + resp = GetJsonMemoryUsageFromDb("j1"); + 
EXPECT_THAT(resp, IntArg(next_size)); + + // Again set big json + resp = Run({"JSON.SET", "j1", "$", big_json}); + EXPECT_EQ(resp, "OK"); + resp = GetJsonMemoryUsageFromDb("j1"); + EXPECT_THAT(resp, IntArg(start_size)); +} + +TEST_F(JsonFamilyMemoryTest, PartialSet) { + std::string_view start_json = R"({"a":"some text", "b":" "})"; + size_t start_size = GetJsonMemoryUsageFromString(start_json); + + auto resp = Run({"JSON.SET", "j1", "$", start_json}); + EXPECT_EQ(resp, "OK"); + resp = GetJsonMemoryUsageFromDb("j1"); + EXPECT_THAT(resp, IntArg(start_size)); + + std::string_view json_after_set = R"({"a":"some text", "b":"some another text"})"; + size_t size_after_set = GetJsonMemoryUsageFromString(json_after_set); + + resp = Run({"JSON.SET", "j1", "$.b", "\"some another text\""}); + EXPECT_EQ(resp, "OK"); + resp = GetJsonMemoryUsageFromDb("j1"); + EXPECT_THAT(resp, IntArg(size_after_set)); + + // Again set start json + resp = Run({"JSON.SET", "j1", "$.b", "\" \""}); + EXPECT_EQ(resp, "OK"); + resp = GetJsonMemoryUsageFromDb("j1"); + EXPECT_THAT(resp, IntArg(start_size)); +} + +} // namespace dfly diff --git a/src/server/search/search_family_test.cc b/src/server/search/search_family_test.cc index 38dc0a9bd..7411a2ea0 100644 --- a/src/server/search/search_family_test.cc +++ b/src/server/search/search_family_test.cc @@ -2753,4 +2753,18 @@ TEST_F(SearchFamilyTest, RenameDocumentBetweenIndices) { EXPECT_EQ(Run({"rename", "idx2:{doc}1", "idx1:{doc}1"}), "OK"); } +TEST_F(SearchFamilyTest, JsonSetIndexesBug) { + auto resp = Run({"JSON.SET", "j1", "$", R"({"text":"some text"})"}); + EXPECT_THAT(resp, "OK"); + + resp = Run( + {"FT.CREATE", "index", "ON", "json", "SCHEMA", "$.text", "AS", "text", "TEXT", "SORTABLE"}); + EXPECT_THAT(resp, "OK"); + + resp = Run({"JSON.SET", "j1", "$", R"({"asd}"})"}); + EXPECT_THAT(resp, ErrArg("ERR failed to parse JSON")); + + resp = Run({"FT.AGGREGATE", "index", "*", "GROUPBY", "1", "@text"}); + EXPECT_THAT(resp, IsUnordArrayWithSize(IsMap("text", "some text"))); +} } // namespace dfly
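
The heart of the change is the JsonAutoUpdater helper: its constructor removes the document from the search indices and snapshots the shard allocator's used() counter, SetJsonSize() charges the signed difference to the key once the mutation is done, and AddDoc runs only afterwards, in the destructor, because its static cache would otherwise skew the measurement. A minimal sketch of that delta accounting, with Resource and TrackedValue as hypothetical stand-ins for CompactObj::memory_resource() and PrimeValue:

#include <cstddef>
#include <cstdint>
#include <utility>

// Hypothetical stand-ins for CompactObj::memory_resource() and PrimeValue.
struct Resource {
  std::size_t used_ = 0;
  std::size_t used() const { return used_; }  // bytes currently allocated from this resource
};

struct TrackedValue {
  std::int64_t tracked_bytes = 0;
  void SetJsonSize(std::int64_t diff) { tracked_bytes += diff; }  // diff may be negative
};

// Snapshot before, mutate, snapshot after, charge the signed delta -- the same
// pattern JsonAutoUpdater implements around the JSON.SET mutation.
template <typename MutateFn>
void TrackMutation(const Resource& mr, TrackedValue& value, MutateFn&& mutate) {
  const std::size_t before = mr.used();
  std::forward<MutateFn>(mutate)();  // any parsing temporaries must be released in here
  const std::size_t after = mr.used();
  value.SetJsonSize(static_cast<std::int64_t>(after) - static_cast<std::int64_t>(before));
}

int main() {
  Resource mr;
  TrackedValue value;
  TrackMutation(mr, value, [&] { mr.used_ += 128; });  // pretend the new JSON needs 128 more bytes
  return value.tracked_bytes == 128 ? 0 : 1;
}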
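
ConvertJsonPathToJsonPointer, moved up in the file so that SetPartialJson can use it, only converts V2 paths made of plain identifiers and single indices; anything else (an index range, a wildcard, an invalid path) yields std::nullopt and the command answers with a syntax error. A few illustrative cases, written as hypothetical GTest-style checks (the function is file-local to json_family.cc, so this is a sketch of its behaviour, not a test that links as-is):

EXPECT_EQ(ConvertJsonPathToJsonPointer("$.a.b"), "/a/b");
EXPECT_EQ(ConvertJsonPathToJsonPointer("$.a.b[0]"), "/a/b/0");
EXPECT_EQ(ConvertJsonPathToJsonPointer("$.arr[1:3]"), std::nullopt);  // rejected: index range
EXPECT_EQ(ConvertJsonPathToJsonPointer("$[*]"), std::nullopt);        // rejected: wildcard segment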
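
The comments above ShardJsonFromString, SetFullJson and SetPartialJson all circle around the same pitfall: std::move on a std::optional does not disengage it, so the parsed JSON temporary is still owned by the optional when the size delta is taken unless reset() runs first. That is why both call sites execute parsed_json.reset() before updater.SetJsonSize(). A standalone illustration of the std::optional behaviour, with std::string standing in for the pmr-backed JsonType:

#include <cassert>
#include <optional>
#include <string>

int main() {
  std::optional<std::string> parsed{"{\"a\":\"some json payload\"}"};

  // Moving the contents out leaves the optional engaged: it still holds a
  // (moved-from) object whose destructor has not run yet.
  std::string stored = std::move(parsed).value();
  assert(parsed.has_value());

  // For allocator-aware types such as the pmr-backed JsonType, that leftover
  // object may still own memory; destroying it explicitly before measuring,
  // as the patch does with parsed_json.reset(), keeps the accounting honest.
  parsed.reset();
  assert(!parsed.has_value());
  return stored.empty() ? 1 : 0;
}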