feat(transaction): Single hop blocking, callback flags (#2393)

* feat(transaction): Single hop blocking, callback flags
This commit is contained in:
Vladislav 2024-01-15 21:13:22 +03:00 committed by GitHub
parent b6f4370ae7
commit de817098a7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 211 additions and 116 deletions

View file

@ -472,23 +472,22 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
/*************************************************************************/
// Actually running the callback.
// If you change the logic here, also please change the logic
RunnableResult result;
try {
OpStatus status = OpStatus::OK;
// if a transaction is suspended, we still run it because of brpoplpush/blmove case
// that needs to run lpush on its suspended shard.
status = (*cb_ptr_)(this, shard);
result = (*cb_ptr_)(this, shard);
if (unique_shard_cnt_ == 1) {
cb_ptr_ = nullptr; // We can do it because only a single thread runs the callback.
local_result_ = status;
local_result_ = result;
} else {
if (status == OpStatus::OUT_OF_MEMORY) {
if (result == OpStatus::OUT_OF_MEMORY) {
absl::base_internal::SpinLockHolder lk{&local_result_mu_};
CHECK(local_result_ == OpStatus::OK || local_result_ == OpStatus::OUT_OF_MEMORY);
local_result_ = status;
local_result_ = result;
} else {
CHECK_EQ(OpStatus::OK, status);
CHECK_EQ(OpStatus::OK, result);
}
}
} catch (std::bad_alloc&) {
@ -500,15 +499,25 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
}
/*************************************************************************/
// at least the coordinator thread owns the reference.
DCHECK_GE(GetUseCount(), 1u);
shard->db_slice().OnCbFinish();
// Handle result flags to alter behaviour.
if (result.flags & RunnableResult::AVOID_CONCLUDING) {
// Multi shard callbacks should either all or none choose to conclude. Because they can't
// communicate, the must know their decision ahead, consequently there is no point in using this
// flag.
CHECK_EQ(unique_shard_cnt_, 1u);
DCHECK(is_concluding || multi_->concluding);
is_concluding = false;
}
// Log to jounrnal only once the command finished running
if (is_concluding || (multi_ && multi_->concluding))
LogAutoJournalOnShard(shard);
shard->db_slice().OnCbFinish();
// at least the coordinator thread owns the reference.
DCHECK_GE(GetUseCount(), 1u);
// If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation
// and successive hops are run by continuation_trans_ in engine shard.
// Otherwise we can remove ourselves only when we're concluding (so no more hops will follow).
@ -928,6 +937,8 @@ void Transaction::ExecuteAsync() {
}
void Transaction::Conclude() {
if (!IsScheduled())
return;
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
Execute(std::move(cb), true);
}
@ -963,7 +974,7 @@ void Transaction::EnableAllShards() {
sd.local_mask |= ACTIVE;
}
void Transaction::RunQuickie(EngineShard* shard) {
Transaction::RunnableResult Transaction::RunQuickie(EngineShard* shard) {
DCHECK(!IsAtomicMulti());
DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC);
DCHECK_NE(unique_shard_id_, kInvalidSid);
@ -976,19 +987,23 @@ void Transaction::RunQuickie(EngineShard* shard) {
DCHECK(cb_ptr_) << DebugId() << " " << shard->shard_id();
// Calling the callback in somewhat safe way
RunnableResult result;
try {
local_result_ = (*cb_ptr_)(this, shard);
result = (*cb_ptr_)(this, shard);
} catch (std::bad_alloc&) {
LOG_FIRST_N(ERROR, 16) << " out of memory";
local_result_ = OpStatus::OUT_OF_MEMORY;
result = OpStatus::OUT_OF_MEMORY;
} catch (std::exception& e) {
LOG(FATAL) << "Unexpected exception " << e.what();
}
shard->db_slice().OnCbFinish();
LogAutoJournalOnShard(shard);
// Handling the result, along with conclusion and journaling, is done by the caller
sd.is_armed.store(false, memory_order_relaxed);
cb_ptr_ = nullptr; // We can do it because only a single shard runs the callback.
return result;
}
// runs in coordinator thread.
@ -1030,10 +1045,11 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const {
// Runs within a engine shard thread.
// Optimized path that schedules and runs transactions out of order if possible.
// Returns true if was eagerly executed, false if it was scheduled into queue.
// Returns true if eagerly executed, false if the callback will be handled by the transaction
// queue.
bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
DCHECK(!IsAtomicMulti());
DCHECK_EQ(0u, txid_);
DCHECK_EQ(txid_, 0u);
DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC);
DCHECK_NE(unique_shard_id_, kInvalidSid);
@ -1043,31 +1059,45 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
auto& sd = shard_data_[SidToId(unique_shard_id_)];
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
// Fast path - for uncontended keys, just run the callback.
// That applies for single key operations like set, get, lpush etc.
if (shard->db_slice().CheckLock(mode, lock_args) && shard->shard_lock()->Check(mode)) {
RunQuickie(shard);
return true;
bool unlocked_keys =
shard->db_slice().CheckLock(mode, lock_args) && shard->shard_lock()->Check(mode);
bool quick_run = unlocked_keys;
// Fast path. If none of the keys are locked, we can run briefly atomically on the thread
// without acquiring them at all.
if (quick_run) {
auto result = RunQuickie(shard);
local_result_ = result.status;
if (result.flags & RunnableResult::AVOID_CONCLUDING) {
// If we want to run again, we have to actually acquire keys, but keep ourselves disarmed
DCHECK_EQ(sd.is_armed, false);
unlocked_keys = false;
} else {
LogAutoJournalOnShard(shard);
}
}
// we can do it because only a single thread writes into txid_ and sd.
txid_ = op_seq.fetch_add(1, memory_order_relaxed);
sd.pq_pos = shard->txq()->Insert(this);
// Slow path. Some of the keys are locked, so we schedule on the transaction queue.
if (!unlocked_keys) {
coordinator_state_ |= COORD_SCHED; // safe because single shard
txid_ = op_seq.fetch_add(1, memory_order_relaxed); // -
sd.pq_pos = shard->txq()->Insert(this);
DCHECK_EQ(0, sd.local_mask & KEYLOCK_ACQUIRED);
DCHECK_EQ(sd.local_mask & KEYLOCK_ACQUIRED, 0);
shard->db_slice().Acquire(mode, lock_args);
sd.local_mask |= KEYLOCK_ACQUIRED;
shard->db_slice().Acquire(mode, lock_args);
sd.local_mask |= KEYLOCK_ACQUIRED;
DVLOG(1) << "Rescheduling " << DebugId() << " into TxQueue of size " << shard->txq()->size();
DVLOG(1) << "Rescheduling " << DebugId() << " into TxQueue of size " << shard->txq()->size();
// If there are blocked transactons waiting for this tx keys, we will add this transaction
// to the tx-queue (these keys will be contended). This will happen even if the queue was empty.
// In that case we must poll the queue, because there will be no other callback trigerring the
// queue before us.
shard->PollExecution("schedule_unique", nullptr);
}
// If there are blocked transactons waiting for this tx keys, we will add this transaction
// to the tx-queue (these keys will be contended). This will happen even if the queue was empty.
// In that case we must poll the queue, because there will be no other callback trigerring the
// queue before us.
shard->PollExecution("schedule_unique", nullptr);
return false;
return quick_run;
}
// This function should not block since it's run via RunBriefInParallel.
@ -1303,11 +1333,14 @@ void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) {
OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
DCHECK(multi_ && multi_->role == SQUASHED_STUB);
DCHECK_EQ(unique_shard_cnt_, 1u);
auto* shard = EngineShard::tlocal();
auto status = cb(this, shard);
auto result = cb(this, shard);
shard->db_slice().OnCbFinish();
LogAutoJournalOnShard(shard);
return status;
DCHECK_EQ(result.flags, 0); // if it's sophisticated, we shouldn't squash it
return result;
}
void Transaction::UnlockMultiShardCb(const KeyList& sharded_keys, EngineShard* shard,