feat: global eval in exec (#1443)

Enables execution of global lua scripts inside multi/exec transactions if the defualt script config enables global execution for scripts. This change is only a fix and does not provide any safeguards against other execution scenarios (namely enabling globality with script flags). In the future, the proper execution mode should be determined more carefully by inspecting the scripts to be executed

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Co-authored-by: Kostas Kyrimis  <kostaskyrim@gmail.com>
This commit is contained in:
Vladislav 2023-07-01 22:12:05 +03:00 committed by GitHub
parent 542b9783b7
commit cfca751848
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 64 additions and 26 deletions

View file

@ -58,15 +58,18 @@ class StoredCmd {
struct ConnectionState {
// MULTI-EXEC transaction related data.
struct ExecInfo {
enum ExecState { EXEC_INACTIVE, EXEC_COLLECT, EXEC_ERROR };
enum ExecState { EXEC_INACTIVE, EXEC_COLLECT, EXEC_RUNNING, EXEC_ERROR };
ExecInfo() = default;
// ExecInfo is immovable due to being referenced from DbSlice.
ExecInfo(ExecInfo&&) = delete;
// Return true if ExecInfo is active (after MULTI)
bool IsActive() {
return state != EXEC_INACTIVE;
bool IsCollecting() const {
return state == EXEC_COLLECT;
}
bool IsRunning() const {
return state == EXEC_RUNNING;
}
// Resets to blank state after EXEC or DISCARD

View file

@ -679,7 +679,7 @@ bool Service::VerifyCommand(const CommandId* cid, CmdArgList args, ConnectionCon
string_view cmd_str = ArgS(args, 0);
absl::Cleanup multi_error([exec_info = &dfly_cntx->conn_state.exec_info] {
if (exec_info->IsActive()) {
if (exec_info->IsCollecting()) {
exec_info->state = ConnectionState::ExecInfo::EXEC_ERROR;
}
});
@ -724,7 +724,7 @@ bool Service::VerifyCommand(const CommandId* cid, CmdArgList args, ConnectionCon
}
bool is_write_cmd = cid->opt_mask() & CO::WRITE;
bool under_multi = dfly_cntx->conn_state.exec_info.IsActive() && !is_trans_cmd;
bool under_multi = dfly_cntx->conn_state.exec_info.IsCollecting() && !is_trans_cmd;
if (!etl.is_master && is_write_cmd && !dfly_cntx->is_replicating) {
(*dfly_cntx)->SendError("-READONLY You can't write against a read only replica.");
@ -790,6 +790,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
ConnectionContext* dfly_cntx = static_cast<ConnectionContext*>(cntx);
bool under_script = bool(dfly_cntx->conn_state.script_info);
bool under_multi = dfly_cntx->conn_state.exec_info.IsRunning();
if (VLOG_IS_ON(2) &&
cntx->owner()) { // owner may not exists in case of this being called from replica context
@ -806,10 +807,11 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
if (!VerifyCommand(cid, args, dfly_cntx))
return;
bool is_trans_cmd = CO::IsTransKind(cid->name());
etl.connection_stats.cmd_count_map[cid->name()]++;
auto args_no_cmd = args.subspan(1);
if (dfly_cntx->conn_state.exec_info.IsActive() && !is_trans_cmd) {
bool is_trans_cmd = CO::IsTransKind(cid->name());
if (dfly_cntx->conn_state.exec_info.IsCollecting() && !is_trans_cmd) {
// TODO: protect against aggregating huge transactions.
StoredCmd stored_cmd{cid, args_no_cmd};
dfly_cntx->conn_state.exec_info.body.push_back(std::move(stored_cmd));
@ -823,7 +825,9 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
// Create command transaction
intrusive_ptr<Transaction> dist_trans;
if (under_script) {
bool dispatching_in_multi = under_script || under_multi;
if (dispatching_in_multi) {
DCHECK(dfly_cntx->transaction);
if (cid->IsTransactional()) {
dfly_cntx->transaction->MultiSwitchCmd(cid);
@ -857,7 +861,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
// Collect stats for all regular transactions and all multi transactions from scripts, except EVAL
// itself. EXEC does not use DispatchCommand for dispatching.
bool collect_stats =
dfly_cntx->transaction && (!dfly_cntx->transaction->IsMulti() || under_script);
dfly_cntx->transaction && (!dfly_cntx->transaction->IsMulti() || dispatching_in_multi);
if (!InvokeCmd(args.subspan(1), cid, dfly_cntx, collect_stats)) {
dfly_cntx->reply_builder()->SendError("Internal Error");
dfly_cntx->reply_builder()->CloseConnection();
@ -866,7 +870,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
end_usec = ProactorBase::GetMonotonicTimeNs();
request_latency_usec.IncBy(cid->name(), (end_usec - start_usec) / 1000);
if (!under_script) {
if (!dispatching_in_multi) {
dfly_cntx->transaction = nullptr;
}
}
@ -1060,7 +1064,7 @@ void Service::Quit(CmdArgList args, ConnectionContext* cntx) {
}
void Service::Multi(CmdArgList args, ConnectionContext* cntx) {
if (cntx->conn_state.exec_info.IsActive()) {
if (cntx->conn_state.exec_info.IsCollecting()) {
return (*cntx)->SendError("MULTI calls can not be nested");
}
cntx->conn_state.exec_info.state = ConnectionState::ExecInfo::EXEC_COLLECT;
@ -1263,6 +1267,12 @@ bool StartMultiEval(DbIndex dbid, CmdArgList keys, ScriptMgr::ScriptParams param
Transaction* trans) {
Transaction::MultiMode multi_mode = DetermineMultiMode(params);
// Check if eval is already part of a running multi transaction
if (trans->GetMultiMode() != Transaction::NOT_DETERMINED) {
DCHECK_LE(trans->GetMultiMode(), multi_mode); // Check the transaction covers our requirements
return false;
}
if (keys.empty() && multi_mode == Transaction::LOCK_AHEAD)
return false;
@ -1365,7 +1375,7 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter,
void Service::Discard(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = (*cntx).operator->();
if (!cntx->conn_state.exec_info.IsActive()) {
if (!cntx->conn_state.exec_info.IsCollecting()) {
return rb->SendError("DISCARD without MULTI");
}
@ -1451,9 +1461,9 @@ CmdArgVec CollectAllKeys(ConnectionState::ExecInfo* exec_info) {
// Return true if transaction was scheduled, false if scheduling was not required.
bool StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo* exec_info,
CmdArgVec* tmp_keys) {
bool global = false;
bool transactional = false;
CmdArgVec* tmp_keys, bool global_scripts) {
bool global = global_scripts;
bool transactional = global_scripts;
for (const auto& scmd : exec_info->body) {
transactional |= scmd.Cid()->IsTransactional();
global |= scmd.Cid()->opt_mask() & CO::GLOBAL_TRANS;
@ -1491,7 +1501,7 @@ bool StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo*
void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = (*cntx).operator->();
if (!cntx->conn_state.exec_info.IsActive()) {
if (!cntx->conn_state.exec_info.IsCollecting()) {
return rb->SendError("EXEC without MULTI");
}
@ -1511,13 +1521,6 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
}
ExecEvalState state = DetermineEvalPresense(exec_info.body);
if (state == ExecEvalState::SOME) {
auto error =
"Dragonfly does not allow execution of a server-side Lua script inside "
"MULTI/EXEC block";
return rb->SendError(error);
}
CmdArgVec arg_vec, tmp_keys;
@ -1550,7 +1553,11 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
return;
}
bool scheduled = StartMultiExec(cntx->db_index(), cntx->transaction, &exec_info, &tmp_keys);
// Check if script most LIKELY has global eval transactions
bool global_script = (state == ExecEvalState::SOME) && script_mgr()->AreGlobalByDefault();
bool scheduled =
StartMultiExec(cntx->db_index(), cntx->transaction, &exec_info, &tmp_keys, global_script);
// EXEC should not run if any of the watched keys expired.
if (!exec_info.watched_keys.empty() && !CheckWatchedKeyExpiry(cntx, registry_)) {
@ -1558,12 +1565,14 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
return rb->SendNull();
}
exec_info.state = ConnectionState::ExecInfo::EXEC_RUNNING;
VLOG(1) << "StartExec " << exec_info.body.size();
SinkReplyBuilder::ReplyAggregator agg(rb);
rb->StartArray(exec_info.body.size());
if (!exec_info.body.empty()) {
if (absl::GetFlag(FLAGS_multi_exec_squash)) {
if (absl::GetFlag(FLAGS_multi_exec_squash) && state == ExecEvalState::NONE) {
MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), cntx);
} else {
for (auto& scmd : exec_info.body) {

View file

@ -59,6 +59,8 @@ TEST_F(MultiTest, VerifyConstants) {
}
TEST_F(MultiTest, MultiAndEval) {
GTEST_SKIP() << "Eval is allowed in multi experimentally";
ShardId sid1 = Shard(kKey1, num_threads_ - 1);
ShardId sid2 = Shard(kKey2, num_threads_ - 1);
ShardId sid3 = Shard(kKey3, num_threads_ - 1);

View file

@ -282,6 +282,10 @@ void ScriptMgr::UpdateScriptCaches(ScriptKey sha, ScriptParams params) const {
});
}
bool ScriptMgr::AreGlobalByDefault() const {
return default_params_.undeclared_keys && default_params_.atomic;
}
GenericError ScriptMgr::ScriptParams::ApplyFlags(string_view config, ScriptParams* params) {
auto parts = absl::StrSplit(config, absl::ByAnyChar(",; "), absl::SkipEmpty());
for (auto flag : parts) {

View file

@ -54,6 +54,9 @@ class ScriptMgr {
// Returns a list of all scripts in the database with their sha and body.
std::vector<std::pair<std::string, ScriptData>> GetAll() const;
// Returns if scripts run as global transactions by default
bool AreGlobalByDefault() const;
private:
void ExistsCmd(CmdArgList args, ConnectionContext* cntx) const;
void LoadCmd(CmdArgList args, ConnectionContext* cntx);

View file

@ -222,3 +222,20 @@ async def test_eval_error_propagation(async_client):
except aioredis.RedisError as e:
if not does_abort:
assert False, "Error should have been ignored: " + cmd
@dfly_args({"proactor_threads": 1, "default_lua_flags": "allow-undeclared-keys"})
async def test_global_eval_in_multi(async_client: aioredis.Redis):
GLOBAL_SCRIPT = """
return redis.call('GET', 'any-key');
"""
await async_client.set('any-key', 'works')
pipe = async_client.pipeline(transaction=True)
pipe.set('another-key', 'ok')
pipe.eval(GLOBAL_SCRIPT, 0)
res = await pipe.execute()
print(res)
assert res[1] == 'works'