feat(transaction): Idempotent callbacks (immediate runs) (#2453)

This commit generalizes the mechanism of running transaction callbacks during scheduling, removing the need for the specialized ScheduleUniqueShard/RunQuickie paths. Instead, transactions can now be run during ScheduleInShard - called "immediate" runs - if the transaction is concluding and either only a single shard is active or the operation can be safely repeated when scheduling fails (idempotent commands, like MGET).

Updates transaction stats to reflect the new scheduling flow more closely.
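
For illustration, a minimal sketch of the decision this change introduces, with invented names (CanRunImmediately and its parameters are not part of the codebase); in the diff below the equivalent check is the can_run_immediately flag computed in ScheduleInternal() and consumed by ScheduleInShard():

// Illustrative sketch only, not the actual Dragonfly implementation:
// when may a transaction callback run directly inside the scheduling hop?
bool CanRunImmediately(bool is_global, bool is_concluding,
                       unsigned unique_shard_cnt, bool is_idempotent) {
  // Global transactions and non-concluding hops always take the regular
  // path through the transaction queue.
  if (is_global || !is_concluding)
    return false;
  // A single-shard transaction never has to cancel scheduling because of
  // reordering, so it can always attempt an immediate run.
  if (unique_shard_cnt == 1)
    return true;
  // A multi-shard transaction may still run immediately if the command is
  // idempotent (e.g. MGET): repeating it after a failed schedule is safe.
  return is_idempotent;
}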

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Vladislav 2024-04-03 23:06:57 +03:00 committed by GitHub
parent 84d451fbed
commit fbc55bb82d
12 changed files with 150 additions and 285 deletions


@@ -7,9 +7,11 @@
#include <absl/strings/match.h>
#include "base/logging.h"
#include "glog/logging.h"
#include "facade/op_status.h"
#include "redis/redis_aux.h"
#include "server/blocking_controller.h"
#include "server/command_registry.h"
#include "server/common.h"
#include "server/db_slice.h"
#include "server/engine_shard_set.h"
#include "server/journal/journal.h"
@@ -64,24 +66,8 @@ void AnalyzeTxQueue(const EngineShard* shard, const TxQueue* txq) {
void RecordTxScheduleStats(const Transaction* tx) {
auto* ss = ServerState::tlocal();
DCHECK(ss);
ss->stats.tx_width_freq_arr[tx->GetUniqueShardCnt() - 1]++;
if (tx->IsGlobal()) {
ss->stats.tx_type_cnt[ServerState::GLOBAL]++;
} else {
ss->stats.tx_type_cnt[ServerState::NORMAL]++;
}
}
void RecordTxScheduleFastStats(const Transaction* tx, bool was_ooo, bool was_inline) {
DCHECK_EQ(tx->GetUniqueShardCnt(), 1u);
auto* ss = ServerState::tlocal();
if (was_ooo) {
ss->stats.tx_type_cnt[was_inline ? ServerState::INLINE : ServerState::QUICK]++;
} else {
ss->stats.tx_type_cnt[ServerState::NORMAL]++;
}
ss->stats.tx_width_freq_arr[0]++;
++(tx->IsGlobal() ? ss->stats.tx_global_cnt : ss->stats.tx_normal_cnt);
++ss->stats.tx_width_freq_arr[tx->GetUniqueShardCnt() - 1];
}
std::ostream& operator<<(std::ostream& os, Transaction::time_point tp) {
@@ -654,14 +640,10 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
}
void Transaction::RunCallback(EngineShard* shard) {
DCHECK_EQ(EngineShard::tlocal(), shard);
DCHECK_EQ(shard, EngineShard::tlocal());
// Actually running the callback.
// If you change the logic here, also please change the logic
RunnableResult result;
try {
// if a transaction is suspended, we still run it because of brpoplpush/blmove case
// that needs to run lpush on its suspended shard.
result = (*cb_ptr_)(this, shard);
if (unique_shard_cnt_ == 1) {
@@ -688,12 +670,11 @@ void Transaction::RunCallback(EngineShard* shard) {
// Handle result flags to alter behaviour.
if (result.flags & RunnableResult::AVOID_CONCLUDING) {
// Multi shard callbacks should either all or none choose to conclude. Because they can't
// communicate, the must know their decision ahead, consequently there is no point in using this
// flag.
// Multi shard callbacks should either all or none choose to conclude. They can't communicate,
// so they must know their decision ahead, consequently there is no point in using this flag.
CHECK_EQ(unique_shard_cnt_, 1u);
DCHECK((coordinator_state_ & COORD_CONCLUDING) || multi_->concluding);
coordinator_state_ &= ~COORD_CONCLUDING; // safe because single shard
coordinator_state_ &= ~COORD_CONCLUDING;
}
// Log to journal only once the command finished running
@@ -711,7 +692,14 @@ void Transaction::ScheduleInternal() {
DCHECK_GT(unique_shard_cnt_, 0u);
DCHECK(!IsAtomicMulti() || cid_->IsMultiTransactional());
DVLOG(1) << "ScheduleInternal " << cid_->name() << " on " << unique_shard_cnt_ << " shards";
// Try running immediately (during scheduling) if we're concluding and either:
// - have a single shard, and thus never have to cancel scheduling due to reordering
// - run as an idempotent command, meaning we can safely repeat the operation if scheduling fails
bool can_run_immediately = !IsGlobal() && (coordinator_state_ & COORD_CONCLUDING) &&
(unique_shard_cnt_ == 1 || (cid_->opt_mask() & CO::IDEMPOTENT));
DVLOG(1) << "ScheduleInternal " << cid_->name() << " on " << unique_shard_cnt_ << " shards "
<< " immediate run: " << can_run_immediately;
auto is_active = [this](uint32_t i) { return IsActive(i); };
@@ -719,22 +707,35 @@ void Transaction::ScheduleInternal() {
while (true) {
stats_.schedule_attempts++;
txid_ = op_seq.fetch_add(1, memory_order_relaxed);
// This is a contention point for all threads - avoid using it unless necessary.
// Single shard operations can assign txid later if the immediate run failed.
if (unique_shard_cnt_ > 1)
txid_ = op_seq.fetch_add(1, memory_order_relaxed);
InitTxTime();
atomic_uint32_t schedule_fails = 0;
auto cb = [this, &schedule_fails](EngineShard* shard) {
if (!ScheduleInShard(shard)) {
auto cb = [this, &schedule_fails, can_run_immediately]() {
if (!ScheduleInShard(EngineShard::tlocal(), can_run_immediately)) {
schedule_fails.fetch_add(1, memory_order_relaxed);
}
run_barrier_.Dec();
};
shard_set->RunBriefInParallel(std::move(cb), is_active);
run_barrier_.Start(unique_shard_cnt_);
if (CanRunInlined()) {
// single shard schedule operation can't fail
CHECK(ScheduleInShard(EngineShard::tlocal(), can_run_immediately));
run_barrier_.Dec();
} else {
IterateActiveShards([cb](const auto& sd, ShardId i) { shard_set->Add(i, cb); });
run_barrier_.Wait();
}
if (schedule_fails.load(memory_order_relaxed) == 0) {
coordinator_state_ |= COORD_SCHED;
RecordTxScheduleStats(this);
VLOG(2) << "Scheduled " << DebugId() << " num_shards: " << unique_shard_cnt_;
break;
}
@@ -767,82 +768,6 @@ void Transaction::ScheduleInternal() {
}
}
// Optimized "Schedule and execute" function for the most common use-case of a single hop
// transactions like set/mset/mget etc. Does not apply for more complicated cases like RENAME or
// BLPOP where data must be read from multiple shards before performing another hop.
OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
if (multi_ && multi_->role == SQUASHED_STUB) {
return RunSquashedMultiCb(cb);
}
DCHECK(!cb_ptr_);
cb_ptr_ = &cb;
// We can be already scheduled if we're part of a multi transaction. Note: If a multi tx isn't
// scheduled, we assume it's not mimicking the interface, but actually preparing a single hop.
bool scheduled = (coordinator_state_ & COORD_SCHED) > 0;
if (scheduled) {
DCHECK(IsAtomicMulti());
multi_->concluding = true;
} else {
// For multi it only makes sense with squashing and thus a proper underlying command
DCHECK(!IsAtomicMulti() || (multi_->role == SQUASHER && cid_->IsMultiTransactional()));
coordinator_state_ |= COORD_CONCLUDING;
}
// If we run only on one shard and conclude, we can possibly avoid scheduling at all
// and directly run the callback on the destination thread if the locks are free.
bool schedule_fast = !scheduled && (unique_shard_cnt_ == 1) && !IsGlobal();
bool was_ooo = false, was_inline = false;
if (schedule_fast) {
DCHECK_NE(unique_shard_id_, kInvalidSid);
DCHECK(IsActive(unique_shard_id_));
DCHECK(shard_data_.size() == 1 || multi_);
InitTxTime();
run_barrier_.Start(1);
shard_data_[SidToId(unique_shard_id_)].is_armed.store(true, memory_order_release);
auto schedule_cb = [this, &was_ooo] {
bool run_fast = ScheduleUniqueShard(EngineShard::tlocal());
if (run_fast) {
// We didn't decrease the barrier, so the scope is valid UNTIL Dec() below
DCHECK_EQ(run_barrier_.DEBUG_Count(), 1u);
was_ooo = true;
FinishHop();
}
// Otherwise it's not safe to access the function scope, as
// ScheduleUniqueShard() -> PollExecution() might have unlocked the barrier below.
};
auto* ss = ServerState::tlocal();
if (ss->thread_index() == unique_shard_id_ && ss->AllowInlineScheduling()) {
DVLOG(2) << "Inline scheduling a transaction";
schedule_cb();
was_inline = true;
} else {
shard_set->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier.
}
} else {
// This transaction either spans multiple shards and/or is multi, which schedule in advance.
if (!scheduled)
ScheduleInternal();
ExecuteAsync();
}
run_barrier_.Wait();
if (schedule_fast) {
CHECK(!cb_ptr_); // we should have reset it within the callback.
RecordTxScheduleFastStats(this, was_ooo, was_inline);
}
cb_ptr_ = nullptr;
return local_result_;
}
void Transaction::ReportWritesSquashedMulti(absl::FunctionRef<bool(ShardId)> had_write) {
DCHECK(multi_);
for (unsigned i = 0; i < multi_->shard_journal_write.size(); i++)
@@ -895,24 +820,23 @@ uint32_t Transaction::CalcMultiNumOfShardJournals() const {
return shard_journals_cnt;
}
void Transaction::Schedule() {
if (multi_ && multi_->role == SQUASHED_STUB)
return;
OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
Execute(cb, true);
return local_result_;
}
if ((coordinator_state_ & COORD_SCHED) == 0)
ScheduleInternal();
void Transaction::Schedule() {
// no-op
}
// Runs in coordinator thread.
void Transaction::Execute(RunnableType cb, bool conclude) {
if (multi_ && multi_->role == SQUASHED_STUB) {
RunSquashedMultiCb(cb);
local_result_ = RunSquashedMultiCb(cb);
return;
}
DCHECK(coordinator_state_ & COORD_SCHED);
DCHECK(!cb_ptr_);
local_result_ = OpStatus::OK;
cb_ptr_ = &cb;
if (IsAtomicMulti()) {
@@ -922,16 +846,21 @@ void Transaction::Execute(RunnableType cb, bool conclude) {
: (coordinator_state_ & ~COORD_CONCLUDING);
}
ExecuteAsync();
if ((coordinator_state_ & COORD_SCHED) == 0) {
ScheduleInternal();
}
DispatchHop();
run_barrier_.Wait();
cb_ptr_ = nullptr;
if (coordinator_state_ & COORD_CONCLUDING)
coordinator_state_ &= ~COORD_SCHED;
}
// Runs in coordinator thread.
void Transaction::ExecuteAsync() {
DVLOG(1) << "ExecuteAsync " << DebugId();
void Transaction::DispatchHop() {
DVLOG(1) << "DispatchHop " << DebugId();
DCHECK_GT(unique_shard_cnt_, 0u);
DCHECK_GT(use_count_.load(memory_order_relaxed), 0u);
DCHECK(!IsAtomicMulti() || multi_->lock_mode.has_value());
@@ -941,22 +870,36 @@ void Transaction::ExecuteAsync() {
// initialize the run barrier before arming, as well as copy indices
// of active shards to avoid reading concurrently accessed shard data.
std::bitset<1024> poll_flags(0);
run_barrier_.Start(unique_shard_cnt_);
unsigned run_cnt = 0;
IterateActiveShards([&poll_flags, &run_cnt](auto& sd, auto i) {
if ((sd.local_mask & RAN_IMMEDIATELY) == 0) {
run_cnt++;
poll_flags.set(i, true);
}
sd.local_mask &= ~RAN_IMMEDIATELY; // we'll run it next time if it avoided concluding
});
DCHECK_EQ(run_cnt, poll_flags.count());
if (run_cnt == 0) // all callbacks were run immediately
return;
run_barrier_.Start(run_cnt);
// Set armed flags on all active shards.
std::atomic_thread_fence(memory_order_release); // once to avoid flushing poll_flags in loop
std::atomic_thread_fence(memory_order_release); // a single fence to avoid flushing writes in each loop iteration
IterateActiveShards([&poll_flags](auto& sd, auto i) {
sd.is_armed.store(true, memory_order_relaxed);
poll_flags.set(i, true);
if (poll_flags.test(i))
sd.is_armed.store(true, memory_order_relaxed);
});
if (CanRunInlined()) {
DCHECK_EQ(run_cnt, 1u);
DVLOG(1) << "Short-circuit ExecuteAsync " << DebugId();
EngineShard::tlocal()->PollExecution("exec_cb", this);
return;
}
use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed); // for each pointer from poll_cb
use_count_.fetch_add(run_cnt, memory_order_relaxed); // for each pointer from poll_cb
auto poll_cb = [this] {
EngineShard::tlocal()->PollExecution("exec_cb", this);
@@ -1030,39 +973,6 @@ void Transaction::EnableAllShards() {
sd.local_mask |= ACTIVE;
}
Transaction::RunnableResult Transaction::RunQuickie(EngineShard* shard) {
DCHECK(!IsAtomicMulti());
DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC);
DCHECK_NE(unique_shard_id_, kInvalidSid);
DCHECK_EQ(0u, txid_);
auto& sd = shard_data_[SidToId(unique_shard_id_)];
DCHECK_EQ(0, sd.local_mask & OUT_OF_ORDER);
DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id();
DCHECK(cb_ptr_) << DebugId() << " " << shard->shard_id();
sd.stats.total_runs++;
// Calling the callback in a somewhat safe way
RunnableResult result;
try {
result = (*cb_ptr_)(this, shard);
} catch (std::bad_alloc&) {
LOG_FIRST_N(ERROR, 16) << " out of memory";
result = OpStatus::OUT_OF_MEMORY;
} catch (std::exception& e) {
LOG(FATAL) << "Unexpected exception " << e.what();
}
shard->db_slice().OnCbFinish();
// Handling the result, along with conclusion and journaling, is done by the caller
cb_ptr_ = nullptr; // We can do it because only a single shard runs the callback.
return result;
}
// runs in coordinator thread.
// Marks the transaction as expired and removes it from the waiting queue.
void Transaction::ExpireBlocking(WaitKeysProvider wcb) {
@@ -1142,88 +1052,41 @@ OpArgs Transaction::GetOpArgs(EngineShard* shard) const {
return OpArgs{shard, this, GetDbContext()};
}
// Runs within an engine shard thread.
// Optimized path that schedules and runs transactions out of order if possible.
// Returns true if eagerly executed, false if the callback will be handled by the transaction
// queue.
bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
DCHECK(!IsAtomicMulti());
DCHECK_EQ(txid_, 0u);
DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC);
DCHECK_NE(unique_shard_id_, kInvalidSid);
auto mode = LockMode();
auto lock_args = GetLockArgs(shard->shard_id());
auto& sd = shard_data_[SidToId(unique_shard_id_)];
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
bool shard_unlocked = shard->shard_lock()->Check(mode);
bool keys_unlocked = shard->db_slice().Acquire(mode, lock_args);
bool quick_run = shard_unlocked && keys_unlocked;
bool continue_scheduling = !quick_run;
sd.local_mask |= KEYLOCK_ACQUIRED;
// Fast path. If none of the keys are locked, we can run briefly atomically on the thread
// without acquiring them at all.
if (quick_run) {
CHECK(sd.is_armed.exchange(false, memory_order_relaxed));
auto result = RunQuickie(shard);
local_result_ = result.status;
if (result.flags & RunnableResult::AVOID_CONCLUDING) {
// If we want to run again, we have to actually schedule this transaction
DCHECK(!sd.is_armed.load(memory_order_relaxed));
continue_scheduling = true;
} else {
LogAutoJournalOnShard(shard, result);
shard->db_slice().Release(mode, lock_args);
sd.local_mask &= ~KEYLOCK_ACQUIRED;
}
}
// Slow path. Some of the keys are locked, so we schedule on the transaction queue.
if (continue_scheduling) {
coordinator_state_ |= COORD_SCHED; // safe because single shard
txid_ = op_seq.fetch_add(1, memory_order_relaxed); // -
sd.pq_pos = shard->txq()->Insert(this);
DVLOG(1) << "Rescheduling " << DebugId() << " into TxQueue of size " << shard->txq()->size();
// If there are blocked transactions waiting for these tx keys, we add this transaction
// to the tx-queue (these keys will be contended). This happens even if the queue is empty.
// In that case we must poll the queue, because there will be no other callback triggering the
// queue before us.
shard->PollExecution("schedule_unique", nullptr);
}
return quick_run;
}
// This function should not block since it's run via RunBriefInParallel.
bool Transaction::ScheduleInShard(EngineShard* shard) {
bool Transaction::ScheduleInShard(EngineShard* shard, bool can_run_immediately) {
ShardId sid = SidToId(shard->shard_id());
auto& sd = shard_data_[sid];
DCHECK(sd.local_mask & ACTIVE);
DCHECK_EQ(sd.local_mask & KEYLOCK_ACQUIRED, 0);
sd.local_mask &= ~OUT_OF_ORDER;
// If a more recent transaction already committed, we abort
if (shard->committed_txid() >= txid_)
return false;
sd.local_mask &= ~(OUT_OF_ORDER | RAN_IMMEDIATELY);
TxQueue* txq = shard->txq();
KeyLockArgs lock_args;
IntentLock::Mode mode = LockMode();
bool lock_granted = false;
// If a more recent transaction already committed, we abort
if (txid_ > 0 && shard->committed_txid() >= txid_)
return false;
// Acquire intent locks. Intent locks are always acquired, even if already locked by others.
if (!IsGlobal()) {
lock_args = GetLockArgs(shard->shard_id());
bool shard_unlocked = shard->shard_lock()->Check(mode);
// Check if we can run immediately
if (shard_unlocked && can_run_immediately && shard->db_slice().CheckLock(mode, lock_args)) {
sd.local_mask |= RAN_IMMEDIATELY;
shard->stats().tx_immediate_total++;
RunCallback(shard);
// Check state again, it could've been updated if the callback returned AVOID_CONCLUDING flag.
// Only possible for single shard.
if (coordinator_state_ & COORD_CONCLUDING)
return true;
}
bool keys_unlocked = shard->db_slice().Acquire(mode, lock_args);
lock_granted = shard_unlocked && keys_unlocked;
@@ -1235,6 +1098,13 @@ bool Transaction::ScheduleInShard(EngineShard* shard) {
DVLOG(3) << "Lock granted " << lock_granted << " for trans " << DebugId();
}
// Single shard operations might have delayed acquiring a txid until necessary.
if (txid_ == 0) {
DCHECK_EQ(unique_shard_cnt_, 1u);
txid_ = op_seq.fetch_add(1, memory_order_relaxed);
DCHECK_GT(txid_, shard->committed_txid());
}
// If the new transaction requires reordering of the pending queue (i.e. it comes before tail)
// and some other transaction already locked its keys we can not reorder 'trans' because
// the transaction could have deduced that it can run OOO and eagerly execute. Hence, we
@@ -1330,6 +1200,9 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p
};
Execute(std::move(cb), true);
// Don't reset the scheduled flag because we didn't release the locks
coordinator_state_ |= COORD_SCHED;
auto* stats = ServerState::tl_connection_stats();
++stats->num_blocked_clients;
DVLOG(1) << "WaitOnWatch wait for " << tp << " " << DebugId();
@@ -1606,8 +1479,12 @@ void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {
bool Transaction::CanRunInlined() const {
auto* ss = ServerState::tlocal();
return unique_shard_cnt_ == 1 && unique_shard_id_ == ss->thread_index() &&
ss->AllowInlineScheduling();
if (unique_shard_cnt_ == 1 && unique_shard_id_ == ss->thread_index() &&
ss->AllowInlineScheduling()) {
ss->stats.tx_inline_runs++;
return true;
}
return false;
}
OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {