fix(json_family): Fix memory tracking for the JSON.SET command. THIRD PR (#5069)

* fix(json_family): Fix memory tracking for the JSON.SET command

fixes dragonflydb#5054

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* refactor: address comments

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

---------

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>
This commit is contained in:
Stepan Bagritsevich 2025-05-07 08:56:43 +02:00 committed by GitHub
parent b3e0bcfb31
commit 3d79664a19
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 326 additions and 116 deletions

View file

@ -118,6 +118,7 @@ cxx_test(zset_family_test dfly_test_lib LABELS DFLY)
cxx_test(geo_family_test dfly_test_lib LABELS DFLY)
cxx_test(blocking_controller_test dfly_test_lib LABELS DFLY)
cxx_test(json_family_test dfly_test_lib LABELS DFLY)
cxx_test(json_family_memory_test dfly_test_lib LABELS DFLY)
cxx_test(journal/journal_test dfly_test_lib LABELS DFLY)
cxx_test(hll_family_test dfly_test_lib LABELS DFLY)
cxx_test(bloom_family_test dfly_test_lib LABELS DFLY)
@ -131,6 +132,7 @@ if (WITH_ASAN OR WITH_USAN)
target_compile_definitions(multi_test PRIVATE SANITIZERS)
target_compile_definitions(search_family_test PRIVATE SANITIZERS)
target_compile_definitions(json_family_test PRIVATE SANITIZERS)
target_compile_definitions(json_family_memory_test PRIVATE SANITIZERS)
target_compile_definitions(dragonfly_test PRIVATE SANITIZERS)
endif()
cxx_test(search/aggregator_test dfly_test_lib LABELS DFLY)
@ -142,4 +144,5 @@ add_dependencies(check_dfly dragonfly_test json_family_test list_family_test
generic_family_test memcache_parser_test rdb_test journal_test
redis_parser_test stream_family_test string_family_test
bitops_family_test set_family_test zset_family_test geo_family_test
hll_family_test cluster_config_test cluster_family_test acl_family_test)
hll_family_test cluster_config_test cluster_family_test acl_family_test
json_family_memory_test)

View file

