feat: increase the flexibility of how to assign DF threads (#1042)

feat: increase the flexibility of how to assign threads in DF.

Specifically, introduce `conn_io_threads` and `conn_io_thread_start` flags that choose
which threads can handle I/O. In addition, introduce `num_shards` flag that may override
how many database shards exist in dragonfly process.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-04-07 14:50:25 +03:00 committed by GitHub
parent 64aacfa93e
commit 2cfe1f84dc
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 22 additions and 11 deletions

View file

@ -16,7 +16,8 @@
using namespace std;
ABSL_FLAG(uint32_t, conn_threads, 0, "Number of threads used for handing server connections");
ABSL_FLAG(uint32_t, conn_io_threads, 0, "Number of threads used for handing server connections");
ABSL_FLAG(uint32_t, conn_io_thread_start, 0, "Starting thread id for handling server connections");
ABSL_FLAG(bool, tls, false, "");
ABSL_FLAG(bool, conn_use_incoming_cpu, false,
"If true uses incoming cpu of a socket in order to distribute"
@ -233,12 +234,8 @@ void Listener::OnConnectionClose(util::Connection* conn) {
// We can limit number of threads handling dragonfly connections.
ProactorBase* Listener::PickConnectionProactor(LinuxSocketBase* sock) {
util::ProactorPool* pp = pool();
uint32_t total = GetFlag(FLAGS_conn_threads);
uint32_t res_id = kuint32max;
if (total == 0 || total > pp->size()) {
total = pp->size();
}
uint32_t res_id = kuint32max;
if (!sock->IsUDS()) {
int fd = sock->native_handle();
@ -275,7 +272,14 @@ ProactorBase* Listener::PickConnectionProactor(LinuxSocketBase* sock) {
}
if (res_id == kuint32max) {
res_id = next_id_.fetch_add(1, std::memory_order_relaxed) % total;
uint32_t total = GetFlag(FLAGS_conn_io_threads);
uint32_t start = GetFlag(FLAGS_conn_io_thread_start) % pp->size();
if (total == 0 || total + start > pp->size()) {
total = pp->size() - start;
}
res_id = start + (next_id_.fetch_add(1, std::memory_order_relaxed) % total);
}
return pp->at(res_id);

View file

@ -42,12 +42,13 @@ using namespace std;
ABSL_FLAG(uint32_t, port, 6379, "Redis port");
ABSL_FLAG(uint32_t, memcache_port, 0, "Memcached port");
ABSL_FLAG(int, multi_exec_mode, 1,
ABSL_FLAG(uint32_t, multi_exec_mode, 1,
"Set multi exec atomicity mode: 1 for global, 2 for locking ahead, 3 for locking "
"incrementally, 4 for non atomic");
ABSL_FLAG(int, multi_eval_mode, 1,
ABSL_FLAG(uint32_t, multi_eval_mode, 1,
"Set EVAL atomicity mode: 1 for global, 2 for locking ahead, 3 for locking "
"incrementally, 4 for non atomic");
ABSL_FLAG(uint32_t, num_shards, 0, "Number of database shards, 0 - to choose automatically");
namespace dfly {
@ -518,7 +519,13 @@ void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_i
pp_.Await([](uint32_t index, ProactorBase* pb) { ServerState::Init(index); });
uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
uint32_t shard_num = GetFlag(FLAGS_num_shards);
if (shard_num == 0) {
shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
} else if (shard_num > pp_.size()) {
shard_num = pp_.size();
}
shard_set->Init(shard_num, !opts.disable_time_update);
request_latency_usec.Init(&pp_);

View file

@ -15,7 +15,7 @@
#include "server/test_utils.h"
#include "server/transaction.h"
ABSL_DECLARE_FLAG(int, multi_exec_mode);
ABSL_DECLARE_FLAG(uint32_t, multi_exec_mode);
ABSL_DECLARE_FLAG(std::string, default_lua_config);
namespace dfly {