From b7b4cabacc0803e3a4bfa245c7edb2ab5414643d Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Mon, 23 Sep 2024 13:16:50 +0300 Subject: [PATCH] chore: some renames + fix a typo in RETURN_ON_BAD_STATUS (#3763) * chore: some renames + fix a typo in RETURN_ON_BAD_STATUS Renames in transaction.h - no functional changes. Fix a typo in error.h following #3758 --------- Signed-off-by: Roman Gershman --- src/server/engine_shard.cc | 2 +- src/server/engine_shard.h | 3 ++- src/server/error.h | 2 +- src/server/server_family.cc | 2 +- src/server/stream_family_test.cc | 4 ++-- src/server/transaction.cc | 15 ++++++++------- src/server/transaction.h | 13 +++++++------ 7 files changed, 22 insertions(+), 19 deletions(-) diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 1c23e9196..a10292828 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -217,7 +217,7 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) defrag_task_invocation_total += o.defrag_task_invocation_total; poll_execution_total += o.poll_execution_total; tx_ooo_total += o.tx_ooo_total; - tx_immediate_total += o.tx_immediate_total; + tx_optimistic_total += o.tx_optimistic_total; return *this; } diff --git a/src/server/engine_shard.h b/src/server/engine_shard.h index 8ad8fc361..18af18792 100644 --- a/src/server/engine_shard.h +++ b/src/server/engine_shard.h @@ -34,7 +34,8 @@ class EngineShard { uint64_t defrag_task_invocation_total = 0; uint64_t poll_execution_total = 0; - uint64_t tx_immediate_total = 0; + // number of optimistic executions - that were run as part of the scheduling. + uint64_t tx_optimistic_total = 0; uint64_t tx_ooo_total = 0; Stats& operator+=(const Stats&); diff --git a/src/server/error.h b/src/server/error.h index 37e5a8a12..2ef33d5cf 100644 --- a/src/server/error.h +++ b/src/server/error.h @@ -39,7 +39,7 @@ using facade::kWrongTypeErr; do { \ OpStatus __s = (x).status(); \ if (__s != OpStatus::OK) { \ - return (x).status(); \ + return __s; \ } \ } while (0) diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 5f62df3eb..58a6fbce3 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -2426,7 +2426,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { if (should_enter("TRANSACTION", true)) { append("tx_shard_polls", m.shard_stats.poll_execution_total); - append("tx_shard_immediate_total", m.shard_stats.tx_immediate_total); + append("tx_shard_optimistic_total", m.shard_stats.tx_optimistic_total); append("tx_shard_ooo_total", m.shard_stats.tx_ooo_total); append("tx_global_total", m.coordinator_stats.tx_global_cnt); append("tx_normal_total", m.coordinator_stats.tx_normal_cnt); diff --git a/src/server/stream_family_test.cc b/src/server/stream_family_test.cc index c0b26c855..8ff1ba1e4 100644 --- a/src/server/stream_family_test.cc +++ b/src/server/stream_family_test.cc @@ -128,12 +128,12 @@ TEST_F(StreamFamilyTest, XRead) { Run({"xadd", "foo", "1-*", "k2", "v2"}); Run({"xadd", "foo", "1-*", "k3", "v3"}); Run({"xadd", "bar", "1-*", "k4", "v4"}); - EXPECT_EQ(GetMetrics().shard_stats.tx_immediate_total, 4u); + EXPECT_EQ(GetMetrics().shard_stats.tx_optimistic_total, 4u); // Receive all records from a single stream, in a single hop auto resp = Run({"xread", "streams", "foo", "0"}); EXPECT_THAT(resp.GetVec(), ElementsAre("foo", ArrLen(3))); - EXPECT_EQ(GetMetrics().shard_stats.tx_immediate_total, 5u); + EXPECT_EQ(GetMetrics().shard_stats.tx_optimistic_total, 5u); // Receive all records from both streams. resp = Run({"xread", "streams", "foo", "bar", "0", "0"}); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 4f8dd98a8..04d1618e2 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -841,11 +841,11 @@ void Transaction::DispatchHop() { std::bitset<1024> poll_flags(0); unsigned run_cnt = 0; IterateActiveShards([&poll_flags, &run_cnt](auto& sd, auto i) { - if ((sd.local_mask & RAN_IMMEDIATELY) == 0) { + if ((sd.local_mask & OPTIMISTIC_EXECUTION) == 0) { run_cnt++; poll_flags.set(i, true); } - sd.local_mask &= ~RAN_IMMEDIATELY; // we'll run it next time if it avoided concluding + sd.local_mask &= ~OPTIMISTIC_EXECUTION; // we'll run it next time if it avoided concluding }); DCHECK_EQ(run_cnt, poll_flags.count()); @@ -1019,13 +1019,13 @@ OpArgs Transaction::GetOpArgs(EngineShard* shard) const { } // This function should not block since it's run via RunBriefInParallel. -bool Transaction::ScheduleInShard(EngineShard* shard, bool can_run_immediately) { +bool Transaction::ScheduleInShard(EngineShard* shard, bool execute_optimistic) { ShardId sid = SidToId(shard->shard_id()); auto& sd = shard_data_[sid]; DCHECK(sd.local_mask & ACTIVE); DCHECK_EQ(sd.local_mask & KEYLOCK_ACQUIRED, 0); - sd.local_mask &= ~(OUT_OF_ORDER | RAN_IMMEDIATELY); + sd.local_mask &= ~(OUT_OF_ORDER | OPTIMISTIC_EXECUTION); TxQueue* txq = shard->txq(); KeyLockArgs lock_args; @@ -1042,12 +1042,13 @@ bool Transaction::ScheduleInShard(EngineShard* shard, bool can_run_immediately) bool shard_unlocked = shard->shard_lock()->Check(mode); // Check if we can run immediately - if (shard_unlocked && can_run_immediately && + if (shard_unlocked && execute_optimistic && CheckLocks(GetDbSlice(shard->shard_id()), mode, lock_args)) { - sd.local_mask |= RAN_IMMEDIATELY; - shard->stats().tx_immediate_total++; + sd.local_mask |= OPTIMISTIC_EXECUTION; + shard->stats().tx_optimistic_total++; RunCallback(shard); + // Check state again, it could've been updated if the callback returned AVOID_CONCLUDING flag. // Only possible for single shard. if (coordinator_state_ & COORD_CONCLUDING) diff --git a/src/server/transaction.h b/src/server/transaction.h index 2e6ed5a05..b15a022a6 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -160,15 +160,14 @@ class Transaction { // State on specific shard. enum LocalMask : uint16_t { ACTIVE = 1, // Whether its active on this shard (to schedule or execute hops) - // ARMED = 1 << 1, // Whether its armed (the hop was prepared) + OPTIMISTIC_EXECUTION = 1 << 7, // Whether the shard executed optimistically (during schedule) // Whether it can run out of order. Undefined if KEYLOCK_ACQUIRED isn't set OUT_OF_ORDER = 1 << 2, // Whether its key locks are acquired, never set for global commands. KEYLOCK_ACQUIRED = 1 << 3, - SUSPENDED_Q = 1 << 4, // Whether it suspended (by WatchInShard()) - AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended()) - UNLOCK_MULTI = 1 << 6, // Whether this shard executed UnlockMultiShardCb - RAN_IMMEDIATELY = 1 << 7, // Whether the shard executed immediately (during schedule) + SUSPENDED_Q = 1 << 4, // Whether it suspended (by WatchInShard()) + AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended()) + UNLOCK_MULTI = 1 << 6, // Whether this shard executed UnlockMultiShardCb }; struct Guard { @@ -512,7 +511,9 @@ class Transaction { // Schedule on shards transaction queue. Returns true if scheduled successfully, // false if inconsistent order was detected and the schedule needs to be cancelled. - bool ScheduleInShard(EngineShard* shard, bool can_run_immediately); + // if execute_optimistic is true - means we can try executing during the scheduling, + // subject to uncontended keys. + bool ScheduleInShard(EngineShard* shard, bool execute_optimistic); // Set ARMED flags, start run barrier and submit poll tasks. Doesn't wait for the run barrier void DispatchHop();