@ -82,6 +82,80 @@ class JsonMemTracker {
size_t start_size_{0};
};
/* Helper class which must be initialized before any mutate operations on json.
It will track the memory usage of the json object and update the size in the CompactObj.
It also contains indexes updates, post update operations on the iterator. */
class JsonAutoUpdater {
public:
JsonAutoUpdater(const OpArgs& op_args, string_view key, DbSlice::ItAndUpdater it,
bool update_on_delete = false)
: op_args_(op_args), key_(key), it_(std::move(it)), update_on_delete_(update_on_delete) {
op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, it.it->second);
/* We need to initialize start memory usage after RemoveDoc because internally RemoveDoc has
static cache that can allocate/deallocate memory. Because of this, we will
overestimate/underestimate memory usage for json object. */
start_size_ = GetMemoryUsage();
}
JsonAutoUpdater(const JsonAutoUpdater&) = delete;
JsonAutoUpdater& operator=(const JsonAutoUpdater&) = delete;
JsonAutoUpdater(JsonAutoUpdater&&) = default;
JsonAutoUpdater& operator=(JsonAutoUpdater&&) = delete;
void SetJsonSize() {
set_size_was_called_ = true;
const size_t current = GetMemoryUsage();
int64_t diff = static_cast<int64_t>(current) - static_cast<int64_t>(start_size_);
GetPrimeValue().SetJsonSize(diff);
// Under any flow we must not end up with this special value.
DCHECK(GetPrimeValue().MallocUsed() != 0);
}
~JsonAutoUpdater() {
if (update_on_delete_ && !set_size_was_called_) {
SetJsonSize();
} else if (!set_size_was_called_) {
LOG(WARNING) << "JsonAutoUpdater destructor called without SetJsonSize() being called. This "
"may lead to memory tracking issues.";
}
it_.post_updater.Run();
/* We need to call AddDoc after SetJsonSize because internally AddDoc has static cache that can
allocate/deallocate memory. Because of this, we will overestimate/underestimate memory usage for
json object. */
op_args_.shard->search_indices()->AddDoc(key_, op_args_.db_cntx, GetPrimeValue());
}
PrimeValue& GetPrimeValue() {
return it_.it->second;
}
JsonType* GetJson() {
return GetPrimeValue().GetJson();
}
private:
size_t GetMemoryUsage() const {
return static_cast<MiMemoryResource*>(CompactObj::memory_resource())->used();
}
private:
const OpArgs& op_args_;
string_view key_;
DbSlice::ItAndUpdater it_;
// Used to track the memory usage of the json object
size_t start_size_{0};
bool set_size_was_called_{false};
bool update_on_delete_;
};
template <typename T> using ParseResult = io::Result<T, std::string>;
ParseResult<JsonExpression> ParseJsonPathAsExpression(std::string_view path) {
@ -318,28 +392,63 @@ bool JsonAreEquals(const JsonType& lhs, const JsonType& rhs) {
}
}
/* Converts a JSONPath to a JSONPointer.
E.g. $[a][b][0] -> /a/b/0.
V1 JSONPath is not supported. */
std::optional<std::string> ConvertJsonPathToJsonPointer(string_view json_path) {
auto parsed_path = json::ParsePath(json_path);
if (!parsed_path) {
VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path
<< ". Invalid JSONPath.";
return std::nullopt;
}
std::string pointer;
const auto& path = parsed_path.value();
for (const auto& node : path) {
const auto& type = node.type();
if (type == json::SegmentType::IDENTIFIER) {
absl::StrAppend(&pointer, "/"sv, node.identifier());
} else if (type == json::SegmentType::INDEX) {
const auto& index = node.index();
if (index.first != index.second) {
VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path
<< ". Index range is not supported.";
return std::nullopt;
}
absl::StrAppend(&pointer, "/"sv, node.index().first);
} else {
VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path
<< ". Unsupported segment type.";
return std::nullopt;
}
}
return pointer;
}
// Use this method on the coordinator thread
std::optional<JsonType> JsonFromString(std::string_view input) {
return dfly::JsonFromString(input, PMR_NS::get_default_resource());
}
// Use this method on the shard thread
/* Use this method on the shard thread
If you do memory tracking, make sure to initialize it before calling this method, and reset the
result before invoking SetJsonSize. Note that even after calling std::move on an optional, it may
still hold the JSON value, which can lead to incorrect memory tracking. */
std::optional<JsonType> ShardJsonFromString(std::string_view input) {
return dfly::JsonFromString(input, CompactObj::memory_resource());
}
OpResult<DbSlice::ItAndUpdater> SetJson(const OpArgs& op_args, string_view key,
string_view json_str) {
auto& db_slice = op_args.GetDbSlice();
OpStatus SetFullJson(const OpArgs& op_args, string_view key, string_view json_str) {
auto it_res = op_args.GetDbSlice().AddOrFind(op_args.db_cntx, key);
RETURN_ON_BAD_STATUS(it_res);
auto op_res = db_slice.AddOrFind(op_args.db_cntx, key);
RETURN_ON_BAD_STATUS(op_res);
auto& res = *op_res;
op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, res.it->second);
JsonMemTracker tracker;
JsonAutoUpdater updater(op_args, key, *std::move(it_res));
std::optional<JsonType> parsed_json = ShardJsonFromString(json_str);
if (!parsed_json) {
@ -352,15 +461,83 @@ OpResult<DbSlice::ItAndUpdater> SetJson(const OpArgs& op_args, string_view key,
json::FromJsonType(*parsed_json, &fbb);
fbb.Finish();
const auto& buf = fbb.GetBuffer();
res.it->second.SetJson(buf.data(), buf.size());
updater.GetPrimeValue().SetJson(buf.data(), buf.size());
} else {
res.it->second.SetJson(std::move(*parsed_json));
updater.GetPrimeValue().SetJson(std::move(*parsed_json));
}
tracker.SetJsonSize(res.it->second, res.is_new);
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, res.it->second);
// We should do reset before setting the size of the json, because
// std::optional still holds the value and it will be deallocated
parsed_json.reset();
updater.SetJsonSize();
return std::move(res);
return OpStatus::OK;
}
/* Sets a partial JSON value at the specified path.
True means that the value was set, false means that the value was not set. */
OpResult<bool> SetPartialJson(const OpArgs& op_args, string_view key,
const WrappedJsonPath& json_path, string_view json_str,
bool is_nx_condition, bool is_xx_condition) {
auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_JSON);
RETURN_ON_BAD_STATUS(it_res);
JsonAutoUpdater updater(op_args, key, *std::move(it_res));
/* This method would use copy for parsed_json and not move!
The reason being, that we are applying this multiple times for each match we found.
So for example if we have an array that this expression will match each entry in it then the
assign here is called N times. */
std::optional<JsonType> parsed_json = ShardJsonFromString(json_str);
if (!parsed_json) {
VLOG(1) << "got invalid JSON string '" << json_str << "' cannot be saved";
return OpStatus::INVALID_JSON;
}
bool path_exists = false;
bool value_was_set = false;
// If the path exists, this callback will be called
auto mutate_cb = [&](std::optional<std::string_view>, JsonType* val) -> MutateCallbackResult<> {
path_exists = true;
if (!is_nx_condition) {
value_was_set = true;
*val = JsonType(parsed_json.value(),
std::pmr::polymorphic_allocator<char>{CompactObj::memory_resource()});
}
return {};
};
auto mutate_res = json_path.ExecuteMutateCallback<Nothing>(
updater.GetJson(), mutate_cb, CallbackResultOptions::DefaultMutateOptions());
// Set a new value if the path doesn't exist and the xx condition is not set.
if (mutate_res && !path_exists && !is_xx_condition) {
auto pointer = ConvertJsonPathToJsonPointer(json_path.Path());
if (!pointer) {
return OpStatus::SYNTAX_ERR;
}
std::error_code ec;
jsoncons::jsonpointer::add(*updater.GetJson(), pointer.value(), std::move(parsed_json).value(),
ec);
if (ec) {
VLOG(1) << "Failed to add a JSON value to the following path: " << json_str
<< " with the error: " << ec.message();
return OpStatus::SYNTAX_ERR;
}
value_was_set = true;
}
if (value_was_set) {
// We should do reset before setting the size of the json, because
// std::optional still holds the value and it will be deallocated
parsed_json.reset();
updater.SetJsonSize();
}
return value_was_set;
}
size_t NormalizeNegativeIndex(int index, size_t size) {
@ -517,44 +694,6 @@ string ConvertToJsonPointer(string_view json_path) {
return result;
}
/* Converts a JSONPath to a JSONPointer.
E.g. $[a][b][0] -> /a/b/0.
V1 JSONPath is not supported. */
std::optional<std::string> ConvertJsonPathToJsonPointer(string_view json_path) {
auto parsed_path = json::ParsePath(json_path);
if (!parsed_path) {
VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path
<< ". Invalid JSONPath.";
return std::nullopt;
}
std::string pointer;
const auto& path = parsed_path.value();
for (const auto& node : path) {
const auto& type = node.type();
if (type == json::SegmentType::IDENTIFIER) {
pointer += '/' + node.identifier();
} else if (type == json::SegmentType::INDEX) {
const auto& index = node.index();
if (index.first != index.second) {
VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path
<< ". Index range is not supported.";
return std::nullopt;
}
pointer += '/' + std::to_string(node.index().first);
} else {
VLOG(2) << "Error during conversion of JSONPath to JSONPointer: " << json_path
<< ". Unsupported segment type.";
return std::nullopt;
}
}
return pointer;
}
size_t CountJsonFields(const JsonType& j) {
size_t res = 0;
json_type type = j.type();
@ -1331,68 +1470,14 @@ OpResult<bool> OpSet(const OpArgs& op_args, string_view key, string_view path,
}
}
auto st = SetJson(op_args, key, json_str);
RETURN_ON_BAD_STATUS(st);
return true;
OpStatus result = SetFullJson(op_args, key, json_str);
if (result == OpStatus::OK) {
return true;
}
return result;
}
// Note that this operation would use copy and not move!
// The reason being, that we are applying this multiple times
// For each match we found. So for example if we have
// an array that this expression will match each entry in it
// then the assign here is called N times, where N == array.size().
bool path_exists = false;
bool operation_result = false;
optional<JsonType> parsed_json = ShardJsonFromString(json_str);
if (!parsed_json) {
VLOG(1) << "got invalid JSON string '" << json_str << "' cannot be saved";
return OpStatus::INVALID_JSON;
}
const JsonType& new_json = parsed_json.value();
// If the path exists, this callback will be called
auto mutate_cb = [&](std::optional<std::string_view>, JsonType* val) -> MutateCallbackResult<> {
path_exists = true;
if (!is_nx_condition) {
operation_result = true;
*val =
JsonType(new_json, std::pmr::polymorphic_allocator<char>{CompactObj::memory_resource()});
}
return {};
};
// If the path doesn't exist, this callback will be called
auto insert_cb = [&](JsonType* json) {
// Set a new value if the path doesn't exist and the xx condition is not set.
if (!path_exists && !is_xx_condition) {
auto pointer = ConvertJsonPathToJsonPointer(json_path.Path());
if (!pointer) {
return OpStatus::SYNTAX_ERR;
}
std::error_code ec;
jsoncons::jsonpointer::add(*json, pointer.value(), new_json, ec);
if (ec) {
VLOG(1) << "Failed to add a JSON value to the following path: " << path
<< " with the error: " << ec.message();
return OpStatus::SYNTAX_ERR;
}
operation_result = true;
}
return OpStatus::OK;
};
// JsonMutateOperation uses it's own JsonMemTracker. It will work, because updates to already
// existing json keys use copy assign, so we don't really need to account for the memory
// allocated by ShardJsonFromString above since it's not being moved here at all.
auto res = JsonMutateOperation<Nothing>(op_args, key, json_path, std::move(mutate_cb),
MutateOperationOptions{std::move(insert_cb)});
RETURN_ON_BAD_STATUS(res);
return operation_result;
return SetPartialJson(op_args, key, json_path, json_str, is_nx_condition, is_xx_condition);
}
OpResult<bool> OpSet(const OpArgs& op_args, string_view key, string_view path,

View file

@ -0,0 +1,108 @@
// Copyright 2025, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "base/gtest.h"
#include "base/logging.h"
#include "facade/facade_test.h"
#include "server/command_registry.h"
#include "server/json_family.h"
#include "server/test_utils.h"
using namespace testing;
using namespace std;
using namespace util;
ABSL_DECLARE_FLAG(bool, jsonpathv2);
namespace dfly {
class JsonFamilyMemoryTest : public BaseFamilyTest {
public:
static dfly::MiMemoryResource* GetMemoryResource() {
static thread_local mi_heap_t* heap = mi_heap_new();
static thread_local dfly::MiMemoryResource memory_resource{heap};
return &memory_resource;
}
protected:
auto GetJsonMemoryUsageFromDb(std::string_view key) {
return Run({"MEMORY", "USAGE", key, "WITHOUTKEY"});
}
};
size_t GetMemoryUsage() {
return static_cast<MiMemoryResource*>(JsonFamilyMemoryTest::GetMemoryResource())->used();
}
size_t GetJsonMemoryUsageFromString(std::string_view json_str) {
size_t start = GetMemoryUsage();
auto json = dfly::JsonFromString(json_str, JsonFamilyMemoryTest::GetMemoryResource());
if (!json) {
return 0;
}
// The same behaviour as in CompactObj
void* ptr =
JsonFamilyMemoryTest::GetMemoryResource()->allocate(sizeof(JsonType), alignof(JsonType));
JsonType* json_on_heap = new (ptr) JsonType(std::move(json).value());
DCHECK(json_on_heap);
size_t result = GetMemoryUsage() - start;
// Free the memory
json_on_heap->~JsonType();
JsonFamilyMemoryTest::GetMemoryResource()->deallocate(json_on_heap, sizeof(JsonType),
alignof(JsonType));
return result;
}
TEST_F(JsonFamilyMemoryTest, SimpleSet) {
std::string_view big_json = R"({"a":"some big string asdkasdkasdfkkasjdkfjka"})";
size_t start_size = GetJsonMemoryUsageFromString(big_json);
auto resp = Run({"JSON.SET", "j1", "$", big_json});
EXPECT_EQ(resp, "OK");
resp = GetJsonMemoryUsageFromDb("j1");
EXPECT_THAT(resp, IntArg(start_size));
std::string_view small_json = R"({"a":" "})";
size_t next_size = GetJsonMemoryUsageFromString(small_json);
resp = Run({"JSON.SET", "j1", "$", small_json});
EXPECT_EQ(resp, "OK");
resp = GetJsonMemoryUsageFromDb("j1");
EXPECT_THAT(resp, IntArg(next_size));
// Again set big json
resp = Run({"JSON.SET", "j1", "$", big_json});
EXPECT_EQ(resp, "OK");
resp = GetJsonMemoryUsageFromDb("j1");
EXPECT_THAT(resp, IntArg(start_size));
}
TEST_F(JsonFamilyMemoryTest, PartialSet) {
std::string_view start_json = R"({"a":"some text", "b":" "})";
size_t start_size = GetJsonMemoryUsageFromString(start_json);
auto resp = Run({"JSON.SET", "j1", "$", start_json});
EXPECT_EQ(resp, "OK");
resp = GetJsonMemoryUsageFromDb("j1");
EXPECT_THAT(resp, IntArg(start_size));
std::string_view json_after_set = R"({"a":"some text", "b":"some another text"})";
size_t size_after_set = GetJsonMemoryUsageFromString(json_after_set);
resp = Run({"JSON.SET", "j1", "$.b", "\"some another text\""});
EXPECT_EQ(resp, "OK");
resp = GetJsonMemoryUsageFromDb("j1");
EXPECT_THAT(resp, IntArg(size_after_set));
// Again set start json
resp = Run({"JSON.SET", "j1", "$.b", "\" \""});
EXPECT_EQ(resp, "OK");
resp = GetJsonMemoryUsageFromDb("j1");
EXPECT_THAT(resp, IntArg(start_size));
}
} // namespace dfly

View file

@ -2753,4 +2753,18 @@ TEST_F(SearchFamilyTest, RenameDocumentBetweenIndices) {
EXPECT_EQ(Run({"rename", "idx2:{doc}1", "idx1:{doc}1"}), "OK");
}
TEST_F(SearchFamilyTest, JsonSetIndexesBug) {
auto resp = Run({"JSON.SET", "j1", "$", R"({"text":"some text"})"});
EXPECT_THAT(resp, "OK");
resp = Run(
{"FT.CREATE", "index", "ON", "json", "SCHEMA", "$.text", "AS", "text", "TEXT", "SORTABLE"});
EXPECT_THAT(resp, "OK");
resp = Run({"JSON.SET", "j1", "$", R"({"asd}"})"});
EXPECT_THAT(resp, ErrArg("ERR failed to parse JSON"));
resp = Run({"FT.AGGREGATE", "index", "*", "GROUPBY", "1", "@text"});
EXPECT_THAT(resp, IsUnordArrayWithSize(IsMap("text", "some text")));
}
} // namespace dfly