mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
chore: implement path mutation for JsonFlat (#2805)
* chore: implement path mutation for JsonFlat This is done by converting the flat blob into mutable c++ json and then building a new flat blob. Also, add JsonFlat encoding to CompactObject class. --------- Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
845f01c7fd
commit
c8426cfd31
6 changed files with 106 additions and 58 deletions
|
@ -697,22 +697,33 @@ std::optional<int64_t> CompactObj::TryGetInt() const {
|
|||
|
||||
auto CompactObj::GetJson() const -> JsonType* {
|
||||
if (ObjType() == OBJ_JSON) {
|
||||
DCHECK_EQ(u_.json_obj.encoding, kEncodingJsonCons);
|
||||
return u_.json_obj.json_ptr;
|
||||
}
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
void CompactObj::SetJson(JsonType&& j) {
|
||||
if (taglen_ == JSON_TAG) { // already json
|
||||
if (taglen_ == JSON_TAG && u_.json_obj.encoding == kEncodingJsonCons) {
|
||||
// already json
|
||||
DCHECK(u_.json_obj.json_ptr != nullptr); // must be allocated
|
||||
*u_.json_obj.json_ptr = std::move(j);
|
||||
u_.json_obj.json_ptr->swap(j);
|
||||
} else {
|
||||
SetMeta(JSON_TAG);
|
||||
void* ptr = tl.local_mr->allocate(sizeof(JsonType), kAlignSize);
|
||||
void* ptr = tl.local_mr->allocate(sizeof(JsonType), alignof(JsonType));
|
||||
u_.json_obj.json_ptr = new (ptr) JsonType(std::move(j));
|
||||
u_.json_obj.encoding = kEncodingJsonCons;
|
||||
}
|
||||
}
|
||||
|
||||
void CompactObj::SetJson(const uint8_t* buf, size_t len) {
|
||||
SetMeta(JSON_TAG);
|
||||
u_.json_obj.flat_ptr = (uint8_t*)tl.local_mr->allocate(len, kAlignSize);
|
||||
memcpy(u_.json_obj.flat_ptr, buf, len);
|
||||
u_.json_obj.encoding = kEncodingJsonFlat;
|
||||
u_.json_obj.json_len = len;
|
||||
}
|
||||
|
||||
void CompactObj::SetSBF(uint64_t initial_capacity, double fp_prob, double grow_factor) {
|
||||
if (taglen_ == SBF_TAG) { // already json
|
||||
*u_.sbf = SBF(initial_capacity, fp_prob, grow_factor, tl.local_mr);
|
||||
|
@ -1009,9 +1020,12 @@ void CompactObj::Free() {
|
|||
tl.small_str_bytes -= u_.small_str.MallocUsed();
|
||||
u_.small_str.Free();
|
||||
} else if (taglen_ == JSON_TAG) {
|
||||
VLOG(1) << "Freeing JSON object";
|
||||
u_.json_obj.json_ptr->~JsonType();
|
||||
tl.local_mr->deallocate(u_.json_obj.json_ptr, sizeof(JsonType), kAlignSize);
|
||||
DVLOG(1) << "Freeing JSON object";
|
||||
if (u_.json_obj.encoding == kEncodingJsonCons) {
|
||||
DeleteMR<JsonType>(u_.json_obj.json_ptr);
|
||||
} else {
|
||||
tl.local_mr->deallocate(u_.json_obj.flat_ptr, u_.json_obj.json_len, kAlignSize);
|
||||
}
|
||||
} else if (taglen_ == SBF_TAG) {
|
||||
DeleteMR<SBF>(u_.sbf);
|
||||
} else {
|
||||
|
|
|
@ -18,6 +18,8 @@ constexpr unsigned kEncodingIntSet = 0;
|
|||
constexpr unsigned kEncodingStrMap = 1; // for set/map encodings of strings
|
||||
constexpr unsigned kEncodingStrMap2 = 2; // for set/map encodings of strings using DenseSet
|
||||
constexpr unsigned kEncodingListPack = 3;
|
||||
constexpr unsigned kEncodingJsonCons = 0;
|
||||
constexpr unsigned kEncodingJsonFlat = 1;
|
||||
|
||||
class SBF;
|
||||
|
||||
|
@ -296,6 +298,7 @@ class CompactObj {
|
|||
// you need to move an object that created with the function JsonFromString
|
||||
// into here, no copying is allowed!
|
||||
void SetJson(JsonType&& j);
|
||||
void SetJson(const uint8_t* buf, size_t len);
|
||||
|
||||
// pre condition - the type here is OBJ_JSON and was set with SetJson
|
||||
JsonType* GetJson() const;
|
||||
|
@ -380,9 +383,13 @@ class CompactObj {
|
|||
} __attribute__((packed));
|
||||
|
||||
struct JsonWrapper {
|
||||
JsonType* json_ptr = nullptr;
|
||||
size_t unneeded = 0;
|
||||
} __attribute__((packed));
|
||||
union {
|
||||
JsonType* json_ptr;
|
||||
uint8_t* flat_ptr;
|
||||
};
|
||||
uint32_t json_len = 0;
|
||||
uint8_t encoding = 0;
|
||||
};
|
||||
|
||||
// My main data structure. Union of representations.
|
||||
// RobjWrapper is kInlineLen=16 bytes, so we employ SSO of that size via inline_str.
|
||||
|
@ -393,7 +400,9 @@ class CompactObj {
|
|||
|
||||
SmallString small_str;
|
||||
detail::RobjWrapper r_obj;
|
||||
JsonWrapper json_obj;
|
||||
|
||||
// using 'packed' to reduce alignement of U to 1.
|
||||
JsonWrapper json_obj __attribute__((packed));
|
||||
SBF* sbf __attribute__((packed));
|
||||
int64_t ival __attribute__((packed));
|
||||
ExternalPtr ext_ptr;
|
||||
|
|
|
@ -375,43 +375,60 @@ TYPED_TEST(JsonPathTest, Mutate) {
|
|||
ASSERT_EQ(0, this->Parse("$[*]"));
|
||||
Path path = this->driver_.TakePath();
|
||||
|
||||
// Currently this code compiles only for JsonType.
|
||||
if constexpr (std::is_same_v<TypeParam, JsonType>) {
|
||||
TypeParam json = ValidJson<TypeParam>(R"([1, 2, 3, 5, 6])");
|
||||
MutateCallback cb = [&](optional<string_view>, JsonType* val) {
|
||||
int intval = val->as<int>();
|
||||
*val = intval + 1;
|
||||
return false;
|
||||
};
|
||||
TypeParam json = ValidJson<TypeParam>(R"([1, 2, 3, 5, 6])");
|
||||
MutateCallback cb = [&](optional<string_view>, JsonType* val) {
|
||||
int intval = val->as<int>();
|
||||
*val = intval + 1;
|
||||
return false;
|
||||
};
|
||||
|
||||
vector<int> arr;
|
||||
|
||||
if constexpr (std::is_same_v<TypeParam, JsonType>) {
|
||||
MutatePath(path, cb, &json);
|
||||
vector<int> arr;
|
||||
|
||||
for (JsonType& el : json.array_range()) {
|
||||
arr.push_back(to_int(el));
|
||||
}
|
||||
ASSERT_THAT(arr, ElementsAre(2, 3, 4, 6, 7));
|
||||
} else {
|
||||
flexbuffers::Builder fbb;
|
||||
MutatePath(path, cb, json, &fbb);
|
||||
FlatJson fj = flexbuffers::GetRoot(fbb.GetBuffer());
|
||||
auto vec = fj.AsVector();
|
||||
for (unsigned i = 0; i < vec.size(); ++i) {
|
||||
arr.push_back(to_int(vec[i]));
|
||||
}
|
||||
}
|
||||
ASSERT_THAT(arr, ElementsAre(2, 3, 4, 6, 7));
|
||||
|
||||
json = ValidJson<TypeParam>(R"(
|
||||
{"a":[7], "inner": {"a": {"bool": true, "c": 42}}}
|
||||
)");
|
||||
ASSERT_EQ(0, this->Parse("$..a.*"));
|
||||
path = this->driver_.TakePath();
|
||||
MutatePath(
|
||||
path,
|
||||
[&](optional<string_view> key, JsonType* val) {
|
||||
if (val->is_int64() && !key) { // array element
|
||||
*val = 42;
|
||||
return false;
|
||||
}
|
||||
if (val->is_bool()) {
|
||||
*val = false;
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
},
|
||||
&json);
|
||||
json = ValidJson<TypeParam>(R"(
|
||||
{"a":[7], "inner": {"a": {"bool": true, "c": 42}}}
|
||||
)");
|
||||
ASSERT_EQ(0, this->Parse("$..a.*"));
|
||||
path = this->driver_.TakePath();
|
||||
|
||||
ASSERT_EQ(R"({"a":[42],"inner":{"a":{"bool":false}}})", json.to_string());
|
||||
MutateCallback cb2 = [&](optional<string_view> key, JsonType* val) {
|
||||
if (val->is_int64() && !key) { // array element
|
||||
*val = 42;
|
||||
return false;
|
||||
}
|
||||
if (val->is_bool()) {
|
||||
*val = false;
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
};
|
||||
|
||||
auto expected = ValidJson<JsonType>(R"({"a":[42],"inner":{"a":{"bool":false}}})");
|
||||
if constexpr (std::is_same_v<TypeParam, JsonType>) {
|
||||
MutatePath(path, cb2, &json);
|
||||
|
||||
ASSERT_EQ(expected, json);
|
||||
} else {
|
||||
flexbuffers::Builder fbb;
|
||||
MutatePath(path, cb2, json, &fbb);
|
||||
FlatJson fj = flexbuffers::GetRoot(fbb.GetBuffer());
|
||||
ASSERT_EQ(expected, FromFlat(fj));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -261,4 +261,16 @@ void FromJsonType(const JsonType& src, flexbuffers::Builder* fbb) {
|
|||
fbb->EndVector(start, false, false);
|
||||
}
|
||||
|
||||
unsigned MutatePath(const Path& path, MutateCallback callback, FlatJson json,
|
||||
flexbuffers::Builder* fbb) {
|
||||
JsonType mut_json = FromFlat(json);
|
||||
unsigned res = MutatePath(path, std::move(callback), &mut_json);
|
||||
if (res) {
|
||||
FromJsonType(mut_json, fbb);
|
||||
fbb->Finish();
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
} // namespace dfly::json
|
||||
|
|
|
@ -111,6 +111,8 @@ void EvaluatePath(const Path& path, FlatJson json, PathFlatCallback callback);
|
|||
|
||||
// returns number of matches found with the given path.
|
||||
unsigned MutatePath(const Path& path, MutateCallback callback, JsonType* json);
|
||||
unsigned MutatePath(const Path& path, MutateCallback callback, FlatJson json,
|
||||
flexbuffers::Builder* fbb);
|
||||
|
||||
// utility function to parse a jsonpath. Returns an error message if a parse error was
|
||||
// encountered.
|
||||
|
|
|
@ -87,8 +87,15 @@ facade::OpStatus SetJson(const OpArgs& op_args, string_view key, JsonType&& valu
|
|||
|
||||
op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, res.it->second);
|
||||
|
||||
res.it->second.SetJson(std::move(value));
|
||||
|
||||
if (absl::GetFlag(FLAGS_experimental_flat_json)) {
|
||||
flexbuffers::Builder fbb;
|
||||
json::FromJsonType(value, &fbb);
|
||||
fbb.Finish();
|
||||
const auto& buf = fbb.GetBuffer();
|
||||
res.it->second.SetJson(buf.data(), buf.size());
|
||||
} else {
|
||||
res.it->second.SetJson(std::move(value));
|
||||
}
|
||||
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, res.it->second);
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
@ -1226,22 +1233,9 @@ OpResult<bool> OpSet(const OpArgs& op_args, string_view key, string_view path,
|
|||
}
|
||||
}
|
||||
|
||||
if (absl::GetFlag(FLAGS_experimental_flat_json)) {
|
||||
flatbuffers::Parser parser;
|
||||
flexbuffers::Builder fbb;
|
||||
string tmp(json_str);
|
||||
CHECK_EQ(json_str.size(), strlen(tmp.c_str()));
|
||||
|
||||
parser.ParseFlexBuffer(tmp.c_str(), nullptr, &fbb);
|
||||
fbb.Finish();
|
||||
const auto& buffer = fbb.GetBuffer();
|
||||
string_view buf_view{reinterpret_cast<const char*>(buffer.data()), buffer.size()};
|
||||
SetCmd scmd(op_args, false);
|
||||
scmd.Set(SetCmd::SetParams{}, key, buf_view);
|
||||
} else {
|
||||
if (SetJson(op_args, key, std::move(parsed_json.value())) == OpStatus::OUT_OF_MEMORY) {
|
||||
return OpStatus::OUT_OF_MEMORY;
|
||||
}
|
||||
OpStatus st = SetJson(op_args, key, std::move(parsed_json.value()));
|
||||
if (st != OpStatus::OK) {
|
||||
return st;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue