mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
Support script configuration (#889)
Store script parameters for each script that allow configuring it's transactions multi mode. They can be configured either for a specific scripts with `SCRIPT CONIFG <sha> [params...]` or changed globally as defaults with `default_lua_config`. The current supported options are `allow-undeclared-keys` and `disable-atomicity`. Based on those flags, we determine the correct multi mode. `disable-atomicity` allow running in non-atomic mode, whereas being atomic and enabling `allow-undeclared-keys` requires the global mode.
This commit is contained in:
parent
eb5fd2867f
commit
edbd43a3b3
9 changed files with 348 additions and 185 deletions
|
@ -427,7 +427,7 @@ void DflyCmd::FullSyncFb(FlowInfo* flow, Context* cntx) {
|
|||
RdbSaver* saver = flow->saver.get();
|
||||
|
||||
if (saver->Mode() == SaveMode::SUMMARY) {
|
||||
auto scripts = sf_->script_mgr()->GetLuaScripts();
|
||||
auto scripts = sf_->script_mgr()->GetAll();
|
||||
StringVec script_bodies;
|
||||
for (auto& script : scripts) {
|
||||
script_bodies.push_back(move(script.second));
|
||||
|
|
|
@ -46,7 +46,7 @@ ABSL_FLAG(uint32_t, memcache_port, 0, "Memcached port");
|
|||
ABSL_FLAG(int, multi_exec_mode, 1,
|
||||
"Set multi exec atomicity mode: 1 for global, 2 for locking ahead, 3 for locking "
|
||||
"incrementally, 4 for non atomic");
|
||||
ABSL_FLAG(int, multi_eval_mode, 2,
|
||||
ABSL_FLAG(int, multi_eval_mode, 1,
|
||||
"Set EVAL atomicity mode: 1 for global, 2 for locking ahead, 3 for locking "
|
||||
"incrementally, 4 for non atomic");
|
||||
|
||||
|
@ -987,17 +987,17 @@ void Service::Eval(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
ServerState* ss = ServerState::tlocal();
|
||||
auto script = ss->BorrowInterpreter();
|
||||
absl::Cleanup clean = [ss, script]() { ss->ReturnInterpreter(script); };
|
||||
auto interpreter = ss->BorrowInterpreter();
|
||||
absl::Cleanup clean = [ss, interpreter]() { ss->ReturnInterpreter(interpreter); };
|
||||
|
||||
string result;
|
||||
Interpreter::AddResult add_result = script->AddFunction(body, &result);
|
||||
Interpreter::AddResult add_result = interpreter->AddFunction(body, &result);
|
||||
if (add_result == Interpreter::COMPILE_ERR) {
|
||||
return (*cntx)->SendError(result, facade::kScriptErrType);
|
||||
}
|
||||
|
||||
if (add_result == Interpreter::ADD_OK) {
|
||||
server_family_.script_mgr()->InsertFunction(result, body);
|
||||
server_family_.script_mgr()->Insert(result, body);
|
||||
}
|
||||
|
||||
EvalArgs eval_args;
|
||||
|
@ -1006,7 +1006,7 @@ void Service::Eval(CmdArgList args, ConnectionContext* cntx) {
|
|||
eval_args.args = args.subspan(3 + num_keys);
|
||||
|
||||
uint64_t start = absl::GetCurrentTimeNanos();
|
||||
EvalInternal(eval_args, script, cntx);
|
||||
EvalInternal(eval_args, interpreter, cntx);
|
||||
|
||||
uint64_t end = absl::GetCurrentTimeNanos();
|
||||
ss->RecordCallLatency(result, (end - start) / 1000);
|
||||
|
@ -1022,21 +1022,8 @@ void Service::EvalSha(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
string_view sha = ArgS(args, 1);
|
||||
ServerState* ss = ServerState::tlocal();
|
||||
auto script = ss->BorrowInterpreter();
|
||||
absl::Cleanup clean = [ss, script]() { ss->ReturnInterpreter(script); };
|
||||
|
||||
bool exists = script->Exists(sha);
|
||||
|
||||
if (!exists) {
|
||||
const char* body = (sha.size() == 40) ? server_family_.script_mgr()->Find(sha) : nullptr;
|
||||
if (!body) {
|
||||
return (*cntx)->SendError(facade::kScriptNotFound);
|
||||
}
|
||||
|
||||
string res;
|
||||
CHECK_EQ(Interpreter::ADD_OK, script->AddFunction(body, &res));
|
||||
CHECK_EQ(res, sha);
|
||||
}
|
||||
auto interpreter = ss->BorrowInterpreter();
|
||||
absl::Cleanup clean = [ss, interpreter]() { ss->ReturnInterpreter(interpreter); };
|
||||
|
||||
EvalArgs ev_args;
|
||||
ev_args.sha = sha;
|
||||
|
@ -1044,7 +1031,7 @@ void Service::EvalSha(CmdArgList args, ConnectionContext* cntx) {
|
|||
ev_args.args = args.subspan(3 + num_keys);
|
||||
|
||||
uint64_t start = absl::GetCurrentTimeNanos();
|
||||
EvalInternal(ev_args, script, cntx);
|
||||
EvalInternal(ev_args, interpreter, cntx);
|
||||
|
||||
uint64_t end = absl::GetCurrentTimeNanos();
|
||||
ss->RecordCallLatency(sha, (end - start) / 1000);
|
||||
|
@ -1057,6 +1044,67 @@ vector<bool> DetermineKeyShards(CmdArgList keys) {
|
|||
return out;
|
||||
}
|
||||
|
||||
optional<ScriptMgr::ScriptParams> LoadScipt(string_view sha, ScriptMgr* script_mgr,
|
||||
Interpreter* interpreter) {
|
||||
auto ss = ServerState::tlocal();
|
||||
|
||||
if (!interpreter->Exists(sha)) {
|
||||
auto script_data = script_mgr->Find(sha);
|
||||
if (!script_data)
|
||||
return std::nullopt;
|
||||
|
||||
string res;
|
||||
CHECK_EQ(Interpreter::ADD_OK, interpreter->AddFunction(script_data->body, &res));
|
||||
CHECK_EQ(res, sha);
|
||||
|
||||
return script_data;
|
||||
}
|
||||
|
||||
auto params = ss->GetScriptParams(sha);
|
||||
CHECK(params); // We update all caches from script manager
|
||||
return params;
|
||||
}
|
||||
|
||||
// Determine multi mode based on script params.
|
||||
Transaction::MultiMode DetermineMultiMode(ScriptMgr::ScriptParams params) {
|
||||
if (params.atomic && params.undeclared_keys)
|
||||
return Transaction::GLOBAL;
|
||||
else if (params.atomic)
|
||||
return Transaction::LOCK_AHEAD;
|
||||
else
|
||||
return Transaction::NON_ATOMIC;
|
||||
}
|
||||
|
||||
// Start multi transaction for eval. Returns true if transaction was scheduled.
|
||||
// Skips scheduling if multi mode requies declaring keys, but no keys were declared.
|
||||
bool StartMultiEval(DbIndex dbid, CmdArgList keys, ScriptMgr::ScriptParams params,
|
||||
Transaction* trans) {
|
||||
Transaction::MultiMode multi_mode = DetermineMultiMode(params);
|
||||
|
||||
if (keys.empty() &&
|
||||
(multi_mode == Transaction::LOCK_AHEAD || multi_mode == Transaction::LOCK_INCREMENTAL))
|
||||
return false;
|
||||
|
||||
switch (multi_mode) {
|
||||
case Transaction::GLOBAL:
|
||||
trans->StartMultiGlobal(dbid);
|
||||
return true;
|
||||
case Transaction::LOCK_AHEAD:
|
||||
trans->StartMultiLockedAhead(dbid, keys);
|
||||
return true;
|
||||
case Transaction::LOCK_INCREMENTAL:
|
||||
trans->StartMultiLockedIncr(dbid, DetermineKeyShards(keys));
|
||||
return true;
|
||||
case Transaction::NON_ATOMIC:
|
||||
trans->StartMultiNonAtomic();
|
||||
return true;
|
||||
default:
|
||||
CHECK(false) << "Invalid mode";
|
||||
};
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter,
|
||||
ConnectionContext* cntx) {
|
||||
DCHECK(!eval_args.sha.empty());
|
||||
|
@ -1066,18 +1114,9 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter,
|
|||
return (*cntx)->SendError(facade::kScriptNotFound);
|
||||
}
|
||||
|
||||
bool exists = interpreter->Exists(eval_args.sha);
|
||||
|
||||
if (!exists) {
|
||||
const char* body = server_family_.script_mgr()->Find(eval_args.sha);
|
||||
if (!body) {
|
||||
return (*cntx)->SendError(facade::kScriptNotFound);
|
||||
}
|
||||
|
||||
string res;
|
||||
CHECK_EQ(Interpreter::ADD_OK, interpreter->AddFunction(body, &res));
|
||||
CHECK_EQ(res, eval_args.sha);
|
||||
}
|
||||
auto params = LoadScipt(eval_args.sha, server_family_.script_mgr(), interpreter);
|
||||
if (!params)
|
||||
return (*cntx)->SendError(facade::kScriptNotFound);
|
||||
|
||||
string error;
|
||||
|
||||
|
@ -1092,24 +1131,7 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter,
|
|||
}
|
||||
DCHECK(cntx->transaction);
|
||||
|
||||
bool scheduled = false;
|
||||
int multi_mode = absl::GetFlag(FLAGS_multi_eval_mode);
|
||||
DCHECK(multi_mode >= Transaction::GLOBAL && multi_mode <= Transaction::NON_ATOMIC);
|
||||
|
||||
if (multi_mode == Transaction::GLOBAL) {
|
||||
scheduled = true;
|
||||
cntx->transaction->StartMultiGlobal(cntx->db_index());
|
||||
} else if (multi_mode == Transaction::LOCK_INCREMENTAL && !eval_args.keys.empty()) {
|
||||
scheduled = true;
|
||||
vector<bool> shards = DetermineKeyShards(eval_args.keys);
|
||||
cntx->transaction->StartMultiLockedIncr(cntx->db_index(), shards);
|
||||
} else if (multi_mode == Transaction::LOCK_AHEAD && !eval_args.keys.empty()) {
|
||||
scheduled = true;
|
||||
cntx->transaction->StartMultiLockedAhead(cntx->db_index(), eval_args.keys);
|
||||
} else if (multi_mode == Transaction::NON_ATOMIC) {
|
||||
scheduled = true;
|
||||
cntx->transaction->StartMultiNonAtomic();
|
||||
};
|
||||
bool scheduled = StartMultiEval(cntx->db_index(), eval_args.keys, *params, cntx->transaction);
|
||||
|
||||
interpreter->SetGlobalArray("KEYS", eval_args.keys);
|
||||
interpreter->SetGlobalArray("ARGV", eval_args.args);
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
#include "base/flags.h"
|
||||
#include "base/gtest.h"
|
||||
#include "base/logging.h"
|
||||
#include "core/interpreter.h"
|
||||
#include "facade/facade_test.h"
|
||||
#include "server/conn_context.h"
|
||||
#include "server/main_service.h"
|
||||
|
@ -15,7 +16,7 @@
|
|||
#include "server/transaction.h"
|
||||
|
||||
ABSL_DECLARE_FLAG(int, multi_exec_mode);
|
||||
ABSL_DECLARE_FLAG(int, multi_eval_mode);
|
||||
ABSL_DECLARE_FLAG(std::string, default_lua_config);
|
||||
|
||||
namespace dfly {
|
||||
|
||||
|
@ -332,9 +333,10 @@ TEST_F(MultiTest, FlushDb) {
|
|||
}
|
||||
|
||||
TEST_F(MultiTest, Eval) {
|
||||
int multi_mode = absl::GetFlag(FLAGS_multi_eval_mode);
|
||||
if (multi_mode == Transaction::GLOBAL || multi_mode == Transaction::NON_ATOMIC)
|
||||
absl::SetFlag(&FLAGS_multi_eval_mode, Transaction::LOCK_AHEAD);
|
||||
if (auto config = absl::GetFlag(FLAGS_default_lua_config); config != "") {
|
||||
LOG(WARNING) << "Skipped Eval test because default_lua_config is set";
|
||||
return;
|
||||
}
|
||||
|
||||
RespExpr resp;
|
||||
|
||||
|
@ -379,8 +381,6 @@ TEST_F(MultiTest, Eval) {
|
|||
ASSERT_THAT(resp, ArrLen(3));
|
||||
const auto& arr = resp.GetVec();
|
||||
EXPECT_THAT(arr, ElementsAre("a", "b", "c"));
|
||||
|
||||
absl::SetFlag(&FLAGS_multi_eval_mode, multi_mode);
|
||||
}
|
||||
|
||||
TEST_F(MultiTest, Watch) {
|
||||
|
@ -503,6 +503,11 @@ TEST_F(MultiTest, MultiOOO) {
|
|||
|
||||
// Lua scripts lock their keys ahead and thus can run out of order.
|
||||
TEST_F(MultiTest, EvalOOO) {
|
||||
if (auto config = absl::GetFlag(FLAGS_default_lua_config); config != "") {
|
||||
LOG(WARNING) << "Skipped Eval test because default_lua_config is set";
|
||||
return;
|
||||
}
|
||||
|
||||
const char* kScript = "redis.call('MGET', unpack(KEYS)); return 'OK'";
|
||||
|
||||
// Check single call.
|
||||
|
@ -527,11 +532,7 @@ TEST_F(MultiTest, EvalOOO) {
|
|||
}
|
||||
|
||||
auto metrics = GetMetrics();
|
||||
int mode = absl::GetFlag(FLAGS_multi_eval_mode);
|
||||
if (mode == Transaction::LOCK_AHEAD || mode == Transaction::NON_ATOMIC)
|
||||
EXPECT_EQ(1 + 2 * kTimes, metrics.ooo_tx_transaction_cnt);
|
||||
else
|
||||
EXPECT_EQ(0, metrics.ooo_tx_transaction_cnt);
|
||||
EXPECT_EQ(1 + 2 * kTimes, metrics.ooo_tx_transaction_cnt);
|
||||
}
|
||||
|
||||
// Run MULTI/EXEC commands in parallel, where each command is:
|
||||
|
@ -593,23 +594,7 @@ TEST_F(MultiTest, MultiCauseUnblocking) {
|
|||
f2.Join();
|
||||
}
|
||||
|
||||
TEST_F(MultiTest, EvalUndeclared) {
|
||||
int start_mode = absl::GetFlag(FLAGS_multi_eval_mode);
|
||||
int allowed_modes[] = {Transaction::GLOBAL, Transaction::NON_ATOMIC};
|
||||
|
||||
Run({"set", "undeclared-k", "works"});
|
||||
const char* kScript = "return redis.call('GET', 'undeclared-k')";
|
||||
|
||||
for (int multi_mode : allowed_modes) {
|
||||
absl::SetFlag(&FLAGS_multi_eval_mode, multi_mode);
|
||||
auto res = Run({"eval", kScript, "0"});
|
||||
EXPECT_EQ(res, "works");
|
||||
}
|
||||
|
||||
absl::SetFlag(&FLAGS_multi_eval_mode, start_mode);
|
||||
}
|
||||
|
||||
TEST_F(MultiTest, GlobalFallback) {
|
||||
TEST_F(MultiTest, ExecGlobalFallback) {
|
||||
// Check global command MOVE falls back to global mode from lock ahead.
|
||||
absl::SetFlag(&FLAGS_multi_exec_mode, Transaction::LOCK_AHEAD);
|
||||
Run({"multi"});
|
||||
|
@ -627,6 +612,41 @@ TEST_F(MultiTest, GlobalFallback) {
|
|||
EXPECT_EQ(1, GetMetrics().ooo_tx_transaction_cnt);
|
||||
}
|
||||
|
||||
TEST_F(MultiTest, ScriptConfigTest) {
|
||||
if (auto config = absl::GetFlag(FLAGS_default_lua_config); config != "") {
|
||||
LOG(WARNING) << "Skipped Eval test because default_lua_config is set";
|
||||
return;
|
||||
}
|
||||
|
||||
const char* kUndeclared1 = "return redis.call('GET', 'random-key-1');";
|
||||
const char* kUndeclared2 = "return redis.call('GET', 'random-key-2');";
|
||||
|
||||
Run({"set", "random-key-1", "works"});
|
||||
Run({"set", "random-key-2", "works"});
|
||||
|
||||
// Check SCRIPT CONFIG is applied correctly to loaded scripts.
|
||||
{
|
||||
auto sha_resp = Run({"script", "load", kUndeclared1});
|
||||
auto sha = facade::ToSV(sha_resp.GetBuf());
|
||||
|
||||
EXPECT_THAT(Run({"evalsha", sha, "0"}), ErrArg("undeclared"));
|
||||
|
||||
EXPECT_THAT(Run({"script", "config", sha, "allow-undeclared-keys"}), "OK");
|
||||
EXPECT_THAT(Run({"evalsha", sha, "0"}), "works");
|
||||
}
|
||||
|
||||
// Check SCRIPT CONFIG can be applied by sha before loading.
|
||||
{
|
||||
char sha_buf[41];
|
||||
Interpreter::FuncSha1(kUndeclared2, sha_buf);
|
||||
string_view sha{sha_buf, 40};
|
||||
|
||||
EXPECT_THAT(Run({"script", "config", sha, "allow-undeclared-keys"}), "OK");
|
||||
|
||||
EXPECT_THAT(Run({"eval", kUndeclared2, "0"}), "works");
|
||||
}
|
||||
}
|
||||
|
||||
// Run multi-exec transactions that move values from a source list
|
||||
// to destination list through two contended channels.
|
||||
TEST_F(MultiTest, ContendedList) {
|
||||
|
|
|
@ -2019,7 +2019,7 @@ error_code RdbLoader::HandleAux() {
|
|||
Interpreter::AddResult add_result = script->AddFunction(body, &result);
|
||||
if (add_result == Interpreter::ADD_OK) {
|
||||
if (script_mgr_)
|
||||
script_mgr_->InsertFunction(result, body);
|
||||
script_mgr_->Insert(result, body);
|
||||
} else if (add_result == Interpreter::COMPILE_ERR) {
|
||||
LOG(ERROR) << "Error when compiling lua scripts";
|
||||
}
|
||||
|
|
|
@ -5,26 +5,55 @@
|
|||
#include "server/script_mgr.h"
|
||||
|
||||
#include <absl/cleanup/cleanup.h>
|
||||
#include <absl/strings/ascii.h>
|
||||
#include <absl/strings/match.h>
|
||||
#include <absl/strings/numbers.h>
|
||||
#include <absl/strings/str_cat.h>
|
||||
#include <absl/strings/str_split.h>
|
||||
|
||||
#include <string>
|
||||
|
||||
#include "base/flags.h"
|
||||
#include "base/logging.h"
|
||||
#include "core/interpreter.h"
|
||||
#include "facade/error.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/server_state.h"
|
||||
|
||||
ABSL_FLAG(std::string, default_lua_config, "",
|
||||
"Configure default mode for running Lua scripts: \n - Use 'allow-undeclared-keys' to "
|
||||
"allow accessing undeclared keys, \n - Use 'disable-atomicity' to allow "
|
||||
"running scripts non-atomically. \nSpecify multiple values "
|
||||
"separated by space, for example 'allow-undeclared-keys disable-atomicity' runs scripts "
|
||||
"non-atomically and allows accessing undeclared keys");
|
||||
|
||||
namespace dfly {
|
||||
|
||||
using namespace std;
|
||||
using namespace facade;
|
||||
|
||||
ScriptMgr::ScriptMgr() {
|
||||
// Build default script config
|
||||
std::string config = absl::GetFlag(FLAGS_default_lua_config);
|
||||
|
||||
static_assert(ScriptParams{}.atomic && !ScriptParams{}.undeclared_keys);
|
||||
|
||||
auto parts = absl::StrSplit(config, absl::ByAnyChar(",; "), absl::SkipEmpty());
|
||||
for (auto pragma : parts) {
|
||||
CHECK(ScriptParams::ApplyPragma(pragma, &default_params_))
|
||||
<< "Bad format of default_lua_config flag";
|
||||
}
|
||||
}
|
||||
|
||||
ScriptMgr::ScriptKey::ScriptKey(string_view sha) : array{} {
|
||||
DCHECK_EQ(sha.size(), size());
|
||||
memcpy(data(), sha.data(), size());
|
||||
}
|
||||
|
||||
void ScriptMgr::Run(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view subcmd = ArgS(args, 0);
|
||||
|
||||
if (args.size() == 1 && subcmd == "HELP") {
|
||||
if (subcmd == "HELP") {
|
||||
string_view kHelp[] = {
|
||||
"SCRIPT <subcommand> [<arg> [value] [opt] ...]",
|
||||
"Subcommands are:",
|
||||
|
@ -32,6 +61,11 @@ void ScriptMgr::Run(CmdArgList args, ConnectionContext* cntx) {
|
|||
" Return information about the existence of the scripts in the script cache.",
|
||||
"LOAD <script>",
|
||||
" Load a script into the scripts cache without executing it.",
|
||||
"CONFIGURE <sha> [options ...]",
|
||||
" The following options are possible: ",
|
||||
" - Use 'allow-undeclared-keys' to allow accessing undeclared keys",
|
||||
" - Use 'disable-atomicity' to allow running scripts non-atomically to improve "
|
||||
"performance",
|
||||
"LIST",
|
||||
" Lists loaded scripts.",
|
||||
"LATENCY",
|
||||
|
@ -41,110 +75,87 @@ void ScriptMgr::Run(CmdArgList args, ConnectionContext* cntx) {
|
|||
return (*cntx)->SendSimpleStrArr(kHelp, ABSL_ARRAYSIZE(kHelp));
|
||||
}
|
||||
|
||||
if (subcmd == "EXISTS") {
|
||||
vector<uint8_t> res(args.size() - 1, 0);
|
||||
for (size_t i = 1; i < args.size(); ++i) {
|
||||
string_view sha = ArgS(args, i);
|
||||
if (Find(sha)) {
|
||||
res[i - 1] = 1;
|
||||
}
|
||||
}
|
||||
if (subcmd == "EXISTS" && args.size() > 1)
|
||||
return ExistsCmd(args, cntx);
|
||||
|
||||
(*cntx)->StartArray(res.size());
|
||||
for (uint8_t v : res) {
|
||||
(*cntx)->SendLong(v);
|
||||
}
|
||||
return;
|
||||
}
|
||||
if (subcmd == "LIST")
|
||||
return ListCmd(cntx);
|
||||
|
||||
if (subcmd == "LIST") {
|
||||
return ListScripts(cntx);
|
||||
}
|
||||
if (subcmd == "LATENCY")
|
||||
return LatencyCmd(cntx);
|
||||
|
||||
if (subcmd == "LATENCY") {
|
||||
return PrintLatency(cntx);
|
||||
}
|
||||
if (subcmd == "LOAD" && args.size() == 2)
|
||||
return LoadCmd(args, cntx);
|
||||
|
||||
if (subcmd == "LOAD" && args.size() == 2) {
|
||||
string_view body = ArgS(args, 1);
|
||||
|
||||
if (body.empty()) {
|
||||
char sha[41];
|
||||
Interpreter::FuncSha1(body, sha);
|
||||
return (*cntx)->SendBulkString(sha);
|
||||
}
|
||||
|
||||
ServerState* ss = ServerState::tlocal();
|
||||
auto interpreter = ss->BorrowInterpreter();
|
||||
absl::Cleanup clean = [ss, interpreter]() { ss->ReturnInterpreter(interpreter); };
|
||||
|
||||
// no need to lock the interpreter since we do not mess the stack.
|
||||
string error_or_id;
|
||||
Interpreter::AddResult add_result = interpreter->AddFunction(body, &error_or_id);
|
||||
|
||||
if (add_result == Interpreter::ALREADY_EXISTS) {
|
||||
return (*cntx)->SendBulkString(error_or_id);
|
||||
}
|
||||
if (add_result == Interpreter::COMPILE_ERR) {
|
||||
return (*cntx)->SendError(error_or_id);
|
||||
}
|
||||
|
||||
InsertFunction(error_or_id, body);
|
||||
|
||||
return (*cntx)->SendBulkString(error_or_id);
|
||||
}
|
||||
if (subcmd == "CONFIG" && args.size() > 2)
|
||||
return ConfigCmd(args, cntx);
|
||||
|
||||
string err = absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd,
|
||||
"'. Try SCRIPT HELP.");
|
||||
cntx->reply_builder()->SendError(err, kSyntaxErrType);
|
||||
}
|
||||
|
||||
bool ScriptMgr::InsertFunction(std::string_view id, std::string_view body) {
|
||||
ScriptKey key;
|
||||
CHECK_EQ(key.size(), id.size());
|
||||
memcpy(key.data(), id.data(), key.size());
|
||||
|
||||
lock_guard lk(mu_);
|
||||
auto [it, inserted] = db_.emplace(key, nullptr);
|
||||
if (inserted) {
|
||||
it->second.reset(new char[body.size() + 1]);
|
||||
memcpy(it->second.get(), body.data(), body.size());
|
||||
it->second[body.size()] = '\0';
|
||||
}
|
||||
return inserted;
|
||||
}
|
||||
|
||||
const char* ScriptMgr::Find(std::string_view sha) const {
|
||||
if (sha.size() != 40)
|
||||
return nullptr;
|
||||
|
||||
ScriptKey key;
|
||||
memcpy(key.data(), sha.data(), key.size());
|
||||
|
||||
lock_guard lk(mu_);
|
||||
auto it = db_.find(key);
|
||||
if (it == db_.end())
|
||||
return nullptr;
|
||||
|
||||
return it->second.get();
|
||||
}
|
||||
|
||||
vector<pair<string, string>> ScriptMgr::GetLuaScripts() const {
|
||||
vector<pair<string, string>> res;
|
||||
|
||||
lock_guard lk(mu_);
|
||||
res.reserve(db_.size());
|
||||
for (const auto& k_v : db_) {
|
||||
string key(k_v.first.data(), k_v.first.size());
|
||||
|
||||
res.emplace_back(move(key), k_v.second.get());
|
||||
void ScriptMgr::ExistsCmd(CmdArgList args, ConnectionContext* cntx) const {
|
||||
vector<uint8_t> res(args.size() - 1, 0);
|
||||
for (size_t i = 1; i < args.size(); ++i) {
|
||||
if (string_view sha = ArgS(args, i); Find(sha)) {
|
||||
res[i - 1] = 1;
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
(*cntx)->StartArray(res.size());
|
||||
for (uint8_t v : res) {
|
||||
(*cntx)->SendLong(v);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
void ScriptMgr::ListScripts(ConnectionContext* cntx) {
|
||||
vector<pair<string, string>> scripts = GetLuaScripts();
|
||||
void ScriptMgr::LoadCmd(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view body = ArgS(args, 1);
|
||||
|
||||
if (body.empty()) {
|
||||
char sha[41];
|
||||
Interpreter::FuncSha1(body, sha);
|
||||
return (*cntx)->SendBulkString(sha);
|
||||
}
|
||||
|
||||
ServerState* ss = ServerState::tlocal();
|
||||
auto interpreter = ss->BorrowInterpreter();
|
||||
absl::Cleanup clean = [ss, interpreter]() { ss->ReturnInterpreter(interpreter); };
|
||||
|
||||
// no need to lock the interpreter since we do not mess the stack.
|
||||
string error_or_id;
|
||||
Interpreter::AddResult add_result = interpreter->AddFunction(body, &error_or_id);
|
||||
|
||||
if (add_result == Interpreter::ALREADY_EXISTS) {
|
||||
return (*cntx)->SendBulkString(error_or_id);
|
||||
}
|
||||
if (add_result == Interpreter::COMPILE_ERR) {
|
||||
return (*cntx)->SendError(error_or_id);
|
||||
}
|
||||
|
||||
Insert(error_or_id, body);
|
||||
|
||||
return (*cntx)->SendBulkString(error_or_id);
|
||||
}
|
||||
|
||||
void ScriptMgr::ConfigCmd(CmdArgList args, ConnectionContext* cntx) {
|
||||
lock_guard lk{mu_};
|
||||
ScriptKey key{ArgS(args, 1)};
|
||||
auto& data = db_[key];
|
||||
|
||||
for (auto pragma : args.subspan(2)) {
|
||||
if (!ScriptParams::ApplyPragma(facade::ToSV(pragma), &data))
|
||||
return (*cntx)->SendError("Invalid config format");
|
||||
}
|
||||
|
||||
UpdateScriptCaches(key, data);
|
||||
|
||||
return (*cntx)->SendOk();
|
||||
}
|
||||
|
||||
void ScriptMgr::ListCmd(ConnectionContext* cntx) const {
|
||||
vector<pair<string, string>> scripts = GetAll();
|
||||
(*cntx)->StartArray(scripts.size());
|
||||
for (const auto& k_v : scripts) {
|
||||
(*cntx)->StartArray(2);
|
||||
|
@ -153,7 +164,7 @@ void ScriptMgr::ListScripts(ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
void ScriptMgr::PrintLatency(ConnectionContext* cntx) {
|
||||
void ScriptMgr::LatencyCmd(ConnectionContext* cntx) const {
|
||||
absl::flat_hash_map<std::string, base::Histogram> result;
|
||||
boost::fibers::mutex mu;
|
||||
|
||||
|
@ -174,4 +185,66 @@ void ScriptMgr::PrintLatency(ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
bool ScriptMgr::Insert(std::string_view id, std::string_view body) {
|
||||
bool updated = false;
|
||||
|
||||
lock_guard lk{mu_};
|
||||
auto [it, _] = db_.emplace(id, InternalScriptData{default_params_, nullptr});
|
||||
|
||||
if (auto& body_ptr = it->second.body; !body_ptr) {
|
||||
updated = true;
|
||||
|
||||
body_ptr.reset(new char[body.size() + 1]);
|
||||
memcpy(body_ptr.get(), body.data(), body.size());
|
||||
body_ptr[body.size()] = '\0';
|
||||
}
|
||||
|
||||
UpdateScriptCaches(id, it->second);
|
||||
|
||||
return updated;
|
||||
}
|
||||
|
||||
optional<ScriptMgr::ScriptData> ScriptMgr::Find(std::string_view sha) const {
|
||||
if (sha.size() != ScriptKey{}.size())
|
||||
return std::nullopt;
|
||||
|
||||
lock_guard lk{mu_};
|
||||
if (auto it = db_.find(sha); it != db_.end() && it->second.body)
|
||||
return ScriptData{it->second, it->second.body.get()};
|
||||
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
vector<pair<string, string>> ScriptMgr::GetAll() const {
|
||||
vector<pair<string, string>> res;
|
||||
|
||||
lock_guard lk{mu_};
|
||||
res.reserve(db_.size());
|
||||
for (const auto& [sha, data] : db_) {
|
||||
res.emplace_back(string{sha.data(), sha.size()}, data.body.get());
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
void ScriptMgr::UpdateScriptCaches(ScriptKey sha, ScriptParams params) const {
|
||||
shard_set->pool()->Await([&sha, ¶ms](auto index, auto* pb) {
|
||||
ServerState::tlocal()->SetScriptParams(sha, params);
|
||||
});
|
||||
}
|
||||
|
||||
bool ScriptMgr::ScriptParams::ApplyPragma(string_view pragma, ScriptParams* params) {
|
||||
if (pragma == "disable-atomicity") {
|
||||
params->atomic = false;
|
||||
return true;
|
||||
}
|
||||
|
||||
if (pragma == "allow-undeclared-keys") {
|
||||
params->undeclared_keys = true;
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
|
||||
#include <array>
|
||||
#include <boost/fiber/mutex.hpp>
|
||||
#include <optional>
|
||||
|
||||
#include "server/conn_context.h"
|
||||
|
||||
|
@ -17,25 +18,58 @@ class EngineShardSet;
|
|||
|
||||
// This class has a state through the lifetime of a server because it manipulates scripts
|
||||
class ScriptMgr {
|
||||
public:
|
||||
struct ScriptParams {
|
||||
bool atomic = true; // Whether script must run atomically.
|
||||
bool undeclared_keys = false; // Whether script accesses undeclared keys.
|
||||
|
||||
// Return true if pragma was valid, false otherwise.
|
||||
// Valid pragmas are:
|
||||
// - allow-undeclared-keys -> undeclared_keys=true
|
||||
// - disable-atomicity -> atomic=false
|
||||
static bool ApplyPragma(std::string_view pragma, ScriptParams* params);
|
||||
};
|
||||
|
||||
struct ScriptData : public ScriptParams {
|
||||
const char* body = nullptr;
|
||||
};
|
||||
|
||||
struct ScriptKey : public std::array<char, 40> {
|
||||
ScriptKey() = default;
|
||||
ScriptKey(std::string_view sha);
|
||||
};
|
||||
|
||||
public:
|
||||
ScriptMgr();
|
||||
|
||||
void Run(CmdArgList args, ConnectionContext* cntx);
|
||||
|
||||
bool InsertFunction(std::string_view sha, std::string_view body);
|
||||
// Insert script. Returns true if inserted new script.
|
||||
bool Insert(std::string_view sha, std::string_view body);
|
||||
|
||||
// Returns body as null-terminated c-string. NULL if sha is not found.
|
||||
const char* Find(std::string_view sha) const;
|
||||
// Get script body by sha, returns nullptr if not found.
|
||||
std::optional<ScriptData> Find(std::string_view sha) const;
|
||||
|
||||
// Returns a list of all scripts in the database with their sha and body.
|
||||
std::vector<std::pair<std::string, std::string>> GetLuaScripts() const;
|
||||
std::vector<std::pair<std::string, std::string>> GetAll() const;
|
||||
|
||||
private:
|
||||
void ListScripts(ConnectionContext* cntx);
|
||||
void PrintLatency(ConnectionContext* cntx);
|
||||
void ExistsCmd(CmdArgList args, ConnectionContext* cntx) const;
|
||||
void LoadCmd(CmdArgList args, ConnectionContext* cntx);
|
||||
void ConfigCmd(CmdArgList args, ConnectionContext* cntx);
|
||||
void ListCmd(ConnectionContext* cntx) const;
|
||||
void LatencyCmd(ConnectionContext* cntx) const;
|
||||
|
||||
using ScriptKey = std::array<char, 40>;
|
||||
absl::flat_hash_map<ScriptKey, std::unique_ptr<char[]>> db_; // protected by mu_
|
||||
void UpdateScriptCaches(ScriptKey sha, ScriptParams params) const;
|
||||
|
||||
private:
|
||||
struct InternalScriptData : public ScriptParams {
|
||||
std::unique_ptr<char[]> body{};
|
||||
};
|
||||
|
||||
ScriptParams default_params_;
|
||||
|
||||
absl::flat_hash_map<ScriptKey, InternalScriptData> db_;
|
||||
mutable ::boost::fibers::mutex mu_;
|
||||
};
|
||||
|
||||
|
|
|
@ -934,7 +934,7 @@ GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) {
|
|||
};
|
||||
|
||||
auto get_scripts = [this] {
|
||||
auto scripts = script_mgr_->GetLuaScripts();
|
||||
auto scripts = script_mgr_->GetAll();
|
||||
StringVec script_bodies;
|
||||
for (const auto& script : scripts) {
|
||||
script_bodies.push_back(move(script.second));
|
||||
|
|
|
@ -10,6 +10,7 @@
|
|||
#include "base/histogram.h"
|
||||
#include "core/interpreter.h"
|
||||
#include "server/common.h"
|
||||
#include "server/script_mgr.h"
|
||||
#include "util/sliding_counter.h"
|
||||
|
||||
typedef struct mi_heap_s mi_heap_t;
|
||||
|
@ -172,6 +173,15 @@ class ServerState { // public struct - to allow initialization.
|
|||
call_latency_histos_[sha].Add(latency_usec);
|
||||
}
|
||||
|
||||
void SetScriptParams(const ScriptMgr::ScriptKey& key, ScriptMgr::ScriptParams params) {
|
||||
cached_script_params_[key] = params;
|
||||
}
|
||||
|
||||
std::optional<ScriptMgr::ScriptParams> GetScriptParams(const ScriptMgr::ScriptKey& key) {
|
||||
auto it = cached_script_params_.find(key);
|
||||
return it != cached_script_params_.end() ? std::optional{it->second} : std::nullopt;
|
||||
}
|
||||
|
||||
Stats stats;
|
||||
|
||||
private:
|
||||
|
@ -180,6 +190,8 @@ class ServerState { // public struct - to allow initialization.
|
|||
journal::Journal* journal_ = nullptr;
|
||||
|
||||
InterpreterManager interpreter_mgr_;
|
||||
absl::flat_hash_map<ScriptMgr::ScriptKey, ScriptMgr::ScriptParams> cached_script_params_;
|
||||
|
||||
GlobalState gstate_ = GlobalState::ACTIVE;
|
||||
|
||||
using Counter = util::SlidingCounter<7>;
|
||||
|
|
|
@ -91,7 +91,8 @@ so Dragonfly must run in global (1) or non-atomic (4) multi eval mode.
|
|||
"""
|
||||
|
||||
|
||||
@dfly_multi_test_args({'multi_eval_mode': 1, 'proactor_threads': 4}, {'multi_eval_mode': 4, 'proactor_threads': 4})
|
||||
@dfly_multi_test_args({'default_lua_config': 'allow-undeclared-keys', 'proactor_threads': 4},
|
||||
{'default_lua_config': 'allow-undeclared-keys disable-atomicity', 'proactor_threads': 4})
|
||||
@pytest.mark.asyncio
|
||||
async def test_django_cacheops_script(async_client, num_keys=500):
|
||||
script = async_client.register_script(DJANGO_CACHEOPS_SCRIPT)
|
||||
|
@ -158,7 +159,8 @@ the task system should work reliably.
|
|||
"""
|
||||
|
||||
|
||||
@dfly_multi_test_args({'multi_eval_mode': 1, 'proactor_threads': 4}, {'multi_eval_mode': 4, 'proactor_threads': 4})
|
||||
@dfly_multi_test_args({'default_lua_config': 'allow-undeclared-keys', 'proactor_threads': 4},
|
||||
{'default_lua_config': 'allow-undeclared-keys disable-atomicity', 'proactor_threads': 4})
|
||||
@pytest.mark.asyncio
|
||||
async def test_golang_asynq_script(async_pool, num_queues=10, num_tasks=100):
|
||||
async def enqueue_worker(queue):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue