chore(transaction): Untie scheduling from multi status (#2590)

* chore(transaction): Untie scheduling from multi status

Idea: We decide whether we have to schedule not based on our multi status (atomic multi), but solely based on the fact if COORD_SCHED is set

Goal: Being able to use ScheduleSingleHop()/Schedule() for multi transactions, and thus later allow single hop multi transactions
---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2024-02-18 12:25:57 +03:00 committed by GitHub
parent d802a2181a
commit 4d4fed6fec
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 98 additions and 48 deletions

View file

@ -55,6 +55,10 @@ bool CommandId::IsTransactional() const {
return false;
}
bool CommandId::IsMultiTransactional() const {
return CO::IsTransKind(name()) || CO::IsEvalKind(name());
}
uint64_t CommandId::Invoke(CmdArgList args, ConnectionContext* cntx) const {
int64_t before = absl::GetCurrentTimeNanos();
handler_(args, cntx);

View file

@ -97,6 +97,8 @@ class CommandId : public facade::CommandId {
bool IsTransactional() const;
bool IsMultiTransactional() const;
bool IsReadOnly() const {
return opt_mask_ & CO::READONLY;
}

View file

@ -193,8 +193,7 @@ Transaction::Transaction(const Transaction* parent, ShardId shard_id, std::optio
multi_->role = SQUASHED_STUB;
multi_->shard_journal_write.resize(1);
time_now_ms_ = parent->time_now_ms_;
MultiUpdateWithParent(parent);
if (slot_id.has_value()) {
unique_slot_checker_.Add(*slot_id);
}
@ -447,7 +446,6 @@ void Transaction::PrepareSquashedMultiHop(const CommandId* cid,
MultiSwitchCmd(cid);
multi_->role = SQUASHER;
InitBase(db_index_, {});
// Because squashing already determines active shards by partitioning commands,
@ -465,6 +463,8 @@ void Transaction::PrepareSquashedMultiHop(const CommandId* cid,
shard_data_[i].arg_start = 0;
shard_data_[i].arg_count = 0;
}
MultiBecomeSquasher();
}
void Transaction::StartMultiGlobal(DbIndex dbid) {
@ -479,7 +479,7 @@ void Transaction::StartMultiGlobal(DbIndex dbid) {
ScheduleInternal();
}
void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgVec keys) {
void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgVec keys, bool skip_scheduling) {
DVLOG(1) << "StartMultiLockedAhead on " << keys.size() << " keys";
DCHECK(multi_);
@ -493,6 +493,7 @@ void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgVec keys) {
InitBase(dbid, absl::MakeSpan(keys));
InitByKeys(KeyIndex::Range(0, keys.size()));
if (!skip_scheduling)
ScheduleInternal();
full_args_ = {nullptr, 0}; // InitBase set it to temporary keys, now we reset it.
@ -543,10 +544,28 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
DCHECK_EQ(coordinator_state_, 0u);
}
// Each hop needs to be prepared, reset role
if (multi_->role == SQUASHER)
multi_->role = DEFAULT;
}
void Transaction::MultiUpdateWithParent(const Transaction* parent) {
// Disabled because of single shard lua optimization
// DCHECK(multi_);
// DCHECK(parent->multi_); // it might not be a squasher yet, but certainly is multi
DCHECK_EQ(multi_->role, SQUASHED_STUB);
txid_ = parent->txid_;
time_now_ms_ = parent->time_now_ms_;
unique_slot_checker_ = parent->unique_slot_checker_;
}
void Transaction::MultiBecomeSquasher() {
DCHECK(multi_->mode == GLOBAL || multi_->mode == LOCK_AHEAD);
DCHECK_GT(GetUniqueShardCnt(), 0u); // initialized and determined active shards
DCHECK(cid_->IsMultiTransactional()); // proper base command set
multi_->role = SQUASHER;
}
string Transaction::DebugId(std::optional<ShardId> sid) const {
DCHECK_GT(use_count_.load(memory_order_relaxed), 0u);
string res = StrCat(Name(), "@", txid_, "/", unique_shard_cnt_);
@ -681,6 +700,11 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
// This is the last hop, so clear cont_trans if its held by the current tx
shard->RemoveContTx(this);
if (IsAtomicMulti()) { // Can only be true if run through ScheduleSingleHop
DCHECK(cid_->IsMultiTransactional());
MultiReportJournalOnShard(shard);
}
// It has 2 responsibilities.
// 1: to go over potential wakened keys, verify them and activate watch queues.
// 2: if this transaction was notified and finished running - to remove it from the head
@ -708,10 +732,10 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
// For eval-like transactions - we can decide based on the command flavor (EVAL/EVALRO) or
// auto-tune based on the static analysis (by identifying commands with hardcoded command names).
void Transaction::ScheduleInternal() {
DCHECK(!shard_data_.empty());
DCHECK_EQ(0u, txid_);
DCHECK_EQ(0, coordinator_state_ & COORD_SCHED);
DCHECK_EQ(txid_, 0u);
DCHECK_EQ(coordinator_state_ & COORD_SCHED, 0);
DCHECK_GT(unique_shard_cnt_, 0u);
DCHECK(!IsAtomicMulti() || cid_->IsMultiTransactional());
DVLOG(1) << "ScheduleInternal " << cid_->name() << " on " << unique_shard_cnt_ << " shards";
@ -776,26 +800,29 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
}
DCHECK(!cb_ptr_);
DCHECK(IsAtomicMulti() || (coordinator_state_ & COORD_SCHED) == 0); // Multi schedule in advance.
cb_ptr_ = &cb;
if (IsAtomicMulti()) {
// We can be already scheduled if we're part of a multi transaction. Note: If a multi tx isn't
// scheduled, we assume it's not mimicking the interface, but actually preparing a single hop.
bool scheduled = (coordinator_state_ & COORD_SCHED) > 0;
if (scheduled) {
DCHECK(IsAtomicMulti());
multi_->concluding = true;
} else {
coordinator_state_ |= COORD_CONCLUDING; // Single hop means we conclude.
// For multi it only makes sense with squashing and thus a proper underlying command
DCHECK(!IsAtomicMulti() || (multi_->role == SQUASHER && cid_->IsMultiTransactional()));
coordinator_state_ |= COORD_CONCLUDING;
}
// If we run only on one shard and conclude, we can possibly avoid scheduling at all
// and directly run the callback on the destination thread if the locks are free.
bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !IsAtomicMulti();
bool schedule_fast = !scheduled && (unique_shard_cnt_ == 1) && !IsGlobal();
bool was_ooo = false, was_inline = false;
if (schedule_fast) {
DCHECK_NE(unique_shard_id_, kInvalidSid);
DCHECK(IsActive(unique_shard_id_));
DCHECK(shard_data_.size() == 1 || multi_->mode == NON_ATOMIC);
DCHECK(shard_data_.size() == 1 || multi_);
InitTxTime();
shard_data_[SidToId(unique_shard_id_)].local_mask |= ARMED;
@ -825,7 +852,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
}
} else {
// This transaction either spans multiple shards and/or is multi, which schedule in advance.
if (!IsAtomicMulti())
if (!scheduled)
ScheduleInternal();
ExecuteAsync();
@ -845,6 +872,9 @@ void Transaction::ReportWritesSquashedMulti(absl::FunctionRef<bool(ShardId)> had
DCHECK(multi_);
for (unsigned i = 0; i < multi_->shard_journal_write.size(); i++)
multi_->shard_journal_write[i] |= had_write(i);
// Update imemdiately if we decide to conclude after one hop without UnlockMulti
multi_->shard_journal_cnt = CalcMultiNumOfShardJournals();
}
// Runs in the coordinator fiber.
@ -853,7 +883,8 @@ void Transaction::UnlockMulti() {
DCHECK(multi_);
DCHECK_GE(GetUseCount(), 1u); // Greater-equal because there may be callbacks in progress.
if (multi_->mode == NON_ATOMIC)
// Return if we either didn't schedule at all (and thus run) or already did conclude
if ((coordinator_state_ & COORD_SCHED) == 0 || (coordinator_state_ & COORD_CONCLUDING) > 0)
return;
multi_->frozen_keys_set.clear();
@ -864,15 +895,14 @@ void Transaction::UnlockMulti() {
(*sharded_keys)[sid].emplace_back(key);
}
unsigned shard_journals_cnt =
ServerState::tlocal()->journal() ? CalcMultiNumOfShardJournals() : 0;
multi_->shard_journal_cnt = ServerState::tlocal()->journal() ? CalcMultiNumOfShardJournals() : 0;
use_count_.fetch_add(shard_data_.size(), std::memory_order_relaxed);
DCHECK_EQ(shard_data_.size(), shard_set->size());
for (ShardId i = 0; i < shard_data_.size(); ++i) {
shard_set->Add(i, [this, sharded_keys, i, shard_journals_cnt]() {
this->UnlockMultiShardCb((*sharded_keys)[i], EngineShard::tlocal(), shard_journals_cnt);
shard_set->Add(i, [this, sharded_keys, i]() {
this->UnlockMultiShardCb((*sharded_keys)[i], EngineShard::tlocal());
intrusive_ptr_release(this);
});
}
@ -894,7 +924,7 @@ void Transaction::Schedule() {
if (multi_ && multi_->role == SQUASHED_STUB)
return;
if (!IsAtomicMulti())
if ((coordinator_state_ & COORD_SCHED) == 0)
ScheduleInternal();
}
@ -987,10 +1017,8 @@ void Transaction::FIX_ConcludeJournalExec() {
if (!multi_->shard_journal_write.front())
return;
if (auto journal = EngineShard::tlocal()->journal(); journal != nullptr) {
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, 1,
unique_slot_checker_.GetUniqueSlotId(), {}, false);
}
multi_->shard_journal_cnt = 1;
MultiReportJournalOnShard(EngineShard::tlocal());
}
void Transaction::EnableShard(ShardId sid) {
@ -1363,16 +1391,21 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
return result;
}
void Transaction::UnlockMultiShardCb(absl::Span<const std::string_view> sharded_keys,
EngineShard* shard, uint32_t shard_journals_cnt) {
DCHECK(multi_ && multi_->lock_mode);
auto journal = shard->journal();
if (journal != nullptr && multi_->shard_journal_write[shard->shard_id()]) {
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_journals_cnt,
void Transaction::MultiReportJournalOnShard(EngineShard* shard) const {
DCHECK_EQ(EngineShard::tlocal(), shard);
auto* journal = shard->journal();
size_t write_idx = multi_->role == SQUASHED_STUB ? 0 : shard->shard_id();
if (journal != nullptr && multi_->shard_journal_write[write_idx]) {
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, multi_->shard_journal_cnt,
unique_slot_checker_.GetUniqueSlotId(), {}, true);
}
}
void Transaction::UnlockMultiShardCb(absl::Span<const std::string_view> sharded_keys,
EngineShard* shard) {
DCHECK(multi_ && multi_->lock_mode);
MultiReportJournalOnShard(shard);
if (multi_->mode == GLOBAL) {
shard->shard_lock()->Release(IntentLock::EXCLUSIVE);

View file

@ -233,13 +233,11 @@ class Transaction {
void StartMultiGlobal(DbIndex dbid);
// Start multi in LOCK_AHEAD mode with given keys.
void StartMultiLockedAhead(DbIndex dbid, CmdArgVec keys);
void StartMultiLockedAhead(DbIndex dbid, CmdArgVec keys, bool skip_scheduling = false);
// Start multi in NON_ATOMIC mode.
void StartMultiNonAtomic();
void InitTxTime();
// Report which shards had write commands that executed on stub transactions
// and thus did not mark itself in MultiData::shard_journal_write.
void ReportWritesSquashedMulti(absl::FunctionRef<bool(ShardId)> had_write);
@ -250,6 +248,12 @@ class Transaction {
// Set new command for multi transaction.
void MultiSwitchCmd(const CommandId* cid);
// Copy txid, time and unique slot from parent
void MultiUpdateWithParent(const Transaction* parent);
// Set squasher role
void MultiBecomeSquasher();
// Returns locking arguments needed for DbSlice to Acquire/Release transactional locks.
// Runs in the shard thread.
KeyLockArgs GetLockArgs(ShardId sid) const;
@ -297,6 +301,14 @@ class Transaction {
return multi_->mode;
}
// Whether the transaction is multi and runs in an atomic mode.
// This, instead of just IsMulti(), should be used to check for the possibility of
// different optimizations, because they can safely be applied to non-atomic multi
// transactions as well.
bool IsAtomicMulti() const {
return multi_ && (multi_->mode == LOCK_AHEAD || multi_->mode == GLOBAL);
}
bool IsGlobal() const;
// If blocking tx was woken up on this shard, get wake key.
@ -389,12 +401,14 @@ class Transaction {
// Set if the multi command is concluding to avoid ambiguity with COORD_CONCLUDING
bool concluding = false;
unsigned cmd_seq_num = 0; // used for debugging purposes.
// The shard_journal_write vector variable is used to determine the number of shards
// involved in a multi-command transaction. This information is utilized by replicas when
// executing multi-command. For every write to a shard journal, the corresponding index in the
// vector is marked as true.
absl::InlinedVector<bool, 4> shard_journal_write;
unsigned cmd_seq_num = 0; // used for debugging purposes.
unsigned shard_journal_cnt;
};
enum CoordinatorState : uint8_t {
@ -507,8 +521,13 @@ class Transaction {
// Run callback inline as part of multi stub.
OpStatus RunSquashedMultiCb(RunnableType cb);
void UnlockMultiShardCb(absl::Span<const std::string_view> sharded_keys, EngineShard* shard,
uint32_t shard_journals_cnt);
// Set time_now_ms_
void InitTxTime();
// If journaling is enabled, report final exec opcode to finish the chain of commands.
void MultiReportJournalOnShard(EngineShard* shard) const;
void UnlockMultiShardCb(absl::Span<const std::string_view> sharded_keys, EngineShard* shard);
// In a multi-command transaction, we determine the number of shard journals that we wrote entries
// to by updating the shard_journal_write vector during command execution. The total number of
@ -525,14 +544,6 @@ class Transaction {
return use_count_.load(std::memory_order_relaxed);
}
// Whether the transaction is multi and runs in an atomic mode.
// This, instead of just IsMulti(), should be used to check for the possibility of
// different optimizations, because they can safely be applied to non-atomic multi
// transactions as well.
bool IsAtomicMulti() const {
return multi_ && multi_->mode != NON_ATOMIC;
}
bool IsActiveMulti() const {
return multi_ && multi_->role != SQUASHED_STUB;
}