fix: make sure SCRIPT FLUSH concludes (#2565)

InterpreterManager::Reset creates now a new storage for interpreters,
 and waits for the old ones to be returned.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-02-09 18:55:14 +02:00 committed by GitHub
parent 881edb501e
commit 908efff7bd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 40 additions and 14 deletions

View file

@ -516,6 +516,8 @@ auto Interpreter::AddFunction(string_view sha, string_view body, string* result)
}
bool Interpreter::Exists(string_view sha) const {
DCHECK(lua_);
if (sha.size() != 40)
return false;
@ -532,7 +534,7 @@ bool Interpreter::Exists(string_view sha) const {
}
auto Interpreter::RunFunction(string_view sha, std::string* error) -> RunResult {
DVLOG(1) << "RunFunction " << sha << " " << lua_gettop(lua_);
DVLOG(2) << "RunFunction " << sha << " " << lua_gettop(lua_);
DCHECK_EQ(40u, sha.size());
@ -940,17 +942,38 @@ Interpreter* InterpreterManager::Get() {
}
void InterpreterManager::Return(Interpreter* ir) {
DCHECK_LE(storage_.data(), ir); // ensure the pointer
DCHECK_GE(storage_.data() + storage_.size(), ir); // belongs to storage_
available_.push_back(ir);
waker_.notify();
if (ir >= storage_.data() && ir < storage_.data() + storage_.size()) {
available_.push_back(ir);
waker_.notify();
} else if (return_untracked_ > 0) {
return_untracked_--;
if (return_untracked_ == 0) {
reset_ec_.notify();
}
} else {
LOG(DFATAL) << "Returning untracked interpreter";
}
}
void InterpreterManager::Reset() {
waker_.await([this]() { return available_.size() == storage_.size(); });
lock_guard guard{reset_mu_};
// we perform double buffer swapping with storage and wait for the old interepreters to be
// returned.
return_untracked_ = storage_.size() - available_.size();
std::vector<Interpreter> next_storage;
next_storage.reserve(storage_.capacity());
next_storage.resize(storage_.size());
next_storage.swap(storage_);
available_.clear();
storage_.clear();
for (auto& ir : storage_) {
available_.push_back(&ir);
}
reset_ec_.await([this]() { return return_untracked_ == 0; });
VLOG(1) << "InterpreterManager::Reset ended";
}
} // namespace dfly

View file

@ -9,7 +9,7 @@
#include <string_view>
#include "core/core_types.h"
#include "core/fibers.h"
#include "util/fibers/synchronization.h"
typedef struct lua_State lua_State;
@ -141,7 +141,6 @@ class InterpreterManager {
// We pre-allocate the backing storage during initialization and
// start storing pointers to slots in the available vector.
storage_.reserve(num);
available_.reserve(num);
}
// Borrow interpreter. Always return it after usage.
@ -152,9 +151,13 @@ class InterpreterManager {
void Reset();
private:
EventCount waker_;
util::fb2::EventCount waker_, reset_ec_;
std::vector<Interpreter*> available_;
std::vector<Interpreter> storage_;
util::fb2::Mutex reset_mu_; // Acts as a singleton.
unsigned return_untracked_ = 0; // Number of returned interpreters during reset.
};
} // namespace dfly

View file

@ -1679,8 +1679,8 @@ void Service::CallSHA(CmdArgList args, string_view sha, Interpreter* interpreter
ServerState::tlocal()->RecordCallLatency(sha, (end - start) / 1000);
}
optional<ScriptMgr::ScriptParams> LoadScipt(string_view sha, ScriptMgr* script_mgr,
Interpreter* interpreter) {
optional<ScriptMgr::ScriptParams> LoadScript(string_view sha, ScriptMgr* script_mgr,
Interpreter* interpreter) {
auto ss = ServerState::tlocal();
if (!interpreter->Exists(sha)) {
@ -1808,7 +1808,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
return cntx->SendError(facade::kScriptNotFound);
}
auto params = LoadScipt(eval_args.sha, server_family_.script_mgr(), interpreter);
auto params = LoadScript(eval_args.sha, server_family_.script_mgr(), interpreter);
if (!params)
return cntx->SendError(facade::kScriptNotFound);

View file

@ -287,7 +287,7 @@ void ScriptMgr::FlushAllScript() {
lock_guard lk{mu_};
db_.clear();
shard_set->pool()->Await([](auto index, auto* pb) {
shard_set->pool()->AwaitFiberOnAll([](auto* pb) {
ServerState* ss = ServerState::tlocal();
ss->ResetInterpreter();
});