From fbc55bb82da4fa457dc6e22f324d67444d0b00d0 Mon Sep 17 00:00:00 2001
From: Vladislav
Date: Wed, 3 Apr 2024 23:06:57 +0300
Subject: [PATCH] feat(transaction): Idempotent callbacks (immediate runs) (#2453)

This commit generalizes the mechanism of running transaction callbacks during
scheduling, removing the need for the specialized ScheduleUniqueShard/RunQuickie.
Instead, transactions can now be run during ScheduleInShard - called "immediate"
runs - if the transaction is concluding and either only a single shard is active
or the operation can be safely repeated if scheduling failed (idempotent
commands, like MGET).

Updates transaction stats to mirror the new changes more closely.

---------

Signed-off-by: Vladislav Oleshko
---
 src/server/command_registry.cc   |   2 +
 src/server/command_registry.h    |   7 +-
 src/server/engine_shard_set.cc   |   3 +-
 src/server/engine_shard_set.h    |   7 +
 src/server/multi_test.cc         |   4 +-
 src/server/server_family.cc      |  35 +---
 src/server/server_state.cc       |  11 +-
 src/server/server_state.h        |   5 +-
 src/server/string_family.cc      |   5 +-
 src/server/string_family_test.cc |   3 +-
 src/server/transaction.cc        | 335 ++++++++++---------------------
 src/server/transaction.h         |  18 +-
 12 files changed, 150 insertions(+), 285 deletions(-)

diff --git a/src/server/command_registry.cc b/src/server/command_registry.cc
index 43aef8769..1ad459ca9 100644
--- a/src/server/command_registry.cc
+++ b/src/server/command_registry.cc
@@ -201,6 +201,8 @@ const char* OptName(CO::CommandOpt fl) {
       return "no-key-transactional";
     case NO_KEY_TX_SPAN_ALL:
       return "no-key-tx-span-all";
+    case IDEMPOTENT:
+      return "idempotent";
   }
   return "unknown";
 }
diff --git a/src/server/command_registry.h b/src/server/command_registry.h
index 7eb6f12c4..2b3ccb0c4 100644
--- a/src/server/command_registry.h
+++ b/src/server/command_registry.h
@@ -46,8 +46,11 @@ enum CommandOpt : uint32_t {
   // Allows commands without keys to respect transaction ordering and enables journaling by default
   NO_KEY_TRANSACTIONAL = 1U << 16,
 
-  NO_KEY_TX_SPAN_ALL =
-      1U << 17,  // If set, all shards are active for the no-key-transactional command
+  NO_KEY_TX_SPAN_ALL = 1U << 17,  // All shards are active for the no-key-transactional command
+
+  // The same callback can be run multiple times without corrupting the result. Used for
+  // opportunistic optimizations where inconsistencies can only be detected afterwards.
+  IDEMPOTENT = 1U << 18,
 };

 const char* OptName(CommandOpt fl);
diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc
index bec550c1e..649a6be7f 100644
--- a/src/server/engine_shard_set.cc
+++ b/src/server/engine_shard_set.cc
@@ -205,13 +205,14 @@ EngineShardSet* shard_set = nullptr;
 uint64_t TEST_current_time_ms = 0;

 EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) {
-  static_assert(sizeof(Stats) == 40);
+  static_assert(sizeof(Stats) == 48);

   defrag_attempt_total += o.defrag_attempt_total;
   defrag_realloc_total += o.defrag_realloc_total;
   defrag_task_invocation_total += o.defrag_task_invocation_total;
   poll_execution_total += o.poll_execution_total;
   tx_ooo_total += o.tx_ooo_total;
+  tx_immediate_total += o.tx_immediate_total;

   return *this;
 }
diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h
index 8a2ec7194..4b3d11481 100644
--- a/src/server/engine_shard_set.h
+++ b/src/server/engine_shard_set.h
@@ -40,7 +40,10 @@ class EngineShard {
     uint64_t defrag_realloc_total = 0;
     uint64_t defrag_task_invocation_total = 0;
     uint64_t poll_execution_total = 0;
+
+    uint64_t tx_immediate_total = 0;
     uint64_t tx_ooo_total = 0;
+
     Stats& operator+=(const Stats&);
   };

@@ -107,6 +110,10 @@ class EngineShard {
     return stats_;
   }

+  Stats& stats() {
+    return stats_;
+  }
+
   // Returns used memory for this shard.
   size_t UsedMemory() const;

diff --git a/src/server/multi_test.cc b/src/server/multi_test.cc
index a89fbc21d..49ad93c4f 100644
--- a/src/server/multi_test.cc
+++ b/src/server/multi_test.cc
@@ -671,7 +671,7 @@ TEST_F(MultiTest, ExecGlobalFallback) {
   Run({"set", "a", "1"});  // won't run ooo, because it became part of global
   Run({"move", "a", "1"});
   Run({"exec"});
-  EXPECT_EQ(1, GetMetrics().coordinator_stats.tx_type_cnt[ServerState::GLOBAL]);
+  EXPECT_EQ(1, GetMetrics().coordinator_stats.tx_global_cnt);

   ClearMetrics();

@@ -683,7 +683,7 @@ TEST_F(MultiTest, ExecGlobalFallback) {
   Run({"exec"});

   auto stats = GetMetrics().coordinator_stats;
-  EXPECT_EQ(1, stats.tx_type_cnt[ServerState::QUICK] + stats.tx_type_cnt[ServerState::INLINE]);
+  EXPECT_EQ(1, stats.tx_normal_cnt);  // move is global
 }

 TEST_F(MultiTest, ScriptFlagsCommand) {
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index bbc51e2fe..c0de8e99d 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -1221,18 +1221,6 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) {
     }
   }

-  const auto& tc = m.coordinator_stats.tx_type_cnt;
-  AppendMetricHeader("transaction_types_total", "Transaction counts by their types",
-                     MetricType::COUNTER, &resp->body());
-
-  const char* kTxTypeNames[ServerState::NUM_TX_TYPES] = {"global", "normal", "quick", "inline"};
-  for (unsigned type = 0; type < ServerState::NUM_TX_TYPES; ++type) {
-    if (tc[type] > 0) {
-      AppendMetricValue("transaction_types_total", tc[type], {"type"}, {kTxTypeNames[type]},
-                        &resp->body());
-    }
-  }
-
   absl::StrAppend(&resp->body(), db_key_metrics);
   absl::StrAppend(&resp->body(), db_key_expire_metrics);
 }
@@ -2105,24 +2093,17 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
   }

   if (should_enter("TRANSACTION", true)) {
-    const auto& tc = m.coordinator_stats.tx_type_cnt;
-    string val = StrCat("global=", tc[ServerState::GLOBAL], ",normal=", tc[ServerState::NORMAL],
-                        ",quick=", tc[ServerState::QUICK], ",inline=", tc[ServerState::INLINE]);
-    append("tx_type_cnt", val);
-    val.clear();
-    for (unsigned width = 0; width < shard_set->size(); ++width) {
-      if (m.coordinator_stats.tx_width_freq_arr[width] > 0) {
-        absl::StrAppend(&val, "w", width + 1, "=", m.coordinator_stats.tx_width_freq_arr[width],
-                        ",");
-      }
-    }
-    if (!val.empty()) {
-      val.pop_back();  // last comma.
-      append("tx_width_freq", val);
-    }
+    append("tx_shard_immediate_total", m.shard_stats.tx_immediate_total);
     append("tx_shard_ooo_total", m.shard_stats.tx_ooo_total);
+
+    append("tx_global_total", m.coordinator_stats.tx_global_cnt);
+    append("tx_normal_total", m.coordinator_stats.tx_normal_cnt);
+    append("tx_inline_runs_total", m.coordinator_stats.tx_inline_runs);
     append("tx_schedule_cancel_total", m.coordinator_stats.tx_schedule_cancel_cnt);
+
+    append("tx_width_freq", absl::StrJoin(m.coordinator_stats.tx_width_freq_arr, ","));
     append("tx_queue_len", m.tx_queue_len);
+
     append("eval_io_coordination_total", m.coordinator_stats.eval_io_coordination_cnt);
     append("eval_shardlocal_coordination_total",
            m.coordinator_stats.eval_shardlocal_coordination_cnt);
diff --git a/src/server/server_state.cc b/src/server/server_state.cc
index 2b40249fd..f03a1a035 100644
--- a/src/server/server_state.cc
+++ b/src/server/server_state.cc
@@ -24,19 +24,18 @@ namespace dfly {
 __thread ServerState* ServerState::state_ = nullptr;

 ServerState::Stats::Stats(unsigned num_shards) : tx_width_freq_arr(num_shards) {
-  tx_type_cnt.fill(0);
 }

 ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) {
-  static_assert(sizeof(Stats) == 17 * 8, "Stats size mismatch");
-
-  for (int i = 0; i < NUM_TX_TYPES; ++i) {
-    this->tx_type_cnt[i] += other.tx_type_cnt[i];
-  }
+  static_assert(sizeof(Stats) == 16 * 8, "Stats size mismatch");

   this->eval_io_coordination_cnt += other.eval_io_coordination_cnt;
   this->eval_shardlocal_coordination_cnt += other.eval_shardlocal_coordination_cnt;
   this->eval_squashed_flushes += other.eval_squashed_flushes;
+
+  this->tx_global_cnt += other.tx_global_cnt;
+  this->tx_normal_cnt += other.tx_normal_cnt;
+  this->tx_inline_runs += other.tx_inline_runs;
   this->tx_schedule_cancel_cnt += other.tx_schedule_cancel_cnt;

   this->multi_squash_executions += other.multi_squash_executions;
diff --git a/src/server/server_state.h b/src/server/server_state.h
index c28a808c4..0ebd18717 100644
--- a/src/server/server_state.h
+++ b/src/server/server_state.h
@@ -94,7 +94,6 @@ class ServerState {  // public struct - to allow initialization.
   void operator=(const ServerState&) = delete;

  public:
-  enum TxType { GLOBAL, NORMAL, QUICK, INLINE, NUM_TX_TYPES };

   struct Stats {
     Stats(unsigned num_shards = 0);  // Default initialization should be valid for Add()
@@ -105,7 +104,9 @@ class ServerState {  // public struct - to allow initialization.
     Stats& Add(const Stats& other);

-    std::array<uint64_t, NUM_TX_TYPES> tx_type_cnt;
+    uint64_t tx_global_cnt = 0;
+    uint64_t tx_normal_cnt = 0;
+    uint64_t tx_inline_runs = 0;
     uint64_t tx_schedule_cancel_cnt = 0;

     uint64_t eval_io_coordination_cnt = 0;
diff --git a/src/server/string_family.cc b/src/server/string_family.cc
index 90acf38ab..c930d1004 100644
--- a/src/server/string_family.cc
+++ b/src/server/string_family.cc
@@ -1557,8 +1557,9 @@ void StringFamily::Register(CommandRegistry* registry) {
       << CI{"GETEX", CO::WRITE | CO::DENYOOM | CO::FAST | CO::NO_AUTOJOURNAL, -1, 1, 1, acl::kGetEx}
              .HFUNC(GetEx)
       << CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, acl::kGetSet}.HFUNC(GetSet)
-      << CI{"MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING, -2, 1, -1, acl::kMGet}.HFUNC(
-             MGet)
+      << CI{"MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING | CO::IDEMPOTENT, -2, 1, -1,
+            acl::kMGet}
+             .HFUNC(MGet)
       << CI{"MSET", kMSetMask, -3, 1, -1, acl::kMSet}.HFUNC(MSet)
       << CI{"MSETNX", kMSetMask, -3, 1, -1, acl::kMSetNx}.HFUNC(MSetNx)
       << CI{"STRLEN", CO::READONLY | CO::FAST, 2, 1, 1, acl::kStrLen}.HFUNC(StrLen)
diff --git a/src/server/string_family_test.cc b/src/server/string_family_test.cc
index fe1359d23..10c94bacd 100644
--- a/src/server/string_family_test.cc
+++ b/src/server/string_family_test.cc
@@ -49,8 +49,7 @@ TEST_F(StringFamilyTest, SetGet) {
   EXPECT_THAT(Run({"get", "key3"}), ArgType(RespExpr::NIL));

   auto metrics = GetMetrics();
-  auto tc = metrics.coordinator_stats.tx_type_cnt;
-  EXPECT_EQ(7, tc[ServerState::QUICK] + tc[ServerState::INLINE]);
+  EXPECT_EQ(7, metrics.coordinator_stats.tx_normal_cnt);
   EXPECT_EQ(3, metrics.events.hits);
   EXPECT_EQ(1, metrics.events.misses);
   EXPECT_EQ(3, metrics.events.mutations);
diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index 36578ff7d..8ae0e9dc7 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -7,9 +7,11 @@
 #include

 #include "base/logging.h"
-#include "glog/logging.h"
+#include "facade/op_status.h"
+#include "redis/redis_aux.h"
 #include "server/blocking_controller.h"
 #include "server/command_registry.h"
+#include "server/common.h"
 #include "server/db_slice.h"
 #include "server/engine_shard_set.h"
 #include "server/journal/journal.h"
@@ -64,24 +66,8 @@ void AnalyzeTxQueue(const EngineShard* shard, const TxQueue* txq) {

 void RecordTxScheduleStats(const Transaction* tx) {
   auto* ss = ServerState::tlocal();
-  DCHECK(ss);
-  ss->stats.tx_width_freq_arr[tx->GetUniqueShardCnt() - 1]++;
-  if (tx->IsGlobal()) {
-    ss->stats.tx_type_cnt[ServerState::GLOBAL]++;
-  } else {
-    ss->stats.tx_type_cnt[ServerState::NORMAL]++;
-  }
-}
-
-void RecordTxScheduleFastStats(const Transaction* tx, bool was_ooo, bool was_inline) {
-  DCHECK_EQ(tx->GetUniqueShardCnt(), 1u);
-  auto* ss = ServerState::tlocal();
-  if (was_ooo) {
-    ss->stats.tx_type_cnt[was_inline ? ServerState::INLINE : ServerState::QUICK]++;
-  } else {
-    ss->stats.tx_type_cnt[ServerState::NORMAL]++;
-  }
-  ss->stats.tx_width_freq_arr[0]++;
+  ++(tx->IsGlobal() ? ss->stats.tx_global_cnt : ss->stats.tx_normal_cnt);
+  ++ss->stats.tx_width_freq_arr[tx->GetUniqueShardCnt() - 1];
 }

 std::ostream& operator<<(std::ostream& os, Transaction::time_point tp) {
@@ -654,14 +640,10 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
 }

 void Transaction::RunCallback(EngineShard* shard) {
-  DCHECK_EQ(EngineShard::tlocal(), shard);
+  DCHECK_EQ(shard, EngineShard::tlocal());

-  // Actually running the callback.
-  // If you change the logic here, also please change the logic
   RunnableResult result;
   try {
-    // if a transaction is suspended, we still run it because of brpoplpush/blmove case
-    // that needs to run lpush on its suspended shard.
     result = (*cb_ptr_)(this, shard);

     if (unique_shard_cnt_ == 1) {
@@ -688,12 +670,11 @@ void Transaction::RunCallback(EngineShard* shard) {

   // Handle result flags to alter behaviour.
   if (result.flags & RunnableResult::AVOID_CONCLUDING) {
-    // Multi shard callbacks should either all or none choose to conclude. Because they can't
-    // communicate, the must know their decision ahead, consequently there is no point in using this
-    // flag.
+    // Multi shard callbacks should either all or none choose to conclude. They can't communicate,
+    // so they must know their decision ahead, consequently there is no point in using this flag.
     CHECK_EQ(unique_shard_cnt_, 1u);
     DCHECK((coordinator_state_ & COORD_CONCLUDING) || multi_->concluding);
-    coordinator_state_ &= ~COORD_CONCLUDING;  // safe because single shard
+    coordinator_state_ &= ~COORD_CONCLUDING;
   }

   // Log to journal only once the command finished running
@@ -711,7 +692,14 @@ void Transaction::ScheduleInternal() {
   DCHECK_GT(unique_shard_cnt_, 0u);
   DCHECK(!IsAtomicMulti() || cid_->IsMultiTransactional());

-  DVLOG(1) << "ScheduleInternal " << cid_->name() << " on " << unique_shard_cnt_ << " shards";
+  // Try running immediately (during scheduling) if we're concluding and either:
+  // - have a single shard, and thus never have to cancel scheduling due to reordering
+  // - run as an idempotent command, meaning we can safely repeat the operation if scheduling failed
+  bool can_run_immediately = !IsGlobal() && (coordinator_state_ & COORD_CONCLUDING) &&
+                             (unique_shard_cnt_ == 1 || (cid_->opt_mask() & CO::IDEMPOTENT));
+
+  DVLOG(1) << "ScheduleInternal " << cid_->name() << " on " << unique_shard_cnt_ << " shards "
+           << " immediate run: " << can_run_immediately;

   auto is_active = [this](uint32_t i) { return IsActive(i); };

   while (true) {
     stats_.schedule_attempts++;

-    txid_ = op_seq.fetch_add(1, memory_order_relaxed);
+    // This is a contention point for all threads - avoid using it unless necessary.
+    // Single shard operations can assign txid later if the immediate run failed.
+    if (unique_shard_cnt_ > 1)
+      txid_ = op_seq.fetch_add(1, memory_order_relaxed);
+
     InitTxTime();

     atomic_uint32_t schedule_fails = 0;
-    auto cb = [this, &schedule_fails](EngineShard* shard) {
-      if (!ScheduleInShard(shard)) {
+    auto cb = [this, &schedule_fails, can_run_immediately]() {
+      if (!ScheduleInShard(EngineShard::tlocal(), can_run_immediately)) {
         schedule_fails.fetch_add(1, memory_order_relaxed);
       }
+      run_barrier_.Dec();
     };
-    shard_set->RunBriefInParallel(std::move(cb), is_active);
+
+    run_barrier_.Start(unique_shard_cnt_);
+    if (CanRunInlined()) {
+      // single shard schedule operation can't fail
+      CHECK(ScheduleInShard(EngineShard::tlocal(), can_run_immediately));
+      run_barrier_.Dec();
+    } else {
+      IterateActiveShards([cb](const auto& sd, ShardId i) { shard_set->Add(i, cb); });
+      run_barrier_.Wait();
+    }

     if (schedule_fails.load(memory_order_relaxed) == 0) {
       coordinator_state_ |= COORD_SCHED;

       RecordTxScheduleStats(this);
-      VLOG(2) << "Scheduled " << DebugId() << " num_shards: " << unique_shard_cnt_;
       break;
     }

@@ -767,82 +768,6 @@ void Transaction::ScheduleInternal() {
   }
 }

-// Optimized "Schedule and execute" function for the most common use-case of a single hop
-// transactions like set/mset/mget etc. Does not apply for more complicated cases like RENAME or
-// BLPOP where a data must be read from multiple shards before performing another hop.
-OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
-  if (multi_ && multi_->role == SQUASHED_STUB) {
-    return RunSquashedMultiCb(cb);
-  }
-
-  DCHECK(!cb_ptr_);
-  cb_ptr_ = &cb;
-
-  // We can be already scheduled if we're part of a multi transaction. Note: If a multi tx isn't
-  // scheduled, we assume it's not mimicking the interface, but actually preparing a single hop.
-  bool scheduled = (coordinator_state_ & COORD_SCHED) > 0;
-  if (scheduled) {
-    DCHECK(IsAtomicMulti());
-    multi_->concluding = true;
-  } else {
-    // For multi it only makes sense with squashing and thus a proper underlying command
-    DCHECK(!IsAtomicMulti() || (multi_->role == SQUASHER && cid_->IsMultiTransactional()));
-    coordinator_state_ |= COORD_CONCLUDING;
-  }
-
-  // If we run only on one shard and conclude, we can possibly avoid scheduling at all
-  // and directly run the callback on the destination thread if the locks are free.
-  bool schedule_fast = !scheduled && (unique_shard_cnt_ == 1) && !IsGlobal();
-  bool was_ooo = false, was_inline = false;
-
-  if (schedule_fast) {
-    DCHECK_NE(unique_shard_id_, kInvalidSid);
-    DCHECK(IsActive(unique_shard_id_));
-    DCHECK(shard_data_.size() == 1 || multi_);
-
-    InitTxTime();
-
-    run_barrier_.Start(1);
-    shard_data_[SidToId(unique_shard_id_)].is_armed.store(true, memory_order_release);
-
-    auto schedule_cb = [this, &was_ooo] {
-      bool run_fast = ScheduleUniqueShard(EngineShard::tlocal());
-      if (run_fast) {
-        // We didn't decrease the barrier, so the scope is valid UNTIL Dec() below
-        DCHECK_EQ(run_barrier_.DEBUG_Count(), 1u);
-        was_ooo = true;
-        FinishHop();
-      }
-      // Otherwise it's not safe to access the function scope, as
-      // ScheduleUniqueShard() -> PollExecution() might have unlocked the barrier below.
-    };
-
-    auto* ss = ServerState::tlocal();
-    if (ss->thread_index() == unique_shard_id_ && ss->AllowInlineScheduling()) {
-      DVLOG(2) << "Inline scheduling a transaction";
-      schedule_cb();
-      was_inline = true;
-    } else {
-      shard_set->Add(unique_shard_id_, std::move(schedule_cb));  // serves as a barrier.
-    }
-  } else {
-    // This transaction either spans multiple shards and/or is multi, which schedule in advance.
-    if (!scheduled)
-      ScheduleInternal();
-
-    ExecuteAsync();
-  }
-
-  run_barrier_.Wait();
-
-  if (schedule_fast) {
-    CHECK(!cb_ptr_);  // we should have reset it within the callback.
-    RecordTxScheduleFastStats(this, was_ooo, was_inline);
-  }
-  cb_ptr_ = nullptr;
-  return local_result_;
-}
-
 void Transaction::ReportWritesSquashedMulti(absl::FunctionRef<bool(ShardId)> had_write) {
   DCHECK(multi_);
   for (unsigned i = 0; i < multi_->shard_journal_write.size(); i++)
@@ -895,24 +820,23 @@ uint32_t Transaction::CalcMultiNumOfShardJournals() const {
   return shard_journals_cnt;
 }

-void Transaction::Schedule() {
-  if (multi_ && multi_->role == SQUASHED_STUB)
-    return;
+OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
+  Execute(cb, true);
+  return local_result_;
+}

-  if ((coordinator_state_ & COORD_SCHED) == 0)
-    ScheduleInternal();
+void Transaction::Schedule() {
+  // no-op
 }

 // Runs in coordinator thread.
 void Transaction::Execute(RunnableType cb, bool conclude) {
   if (multi_ && multi_->role == SQUASHED_STUB) {
-    RunSquashedMultiCb(cb);
+    local_result_ = RunSquashedMultiCb(cb);
     return;
   }

-  DCHECK(coordinator_state_ & COORD_SCHED);
-  DCHECK(!cb_ptr_);
-
+  local_result_ = OpStatus::OK;
   cb_ptr_ = &cb;

   if (IsAtomicMulti()) {
@@ -922,16 +846,21 @@ void Transaction::Execute(RunnableType cb, bool conclude) {
                        : (coordinator_state_ & ~COORD_CONCLUDING);
   }

-  ExecuteAsync();
+  if ((coordinator_state_ & COORD_SCHED) == 0) {
+    ScheduleInternal();
+  }

+  DispatchHop();
   run_barrier_.Wait();
-  cb_ptr_ = nullptr;
+
+  if (coordinator_state_ & COORD_CONCLUDING)
+    coordinator_state_ &= ~COORD_SCHED;
 }

 // Runs in coordinator thread.
-void Transaction::ExecuteAsync() {
-  DVLOG(1) << "ExecuteAsync " << DebugId();
+void Transaction::DispatchHop() {
+  DVLOG(1) << "DispatchHop " << DebugId();

   DCHECK_GT(unique_shard_cnt_, 0u);
   DCHECK_GT(use_count_.load(memory_order_relaxed), 0u);
   DCHECK(!IsAtomicMulti() || multi_->lock_mode.has_value());

   // initialize the run barrier before arming, as well as copy indices
   // of active shards to avoid reading concurrently accessed shard data.
   std::bitset<1024> poll_flags(0);
-  run_barrier_.Start(unique_shard_cnt_);
+  unsigned run_cnt = 0;
+  IterateActiveShards([&poll_flags, &run_cnt](auto& sd, auto i) {
+    if ((sd.local_mask & RAN_IMMEDIATELY) == 0) {
+      run_cnt++;
+      poll_flags.set(i, true);
+    }
+    sd.local_mask &= ~RAN_IMMEDIATELY;  // we'll run it next time if it avoided concluding
+  });
+
+  DCHECK_EQ(run_cnt, poll_flags.count());
+  if (run_cnt == 0)  // all callbacks were run immediately
+    return;
+
+  run_barrier_.Start(run_cnt);

   // Set armed flags on all active shards.
-  std::atomic_thread_fence(memory_order_release);  // once to avoid flushing poll_flags in loop
+  std::atomic_thread_fence(memory_order_release);  // once fence to avoid flushing writes in loop
   IterateActiveShards([&poll_flags](auto& sd, auto i) {
-    sd.is_armed.store(true, memory_order_relaxed);
-    poll_flags.set(i, true);
+    if (poll_flags.test(i))
+      sd.is_armed.store(true, memory_order_relaxed);
   });

   if (CanRunInlined()) {
+    DCHECK_EQ(run_cnt, 1u);
     DVLOG(1) << "Short-circuit ExecuteAsync " << DebugId();
     EngineShard::tlocal()->PollExecution("exec_cb", this);
     return;
   }

-  use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed);  // for each pointer from poll_cb
+  use_count_.fetch_add(run_cnt, memory_order_relaxed);  // for each pointer from poll_cb

   auto poll_cb = [this] {
     EngineShard::tlocal()->PollExecution("exec_cb", this);
@@ -1030,39 +973,6 @@ void Transaction::EnableAllShards() {
     sd.local_mask |= ACTIVE;
 }

-Transaction::RunnableResult Transaction::RunQuickie(EngineShard* shard) {
-  DCHECK(!IsAtomicMulti());
-  DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC);
-  DCHECK_NE(unique_shard_id_, kInvalidSid);
-  DCHECK_EQ(0u, txid_);
-
-  auto& sd = shard_data_[SidToId(unique_shard_id_)];
-  DCHECK_EQ(0, sd.local_mask & OUT_OF_ORDER);
-
-  DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id();
-  DCHECK(cb_ptr_) << DebugId() << " " << shard->shard_id();
-
-  sd.stats.total_runs++;
-
-  // Calling the callback in somewhat safe way
-  RunnableResult result;
-  try {
-    result = (*cb_ptr_)(this, shard);
-  } catch (std::bad_alloc&) {
-    LOG_FIRST_N(ERROR, 16) << " out of memory";
-    result = OpStatus::OUT_OF_MEMORY;
-  } catch (std::exception& e) {
-    LOG(FATAL) << "Unexpected exception " << e.what();
-  }
-
-  shard->db_slice().OnCbFinish();
-
-  // Handling the result, along with conclusion and journaling, is done by the caller
-
-  cb_ptr_ = nullptr;  // We can do it because only a single shard runs the callback.
-  return result;
-}
-
 // runs in coordinator thread.
 // Marks the transaction as expired and removes it from the waiting queue.
 void Transaction::ExpireBlocking(WaitKeysProvider wcb) {
@@ -1142,88 +1052,41 @@ OpArgs Transaction::GetOpArgs(EngineShard* shard) const {
   return OpArgs{shard, this, GetDbContext()};
 }

-// Runs within a engine shard thread.
-// Optimized path that schedules and runs transactions out of order if possible.
-// Returns true if eagerly executed, false if the callback will be handled by the transaction
-// queue.
-bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
-  DCHECK(!IsAtomicMulti());
-  DCHECK_EQ(txid_, 0u);
-  DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC);
-  DCHECK_NE(unique_shard_id_, kInvalidSid);
-
-  auto mode = LockMode();
-  auto lock_args = GetLockArgs(shard->shard_id());
-
-  auto& sd = shard_data_[SidToId(unique_shard_id_)];
-  DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
-
-  bool shard_unlocked = shard->shard_lock()->Check(mode);
-  bool keys_unlocked = shard->db_slice().Acquire(mode, lock_args);
-  bool quick_run = shard_unlocked && keys_unlocked;
-  bool continue_scheduling = !quick_run;
-
-  sd.local_mask |= KEYLOCK_ACQUIRED;
-
-  // Fast path. If none of the keys are locked, we can run briefly atomically on the thread
-  // without acquiring them at all.
-  if (quick_run) {
-    CHECK(sd.is_armed.exchange(false, memory_order_relaxed));
-    auto result = RunQuickie(shard);
-    local_result_ = result.status;
-
-    if (result.flags & RunnableResult::AVOID_CONCLUDING) {
-      // If we want to run again, we have to actually schedule this transaction
-      DCHECK(!sd.is_armed.load(memory_order_relaxed));
-      continue_scheduling = true;
-    } else {
-      LogAutoJournalOnShard(shard, result);
-      shard->db_slice().Release(mode, lock_args);
-      sd.local_mask &= ~KEYLOCK_ACQUIRED;
-    }
-  }
-
-  // Slow path. Some of the keys are locked, so we schedule on the transaction queue.
-  if (continue_scheduling) {
-    coordinator_state_ |= COORD_SCHED;                  // safe because single shard
-    txid_ = op_seq.fetch_add(1, memory_order_relaxed);  //
-
-    sd.pq_pos = shard->txq()->Insert(this);
-
-    DVLOG(1) << "Rescheduling " << DebugId() << " into TxQueue of size " << shard->txq()->size();
-
-    // If there are blocked transactons waiting for these tx keys, we add this transaction
-    // to the tx-queue (these keys will be contended). This happen even if the queue is empty.
-    // In that case we must poll the queue, because there will be no other callback trigerring the
-    // queue before us.
-    shard->PollExecution("schedule_unique", nullptr);
-  }
-
-  return quick_run;
-}
-
 // This function should not block since it's run via RunBriefInParallel.
-bool Transaction::ScheduleInShard(EngineShard* shard) {
+bool Transaction::ScheduleInShard(EngineShard* shard, bool can_run_immediately) {
   ShardId sid = SidToId(shard->shard_id());
   auto& sd = shard_data_[sid];

   DCHECK(sd.local_mask & ACTIVE);
   DCHECK_EQ(sd.local_mask & KEYLOCK_ACQUIRED, 0);
-  sd.local_mask &= ~OUT_OF_ORDER;
-
-  // If a more recent transaction already commited, we abort
-  if (shard->committed_txid() >= txid_)
-    return false;
+  sd.local_mask &= ~(OUT_OF_ORDER | RAN_IMMEDIATELY);

   TxQueue* txq = shard->txq();
   KeyLockArgs lock_args;
   IntentLock::Mode mode = LockMode();
   bool lock_granted = false;

+  // If a more recent transaction already committed, we abort
+  if (txid_ > 0 && shard->committed_txid() >= txid_)
+    return false;
+
   // Acquire intent locks. Intent locks are always acquired, even if already locked by others.
   if (!IsGlobal()) {
     lock_args = GetLockArgs(shard->shard_id());
-
     bool shard_unlocked = shard->shard_lock()->Check(mode);
+
+    // Check if we can run immediately
+    if (shard_unlocked && can_run_immediately && shard->db_slice().CheckLock(mode, lock_args)) {
+      sd.local_mask |= RAN_IMMEDIATELY;
+      shard->stats().tx_immediate_total++;
+
+      RunCallback(shard);
+      // Check state again, it could've been updated if the callback returned the AVOID_CONCLUDING
+      // flag. Only possible for single shard.
+      if (coordinator_state_ & COORD_CONCLUDING)
+        return true;
+    }
+
     bool keys_unlocked = shard->db_slice().Acquire(mode, lock_args);
     lock_granted = shard_unlocked && keys_unlocked;

@@ -1235,6 +1098,13 @@ bool Transaction::ScheduleInShard(EngineShard* shard) {
     DVLOG(3) << "Lock granted " << lock_granted << " for trans " << DebugId();
   }

+  // Single shard operations might have delayed acquiring txid until necessary.
+  if (txid_ == 0) {
+    DCHECK_EQ(unique_shard_cnt_, 1u);
+    txid_ = op_seq.fetch_add(1, memory_order_relaxed);
+    DCHECK_GT(txid_, shard->committed_txid());
+  }
+
   // If the new transaction requires reordering of the pending queue (i.e. it comes before tail)
   // and some other transaction already locked its keys we can not reorder 'trans' because
   // the transaction could have deduced that it can run OOO and eagerly execute. Hence, we
@@ -1330,6 +1200,9 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p
   };
   Execute(std::move(cb), true);

+  // Don't reset the scheduled flag because we didn't release the locks
+  coordinator_state_ |= COORD_SCHED;
+
   auto* stats = ServerState::tl_connection_stats();
   ++stats->num_blocked_clients;
   DVLOG(1) << "WaitOnWatch wait for " << tp << " " << DebugId();
@@ -1606,8 +1479,12 @@ void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {

 bool Transaction::CanRunInlined() const {
   auto* ss = ServerState::tlocal();
-  return unique_shard_cnt_ == 1 && unique_shard_id_ == ss->thread_index() &&
-         ss->AllowInlineScheduling();
+  if (unique_shard_cnt_ == 1 && unique_shard_id_ == ss->thread_index() &&
+      ss->AllowInlineScheduling()) {
+    ss->stats.tx_inline_runs++;
+    return true;
+  }
+  return false;
 }

 OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
diff --git a/src/server/transaction.h b/src/server/transaction.h
index 6aa9d0dbc..ab3dd50ad 100644
--- a/src/server/transaction.h
+++ b/src/server/transaction.h
@@ -164,9 +164,10 @@ class Transaction {
     OUT_OF_ORDER = 1 << 2,
     // Whether its key locks are acquired, never set for global commands.
     KEYLOCK_ACQUIRED = 1 << 3,
-    SUSPENDED_Q = 1 << 4,  // Whether it suspended (by WatchInShard())
-    AWAKED_Q = 1 << 5,     // Whether it was awakened (by NotifySuspended())
-    UNLOCK_MULTI = 1 << 6,  // Whether this shard executed UnlockMultiShardCb
+    SUSPENDED_Q = 1 << 4,      // Whether it suspended (by WatchInShard())
+    AWAKED_Q = 1 << 5,         // Whether it was awakened (by NotifySuspended())
+    UNLOCK_MULTI = 1 << 6,     // Whether this shard executed UnlockMultiShardCb
+    RAN_IMMEDIATELY = 1 << 7,  // Whether the shard executed immediately (during schedule)
   };

  public:
@@ -503,19 +504,12 @@ class Transaction {
   // Generic schedule used from Schedule() and ScheduleSingleHop() on slow path.
   void ScheduleInternal();

-  // Schedule if only one shard is active.
-  // Returns true if transaction ran out-of-order during the scheduling phase.
-  bool ScheduleUniqueShard(EngineShard* shard);
-
   // Schedule on shards transaction queue. Returns true if scheduled successfully,
   // false if inconsistent order was detected and the schedule needs to be cancelled.
-  bool ScheduleInShard(EngineShard* shard);
-
-  // Optimized version of RunInShard for single shard uncontended cases.
-  RunnableResult RunQuickie(EngineShard* shard);
+  bool ScheduleInShard(EngineShard* shard, bool can_run_immediately);

   // Set ARMED flags, start run barrier and submit poll tasks. Doesn't wait for the run barrier
-  void ExecuteAsync();
+  void DispatchHop();

   // Finish hop, decrement run barrier
   void FinishHop();
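
For reference, the immediate-run condition added to ScheduleInternal() boils down to the
following standalone sketch. TxView, CommandInfo and CanRunImmediately are hypothetical,
simplified stand-ins rather than the actual Dragonfly types; the predicate mirrors the
can_run_immediately expression from the hunk above:

#include <cstdint>

// Hypothetical, simplified stand-ins for CommandId/Transaction, for illustration only.
enum CommandOpt : uint32_t { IDEMPOTENT = 1U << 18 };

struct CommandInfo {
  uint32_t opt_mask = 0;  // command flags, e.g. IDEMPOTENT for MGET
};

struct TxView {
  const CommandInfo* cid;     // command being executed
  unsigned unique_shard_cnt;  // number of shards the transaction touches
  bool is_global;             // global transactions always go through the tx queue
  bool concluding;            // this hop is the transaction's last
};

// An immediate run is allowed only for a concluding hop, and only if either no
// schedule cancellation is possible (single shard) or repeating the callback
// after a cancelled scheduling attempt is harmless (idempotent, e.g. MGET).
bool CanRunImmediately(const TxView& tx) {
  return !tx.is_global && tx.concluding &&
         (tx.unique_shard_cnt == 1 || (tx.cid->opt_mask & IDEMPOTENT));
}

Under this predicate a single-shard SET can conclude during scheduling whenever its keys are
unlocked, while a multi-shard MGET may run immediately on each shard; if any shard then fails
to schedule, the whole attempt is repeated, which is safe precisely because the read has no
side effects.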