feat: search index persistence (#1721)

* feat: WIP search index persistence

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>

* Update src/server/search/doc_index.cc

Co-authored-by: Kostas Kyrimis  <kostaskyrim@gmail.com>
Signed-off-by: Vladislav <vladislav.oleshko@gmail.com>

* fix: foxes

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Signed-off-by: Vladislav <vladislav.oleshko@gmail.com>
Co-authored-by: Kostas Kyrimis <kostaskyrim@gmail.com>
This commit is contained in:
Vladislav 2023-08-24 13:41:10 +03:00 committed by GitHub
parent bd87fb75fa
commit 84871b8dce
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 277 additions and 76 deletions

View file

@ -19,6 +19,7 @@ extern "C" {
#include <absl/cleanup/cleanup.h>
#include <absl/strings/match.h>
#include <absl/strings/str_cat.h>
#include <absl/strings/str_split.h>
#include <lz4frame.h>
#include <zstd.h>
@ -2188,16 +2189,7 @@ error_code RdbLoader::HandleAux() {
} else if (auxkey == "repl-offset") {
// TODO
} else if (auxkey == "lua") {
ServerState* ss = ServerState::tlocal();
auto interpreter = ss->BorrowInterpreter();
absl::Cleanup clean = [ss, interpreter]() { ss->ReturnInterpreter(interpreter); };
string_view body{auxval};
if (script_mgr_) {
auto res = script_mgr_->Insert(body, interpreter);
if (!res)
LOG(ERROR) << "Error compiling script";
}
LoadScriptFromAux(move(auxval));
} else if (auxkey == "redis-ver") {
VLOG(1) << "Loading RDB produced by version " << auxval;
} else if (auxkey == "ctime") {
@ -2220,6 +2212,8 @@ error_code RdbLoader::HandleAux() {
}
} else if (auxkey == "redis-bits") {
/* Just ignored. */
} else if (auxkey == "search-index") {
LoadSearchIndexDefFromAux(move(auxval));
} else {
/* We ignore fields we don't understand, as by AUX field
* contract. */
@ -2339,4 +2333,50 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
return kOk;
}
void RdbLoader::LoadScriptFromAux(string&& body) {
ServerState* ss = ServerState::tlocal();
auto interpreter = ss->BorrowInterpreter();
absl::Cleanup clean = [ss, interpreter]() { ss->ReturnInterpreter(interpreter); };
if (script_mgr_) {
auto res = script_mgr_->Insert(body, interpreter);
if (!res)
LOG(ERROR) << "Error compiling script";
}
}
void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
facade::CapturingReplyBuilder crb{};
ConnectionContext cntx{nullptr, nullptr, &crb};
cntx.journal_emulated = true;
absl::Cleanup cntx_clean = [&cntx] { cntx.Inject(nullptr); };
uint32_t consumed = 0;
facade::RespVec resp_vec;
facade::RedisParser parser;
def += "\r\n"; // RESP terminator
absl::Span<uint8_t> buffer{reinterpret_cast<uint8_t*>(def.data()), def.size()};
auto res = parser.Parse(buffer, &consumed, &resp_vec);
if (res != facade::RedisParser::Result::OK) {
LOG(ERROR) << "Bad index definition: " << def;
return;
}
CmdArgVec arg_vec;
facade::RespExpr::VecToArgList(resp_vec, &arg_vec);
string ft_create = "FT.CREATE";
arg_vec.insert(arg_vec.begin(), MutableSlice{ft_create.data(), ft_create.size()});
service_->DispatchCommand(absl::MakeSpan(arg_vec), &cntx);
auto response = crb.Take();
if (auto err = facade::CapturingReplyBuilder::GetError(response); err) {
LOG(ERROR) << "Bad index definition: " << def << " " << err->first;
}
}
} // namespace dfly