fix: switch to SHUTTING_DOWN state unconditionally (#4408)

* fix: switch to SHUTTING_DOWN state unconditionally

During the shutdown sequence always switch to SHUTTING_DOWN.
Make sure that the rest of the code does not break if it can not switch to the desired
global state + some clean ups around state transitions.

Finally, reduce the amount of data in test_replicaof_reject_on_load
---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2025-01-08 11:28:36 +02:00 committed by GitHub
parent 739bee5c83
commit 0a4008385d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 54 additions and 46 deletions

View file

@ -885,7 +885,11 @@ void Service::Shutdown() {
VLOG(1) << "Service::Shutdown"; VLOG(1) << "Service::Shutdown";
// We mark that we are shutting down. After this incoming requests will be // We mark that we are shutting down. After this incoming requests will be
// rejected // rejected.
mu_.lock();
global_state_ = GlobalState::SHUTTING_DOWN;
mu_.unlock();
pp_.AwaitFiberOnAll([](ProactorBase* pb) { pp_.AwaitFiberOnAll([](ProactorBase* pb) {
ServerState::tlocal()->EnterLameDuck(); ServerState::tlocal()->EnterLameDuck();
facade::Connection::ShutdownThreadLocal(); facade::Connection::ShutdownThreadLocal();
@ -2504,27 +2508,20 @@ GlobalState Service::SwitchState(GlobalState from, GlobalState to) {
return to; return to;
} }
void Service::RequestLoadingState() { bool Service::RequestLoadingState() {
bool switch_state = false; if (SwitchState(GlobalState::ACTIVE, GlobalState::LOADING) == GlobalState::LOADING) {
{
util::fb2::LockGuard lk(mu_); util::fb2::LockGuard lk(mu_);
++loading_state_counter_; loading_state_counter_++;
if (global_state_ != GlobalState::LOADING) { return true;
DCHECK_EQ(global_state_, GlobalState::ACTIVE);
switch_state = true;
}
}
if (switch_state) {
SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
} }
return false;
} }
void Service::RemoveLoadingState() { void Service::RemoveLoadingState() {
bool switch_state = false; bool switch_state = false;
{ {
util::fb2::LockGuard lk(mu_); util::fb2::LockGuard lk(mu_);
DCHECK_EQ(global_state_, GlobalState::LOADING); CHECK_GT(loading_state_counter_, 0u);
DCHECK_GT(loading_state_counter_, 0u);
--loading_state_counter_; --loading_state_counter_;
switch_state = loading_state_counter_ == 0; switch_state = loading_state_counter_ == 0;
} }
@ -2533,11 +2530,6 @@ void Service::RemoveLoadingState() {
} }
} }
GlobalState Service::GetGlobalState() const {
util::fb2::LockGuard lk(mu_);
return global_state_;
}
void Service::ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) { void Service::ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) {
// We skip authentication on privileged listener if the flag admin_nopass is set // We skip authentication on privileged listener if the flag admin_nopass is set
// We also skip authentication if requirepass is empty // We also skip authentication if requirepass is empty

View file

@ -78,11 +78,9 @@ class Service : public facade::ServiceInterface {
// Upon switch, updates cached global state in threadlocal ServerState struct. // Upon switch, updates cached global state in threadlocal ServerState struct.
GlobalState SwitchState(GlobalState from, GlobalState to) ABSL_LOCKS_EXCLUDED(mu_); GlobalState SwitchState(GlobalState from, GlobalState to) ABSL_LOCKS_EXCLUDED(mu_);
void RequestLoadingState() ABSL_LOCKS_EXCLUDED(mu_); bool RequestLoadingState() ABSL_LOCKS_EXCLUDED(mu_);
void RemoveLoadingState() ABSL_LOCKS_EXCLUDED(mu_); void RemoveLoadingState() ABSL_LOCKS_EXCLUDED(mu_);
GlobalState GetGlobalState() const ABSL_LOCKS_EXCLUDED(mu_);
void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) final; void ConfigureHttpHandlers(util::HttpListenerBase* base, bool is_privileged) final;
void OnConnectionClose(facade::ConnectionContext* cntx) final; void OnConnectionClose(facade::ConnectionContext* cntx) final;

View file

@ -417,7 +417,11 @@ error_code Replica::InitiatePSync() {
io::PrefixSource ps{io_buf.InputBuffer(), Sock()}; io::PrefixSource ps{io_buf.InputBuffer(), Sock()};
// Set LOADING state. // Set LOADING state.
service_.RequestLoadingState(); if (!service_.RequestLoadingState()) {
return cntx_.ReportError(std::make_error_code(errc::state_not_recoverable),
"Failed to enter LOADING state");
}
absl::Cleanup cleanup = [this]() { service_.RemoveLoadingState(); }; absl::Cleanup cleanup = [this]() { service_.RemoveLoadingState(); };
if (slot_range_.has_value()) { if (slot_range_.has_value()) {
@ -502,10 +506,14 @@ error_code Replica::InitiateDflySync() {
for (auto& flow : shard_flows_) for (auto& flow : shard_flows_)
flow->Cancel(); flow->Cancel();
}; };
RETURN_ON_ERR(cntx_.SwitchErrorHandler(std::move(err_handler))); RETURN_ON_ERR(cntx_.SwitchErrorHandler(std::move(err_handler)));
// Make sure we're in LOADING state. // Make sure we're in LOADING state.
service_.RequestLoadingState(); if (!service_.RequestLoadingState()) {
return cntx_.ReportError(std::make_error_code(errc::state_not_recoverable),
"Failed to enter LOADING state");
}
// Start full sync flows. // Start full sync flows.
state_mask_.fetch_or(R_SYNCING); state_mask_.fetch_or(R_SYNCING);

View file

@ -1168,10 +1168,12 @@ void ServerFamily::SnapshotScheduling() {
return; return;
} }
const auto loading_check_interval = std::chrono::seconds(10); ServerState* ss = ServerState::tlocal();
while (service_.GetGlobalState() == GlobalState::LOADING) { do {
schedule_done_.WaitFor(loading_check_interval); if (schedule_done_.WaitFor(100ms)) {
} return;
}
} while (ss->gstate() == GlobalState::LOADING);
while (true) { while (true) {
const std::chrono::time_point now = std::chrono::system_clock::now(); const std::chrono::time_point now = std::chrono::system_clock::now();
@ -1657,9 +1659,10 @@ GenericError ServerFamily::DoSave(bool ignore_state) {
GenericError ServerFamily::DoSaveCheckAndStart(bool new_version, string_view basename, GenericError ServerFamily::DoSaveCheckAndStart(bool new_version, string_view basename,
Transaction* trans, bool ignore_state) { Transaction* trans, bool ignore_state) {
auto state = service_.GetGlobalState(); auto state = ServerState::tlocal()->gstate();
// In some cases we want to create a snapshot even if server is not active, f.e in takeover // In some cases we want to create a snapshot even if server is not active, f.e in takeover
if (!ignore_state && (state != GlobalState::ACTIVE)) { if (!ignore_state && (state != GlobalState::ACTIVE && state != GlobalState::SHUTTING_DOWN)) {
return GenericError{make_error_code(errc::operation_in_progress), return GenericError{make_error_code(errc::operation_in_progress),
StrCat(GlobalStateName(state), " - can not save database")}; StrCat(GlobalStateName(state), " - can not save database")};
} }
@ -2242,6 +2245,8 @@ void ServerFamily::Info(CmdArgList args, const CommandContext& cmd_cntx) {
absl::StrAppend(&info, a1, ":", a2, "\r\n"); absl::StrAppend(&info, a1, ":", a2, "\r\n");
}; };
ServerState* ss = ServerState::tlocal();
if (should_enter("SERVER")) { if (should_enter("SERVER")) {
auto kind = ProactorBase::me()->GetKind(); auto kind = ProactorBase::me()->GetKind();
const char* multiplex_api = (kind == ProactorBase::IOURING) ? "iouring" : "epoll"; const char* multiplex_api = (kind == ProactorBase::IOURING) ? "iouring" : "epoll";
@ -2467,7 +2472,7 @@ void ServerFamily::Info(CmdArgList args, const CommandContext& cmd_cntx) {
append("last_saved_file", save_info.file_name); append("last_saved_file", save_info.file_name);
append("last_success_save_duration_sec", save_info.success_duration_sec); append("last_success_save_duration_sec", save_info.success_duration_sec);
size_t is_loading = service_.GetGlobalState() == GlobalState::LOADING; unsigned is_loading = (ss->gstate() == GlobalState::LOADING);
append("loading", is_loading); append("loading", is_loading);
append("saving", is_saving); append("saving", is_saving);
append("current_save_duration_sec", curent_durration_sec); append("current_save_duration_sec", curent_durration_sec);
@ -2752,7 +2757,8 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
util::fb2::LockGuard lk(replicaof_mu_); // Only one REPLICAOF command can run at a time util::fb2::LockGuard lk(replicaof_mu_); // Only one REPLICAOF command can run at a time
// We should not execute replica of command while loading from snapshot. // We should not execute replica of command while loading from snapshot.
if (ServerState::tlocal()->is_master && service_.GetGlobalState() == GlobalState::LOADING) { ServerState* ss = ServerState::tlocal();
if (ss->is_master && ss->gstate() == GlobalState::LOADING) {
builder->SendError(kLoadingErr); builder->SendError(kLoadingErr);
return; return;
} }
@ -2766,7 +2772,7 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
// If NO ONE was supplied, just stop the current replica (if it exists) // If NO ONE was supplied, just stop the current replica (if it exists)
if (replicaof_args->IsReplicaOfNoOne()) { if (replicaof_args->IsReplicaOfNoOne()) {
if (!ServerState::tlocal()->is_master) { if (!ss->is_master) {
CHECK(replica_); CHECK(replica_);
SetMasterFlagOnAllThreads(true); // Flip flag before clearing replica SetMasterFlagOnAllThreads(true); // Flip flag before clearing replica
@ -2776,8 +2782,8 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
StopAllClusterReplicas(); StopAllClusterReplicas();
} }
CHECK_EQ(service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE), GlobalState::ACTIVE) // May not switch to ACTIVE if the process is, for example, shutting down at the same time.
<< "Server is set to replica no one, yet state is not active!"; service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
return builder->SendOk(); return builder->SendOk();
} }

View file

@ -1971,23 +1971,27 @@ async def test_replicaof_reject_on_load(df_factory, df_seeder_factory):
df_factory.start_all([master, replica]) df_factory.start_all([master, replica])
c_replica = replica.client() c_replica = replica.client()
await c_replica.execute_command(f"DEBUG POPULATE 8000000") await c_replica.execute_command(f"DEBUG POPULATE 1000 key 1000 RAND type set elements 2000")
replica.stop() replica.stop()
replica.start() replica.start()
c_replica = replica.client() c_replica = replica.client()
@assert_eventually
async def check_replica_isloading():
persistence = await c_replica.info("PERSISTENCE")
assert persistence["loading"] == 1
# If this fails adjust load of DEBUG POPULATE above.
await check_replica_isloading()
# Check replica of not alowed while loading snapshot # Check replica of not alowed while loading snapshot
try: # Keep in mind that if the exception has not been raised, it doesn't mean
# If this fails adjust `keys` and the `assert dbsize >= 30000` above. # that there is a bug because it could be the case that while executing
# Keep in mind that if the assert False is triggered below, it doesn't mean # INFO PERSISTENCE df is in loading state but when we call REPLICAOF df
# that there is a bug because it could be the case that while executing # is no longer in loading state and the assertion false is triggered.
# INFO PERSISTENCE df is in loading state but when we call REPLICAOF df with pytest.raises(aioredis.BusyLoadingError):
# is no longer in loading state and the assertion false is triggered.
assert "loading:1" in (await c_replica.execute_command("INFO PERSISTENCE"))
await c_replica.execute_command(f"REPLICAOF localhost {master.port}") await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
assert False
except aioredis.BusyLoadingError as e:
assert "Dragonfly is loading the dataset in memory" in str(e)
# Check one we finish loading snapshot replicaof success # Check one we finish loading snapshot replicaof success
await wait_available_async(c_replica, timeout=180) await wait_available_async(c_replica, timeout=180)