mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 02:15:45 +02:00
feat(cluster): Allow appending RDB to existing store (#3505)
* feat(cluster): Allow appending RDB to existing store The goal of this PR is to support the loadoing of multiple RDB files into a single server, like when migrating from a Valkey cluster to Dragonfly with a different number of nodes. It makes the following changes: * Removes `DEBUG LOAD`, as we already have `DFLY LOAD` * Adds `APPEND` option to `DFLY LOAD` (i.e. `DFLY LOAD <filename> APPEND`) that loads an RDB without first flushing the data store, overriding existing keys * Does not load keys belonging to unowned slots, if in cluster mode Fixes #2840
This commit is contained in:
parent
5b546df94d
commit
ad3ebf61d2
12 changed files with 175 additions and 78 deletions
|
@ -597,7 +597,7 @@ TEST_F(ClusterFamilyTest, ClusterFirstConfigCallDropsEntriesNotOwnedByNode) {
|
||||||
EXPECT_EQ(Run({"save", "df"}), "OK");
|
EXPECT_EQ(Run({"save", "df"}), "OK");
|
||||||
|
|
||||||
auto save_info = service_->server_family().GetLastSaveInfo();
|
auto save_info = service_->server_family().GetLastSaveInfo();
|
||||||
EXPECT_EQ(Run({"debug", "load", save_info.file_name}), "OK");
|
EXPECT_EQ(Run({"dfly", "load", save_info.file_name}), "OK");
|
||||||
EXPECT_EQ(CheckedInt({"dbsize"}), 50000);
|
EXPECT_EQ(CheckedInt({"dbsize"}), 50000);
|
||||||
|
|
||||||
ConfigSingleNodeCluster("abcd1234");
|
ConfigSingleNodeCluster("abcd1234");
|
||||||
|
|
|
@ -373,7 +373,6 @@ void DebugCmd::Run(CmdArgList args) {
|
||||||
" arguments. Each descriptor is prefixed by its frequency count",
|
" arguments. Each descriptor is prefixed by its frequency count",
|
||||||
"OBJECT <key> [COMPRESS]",
|
"OBJECT <key> [COMPRESS]",
|
||||||
" Show low-level info about `key` and associated value.",
|
" Show low-level info about `key` and associated value.",
|
||||||
"LOAD <filename>",
|
|
||||||
"RELOAD [option ...]",
|
"RELOAD [option ...]",
|
||||||
" Save the RDB on disk and reload it back to memory. Valid <option> values:",
|
" Save the RDB on disk and reload it back to memory. Valid <option> values:",
|
||||||
" * NOSAVE: the database will be loaded from an existing RDB file.",
|
" * NOSAVE: the database will be loaded from an existing RDB file.",
|
||||||
|
@ -431,10 +430,6 @@ void DebugCmd::Run(CmdArgList args) {
|
||||||
return Watched();
|
return Watched();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (subcmd == "LOAD" && args.size() == 2) {
|
|
||||||
return Load(ArgS(args, 1));
|
|
||||||
}
|
|
||||||
|
|
||||||
if (subcmd == "OBJECT" && args.size() >= 2) {
|
if (subcmd == "OBJECT" && args.size() >= 2) {
|
||||||
string_view key = ArgS(args, 1);
|
string_view key = ArgS(args, 1);
|
||||||
args.remove_prefix(2);
|
args.remove_prefix(2);
|
||||||
|
@ -500,7 +495,19 @@ void DebugCmd::Reload(CmdArgList args) {
|
||||||
}
|
}
|
||||||
|
|
||||||
string last_save_file = sf_.GetLastSaveInfo().file_name;
|
string last_save_file = sf_.GetLastSaveInfo().file_name;
|
||||||
Load(last_save_file);
|
|
||||||
|
sf_.FlushAll(cntx_);
|
||||||
|
|
||||||
|
if (auto fut_ec = sf_.Load(last_save_file, ServerFamily::LoadExistingKeys::kFail); fut_ec) {
|
||||||
|
GenericError ec = fut_ec->Get();
|
||||||
|
if (ec) {
|
||||||
|
string msg = ec.Format();
|
||||||
|
LOG(WARNING) << "Could not load file " << msg;
|
||||||
|
return cntx_->SendError(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cntx_->SendOk();
|
||||||
}
|
}
|
||||||
|
|
||||||
void DebugCmd::Replica(CmdArgList args) {
|
void DebugCmd::Replica(CmdArgList args) {
|
||||||
|
@ -529,52 +536,6 @@ void DebugCmd::Replica(CmdArgList args) {
|
||||||
return cntx_->SendError(UnknownSubCmd("replica", "DEBUG"));
|
return cntx_->SendError(UnknownSubCmd("replica", "DEBUG"));
|
||||||
}
|
}
|
||||||
|
|
||||||
void DebugCmd::Load(string_view filename) {
|
|
||||||
if (!ServerState::tlocal()->is_master) {
|
|
||||||
return cntx_->SendError("Replica cannot load data");
|
|
||||||
}
|
|
||||||
|
|
||||||
auto new_state = sf_.service().SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
|
|
||||||
|
|
||||||
if (new_state != GlobalState::LOADING) {
|
|
||||||
LOG(WARNING) << new_state << " in progress, ignored";
|
|
||||||
return cntx_->SendError("Could not load file");
|
|
||||||
}
|
|
||||||
|
|
||||||
absl::Cleanup rev_state = [this] {
|
|
||||||
sf_.service().SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
|
|
||||||
};
|
|
||||||
|
|
||||||
const CommandId* cid = sf_.service().FindCmd("FLUSHALL");
|
|
||||||
intrusive_ptr<Transaction> flush_trans(new Transaction{cid});
|
|
||||||
flush_trans->InitByArgs(cntx_->ns, 0, {});
|
|
||||||
VLOG(1) << "Performing flush";
|
|
||||||
error_code ec = sf_.Drakarys(flush_trans.get(), DbSlice::kDbAll);
|
|
||||||
if (ec) {
|
|
||||||
LOG(ERROR) << "Error flushing db " << ec.message();
|
|
||||||
}
|
|
||||||
|
|
||||||
fs::path path(filename);
|
|
||||||
|
|
||||||
if (filename.empty()) {
|
|
||||||
fs::path dir_path(GetFlag(FLAGS_dir));
|
|
||||||
string filename = GetFlag(FLAGS_dbfilename);
|
|
||||||
dir_path.append(filename);
|
|
||||||
path = dir_path;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (auto fut_ec = sf_.Load(path.generic_string()); fut_ec) {
|
|
||||||
GenericError ec = fut_ec->Get();
|
|
||||||
if (ec) {
|
|
||||||
string msg = ec.Format();
|
|
||||||
LOG(WARNING) << "Could not load file " << msg;
|
|
||||||
return cntx_->SendError(msg);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cntx_->SendOk();
|
|
||||||
}
|
|
||||||
|
|
||||||
optional<DebugCmd::PopulateOptions> DebugCmd::ParsePopulateArgs(CmdArgList args) {
|
optional<DebugCmd::PopulateOptions> DebugCmd::ParsePopulateArgs(CmdArgList args) {
|
||||||
if (args.size() < 2) {
|
if (args.size() < 2) {
|
||||||
cntx_->SendError(UnknownSubCmd("populate", "DEBUG"));
|
cntx_->SendError(UnknownSubCmd("populate", "DEBUG"));
|
||||||
|
|
|
@ -30,9 +30,6 @@ class DebugCmd {
|
||||||
|
|
||||||
void Run(CmdArgList args);
|
void Run(CmdArgList args);
|
||||||
|
|
||||||
// A public function that loads a snapshot.
|
|
||||||
void Load(std::string_view filename);
|
|
||||||
|
|
||||||
static void Shutdown();
|
static void Shutdown();
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
|
|
@ -163,11 +163,33 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
|
||||||
return ReplicaOffset(args, cntx);
|
return ReplicaOffset(args, cntx);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sub_cmd == "LOAD" && args.size() == 2) {
|
if (sub_cmd == "LOAD") {
|
||||||
DebugCmd debug_cmd{sf_, cntx};
|
return Load(args, cntx);
|
||||||
debug_cmd.Load(ArgS(args, 1));
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (sub_cmd == "HELP") {
|
||||||
|
string_view help_arr[] = {
|
||||||
|
"DFLY <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
|
||||||
|
"THREAD",
|
||||||
|
" Returns connection thread index and number of threads",
|
||||||
|
"THREAD <thread-id>",
|
||||||
|
" Migrates connection to thread <thread-id>",
|
||||||
|
"EXPIRE",
|
||||||
|
" Collects all expired items.",
|
||||||
|
"REPLICAOFFSET",
|
||||||
|
" Returns LSN (log sequence number) per shard. These are the sequential ids of the ",
|
||||||
|
" journal entry.",
|
||||||
|
"LOAD <filename> [APPEND]",
|
||||||
|
" Loads <filename> RDB/DFS file into the data store.",
|
||||||
|
" * APPEND: Existing keys are NOT removed before loading the file, conflicting ",
|
||||||
|
" keys (that exist in both data store and in file) are overridden.",
|
||||||
|
"HELP",
|
||||||
|
" Prints this help.",
|
||||||
|
};
|
||||||
|
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||||
|
return rb->SendSimpleStrArr(help_arr);
|
||||||
|
}
|
||||||
|
|
||||||
cntx->SendError(kSyntaxErr);
|
cntx->SendError(kSyntaxErr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -500,6 +522,41 @@ void DflyCmd::ReplicaOffset(CmdArgList args, ConnectionContext* cntx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DflyCmd::Load(CmdArgList args, ConnectionContext* cntx) {
|
||||||
|
CmdArgParser parser{args};
|
||||||
|
parser.ExpectTag("LOAD");
|
||||||
|
string_view filename = parser.Next();
|
||||||
|
ServerFamily::LoadExistingKeys existing_keys = ServerFamily::LoadExistingKeys::kFail;
|
||||||
|
|
||||||
|
if (parser.HasNext()) {
|
||||||
|
parser.ExpectTag("APPEND");
|
||||||
|
existing_keys = ServerFamily::LoadExistingKeys::kOverride;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (parser.HasNext()) {
|
||||||
|
parser.Error();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (parser.HasError()) {
|
||||||
|
return cntx->SendError(kSyntaxErr);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (existing_keys == ServerFamily::LoadExistingKeys::kFail) {
|
||||||
|
sf_->FlushAll(cntx);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (auto fut_ec = sf_->Load(filename, existing_keys); fut_ec) {
|
||||||
|
GenericError ec = fut_ec->Get();
|
||||||
|
if (ec) {
|
||||||
|
string msg = ec.Format();
|
||||||
|
LOG(WARNING) << "Could not load file " << msg;
|
||||||
|
return cntx->SendError(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cntx->SendOk();
|
||||||
|
}
|
||||||
|
|
||||||
OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
|
OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
|
||||||
DCHECK(!flow->full_sync_fb.IsJoinable());
|
DCHECK(!flow->full_sync_fb.IsJoinable());
|
||||||
DCHECK(shard);
|
DCHECK(shard);
|
||||||
|
|
|
@ -199,6 +199,8 @@ class DflyCmd {
|
||||||
// Return journal records num sent for each flow of replication.
|
// Return journal records num sent for each flow of replication.
|
||||||
void ReplicaOffset(CmdArgList args, ConnectionContext* cntx);
|
void ReplicaOffset(CmdArgList args, ConnectionContext* cntx);
|
||||||
|
|
||||||
|
void Load(CmdArgList args, ConnectionContext* cntx);
|
||||||
|
|
||||||
// Start full sync in thread. Start FullSyncFb. Called for each flow.
|
// Start full sync in thread. Start FullSyncFb. Called for each flow.
|
||||||
facade::OpStatus StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);
|
facade::OpStatus StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);
|
||||||
|
|
||||||
|
|
|
@ -35,6 +35,8 @@ extern "C" {
|
||||||
#include "core/sorted_map.h"
|
#include "core/sorted_map.h"
|
||||||
#include "core/string_map.h"
|
#include "core/string_map.h"
|
||||||
#include "core/string_set.h"
|
#include "core/string_set.h"
|
||||||
|
#include "server/cluster/cluster_defs.h"
|
||||||
|
#include "server/cluster/cluster_family.h"
|
||||||
#include "server/engine_shard_set.h"
|
#include "server/engine_shard_set.h"
|
||||||
#include "server/error.h"
|
#include "server/error.h"
|
||||||
#include "server/hset_family.h"
|
#include "server/hset_family.h"
|
||||||
|
@ -2481,7 +2483,7 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
|
||||||
|
|
||||||
auto& res = *op_res;
|
auto& res = *op_res;
|
||||||
res.it->first.SetSticky(item->is_sticky);
|
res.it->first.SetSticky(item->is_sticky);
|
||||||
if (!res.is_new) {
|
if (!override_existing_keys_ && !res.is_new) {
|
||||||
LOG(WARNING) << "RDB has duplicated key '" << item->key << "' in DB " << db_ind;
|
LOG(WARNING) << "RDB has duplicated key '" << item->key << "' in DB " << db_ind;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2520,6 +2522,13 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
|
||||||
return ec;
|
return ec;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!load_unowned_slots_ && cluster::IsClusterEnabled()) {
|
||||||
|
const cluster::ClusterConfig* cluster_config = cluster::ClusterFamily::cluster_config();
|
||||||
|
if (cluster_config != nullptr && !cluster_config->IsMySlot(item->key)) {
|
||||||
|
return kOk; // Ignoring item
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/* Check if the key already expired. This function is used when loading
|
/* Check if the key already expired. This function is used when loading
|
||||||
* an RDB file from disk, either at startup, or when an RDB was
|
* an RDB file from disk, either at startup, or when an RDB was
|
||||||
* received from the master. In the latter case, the master is
|
* received from the master. In the latter case, the master is
|
||||||
|
|
|
@ -181,7 +181,16 @@ class RdbLoader : protected RdbLoaderBase {
|
||||||
|
|
||||||
~RdbLoader();
|
~RdbLoader();
|
||||||
|
|
||||||
|
void SetOverrideExistingKeys(bool override) {
|
||||||
|
override_existing_keys_ = override;
|
||||||
|
}
|
||||||
|
|
||||||
|
void SetLoadUnownedSlots(bool load_unowned) {
|
||||||
|
load_unowned_slots_ = load_unowned;
|
||||||
|
}
|
||||||
|
|
||||||
std::error_code Load(::io::Source* src);
|
std::error_code Load(::io::Source* src);
|
||||||
|
|
||||||
void set_source_limit(size_t n) {
|
void set_source_limit(size_t n) {
|
||||||
source_limit_ = n;
|
source_limit_ = n;
|
||||||
}
|
}
|
||||||
|
@ -273,6 +282,8 @@ class RdbLoader : protected RdbLoaderBase {
|
||||||
|
|
||||||
private:
|
private:
|
||||||
Service* service_;
|
Service* service_;
|
||||||
|
bool override_existing_keys_ = false;
|
||||||
|
bool load_unowned_slots_ = false;
|
||||||
ScriptMgr* script_mgr_;
|
ScriptMgr* script_mgr_;
|
||||||
std::vector<ItemsBuf> shard_buf_;
|
std::vector<ItemsBuf> shard_buf_;
|
||||||
|
|
||||||
|
|
|
@ -167,7 +167,7 @@ TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) {
|
||||||
ASSERT_EQ(resp, "OK");
|
ASSERT_EQ(resp, "OK");
|
||||||
|
|
||||||
auto save_info = service_->server_family().GetLastSaveInfo();
|
auto save_info = service_->server_family().GetLastSaveInfo();
|
||||||
resp = Run({"debug", "load", save_info.file_name});
|
resp = Run({"dfly", "load", save_info.file_name});
|
||||||
ASSERT_EQ(resp, "OK");
|
ASSERT_EQ(resp, "OK");
|
||||||
ASSERT_EQ(50000, CheckedInt({"dbsize"}));
|
ASSERT_EQ(50000, CheckedInt({"dbsize"}));
|
||||||
}
|
}
|
||||||
|
@ -182,7 +182,7 @@ TEST_F(RdbTest, RdbLoaderOnReadCompressedDataShouldNotEnterEnsureReadFlow) {
|
||||||
ASSERT_EQ(resp, "OK");
|
ASSERT_EQ(resp, "OK");
|
||||||
|
|
||||||
auto save_info = service_->server_family().GetLastSaveInfo();
|
auto save_info = service_->server_family().GetLastSaveInfo();
|
||||||
resp = Run({"debug", "load", save_info.file_name});
|
resp = Run({"dfly", "load", save_info.file_name});
|
||||||
ASSERT_EQ(resp, "OK");
|
ASSERT_EQ(resp, "OK");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -265,7 +265,7 @@ TEST_F(RdbTest, ReloadExpired) {
|
||||||
ASSERT_EQ(resp, "OK");
|
ASSERT_EQ(resp, "OK");
|
||||||
auto save_info = service_->server_family().GetLastSaveInfo();
|
auto save_info = service_->server_family().GetLastSaveInfo();
|
||||||
AdvanceTime(2000);
|
AdvanceTime(2000);
|
||||||
resp = Run({"debug", "load", save_info.file_name});
|
resp = Run({"dfly", "load", save_info.file_name});
|
||||||
ASSERT_EQ(resp, "OK");
|
ASSERT_EQ(resp, "OK");
|
||||||
resp = Run({"get", "key"});
|
resp = Run({"get", "key"});
|
||||||
ASSERT_THAT(resp, ArgType(RespExpr::NIL));
|
ASSERT_THAT(resp, ArgType(RespExpr::NIL));
|
||||||
|
@ -543,4 +543,26 @@ TEST_F(RdbTest, SBF) {
|
||||||
EXPECT_THAT(Run({"BF.EXISTS", "k", "1"}), IntArg(1));
|
EXPECT_THAT(Run({"BF.EXISTS", "k", "1"}), IntArg(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(RdbTest, DflyLoadAppend) {
|
||||||
|
// Create an RDB with (k1,1) value in it saved as `filename`
|
||||||
|
EXPECT_EQ(Run({"set", "k1", "1"}), "OK");
|
||||||
|
EXPECT_EQ(Run({"save", "df"}), "OK");
|
||||||
|
string filename = service_->server_family().GetLastSaveInfo().file_name;
|
||||||
|
|
||||||
|
// Without APPEND option - db should be flushed
|
||||||
|
EXPECT_EQ(Run({"set", "k1", "TO-BE-FLUSHED"}), "OK");
|
||||||
|
EXPECT_EQ(Run({"set", "k2", "TO-BE-FLUSHED"}), "OK");
|
||||||
|
EXPECT_EQ(Run({"dfly", "load", filename}), "OK");
|
||||||
|
EXPECT_THAT(Run({"dbsize"}), IntArg(1));
|
||||||
|
EXPECT_EQ(Run({"get", "k1"}), "1");
|
||||||
|
|
||||||
|
// With APPEND option - db shouldn't be flushed, but k1 should be overridden
|
||||||
|
EXPECT_EQ(Run({"set", "k1", "TO-BE-OVERRIDDEN"}), "OK");
|
||||||
|
EXPECT_EQ(Run({"set", "k2", "2"}), "OK");
|
||||||
|
EXPECT_EQ(Run({"dfly", "load", filename, "append"}), "OK");
|
||||||
|
EXPECT_THAT(Run({"dbsize"}), IntArg(2));
|
||||||
|
EXPECT_EQ(Run({"get", "k1"}), "1");
|
||||||
|
EXPECT_EQ(Run({"get", "k2"}), "2");
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace dfly
|
} // namespace dfly
|
||||||
|
|
|
@ -425,6 +425,7 @@ error_code Replica::InitiatePSync() {
|
||||||
}
|
}
|
||||||
|
|
||||||
RdbLoader loader(NULL);
|
RdbLoader loader(NULL);
|
||||||
|
loader.SetLoadUnownedSlots(true);
|
||||||
loader.set_source_limit(snapshot_size);
|
loader.set_source_limit(snapshot_size);
|
||||||
// TODO: to allow registering callbacks within loader to send '\n' pings back to master.
|
// TODO: to allow registering callbacks within loader to send '\n' pings back to master.
|
||||||
// Also to allow updating last_io_time_.
|
// Also to allow updating last_io_time_.
|
||||||
|
@ -935,6 +936,7 @@ DflyShardReplica::DflyShardReplica(ServerContext server_context, MasterContext m
|
||||||
flow_id_(flow_id) {
|
flow_id_(flow_id) {
|
||||||
executor_ = std::make_unique<JournalExecutor>(service);
|
executor_ = std::make_unique<JournalExecutor>(service);
|
||||||
rdb_loader_ = std::make_unique<RdbLoader>(&service_);
|
rdb_loader_ = std::make_unique<RdbLoader>(&service_);
|
||||||
|
rdb_loader_->SetLoadUnownedSlots(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
DflyShardReplica::~DflyShardReplica() {
|
DflyShardReplica::~DflyShardReplica() {
|
||||||
|
|
|
@ -874,7 +874,7 @@ void ServerFamily::LoadFromSnapshot() {
|
||||||
if (load_path_result) {
|
if (load_path_result) {
|
||||||
const std::string load_path = *load_path_result;
|
const std::string load_path = *load_path_result;
|
||||||
if (!load_path.empty()) {
|
if (!load_path.empty()) {
|
||||||
load_result_ = Load(load_path);
|
load_result_ = Load(load_path, LoadExistingKeys::kFail);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (std::error_code(load_path_result.error()) == std::errc::no_such_file_or_directory) {
|
if (std::error_code(load_path_result.error()) == std::errc::no_such_file_or_directory) {
|
||||||
|
@ -935,13 +935,40 @@ struct AggregateLoadResult {
|
||||||
std::atomic<size_t> keys_read;
|
std::atomic<size_t> keys_read;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
void ServerFamily::FlushAll(ConnectionContext* cntx) {
|
||||||
|
const CommandId* cid = service_.FindCmd("FLUSHALL");
|
||||||
|
boost::intrusive_ptr<Transaction> flush_trans(new Transaction{cid});
|
||||||
|
flush_trans->InitByArgs(cntx->ns, 0, {});
|
||||||
|
VLOG(1) << "Performing flush";
|
||||||
|
error_code ec = Drakarys(flush_trans.get(), DbSlice::kDbAll);
|
||||||
|
if (ec) {
|
||||||
|
LOG(ERROR) << "Error flushing db " << ec.message();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Load starts as many fibers as there are files to load each one separately.
|
// Load starts as many fibers as there are files to load each one separately.
|
||||||
// It starts one more fiber that waits for all load fibers to finish and returns the first
|
// It starts one more fiber that waits for all load fibers to finish and returns the first
|
||||||
// error (if any occured) with a future.
|
// error (if any occured) with a future.
|
||||||
std::optional<fb2::Future<GenericError>> ServerFamily::Load(const std::string& load_path) {
|
std::optional<fb2::Future<GenericError>> ServerFamily::Load(string_view load_path,
|
||||||
|
LoadExistingKeys existing_keys) {
|
||||||
|
fs::path path(load_path);
|
||||||
|
|
||||||
|
if (load_path.empty()) {
|
||||||
|
fs::path dir_path(GetFlag(FLAGS_dir));
|
||||||
|
string filename = GetFlag(FLAGS_dbfilename);
|
||||||
|
dir_path.append(filename);
|
||||||
|
path = dir_path;
|
||||||
|
}
|
||||||
|
|
||||||
DCHECK_GT(shard_count(), 0u);
|
DCHECK_GT(shard_count(), 0u);
|
||||||
|
|
||||||
auto paths_result = snapshot_storage_->LoadPaths(load_path);
|
if (ServerState::tlocal() && !ServerState::tlocal()->is_master) {
|
||||||
|
fb2::Future<GenericError> future;
|
||||||
|
future.Resolve(string("Replica cannot load data"));
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
auto paths_result = snapshot_storage_->LoadPaths(path.generic_string());
|
||||||
if (!paths_result) {
|
if (!paths_result) {
|
||||||
LOG(ERROR) << "Failed to load snapshot: " << paths_result.error().Format();
|
LOG(ERROR) << "Failed to load snapshot: " << paths_result.error().Format();
|
||||||
|
|
||||||
|
@ -952,7 +979,7 @@ std::optional<fb2::Future<GenericError>> ServerFamily::Load(const std::string& l
|
||||||
|
|
||||||
std::vector<std::string> paths = *paths_result;
|
std::vector<std::string> paths = *paths_result;
|
||||||
|
|
||||||
LOG(INFO) << "Loading " << load_path;
|
LOG(INFO) << "Loading " << path.generic_string();
|
||||||
|
|
||||||
auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
|
auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
|
||||||
if (new_state != GlobalState::LOADING) {
|
if (new_state != GlobalState::LOADING) {
|
||||||
|
@ -979,8 +1006,8 @@ std::optional<fb2::Future<GenericError>> ServerFamily::Load(const std::string& l
|
||||||
proactor = pool.GetNextProactor();
|
proactor = pool.GetNextProactor();
|
||||||
}
|
}
|
||||||
|
|
||||||
auto load_fiber = [this, aggregated_result, path = std::move(path)]() {
|
auto load_fiber = [this, aggregated_result, existing_keys, path = std::move(path)]() {
|
||||||
auto load_result = LoadRdb(path);
|
auto load_result = LoadRdb(path, existing_keys);
|
||||||
if (load_result.has_value())
|
if (load_result.has_value())
|
||||||
aggregated_result->keys_read.fetch_add(*load_result);
|
aggregated_result->keys_read.fetch_add(*load_result);
|
||||||
else
|
else
|
||||||
|
@ -1040,13 +1067,18 @@ void ServerFamily::SnapshotScheduling() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
io::Result<size_t> ServerFamily::LoadRdb(const std::string& rdb_file) {
|
io::Result<size_t> ServerFamily::LoadRdb(const std::string& rdb_file,
|
||||||
|
LoadExistingKeys existing_keys) {
|
||||||
error_code ec;
|
error_code ec;
|
||||||
io::ReadonlyFileOrError res = snapshot_storage_->OpenReadFile(rdb_file);
|
io::ReadonlyFileOrError res = snapshot_storage_->OpenReadFile(rdb_file);
|
||||||
if (res) {
|
if (res) {
|
||||||
io::FileSource fs(*res);
|
io::FileSource fs(*res);
|
||||||
|
|
||||||
RdbLoader loader{&service_};
|
RdbLoader loader{&service_};
|
||||||
|
if (existing_keys == LoadExistingKeys::kOverride) {
|
||||||
|
loader.SetOverrideExistingKeys(true);
|
||||||
|
}
|
||||||
|
|
||||||
ec = loader.Load(&fs);
|
ec = loader.Load(&fs);
|
||||||
if (!ec) {
|
if (!ec) {
|
||||||
VLOG(1) << "Done loading RDB from " << rdb_file << ", keys loaded: " << loader.keys_loaded();
|
VLOG(1) << "Done loading RDB from " << rdb_file << ", keys loaded: " << loader.keys_loaded();
|
||||||
|
|
|
@ -196,9 +196,13 @@ class ServerFamily {
|
||||||
|
|
||||||
LastSaveInfo GetLastSaveInfo() const;
|
LastSaveInfo GetLastSaveInfo() const;
|
||||||
|
|
||||||
|
void FlushAll(ConnectionContext* cntx);
|
||||||
|
|
||||||
// Load snapshot from file (.rdb file or summary.dfs file) and return
|
// Load snapshot from file (.rdb file or summary.dfs file) and return
|
||||||
// future with error_code.
|
// future with error_code.
|
||||||
std::optional<util::fb2::Future<GenericError>> Load(const std::string& file_name);
|
enum class LoadExistingKeys { kFail, kOverride };
|
||||||
|
std::optional<util::fb2::Future<GenericError>> Load(std::string_view file_name,
|
||||||
|
LoadExistingKeys existing_keys);
|
||||||
|
|
||||||
bool TEST_IsSaving() const;
|
bool TEST_IsSaving() const;
|
||||||
|
|
||||||
|
@ -286,7 +290,7 @@ class ServerFamily {
|
||||||
void ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx, ActionOnConnectionFail on_error);
|
void ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx, ActionOnConnectionFail on_error);
|
||||||
|
|
||||||
// Returns the number of loaded keys if successful.
|
// Returns the number of loaded keys if successful.
|
||||||
io::Result<size_t> LoadRdb(const std::string& rdb_file);
|
io::Result<size_t> LoadRdb(const std::string& rdb_file, LoadExistingKeys existing_keys);
|
||||||
|
|
||||||
void SnapshotScheduling();
|
void SnapshotScheduling();
|
||||||
|
|
||||||
|
|
|
@ -57,7 +57,7 @@ async def test_consistency(df_factory, format: str, seeder_opts: dict):
|
||||||
await async_client.execute_command("SAVE", format)
|
await async_client.execute_command("SAVE", format)
|
||||||
assert await async_client.flushall()
|
assert await async_client.flushall()
|
||||||
await async_client.execute_command(
|
await async_client.execute_command(
|
||||||
"DEBUG",
|
"DFLY",
|
||||||
"LOAD",
|
"LOAD",
|
||||||
f"{dbfilename}.rdb" if format == "RDB" else f"{dbfilename}-summary.dfs",
|
f"{dbfilename}.rdb" if format == "RDB" else f"{dbfilename}-summary.dfs",
|
||||||
)
|
)
|
||||||
|
@ -85,7 +85,7 @@ async def test_multidb(df_factory, format: str):
|
||||||
await async_client.execute_command("SAVE", format)
|
await async_client.execute_command("SAVE", format)
|
||||||
assert await async_client.flushall()
|
assert await async_client.flushall()
|
||||||
await async_client.execute_command(
|
await async_client.execute_command(
|
||||||
"DEBUG",
|
"DFLY",
|
||||||
"LOAD",
|
"LOAD",
|
||||||
f"{dbfilename}.rdb" if format == "RDB" else f"{dbfilename}-summary.dfs",
|
f"{dbfilename}.rdb" if format == "RDB" else f"{dbfilename}-summary.dfs",
|
||||||
)
|
)
|
||||||
|
@ -271,7 +271,7 @@ async def test_s3_snapshot(self, async_client):
|
||||||
await async_client.execute_command("SAVE DF snapshot")
|
await async_client.execute_command("SAVE DF snapshot")
|
||||||
assert await async_client.flushall()
|
assert await async_client.flushall()
|
||||||
await async_client.execute_command(
|
await async_client.execute_command(
|
||||||
"DEBUG LOAD "
|
"DFLY LOAD "
|
||||||
+ os.environ["DRAGONFLY_S3_BUCKET"]
|
+ os.environ["DRAGONFLY_S3_BUCKET"]
|
||||||
+ str(self.tmp_dir)
|
+ str(self.tmp_dir)
|
||||||
+ "/snapshot-summary.dfs"
|
+ "/snapshot-summary.dfs"
|
||||||
|
@ -451,7 +451,7 @@ async def test_tiered_entries(async_client: aioredis.Redis):
|
||||||
await async_client.execute_command("SAVE", "DF")
|
await async_client.execute_command("SAVE", "DF")
|
||||||
assert await async_client.flushall()
|
assert await async_client.flushall()
|
||||||
await async_client.execute_command(
|
await async_client.execute_command(
|
||||||
"DEBUG",
|
"DFLY",
|
||||||
"LOAD",
|
"LOAD",
|
||||||
"tiered-entries-summary.dfs",
|
"tiered-entries-summary.dfs",
|
||||||
)
|
)
|
||||||
|
@ -488,7 +488,7 @@ async def test_tiered_entries_throttle(async_client: aioredis.Redis):
|
||||||
|
|
||||||
load_task = asyncio.create_task(
|
load_task = asyncio.create_task(
|
||||||
async_client.execute_command(
|
async_client.execute_command(
|
||||||
"DEBUG",
|
"DFLY",
|
||||||
"LOAD",
|
"LOAD",
|
||||||
"tiered-entries-summary.dfs",
|
"tiered-entries-summary.dfs",
|
||||||
)
|
)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue