mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
fix(server): Implement SCRIPT FLUSH command (#2493)
* fix(server): Implement SCRIPT FLUSH command
This commit is contained in:
parent
90a9f05e36
commit
73fe5a4eb2
7 changed files with 62 additions and 0 deletions
|
@ -895,4 +895,11 @@ void InterpreterManager::Return(Interpreter* ir) {
|
||||||
waker_.notify();
|
waker_.notify();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void InterpreterManager::Reset() {
|
||||||
|
waker_.await([this]() { return available_.size() == storage_.size(); });
|
||||||
|
|
||||||
|
available_.clear();
|
||||||
|
storage_.clear();
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace dfly
|
} // namespace dfly
|
||||||
|
|
|
@ -149,6 +149,8 @@ class InterpreterManager {
|
||||||
|
|
||||||
void Return(Interpreter*);
|
void Return(Interpreter*);
|
||||||
|
|
||||||
|
void Reset();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
EventCount waker_;
|
EventCount waker_;
|
||||||
std::vector<Interpreter*> available_;
|
std::vector<Interpreter*> available_;
|
||||||
|
|
|
@ -256,6 +256,29 @@ TEST_F(DflyEngineTest, EvalSha) {
|
||||||
EXPECT_THAT(resp, "c6459b95a0e81df97af6fdd49b1a9e0287a57363");
|
EXPECT_THAT(resp, "c6459b95a0e81df97af6fdd49b1a9e0287a57363");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DflyEngineTest, ScriptFlush) {
|
||||||
|
auto resp = Run({"script", "load", "return 5"});
|
||||||
|
EXPECT_THAT(resp, ArgType(RespExpr::STRING));
|
||||||
|
string sha{ToSV(resp.GetBuf())};
|
||||||
|
resp = Run({"evalsha", sha, "0"});
|
||||||
|
EXPECT_THAT(5, resp.GetInt());
|
||||||
|
resp = Run({"script", "exists", sha});
|
||||||
|
EXPECT_THAT(1, resp.GetInt());
|
||||||
|
|
||||||
|
resp = Run({"script", "flush"});
|
||||||
|
resp = Run({"script", "exists", sha});
|
||||||
|
EXPECT_THAT(0, resp.GetInt());
|
||||||
|
EXPECT_THAT(Run({"evalsha", sha, "0"}), ErrArg("NOSCRIPT No matching script. Please use EVAL."));
|
||||||
|
|
||||||
|
resp = Run({"script", "load", "return 5"});
|
||||||
|
EXPECT_THAT(resp, ArgType(RespExpr::STRING));
|
||||||
|
sha = string{ToSV(resp.GetBuf())};
|
||||||
|
resp = Run({"evalsha", sha, "0"});
|
||||||
|
EXPECT_THAT(5, resp.GetInt());
|
||||||
|
resp = Run({"script", "exists", sha});
|
||||||
|
EXPECT_THAT(1, resp.GetInt());
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(DflyEngineTest, Hello) {
|
TEST_F(DflyEngineTest, Hello) {
|
||||||
auto resp = Run({"hello"});
|
auto resp = Run({"hello"});
|
||||||
ASSERT_THAT(resp, ArrLen(14));
|
ASSERT_THAT(resp, ArrLen(14));
|
||||||
|
|
|
@ -63,6 +63,8 @@ void ScriptMgr::Run(CmdArgList args, ConnectionContext* cntx) {
|
||||||
"Subcommands are:",
|
"Subcommands are:",
|
||||||
"EXISTS <sha1> [<sha1> ...]",
|
"EXISTS <sha1> [<sha1> ...]",
|
||||||
" Return information about the existence of the scripts in the script cache.",
|
" Return information about the existence of the scripts in the script cache.",
|
||||||
|
"FLUSH",
|
||||||
|
" Flush the Lua scripts cache. Very dangerous on replicas.",
|
||||||
"LOAD <script>",
|
"LOAD <script>",
|
||||||
" Load a script into the scripts cache without executing it.",
|
" Load a script into the scripts cache without executing it.",
|
||||||
"FLAGS <sha> [flags ...]",
|
"FLAGS <sha> [flags ...]",
|
||||||
|
@ -83,6 +85,9 @@ void ScriptMgr::Run(CmdArgList args, ConnectionContext* cntx) {
|
||||||
if (subcmd == "EXISTS" && args.size() > 1)
|
if (subcmd == "EXISTS" && args.size() > 1)
|
||||||
return ExistsCmd(args, cntx);
|
return ExistsCmd(args, cntx);
|
||||||
|
|
||||||
|
if (subcmd == "FLUSH")
|
||||||
|
return FlushCmd(args, cntx);
|
||||||
|
|
||||||
if (subcmd == "LIST")
|
if (subcmd == "LIST")
|
||||||
return ListCmd(cntx);
|
return ListCmd(cntx);
|
||||||
|
|
||||||
|
@ -116,6 +121,12 @@ void ScriptMgr::ExistsCmd(CmdArgList args, ConnectionContext* cntx) const {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ScriptMgr::FlushCmd(CmdArgList args, ConnectionContext* cntx) {
|
||||||
|
FlushAllScript();
|
||||||
|
|
||||||
|
return cntx->SendOk();
|
||||||
|
}
|
||||||
|
|
||||||
void ScriptMgr::LoadCmd(CmdArgList args, ConnectionContext* cntx) {
|
void ScriptMgr::LoadCmd(CmdArgList args, ConnectionContext* cntx) {
|
||||||
string_view body = ArgS(args, 1);
|
string_view body = ArgS(args, 1);
|
||||||
auto rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
auto rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||||
|
@ -272,6 +283,16 @@ optional<ScriptMgr::ScriptData> ScriptMgr::Find(std::string_view sha) const {
|
||||||
return std::nullopt;
|
return std::nullopt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ScriptMgr::FlushAllScript() {
|
||||||
|
lock_guard lk{mu_};
|
||||||
|
db_.clear();
|
||||||
|
|
||||||
|
shard_set->pool()->Await([](auto index, auto* pb) {
|
||||||
|
ServerState* ss = ServerState::tlocal();
|
||||||
|
ss->ResetInterpreter();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
vector<pair<string, ScriptMgr::ScriptData>> ScriptMgr::GetAll() const {
|
vector<pair<string, ScriptMgr::ScriptData>> ScriptMgr::GetAll() const {
|
||||||
vector<pair<string, ScriptData>> res;
|
vector<pair<string, ScriptData>> res;
|
||||||
|
|
||||||
|
|
|
@ -54,11 +54,14 @@ class ScriptMgr {
|
||||||
// Returns a list of all scripts in the database with their sha and body.
|
// Returns a list of all scripts in the database with their sha and body.
|
||||||
std::vector<std::pair<std::string, ScriptData>> GetAll() const;
|
std::vector<std::pair<std::string, ScriptData>> GetAll() const;
|
||||||
|
|
||||||
|
void FlushAllScript();
|
||||||
|
|
||||||
// Returns if scripts run as global transactions by default
|
// Returns if scripts run as global transactions by default
|
||||||
bool AreGlobalByDefault() const;
|
bool AreGlobalByDefault() const;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void ExistsCmd(CmdArgList args, ConnectionContext* cntx) const;
|
void ExistsCmd(CmdArgList args, ConnectionContext* cntx) const;
|
||||||
|
void FlushCmd(CmdArgList args, ConnectionContext* cntx);
|
||||||
void LoadCmd(CmdArgList args, ConnectionContext* cntx);
|
void LoadCmd(CmdArgList args, ConnectionContext* cntx);
|
||||||
void ConfigCmd(CmdArgList args, ConnectionContext* cntx);
|
void ConfigCmd(CmdArgList args, ConnectionContext* cntx);
|
||||||
void ListCmd(ConnectionContext* cntx) const;
|
void ListCmd(ConnectionContext* cntx) const;
|
||||||
|
|
|
@ -181,6 +181,10 @@ void ServerState::ReturnInterpreter(Interpreter* ir) {
|
||||||
interpreter_mgr_.Return(ir);
|
interpreter_mgr_.Return(ir);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ServerState::ResetInterpreter() {
|
||||||
|
interpreter_mgr_.Reset();
|
||||||
|
}
|
||||||
|
|
||||||
ServerState* ServerState::SafeTLocal() {
|
ServerState* ServerState::SafeTLocal() {
|
||||||
// https://stackoverflow.com/a/75622732
|
// https://stackoverflow.com/a/75622732
|
||||||
asm volatile("");
|
asm volatile("");
|
||||||
|
|
|
@ -181,6 +181,8 @@ class ServerState { // public struct - to allow initialization.
|
||||||
// Return interpreter to internal manager to be re-used.
|
// Return interpreter to internal manager to be re-used.
|
||||||
void ReturnInterpreter(Interpreter*);
|
void ReturnInterpreter(Interpreter*);
|
||||||
|
|
||||||
|
void ResetInterpreter();
|
||||||
|
|
||||||
// Returns sum of all requests in the last 6 seconds
|
// Returns sum of all requests in the last 6 seconds
|
||||||
// (not including the current one).
|
// (not including the current one).
|
||||||
uint32_t MovingSum6() const {
|
uint32_t MovingSum6() const {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue