mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
fix: forbid parallel save operations (#2172)
* fix: forbid parallel save operations * feat: add SAVE option to takeover command
This commit is contained in:
parent
c10dac4db2
commit
e6f3522d59
18 changed files with 165 additions and 83 deletions
|
@ -34,18 +34,29 @@ CmdArgParser::CheckProxy::operator bool() const {
|
|||
return true;
|
||||
}
|
||||
|
||||
template <typename T> T CmdArgParser::NextProxy::Int() {
|
||||
template <typename T> T CmdArgParser::NextProxy::Num() {
|
||||
T out;
|
||||
if (absl::SimpleAtoi(operator std::string_view(), &out))
|
||||
return out;
|
||||
if constexpr (std::is_same_v<T, float>) {
|
||||
if (absl::SimpleAtof(operator std::string_view(), &out))
|
||||
return out;
|
||||
} else if constexpr (std::is_same_v<T, double>) {
|
||||
if (absl::SimpleAtod(operator std::string_view(), &out))
|
||||
return out;
|
||||
} else if constexpr (std::is_integral_v<T>) {
|
||||
if (absl::SimpleAtoi(operator std::string_view(), &out))
|
||||
return out;
|
||||
}
|
||||
|
||||
parser_->Report(INVALID_INT, idx_);
|
||||
return T{0};
|
||||
return {};
|
||||
}
|
||||
|
||||
template uint64_t CmdArgParser::NextProxy::Int<uint64_t>();
|
||||
template int64_t CmdArgParser::NextProxy::Int<int64_t>();
|
||||
template uint32_t CmdArgParser::NextProxy::Int<uint32_t>();
|
||||
template int32_t CmdArgParser::NextProxy::Int<int32_t>();
|
||||
template float CmdArgParser::NextProxy::Num<float>();
|
||||
template double CmdArgParser::NextProxy::Num<double>();
|
||||
template uint64_t CmdArgParser::NextProxy::Num<uint64_t>();
|
||||
template int64_t CmdArgParser::NextProxy::Num<int64_t>();
|
||||
template uint32_t CmdArgParser::NextProxy::Num<uint32_t>();
|
||||
template int32_t CmdArgParser::NextProxy::Num<int32_t>();
|
||||
|
||||
ErrorReply CmdArgParser::ErrorInfo::MakeReply() const {
|
||||
switch (type) {
|
||||
|
|
|
@ -48,7 +48,7 @@ struct CmdArgParser {
|
|||
return std::string{operator std::string_view()};
|
||||
}
|
||||
|
||||
template <typename T> T Int();
|
||||
template <typename T> T Num();
|
||||
|
||||
// Detect value based on cases.
|
||||
// Returns default if the argument is not present among the cases list,
|
||||
|
@ -123,10 +123,19 @@ struct CmdArgParser {
|
|||
}
|
||||
|
||||
// Consume next value
|
||||
NextProxy Next() {
|
||||
template <class T = NextProxy> auto Next() {
|
||||
if (cur_i_ >= args_.size())
|
||||
Report(OUT_OF_BOUNDS, cur_i_);
|
||||
return NextProxy{this, cur_i_++};
|
||||
|
||||
NextProxy next{this, cur_i_++};
|
||||
if constexpr (std::is_arithmetic_v<T>) {
|
||||
return next.Num<T>();
|
||||
} else if constexpr (std::is_convertible_v<std::string_view, T>) {
|
||||
return static_cast<T>(next);
|
||||
} else {
|
||||
static_assert(std::is_same_v<T, NextProxy>, "incorrect type");
|
||||
return next;
|
||||
}
|
||||
}
|
||||
|
||||
// Check if the next value if equal to a specific tag. If equal, its consumed.
|
||||
|
|
|
@ -38,8 +38,8 @@ TEST_F(CmdArgParserTest, BasicTypes) {
|
|||
EXPECT_EQ(absl::implicit_cast<string_view>(parser.Next()), "VIEW"sv);
|
||||
|
||||
#ifndef __APPLE__
|
||||
EXPECT_EQ(parser.Next().Int<size_t>(), 11u);
|
||||
EXPECT_EQ(parser.Next().Int<size_t>(), 22u);
|
||||
EXPECT_EQ(parser.Next<size_t>(), 11u);
|
||||
EXPECT_EQ(parser.Next<size_t>(), 22u);
|
||||
#endif
|
||||
|
||||
EXPECT_FALSE(parser.HasNext());
|
||||
|
@ -61,7 +61,7 @@ TEST_F(CmdArgParserTest, BoundError) {
|
|||
TEST_F(CmdArgParserTest, IntError) {
|
||||
auto parser = Make({"NOTANINT"});
|
||||
|
||||
EXPECT_EQ(parser.Next().Int<size_t>(), 0u);
|
||||
EXPECT_EQ(parser.Next<size_t>(), 0u);
|
||||
|
||||
auto err = parser.Error();
|
||||
EXPECT_TRUE(err);
|
||||
|
|
|
@ -356,9 +356,9 @@ void DebugCmd::Replica(CmdArgList args) {
|
|||
}
|
||||
|
||||
void DebugCmd::Load(string_view filename) {
|
||||
GlobalState new_state = sf_.service().SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
|
||||
if (new_state != GlobalState::LOADING) {
|
||||
LOG(WARNING) << GlobalStateName(new_state) << " in progress, ignored";
|
||||
auto new_state = sf_.service().SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
|
||||
if (new_state.first != GlobalState::LOADING) {
|
||||
LOG(WARNING) << GlobalStateName(new_state.first) << " in progress, ignored";
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -145,16 +145,12 @@ SaveStagesController::SaveStagesController(SaveStagesInputs&& inputs)
|
|||
}
|
||||
|
||||
SaveStagesController::~SaveStagesController() {
|
||||
service_->SwitchState(GlobalState::SAVING, GlobalState::ACTIVE);
|
||||
}
|
||||
|
||||
GenericError SaveStagesController::Save() {
|
||||
if (auto err = BuildFullPath(); err)
|
||||
return err;
|
||||
|
||||
if (auto err = SwitchState(); err)
|
||||
return err;
|
||||
|
||||
if (auto err = InitResources(); err)
|
||||
return err;
|
||||
|
||||
|
@ -338,15 +334,6 @@ GenericError SaveStagesController::BuildFullPath() {
|
|||
return {};
|
||||
}
|
||||
|
||||
// Switch to saving state if in active state
|
||||
GenericError SaveStagesController::SwitchState() {
|
||||
GlobalState new_state = service_->SwitchState(GlobalState::ACTIVE, GlobalState::SAVING);
|
||||
if (new_state != GlobalState::SAVING && new_state != GlobalState::TAKEN_OVER)
|
||||
return {make_error_code(errc::operation_in_progress),
|
||||
StrCat(GlobalStateName(new_state), " - can not save database")};
|
||||
return {};
|
||||
}
|
||||
|
||||
void SaveStagesController::SaveCb(unsigned index) {
|
||||
if (auto& snapshot = snapshots_[index].first; snapshot && snapshot->HasStarted())
|
||||
shared_err_ = snapshot->SaveBody();
|
||||
|
|
|
@ -67,7 +67,7 @@ class RdbSnapshot {
|
|||
};
|
||||
|
||||
struct SaveStagesController : public SaveStagesInputs {
|
||||
SaveStagesController(SaveStagesInputs&& inputs);
|
||||
SaveStagesController(SaveStagesInputs&& input);
|
||||
|
||||
~SaveStagesController();
|
||||
|
||||
|
|
|
@ -12,9 +12,11 @@
|
|||
#include <optional>
|
||||
#include <utility>
|
||||
|
||||
#include "absl/cleanup/cleanup.h"
|
||||
#include "absl/strings/numbers.h"
|
||||
#include "base/flags.h"
|
||||
#include "base/logging.h"
|
||||
#include "facade/cmd_arg_parser.h"
|
||||
#include "facade/dragonfly_connection.h"
|
||||
#include "facade/dragonfly_listener.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
|
@ -96,6 +98,7 @@ struct TransactionGuard {
|
|||
|
||||
Transaction* t;
|
||||
};
|
||||
|
||||
} // namespace
|
||||
|
||||
DflyCmd::DflyCmd(ServerFamily* server_family) : sf_(server_family) {
|
||||
|
@ -128,7 +131,7 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
|
|||
return StartStable(args, cntx);
|
||||
}
|
||||
|
||||
if (sub_cmd == "TAKEOVER" && args.size() == 3) {
|
||||
if (sub_cmd == "TAKEOVER" && (args.size() == 3 || args.size() == 4)) {
|
||||
return TakeOver(args, cntx);
|
||||
}
|
||||
|
||||
|
@ -388,15 +391,20 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) {
|
||||
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
string_view sync_id_str = ArgS(args, 2);
|
||||
float timeout;
|
||||
if (!absl::SimpleAtof(ArgS(args, 1), &timeout)) {
|
||||
return (*cntx)->SendError(kInvalidIntErr);
|
||||
}
|
||||
CmdArgParser parser{args};
|
||||
parser.Next();
|
||||
float timeout = parser.Next<float>();
|
||||
if (timeout < 0) {
|
||||
return (*cntx)->SendError("timeout is negative");
|
||||
}
|
||||
|
||||
bool save_flag = static_cast<bool>(parser.Check("SAVE").IgnoreCase());
|
||||
|
||||
string_view sync_id_str = parser.Next<std::string_view>();
|
||||
|
||||
if (auto err = parser.Error(); err)
|
||||
return (*cntx)->SendError(err->MakeReply());
|
||||
|
||||
VLOG(1) << "Got DFLY TAKEOVER " << sync_id_str << " time out:" << timeout;
|
||||
|
||||
auto [sync_id, replica_ptr] = GetReplicaInfoOrReply(sync_id_str, rb);
|
||||
|
@ -427,9 +435,15 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
// We have this guard to disable expirations: We don't want any writes to the journal after
|
||||
// we send the `PING`, and expirations could ruin that.
|
||||
// TODO: Decouple disabling expirations from TransactionGuard because we don't
|
||||
// really need TransactionGuard here.
|
||||
TransactionGuard tg{cntx->transaction, /*disable_expirations=*/true};
|
||||
shard_set->RunBriefInParallel(
|
||||
[](EngineShard* shard) { shard->db_slice().SetExpireAllowed(false); });
|
||||
VLOG(2) << "Disable expiration";
|
||||
|
||||
absl::Cleanup([] {
|
||||
shard_set->RunBriefInParallel(
|
||||
[](EngineShard* shard) { shard->db_slice().SetExpireAllowed(true); });
|
||||
VLOG(2) << "Enable expiration";
|
||||
});
|
||||
|
||||
if (*status == OpStatus::OK) {
|
||||
auto cb = [&cntx = replica_ptr->cntx, replica_ptr = replica_ptr, timeout_dur, start,
|
||||
|
@ -464,8 +478,16 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
(*cntx)->SendOk();
|
||||
|
||||
if (save_flag) {
|
||||
VLOG(1) << "Save snapshot after Takeover.";
|
||||
if (auto ec = sf_->DoSave(true); ec) {
|
||||
LOG(WARNING) << "Failed to perform snapshot " << ec.Format();
|
||||
}
|
||||
}
|
||||
VLOG(1) << "Takeover accepted, shutting down.";
|
||||
return sf_->ShutdownCmd({}, cntx);
|
||||
std::string save_arg = "NOSAVE";
|
||||
MutableSlice sargs(save_arg);
|
||||
return sf_->ShutdownCmd(CmdArgList(&sargs, 1), cntx);
|
||||
}
|
||||
|
||||
void DflyCmd::Expire(CmdArgList args, ConnectionContext* cntx) {
|
||||
|
|
|
@ -2290,17 +2290,17 @@ VarzValue::Map Service::GetVarzStats() {
|
|||
return res;
|
||||
}
|
||||
|
||||
GlobalState Service::SwitchState(GlobalState from, GlobalState to) {
|
||||
std::pair<GlobalState, bool> Service::SwitchState(GlobalState from, GlobalState to) {
|
||||
lock_guard lk(mu_);
|
||||
if (global_state_ != from)
|
||||
return global_state_;
|
||||
return {global_state_, false};
|
||||
|
||||
VLOG(1) << "Switching state from " << GlobalStateName(from) << " to " << GlobalStateName(to);
|
||||
|
||||
global_state_ = to;
|
||||
|
||||
pp_.Await([&](ProactorBase*) { ServerState::tlocal()->set_gstate(to); });
|
||||
return to;
|
||||
return {to, true};
|
||||
}
|
||||
|
||||
GlobalState Service::GetGlobalState() const {
|
||||
|
|
|
@ -87,8 +87,9 @@ class Service : public facade::ServiceInterface {
|
|||
// Returns: the new state.
|
||||
// if from equals the old state then the switch is performed "to" is returned.
|
||||
// Otherwise, does not switch and returns the current state in the system.
|
||||
// true if operation is successed
|
||||
// Upon switch, updates cached global state in threadlocal ServerState struct.
|
||||
GlobalState SwitchState(GlobalState from, GlobalState to);
|
||||
std::pair<GlobalState, bool> SwitchState(GlobalState from, GlobalState to);
|
||||
|
||||
GlobalState GetGlobalState() const;
|
||||
|
||||
|
|
|
@ -153,12 +153,12 @@ void Replica::Pause(bool pause) {
|
|||
Proactor()->Await([&] { is_paused_ = pause; });
|
||||
}
|
||||
|
||||
std::error_code Replica::TakeOver(std::string_view timeout) {
|
||||
std::error_code Replica::TakeOver(std::string_view timeout, bool save_flag) {
|
||||
VLOG(1) << "Taking over";
|
||||
|
||||
std::error_code ec;
|
||||
Proactor()->Await(
|
||||
[this, &ec, timeout] { ec = SendNextPhaseRequest(absl::StrCat("TAKEOVER ", timeout)); });
|
||||
auto takeOverCmd = absl::StrCat("TAKEOVER ", timeout, (save_flag ? " SAVE" : ""));
|
||||
Proactor()->Await([this, &ec, cmd = std::move(takeOverCmd)] { ec = SendNextPhaseRequest(cmd); });
|
||||
|
||||
// If we successfully taken over, return and let server_family stop the replication.
|
||||
return ec;
|
||||
|
@ -371,7 +371,8 @@ error_code Replica::InitiatePSync() {
|
|||
io::PrefixSource ps{io_buf.InputBuffer(), Sock()};
|
||||
|
||||
// Set LOADING state.
|
||||
CHECK(service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING) == GlobalState::LOADING);
|
||||
CHECK(service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING).first ==
|
||||
GlobalState::LOADING);
|
||||
absl::Cleanup cleanup = [this]() {
|
||||
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
|
||||
};
|
||||
|
@ -461,7 +462,8 @@ error_code Replica::InitiateDflySync() {
|
|||
RETURN_ON_ERR(cntx_.SwitchErrorHandler(std::move(err_handler)));
|
||||
|
||||
// Make sure we're in LOADING state.
|
||||
CHECK(service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING) == GlobalState::LOADING);
|
||||
CHECK(service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING).first ==
|
||||
GlobalState::LOADING);
|
||||
|
||||
// Start full sync flows.
|
||||
state_mask_.fetch_or(R_SYNCING);
|
||||
|
|
|
@ -86,7 +86,7 @@ class Replica : ProtocolClient {
|
|||
|
||||
void Pause(bool pause);
|
||||
|
||||
std::error_code TakeOver(std::string_view timeout);
|
||||
std::error_code TakeOver(std::string_view timeout, bool save_flag);
|
||||
|
||||
std::string_view MasterId() const {
|
||||
return master_context_.master_repl_id;
|
||||
|
|
|
@ -51,13 +51,13 @@ search::SchemaField::VectorParams ParseVectorParams(CmdArgParser* parser) {
|
|||
size_t capacity = 1000;
|
||||
|
||||
bool use_hnsw = parser->ToUpper().Next().Case("HNSW", true).Case("FLAT", false);
|
||||
size_t num_args = parser->Next().Int<size_t>();
|
||||
size_t num_args = parser->Next<size_t>();
|
||||
|
||||
for (size_t i = 0; i * 2 < num_args; i++) {
|
||||
parser->ToUpper();
|
||||
|
||||
if (parser->Check("DIM").ExpectTail(1)) {
|
||||
dim = parser->Next().Int<size_t>();
|
||||
dim = parser->Next<size_t>();
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -69,7 +69,7 @@ search::SchemaField::VectorParams ParseVectorParams(CmdArgParser* parser) {
|
|||
}
|
||||
|
||||
if (parser->Check("INITIAL_CAP").ExpectTail(1)) {
|
||||
capacity = parser->Next().Int<size_t>();
|
||||
capacity = parser->Next<size_t>();
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -159,14 +159,14 @@ optional<SearchParams> ParseSearchParamsOrReply(CmdArgParser parser, ConnectionC
|
|||
while (parser.ToUpper().HasNext()) {
|
||||
// [LIMIT offset total]
|
||||
if (parser.Check("LIMIT").ExpectTail(2)) {
|
||||
params.limit_offset = parser.Next().Int<size_t>();
|
||||
params.limit_total = parser.Next().Int<size_t>();
|
||||
params.limit_offset = parser.Next<size_t>();
|
||||
params.limit_total = parser.Next<size_t>();
|
||||
continue;
|
||||
}
|
||||
|
||||
// RETURN {num} [{ident} AS {name}...]
|
||||
if (parser.Check("RETURN").ExpectTail(1)) {
|
||||
size_t num_fields = parser.Next().Int<size_t>();
|
||||
size_t num_fields = parser.Next<size_t>();
|
||||
params.return_fields = SearchParams::FieldReturnList{};
|
||||
while (params.return_fields->size() < num_fields) {
|
||||
string_view ident = parser.Next();
|
||||
|
@ -184,7 +184,7 @@ optional<SearchParams> ParseSearchParamsOrReply(CmdArgParser parser, ConnectionC
|
|||
|
||||
// [PARAMS num(ignored) name(ignored) knn_vector]
|
||||
if (parser.Check("PARAMS").ExpectTail(1)) {
|
||||
size_t num_args = parser.Next().Int<size_t>();
|
||||
size_t num_args = parser.Next<size_t>();
|
||||
while (parser.HasNext() && params.query_params.Size() * 2 < num_args) {
|
||||
string_view k = parser.Next();
|
||||
string_view v = parser.Next();
|
||||
|
@ -323,7 +323,7 @@ void SearchFamily::FtCreate(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
// PREFIX count prefix [prefix ...]
|
||||
if (parser.Check("PREFIX").ExpectTail(2)) {
|
||||
if (size_t num = parser.Next().Int<size_t>(); num != 1)
|
||||
if (size_t num = parser.Next<size_t>(); num != 1)
|
||||
return (*cntx)->SendError("Multiple prefixes are not supported");
|
||||
index.prefix = string(parser.Next());
|
||||
continue;
|
||||
|
|
|
@ -624,8 +624,7 @@ void ServerFamily::Shutdown() {
|
|||
|
||||
if (save_on_shutdown_ && !absl::GetFlag(FLAGS_dbfilename).empty()) {
|
||||
shard_set->pool()->GetNextProactor()->Await([this] {
|
||||
GenericError ec = DoSave();
|
||||
if (ec) {
|
||||
if (GenericError ec = DoSave(); ec) {
|
||||
LOG(WARNING) << "Failed to perform snapshot " << ec.Format();
|
||||
}
|
||||
});
|
||||
|
@ -673,9 +672,9 @@ Future<GenericError> ServerFamily::Load(const std::string& load_path) {
|
|||
|
||||
LOG(INFO) << "Loading " << load_path;
|
||||
|
||||
GlobalState new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
|
||||
if (new_state != GlobalState::LOADING) {
|
||||
LOG(WARNING) << GlobalStateName(new_state) << " in progress, ignored";
|
||||
auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
|
||||
if (new_state.first != GlobalState::LOADING) {
|
||||
LOG(WARNING) << GlobalStateName(new_state.first) << " in progress, ignored";
|
||||
return {};
|
||||
}
|
||||
|
||||
|
@ -1059,19 +1058,30 @@ void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext*
|
|||
#undef ADD_LINE
|
||||
}
|
||||
|
||||
GenericError ServerFamily::DoSave() {
|
||||
GenericError ServerFamily::DoSave(bool ignore_state) {
|
||||
const CommandId* cid = service().FindCmd("SAVE");
|
||||
CHECK_NOTNULL(cid);
|
||||
boost::intrusive_ptr<Transaction> trans(new Transaction{cid});
|
||||
trans->InitByArgs(0, {});
|
||||
return DoSave(absl::GetFlag(FLAGS_df_snapshot_format), {}, trans.get());
|
||||
return DoSave(absl::GetFlag(FLAGS_df_snapshot_format), {}, trans.get(), ignore_state);
|
||||
}
|
||||
|
||||
GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transaction* trans) {
|
||||
GenericError ServerFamily::DoSave(bool new_version, string_view basename, Transaction* trans,
|
||||
bool ignore_state) {
|
||||
if (!ignore_state) {
|
||||
auto [new_state, success] = service_.SwitchState(GlobalState::ACTIVE, GlobalState::SAVING);
|
||||
if (!success) {
|
||||
return GenericError{make_error_code(errc::operation_in_progress),
|
||||
StrCat(GlobalStateName(new_state), " - can not save database")};
|
||||
}
|
||||
}
|
||||
SaveStagesController sc{detail::SaveStagesInputs{
|
||||
new_version, basename, trans, &service_, &is_saving_, fq_threadpool_.get(), &last_save_info_,
|
||||
&save_mu_, &save_bytes_cb_, snapshot_storage_}};
|
||||
return sc.Save();
|
||||
auto res = sc.Save();
|
||||
if (!ignore_state)
|
||||
service_.SwitchState(GlobalState::SAVING, GlobalState::ACTIVE);
|
||||
return res;
|
||||
}
|
||||
|
||||
error_code ServerFamily::Drakarys(Transaction* transaction, DbIndex db_ind) {
|
||||
|
@ -1287,7 +1297,7 @@ void ServerFamily::ClientList(CmdArgList args, ConnectionContext* cntx) {
|
|||
void ServerFamily::ClientPause(CmdArgList args, ConnectionContext* cntx) {
|
||||
CmdArgParser parser(args);
|
||||
|
||||
auto timeout = parser.Next().Int<uint64_t>();
|
||||
auto timeout = parser.Next<uint64_t>();
|
||||
enum ClientPause pause_state = ClientPause::ALL;
|
||||
if (parser.HasNext()) {
|
||||
pause_state =
|
||||
|
@ -1939,7 +1949,8 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn
|
|||
replica_.reset();
|
||||
}
|
||||
|
||||
CHECK(service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE) == GlobalState::ACTIVE)
|
||||
CHECK(service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE).first ==
|
||||
GlobalState::ACTIVE)
|
||||
<< "Server is set to replica no one, yet state is not active!";
|
||||
|
||||
return (*cntx)->SendOk();
|
||||
|
@ -1953,8 +1964,8 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn
|
|||
|
||||
// First, switch into the loading state
|
||||
if (auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
|
||||
new_state != GlobalState::LOADING) {
|
||||
LOG(WARNING) << GlobalStateName(new_state) << " in progress, ignored";
|
||||
new_state.first != GlobalState::LOADING) {
|
||||
LOG(WARNING) << GlobalStateName(new_state.first) << " in progress, ignored";
|
||||
(*cntx)->SendError("Invalid state");
|
||||
return;
|
||||
}
|
||||
|
@ -2019,14 +2030,21 @@ void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
unique_lock lk(replicaof_mu_);
|
||||
|
||||
float_t timeout_sec;
|
||||
if (!absl::SimpleAtof(ArgS(args, 0), &timeout_sec)) {
|
||||
return (*cntx)->SendError(kInvalidIntErr);
|
||||
}
|
||||
CmdArgParser parser{args};
|
||||
|
||||
auto timeout_sec = parser.Next<float>();
|
||||
if (timeout_sec < 0) {
|
||||
return (*cntx)->SendError("timeout is negative");
|
||||
}
|
||||
|
||||
bool save_flag = static_cast<bool>(parser.Check("SAVE").IgnoreCase());
|
||||
|
||||
if (parser.HasNext())
|
||||
return (*cntx)->SendError(absl::StrCat("Unsupported option:", string_view(parser.Next())));
|
||||
|
||||
if (auto err = parser.Error(); err)
|
||||
return (*cntx)->SendError(err->MakeReply());
|
||||
|
||||
if (ServerState::tlocal()->is_master)
|
||||
return (*cntx)->SendError("Already a master instance");
|
||||
auto repl_ptr = replica_;
|
||||
|
@ -2037,7 +2055,7 @@ void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) {
|
|||
return (*cntx)->SendError("Full sync not done");
|
||||
}
|
||||
|
||||
std::error_code ec = replica_->TakeOver(ArgS(args, 0));
|
||||
std::error_code ec = replica_->TakeOver(ArgS(args, 0), save_flag);
|
||||
if (ec)
|
||||
return (*cntx)->SendError("Couldn't execute takeover");
|
||||
|
||||
|
@ -2358,7 +2376,7 @@ void ServerFamily::Register(CommandRegistry* registry) {
|
|||
ShutdownCmd)
|
||||
<< CI{"SLAVEOF", kReplicaOpts, 3, 0, 0, acl::kSlaveOf}.HFUNC(ReplicaOf)
|
||||
<< CI{"REPLICAOF", kReplicaOpts, 3, 0, 0, acl::kReplicaOf}.HFUNC(ReplicaOf)
|
||||
<< CI{"REPLTAKEOVER", CO::ADMIN | CO::GLOBAL_TRANS, 2, 0, 0, acl::kReplTakeOver}.HFUNC(
|
||||
<< CI{"REPLTAKEOVER", CO::ADMIN | CO::GLOBAL_TRANS, -2, 0, 0, acl::kReplTakeOver}.HFUNC(
|
||||
ReplTakeOver)
|
||||
<< CI{"REPLCONF", CO::ADMIN | CO::LOADING, -1, 0, 0, acl::kReplConf}.HFUNC(ReplConf)
|
||||
<< CI{"ROLE", CO::LOADING | CO::FAST | CO::NOSCRIPT, 1, 0, 0, acl::kRole}.HFUNC(Role)
|
||||
|
|
|
@ -145,11 +145,12 @@ class ServerFamily {
|
|||
|
||||
// if new_version is true, saves DF specific, non redis compatible snapshot.
|
||||
// if basename is not empty it will override dbfilename flag.
|
||||
GenericError DoSave(bool new_version, std::string_view basename, Transaction* transaction);
|
||||
GenericError DoSave(bool new_version, std::string_view basename, Transaction* transaction,
|
||||
bool ignore_state = false);
|
||||
|
||||
// Calls DoSave with a default generated transaction and with the format
|
||||
// specified in --df_snapshot_format
|
||||
GenericError DoSave();
|
||||
GenericError DoSave(bool ignore_state = false);
|
||||
|
||||
// Burns down and destroy all the data from the database.
|
||||
// if kDbAll is passed, burns all the databases to the ground.
|
||||
|
|
|
@ -1350,7 +1350,7 @@ void SRandMember(CmdArgList args, ConnectionContext* cntx) {
|
|||
string_view key = parser.Next();
|
||||
|
||||
bool is_count = parser.HasNext();
|
||||
int count = is_count ? parser.Next().Int<int>() : 1;
|
||||
int count = is_count ? parser.Next<int>() : 1;
|
||||
|
||||
if (parser.HasNext())
|
||||
return (*cntx)->SendError(WrongNumArgsError("SRANDMEMBER"));
|
||||
|
|
|
@ -2326,7 +2326,7 @@ void ZSetFamily::ZRandMember(CmdArgList args, ConnectionContext* cntx) {
|
|||
string_view key = parser.Next();
|
||||
|
||||
bool is_count = parser.HasNext();
|
||||
int count = is_count ? parser.Next().Int<int>() : 1;
|
||||
int count = is_count ? parser.Next<int>() : 1;
|
||||
|
||||
range_spec.params.with_scores = static_cast<bool>(parser.Check("WITHSCORES").IgnoreCase());
|
||||
|
||||
|
|
|
@ -1208,7 +1208,7 @@ async def test_take_over_seeder(
|
|||
|
||||
# Give the seeder a bit of time.
|
||||
await asyncio.sleep(1)
|
||||
await c_replica.execute_command(f"REPLTAKEOVER 5")
|
||||
await c_replica.execute_command(f"REPLTAKEOVER 5 SAVE")
|
||||
seeder.stop()
|
||||
|
||||
assert await c_replica.execute_command("role") == ["master", []]
|
||||
|
|
|
@ -217,6 +217,37 @@ class TestSetsnapshot_cron(SnapshotTestBase):
|
|||
assert super().get_main_file("test-set-snapshot_cron-summary.dfs")
|
||||
|
||||
|
||||
@dfly_args({**BASIC_ARGS})
|
||||
class TestOnlyOneSaveAtATime(SnapshotTestBase):
|
||||
"""Dragonfly does not allow simultaneous save operations, send 2 save operations and make sure one is rejected"""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(self, tmp_dir: Path):
|
||||
super().setup(tmp_dir)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@pytest.mark.slow
|
||||
async def test_snapshot(self, async_client, df_server):
|
||||
await async_client.execute_command(
|
||||
"debug", "populate", "1000000", "askldjh", "1000", "RAND"
|
||||
)
|
||||
|
||||
async def save():
|
||||
try:
|
||||
res = await async_client.execute_command("save", "rdb", "dump")
|
||||
return True
|
||||
except Exception as e:
|
||||
return False
|
||||
|
||||
save_commnands = [asyncio.create_task(save()) for _ in range(2)]
|
||||
|
||||
num_successes = 0
|
||||
for result in asyncio.as_completed(save_commnands):
|
||||
num_successes += await result
|
||||
|
||||
assert num_successes == 1, "Only one SAVE must be successful"
|
||||
|
||||
|
||||
@dfly_args({**BASIC_ARGS})
|
||||
class TestPathEscapes(SnapshotTestBase):
|
||||
"""Test that we don't allow path escapes. We just check that df_server.start()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue