feat: introduce transaction statistics in the info output (#2328)

1. How many transactions were processed, by type.
2. How many transactions were processed, by width (the number of unique shards a transaction touches).
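For illustration, a hypothetical INFO output produced by this change could contain entries like the following under the new TRANSACTION section (the values are invented; the field layout mirrors the ServerFamily::Info changes below):

tx_type_cnt:global=1,normal=24,ooo=7,quick=112,inline=3
tx_width_freq:w1=140,w2=5,w3=2

Here wN counts transactions that touched exactly N unique shards; width buckets with a zero count are omitted.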

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Roman Gershman 2023-12-23 13:18:49 +02:00 committed by GitHub
parent 1376295799
commit bbe3d9303b
14 changed files with 179 additions and 82 deletions


@@ -40,8 +40,9 @@ constexpr size_t kNumThreads = 3;
void BlockingControllerTest::SetUp() {
pp_.reset(fb2::Pool::Epoll(kNumThreads));
pp_->Run();
pp_->Await([](unsigned index, ProactorBase* p) { ServerState::Init(index, nullptr); });
ServerState::Init(kNumThreads, nullptr);
pp_->Await(
[](unsigned index, ProactorBase* p) { ServerState::Init(index, kNumThreads, nullptr); });
ServerState::Init(kNumThreads, kNumThreads, nullptr);
shard_set = new EngineShardSet(pp_.get());
shard_set->Init(kNumThreads, false);


@@ -173,8 +173,6 @@ EngineShardSet* shard_set = nullptr;
uint64_t TEST_current_time_ms = 0;
EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) {
ooo_runs += o.ooo_runs;
quick_runs += o.quick_runs;
defrag_attempt_total += o.defrag_attempt_total;
defrag_realloc_total += o.defrag_realloc_total;
defrag_task_invocation_total += o.defrag_task_invocation_total;
@@ -516,7 +514,6 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
if (VLOG_IS_ON(1)) {
dbg_id = trans->DebugId();
}
++stats_.ooo_runs;
bool txq_ooo = trans_mask & Transaction::OUT_OF_ORDER;
bool keep = trans->RunInShard(this, txq_ooo);


@@ -37,8 +37,6 @@ class BlockingController;
class EngineShard {
public:
struct Stats {
uint64_t ooo_runs = 0; // how many times transactions run as OOO.
uint64_t quick_runs = 0; // how many times single shard "RunQuickie" transaction run.
uint64_t defrag_attempt_total = 0;
uint64_t defrag_realloc_total = 0;
uint64_t defrag_task_invocation_total = 0;
@@ -101,10 +99,6 @@ class EngineShard {
// Remove current continuation trans if its equal to tx.
void RemoveContTx(Transaction* tx);
void IncQuickRun() {
stats_.quick_runs++;
}
const Stats& stats() const {
return stats_;
}

src/server/main_service.cc (36 changed lines, Executable file → Normal file)

@@ -791,9 +791,6 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
config_registry.RegisterMutable("max_eviction_per_heartbeat");
config_registry.RegisterMutable("max_segment_to_consider");
acl::UserRegistry* reg = &user_registry_;
pp_.Await([reg](uint32_t index, ProactorBase* pb) { ServerState::Init(index, reg); });
uint32_t shard_num = GetFlag(FLAGS_num_shards);
if (shard_num == 0 || shard_num > pp_.size()) {
LOG_IF(WARNING, shard_num > pp_.size())
@@ -802,6 +799,11 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
shard_num = pp_.size();
}
// Must initialize before the shard_set because EngineShard::Init references ServerState.
pp_.Await([&](uint32_t index, ProactorBase* pb) {
ServerState::Init(index, shard_num, &user_registry_);
});
shard_set->Init(shard_num, !opts.disable_time_update);
const auto tcp_disabled = GetFlag(FLAGS_port) == 0u;
// We assume that listeners.front() is the main_listener
@@ -969,7 +971,7 @@ std::optional<ErrorReply> Service::VerifyCommandState(const CommandId* cid, CmdA
bool is_trans_cmd = CO::IsTransKind(cid->name());
bool under_script = dfly_cntx.conn_state.script_info != nullptr;
bool is_write_cmd = cid->IsWriteOnly();
bool under_multi = dfly_cntx.conn_state.exec_info.IsCollecting() && !is_trans_cmd;
bool multi_active = dfly_cntx.conn_state.exec_info.IsCollecting() && !is_trans_cmd;
// Check if the command is allowed to execute under this global state
bool allowed_by_state = true;
@@ -1008,7 +1010,7 @@ std::optional<ErrorReply> Service::VerifyCommandState(const CommandId* cid, CmdA
if (!etl.is_master && is_write_cmd && !dfly_cntx.is_replicating)
return ErrorReply{"-READONLY You can't write against a read only replica."};
if (under_multi) {
if (multi_active) {
if (cmd_name == "SELECT" || absl::EndsWith(cmd_name, "SUBSCRIBE"))
return ErrorReply{absl::StrCat("Can not call ", cmd_name, " within a transaction")};
@@ -1069,8 +1071,8 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
ConnectionContext* dfly_cntx = static_cast<ConnectionContext*>(cntx);
bool under_script = bool(dfly_cntx->conn_state.script_info);
bool under_multi = dfly_cntx->conn_state.exec_info.IsRunning();
bool dispatching_in_multi = under_script || under_multi;
bool under_exec = dfly_cntx->conn_state.exec_info.IsRunning();
bool dispatching_in_multi = under_script || under_exec;
if (VLOG_IS_ON(2) && cntx->conn() /* no owner in replica context */) {
LOG(INFO) << "Got (" << cntx->conn()->GetClientId() << "): " << (under_script ? "LUA " : "")
@@ -1152,11 +1154,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
dfly_cntx->cid = cid;
// Collect stats for all regular transactions and all multi transactions from scripts, except EVAL
// itself. EXEC does not use DispatchCommand for dispatching.
bool collect_stats =
dfly_cntx->transaction && (!dfly_cntx->transaction->IsMulti() || dispatching_in_multi);
if (!InvokeCmd(cid, args_no_cmd, dfly_cntx, collect_stats)) {
if (!InvokeCmd(cid, args_no_cmd, dfly_cntx)) {
dfly_cntx->reply_builder()->SendError("Internal Error");
dfly_cntx->reply_builder()->CloseConnection();
}
@@ -1204,8 +1202,7 @@ class ReplyGuard {
SinkReplyBuilder* builder_ = nullptr;
};
bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionContext* cntx,
bool record_stats) {
bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionContext* cntx) {
DCHECK(cid);
DCHECK(!cid->Validate(tail_args));
@@ -1225,8 +1222,10 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
}
#ifndef NDEBUG
// Verifies that we reply to the client when needed.
ReplyGuard reply_guard(cntx, cid->name());
#endif
try {
cid->Invoke(tail_args, cntx);
} catch (std::exception& e) {
@@ -1234,13 +1233,12 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
return false;
}
if (record_stats) {
DCHECK(cntx->transaction);
if (cntx->transaction && !cntx->conn_state.exec_info.IsRunning() &&
cntx->conn_state.script_info == nullptr) {
bool is_ooo = cntx->transaction->IsOOO();
cntx->last_command_debug.clock = cntx->transaction->txid();
cntx->last_command_debug.is_ooo = is_ooo;
etl.stats.ooo_tx_cnt += is_ooo;
}
return true;
@@ -1986,7 +1984,7 @@ CmdArgVec CollectAllKeys(ConnectionState::ExecInfo* exec_info) {
void StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo* exec_info,
Transaction::MultiMode multi_mode) {
CmdArgVec tmp_keys;
switch ((Transaction::MultiMode)multi_mode) {
switch (multi_mode) {
case Transaction::GLOBAL:
trans->StartMultiGlobal(dbid);
break;
@@ -2079,7 +2077,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
}
}
bool ok = InvokeCmd(scmd.Cid(), args, cntx, true);
bool ok = InvokeCmd(scmd.Cid(), args, cntx);
if (!ok || rb->GetError()) // checks for i/o error, not logical error.
break;
}


@@ -52,8 +52,7 @@ class Service : public facade::ServiceInterface {
facade::ConnectionContext* cntx) final;
// Check VerifyCommandExecution and invoke command with args
bool InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionContext* reply_cntx,
bool record_stats = false);
bool InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionContext* reply_cntx);
// Verify command can be executed now (check out of memory), always called immediately before
// execution


@@ -511,9 +511,9 @@ TEST_F(MultiTest, MultiOOO) {
// OOO works in LOCK_AHEAD mode.
int mode = absl::GetFlag(FLAGS_multi_exec_mode);
if (mode == Transaction::LOCK_AHEAD || mode == Transaction::NON_ATOMIC)
EXPECT_EQ(200, metrics.coordinator_stats.ooo_tx_cnt);
EXPECT_EQ(200, metrics.coordinator_stats.tx_type_cnt[ServerState::OOO]);
else
EXPECT_EQ(0, metrics.coordinator_stats.ooo_tx_cnt);
EXPECT_EQ(0, metrics.coordinator_stats.tx_type_cnt[ServerState::OOO]);
}
// Lua scripts lock their keys ahead and thus can run out of order.
@@ -620,16 +620,19 @@ TEST_F(MultiTest, ExecGlobalFallback) {
Run({"set", "a", "1"}); // won't run ooo, because it became part of global
Run({"move", "a", "1"});
Run({"exec"});
EXPECT_EQ(0, GetMetrics().coordinator_stats.ooo_tx_cnt);
EXPECT_EQ(1, GetMetrics().coordinator_stats.tx_type_cnt[ServerState::GLOBAL]);
ClearMetrics();
// Check non atomic mode does not fall back to global.
absl::SetFlag(&FLAGS_multi_exec_mode, Transaction::NON_ATOMIC);
Run({"multi"});
Run({"set", "a", "1"}); // will run ooo
Run({"set", "a", "1"}); // will run ooo(quick)
Run({"move", "a", "1"});
Run({"exec"});
// TODO: Stats with squashed cmds are broken
// EXPECT_EQ(1, GetMetrics().ooo_tx_transaction_cnt);
auto stats = GetMetrics().coordinator_stats;
EXPECT_EQ(1, stats.tx_type_cnt[ServerState::QUICK] + stats.tx_type_cnt[ServerState::INLINE]);
}
TEST_F(MultiTest, ScriptFlagsCommand) {


@@ -760,13 +760,14 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) {
AppendMetricWithoutLabels("uptime_in_seconds", "", m.uptime, MetricType::COUNTER, &resp->body());
// Clients metrics
AppendMetricWithoutLabels("connected_clients", "", m.conn_stats.num_conns, MetricType::GAUGE,
const auto& conn_stats = m.conn_stats;
AppendMetricWithoutLabels("connected_clients", "", conn_stats.num_conns, MetricType::GAUGE,
&resp->body());
AppendMetricWithoutLabels("client_read_buffer_bytes", "", m.conn_stats.read_buf_capacity,
AppendMetricWithoutLabels("client_read_buffer_bytes", "", conn_stats.read_buf_capacity,
MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("blocked_clients", "", m.conn_stats.num_blocked_clients,
AppendMetricWithoutLabels("blocked_clients", "", conn_stats.num_blocked_clients,
MetricType::GAUGE, &resp->body());
AppendMetricWithoutLabels("dispatch_queue_bytes", "", m.conn_stats.dispatch_queue_bytes,
AppendMetricWithoutLabels("dispatch_queue_bytes", "", conn_stats.dispatch_queue_bytes,
MetricType::GAUGE, &resp->body());
// Memory metrics
@@ -807,10 +808,10 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) {
}
// Stats metrics
AppendMetricWithoutLabels("connections_received_total", "", m.conn_stats.conn_received_cnt,
AppendMetricWithoutLabels("connections_received_total", "", conn_stats.conn_received_cnt,
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("commands_processed_total", "", m.conn_stats.command_cnt,
AppendMetricWithoutLabels("commands_processed_total", "", conn_stats.command_cnt,
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("keyspace_hits_total", "", m.events.hits, MetricType::COUNTER,
&resp->body());
@@ -818,9 +819,9 @@
&resp->body());
// Net metrics
AppendMetricWithoutLabels("net_input_bytes_total", "", m.conn_stats.io_read_bytes,
AppendMetricWithoutLabels("net_input_bytes_total", "", conn_stats.io_read_bytes,
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("net_output_bytes_total", "", m.conn_stats.io_write_bytes,
AppendMetricWithoutLabels("net_output_bytes_total", "", conn_stats.io_write_bytes,
MetricType::COUNTER, &resp->body());
// DB stats
@@ -1477,11 +1478,11 @@ Metrics ServerFamily::GetMetrics() const {
result.fiber_longrun_cnt += fb2::FiberLongRunCnt();
result.fiber_longrun_usec += fb2::FiberLongRunSumUsec();
result.coordinator_stats += ss->stats;
result.conn_stats += ss->connection_stats;
result.coordinator_stats.Add(shard_set->size(), ss->stats);
result.uptime = time(NULL) - this->start_time_;
result.qps += uint64_t(ss->MovingSum6());
result.conn_stats += ss->connection_stats;
if (shard) {
result.heap_used_bytes += shard->UsedMemory();
@@ -1670,11 +1671,6 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("defrag_attempt_total", m.shard_stats.defrag_attempt_total);
append("defrag_realloc_total", m.shard_stats.defrag_realloc_total);
append("defrag_task_invocation_total", m.shard_stats.defrag_task_invocation_total);
append("eval_io_coordination_total", m.coordinator_stats.eval_io_coordination_cnt);
append("eval_shardlocal_coordination_total",
m.coordinator_stats.eval_shardlocal_coordination_cnt);
append("eval_squashed_flushes", m.coordinator_stats.eval_squashed_flushes);
append("tx_schedule_cancel_total", m.coordinator_stats.tx_schedule_cancel_cnt);
}
if (should_enter("TIERED", true)) {
@@ -1707,6 +1703,30 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("rdb_changes_since_last_save", m.events.update);
}
if (should_enter("TRANSACTION", true)) {
const auto& tc = m.coordinator_stats.tx_type_cnt;
string val = StrCat("global=", tc[ServerState::GLOBAL], ",normal=", tc[ServerState::NORMAL],
",ooo=", tc[ServerState::OOO], ",quick=", tc[ServerState::QUICK],
",inline=", tc[ServerState::INLINE]);
append("tx_type_cnt", val);
val.clear();
for (unsigned width = 0; width < shard_set->size(); ++width) {
if (m.coordinator_stats.tx_width_freq_arr[width] > 0) {
absl::StrAppend(&val, "w", width + 1, "=", m.coordinator_stats.tx_width_freq_arr[width],
",");
}
}
if (!val.empty()) {
val.pop_back(); // last comma.
append("tx_width_freq", val);
}
append("eval_io_coordination_total", m.coordinator_stats.eval_io_coordination_cnt);
append("eval_shardlocal_coordination_total",
m.coordinator_stats.eval_shardlocal_coordination_cnt);
append("eval_squashed_flushes", m.coordinator_stats.eval_squashed_flushes);
append("tx_schedule_cancel_total", m.coordinator_stats.tx_schedule_cancel_cnt);
}
if (should_enter("REPLICATION")) {
ServerState& etl = *ServerState::tlocal();
@@ -1945,6 +1965,12 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn
if (replica_)
replica_->Stop();
// If we are called by "Replicate", cntx->transaction will be null but we do not need
// to flush anything.
if (cntx->transaction) {
Drakarys(cntx->transaction, DbSlice::kDbAll);
}
// Create a new replica and assign it
auto new_replica = make_shared<Replica>(string(host), port, &service_, master_id());
replica_ = new_replica;
@@ -1980,9 +2006,6 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
string_view host = ArgS(args, 0);
string_view port = ArgS(args, 1);
if (!IsReplicatingNoOne(host, port))
Drakarys(cntx->transaction, DbSlice::kDbAll);
ReplicaOfInternal(host, port, cntx, ActionOnConnectionFail::kReturnOnError);
}
@@ -1991,8 +2014,6 @@ void ServerFamily::Replicate(string_view host, string_view port) {
ConnectionContext ctxt{&sink, nullptr};
ctxt.skip_acl_validation = true;
// we don't flush the database as the context is null
// (and also because there is nothing to flush)
ReplicaOfInternal(host, port, &ctxt, ActionOnConnectionFail::kContinueReplication);
}


@@ -23,14 +23,51 @@ namespace dfly {
__thread ServerState* ServerState::state_ = nullptr;
ServerState::Stats& ServerState::Stats::operator+=(const ServerState::Stats& other) {
this->ooo_tx_cnt += other.ooo_tx_cnt;
ServerState::Stats::Stats() {
tx_type_cnt.fill(0);
}
ServerState::Stats::~Stats() {
delete[] tx_width_freq_arr;
}
auto ServerState::Stats::operator=(Stats&& other) -> Stats& {
for (int i = 0; i < NUM_TX_TYPES; ++i) {
this->tx_type_cnt[i] = other.tx_type_cnt[i];
}
this->eval_io_coordination_cnt = other.eval_io_coordination_cnt;
this->eval_shardlocal_coordination_cnt = other.eval_shardlocal_coordination_cnt;
this->eval_squashed_flushes = other.eval_squashed_flushes;
this->tx_schedule_cancel_cnt = other.tx_schedule_cancel_cnt;
delete[] this->tx_width_freq_arr;
this->tx_width_freq_arr = other.tx_width_freq_arr;
other.tx_width_freq_arr = nullptr;
return *this;
}
ServerState::Stats& ServerState::Stats::Add(unsigned num_shards, const ServerState::Stats& other) {
static_assert(sizeof(Stats) == 10 * 8, "Stats size mismatch");
for (int i = 0; i < NUM_TX_TYPES; ++i) {
this->tx_type_cnt[i] += other.tx_type_cnt[i];
}
this->eval_io_coordination_cnt += other.eval_io_coordination_cnt;
this->eval_shardlocal_coordination_cnt += other.eval_shardlocal_coordination_cnt;
this->eval_squashed_flushes += other.eval_squashed_flushes;
this->tx_schedule_cancel_cnt += other.tx_schedule_cancel_cnt;
static_assert(sizeof(Stats) == 5 * 8);
if (this->tx_width_freq_arr == nullptr) {
this->tx_width_freq_arr = new uint64_t[num_shards];
std::copy_n(other.tx_width_freq_arr, num_shards, this->tx_width_freq_arr);
} else {
for (unsigned i = 0; i < num_shards; ++i) {
this->tx_width_freq_arr[i] += other.tx_width_freq_arr[i];
}
}
return *this;
}
@@ -73,11 +110,13 @@ ServerState::ServerState() : interpreter_mgr_{absl::GetFlag(FLAGS_interpreter_pe
ServerState::~ServerState() {
}
void ServerState::Init(uint32_t thread_index, acl::UserRegistry* registry) {
void ServerState::Init(uint32_t thread_index, uint32_t num_shards, acl::UserRegistry* registry) {
state_ = new ServerState();
state_->gstate_ = GlobalState::ACTIVE;
state_->thread_index_ = thread_index;
state_->user_registry = registry;
state_->stats.tx_width_freq_arr = new uint64_t[num_shards];
std::fill_n(state_->stats.tx_width_freq_arr, num_shards, 0);
}
void ServerState::Destroy() {

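The following is a minimal, self-contained sketch (not part of the diff) of how the per-thread width histograms kept in tx_width_freq_arr combine, in the spirit of ServerState::Stats::Add above, assuming a hypothetical 3-shard setup:

#include <cstdint>
#include <cstdio>

int main() {
  constexpr unsigned kShards = 3;  // hypothetical shard count
  // Per-thread histograms: index i holds the number of transactions whose
  // width (unique_shard_cnt) was i + 1.
  uint64_t thread_a[kShards] = {5, 0, 2};  // 5 single-shard txs, 2 spanning all 3 shards
  uint64_t thread_b[kShards] = {1, 4, 0};
  uint64_t total[kShards] = {};
  for (unsigned i = 0; i < kShards; ++i)
    total[i] = thread_a[i] + thread_b[i];
  // The INFO handler renders non-zero buckets as "w1=6,w2=4,w3=2"
  // (it also strips the trailing comma, which this sketch keeps).
  for (unsigned i = 0; i < kShards; ++i)
    if (total[i] > 0)
      std::printf("w%u=%llu,", i + 1, static_cast<unsigned long long>(total[i]));
  std::printf("\n");
  return 0;
}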

@@ -93,8 +93,9 @@ class ServerState { // public struct - to allow initialization.
void operator=(const ServerState&) = delete;
public:
enum TxType { GLOBAL, NORMAL, OOO, QUICK, INLINE, NUM_TX_TYPES };
struct Stats {
uint64_t ooo_tx_cnt = 0;
std::array<uint64_t, NUM_TX_TYPES> tx_type_cnt;
uint64_t eval_io_coordination_cnt = 0;
uint64_t eval_shardlocal_coordination_cnt = 0;
@@ -102,7 +103,23 @@ class ServerState { // public struct - to allow initialization.
uint64_t tx_schedule_cancel_cnt = 0;
Stats& operator+=(const Stats& other);
// Array of size of number of shards.
// Each entry is how many transactions we had with this width (unique_shard_cnt).
uint64_t* tx_width_freq_arr = nullptr;
Stats();
~Stats();
Stats(Stats&& other) {
*this = std::move(other);
}
Stats& operator=(Stats&& other);
Stats& Add(unsigned num_shards, const Stats& other);
Stats(const Stats&) = delete;
Stats& operator=(const Stats&) = delete;
};
static ServerState* tlocal() {
@@ -116,7 +133,7 @@ class ServerState { // public struct - to allow initialization.
ServerState();
~ServerState();
static void Init(uint32_t thread_index, acl::UserRegistry* registry);
static void Init(uint32_t thread_index, uint32_t num_shards, acl::UserRegistry* registry);
static void Destroy();
void EnterLameDuck() {


@@ -48,7 +48,8 @@ TEST_F(StringFamilyTest, SetGet) {
EXPECT_EQ(Run({"get", "key"}), "2");
auto metrics = GetMetrics();
EXPECT_EQ(6, metrics.coordinator_stats.ooo_tx_cnt);
auto tc = metrics.coordinator_stats.tx_type_cnt;
EXPECT_EQ(6, tc[ServerState::QUICK] + tc[ServerState::INLINE]);
}
TEST_F(StringFamilyTest, Incr) {


@@ -82,7 +82,7 @@ std::string TestConnection::RemoteEndpointStr() const {
}
void TransactionSuspension::Start() {
CommandId cid{"TEST", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, acl::NONE};
static CommandId cid{"TEST", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, acl::NONE};
transaction_ = new dfly::Transaction{&cid};
@@ -317,6 +317,14 @@ unsigned BaseFamilyTest::NumLocked() {
return count;
}
void BaseFamilyTest::ClearMetrics() {
shard_set->pool()->Await([](auto*) {
ServerState::Stats stats;
stats.tx_width_freq_arr = new uint64_t[shard_set->size()];
ServerState::tlocal()->stats = std::move(stats);
});
}
void BaseFamilyTest::WaitUntilLocked(DbIndex db_index, string_view key, double timeout) {
auto step = 50us;
auto timeout_micro = chrono::duration_cast<chrono::microseconds>(1000ms * timeout);
@@ -659,8 +667,9 @@ void BaseFamilyTest::ExpectConditionWithinTimeout(const std::function<bool()>& c
Fiber BaseFamilyTest::ExpectConditionWithSuspension(const std::function<bool()>& condition) {
TransactionSuspension tx;
tx.Start();
auto fb = pp_->at(0)->LaunchFiber([condition, tx = std::move(tx)]() mutable {
pp_->at(0)->Await([&] { tx.Start(); });
auto fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [condition, tx = std::move(tx)]() mutable {
ExpectConditionWithinTimeout(condition);
tx.Terminate();
});


@@ -115,6 +115,8 @@ class BaseFamilyTest : public ::testing::Test {
return service_->server_family().GetMetrics();
}
void ClearMetrics();
void AdvanceTime(int64_t ms) {
TEST_current_time_ms += ms;
}


@@ -591,6 +591,8 @@ void Transaction::ScheduleInternal() {
}
// Loop until successfully scheduled in all shards.
ServerState* ss = ServerState::tlocal();
DCHECK(ss);
while (true) {
txid_ = op_seq.fetch_add(1, memory_order_relaxed);
time_now_ms_ = GetCurrentTimeMs();
@@ -605,13 +607,22 @@ };
};
shard_set->RunBriefInParallel(std::move(cb), is_active);
bool ooo_disabled = IsGlobal() || (IsAtomicMulti() && multi_->mode != LOCK_AHEAD);
if (success.load(memory_order_acquire) == num_shards) {
coordinator_state_ |= COORD_SCHED;
// If we granted all locks, we can run out of order.
if (!ooo_disabled && lock_granted_cnt.load(memory_order_relaxed) == num_shards) {
bool ooo_disabled = IsAtomicMulti() && multi_->mode != LOCK_AHEAD;
DCHECK_GT(num_shards, 0u);
ss->stats.tx_width_freq_arr[num_shards - 1]++;
if (IsGlobal()) {
ss->stats.tx_type_cnt[ServerState::GLOBAL]++;
} else if (!ooo_disabled && lock_granted_cnt.load(memory_order_relaxed) == num_shards) {
// If we granted all locks, we can run out of order.
coordinator_state_ |= COORD_OOO;
ss->stats.tx_type_cnt[ServerState::OOO]++;
} else {
ss->stats.tx_type_cnt[ServerState::NORMAL]++;
}
VLOG(2) << "Scheduled " << DebugId()
<< " OutOfOrder: " << bool(coordinator_state_ & COORD_OOO)
@@ -678,6 +689,9 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
// If we run only on one shard and conclude, we can avoid scheduling at all
// and directly dispatch the task to its destination shard.
bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !IsAtomicMulti();
bool run_inline = false;
ServerState* ss = nullptr;
if (schedule_fast) {
DCHECK_NE(unique_shard_id_, kInvalidSid);
DCHECK(shard_data_.size() == 1 || multi_->mode == NON_ATOMIC);
@@ -707,10 +721,11 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
}
};
if (auto* ss = ServerState::tlocal();
ss->thread_index() == unique_shard_id_ && ss->AllowInlineScheduling()) {
ss = ServerState::tlocal();
if (ss->thread_index() == unique_shard_id_ && ss->AllowInlineScheduling()) {
DVLOG(2) << "Inline scheduling a transaction";
schedule_cb();
run_inline = true;
} else {
shard_set->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier.
}
@@ -727,12 +742,15 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
WaitForShardCallbacks();
DVLOG(2) << "ScheduleSingleHop after Wait " << DebugId();
if (was_ooo) {
coordinator_state_ |= COORD_OOO;
}
if (schedule_fast) {
CHECK(!cb_ptr_); // we should have reset it within the callback.
if (was_ooo) {
coordinator_state_ |= COORD_OOO;
ss->stats.tx_type_cnt[run_inline ? ServerState::INLINE : ServerState::QUICK]++;
} else {
ss->stats.tx_type_cnt[ServerState::NORMAL]++;
}
ss->stats.tx_width_freq_arr[0]++;
}
cb_ptr_ = nullptr;
return local_result_;
@@ -934,8 +952,6 @@ void Transaction::RunQuickie(EngineShard* shard) {
DCHECK_NE(unique_shard_id_, kInvalidSid);
DCHECK_EQ(0u, txid_);
shard->IncQuickRun();
auto& sd = shard_data_[SidToId(unique_shard_id_)];
DCHECK_EQ(0, sd.local_mask & (KEYLOCK_ACQUIRED | OUT_OF_ORDER));

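As a summary of the bookkeeping added in ScheduleInternal and ScheduleSingleHop, here is a hedged, condensed sketch of how a transaction ends up in one of the tx_type_cnt buckets. It is simplified: it ignores multi-transaction modes and squashed commands, and it is not the actual Dragonfly control flow.

// Illustrative only; mirrors the counters above rather than the real API.
enum TxType { GLOBAL, NORMAL, OOO, QUICK, INLINE };

TxType ClassifyTx(bool single_shard_fast_path, bool quickie_succeeded, bool ran_inline,
                  bool is_global, bool all_locks_granted) {
  if (single_shard_fast_path) {
    // Single-shard, non-global, non-multi transactions try RunQuickie first.
    if (quickie_succeeded)
      return ran_inline ? INLINE : QUICK;  // INLINE: scheduled directly on the caller's own shard thread
    return NORMAL;  // quick path failed; the transaction went through regular scheduling
  }
  if (is_global)
    return GLOBAL;
  // If every shard granted its key locks at schedule time, the tx may run out of order.
  return all_locks_granted ? OOO : NORMAL;
}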

@@ -262,5 +262,5 @@ async def test_lua_auto_async(async_client: aioredis.Redis):
await async_client.eval(TEST_SCRIPT, 4, "a", "b", "c", "d")
flushes = (await async_client.info("stats"))["eval_squashed_flushes"]
flushes = (await async_client.info("transaction"))["eval_squashed_flushes"]
assert 1 <= flushes <= 3 # all 100 commands are executed in at most 3 batches