fix(server): Do not block admin-port commands (#2842)

There are a few use cases which cause a temporary block of connections:
* `CLIENT PAUSE` command
* replica takeover
* cluster config / migration

Before this PR, these commands interfered with replication / migration
connections, which could cause delays and even deadlocks.

We do not want such internal connections to ever be blocked, and it's ok
to assume they won't issue regular "data" commands. As such, this PR
disables blocking any commands issued via an admin-port, and once merged
we'll recommend issuing replication and cluster migration via the admin
port.

Fixes #2703
This commit is contained in:
Shahar Mike 2024-04-07 13:55:52 +03:00 committed by GitHub
parent 2cf7f21a00
commit 10ebe934be
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 26 additions and 13 deletions

View file

@ -504,8 +504,8 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
SlotSet before = tl_cluster_config ? tl_cluster_config->GetOwnedSlots() : SlotSet(true);
// Ignore blocked commands because we filter them with CancelBlockingOnThread
DispatchTracker tracker{server_family_->GetListeners(), cntx->conn(), false /* ignore paused */,
true /* ignore blocked */};
DispatchTracker tracker{server_family_->GetNonPriviligedListeners(), cntx->conn(),
false /* ignore paused */, true /* ignore blocked */};
auto blocking_filter = [&new_config](ArgSlice keys) {
bool moved = any_of(keys.begin(), keys.end(), [&](auto k) { return !new_config->IsMySlot(k); });

View file

@ -117,8 +117,8 @@ void OutgoingMigration::SyncFb() {
bool is_block_active = true;
auto is_pause_in_progress = [&is_block_active] { return is_block_active; };
auto pause_fb_opt =
Pause(server_family_->GetListeners(), nullptr, ClientPause::WRITE, is_pause_in_progress);
auto pause_fb_opt = Pause(server_family_->GetNonPriviligedListeners(), nullptr,
ClientPause::WRITE, is_pause_in_progress);
if (!pause_fb_opt) {
LOG(WARNING) << "Cluster migration finalization time out";

View file

@ -361,7 +361,7 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) {
// We need to await for all dispatches to finish: Otherwise a transaction might be scheduled
// after this function exits but before the actual shutdown.
facade::DispatchTracker tracker{sf_->GetListeners(), cntx->conn()};
facade::DispatchTracker tracker{sf_->GetNonPriviligedListeners(), cntx->conn()};
shard_set->pool()->Await([&](unsigned index, auto* pb) {
sf_->CancelBlockingOnThread();
tracker.TrackOnThread();

View file

@ -1122,7 +1122,8 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
<< args << " in dbid=" << dfly_cntx->conn_state.db_index;
}
if (!dispatching_in_multi) { // Don't interrupt running multi commands
// Don't interrupt running multi commands or admin connections.
if (!dispatching_in_multi && (!cntx->conn() || !cntx->conn()->IsPrivileged())) {
bool is_write = cid->IsWriteOnly();
is_write |= cid->name() == "PUBLISH" || cid->name() == "EVAL" || cid->name() == "EVALSHA";
is_write |= cid->name() == "EXEC" && dfly_cntx->conn_state.exec_info.is_write;

View file

@ -418,8 +418,7 @@ void ClientList(CmdArgList args, absl::Span<facade::Listener*> listeners, Connec
return rb->SendVerbatimString(result);
}
void ClientPauseCmd(CmdArgList args, absl::Span<facade::Listener*> listeners,
ConnectionContext* cntx) {
void ClientPauseCmd(CmdArgList args, vector<facade::Listener*> listeners, ConnectionContext* cntx) {
CmdArgParser parser(args);
auto timeout = parser.Next<uint64_t>();
@ -604,15 +603,15 @@ optional<ReplicaOfArgs> ReplicaOfArgs::FromCmdArgs(CmdArgList args, ConnectionCo
} // namespace
std::optional<fb2::Fiber> Pause(absl::Span<facade::Listener* const> listeners,
facade::Connection* conn, ClientPause pause_state,
std::optional<fb2::Fiber> Pause(std::vector<facade::Listener*> listeners, facade::Connection* conn,
ClientPause pause_state,
std::function<bool()> is_pause_in_progress) {
// Track connections and set pause state to be able to wait untill all running transactions read
// the new pause state. Exlude already paused commands from the busy count. Exlude tracking
// blocked connections because: a) If the connection is blocked it is puased. b) We read pause
// state after waking from blocking so if the trasaction was waken by another running
// command that did not pause on the new state yet we will pause after waking up.
DispatchTracker tracker{listeners, conn, true /* ignore paused commands */,
DispatchTracker tracker{std::move(listeners), conn, true /* ignore paused commands */,
true /*ignore blocking*/};
shard_set->pool()->Await([&tracker, pause_state](util::ProactorBase* pb) {
// Commands don't suspend before checking the pause state, so
@ -1262,6 +1261,17 @@ std::optional<ReplicaOffsetInfo> ServerFamily::GetReplicaOffsetInfo() {
return nullopt;
}
vector<facade::Listener*> ServerFamily::GetNonPriviligedListeners() const {
std::vector<facade::Listener*> listeners;
listeners.reserve(listeners.size());
for (facade::Listener* listener : listeners_) {
if (!listener->IsPrivilegedInterface()) {
listeners.push_back(listener);
}
}
return listeners;
}
bool ServerFamily::HasReplica() const {
unique_lock lk(replicaof_mu_);
return replica_ != nullptr;
@ -1565,7 +1575,7 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
} else if (sub_cmd == "LIST") {
return ClientList(sub_args, absl::MakeSpan(listeners_), cntx);
} else if (sub_cmd == "PAUSE") {
return ClientPauseCmd(sub_args, absl::MakeSpan(listeners_), cntx);
return ClientPauseCmd(sub_args, GetNonPriviligedListeners(), cntx);
} else if (sub_cmd == "TRACKING") {
return ClientTracking(sub_args, cntx);
} else if (sub_cmd == "KILL") {

View file

@ -208,6 +208,8 @@ class ServerFamily {
return listeners_;
}
std::vector<facade::Listener*> GetNonPriviligedListeners() const;
bool HasReplica() const;
std::optional<Replica::Info> GetReplicaInfo() const;
@ -325,7 +327,7 @@ class ServerFamily {
};
// Reusable CLIENT PAUSE implementation that blocks while polling is_pause_in_progress
std::optional<util::fb2::Fiber> Pause(absl::Span<facade::Listener* const> listeners,
std::optional<util::fb2::Fiber> Pause(std::vector<facade::Listener*> listeners,
facade::Connection* conn, ClientPause pause_state,
std::function<bool()> is_pause_in_progress);