Add Prometheus-like metric support and allow connection affinity according to SO_INCOMING_CPU

Roman Gershman 2021-11-29 21:50:08 +02:00
parent d7b22ca582
commit e7dc509fed
3 changed files with 34 additions and 5 deletions

View file

@@ -52,6 +52,7 @@ int main(int argc, char* argv[]) {
if (FLAGS_http_port >= 0) {
http_listener = new HttpListener<>;
http_listener->enable_metrics();
// Ownership over http_listener is moved to the acceptor.
uint16_t port = acceptor.AddListener(FLAGS_http_port, http_listener);

View file

@@ -11,10 +11,11 @@
#include "server/dragonfly_connection.h"
#include "util/proactor_pool.h"
using namespace util;
DEFINE_uint32(conn_threads, 0, "Number of threads used for handling server connections");
DEFINE_bool(tls, false, "");
DEFINE_bool(conn_use_incoming_cpu, false,
"If true, uses the incoming CPU of a socket to distribute"
" incoming connections");
CONFIG_string(tls_client_cert_file, "", "", TrueValidator);
CONFIG_string(tls_client_key_file, "", "", TrueValidator);
@@ -37,6 +38,9 @@ CONFIG_enum(tls_auth_clients, "yes", "", tls_auth_clients_enum, tls_auth_clients
namespace dfly {
using namespace util;
using namespace std;
// To connect: openssl s_client -cipher "ADH:@SECLEVEL=0" -state -crlf -connect 127.0.0.1:6380
static SSL_CTX* CreateSslCntx() {
SSL_CTX* ctx = SSL_CTX_new(TLS_server_method());
@@ -54,7 +58,7 @@ static SSL_CTX* CreateSslCntx() {
LOG(WARNING)
<< "tls-client-key-file not set, no keys are loaded and anonymous ciphers are enabled. "
<< "Do not use in production!";
} else { // tls_client_key_file is set.
} else { // tls_client_key_file is set.
CHECK_EQ(1,
SSL_CTX_use_PrivateKey_file(ctx, FLAGS_tls_client_key_file.c_str(), SSL_FILETYPE_PEM));
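For context on this hunk: when tls_client_key_file is not set, the warning above says anonymous ciphers are enabled, which is what the openssl s_client cipher string in the comment near the top of the file targets. The snippet below is only a hedged sketch of how such a fallback is typically configured with OpenSSL; the code elided from CreateSslCntx may differ, and EnableAnonymousCiphers is a hypothetical helper.

#include <openssl/ssl.h>
#include "base/logging.h"

// Hypothetical sketch: allow anonymous Diffie-Hellman ("ADH") ciphers when no
// certificate or key is configured. Mirrors the s_client cipher string above,
// not necessarily the exact code in this file.
static void EnableAnonymousCiphers(SSL_CTX* ctx) {
  // "@SECLEVEL=0" is needed for OpenSSL 1.1+ to offer these weak ciphers.
  CHECK_EQ(1, SSL_CTX_set_cipher_list(ctx, "ADH:@SECLEVEL=0"));
  SSL_CTX_set_dh_auto(ctx, 1);  // anonymous DH still needs DH parameters
}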
@@ -104,14 +108,33 @@ void Listener::PostShutdown() {
// We can limit the number of threads handling dragonfly connections.
ProactorBase* Listener::PickConnectionProactor(LinuxSocketBase* sock) {
uint32_t id = next_id_.fetch_add(1, std::memory_order_relaxed);
uint32_t total = FLAGS_conn_threads;
util::ProactorPool* pp = pool();
uint32_t total = FLAGS_conn_threads;
uint32_t id = kuint32max;
if (total == 0 || total > pp->size()) {
total = pp->size();
}
if (FLAGS_conn_use_incoming_cpu) {
int fd = sock->native_handle();
int cpu;
socklen_t len = sizeof(cpu);
CHECK_EQ(0, getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &cpu, &len));
VLOG(1) << "CPU for connection " << fd << " is " << cpu;
vector<unsigned> ids = pool()->MapCpuToThreads(cpu);
if (!ids.empty()) {
id = ids.front();
}
}
if (id == kuint32max) {
id = next_id_.fetch_add(1, std::memory_order_relaxed);
}
return pp->at(id % total);
}
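The function above is the heart of the connection-affinity feature: SO_INCOMING_CPU reports the CPU that last processed packets for the accepted socket, and ProactorPool::MapCpuToThreads translates that CPU into candidate proactor threads; when the lookup yields nothing, the listener falls back to the old round-robin counter. As a minimal standalone illustration of the socket option itself (a sketch, independent of the listener code above):

#include <sys/socket.h>

// Returns the CPU that last handled packets for this socket, or -1 if the
// option is unavailable. SO_INCOMING_CPU requires Linux 3.19 or newer.
static int GetIncomingCpu(int fd) {
  int cpu = -1;
  socklen_t len = sizeof(cpu);
  if (getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &cpu, &len) != 0)
    return -1;
  return cpu;
}

Pinning the connection to a proactor running on (or mapped to) that CPU keeps packet processing and connection handling on the same core, which avoids cross-CPU cache traffic for busy connections.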

View file

@@ -12,6 +12,7 @@
#include "base/logging.h"
#include "server/conn_context.h"
#include "util/metrics/metrics.h"
#include "util/uring/uring_fiber_algo.h"
#include "util/varz.h"
@@ -34,6 +35,7 @@ DEFINE_VARZ(VarzQps, ping_qps);
DEFINE_VARZ(VarzQps, set_qps);
std::optional<VarzFunction> engine_varz;
metrics::CounterFamily cmd_req("requests_total", "Number of served redis requests");
} // namespace
@@ -59,6 +61,7 @@ void Service::Init(util::AcceptServer* acceptor) {
request_latency_usec.Init(&pp_);
ping_qps.Init(&pp_);
set_qps.Init(&pp_);
cmd_req.Init(&pp_, {"type"});
}
void Service::Shutdown() {
@@ -66,6 +69,7 @@ void Service::Shutdown() {
request_latency_usec.Shutdown();
ping_qps.Shutdown();
set_qps.Shutdown();
shard_set_.RunBriefInParallel([&](EngineShard*) { EngineShard::DestroyThreadLocal(); });
}
@@ -90,6 +94,7 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) {
}
uint64_t start_usec = ProactorBase::GetMonotonicTimeNs(), end_usec;
cntx->cid = cid;
cmd_req.Inc({cid->name()});
cid->Invoke(args, cntx);
end_usec = ProactorBase::GetMonotonicTimeNs();
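Taken together, the metric changes follow a define / Init / Inc lifecycle: the counter family is declared with a metric name and help string, bound to the proactor pool together with its label names in Service::Init, and incremented with one label value per dispatched command. A condensed sketch of that pattern, reusing the same helio util::metrics calls that appear above (exact signatures are assumed from this diff; InitMetrics and OnCommand are hypothetical helpers):

#include "util/metrics/metrics.h"
#include "util/proactor_pool.h"

using namespace util;

metrics::CounterFamily cmd_req("requests_total", "Number of served redis requests");

void InitMetrics(ProactorPool* pp) {
  cmd_req.Init(pp, {"type"});  // one label dimension: the command name
}

void OnCommand(const char* name) {
  // e.g. "set" bumps the requests_total{type="set"} time series.
  cmd_req.Inc({name});
}

With enable_metrics() called on the HTTP listener in the first hunk, these counters are what a Prometheus scrape of the http_port endpoint should pick up.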