mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
fix: fix eval in multi interpreter borrow (#1999)
* fix: fix eval in multi interpreter borrow --------- Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
e84d9a65d8
commit
60ec6a3a27
2 changed files with 45 additions and 14 deletions
|
@ -19,6 +19,7 @@ namespace dfly {
|
|||
class EngineShardSet;
|
||||
class ConnectionContext;
|
||||
class ChannelStore;
|
||||
class Interpreter;
|
||||
struct FlowInfo;
|
||||
|
||||
// Stores command id and arguments for delayed invocation.
|
||||
|
@ -81,12 +82,16 @@ struct ConnectionState {
|
|||
|
||||
ExecState state = EXEC_INACTIVE;
|
||||
std::vector<StoredCmd> body;
|
||||
// List of keys registered with WATCH
|
||||
std::vector<std::pair<DbIndex, std::string>> watched_keys;
|
||||
// Set if a watched key was changed before EXEC
|
||||
std::atomic_bool watched_dirty = false;
|
||||
// Number of times watch was called on an existing key.
|
||||
uint32_t watched_existed = 0;
|
||||
|
||||
std::vector<std::pair<DbIndex, std::string>> watched_keys; // List of keys registered by WATCH
|
||||
std::atomic_bool watched_dirty = false; // Set if a watched key was changed before EXEC
|
||||
uint32_t watched_existed = 0; // Number of times watch was called on an existing key
|
||||
|
||||
// If the transaction contains EVAL calls, preborrow an interpreter that will be used for all of
|
||||
// them. This has to be done to avoid potentially blocking when borrowing interpreters amid
|
||||
// executing the multi transaction, which can create deadlocks by blocking other transactions
|
||||
// that already borrowed all available interpreters but wait for keys to be unlocked.
|
||||
Interpreter* preborrowed_interpreter = nullptr;
|
||||
};
|
||||
|
||||
// Lua-script related data.
|
||||
|
|
|
@ -605,6 +605,32 @@ optional<Transaction::MultiMode> DeduceExecMode(ExecEvalState state,
|
|||
return multi_mode;
|
||||
}
|
||||
|
||||
// Either take the interpreter from the preborrowed multi exec transaction or borrow one.
|
||||
struct BorrowedInterpreter {
|
||||
explicit BorrowedInterpreter(ConnectionContext* cntx) {
|
||||
if (auto borrowed = cntx->conn_state.exec_info.preborrowed_interpreter; borrowed) {
|
||||
DCHECK_EQ(cntx->conn_state.exec_info.state, ConnectionState::ExecInfo::EXEC_RUNNING);
|
||||
interpreter_ = borrowed;
|
||||
} else {
|
||||
interpreter_ = ServerState::tlocal()->BorrowInterpreter();
|
||||
owned_ = true;
|
||||
}
|
||||
}
|
||||
|
||||
~BorrowedInterpreter() {
|
||||
if (owned_)
|
||||
ServerState::tlocal()->ReturnInterpreter(interpreter_);
|
||||
}
|
||||
|
||||
operator Interpreter*() {
|
||||
return interpreter_;
|
||||
}
|
||||
|
||||
private:
|
||||
Interpreter* interpreter_ = nullptr;
|
||||
bool owned_ = false;
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
Service::Service(ProactorPool* pp)
|
||||
|
@ -1396,10 +1422,7 @@ void Service::Eval(CmdArgList args, ConnectionContext* cntx) {
|
|||
return (*cntx)->SendNull();
|
||||
}
|
||||
|
||||
ServerState* ss = ServerState::tlocal();
|
||||
auto interpreter = ss->BorrowInterpreter();
|
||||
absl::Cleanup clean = [ss, interpreter]() { ss->ReturnInterpreter(interpreter); };
|
||||
|
||||
BorrowedInterpreter interpreter{cntx};
|
||||
auto res = server_family_.script_mgr()->Insert(body, interpreter);
|
||||
if (!res)
|
||||
return (*cntx)->SendError(res.error().Format(), facade::kScriptErrType);
|
||||
|
@ -1413,10 +1436,7 @@ void Service::EvalSha(CmdArgList args, ConnectionContext* cntx) {
|
|||
ToLower(&args[0]);
|
||||
string_view sha = ArgS(args, 0);
|
||||
|
||||
ServerState* ss = ServerState::tlocal();
|
||||
auto interpreter = ss->BorrowInterpreter();
|
||||
absl::Cleanup clean = [ss, interpreter]() { ss->ReturnInterpreter(interpreter); };
|
||||
|
||||
BorrowedInterpreter interpreter{cntx};
|
||||
CallSHA(args, sha, interpreter, cntx);
|
||||
}
|
||||
|
||||
|
@ -1821,6 +1841,9 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
|
|||
SinkReplyBuilder::ReplyAggregator agg(rb);
|
||||
rb->StartArray(exec_info.body.size());
|
||||
|
||||
if (state != ExecEvalState::NONE)
|
||||
exec_info.preborrowed_interpreter = ServerState::tlocal()->BorrowInterpreter();
|
||||
|
||||
if (!exec_info.body.empty()) {
|
||||
if (absl::GetFlag(FLAGS_multi_exec_squash) && state == ExecEvalState::NONE) {
|
||||
MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), cntx, this);
|
||||
|
@ -1851,6 +1874,9 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
if (exec_info.preborrowed_interpreter)
|
||||
ServerState::tlocal()->ReturnInterpreter(exec_info.preborrowed_interpreter);
|
||||
|
||||
if (scheduled) {
|
||||
VLOG(1) << "Exec unlocking " << exec_info.body.size() << " commands";
|
||||
cntx->transaction->UnlockMulti();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue