chore: clean up conn_use_incoming_cpu heuristic (#3978)

Add a comment explaining when it can be useful.
It usually won't work without tuning the host first.
See https://docs.kernel.org/networking/scaling.html
for more details.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Roman Gershman, 2024-10-28 09:54:44 +02:00, committed by GitHub
commit db6504564d (parent b0d52c69ba)
2 changed files with 13 additions and 62 deletions
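
For background on what conn_use_incoming_cpu relies on: below is a minimal, Linux-only sketch of reading the SO_INCOMING_CPU socket option, which reports the CPU on which the kernel processed this socket's incoming packets. This is not Dragonfly's code; the helper name and error handling are illustrative.

#include <sys/socket.h>
#include <cstdio>

// Illustrative helper: returns the CPU that handled RX for `fd`, or -1.
int IncomingCpu(int fd) {
  int cpu = -1;
  socklen_t len = sizeof(cpu);
  // SO_INCOMING_CPU is a real Linux socket option (since 3.19);
  // everything around it here is a sketch.
  if (getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &cpu, &len) != 0) {
    perror("getsockopt(SO_INCOMING_CPU)");
    return -1;
  }
  return cpu;
}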


@@ -287,7 +287,6 @@ uint64_t Listener::RefusedConnectionMaxClientsCount() {
 }
 
 void Listener::PreAcceptLoop(util::ProactorBase* pb) {
-  per_thread_.resize(pool()->size());
 }
 
 bool Listener::IsPrivilegedInterface() const {
@@ -319,51 +318,15 @@ void Listener::PostShutdown() {
 }
 
 void Listener::OnConnectionStart(util::Connection* conn) {
-  unsigned id = conn->socket()->proactor()->GetPoolIndex();
-  DCHECK_LT(id, per_thread_.size());
-
   facade::Connection* facade_conn = static_cast<facade::Connection*>(conn);
   VLOG(1) << "Opening connection " << facade_conn->GetClientId();
   facade_conn->OnConnectionStart();
-
-  absl::base_internal::SpinLockHolder lock{&mutex_};
-  int32_t prev_cnt = per_thread_[id].num_connections++;
-  ++conn_cnt_;
-
-  if (id == min_cnt_thread_id_) {
-    DCHECK_EQ(min_cnt_, prev_cnt);
-    ++min_cnt_;
-    for (unsigned i = 0; i < per_thread_.size(); ++i) {
-      auto cnt = per_thread_[i].num_connections;
-      if (cnt < min_cnt_) {
-        min_cnt_ = cnt;
-        min_cnt_thread_id_ = i;
-        break;
-      }
-    }
-  }
 }
 
 void Listener::OnConnectionClose(util::Connection* conn) {
-  // TODO: We do not account for connections migrated to other threads. This is a rare case.
-  unsigned id = conn->socket()->proactor()->GetPoolIndex();
-  DCHECK_LT(id, per_thread_.size());
-  auto& pth = per_thread_[id];
-
-  facade::Connection* facade_conn = static_cast<facade::Connection*>(conn);
+  Connection* facade_conn = static_cast<Connection*>(conn);
  VLOG(1) << "Closing connection " << facade_conn->GetClientId();
-
-  absl::base_internal::SpinLockHolder lock{&mutex_};
-  int32_t cur_cnt = --pth.num_connections;
-  --conn_cnt_;
-
-  auto mincnt = min_cnt_;
-  if (mincnt > cur_cnt) {
-    min_cnt_ = cur_cnt;
-    min_cnt_thread_id_ = id;
-    return;
-  }
 }
 
 void Listener::OnMaxConnectionsReached(util::FiberSocketBase* sock) {
@@ -396,22 +359,19 @@ ProactorBase* Listener::PickConnectionProactor(util::FiberSocketBase* sock) {
   }
 
   if (GetFlag(FLAGS_conn_use_incoming_cpu)) {
+    // We choose a thread that is running on the incoming CPU. Usually there is
+    // a single thread per CPU. SO_INCOMING_CPU returns the CPU that the kernel
+    // uses to steer the packets to. In order to make
+    // conn_use_incoming_cpu effective, we should make sure that the receive packets are
+    // steered to enough CPUs. This can be done by setting the RPS mask in
+    // /sys/class/net/<dev>/queues/rx-<n>/rps_cpus. For more details, see
+    // https://docs.kernel.org/networking/scaling.html#rps-configuration
+    // Please note that if conn_use_incoming_cpu is true, connections will be handled only
+    // on the CPUs that handle the softirqs for the incoming packets.
+    // To avoid imbalance in CPU load, RPS tuning is strongly advised.
     const vector<unsigned>& ids = pool()->MapCpuToThreads(cpu);
-    absl::base_internal::SpinLockHolder lock{&mutex_};
-    for (auto id : ids) {
-      DCHECK_LT(id, per_thread_.size());
-      if (per_thread_[id].num_connections < min_cnt_ + 5) {
-        VLOG(1) << "using thread " << id << " for cpu " << cpu;
-        res_id = id;
-        break;
-      }
-    }
-
-    if (res_id == kuint32max) {
-      VLOG(1) << "choosing a thread with minimum conns " << min_cnt_thread_id_ << " instead of "
-              << cpu;
-      res_id = min_cnt_thread_id_;
+    if (!ids.empty()) {
+      res_id = ids[0];
     }
   }
 }


@@ -66,20 +66,11 @@ class Listener : public util::ListenerInterface {
   ServiceInterface* service_;
 
-  struct PerThread {
-    int32_t num_connections{0};
-    unsigned napi_id = 0;
-  };
-  std::vector<PerThread> per_thread_;
 
   std::atomic_uint32_t next_id_{0};
 
   Role role_;
-  uint32_t conn_cnt_{0};
-  uint32_t min_cnt_thread_id_{0};
-  int32_t min_cnt_{0};
-  absl::base_internal::SpinLock mutex_;
 
   Protocol protocol_;
   SSL_CTX* ctx_ = nullptr;
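
For reference, the members deleted above backed the least-loaded-thread fallback that the first file no longer uses. A distilled, standalone sketch of that removed bookkeeping (simplified names, std::mutex in place of absl's SpinLock):

#include <cstddef>
#include <cstdint>
#include <mutex>
#include <vector>

// Sketch of the removed accounting: per-thread connection counts plus a
// cached (min count, min thread) pair maintained under a lock.
class ConnCounter {
 public:
  explicit ConnCounter(size_t threads) : counts_(threads, 0) {}

  void OnOpen(unsigned id) {
    std::lock_guard<std::mutex> lk(mu_);
    ++counts_[id];
    if (id == min_id_) {
      // The cached minimum grew; rescan for a thread that now has fewer.
      ++min_cnt_;
      for (unsigned i = 0; i < counts_.size(); ++i) {
        if (counts_[i] < min_cnt_) {
          min_cnt_ = counts_[i];
          min_id_ = i;
          break;
        }
      }
    }
  }

  void OnClose(unsigned id) {
    std::lock_guard<std::mutex> lk(mu_);
    int32_t cur = --counts_[id];
    if (cur < min_cnt_) {
      // This thread just became the least loaded one.
      min_cnt_ = cur;
      min_id_ = id;
    }
  }

  unsigned LeastLoadedThread() const {
    std::lock_guard<std::mutex> lk(mu_);
    return min_id_;
  }

 private:
  mutable std::mutex mu_;
  std::vector<int32_t> counts_;
  int32_t min_cnt_ = 0;
  unsigned min_id_ = 0;
};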