From f3426bd559db66b24626f36191324b2ea9ea7171 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Wed, 8 Jan 2025 20:05:50 +0200 Subject: [PATCH] chore: update jsoncons version to 0.178 (#4368) Signed-off-by: Roman Gershman --- src/CMakeLists.txt | 4 +- src/core/compact_object.cc | 7 ++++ src/server/json_family.cc | 68 ++++++++++++++-------------------- tests/dragonfly/memory_test.py | 2 +- 4 files changed, 37 insertions(+), 44 deletions(-) diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b0611d0dc..4f15e2af2 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -74,10 +74,8 @@ set(REFLEX "${THIRD_PARTY_LIB_DIR}/reflex/bin/reflex") add_third_party( jsoncons GIT_REPOSITORY https://github.com/dragonflydb/jsoncons - # URL https://github.com/danielaparker/jsoncons/archive/refs/tags/v0.171.1.tar.gz - GIT_TAG Dragonfly + GIT_TAG Dragonfly.178 GIT_SHALLOW 1 - # PATCH_COMMAND patch -p1 -i "${CMAKE_SOURCE_DIR}/patches/jsoncons-v0.171.0.patch" CMAKE_PASS_FLAGS "-DJSONCONS_BUILD_TESTS=OFF -DJSONCONS_HAS_POLYMORPHIC_ALLOCATOR=ON" LIB "none" ) diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index 261a7f4bd..9b79315ed 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -878,6 +878,9 @@ void CompactObj::SetJson(JsonType&& j) { if (taglen_ == JSON_TAG && JsonEnconding() == kEncodingJsonCons) { DCHECK(u_.json_obj.cons.json_ptr != nullptr); // must be allocated u_.json_obj.cons.json_ptr->swap(j); + DCHECK(jsoncons::is_trivial_storage(u_.json_obj.cons.json_ptr->storage_kind()) || + u_.json_obj.cons.json_ptr->get_allocator().resource() == tl.local_mr); + // We do not set bytes_used as this is needed. Consider the two following cases: // 1. old json contains 50 bytes. The delta for new one is 50, so the total bytes // the new json occupies is 100. @@ -889,6 +892,10 @@ void CompactObj::SetJson(JsonType&& j) { SetMeta(JSON_TAG); u_.json_obj.cons.json_ptr = AllocateMR(std::move(j)); + + // With trivial storage json_ptr->get_allocator() throws an exception. + DCHECK(jsoncons::is_trivial_storage(u_.json_obj.cons.json_ptr->storage_kind()) || + u_.json_obj.cons.json_ptr->get_allocator().resource() == tl.local_mr); u_.json_obj.cons.bytes_used = 0; } diff --git a/src/server/json_family.cc b/src/server/json_family.cc index 39164ef1a..c11c549ae 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -319,9 +319,18 @@ bool JsonAreEquals(const JsonType& lhs, const JsonType& rhs) { } } -template +// Use this method on the coordinator thread +std::optional JsonFromString(std::string_view input) { + return dfly::JsonFromString(input, PMR_NS::get_default_resource()); +} + +// Use this method on the shard thread +std::optional ShardJsonFromString(std::string_view input) { + return dfly::JsonFromString(input, CompactObj::memory_resource()); +} + OpResult SetJson(const OpArgs& op_args, string_view key, - Callback parse_json_value) { + string_view json_str) { auto& db_slice = op_args.GetDbSlice(); auto op_res = db_slice.AddOrFind(op_args.db_cntx, key); @@ -331,17 +340,20 @@ OpResult SetJson(const OpArgs& op_args, string_view ke op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, res.it->second); - OpResult json_value = parse_json_value(); - RETURN_ON_BAD_STATUS(json_value); + std::optional parsed_json = ShardJsonFromString(json_str); + if (!parsed_json) { + VLOG(1) << "got invalid JSON string '" << json_str << "' cannot be saved"; + return OpStatus::INVALID_VALUE; + } if (JsonEnconding() == kEncodingJsonFlat) { flexbuffers::Builder fbb; - json::FromJsonType(json_value.value(), &fbb); + json::FromJsonType(*parsed_json, &fbb); fbb.Finish(); const auto& buf = fbb.GetBuffer(); res.it->second.SetJson(buf.data(), buf.size()); } else { - res.it->second.SetJson(*std::move(json_value)); + res.it->second.SetJson(std::move(*parsed_json)); } op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, res.it->second); return std::move(res); @@ -388,16 +400,6 @@ string JsonTypeToName(const JsonType& val) { return std::string{}; } -// Use this method on the coordinator thread -std::optional JsonFromString(std::string_view input) { - return dfly::JsonFromString(input, PMR_NS::get_default_resource()); -} - -// Use this method on the shard thread -std::optional ShardJsonFromString(std::string_view input) { - return dfly::JsonFromString(input, CompactObj::memory_resource()); -} - OpResult GetJson(const OpArgs& op_args, string_view key) { auto it_res = op_args.GetDbSlice().FindReadOnly(op_args.db_cntx, key, OBJ_JSON); if (!it_res.ok()) @@ -953,8 +955,8 @@ OpResult OpDel(const OpArgs& op_args, string_view key, string_view path, if (json_path.HoldsJsonPath()) { const json::Path& path = json_path.AsJsonPath(); - long deletions = json::MutatePath( - path, [](optional, JsonType* val) { return true; }, json_val); + long deletions = + json::MutatePath(path, [](optional, JsonType* val) { return true; }, json_val); return deletions; } @@ -1308,15 +1310,6 @@ auto OpResp(const OpArgs& op_args, string_view key, const WrappedJsonPath& json_ OpResult OpSet(const OpArgs& op_args, string_view key, string_view path, const WrappedJsonPath& json_path, std::string_view json_str, bool is_nx_condition, bool is_xx_condition) { - auto parse_json = [&json_str]() -> OpResult { - std::optional parsed_json = ShardJsonFromString(json_str); - if (!parsed_json) { - VLOG(1) << "got invalid JSON string '" << json_str << "' cannot be saved"; - return OpStatus::SYNTAX_ERR; - } - return parsed_json.value(); - }; - // The whole key should be replaced. // NOTE: unlike in Redis, we are overriding the value when the path is "$" // this is regardless of the current key type. In redis if the key exists @@ -1335,13 +1328,7 @@ OpResult OpSet(const OpArgs& op_args, string_view key, string_view path, } JsonMemTracker mem_tracker; - // We need to deep copy parsed_json.value() and not use move! The reason is that otherwise - // it's really difficult to properly track memory deltas because even if we move below, - // the deallocation of parsed_json won't happen in the scope of SetJson but in the scope - // of this function. Because of this, the memory tracking will be off. Another solution here, - // is to use absl::Cleanup and dispatch another Find() but that's too complicated because then - // you need to take into account the order of destructors. - OpResult st = SetJson(op_args, key, std::move(parse_json)); + OpResult st = SetJson(op_args, key, json_str); RETURN_ON_BAD_STATUS(st); mem_tracker.SetJsonSize(st->it->second, st->is_new); return true; @@ -1355,18 +1342,19 @@ OpResult OpSet(const OpArgs& op_args, string_view key, string_view path, bool path_exists = false; bool operation_result = false; - OpResult parsed_json = parse_json(); - RETURN_ON_BAD_STATUS(parsed_json); + optional parsed_json = ShardJsonFromString(json_str); + if (!parsed_json) { + VLOG(1) << "got invalid JSON string '" << json_str << "' cannot be saved"; + return OpStatus::INVALID_VALUE; + } const JsonType& new_json = parsed_json.value(); auto cb = [&](std::optional, JsonType* val) -> MutateCallbackResult<> { path_exists = true; if (!is_nx_condition) { operation_result = true; - static_assert( - std::is_same_v::propagate_on_container_copy_assignment, - std::false_type>); - *val = new_json; + *val = + JsonType(new_json, std::pmr::polymorphic_allocator{CompactObj::memory_resource()}); } return {}; }; diff --git a/tests/dragonfly/memory_test.py b/tests/dragonfly/memory_test.py index 37cd11318..d871371df 100644 --- a/tests/dragonfly/memory_test.py +++ b/tests/dragonfly/memory_test.py @@ -43,7 +43,7 @@ async def test_rss_used_mem_gap(df_factory, type, keys, val_size, elements): await asyncio.sleep(1) # Wait for another RSS heartbeat update in Dragonfly cmd = f"DEBUG POPULATE {keys} k {val_size} RAND TYPE {type} ELEMENTS {elements}" - print(f"Running {cmd}") + logging.info(f"Running {cmd}") await client.execute_command(cmd) await asyncio.sleep(2) # Wait for another RSS heartbeat update in Dragonfly