Mirror of https://github.com/dragonflydb/dragonfly.git, synced 2025-05-10 18:05:44 +02:00
chore(server): track ooo transactions via metrics. (#763)
This change allows tracking which transactions run out of order (OOO). OOO txs are more performant and incur substantially less latency.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Parent: 4a826fdb7b
Commit: 46b42e571f
7 changed files with 63 additions and 15 deletions
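Before the diff, here is a minimal, self-contained sketch (not the Dragonfly sources; the thread count, worker loop, and struct layouts are illustrative assumptions) of the counting pattern the change implements: each connection thread bumps a thread-local Stats counter (the diff's etl.stats.ooo_tx_cnt += is_ooo), and a metrics snapshot folds the per-thread counters into Metrics::ooo_tx_transaction_cnt, as GetMetrics() does below.

// Illustrative sketch only -- not the Dragonfly sources. Shows per-thread
// counters folded into a single metrics snapshot, mirroring ServerState::Stats
// and Metrics::ooo_tx_transaction_cnt from the diff below.
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

struct Stats {
  uint64_t ooo_tx_cnt = 0;  // bumped only by its owning thread, no locking needed
};

struct Metrics {
  uint64_t ooo_tx_transaction_cnt = 0;  // aggregated view returned to the caller
};

int main() {
  constexpr unsigned kThreads = 4;
  std::vector<Stats> per_thread(kThreads);  // stand-in for one ServerState per thread

  std::vector<std::thread> workers;
  for (unsigned t = 0; t < kThreads; ++t) {
    workers.emplace_back([&per_thread, t] {
      for (int i = 0; i < 100; ++i) {
        bool is_ooo = (i % 2 == 0);          // pretend every other tx ran out of order
        per_thread[t].ooo_tx_cnt += is_ooo;  // analog of etl.stats.ooo_tx_cnt += is_ooo
      }
    });
  }
  for (auto& w : workers) w.join();

  // Analog of ServerFamily::GetMetrics(): sum the thread-local counters.
  Metrics m;
  for (const auto& s : per_thread) m.ooo_tx_transaction_cnt += s.ooo_tx_cnt;
  std::cout << "ooo_tx_transaction_cnt=" << m.ooo_tx_transaction_cnt << "\n";  // prints 200
  return 0;
}

In the real code the aggregation runs inside GetMetrics() over every proactor thread's ServerState, but the idea is the same: unsynchronized per-thread increments, summed only when a snapshot is requested.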
@@ -184,8 +184,9 @@ TEST_F(DflyEngineTest, HitMissStats) {
   resp = Run({"get", "Key2"});
   ASSERT_THAT(resp, ArgType(RespExpr::NIL));
 
-  EXPECT_THAT(GetMetrics().events.hits, 1);
-  EXPECT_THAT(GetMetrics().events.misses, 1);
+  auto metrics = GetMetrics();
+  EXPECT_THAT(metrics.events.hits, 1);
+  EXPECT_THAT(metrics.events.misses, 1);
 }
 
 TEST_F(DflyEngineTest, MultiEmpty) {
@@ -830,6 +831,32 @@ TEST_F(DflyEngineTest, Watch) {
   ASSERT_THAT(Run({"exec"}), kExecSuccess);
 }
 
+TEST_F(DflyEngineTest, MultiOOO) {
+  auto fb0 = pp_->at(0)->LaunchFiber([&] {
+    for (unsigned i = 0; i < 100; i++) {
+      Run({"multi"});
+      Run({"rpush", "a", "bar"});
+      Run({"exec"});
+    }
+  });
+
+  pp_->at(1)->Await([&] {
+    for (unsigned i = 0; i < 100; ++i) {
+      Run({"multi"});
+      Run({"rpush", "b", "bar"});
+      Run({"exec"});
+    }
+  });
+
+  fb0.Join();
+  auto metrics = GetMetrics();
+
+  // TODO: This is a performance bug that causes a substantial latency penalty when
+  // running multi-transactions or lua scripts.
+  // We should be able to allow OOO multi-transactions.
+  EXPECT_EQ(0, metrics.ooo_tx_transaction_cnt);
+}
+
 TEST_F(DflyEngineTest, Bug468) {
   RespExpr resp = Run({"multi"});
   ASSERT_EQ(resp, "OK");
@@ -691,8 +691,10 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
 
   request_latency_usec.IncBy(cmd_str, (end_usec - start_usec) / 1000);
   if (dist_trans) {
+    bool is_ooo = dist_trans->IsOOO();
     dfly_cntx->last_command_debug.clock = dist_trans->txid();
-    dfly_cntx->last_command_debug.is_ooo = dist_trans->IsOOO();
+    dfly_cntx->last_command_debug.is_ooo = is_ooo;
+    etl.stats.ooo_tx_cnt += is_ooo;
   }
 
   if (!under_script) {
@@ -1190,6 +1190,7 @@ Metrics ServerFamily::GetMetrics() const {
     result.uptime = time(NULL) - this->start_time_;
     result.conn_stats += ss->connection_stats;
     result.qps += uint64_t(ss->MovingSum6());
+    result.ooo_tx_transaction_cnt += ss->stats.ooo_tx_cnt;
 
     if (shard) {
       MergeInto(shard->db_slice().GetStats(), &result);
@@ -42,6 +42,7 @@ struct Metrics {
   size_t heap_used_bytes = 0;
   size_t heap_comitted_bytes = 0;
   size_t small_string_bytes = 0;
+  uint64_t ooo_tx_transaction_cnt = 0;
   uint32_t traverse_ttl_per_sec = 0;
   uint32_t delete_ttl_per_sec = 0;
 
@@ -85,6 +85,10 @@ class ServerState {  // public struct - to allow initialization.
   void operator=(const ServerState&) = delete;
 
  public:
+  struct Stats {
+    uint64_t ooo_tx_cnt = 0;
+  };
+
   static ServerState* tlocal() {
     return &state_;
   }
@@ -161,6 +165,8 @@ class ServerState {  // public struct - to allow initialization.
     call_latency_histos_[sha].Add(latency_usec);
   }
 
+  Stats stats;
+
  private:
   int64_t live_transactions_ = 0;
   mi_heap_t* data_heap_;
@@ -40,14 +40,15 @@ vector<int64_t> ToIntArr(const RespExpr& e) {
 }
 
 TEST_F(StringFamilyTest, SetGet) {
-  auto resp = Run({"set", "key", "val"});
-
-  EXPECT_EQ(resp, "OK");
+  EXPECT_EQ(Run({"set", "key", "val"}), "OK");
   EXPECT_EQ(Run({"get", "key"}), "val");
   EXPECT_EQ(Run({"set", "key1", "1"}), "OK");
   EXPECT_EQ(Run({"get", "key1"}), "1");
   EXPECT_EQ(Run({"set", "key", "2"}), "OK");
   EXPECT_EQ(Run({"get", "key"}), "2");
+
+  auto metrics = GetMetrics();
+  EXPECT_EQ(6, metrics.ooo_tx_transaction_cnt);
 }
 
 TEST_F(StringFamilyTest, Incr) {
@@ -560,6 +560,8 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
   }
 
   bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !multi_;
+  bool run_eager = false;
+
   if (schedule_fast) {  // Single shard (local) optimization.
     // We never resize shard_data because that would affect MULTI transaction correctness.
     DCHECK_EQ(1u, shard_data_.size());
@@ -572,15 +574,20 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
     run_count_.fetch_add(1, memory_order_release);
     time_now_ms_ = GetCurrentTimeMs();
 
-    // Please note that schedule_cb can not update any data on ScheduleSingleHop stack
-    // since the latter can exit before ScheduleUniqueShard returns.
-    // The problematic flow is as follows: ScheduleUniqueShard schedules into TxQueue and then
-    // call PollExecute that runs the callback which calls DecreaseRunCnt.
-    // As a result WaitForShardCallbacks below is unblocked.
-    auto schedule_cb = [this] {
-      bool run_eager = ScheduleUniqueShard(EngineShard::tlocal());
-      if (run_eager) {
-        // it's important to DecreaseRunCnt only for run_eager and after run_eager was assigned.
+    // Please note that schedule_cb can not update any data on ScheduleSingleHop stack when
+    // run_fast is false.
+    // since ScheduleSingleHop can finish before ScheduleUniqueShard returns.
+    // The problematic flow is as follows: ScheduleUniqueShard schedules into TxQueue
+    // (hence run_fast is false), and then calls PollExecute that in turn runs
+    // the callback which calls DecreaseRunCnt.
+    // As a result WaitForShardCallbacks below is unblocked before schedule_cb returns.
+    // However, if run_fast is true, then we may mutate stack variables, but only
+    // before DecreaseRunCnt is called.
+    auto schedule_cb = [&] {
+      bool run_fast = ScheduleUniqueShard(EngineShard::tlocal());
+      if (run_fast) {
+        run_eager = true;
+        // it's important to DecreaseRunCnt only for run_fast and after run_eager is assigned.
         // If DecreaseRunCnt were called before ScheduleUniqueShard finishes
         // then WaitForShardCallbacks below could exit before schedule_cb assigns return value
         // to run_eager and cause stack corruption.
@@ -605,6 +612,9 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
   DVLOG(1) << "ScheduleSingleHop before Wait " << DebugId() << " " << run_count_.load();
   WaitForShardCallbacks();
   DVLOG(1) << "ScheduleSingleHop after Wait " << DebugId();
+  if (run_eager) {
+    coordinator_state_ |= COORD_OOO;
+  }
 
   cb_ = nullptr;