mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
chore(server): pass coordinator thread to a transaction object (#905)
This should help with some of the optimizations we may do in the future. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
c9e9311c8e
commit
8cf8115116
8 changed files with 21 additions and 11 deletions
|
@ -41,7 +41,7 @@ void BlockingControllerTest::SetUp() {
|
||||||
shard_set = new EngineShardSet(pp_.get());
|
shard_set = new EngineShardSet(pp_.get());
|
||||||
shard_set->Init(kNumThreads, false);
|
shard_set->Init(kNumThreads, false);
|
||||||
|
|
||||||
trans_.reset(new Transaction{&cid_});
|
trans_.reset(new Transaction{&cid_, 0});
|
||||||
|
|
||||||
str_vec_.assign({"blpop", "x", "z", "0"});
|
str_vec_.assign({"blpop", "x", "z", "0"});
|
||||||
for (auto& s : str_vec_) {
|
for (auto& s : str_vec_) {
|
||||||
|
|
|
@ -175,7 +175,7 @@ void DebugCmd::Reload(CmdArgList args) {
|
||||||
string err_details;
|
string err_details;
|
||||||
const CommandId* cid = sf_.service().FindCmd("SAVE");
|
const CommandId* cid = sf_.service().FindCmd("SAVE");
|
||||||
CHECK_NOTNULL(cid);
|
CHECK_NOTNULL(cid);
|
||||||
intrusive_ptr<Transaction> trans(new Transaction{cid});
|
intrusive_ptr<Transaction> trans(new Transaction{cid, ServerState::tlocal()->thread_index()});
|
||||||
trans->InitByArgs(0, {});
|
trans->InitByArgs(0, {});
|
||||||
VLOG(1) << "Performing save";
|
VLOG(1) << "Performing save";
|
||||||
|
|
||||||
|
@ -226,7 +226,8 @@ void DebugCmd::Load(string_view filename) {
|
||||||
};
|
};
|
||||||
|
|
||||||
const CommandId* cid = sf_.service().FindCmd("FLUSHALL");
|
const CommandId* cid = sf_.service().FindCmd("FLUSHALL");
|
||||||
intrusive_ptr<Transaction> flush_trans(new Transaction{cid});
|
intrusive_ptr<Transaction> flush_trans(
|
||||||
|
new Transaction{cid, ServerState::tlocal()->thread_index()});
|
||||||
flush_trans->InitByArgs(0, {});
|
flush_trans->InitByArgs(0, {});
|
||||||
VLOG(1) << "Performing flush";
|
VLOG(1) << "Performing flush";
|
||||||
error_code ec = sf_.Drakarys(flush_trans.get(), DbSlice::kDbAll);
|
error_code ec = sf_.Drakarys(flush_trans.get(), DbSlice::kDbAll);
|
||||||
|
|
|
@ -495,7 +495,8 @@ void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_i
|
||||||
const InitOpts& opts) {
|
const InitOpts& opts) {
|
||||||
InitRedisTables();
|
InitRedisTables();
|
||||||
|
|
||||||
pp_.AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) { ServerState::tlocal()->Init(); });
|
pp_.AwaitFiberOnAll(
|
||||||
|
[&](uint32_t index, ProactorBase* pb) { ServerState::tlocal()->Init(index); });
|
||||||
|
|
||||||
uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
|
uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
|
||||||
shard_set->Init(shard_num, !opts.disable_time_update);
|
shard_set->Init(shard_num, !opts.disable_time_update);
|
||||||
|
@ -706,7 +707,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
|
||||||
DCHECK(dfly_cntx->transaction == nullptr);
|
DCHECK(dfly_cntx->transaction == nullptr);
|
||||||
|
|
||||||
if (IsTransactional(cid)) {
|
if (IsTransactional(cid)) {
|
||||||
dist_trans.reset(new Transaction{cid});
|
dist_trans.reset(new Transaction{cid, etl.thread_index()});
|
||||||
|
|
||||||
if (!dist_trans->IsMulti()) { // Multi command initialize themself based on their mode.
|
if (!dist_trans->IsMulti()) { // Multi command initialize themself based on their mode.
|
||||||
if (auto st = dist_trans->InitByArgs(dfly_cntx->conn_state.db_index, args);
|
if (auto st = dist_trans->InitByArgs(dfly_cntx->conn_state.db_index, args);
|
||||||
|
|
|
@ -589,7 +589,8 @@ void ServerFamily::SnapshotScheduling(const SnapshotSpec& spec) {
|
||||||
|
|
||||||
const CommandId* cid = service().FindCmd("SAVE");
|
const CommandId* cid = service().FindCmd("SAVE");
|
||||||
CHECK_NOTNULL(cid);
|
CHECK_NOTNULL(cid);
|
||||||
boost::intrusive_ptr<Transaction> trans(new Transaction{cid});
|
boost::intrusive_ptr<Transaction> trans(
|
||||||
|
new Transaction{cid, ServerState::tlocal()->thread_index()});
|
||||||
trans->InitByArgs(0, {});
|
trans->InitByArgs(0, {});
|
||||||
|
|
||||||
GenericError ec = DoSave(absl::GetFlag(FLAGS_df_snapshot_format), trans.get());
|
GenericError ec = DoSave(absl::GetFlag(FLAGS_df_snapshot_format), trans.get());
|
||||||
|
|
|
@ -59,8 +59,9 @@ ServerState::ServerState() : interpreter_mgr_{absl::GetFlag(FLAGS_interpreter_pe
|
||||||
ServerState::~ServerState() {
|
ServerState::~ServerState() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void ServerState::Init() {
|
void ServerState::Init(uint32_t thread_index) {
|
||||||
gstate_ = GlobalState::ACTIVE;
|
gstate_ = GlobalState::ACTIVE;
|
||||||
|
thread_index_ = thread_index;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ServerState::Shutdown() {
|
void ServerState::Shutdown() {
|
||||||
|
|
|
@ -103,7 +103,7 @@ class ServerState { // public struct - to allow initialization.
|
||||||
ServerState();
|
ServerState();
|
||||||
~ServerState();
|
~ServerState();
|
||||||
|
|
||||||
void Init();
|
void Init(uint32_t thread_index);
|
||||||
void Shutdown();
|
void Shutdown();
|
||||||
|
|
||||||
bool is_master = true;
|
bool is_master = true;
|
||||||
|
@ -182,6 +182,10 @@ class ServerState { // public struct - to allow initialization.
|
||||||
return it != cached_script_params_.end() ? std::optional{it->second} : std::nullopt;
|
return it != cached_script_params_.end() ? std::optional{it->second} : std::nullopt;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint32_t thread_index() const {
|
||||||
|
return thread_index_;
|
||||||
|
}
|
||||||
|
|
||||||
Stats stats;
|
Stats stats;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
@ -200,6 +204,7 @@ class ServerState { // public struct - to allow initialization.
|
||||||
MonitorsRepo monitors_;
|
MonitorsRepo monitors_;
|
||||||
|
|
||||||
absl::flat_hash_map<std::string, base::Histogram> call_latency_histos_;
|
absl::flat_hash_map<std::string, base::Histogram> call_latency_histos_;
|
||||||
|
uint32_t thread_index_ = 0;
|
||||||
|
|
||||||
static thread_local ServerState state_;
|
static thread_local ServerState state_;
|
||||||
};
|
};
|
||||||
|
|
|
@ -41,7 +41,8 @@ IntentLock::Mode Transaction::Mode() const {
|
||||||
* @param ess
|
* @param ess
|
||||||
* @param cs
|
* @param cs
|
||||||
*/
|
*/
|
||||||
Transaction::Transaction(const CommandId* cid) : cid_{cid} {
|
Transaction::Transaction(const CommandId* cid, uint32_t thread_index)
|
||||||
|
: cid_{cid}, coordinator_index_(thread_index) {
|
||||||
string_view cmd_name(cid_->name());
|
string_view cmd_name(cid_->name());
|
||||||
if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") {
|
if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") {
|
||||||
multi_.reset(new MultiData);
|
multi_.reset(new MultiData);
|
||||||
|
|
|
@ -102,7 +102,7 @@ class Transaction {
|
||||||
};
|
};
|
||||||
|
|
||||||
public:
|
public:
|
||||||
explicit Transaction(const CommandId* cid);
|
explicit Transaction(const CommandId* cid, uint32_t thread_index);
|
||||||
|
|
||||||
// Initialize from command (args) on specific db.
|
// Initialize from command (args) on specific db.
|
||||||
OpStatus InitByArgs(DbIndex index, CmdArgList args);
|
OpStatus InitByArgs(DbIndex index, CmdArgList args);
|
||||||
|
@ -175,7 +175,6 @@ class Transaction {
|
||||||
// Runs in the shard thread.
|
// Runs in the shard thread.
|
||||||
KeyLockArgs GetLockArgs(ShardId sid) const;
|
KeyLockArgs GetLockArgs(ShardId sid) const;
|
||||||
|
|
||||||
public:
|
|
||||||
//! Returns true if the transaction spans this shard_id.
|
//! Returns true if the transaction spans this shard_id.
|
||||||
//! Runs from the coordinator thread.
|
//! Runs from the coordinator thread.
|
||||||
bool IsActive(ShardId shard_id) const {
|
bool IsActive(ShardId shard_id) const {
|
||||||
|
@ -484,6 +483,7 @@ class Transaction {
|
||||||
// they read this variable the coordinator thread is stalled and can not cause data races.
|
// they read this variable the coordinator thread is stalled and can not cause data races.
|
||||||
// If COORDINATOR_XXX has been set, it means we passed or crossed stage XXX.
|
// If COORDINATOR_XXX has been set, it means we passed or crossed stage XXX.
|
||||||
uint8_t coordinator_state_ = 0;
|
uint8_t coordinator_state_ = 0;
|
||||||
|
uint32_t coordinator_index_; // thread_index of the coordinator thread.
|
||||||
|
|
||||||
// Used for single-hop transactions with unique_shards_ == 1, hence no data-race.
|
// Used for single-hop transactions with unique_shards_ == 1, hence no data-race.
|
||||||
OpStatus local_result_ = OpStatus::OK;
|
OpStatus local_result_ = OpStatus::OK;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue