mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
feat: ignore MULTI/EXEC if the transaction consists of EVAL commands (#1157)
feat: ignore MULTI/EXEC if the transaction consists of EVAL commands. Together with `default_lua_config` solves #781. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
688f8f51a3
commit
b09a36d553
7 changed files with 101 additions and 32 deletions
|
@ -138,9 +138,10 @@ struct Connection::DispatchOperations {
|
|||
void Connection::PipelineMessage::SetArgs(const RespVec& args) {
|
||||
auto* next = storage.data();
|
||||
for (size_t i = 0; i < args.size(); ++i) {
|
||||
auto buf = args[i].GetBuf();
|
||||
RespExpr::Buffer buf = args[i].GetBuf();
|
||||
size_t s = buf.size();
|
||||
memcpy(next, buf.data(), s);
|
||||
if (s)
|
||||
memcpy(next, buf.data(), s);
|
||||
this->args[i] = MutableSlice(next, s);
|
||||
next += s;
|
||||
}
|
||||
|
|
|
@ -39,6 +39,13 @@ enum CommandOpt : uint32_t {
|
|||
|
||||
const char* OptName(CommandOpt fl);
|
||||
|
||||
constexpr inline bool IsEvalKind(std::string_view name) {
|
||||
return name.compare(0, 4, "EVAL") == 0;
|
||||
}
|
||||
|
||||
static_assert(IsEvalKind("EVAL") && IsEvalKind("EVALSHA"));
|
||||
static_assert(!IsEvalKind(""));
|
||||
|
||||
}; // namespace CO
|
||||
|
||||
class CommandId {
|
||||
|
|
|
@ -41,7 +41,8 @@ StoredCmd::StoredCmd(string&& buffer, const CommandId* cid, CmdArgList args, fac
|
|||
}
|
||||
|
||||
void StoredCmd::Fill(CmdArgList args) {
|
||||
CHECK_GE(args.size(), sizes_.size());
|
||||
DCHECK_GE(args.size(), sizes_.size());
|
||||
|
||||
unsigned offset = 0;
|
||||
for (unsigned i = 0; i < sizes_.size(); i++) {
|
||||
args[i] = MutableSlice{buffer_.data() + offset, sizes_[i]};
|
||||
|
|
|
@ -37,6 +37,11 @@ class StoredCmd {
|
|||
// Between filling and invocation, cmd should NOT be moved.
|
||||
void Fill(CmdArgList args);
|
||||
|
||||
void Fill(CmdArgVec* dest) {
|
||||
dest->resize(sizes_.size());
|
||||
Fill(absl::MakeSpan(*dest));
|
||||
}
|
||||
|
||||
const CommandId* Cid() const;
|
||||
|
||||
facade::ReplyMode ReplyMode() const;
|
||||
|
|
|
@ -486,6 +486,29 @@ void TxTable(const http::QueryArgs& args, HttpContext* send) {
|
|||
send->Invoke(std::move(resp));
|
||||
}
|
||||
|
||||
enum class ExecEvalState {
|
||||
NONE = 0,
|
||||
ALL = 1,
|
||||
SOME = 2,
|
||||
};
|
||||
|
||||
ExecEvalState DetermineEvalPresense(const std::vector<StoredCmd>& body) {
|
||||
unsigned eval_cnt = 0;
|
||||
for (const auto& scmd : body) {
|
||||
if (CO::IsEvalKind(scmd.Cid()->name())) {
|
||||
eval_cnt++;
|
||||
}
|
||||
}
|
||||
|
||||
if (eval_cnt == 0)
|
||||
return ExecEvalState::NONE;
|
||||
|
||||
if (eval_cnt == body.size())
|
||||
return ExecEvalState::ALL;
|
||||
|
||||
return ExecEvalState::SOME;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
Service::Service(ProactorPool* pp) : pp_(*pp), server_family_(this) {
|
||||
|
@ -582,7 +605,9 @@ OpStatus CheckKeysDeclared(const ConnectionState::ScriptInfo& eval_info, const C
|
|||
|
||||
const auto& key_index = *key_index_res;
|
||||
for (unsigned i = key_index.start; i < key_index.end; ++i) {
|
||||
if (!eval_info.keys.contains(ArgS(args, i))) {
|
||||
string_view key = ArgS(args, i);
|
||||
if (!eval_info.keys.contains(key)) {
|
||||
VLOG(1) << "Key " << key << " is not declared for command " << cid->name();
|
||||
return OpStatus::KEY_NOTFOUND;
|
||||
}
|
||||
}
|
||||
|
@ -727,15 +752,6 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
|
|||
etl.connection_stats.cmd_count_map[cid->name()]++;
|
||||
|
||||
if (dfly_cntx->conn_state.exec_info.IsActive() && !is_trans_cmd) {
|
||||
auto cmd_name = ArgS(args, 0);
|
||||
if (cmd_name == "EVAL" || cmd_name == "EVALSHA") {
|
||||
auto error =
|
||||
absl::StrCat("'", cmd_name,
|
||||
"' Dragonfly does not allow execution of a server-side Lua script inside "
|
||||
"transaction block");
|
||||
SetMultiExecErrorFlag(dfly_cntx);
|
||||
return (*cntx)->SendError(error);
|
||||
}
|
||||
// TODO: protect against aggregating huge transactions.
|
||||
StoredCmd stored_cmd{cid, args.subspan(1)};
|
||||
dfly_cntx->conn_state.exec_info.body.push_back(std::move(stored_cmd));
|
||||
|
@ -1325,8 +1341,7 @@ template <typename F> void IterateAllKeys(ConnectionState::ExecInfo* exec_info,
|
|||
if (!scmd.Cid()->IsTransactional())
|
||||
continue;
|
||||
|
||||
arg_vec.resize(scmd.NumArgs());
|
||||
scmd.Fill(absl::MakeSpan(arg_vec));
|
||||
scmd.Fill(&arg_vec);
|
||||
|
||||
auto key_res = DetermineKeys(scmd.Cid(), absl::MakeSpan(arg_vec));
|
||||
if (!key_res.ok())
|
||||
|
@ -1412,7 +1427,45 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
|
|||
return rb->SendNull();
|
||||
}
|
||||
|
||||
CmdArgVec tmp_keys;
|
||||
ExecEvalState state = DetermineEvalPresense(exec_info.body);
|
||||
if (state == ExecEvalState::SOME) {
|
||||
auto error =
|
||||
"Dragonfly does not allow execution of a server-side Lua script inside "
|
||||
"MULTI/EXEC block";
|
||||
|
||||
return rb->SendError(error);
|
||||
}
|
||||
|
||||
CmdArgVec arg_vec, tmp_keys;
|
||||
|
||||
// We ignore transaction mode in case it's filled only with EVAL-like commands.
|
||||
// This is done to support OptimalBits/bull js framework
|
||||
// that for some reason uses MULTI to send multiple jobs via EVAL(SHA) commands,
|
||||
// instead of using pipeline mode.
|
||||
// TODO: to check with BullMQ developers if this is a bug or a feature.
|
||||
if (state == ExecEvalState::ALL) {
|
||||
string cmd_name;
|
||||
auto body = move(exec_info.body);
|
||||
exec_info.Clear();
|
||||
Transaction* trans = cntx->transaction;
|
||||
cntx->transaction = nullptr;
|
||||
|
||||
rb->StartArray(body.size());
|
||||
for (auto& scmd : body) {
|
||||
arg_vec.resize(scmd.NumArgs() + 1);
|
||||
// We need to copy command name to the first argument.
|
||||
cmd_name = scmd.Cid()->name();
|
||||
arg_vec.front() = MutableSlice{cmd_name.data(), cmd_name.size()};
|
||||
auto args = absl::MakeSpan(arg_vec);
|
||||
scmd.Fill(args.subspan(1));
|
||||
|
||||
DispatchCommand(args, cntx);
|
||||
}
|
||||
cntx->transaction = trans;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
bool scheduled = StartMultiExec(cntx->db_index(), cntx->transaction, &exec_info, &tmp_keys);
|
||||
|
||||
// EXEC should not run if any of the watched keys expired.
|
||||
|
@ -1428,15 +1481,16 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (absl::GetFlag(FLAGS_multi_exec_squash)) {
|
||||
MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), cntx);
|
||||
} else {
|
||||
CmdArgVec arg_vec{};
|
||||
|
||||
for (auto& scmd : exec_info.body) {
|
||||
VLOG(2) << "TX CMD " << scmd.Cid()->name() << " " << scmd.NumArgs();
|
||||
|
||||
cntx->transaction->MultiSwitchCmd(scmd.Cid());
|
||||
cntx->cid = scmd.Cid();
|
||||
|
||||
arg_vec.resize(scmd.NumArgs());
|
||||
scmd.Fill(&arg_vec);
|
||||
|
||||
CmdArgList args = absl::MakeSpan(arg_vec);
|
||||
scmd.Fill(args);
|
||||
|
||||
if (scmd.Cid()->IsTransactional()) {
|
||||
OpStatus st = cntx->transaction->InitByArgs(cntx->conn_state.db_index, args);
|
||||
|
|
|
@ -47,9 +47,8 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm
|
|||
(cmd->Cid()->opt_mask() & CO::GLOBAL_TRANS))
|
||||
return SquashResult::NOT_SQUASHED;
|
||||
|
||||
tmp_keylist_.resize(cmd->NumArgs());
|
||||
cmd->Fill(&tmp_keylist_);
|
||||
auto args = absl::MakeSpan(tmp_keylist_);
|
||||
cmd->Fill(args);
|
||||
|
||||
auto keys = DetermineKeys(cmd->Cid(), args);
|
||||
if (!keys.ok())
|
||||
|
@ -94,9 +93,8 @@ void MultiCommandSquasher::ExecuteStandalone(StoredCmd* cmd) {
|
|||
tx->MultiSwitchCmd(cmd->Cid());
|
||||
cntx_->cid = cmd->Cid();
|
||||
|
||||
tmp_keylist_.resize(cmd->NumArgs());
|
||||
cmd->Fill(&tmp_keylist_);
|
||||
auto args = absl::MakeSpan(tmp_keylist_);
|
||||
cmd->Fill(args);
|
||||
|
||||
if (cmd->Cid()->IsTransactional())
|
||||
tx->InitByArgs(cntx_->conn_state.db_index, args);
|
||||
|
|
|
@ -77,9 +77,11 @@ TEST_F(MultiTest, MultiAndEval) {
|
|||
|
||||
resp = Run({"get", kKey4});
|
||||
ASSERT_EQ(resp, "QUEUED");
|
||||
EXPECT_THAT(Run({"eval", "return redis.call('exists', KEYS[2])", "2", "a", "b"}),
|
||||
ErrArg("'EVAL' Dragonfly does not allow execution of a server-side Lua script inside "
|
||||
"transaction block"));
|
||||
resp = Run({"eval", "return redis.call('exists', KEYS[2])", "2", "a", "b"});
|
||||
ASSERT_EQ(resp, "QUEUED");
|
||||
|
||||
resp = Run({"exec"});
|
||||
ASSERT_THAT(resp, ErrArg("ERR Dragonfly does not allow execution of a server-side Lua"));
|
||||
}
|
||||
|
||||
TEST_F(MultiTest, MultiAndFlush) {
|
||||
|
@ -240,13 +242,14 @@ TEST_F(MultiTest, MultiConsistent) {
|
|||
ASSERT_FALSE(service_->IsShardSetLocked());
|
||||
}
|
||||
|
||||
TEST_F(MultiTest, MultiWeirdCommands) {
|
||||
// FIXME: issue https://github.com/dragonflydb/dragonfly/issues/457
|
||||
// once we would have fix for supporting EVAL from within transaction
|
||||
TEST_F(MultiTest, MultiAllEval) {
|
||||
Run({"multi"});
|
||||
EXPECT_THAT(Run({"eval", "return 42", "0"}),
|
||||
ErrArg("'EVAL' Dragonfly does not allow execution of a server-side Lua script inside "
|
||||
"transaction block"));
|
||||
EXPECT_EQ(Run({"eval", "return 42", "0"}), "QUEUED");
|
||||
EXPECT_EQ(Run({"eval", "return 77", "0"}), "QUEUED");
|
||||
auto resp = Run({"exec"});
|
||||
ASSERT_THAT(resp, ArrLen(2));
|
||||
EXPECT_THAT(resp.GetVec()[0], IntArg(42));
|
||||
EXPECT_THAT(resp.GetVec()[1], IntArg(77));
|
||||
}
|
||||
|
||||
TEST_F(MultiTest, MultiRename) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue