From 46292968ad06f52ea084a0d0122b75c1db74bfa7 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Mon, 13 Nov 2023 11:58:54 +0300 Subject: [PATCH] fix(search): Fix replication (#2159) * fix(search): Support replication Signed-off-by: Vladislav Oleshko --------- Signed-off-by: Vladislav Oleshko --- src/server/detail/save_stages_controller.cc | 31 +------------ src/server/detail/save_stages_controller.h | 2 - src/server/dflycmd.cc | 10 +---- src/server/main_service.h | 4 ++ src/server/rdb_load.cc | 49 +++++++++++++++++---- src/server/rdb_load.h | 12 +++++ src/server/rdb_save.cc | 30 ++++++++++++- src/server/rdb_save.h | 4 ++ src/server/replica.cc | 5 ++- src/server/search/search_family.cc | 6 ++- src/server/server_family.cc | 22 +++------ src/server/server_family.h | 4 ++ src/server/transaction.cc | 16 +++++++ src/server/transaction.h | 3 ++ tests/dragonfly/replication_test.py | 48 ++++++++++++++++++++ 15 files changed, 178 insertions(+), 68 deletions(-) diff --git a/src/server/detail/save_stages_controller.cc b/src/server/detail/save_stages_controller.cc index bded4f07b..7e38ee37f 100644 --- a/src/server/detail/save_stages_controller.cc +++ b/src/server/detail/save_stages_controller.cc @@ -11,7 +11,6 @@ #include "base/logging.h" #include "server/main_service.h" #include "server/script_mgr.h" -#include "server/search/doc_index.h" #include "server/transaction.h" #include "strings/human_readable.h" @@ -236,7 +235,7 @@ void SaveStagesController::SaveDfsSingle(EngineShard* shard) { auto& [snapshot, filename] = snapshots_[shard ? shard->shard_id() : shard_set->size()]; SaveMode mode = shard == nullptr ? SaveMode::SUMMARY : SaveMode::SINGLE_SHARD; - auto glob_data = shard == nullptr ? GetGlobalData() : RdbSaver::GlobalData{}; + auto glob_data = shard == nullptr ? RdbSaver::GetGlobalData(service_) : RdbSaver::GlobalData{}; if (auto err = snapshot->Start(mode, filename, glob_data); err) { shared_err_ = err; @@ -258,7 +257,7 @@ void SaveStagesController::SaveRdb() { if (!is_cloud_) filename += ".tmp"; - if (auto err = snapshot->Start(SaveMode::RDB, filename, GetGlobalData()); err) { + if (auto err = snapshot->Start(SaveMode::RDB, filename, RdbSaver::GetGlobalData(service_)); err) { snapshot.reset(); return; } @@ -375,31 +374,5 @@ void SaveStagesController::RunStage(void (SaveStagesController::*cb)(unsigned)) } } -RdbSaver::GlobalData SaveStagesController::GetGlobalData() const { - StringVec script_bodies, search_indices; - - { - auto scripts = service_->script_mgr()->GetAll(); - script_bodies.reserve(scripts.size()); - for (auto& [sha, data] : scripts) - script_bodies.push_back(move(data.body)); - } - -#ifndef __APPLE__ - { - shard_set->Await(0, [&] { - auto* indices = EngineShard::tlocal()->search_indices(); - for (auto index_name : indices->GetIndexNames()) { - auto index_info = indices->GetIndex(index_name)->GetInfo(); - search_indices.emplace_back( - absl::StrCat(index_name, " ", index_info.BuildRestoreCommand())); - } - }); - } -#endif - - return RdbSaver::GlobalData{move(script_bodies), move(search_indices)}; -} - } // namespace detail } // namespace dfly diff --git a/src/server/detail/save_stages_controller.h b/src/server/detail/save_stages_controller.h index 1d596155c..fc5062b8d 100644 --- a/src/server/detail/save_stages_controller.h +++ b/src/server/detail/save_stages_controller.h @@ -103,8 +103,6 @@ struct SaveStagesController : public SaveStagesInputs { void RunStage(void (SaveStagesController::*cb)(unsigned)); - RdbSaver::GlobalData GetGlobalData() const; - size_t GetSaveBuffersSize(); private: diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 08e8b6a93..83053c1dd 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -563,15 +563,7 @@ void DflyCmd::FullSyncFb(FlowInfo* flow, Context* cntx) { RdbSaver* saver = flow->saver.get(); if (saver->Mode() == SaveMode::SUMMARY || saver->Mode() == SaveMode::SINGLE_SHARD_WITH_SUMMARY) { - auto scripts = sf_->script_mgr()->GetAll(); - StringVec script_bodies; - for (auto& [sha, data] : scripts) { - // Always send original body (with header & without auto async calls) that determines the sha, - // It's stored only if it's different from the post-processed version. - string& body = data.orig_body.empty() ? data.body : data.orig_body; - script_bodies.push_back(std::move(body)); - } - ec = saver->SaveHeader({script_bodies, {}}); + ec = saver->SaveHeader(saver->GetGlobalData(&sf_->service())); } else { ec = saver->SaveHeader({}); } diff --git a/src/server/main_service.h b/src/server/main_service.h index fc36ebf8a..190cf40c7 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -113,6 +113,10 @@ class Service : public facade::ServiceInterface { return server_family_.script_mgr(); } + const ScriptMgr* script_mgr() const { + return server_family_.script_mgr(); + } + ServerFamily& server_family() { return server_family_; } diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index b55169ce6..f5159b050 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -43,9 +43,11 @@ extern "C" { #include "server/main_service.h" #include "server/rdb_extensions.h" #include "server/script_mgr.h" +#include "server/search/doc_index.h" #include "server/serializer_commons.h" #include "server/server_state.h" #include "server/set_family.h" +#include "server/transaction.h" #include "strings/human_readable.h" ABSL_DECLARE_FLAG(int32_t, list_max_listpack_size); @@ -1913,8 +1915,11 @@ error_code RdbLoader::Load(io::Source* src) { VLOG(1) << "Read RDB_OPCODE_FULLSYNC_END"; RETURN_ON_ERR(EnsureRead(8)); mem_buf_->ConsumeInput(8); // ignore 8 bytes - if (full_sync_cut_cb) + + if (full_sync_cut_cb) { + FlushAllShards(); // Flush as the handler awakes post load handlers full_sync_cut_cb(); + } continue; } @@ -1985,10 +1990,7 @@ error_code RdbLoader::Load(io::Source* src) { } if (type == RDB_OPCODE_JOURNAL_BLOB) { - // We should flush all changes on the current db before applying incremental changes. - for (unsigned i = 0; i < shard_set->size(); ++i) { - FlushShardAsync(i); - } + FlushAllShards(); // Always flush before applying incremental on top RETURN_ON_ERR(HandleJournalBlob(service_)); continue; } @@ -2224,7 +2226,7 @@ error_code RdbLoader::HandleAux() { } else if (auxkey == "redis-bits") { /* Just ignored. */ } else if (auxkey == "search-index") { - LoadSearchIndexDefFromAux(move(auxval)); + LoadSearchIndexDefFromAux(std::move(auxval)); } else { /* We ignore fields we don't understand, as by AUX field * contract. */ @@ -2258,6 +2260,11 @@ void RdbLoader::FlushShardAsync(ShardId sid) { shard_set->Add(sid, std::move(cb)); } +void RdbLoader::FlushAllShards() { + for (ShardId i = 0; i < shard_set->size(); i++) + FlushShardAsync(i); +} + std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, CompactObj* pv) { OpaqueObjLoader visitor(opaque.rdb_type, pv); std::visit(visitor, opaque.obj); @@ -2369,9 +2376,11 @@ void RdbLoader::LoadScriptFromAux(string&& body) { void RdbLoader::LoadSearchIndexDefFromAux(string&& def) { facade::CapturingReplyBuilder crb{}; ConnectionContext cntx{nullptr, nullptr, &crb}; + cntx.is_replicating = true; cntx.journal_emulated = true; cntx.skip_acl_validation = true; + // Avoid deleting local crb absl::Cleanup cntx_clean = [&cntx] { cntx.Inject(nullptr); }; uint32_t consumed = 0; @@ -2379,7 +2388,7 @@ void RdbLoader::LoadSearchIndexDefFromAux(string&& def) { facade::RedisParser parser; def += "\r\n"; // RESP terminator - absl::Span buffer{reinterpret_cast(def.data()), def.size()}; + io::MutableBytes buffer{reinterpret_cast(def.data()), def.size()}; auto res = parser.Parse(buffer, &consumed, &resp_vec); if (res != facade::RedisParser::Result::OK) { @@ -2387,9 +2396,9 @@ void RdbLoader::LoadSearchIndexDefFromAux(string&& def) { return; } + // Prepend FT.CREATE to index definiton CmdArgVec arg_vec; facade::RespExpr::VecToArgList(resp_vec, &arg_vec); - string ft_create = "FT.CREATE"; arg_vec.insert(arg_vec.begin(), MutableSlice{ft_create.data(), ft_create.size()}); @@ -2401,4 +2410,28 @@ void RdbLoader::LoadSearchIndexDefFromAux(string&& def) { } } +void RdbLoader::PerformPreLoad(Service* service) { + const CommandId* cmd = service->FindCmd("FT.DROPINDEX"); + if (cmd == nullptr) + return; // MacOS + + Transaction::RunOnceAsCommand(cmd, [](auto* trans, auto* es) { + for (const auto& name : es->search_indices()->GetIndexNames()) + es->search_indices()->DropIndex(name); + return OpStatus::OK; + }); +} + +void RdbLoader::PerformPostLoad(Service* service) { + const CommandId* cmd = service->FindCmd("FT.CREATE"); + if (cmd == nullptr) // On MacOS we don't include search so FT.CREATE won't exist. + return; + + // Rebuild all search indices as only their definitions are extracted from the snapshot + Transaction::RunOnceAsCommand(cmd, [](auto* trans, auto* es) { + es->search_indices()->RebuildAllIndices(trans->GetOpArgs(es)); + return OpStatus::OK; + }); +} + } // namespace dfly diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index 18d04706b..d1fc56827 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -200,6 +200,13 @@ class RdbLoader : protected RdbLoaderBase { full_sync_cut_cb = std::move(cb); } + // Perform pre load procedures after transitioning into the global LOADING state. + static void PerformPreLoad(Service* service); + + // Performs post load procedures while still remaining in global LOADING state. + // Called once immediately after loading the snapshot / full sync succeeded from the coordinator. + static void PerformPostLoad(Service* service); + private: struct Item { std::string key; @@ -229,9 +236,14 @@ class RdbLoader : protected RdbLoaderBase { void FinishLoad(absl::Time start_time, size_t* keys_loaded); void FlushShardAsync(ShardId sid); + void FlushAllShards(); + void LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib); void LoadScriptFromAux(std::string&& value); + + // Load index definition from RESP string describing it in FT.CREATE format, + // issues an FT.CREATE call, but does not start indexing void LoadSearchIndexDefFromAux(std::string&& value); private: diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index b6291d8b3..2ddb78591 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -32,7 +32,9 @@ extern "C" { #include "core/string_set.h" #include "server/engine_shard_set.h" #include "server/error.h" +#include "server/main_service.h" #include "server/rdb_extensions.h" +#include "server/search/doc_index.h" #include "server/serializer_commons.h" #include "server/snapshot.h" #include "util/fibers/simple_channel.h" @@ -1130,6 +1132,32 @@ size_t RdbSaver::Impl::GetTotalBuffersSize() const { return total_bytes; } +RdbSaver::GlobalData RdbSaver::GetGlobalData(const Service* service) { + StringVec script_bodies, search_indices; + + { + auto scripts = service->script_mgr()->GetAll(); + script_bodies.reserve(scripts.size()); + for (auto& [sha, data] : scripts) + script_bodies.push_back(move(data.body)); + } + +#ifndef __APPLE__ + { + shard_set->Await(0, [&] { + auto* indices = EngineShard::tlocal()->search_indices(); + for (auto index_name : indices->GetIndexNames()) { + auto index_info = indices->GetIndex(index_name)->GetInfo(); + search_indices.emplace_back( + absl::StrCat(index_name, " ", index_info.BuildRestoreCommand())); + } + }); + } +#endif + + return RdbSaver::GlobalData{std::move(script_bodies), std::move(search_indices)}; +} + void RdbSaver::Impl::FillFreqMap(RdbTypeFreqMap* dest) const { for (auto& ptr : shard_snapshots_) { const RdbTypeFreqMap& src_map = ptr->freq_map(); @@ -1265,7 +1293,7 @@ error_code RdbSaver::SaveAux(const GlobalData& glob_state) { if (!glob_state.search_indices.empty()) LOG(WARNING) << "Dragonfly search index data is incompatible with the RDB format"; } else { - // Search index definitions are not tied to shards and are saved in the summary file + // Search index definitions are not tied to shards and are saved in the summary file DCHECK(save_mode_ != SaveMode::SINGLE_SHARD || glob_state.search_indices.empty()); for (const string& s : glob_state.search_indices) RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("search-index", s)); diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 033d70b78..cafc0380a 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -28,6 +28,7 @@ namespace dfly { uint8_t RdbObjectType(unsigned type, unsigned encoding); class EngineShard; +class Service; class AlignedBuffer : public ::io::Sink { public: @@ -111,6 +112,9 @@ class RdbSaver { // Get total size of all rdb serializer buffers and items currently placed in channel size_t GetTotalBuffersSize() const; + // Fetch global data to be serialized in summary part of a snapshot / full sync. + static GlobalData GetGlobalData(const Service* service); + private: class Impl; diff --git a/src/server/replica.cc b/src/server/replica.cc index 4059dbd6c..2d8eda3a4 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -489,6 +489,7 @@ error_code Replica::InitiateDflySync() { if (num_full_flows == num_df_flows_) { JournalExecutor{&service_}.FlushAll(); + RdbLoader::PerformPreLoad(&service_); } else if (num_full_flows == 0) { sync_type = "partial"; } else { @@ -516,15 +517,17 @@ error_code Replica::InitiateDflySync() { if (cntx_.IsCancelled()) return cntx_.GetError(); + RdbLoader::PerformPostLoad(&service_); + // Send DFLY STARTSTABLE. if (auto ec = SendNextPhaseRequest("STARTSTABLE"); ec) { return cntx_.ReportError(ec); } // Joining flows and resetting state is done by cleanup. - double seconds = double(absl::ToInt64Milliseconds(absl::Now() - start_time)) / 1000; LOG(INFO) << sync_type << " sync finished in " << strings::HumanReadableElapsedTime(seconds); + return cntx_.GetError(); } diff --git a/src/server/search/search_family.cc b/src/server/search/search_family.cc index aab73f0ce..2dd8a263b 100644 --- a/src/server/search/search_family.cc +++ b/src/server/search/search_family.cc @@ -586,8 +586,10 @@ void SearchFamily::Register(CommandRegistry* registry) { CO::NO_KEY_TRANSACTIONAL | CO::NO_KEY_TX_SPAN_ALL | CO::NO_AUTOJOURNAL; registry->StartFamily(); - *registry << CI{"FT.CREATE", CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC(FtCreate) - << CI{"FT.DROPINDEX", CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC(FtDropIndex) + *registry << CI{"FT.CREATE", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC( + FtCreate) + << CI{"FT.DROPINDEX", CO::WRITE | CO::GLOBAL_TRANS, -2, 0, 0, acl::FT_SEARCH}.HFUNC( + FtDropIndex) << CI{"FT.INFO", kReadOnlyMask, 2, 0, 0, acl::FT_SEARCH}.HFUNC(FtInfo) // Underscore same as in RediSearch because it's "temporary" (long time already) << CI{"FT._LIST", kReadOnlyMask, 1, 0, 0, acl::FT_SEARCH}.HFUNC(FtList) diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 3814b5882..defc19f3b 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -367,21 +367,6 @@ bool IsReplicatingNoOne(string_view host, string_view port) { return absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port, "one"); } -void RebuildAllSearchIndices(Service* service) { - const CommandId* cmd = service->FindCmd("FT.CREATE"); - if (cmd == nullptr) { - // On MacOS we don't include search so FT.CREATE won't exist. - return; - } - - boost::intrusive_ptr trans{new Transaction{cmd}}; - trans->InitByArgs(0, {}); - trans->ScheduleSingleHop([](auto* trans, auto* es) { - es->search_indices()->RebuildAllIndices(trans->GetOpArgs(es)); - return OpStatus::OK; - }); -} - template void UpdateMax(T* maxv, T current) { *maxv = std::max(*maxv, current); } @@ -693,6 +678,8 @@ Future ServerFamily::Load(const std::string& load_path) { return {}; } + RdbLoader::PerformPreLoad(&service_); + auto& pool = service_.proactor_pool(); vector load_fibers; @@ -729,11 +716,14 @@ Future ServerFamily::Load(const std::string& load_path) { for (auto& fiber : load_fibers) { fiber.Join(); } + if (aggregated_result->first_error) { LOG(ERROR) << "Rdb load failed. " << (*aggregated_result->first_error).message(); exit(1); } - RebuildAllSearchIndices(&service_); + + RdbLoader::PerformPostLoad(&service_); + LOG(INFO) << "Load finished, num keys read: " << aggregated_result->keys_read; service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE); ec_promise.set_value(*(aggregated_result->first_error)); diff --git a/src/server/server_family.h b/src/server/server_family.h index dfff5875b..d48361fa6 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -137,6 +137,10 @@ class ServerFamily { return script_mgr_.get(); } + const ScriptMgr* script_mgr() const { + return script_mgr_.get(); + } + void StatsMC(std::string_view section, facade::ConnectionContext* cntx); // if new_version is true, saves DF specific, non redis compatible snapshot. diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 1121b35f3..d1e656489 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1409,6 +1409,22 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt, {}, false); } +void Transaction::RunOnceAsCommand(const CommandId* cid, RunnableType cb) { + if (!ProactorBase::IsProactorThread()) + return shard_set->pool()->at(0)->Await([cid, cb] { return RunOnceAsCommand(cid, cb); }); + + DCHECK(cid); + DCHECK(cid->opt_mask() & (CO::GLOBAL_TRANS | CO::NO_KEY_TRANSACTIONAL)); + DCHECK(ProactorBase::IsProactorThread()); + + boost::intrusive_ptr trans{new Transaction{cid}}; + trans->InitByArgs(0, {}); + trans->ScheduleSingleHop([cb](auto* trans, auto* es) { + cb(trans, es); + return OpStatus::OK; + }); +} + void Transaction::CancelBlocking() { if (coordinator_state_ & COORD_BLOCKED) { coordinator_state_ |= COORD_CANCELLED; diff --git a/src/server/transaction.h b/src/server/transaction.h index ac63ad301..fbdecd3b1 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -320,6 +320,9 @@ class Transaction { bool multi_commands, bool allow_await) const; void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const; + // Utility to run a single hop on a no-key command + static void RunOnceAsCommand(const CommandId* cid, RunnableType cb); + private: // Holds number of locks for each IntentLock::Mode: shared and exlusive. struct LockCnt { diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index d967aa87e..2b68c511e 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -1713,3 +1713,51 @@ async def test_network_disconnect_small_buffer(df_local_factory, df_seeder_facto master.stop() replica.stop() assert master.is_in_logs("Partial sync requested from stale LSN") + + +async def test_search(df_local_factory): + master = df_local_factory.create(proactor_threads=4) + replica = df_local_factory.create(proactor_threads=4) + + df_local_factory.start_all([master, replica]) + + c_master = master.client() + c_replica = replica.client() + + # First, create an index on replica + await c_replica.execute_command("FT.CREATE", "idx-r", "SCHEMA", "f1", "numeric") + for i in range(0, 10): + await c_replica.hset(f"k{i}", mapping={"f1": i}) + assert (await c_replica.ft("idx-r").search("@f1:[5 9]")).total == 5 + + # Second, create an index on master + await c_master.execute_command("FT.CREATE", "idx-m", "SCHEMA", "f2", "numeric") + for i in range(0, 10): + await c_master.hset(f"k{i}", mapping={"f2": i * 2}) + assert (await c_master.ft("idx-m").search("@f2:[6 10]")).total == 3 + + # Replicate + await c_replica.execute_command("REPLICAOF", "localhost", master.port) + await wait_available_async(c_replica) + + # Check master index was picked up and original index was deleted + assert (await c_replica.execute_command("FT._LIST")) == ["idx-m"] + + # Check query from master runs on replica + assert (await c_replica.ft("idx-m").search("@f2:[6 10]")).total == 3 + + # Set a new key + await c_master.hset("kNEW", mapping={"f2": 100}) + await asyncio.sleep(0.1) + + assert (await c_replica.ft("idx-m").search("@f2:[100 100]")).docs[0].id == "kNEW" + + # Create a new aux index on master + await c_master.execute_command("FT.CREATE", "idx-m2", "SCHEMA", "f2", "numeric", "sortable") + await asyncio.sleep(0.1) + + from redis.commands.search.query import Query + + assert (await c_replica.ft("idx-m2").search(Query("*").sort_by("f2").paging(0, 1))).docs[ + 0 + ].id == "k0"