fix(json_family): Fix memory tracking for JSON mutate operations

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>
This commit is contained in:
Stepan Bagritsevich 2025-05-06 09:58:09 +02:00
parent 2b4b2a4ac6
commit 91ef2af169
No known key found for this signature in database
GPG key ID: 92DAADC9F15EE26E

View file

@ -748,47 +748,19 @@ OpResult<JsonCallbackResult<T>> JsonReadOnlyOperation(const OpArgs& op_args, std
return json_path.ExecuteReadOnlyCallback<T>(json_val, cb, options.cb_result_options); return json_path.ExecuteReadOnlyCallback<T>(json_val, cb, options.cb_result_options);
} }
struct MutateOperationOptions {
using PostMutateCallback = absl::FunctionRef<OpStatus(JsonType*)>;
std::optional<PostMutateCallback> post_mutate_cb;
CallbackResultOptions cb_result_options = CallbackResultOptions::DefaultMutateOptions();
};
template <typename T> template <typename T>
OpResult<JsonCallbackResult<optional<T>>> JsonMutateOperation(const OpArgs& op_args, OpResult<JsonCallbackResult<optional<T>>> JsonMutateOperation(
std::string_view key, const OpArgs& op_args, std::string_view key, const WrappedJsonPath& json_path,
const WrappedJsonPath& json_path, JsonPathMutateCallback<T> cb,
JsonPathMutateCallback<T> cb, CallbackResultOptions cb_result_options = CallbackResultOptions::DefaultMutateOptions()) {
MutateOperationOptions options = {}) {
auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_JSON); auto it_res = op_args.GetDbSlice().FindMutable(op_args.db_cntx, key, OBJ_JSON);
RETURN_ON_BAD_STATUS(it_res); RETURN_ON_BAD_STATUS(it_res);
JsonMemTracker mem_tracker; JsonAutoUpdater updater(op_args, key, *std::move(it_res));
PrimeValue& pv = it_res->it->second; auto mutate_res = json_path.ExecuteMutateCallback(updater.GetJson(), cb, cb_result_options);
JsonType* json_val = pv.GetJson(); updater.SetJsonSize();
DCHECK(json_val) << "should have a valid JSON object for key '" << key << "' the type for it is '"
<< pv.ObjType() << "'";
op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, pv);
auto mutate_res = json_path.ExecuteMutateCallback(json_val, cb, options.cb_result_options);
// Call post mutate callback
if (mutate_res && options.post_mutate_cb) {
auto res = options.post_mutate_cb.value()(json_val);
// We can not return result here, because we need to update the size
if (res != OpStatus::OK) {
mutate_res = res;
}
}
// we need to manually run this before the PostUpdater run
mem_tracker.SetJsonSize(pv, false);
it_res->post_updater.Run();
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);
return mutate_res; return mutate_res;
} }
@ -1223,7 +1195,7 @@ auto OpArrPop(const OpArgs& op_args, string_view key, WrappedJsonPath& path, int
return {false, std::move(str)}; return {false, std::move(str)};
}; };
return JsonMutateOperation<std::string>(op_args, key, path, std::move(cb), return JsonMutateOperation<std::string>(op_args, key, path, std::move(cb),
{{}, CallbackResultOptions{OnEmpty::kSendNil}}); CallbackResultOptions{OnEmpty::kSendNil});
} }
// Returns numeric vector that represents the new length of the array at each path. // Returns numeric vector that represents the new length of the array at each path.