diff --git a/src/server/blocking_controller_test.cc b/src/server/blocking_controller_test.cc index e5839e52b..18df73ef2 100644 --- a/src/server/blocking_controller_test.cc +++ b/src/server/blocking_controller_test.cc @@ -40,8 +40,9 @@ constexpr size_t kNumThreads = 3; void BlockingControllerTest::SetUp() { pp_.reset(fb2::Pool::Epoll(kNumThreads)); pp_->Run(); - pp_->Await([](unsigned index, ProactorBase* p) { ServerState::Init(index, nullptr); }); - ServerState::Init(kNumThreads, nullptr); + pp_->Await( + [](unsigned index, ProactorBase* p) { ServerState::Init(index, kNumThreads, nullptr); }); + ServerState::Init(kNumThreads, kNumThreads, nullptr); shard_set = new EngineShardSet(pp_.get()); shard_set->Init(kNumThreads, false); diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index c5f6a3797..45d518e03 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -173,8 +173,6 @@ EngineShardSet* shard_set = nullptr; uint64_t TEST_current_time_ms = 0; EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) { - ooo_runs += o.ooo_runs; - quick_runs += o.quick_runs; defrag_attempt_total += o.defrag_attempt_total; defrag_realloc_total += o.defrag_realloc_total; defrag_task_invocation_total += o.defrag_task_invocation_total; @@ -516,7 +514,6 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { if (VLOG_IS_ON(1)) { dbg_id = trans->DebugId(); } - ++stats_.ooo_runs; bool txq_ooo = trans_mask & Transaction::OUT_OF_ORDER; bool keep = trans->RunInShard(this, txq_ooo); diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index 0968ca171..3454534a8 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -37,8 +37,6 @@ class BlockingController; class EngineShard { public: struct Stats { - uint64_t ooo_runs = 0; // how many times transactions run as OOO. - uint64_t quick_runs = 0; // how many times single shard "RunQuickie" transaction run. uint64_t defrag_attempt_total = 0; uint64_t defrag_realloc_total = 0; uint64_t defrag_task_invocation_total = 0; @@ -101,10 +99,6 @@ class EngineShard { // Remove current continuation trans if its equal to tx. void RemoveContTx(Transaction* tx); - void IncQuickRun() { - stats_.quick_runs++; - } - const Stats& stats() const { return stats_; } diff --git a/src/server/main_service.cc b/src/server/main_service.cc old mode 100755 new mode 100644 index 93ade7adb..4c90ec532 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -791,9 +791,6 @@ void Service::Init(util::AcceptServer* acceptor, std::vector config_registry.RegisterMutable("max_eviction_per_heartbeat"); config_registry.RegisterMutable("max_segment_to_consider"); - acl::UserRegistry* reg = &user_registry_; - pp_.Await([reg](uint32_t index, ProactorBase* pb) { ServerState::Init(index, reg); }); - uint32_t shard_num = GetFlag(FLAGS_num_shards); if (shard_num == 0 || shard_num > pp_.size()) { LOG_IF(WARNING, shard_num > pp_.size()) @@ -802,6 +799,11 @@ void Service::Init(util::AcceptServer* acceptor, std::vector shard_num = pp_.size(); } + // Must initialize before the shard_set because EngineShard::Init references ServerState. + pp_.Await([&](uint32_t index, ProactorBase* pb) { + ServerState::Init(index, shard_num, &user_registry_); + }); + shard_set->Init(shard_num, !opts.disable_time_update); const auto tcp_disabled = GetFlag(FLAGS_port) == 0u; // We assume that listeners.front() is the main_listener @@ -969,7 +971,7 @@ std::optional Service::VerifyCommandState(const CommandId* cid, CmdA bool is_trans_cmd = CO::IsTransKind(cid->name()); bool under_script = dfly_cntx.conn_state.script_info != nullptr; bool is_write_cmd = cid->IsWriteOnly(); - bool under_multi = dfly_cntx.conn_state.exec_info.IsCollecting() && !is_trans_cmd; + bool multi_active = dfly_cntx.conn_state.exec_info.IsCollecting() && !is_trans_cmd; // Check if the command is allowed to execute under this global state bool allowed_by_state = true; @@ -1008,7 +1010,7 @@ std::optional Service::VerifyCommandState(const CommandId* cid, CmdA if (!etl.is_master && is_write_cmd && !dfly_cntx.is_replicating) return ErrorReply{"-READONLY You can't write against a read only replica."}; - if (under_multi) { + if (multi_active) { if (cmd_name == "SELECT" || absl::EndsWith(cmd_name, "SUBSCRIBE")) return ErrorReply{absl::StrCat("Can not call ", cmd_name, " within a transaction")}; @@ -1069,8 +1071,8 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) ConnectionContext* dfly_cntx = static_cast(cntx); bool under_script = bool(dfly_cntx->conn_state.script_info); - bool under_multi = dfly_cntx->conn_state.exec_info.IsRunning(); - bool dispatching_in_multi = under_script || under_multi; + bool under_exec = dfly_cntx->conn_state.exec_info.IsRunning(); + bool dispatching_in_multi = under_script || under_exec; if (VLOG_IS_ON(2) && cntx->conn() /* no owner in replica context */) { LOG(INFO) << "Got (" << cntx->conn()->GetClientId() << "): " << (under_script ? "LUA " : "") @@ -1152,11 +1154,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) dfly_cntx->cid = cid; - // Collect stats for all regular transactions and all multi transactions from scripts, except EVAL - // itself. EXEC does not use DispatchCommand for dispatching. - bool collect_stats = - dfly_cntx->transaction && (!dfly_cntx->transaction->IsMulti() || dispatching_in_multi); - if (!InvokeCmd(cid, args_no_cmd, dfly_cntx, collect_stats)) { + if (!InvokeCmd(cid, args_no_cmd, dfly_cntx)) { dfly_cntx->reply_builder()->SendError("Internal Error"); dfly_cntx->reply_builder()->CloseConnection(); } @@ -1204,8 +1202,7 @@ class ReplyGuard { SinkReplyBuilder* builder_ = nullptr; }; -bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionContext* cntx, - bool record_stats) { +bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionContext* cntx) { DCHECK(cid); DCHECK(!cid->Validate(tail_args)); @@ -1225,8 +1222,10 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo } #ifndef NDEBUG + // Verifies that we reply to the client when needed. ReplyGuard reply_guard(cntx, cid->name()); #endif + try { cid->Invoke(tail_args, cntx); } catch (std::exception& e) { @@ -1234,13 +1233,12 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo return false; } - if (record_stats) { - DCHECK(cntx->transaction); + if (cntx->transaction && !cntx->conn_state.exec_info.IsRunning() && + cntx->conn_state.script_info == nullptr) { bool is_ooo = cntx->transaction->IsOOO(); cntx->last_command_debug.clock = cntx->transaction->txid(); cntx->last_command_debug.is_ooo = is_ooo; - etl.stats.ooo_tx_cnt += is_ooo; } return true; @@ -1986,7 +1984,7 @@ CmdArgVec CollectAllKeys(ConnectionState::ExecInfo* exec_info) { void StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo* exec_info, Transaction::MultiMode multi_mode) { CmdArgVec tmp_keys; - switch ((Transaction::MultiMode)multi_mode) { + switch (multi_mode) { case Transaction::GLOBAL: trans->StartMultiGlobal(dbid); break; @@ -2079,7 +2077,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { } } - bool ok = InvokeCmd(scmd.Cid(), args, cntx, true); + bool ok = InvokeCmd(scmd.Cid(), args, cntx); if (!ok || rb->GetError()) // checks for i/o error, not logical error. break; } diff --git a/src/server/main_service.h b/src/server/main_service.h index 0912d7b91..973fb127d 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -52,8 +52,7 @@ class Service : public facade::ServiceInterface { facade::ConnectionContext* cntx) final; // Check VerifyCommandExecution and invoke command with args - bool InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionContext* reply_cntx, - bool record_stats = false); + bool InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionContext* reply_cntx); // Verify command can be executed now (check out of memory), always called immediately before // execution diff --git a/src/server/multi_test.cc b/src/server/multi_test.cc index 2785db559..2a2c41ddc 100644 --- a/src/server/multi_test.cc +++ b/src/server/multi_test.cc @@ -511,9 +511,9 @@ TEST_F(MultiTest, MultiOOO) { // OOO works in LOCK_AHEAD mode. int mode = absl::GetFlag(FLAGS_multi_exec_mode); if (mode == Transaction::LOCK_AHEAD || mode == Transaction::NON_ATOMIC) - EXPECT_EQ(200, metrics.coordinator_stats.ooo_tx_cnt); + EXPECT_EQ(200, metrics.coordinator_stats.tx_type_cnt[ServerState::OOO]); else - EXPECT_EQ(0, metrics.coordinator_stats.ooo_tx_cnt); + EXPECT_EQ(0, metrics.coordinator_stats.tx_type_cnt[ServerState::OOO]); } // Lua scripts lock their keys ahead and thus can run out of order. @@ -620,16 +620,19 @@ TEST_F(MultiTest, ExecGlobalFallback) { Run({"set", "a", "1"}); // won't run ooo, because it became part of global Run({"move", "a", "1"}); Run({"exec"}); - EXPECT_EQ(0, GetMetrics().coordinator_stats.ooo_tx_cnt); + EXPECT_EQ(1, GetMetrics().coordinator_stats.tx_type_cnt[ServerState::GLOBAL]); + + ClearMetrics(); // Check non atomic mode does not fall back to global. absl::SetFlag(&FLAGS_multi_exec_mode, Transaction::NON_ATOMIC); Run({"multi"}); - Run({"set", "a", "1"}); // will run ooo + Run({"set", "a", "1"}); // will run ooo(quick) Run({"move", "a", "1"}); Run({"exec"}); - // TODO: Stats with squashed cmds are broken - // EXPECT_EQ(1, GetMetrics().ooo_tx_transaction_cnt); + + auto stats = GetMetrics().coordinator_stats; + EXPECT_EQ(1, stats.tx_type_cnt[ServerState::QUICK] + stats.tx_type_cnt[ServerState::INLINE]); } TEST_F(MultiTest, ScriptFlagsCommand) { diff --git a/src/server/server_family.cc b/src/server/server_family.cc index d69356deb..1e2b03d43 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -760,13 +760,14 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) { AppendMetricWithoutLabels("uptime_in_seconds", "", m.uptime, MetricType::COUNTER, &resp->body()); // Clients metrics - AppendMetricWithoutLabels("connected_clients", "", m.conn_stats.num_conns, MetricType::GAUGE, + const auto& conn_stats = m.conn_stats; + AppendMetricWithoutLabels("connected_clients", "", conn_stats.num_conns, MetricType::GAUGE, &resp->body()); - AppendMetricWithoutLabels("client_read_buffer_bytes", "", m.conn_stats.read_buf_capacity, + AppendMetricWithoutLabels("client_read_buffer_bytes", "", conn_stats.read_buf_capacity, MetricType::GAUGE, &resp->body()); - AppendMetricWithoutLabels("blocked_clients", "", m.conn_stats.num_blocked_clients, + AppendMetricWithoutLabels("blocked_clients", "", conn_stats.num_blocked_clients, MetricType::GAUGE, &resp->body()); - AppendMetricWithoutLabels("dispatch_queue_bytes", "", m.conn_stats.dispatch_queue_bytes, + AppendMetricWithoutLabels("dispatch_queue_bytes", "", conn_stats.dispatch_queue_bytes, MetricType::GAUGE, &resp->body()); // Memory metrics @@ -807,10 +808,10 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) { } // Stats metrics - AppendMetricWithoutLabels("connections_received_total", "", m.conn_stats.conn_received_cnt, + AppendMetricWithoutLabels("connections_received_total", "", conn_stats.conn_received_cnt, MetricType::COUNTER, &resp->body()); - AppendMetricWithoutLabels("commands_processed_total", "", m.conn_stats.command_cnt, + AppendMetricWithoutLabels("commands_processed_total", "", conn_stats.command_cnt, MetricType::COUNTER, &resp->body()); AppendMetricWithoutLabels("keyspace_hits_total", "", m.events.hits, MetricType::COUNTER, &resp->body()); @@ -818,9 +819,9 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) { &resp->body()); // Net metrics - AppendMetricWithoutLabels("net_input_bytes_total", "", m.conn_stats.io_read_bytes, + AppendMetricWithoutLabels("net_input_bytes_total", "", conn_stats.io_read_bytes, MetricType::COUNTER, &resp->body()); - AppendMetricWithoutLabels("net_output_bytes_total", "", m.conn_stats.io_write_bytes, + AppendMetricWithoutLabels("net_output_bytes_total", "", conn_stats.io_write_bytes, MetricType::COUNTER, &resp->body()); // DB stats @@ -1477,11 +1478,11 @@ Metrics ServerFamily::GetMetrics() const { result.fiber_longrun_cnt += fb2::FiberLongRunCnt(); result.fiber_longrun_usec += fb2::FiberLongRunSumUsec(); - result.coordinator_stats += ss->stats; - result.conn_stats += ss->connection_stats; + result.coordinator_stats.Add(shard_set->size(), ss->stats); result.uptime = time(NULL) - this->start_time_; result.qps += uint64_t(ss->MovingSum6()); + result.conn_stats += ss->connection_stats; if (shard) { result.heap_used_bytes += shard->UsedMemory(); @@ -1670,11 +1671,6 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("defrag_attempt_total", m.shard_stats.defrag_attempt_total); append("defrag_realloc_total", m.shard_stats.defrag_realloc_total); append("defrag_task_invocation_total", m.shard_stats.defrag_task_invocation_total); - append("eval_io_coordination_total", m.coordinator_stats.eval_io_coordination_cnt); - append("eval_shardlocal_coordination_total", - m.coordinator_stats.eval_shardlocal_coordination_cnt); - append("eval_squashed_flushes", m.coordinator_stats.eval_squashed_flushes); - append("tx_schedule_cancel_total", m.coordinator_stats.tx_schedule_cancel_cnt); } if (should_enter("TIERED", true)) { @@ -1707,6 +1703,30 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("rdb_changes_since_last_save", m.events.update); } + if (should_enter("TRANSACTION", true)) { + const auto& tc = m.coordinator_stats.tx_type_cnt; + string val = StrCat("global=", tc[ServerState::GLOBAL], ",normal=", tc[ServerState::NORMAL], + ",ooo=", tc[ServerState::OOO], ",quick=", tc[ServerState::QUICK], + ",inline=", tc[ServerState::INLINE]); + append("tx_type_cnt", val); + val.clear(); + for (unsigned width = 0; width < shard_set->size(); ++width) { + if (m.coordinator_stats.tx_width_freq_arr[width] > 0) { + absl::StrAppend(&val, "w", width + 1, "=", m.coordinator_stats.tx_width_freq_arr[width], + ","); + } + } + if (!val.empty()) { + val.pop_back(); // last comma. + append("tx_width_freq", val); + } + append("eval_io_coordination_total", m.coordinator_stats.eval_io_coordination_cnt); + append("eval_shardlocal_coordination_total", + m.coordinator_stats.eval_shardlocal_coordination_cnt); + append("eval_squashed_flushes", m.coordinator_stats.eval_squashed_flushes); + append("tx_schedule_cancel_total", m.coordinator_stats.tx_schedule_cancel_cnt); + } + if (should_enter("REPLICATION")) { ServerState& etl = *ServerState::tlocal(); @@ -1945,6 +1965,12 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn if (replica_) replica_->Stop(); + // If we are called by "Replicate", cntx->transaction will be null but we do not need + // to flush anything. + if (cntx->transaction) { + Drakarys(cntx->transaction, DbSlice::kDbAll); + } + // Create a new replica and assing it auto new_replica = make_shared(string(host), port, &service_, master_id()); replica_ = new_replica; @@ -1980,9 +2006,6 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) { string_view host = ArgS(args, 0); string_view port = ArgS(args, 1); - if (!IsReplicatingNoOne(host, port)) - Drakarys(cntx->transaction, DbSlice::kDbAll); - ReplicaOfInternal(host, port, cntx, ActionOnConnectionFail::kReturnOnError); } @@ -1991,8 +2014,6 @@ void ServerFamily::Replicate(string_view host, string_view port) { ConnectionContext ctxt{&sink, nullptr}; ctxt.skip_acl_validation = true; - // we don't flush the database as the context is null - // (and also because there is nothing to flush) ReplicaOfInternal(host, port, &ctxt, ActionOnConnectionFail::kContinueReplication); } diff --git a/src/server/server_state.cc b/src/server/server_state.cc index 3328f13b9..127520516 100644 --- a/src/server/server_state.cc +++ b/src/server/server_state.cc @@ -23,14 +23,51 @@ namespace dfly { __thread ServerState* ServerState::state_ = nullptr; -ServerState::Stats& ServerState::Stats::operator+=(const ServerState::Stats& other) { - this->ooo_tx_cnt += other.ooo_tx_cnt; +ServerState::Stats::Stats() { + tx_type_cnt.fill(0); +} + +ServerState::Stats::~Stats() { + delete[] tx_width_freq_arr; +} + +auto ServerState::Stats::operator=(Stats&& other) -> Stats& { + for (int i = 0; i < NUM_TX_TYPES; ++i) { + this->tx_type_cnt[i] = other.tx_type_cnt[i]; + } + this->eval_io_coordination_cnt = other.eval_io_coordination_cnt; + this->eval_shardlocal_coordination_cnt = other.eval_shardlocal_coordination_cnt; + this->eval_squashed_flushes = other.eval_squashed_flushes; + this->tx_schedule_cancel_cnt = other.tx_schedule_cancel_cnt; + + delete[] this->tx_width_freq_arr; + this->tx_width_freq_arr = other.tx_width_freq_arr; + other.tx_width_freq_arr = nullptr; + + return *this; +} + +ServerState::Stats& ServerState::Stats::Add(unsigned num_shards, const ServerState::Stats& other) { + static_assert(sizeof(Stats) == 10 * 8, "Stats size mismatch"); + + for (int i = 0; i < NUM_TX_TYPES; ++i) { + this->tx_type_cnt[i] += other.tx_type_cnt[i]; + } + this->eval_io_coordination_cnt += other.eval_io_coordination_cnt; this->eval_shardlocal_coordination_cnt += other.eval_shardlocal_coordination_cnt; this->eval_squashed_flushes += other.eval_squashed_flushes; this->tx_schedule_cancel_cnt += other.tx_schedule_cancel_cnt; - static_assert(sizeof(Stats) == 5 * 8); + if (this->tx_width_freq_arr == nullptr) { + this->tx_width_freq_arr = new uint64_t[num_shards]; + std::copy_n(other.tx_width_freq_arr, num_shards, this->tx_width_freq_arr); + } else { + for (unsigned i = 0; i < num_shards; ++i) { + this->tx_width_freq_arr[i] += other.tx_width_freq_arr[i]; + } + } + return *this; } @@ -73,11 +110,13 @@ ServerState::ServerState() : interpreter_mgr_{absl::GetFlag(FLAGS_interpreter_pe ServerState::~ServerState() { } -void ServerState::Init(uint32_t thread_index, acl::UserRegistry* registry) { +void ServerState::Init(uint32_t thread_index, uint32_t num_shards, acl::UserRegistry* registry) { state_ = new ServerState(); state_->gstate_ = GlobalState::ACTIVE; state_->thread_index_ = thread_index; state_->user_registry = registry; + state_->stats.tx_width_freq_arr = new uint64_t[num_shards]; + std::fill_n(state_->stats.tx_width_freq_arr, num_shards, 0); } void ServerState::Destroy() { diff --git a/src/server/server_state.h b/src/server/server_state.h index b2a9a1e74..02714bc71 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -93,8 +93,9 @@ class ServerState { // public struct - to allow initialization. void operator=(const ServerState&) = delete; public: + enum TxType { GLOBAL, NORMAL, OOO, QUICK, INLINE, NUM_TX_TYPES }; struct Stats { - uint64_t ooo_tx_cnt = 0; + std::array tx_type_cnt; uint64_t eval_io_coordination_cnt = 0; uint64_t eval_shardlocal_coordination_cnt = 0; @@ -102,7 +103,23 @@ class ServerState { // public struct - to allow initialization. uint64_t tx_schedule_cancel_cnt = 0; - Stats& operator+=(const Stats& other); + // Array of size of number of shards. + // Each entry is how many transactions we had with this width (unique_shard_cnt). + uint64_t* tx_width_freq_arr = nullptr; + + Stats(); + ~Stats(); + + Stats(Stats&& other) { + *this = std::move(other); + } + + Stats& operator=(Stats&& other); + + Stats& Add(unsigned num_shards, const Stats& other); + + Stats(const Stats&) = delete; + Stats& operator=(const Stats&) = delete; }; static ServerState* tlocal() { @@ -116,7 +133,7 @@ class ServerState { // public struct - to allow initialization. ServerState(); ~ServerState(); - static void Init(uint32_t thread_index, acl::UserRegistry* registry); + static void Init(uint32_t thread_index, uint32_t num_shards, acl::UserRegistry* registry); static void Destroy(); void EnterLameDuck() { diff --git a/src/server/string_family_test.cc b/src/server/string_family_test.cc index 32a0dde86..7981b19f9 100644 --- a/src/server/string_family_test.cc +++ b/src/server/string_family_test.cc @@ -48,7 +48,8 @@ TEST_F(StringFamilyTest, SetGet) { EXPECT_EQ(Run({"get", "key"}), "2"); auto metrics = GetMetrics(); - EXPECT_EQ(6, metrics.coordinator_stats.ooo_tx_cnt); + auto tc = metrics.coordinator_stats.tx_type_cnt; + EXPECT_EQ(6, tc[ServerState::QUICK] + tc[ServerState::INLINE]); } TEST_F(StringFamilyTest, Incr) { diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index ccc4303d1..657a526fc 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -82,7 +82,7 @@ std::string TestConnection::RemoteEndpointStr() const { } void TransactionSuspension::Start() { - CommandId cid{"TEST", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, acl::NONE}; + static CommandId cid{"TEST", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, acl::NONE}; transaction_ = new dfly::Transaction{&cid}; @@ -317,6 +317,14 @@ unsigned BaseFamilyTest::NumLocked() { return count; } +void BaseFamilyTest::ClearMetrics() { + shard_set->pool()->Await([](auto*) { + ServerState::Stats stats; + stats.tx_width_freq_arr = new uint64_t[shard_set->size()]; + ServerState::tlocal()->stats = std::move(stats); + }); +} + void BaseFamilyTest::WaitUntilLocked(DbIndex db_index, string_view key, double timeout) { auto step = 50us; auto timeout_micro = chrono::duration_cast(1000ms * timeout); @@ -659,8 +667,9 @@ void BaseFamilyTest::ExpectConditionWithinTimeout(const std::function& c Fiber BaseFamilyTest::ExpectConditionWithSuspension(const std::function& condition) { TransactionSuspension tx; - tx.Start(); - auto fb = pp_->at(0)->LaunchFiber([condition, tx = std::move(tx)]() mutable { + pp_->at(0)->Await([&] { tx.Start(); }); + + auto fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [condition, tx = std::move(tx)]() mutable { ExpectConditionWithinTimeout(condition); tx.Terminate(); }); diff --git a/src/server/test_utils.h b/src/server/test_utils.h index 83fbab863..ca01a3722 100644 --- a/src/server/test_utils.h +++ b/src/server/test_utils.h @@ -115,6 +115,8 @@ class BaseFamilyTest : public ::testing::Test { return service_->server_family().GetMetrics(); } + void ClearMetrics(); + void AdvanceTime(int64_t ms) { TEST_current_time_ms += ms; } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 7e7958257..0de467a20 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -591,6 +591,8 @@ void Transaction::ScheduleInternal() { } // Loop until successfully scheduled in all shards. + ServerState* ss = ServerState::tlocal(); + DCHECK(ss); while (true) { txid_ = op_seq.fetch_add(1, memory_order_relaxed); time_now_ms_ = GetCurrentTimeMs(); @@ -605,13 +607,22 @@ void Transaction::ScheduleInternal() { }; shard_set->RunBriefInParallel(std::move(cb), is_active); - bool ooo_disabled = IsGlobal() || (IsAtomicMulti() && multi_->mode != LOCK_AHEAD); - if (success.load(memory_order_acquire) == num_shards) { coordinator_state_ |= COORD_SCHED; - // If we granted all locks, we can run out of order. - if (!ooo_disabled && lock_granted_cnt.load(memory_order_relaxed) == num_shards) { + bool ooo_disabled = IsAtomicMulti() && multi_->mode != LOCK_AHEAD; + + DCHECK_GT(num_shards, 0u); + + ss->stats.tx_width_freq_arr[num_shards - 1]++; + + if (IsGlobal()) { + ss->stats.tx_type_cnt[ServerState::GLOBAL]++; + } else if (!ooo_disabled && lock_granted_cnt.load(memory_order_relaxed) == num_shards) { + // If we granted all locks, we can run out of order. coordinator_state_ |= COORD_OOO; + ss->stats.tx_type_cnt[ServerState::OOO]++; + } else { + ss->stats.tx_type_cnt[ServerState::NORMAL]++; } VLOG(2) << "Scheduled " << DebugId() << " OutOfOrder: " << bool(coordinator_state_ & COORD_OOO) @@ -678,6 +689,9 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { // If we run only on one shard and conclude, we can avoid scheduling at all // and directly dispatch the task to its destination shard. bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !IsAtomicMulti(); + bool run_inline = false; + ServerState* ss = nullptr; + if (schedule_fast) { DCHECK_NE(unique_shard_id_, kInvalidSid); DCHECK(shard_data_.size() == 1 || multi_->mode == NON_ATOMIC); @@ -707,10 +721,11 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { } }; - if (auto* ss = ServerState::tlocal(); - ss->thread_index() == unique_shard_id_ && ss->AllowInlineScheduling()) { + ss = ServerState::tlocal(); + if (ss->thread_index() == unique_shard_id_ && ss->AllowInlineScheduling()) { DVLOG(2) << "Inline scheduling a transaction"; schedule_cb(); + run_inline = true; } else { shard_set->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier. } @@ -727,12 +742,15 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { WaitForShardCallbacks(); DVLOG(2) << "ScheduleSingleHop after Wait " << DebugId(); - if (was_ooo) { - coordinator_state_ |= COORD_OOO; - } - if (schedule_fast) { CHECK(!cb_ptr_); // we should have reset it within the callback. + if (was_ooo) { + coordinator_state_ |= COORD_OOO; + ss->stats.tx_type_cnt[run_inline ? ServerState::INLINE : ServerState::QUICK]++; + } else { + ss->stats.tx_type_cnt[ServerState::NORMAL]++; + } + ss->stats.tx_width_freq_arr[0]++; } cb_ptr_ = nullptr; return local_result_; @@ -934,8 +952,6 @@ void Transaction::RunQuickie(EngineShard* shard) { DCHECK_NE(unique_shard_id_, kInvalidSid); DCHECK_EQ(0u, txid_); - shard->IncQuickRun(); - auto& sd = shard_data_[SidToId(unique_shard_id_)]; DCHECK_EQ(0, sd.local_mask & (KEYLOCK_ACQUIRED | OUT_OF_ORDER)); diff --git a/tests/dragonfly/eval_test.py b/tests/dragonfly/eval_test.py index 19791a6dd..f687a16e4 100644 --- a/tests/dragonfly/eval_test.py +++ b/tests/dragonfly/eval_test.py @@ -262,5 +262,5 @@ async def test_lua_auto_async(async_client: aioredis.Redis): await async_client.eval(TEST_SCRIPT, 4, "a", "b", "c", "d") - flushes = (await async_client.info("stats"))["eval_squashed_flushes"] + flushes = (await async_client.info("transaction"))["eval_squashed_flushes"] assert 1 <= flushes <= 3 # all 100 commands are executed in at most 3 batches