chore: switch json object to pmr allocator (#2483)

* chore: switch json object to pmr allocator

1. Move json object creation code to the shard-thread inside rdb_load
2. Now json_family never references "json" type, always dfly::JsonType
3. Switch JsonType to pmr::json.
4. Make sure we pass the correct memory_resource when creating json object from string.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-01-26 12:47:46 +02:00 committed by GitHub
parent a5b9401449
commit ea5955962e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 96 additions and 153 deletions

View file

@ -577,14 +577,15 @@ TEST_F(CompactObjectTest, JsonTypeWithPathTest) {
ASSERT_TRUE(json_array.has_value());
cobj_.SetJson(std::move(json_array.value()));
ASSERT_TRUE(cobj_.ObjType() == OBJ_JSON); // and now this is a JSON type
auto f = [](const std::string& /*path*/, JsonType& book) {
auto f = [](const auto& /*path*/, JsonType& book) {
if (book.at("category") == "memoir" && !book.contains("price")) {
book.try_emplace("price", 140.0);
}
};
JsonType* json = cobj_.GetJson();
ASSERT_TRUE(json != nullptr);
jsonpath::json_replace(*json, "$.books[*]", f);
auto allocator_set = jsoncons::combine_allocators(json->get_allocator());
jsonpath::json_replace(allocator_set, *json, "$.books[*]"sv, f);
// Check whether we've changed the entry for json in place
// we should have prices only for memoir books

View file

@ -1,28 +1,26 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "core/json_object.h"
extern "C" {
#include "redis/object.h"
#include "redis/zmalloc.h"
}
#include <jsoncons/json.hpp>
#include "base/logging.h"
#include "core/compact_object.h"
using namespace jsoncons;
using namespace std;
namespace dfly {
std::optional<JsonType> JsonFromString(std::string_view input) {
using namespace jsoncons;
std::error_code ec;
optional<JsonType> JsonFromString(string_view input) {
error_code ec;
auto JsonErrorHandler = [](json_errc ec, const ser_context&) {
VLOG(1) << "Error while decode JSON: " << make_error_code(ec).message();
return false;
};
json_decoder<JsonType> decoder;
json_decoder<JsonType> decoder(
std::pmr::polymorphic_allocator<char>{CompactObj::memory_resource()});
json_parser parser(basic_json_decode_options<char>{}, JsonErrorHandler);
parser.update(input);
@ -31,7 +29,7 @@ std::optional<JsonType> JsonFromString(std::string_view input) {
if (decoder.is_valid()) {
return decoder.get_result();
}
return {};
return nullopt;
}
} // namespace dfly

View file

@ -4,32 +4,26 @@
#pragma once
#include <jsoncons/json.hpp>
#include <jsoncons_ext/jsonpath/jsonpath.hpp>
#include <memory>
#include <optional>
#include <string_view>
extern "C" {
#include "redis/redis_aux.h"
}
// Note about this file - once we have the issue with jsonpath in jsoncons resolved
// we would add the implementation for the allocator here as well. Right now this
// file is a little bit empty, but for external "users" such as json_family they
// should include this when creating JSON object from string that we're getting
// from the commands.
namespace jsoncons {
struct sorted_policy;
template <typename CharT, typename Policy, typename Allocator> class basic_json;
} // namespace jsoncons
namespace dfly {
// This is temporary, there is an issue right now with jsoncons about using jsonpath
// with custom allocator. once it would resolved, we would change this to use custom allocator
// that allocate memory from mimalloc
using JsonType = jsoncons::basic_json<char, jsoncons::sorted_policy, std::allocator<char>>;
using JsonType = jsoncons::pmr::json;
// Build a json object from string. If the string is not legal json, will return nullopt
std::optional<JsonType> JsonFromString(std::string_view input);
inline auto MakeJsonPathExpr(std::string_view path, std::error_code& ec)
-> jsoncons::jsonpath::jsonpath_expression<JsonType> {
return jsoncons::jsonpath::make_expression<JsonType, std::allocator<char>>(
jsoncons::allocator_set<JsonType::allocator_type, std::allocator<char>>(), path, ec);
}
} // namespace dfly

View file

@ -35,12 +35,17 @@ TEST_F(JsonTest, Basic) {
}
)";
json j = json::parse(data);
pmr::json j = pmr::json::parse(data);
EXPECT_TRUE(j.contains("reputons"));
jsonpath::json_replace(j, "$.reputons[*].rating", 1.1);
EXPECT_EQ(1.1, j["reputons"][0]["rating"].as_double());
}
TEST_F(JsonTest, SetEmpty) {
pmr::json dest{json_object_arg}; // crashes on UB without the tag.
dest["bar"] = "foo";
}
TEST_F(JsonTest, Query) {
json j = R"(
{"a":{}, "b":{"a":1}, "c":{"a":1, "b":2}}

View file

@ -1054,7 +1054,7 @@ uint64_t SortedMap::DfImpl::Scan(uint64_t cursor,
/***************************************************************************/
/* SortedMap */
/***************************************************************************/
SortedMap::SortedMap(PMR_NS::memory_resource* mr) : impl_(RdImpl()), mr_res_(mr) {
SortedMap::SortedMap(PMR_NS::memory_resource* mr) : impl_(RdImpl()) {
if (absl::GetFlag(FLAGS_use_zset_tree)) {
impl_ = DfImpl();
}

View file

@ -268,7 +268,6 @@ class SortedMap {
};
std::variant<RdImpl, DfImpl> impl_;
PMR_NS::memory_resource* mr_res_;
};
// Used by CompactObject.

View file

@ -10,7 +10,7 @@
namespace dfly {
class JournalReader;
struct JournalReader;
// Coordinator for multi shard execution.
class MultiShardExecution {

View file

@ -90,14 +90,17 @@ string JsonTypeToName(const JsonType& val) {
return std::string{};
}
JsonExpression ParseJsonPath(string_view path, error_code* ec) {
io::Result<JsonExpression> ParseJsonPath(string_view path) {
if (path == ".") {
// RedisJson V1 uses the dot for root level access.
// There are more incompatibilities with legacy paths which are not supported.
path = "$"sv;
}
return jsonpath::make_expression<JsonType>(path, *ec);
std::error_code ec;
JsonExpression res = MakeJsonPathExpr(path, ec);
if (ec)
return nonstd::make_unexpected(ec);
return res;
}
template <typename T>
@ -436,7 +439,7 @@ OpResult<string> OpJsonGet(const OpArgs& op_args, string_view key,
return expr ? expr->evaluate(json_entry) : json_entry;
};
json out;
JsonType out{json_object_arg}; // see https://github.com/danielaparker/jsoncons/issues/482
if (expressions.size() == 1) {
out = eval_wrapped(expressions[0].second);
} else {
@ -558,7 +561,7 @@ OpResult<string> OpDoubleArithmetic(const OpArgs& op_args, string_view key, stri
bool is_result_overflow = false;
double int_part;
bool has_fractional_part = (modf(num, &int_part) != 0);
json output(json_array_arg);
JsonType output(json_array_arg);
auto cb = [&](const auto&, JsonType& val) {
if (val.is_number()) {
@ -575,7 +578,7 @@ OpResult<string> OpDoubleArithmetic(const OpArgs& op_args, string_view key, stri
}
output.push_back(val);
} else {
output.push_back(json::null());
output.push_back(JsonType::null());
}
};
@ -624,12 +627,12 @@ OpResult<long> OpDel(const OpArgs& op_args, string_view key, string_view path) {
return total_deletions;
}
json patch(json_array_arg, {});
JsonType patch(json_array_arg, {});
reverse(deletion_items.begin(), deletion_items.end()); // deletion should finish at root keys.
for (const auto& item : deletion_items) {
string pointer = ConvertToJsonPointer(item);
total_deletions++;
json patch_item(json_object_arg, {{"op", "remove"}, {"path", pointer}});
JsonType patch_item(json_object_arg, {{"op", "remove"}, {"path", pointer}});
patch.emplace_back(patch_item);
}
@ -1140,6 +1143,18 @@ OpResult<bool> OpSet(const OpArgs& op_args, string_view key, string_view path,
} // namespace
// GCC extension of returning a value of multiple statements. The last statement is returned.
#define PARSE_PATH_ARG(path) \
({ \
io::Result<JsonExpression> expr_result = ParseJsonPath(path); \
if (!expr_result) { \
VLOG(1) << "Invalid JSONPath syntax: " << expr_result.error().message(); \
cntx->SendError(kSyntaxErr); \
return; \
} \
std::move(*expr_result); \
})
void JsonFamily::Set(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view path = ArgS(args, 1);
@ -1186,14 +1201,7 @@ void JsonFamily::Resp(CmdArgList args, ConnectionContext* cntx) {
path = ArgS(args, 1);
}
error_code ec;
JsonExpression expression = ParseJsonPath(path, &ec);
if (ec) {
VLOG(1) << "Invalid JSONPath syntax: " << ec.message();
cntx->SendError(kSyntaxErr);
return;
}
JsonExpression expression = PARSE_PATH_ARG(path);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpResp(t->GetOpArgs(shard), key, std::move(expression));
@ -1239,16 +1247,9 @@ void JsonFamily::Debug(CmdArgList args, ConnectionContext* cntx) {
return;
}
error_code ec;
string_view key = ArgS(args, 1);
string_view path = ArgS(args, 2);
JsonExpression expression = ParseJsonPath(path, &ec);
if (ec) {
VLOG(1) << "Invalid JSONPath syntax: " << ec.message();
cntx->SendError(kSyntaxErr);
return;
}
JsonExpression expression = PARSE_PATH_ARG(path);
auto cb = [&](Transaction* t, EngineShard* shard) {
return func(t->GetOpArgs(shard), key, std::move(expression));
@ -1267,15 +1268,8 @@ void JsonFamily::Debug(CmdArgList args, ConnectionContext* cntx) {
void JsonFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
DCHECK_GE(args.size(), 1U);
error_code ec;
string_view path = ArgS(args, args.size() - 1);
JsonExpression expression = ParseJsonPath(path, &ec);
if (ec) {
VLOG(1) << "Invalid JSONPath syntax: " << ec.message();
cntx->SendError(kSyntaxErr);
return;
}
JsonExpression expression = PARSE_PATH_ARG(path);
Transaction* transaction = cntx->transaction;
unsigned shard_count = shard_set->size();
@ -1283,7 +1277,7 @@ void JsonFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) {
ShardId sid = shard->shard_id();
mget_resp[sid] = OpJsonMGet(ParseJsonPath(path, &ec), t, shard);
mget_resp[sid] = OpJsonMGet(*ParseJsonPath(path), t, shard);
return OpStatus::OK;
};
@ -1325,14 +1319,7 @@ void JsonFamily::ArrIndex(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view path = ArgS(args, 1);
error_code ec;
JsonExpression expression = ParseJsonPath(path, &ec);
if (ec) {
VLOG(1) << "Invalid JSONPath syntax: " << ec.message();
cntx->SendError(kSyntaxErr);
return;
}
JsonExpression expression = PARSE_PATH_ARG(path);
optional<JsonType> search_value = JsonFromString(ArgS(args, 2));
if (!search_value) {
@ -1487,14 +1474,7 @@ void JsonFamily::ArrPop(CmdArgList args, ConnectionContext* cntx) {
}
}
error_code ec;
JsonExpression expression = ParseJsonPath(path, &ec);
if (ec) {
VLOG(1) << "Invalid JSONPath syntax: " << ec.message();
cntx->SendError(kSyntaxErr);
return;
}
PARSE_PATH_ARG(path);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpArrPop(t->GetOpArgs(shard), key, path, index);
@ -1562,14 +1542,7 @@ void JsonFamily::ObjKeys(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view path = ArgS(args, 1);
error_code ec;
JsonExpression expression = ParseJsonPath(path, &ec);
if (ec) {
VLOG(1) << "Invalid JSONPath syntax: " << ec.message();
cntx->SendError(kSyntaxErr);
return;
}
JsonExpression expression = PARSE_PATH_ARG(path);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpObjKeys(t->GetOpArgs(shard), key, std::move(expression));
@ -1680,14 +1653,7 @@ void JsonFamily::Type(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view path = ArgS(args, 1);
error_code ec;
JsonExpression expression = ParseJsonPath(path, &ec);
if (ec) {
VLOG(1) << "Invalid JSONPath syntax: " << ec.message();
cntx->SendError(kSyntaxErr);
return;
}
JsonExpression expression = PARSE_PATH_ARG(path);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpType(t->GetOpArgs(shard), key, std::move(expression));
@ -1716,14 +1682,7 @@ void JsonFamily::ArrLen(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view path = ArgS(args, 1);
error_code ec;
JsonExpression expression = ParseJsonPath(path, &ec);
if (ec) {
VLOG(1) << "Invalid JSONPath syntax: " << ec.message();
cntx->SendError(kSyntaxErr);
return;
}
JsonExpression expression = PARSE_PATH_ARG(path);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpArrLen(t->GetOpArgs(shard), key, std::move(expression));
@ -1743,14 +1702,7 @@ void JsonFamily::ObjLen(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view path = ArgS(args, 1);
error_code ec;
JsonExpression expression = ParseJsonPath(path, &ec);
if (ec) {
VLOG(1) << "Invalid JSONPath syntax: " << ec.message();
cntx->SendError(kSyntaxErr);
return;
}
JsonExpression expression = PARSE_PATH_ARG(path);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpObjLen(t->GetOpArgs(shard), key, std::move(expression));
@ -1770,14 +1722,7 @@ void JsonFamily::StrLen(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view path = ArgS(args, 1);
error_code ec;
JsonExpression expression = ParseJsonPath(path, &ec);
if (ec) {
VLOG(1) << "Invalid JSONPath syntax: " << ec.message();
cntx->SendError(kSyntaxErr);
return;
}
JsonExpression expression = PARSE_PATH_ARG(path);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpStrLen(t->GetOpArgs(shard), key, std::move(expression));
@ -1822,12 +1767,13 @@ void JsonFamily::Get(CmdArgList args, ConnectionContext* cntx) {
string_view expr_str = parser.Next();
if (expr_str != ".") {
error_code ec;
expr = ParseJsonPath(expr_str, &ec);
if (ec) {
LOG(WARNING) << "path '" << expr_str << "': Invalid JSONPath syntax: " << ec.message();
io::Result<JsonExpression> res = ParseJsonPath(expr_str);
if (!res) {
LOG(WARNING) << "path '" << expr_str
<< "': Invalid JSONPath syntax: " << res.error().message();
return cntx->SendError(kSyntaxErr);
}
expr.emplace(std::move(*res));
}
expressions.emplace_back(expr_str, std::move(expr));

View file

@ -377,7 +377,6 @@ class RdbLoaderBase::OpaqueObjLoader {
void operator()(const base::PODArray<char>& str);
void operator()(const LzfString& lzfstr);
void operator()(const unique_ptr<LoadTrace>& ptr);
void operator()(const JsonType& jt);
std::error_code ec() const {
return ec_;
@ -462,10 +461,6 @@ void RdbLoaderBase::OpaqueObjLoader::operator()(const unique_ptr<LoadTrace>& ptr
}
}
void RdbLoaderBase::OpaqueObjLoader::operator()(const JsonType& json) {
pv_->SetJson(JsonType{json});
}
void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
size_t len = ltrace->blob_count();
@ -1080,6 +1075,12 @@ void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
unsigned char* lp = (uint8_t*)zmalloc(bytes);
std::memcpy(lp, src_lp, bytes);
pv_->InitRobj(OBJ_ZSET, OBJ_ENCODING_LISTPACK, lp);
} else if (rdb_type_ == RDB_TYPE_JSON) {
auto json = JsonFromString(blob);
if (!json) {
ec_ = RdbError(errc::bad_json_string);
}
pv_->SetJson(std::move(*json));
} else {
LOG(FATAL) << "Unsupported rdb type " << rdb_type_;
}
@ -1400,20 +1401,18 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
iores = ReadStreams();
break;
case RDB_TYPE_JSON:
iores = ReadJson();
break;
case RDB_TYPE_SET_LISTPACK:
// We need to deal with protocol versions 9 and older because in these
// RDB_TYPE_JSON == 20. On newer versions > 9 we bumped up RDB_TYPE_JSON to 30
// because it overlapped with the new type RDB_TYPE_SET_LISTPACK
if (rdb_version_ < 10 && rdbtype == RDB_TYPE_JSON_OLD) {
if (rdb_version_ < 10) {
// consider it RDB_TYPE_JSON_OLD
iores = ReadJson();
break;
} else {
iores = ReadGeneric(rdbtype);
}
if (rdbtype == RDB_TYPE_JSON) {
iores = ReadJson();
break;
}
iores = ReadGeneric(rdbtype);
break;
default:
LOG(ERROR) << "Unsupported rdb type " << rdbtype;
@ -1829,14 +1828,12 @@ auto RdbLoaderBase::ReadStreams() -> io::Result<OpaqueObj> {
}
auto RdbLoaderBase::ReadJson() -> io::Result<OpaqueObj> {
string json_str;
SET_OR_UNEXPECT(FetchGenericString(), json_str);
RdbVariant dest;
error_code ec = ReadStringObj(&dest);
if (ec)
return make_unexpected(ec);
auto json = JsonFromString(json_str);
if (!json)
return Unexpected(errc::bad_json_string);
return OpaqueObj{std::move(*json), RDB_TYPE_JSON};
return OpaqueObj{std::move(dest), RDB_TYPE_JSON};
}
template <typename T> io::Result<T> RdbLoaderBase::FetchInt() {

View file

@ -41,8 +41,8 @@ class RdbLoaderBase {
uint64_t uncompressed_len;
};
using RdbVariant = std::variant<long long, base::PODArray<char>, LzfString,
std::unique_ptr<LoadTrace>, JsonType>;
using RdbVariant =
std::variant<long long, base::PODArray<char>, LzfString, std::unique_ptr<LoadTrace>>;
struct OpaqueObj {
RdbVariant obj;

View file

@ -192,7 +192,8 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
case OBJ_MODULE:
return RDB_TYPE_MODULE_2;
case OBJ_JSON:
return RDB_TYPE_JSON_OLD;
return RDB_TYPE_JSON; // save with RDB_TYPE_JSON, deprecate RDB_TYPE_JSON_OLD after July
// 2024.
}
LOG(FATAL) << "Unknown encoding " << compact_enc << " for type " << type;
return 0; /* avoid warning */

View file

@ -172,7 +172,8 @@ JsonAccessor::JsonPathContainer* JsonAccessor::GetPath(std::string_view field) c
}
error_code ec;
auto path_expr = jsoncons::jsonpath::make_expression<JsonType>(field, ec);
auto path_expr = MakeJsonPathExpr(field, ec);
if (ec) {
LOG(WARNING) << "Invalid Json path: " << field << ' ' << ec.message();
return nullptr;

View file

@ -10,8 +10,6 @@
#include <absl/strings/str_format.h>
#include <atomic>
#include <jsoncons/json.hpp>
#include <jsoncons_ext/jsonpath/jsonpath.hpp>
#include <variant>
#include <vector>
@ -41,7 +39,7 @@ static const set<string_view> kIgnoredOptions = {"WEIGHT", "SEPARATOR"};
bool IsValidJsonPath(string_view path) {
error_code ec;
jsoncons::jsonpath::make_expression<JsonType>(path, ec);
MakeJsonPathExpr(path, ec);
return !ec;
}

View file

@ -16,6 +16,9 @@
#include "server/detail/table.h"
#include "server/top_keys.h"
extern "C" {
#include "redis/redis_aux.h"
}
namespace dfly {
using PrimeKey = detail::PrimeKey;