Mirror of https://github.com/dragonflydb/dragonfly.git
chore: change thread_local to __thread when possible.
This should reduce the access cost to thread_local objects.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
parent 149b1fa913
commit bac9180602

7 changed files with 26 additions and 17 deletions
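
Why this helps: on the Itanium C++ ABI used by GCC and Clang, a thread_local variable with a non-trivial constructor or destructor is accessed through a thread wrapper that checks an initialization guard, so every access pays an extra call/branch. The __thread extension skips that machinery entirely, but it only accepts trivially constructible and destructible types. The commit therefore keeps only a raw pointer in TLS and manages the pointee's lifetime explicitly. A minimal sketch of that pattern follows; PerThreadState and its members are illustrative stand-ins, not the actual Dragonfly classes.

#include <cassert>
#include <cstdint>

class PerThreadState {
 public:
  // The heavy object lives on the heap; only a trivially-destructible raw
  // pointer sits in TLS, which is what __thread (a GCC/Clang extension) requires.
  static void Init(uint32_t thread_index) {
    state_ = new PerThreadState();
    state_->thread_index_ = thread_index;
  }

  static void Destroy() {
    delete state_;
    state_ = nullptr;
  }

  // Plain TLS load: no lazy-initialization guard on the access path.
  static PerThreadState* tlocal() {
    return state_;
  }

  uint32_t thread_index() const {
    return thread_index_;
  }

 private:
  uint32_t thread_index_ = 0;

  static __thread PerThreadState* state_;
};

__thread PerThreadState* PerThreadState::state_ = nullptr;

int main() {
  // Each worker thread would call Init() once at startup and Destroy() at
  // shutdown, e.g. from per-thread pool callbacks.
  PerThreadState::Init(0);
  assert(PerThreadState::tlocal()->thread_index() == 0);
  PerThreadState::Destroy();
  return 0;
}

In the diff below, ServerState follows this shape: state_ becomes a static __thread ServerState*, Init()/Destroy() become static, and Service::Init()/Service::Shutdown() invoke them on every proactor thread via pp_.Await(). EngineShard::shard_, already a raw pointer, simply switches keywords.
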
@@ -1,4 +1,4 @@
-// Copyright 2022, DragonflyDB authors. All rights reserved.
+// Copyright 2023, DragonflyDB authors. All rights reserved.
 // See LICENSE for licensing terms.
 //
 
@@ -9,6 +9,7 @@
 #include "base/logging.h"
 #include "server/command_registry.h"
 #include "server/engine_shard_set.h"
+#include "server/server_state.h"
 #include "server/transaction.h"
 #include "util/uring/uring_pool.h"
 
@@ -38,6 +39,8 @@ constexpr size_t kNumThreads = 3;
 void BlockingControllerTest::SetUp() {
   pp_.reset(new uring::UringPool(16, kNumThreads));
   pp_->Run();
+  pp_->Await([](unsigned index, ProactorBase* p) { ServerState::Init(index); });
+
   shard_set = new EngineShardSet(pp_.get());
   shard_set->Init(kNumThreads, false);
 
@@ -185,7 +185,7 @@ TEST_F(DflyEngineTest, EvalBug713b) {
   fibers_ext::Fiber fibers[kNumFibers];
 
   for (unsigned j = 0; j < kNumFibers; ++j) {
-    fibers[j] = pp_->at(1)->LaunchFiber([=] {
+    fibers[j] = pp_->at(1)->LaunchFiber([=, this] {
       for (unsigned i = 0; i < 50; ++i) {
         Run(StrCat("fb", j), {"eval", script, "3", kKeySid0, kKeySid1, kKeySid2});
       }
@@ -76,7 +76,7 @@ ShardMemUsage ReadShardMemUsage(float wasted_ratio) {
 
 constexpr size_t kQueueLen = 64;
 
-thread_local EngineShard* EngineShard::shard_ = nullptr;
+__thread EngineShard* EngineShard::shard_ = nullptr;
 EngineShardSet* shard_set = nullptr;
 uint64_t TEST_current_time_ms = 0;
 
@@ -225,7 +225,7 @@ class EngineShard {
   Counter counter_[COUNTER_TOTAL];
   std::vector<Counter> ttl_survivor_sum_;  // we need it per db.
 
-  static thread_local EngineShard* shard_;
+  static __thread EngineShard* shard_;
 };
 
 class EngineShardSet {
@@ -495,8 +495,7 @@ void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_i
                    const InitOpts& opts) {
   InitRedisTables();
 
-  pp_.AwaitFiberOnAll(
-      [&](uint32_t index, ProactorBase* pb) { ServerState::tlocal()->Init(index); });
+  pp_.Await([](uint32_t index, ProactorBase* pb) { ServerState::Init(index); });
 
   uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
   shard_set->Init(shard_num, !opts.disable_time_update);
@@ -513,7 +512,7 @@ void Service::Shutdown() {
   // We mark that we are shutting down. After this incoming requests will be
   // rejected
   pp_.AwaitFiberOnAll([](ProactorBase* pb) {
-    ServerState::tlocal()->Shutdown();
+    ServerState::tlocal()->EnterLameDuck();
     facade::Connection::ShutdownThreadLocal();
   });
 
@@ -526,6 +525,7 @@ void Service::Shutdown() {
   request_latency_usec.Shutdown();
 
   shard_set->Shutdown();
+  pp_.Await([](ProactorBase* pb) { ServerState::tlocal()->Destroy(); });
 
   // wait for all the pending callbacks to stop.
   fibers_ext::SleepFor(10ms);
@@ -18,7 +18,7 @@ ABSL_FLAG(uint32_t, interpreter_per_thread, 10, "Lua interpreters per thread");
 
 namespace dfly {
 
-thread_local ServerState ServerState::state_;
+__thread ServerState* ServerState::state_ = nullptr;
 
 void MonitorsRepo::Add(facade::Connection* connection) {
   VLOG(1) << "register connection "
@@ -60,12 +60,14 @@ ServerState::~ServerState() {
 }
 
 void ServerState::Init(uint32_t thread_index) {
-  gstate_ = GlobalState::ACTIVE;
-  thread_index_ = thread_index;
+  state_ = new ServerState();
+  state_->gstate_ = GlobalState::ACTIVE;
+  state_->thread_index_ = thread_index;
 }
 
-void ServerState::Shutdown() {
-  gstate_ = GlobalState::SHUTTING_DOWN;
+void ServerState::Destroy() {
+  delete state_;
+  state_ = nullptr;
 }
 
 Interpreter* ServerState::BorrowInterpreter() {
@@ -93,18 +93,22 @@ class ServerState { // public struct - to allow initialization.
   };
 
   static ServerState* tlocal() {
-    return &state_;
+    return state_;
   }
 
   static facade::ConnectionStats* tl_connection_stats() {
-    return &state_.connection_stats;
+    return &state_->connection_stats;
   }
 
   ServerState();
   ~ServerState();
 
-  void Init(uint32_t thread_index);
-  void Shutdown();
+  static void Init(uint32_t thread_index);
+  static void Destroy();
 
+  void EnterLameDuck() {
+    state_->gstate_ = GlobalState::SHUTTING_DOWN;
+  }
+
   bool is_master = true;
   std::string remote_client_id_;  // for cluster support
@@ -206,7 +210,7 @@ class ServerState { // public struct - to allow initialization.
   absl::flat_hash_map<std::string, base::Histogram> call_latency_histos_;
   uint32_t thread_index_ = 0;
 
-  static thread_local ServerState state_;
+  static __thread ServerState* state_;
 };
 
 } // namespace dfly