fix(server): handle running script load inside multi (#4074)

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2024-11-10 09:34:40 +02:00 committed by GitHub
parent 75c961e7ed
commit 2d49a28c15
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 123 additions and 78 deletions

View file

@ -18,6 +18,8 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "core/compact_object.h"
#include "core/interpreter.h"
#include "server/conn_context.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
@ -453,4 +455,29 @@ void ThreadLocalMutex::unlock() {
}
}
BorrowedInterpreter::BorrowedInterpreter(Transaction* tx, ConnectionState* state) {
// Ensure squashing ignores EVAL. We can't run on a stub context, because it doesn't have our
// preborrowed interpreter (which can't be shared on multiple threads).
CHECK(!state->squashing_info);
if (auto borrowed = state->exec_info.preborrowed_interpreter; borrowed) {
// Ensure a preborrowed interpreter is only set for an already running MULTI transaction.
CHECK_EQ(state->exec_info.state, ConnectionState::ExecInfo::EXEC_RUNNING);
interpreter_ = borrowed;
} else {
// A scheduled transaction occupies a place in the transaction queue and holds locks,
// preventing other transactions from progressing. Blocking below can deadlock!
CHECK(!tx->IsScheduled());
interpreter_ = ServerState::tlocal()->BorrowInterpreter();
owned_ = true;
}
}
BorrowedInterpreter::~BorrowedInterpreter() {
if (owned_)
ServerState::tlocal()->ReturnInterpreter(interpreter_);
}
} // namespace dfly

View file

@ -47,6 +47,8 @@ using RdbTypeFreqMap = absl::flat_hash_map<unsigned, size_t>;
class CommandId;
class Transaction;
class EngineShard;
class ConnectionState;
class Interpreter;
struct LockTagOptions {
bool enabled = false;
@ -353,6 +355,29 @@ template <typename Mutex> class ABSL_SCOPED_LOCKABLE SharedLock {
bool is_locked_;
};
// Ensures availability of an interpreter for EVAL-like commands and it's automatic release.
// If it's part of MULTI, the preborrowed interpreter is returned, otherwise a new is acquired.
struct BorrowedInterpreter {
BorrowedInterpreter(Transaction* tx, ConnectionState* state);
~BorrowedInterpreter();
// Give up ownership of the interpreter, it must be returned manually.
Interpreter* Release() && {
DCHECK(owned_);
owned_ = false;
return interpreter_;
}
operator Interpreter*() {
return interpreter_;
}
private:
Interpreter* interpreter_ = nullptr;
bool owned_ = false;
};
extern size_t serialization_max_chunk_size;
} // namespace dfly

View file

@ -72,6 +72,13 @@ size_t StoredCmd::NumArgs() const {
return sizes_.size();
}
std::string StoredCmd::FirstArg() const {
if (sizes_.size() == 0) {
return {};
}
return buffer_.substr(0, sizes_[0]);
}
facade::ReplyMode StoredCmd::ReplyMode() const {
return reply_mode_;
}

View file

@ -46,6 +46,8 @@ class StoredCmd {
Fill(absl::MakeSpan(*dest));
}
std::string FirstArg() const;
const CommandId* Cid() const;
facade::ReplyMode ReplyMode() const;

View file

@ -684,32 +684,33 @@ void ClusterHtmlPage(const http::QueryArgs& args, HttpContext* send,
send->Invoke(std::move(resp));
}
enum class ExecEvalState {
enum class ExecScriptUse {
NONE = 0,
ALL = 1,
SOME = 2,
SCRIPT_LOAD = 1,
SCRIPT_RUN = 2,
};
ExecEvalState DetermineEvalPresense(const std::vector<StoredCmd>& body) {
unsigned eval_cnt = 0;
ExecScriptUse DetermineScriptPresense(const std::vector<StoredCmd>& body) {
bool script_load = false;
for (const auto& scmd : body) {
if (CO::IsEvalKind(scmd.Cid()->name())) {
eval_cnt++;
return ExecScriptUse::SCRIPT_RUN;
}
if ((scmd.Cid()->name() == "SCRIPT") && (absl::AsciiStrToUpper(scmd.FirstArg()) == "LOAD")) {
script_load = true;
}
}
if (eval_cnt == 0)
return ExecEvalState::NONE;
if (script_load)
return ExecScriptUse::SCRIPT_LOAD;
if (eval_cnt == body.size())
return ExecEvalState::ALL;
return ExecEvalState::SOME;
return ExecScriptUse::NONE;
}
// Returns the multi mode for that transaction. Returns NOT_DETERMINED if no scheduling
// is required.
Transaction::MultiMode DeduceExecMode(ExecEvalState state,
Transaction::MultiMode DeduceExecMode(ExecScriptUse state,
const ConnectionState::ExecInfo& exec_info,
const ScriptMgr& script_mgr) {
// Check if script most LIKELY has global eval transactions
@ -717,7 +718,7 @@ Transaction::MultiMode DeduceExecMode(ExecEvalState state,
Transaction::MultiMode multi_mode =
static_cast<Transaction::MultiMode>(absl::GetFlag(FLAGS_multi_exec_mode));
if (state != ExecEvalState::NONE) {
if (state == ExecScriptUse::SCRIPT_RUN) {
contains_global = script_mgr.AreGlobalByDefault();
}
@ -765,50 +766,6 @@ string CreateExecDescriptor(const std::vector<StoredCmd>& stored_cmds, unsigned
return result;
}
// Ensures availability of an interpreter for EVAL-like commands and it's automatic release.
// If it's part of MULTI, the preborrowed interpreter is returned, otherwise a new is acquired.
struct BorrowedInterpreter {
BorrowedInterpreter(Transaction* tx, ConnectionContext* cntx) {
// Ensure squashing ignores EVAL. We can't run on a stub context, because it doesn't have our
// preborrowed interpreter (which can't be shared on multiple threads).
CHECK(!cntx->conn_state.squashing_info);
if (auto borrowed = cntx->conn_state.exec_info.preborrowed_interpreter; borrowed) {
// Ensure a preborrowed interpreter is only set for an already running MULTI transaction.
CHECK_EQ(cntx->conn_state.exec_info.state, ConnectionState::ExecInfo::EXEC_RUNNING);
interpreter_ = borrowed;
} else {
// A scheduled transaction occupies a place in the transaction queue and holds locks,
// preventing other transactions from progressing. Blocking below can deadlock!
CHECK(!tx->IsScheduled());
interpreter_ = ServerState::tlocal()->BorrowInterpreter();
owned_ = true;
}
}
~BorrowedInterpreter() {
if (owned_)
ServerState::tlocal()->ReturnInterpreter(interpreter_);
}
// Give up ownership of the interpreter, it must be returned manually.
Interpreter* Release() && {
DCHECK(owned_);
owned_ = false;
return interpreter_;
}
operator Interpreter*() {
return interpreter_;
}
private:
Interpreter* interpreter_ = nullptr;
bool owned_ = false;
};
string ConnectionLogContext(const facade::Connection* conn) {
if (conn == nullptr) {
return "(null-conn)";
@ -1873,7 +1830,7 @@ void Service::Eval(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
return rb->SendNull();
}
BorrowedInterpreter interpreter{tx, cntx};
BorrowedInterpreter interpreter{tx, &cntx->conn_state};
auto res = server_family_.script_mgr()->Insert(body, interpreter);
if (!res)
return builder->SendError(res.error().Format(), facade::kScriptErrType);
@ -1887,7 +1844,7 @@ void Service::EvalSha(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde
ConnectionContext* cntx) {
string sha = absl::AsciiStrToLower(ArgS(args, 0));
BorrowedInterpreter interpreter{cntx->transaction, cntx};
BorrowedInterpreter interpreter{cntx->transaction, &cntx->conn_state};
CallSHA(args, sha, interpreter, builder, cntx);
}
@ -2254,12 +2211,13 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
cntx->last_command_debug.exec_body_len = exec_info.body.size();
// The transaction can contain scripts, determine their presence ahead to customize logic below.
ExecEvalState state = DetermineEvalPresense(exec_info.body);
// The transaction can contain script load script execution, determine their presence ahead to
// customize logic below.
ExecScriptUse state = DetermineScriptPresense(exec_info.body);
// We borrow a single interpreter for all the EVALs inside. Returned by MultiCleanup
if (state != ExecEvalState::NONE) {
exec_info.preborrowed_interpreter = BorrowedInterpreter(tx, cntx).Release();
// We borrow a single interpreter for all the EVALs/Script load inside. Returned by MultiCleanup
if (state != ExecScriptUse::NONE) {
exec_info.preborrowed_interpreter = BorrowedInterpreter(tx, &cntx->conn_state).Release();
}
// Determine according multi mode, not only only flag, but based on presence of global commands
@ -2293,7 +2251,7 @@ void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ServerState::tlocal()->exec_freq_count[descr]++;
}
if (absl::GetFlag(FLAGS_multi_exec_squash) && state == ExecEvalState::NONE &&
if (absl::GetFlag(FLAGS_multi_exec_squash) && state != ExecScriptUse::SCRIPT_RUN &&
!cntx->conn_state.tracking_info_.IsTrackingOn()) {
MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), rb, cntx, this);
} else {

View file

@ -1147,6 +1147,29 @@ TEST_F(MultiEvalTest, MultiAndEval) {
Run({"eval", "return 'OK';", "0"});
auto resp = Run({"exec"});
EXPECT_EQ(resp, "OK");
// We had a bug running script load inside multi
Run({"multi"});
Run({"script", "load", "return '5'"});
Run({"exec"});
Run({"multi"});
Run({"script", "load", "return '5'"});
Run({"get", "x"});
Run({"exec"});
Run({"multi"});
Run({"script", "load", "return '5'"});
Run({"mset", "x1", "y1", "x2", "y2"});
Run({"exec"});
Run({"multi"});
Run({"script", "load", "return '5'"});
Run({"eval", "return redis.call('set', 'x', 'y')", "1", "x"});
Run({"get", "x"});
Run({"exec"});
Run({"get", "x"});
}
TEST_F(MultiTest, MultiTypes) {

View file

@ -67,7 +67,8 @@ ScriptMgr::ScriptKey::ScriptKey(string_view sha) : array{} {
memcpy(data(), sha.data(), size());
}
void ScriptMgr::Run(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void ScriptMgr::Run(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
string subcmd = absl::AsciiStrToUpper(ArgS(args, 0));
if (subcmd == "HELP") {
@ -110,7 +111,7 @@ void ScriptMgr::Run(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder)
return LatencyCmd(tx, builder);
if (subcmd == "LOAD" && args.size() == 2)
return LoadCmd(args, tx, builder);
return LoadCmd(args, tx, builder, cntx);
if (subcmd == "FLAGS" && args.size() > 2)
return ConfigCmd(args, tx, builder);
@ -144,7 +145,8 @@ void ScriptMgr::FlushCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
return builder->SendOk();
}
void ScriptMgr::LoadCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
void ScriptMgr::LoadCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
string_view body = ArgS(args, 1);
auto rb = static_cast<RedisReplyBuilder*>(builder);
if (body.empty()) {
@ -153,9 +155,7 @@ void ScriptMgr::LoadCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
return rb->SendBulkString(sha);
}
ServerState* ss = ServerState::tlocal();
auto interpreter = ss->BorrowInterpreter();
absl::Cleanup clean = [ss, interpreter]() { ss->ReturnInterpreter(interpreter); };
BorrowedInterpreter interpreter{tx, &cntx->conn_state};
auto res = Insert(body, interpreter);
if (!res)

View file

@ -48,7 +48,7 @@ class ScriptMgr {
ScriptMgr();
void Run(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void Run(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
// Insert script and return sha. Get possible error from compilation or parsing script flags.
io::Result<std::string, GenericError> Insert(std::string_view body, Interpreter* interpreter);
@ -69,7 +69,8 @@ class ScriptMgr {
private:
void ExistsCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) const;
void FlushCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void LoadCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void LoadCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void ConfigCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void ListCmd(Transaction* tx, SinkReplyBuilder* builder) const;
void LatencyCmd(Transaction* tx, SinkReplyBuilder* builder) const;

View file

@ -3008,7 +3008,7 @@ void ServerFamily::Role(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
void ServerFamily::Script(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
script_mgr_->Run(std::move(args), tx, builder);
script_mgr_->Run(std::move(args), tx, builder, cntx);
}
void ServerFamily::LastSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,

View file

@ -399,10 +399,12 @@ OpStatus Transaction::InitByArgs(Namespace* ns, DbIndex index, CmdArgList args)
}
if ((cid_->opt_mask() & CO::NO_KEY_TRANSACTIONAL) > 0) {
if ((cid_->opt_mask() & CO::NO_KEY_TX_SPAN_ALL) > 0)
if (((cid_->opt_mask() & CO::NO_KEY_TX_SPAN_ALL) > 0)) {
EnableAllShards();
else
} else {
EnableShard(0);
}
return OpStatus::OK;
}
@ -976,7 +978,7 @@ string Transaction::DEBUG_PrintFailState(ShardId sid) const {
void Transaction::EnableShard(ShardId sid) {
unique_shard_cnt_ = 1;
unique_shard_id_ = sid;
shard_data_.resize(1);
shard_data_.resize(IsActiveMulti() ? shard_set->size() : 1);
shard_data_.front().local_mask |= ACTIVE;
}