chore: schedule chains (#3819)

Use an intrusive queue that allows batching of scheduling calls instead of handling each call separately.
This optimization improves latency and throughput by 3-5%.
In addition, we expose batching statistics in the INFO transaction section.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Roman Gershman 2024-10-11 22:41:31 +03:00 committed by GitHub
parent e71f083f34
commit 5d2c308c99
6 changed files with 130 additions and 26 deletions
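As a rough illustration of the new counters mentioned in the commit message (a sketch, not part of the commit: the struct below only mirrors the two per-shard fields this change updates, and AvgBatchSize is a hypothetical helper):

#include <cstdint>

// Mirrors the two per-shard counters that ScheduleBatchInShard bumps further down in this diff.
struct TxBatchStats {
  uint64_t tx_batch_schedule_calls_total = 0;   // batched schedule callbacks that ran
  uint64_t tx_batch_scheduled_items_total = 0;  // transactions those callbacks scheduled
};

// Hypothetical helper: average transactions scheduled per batch. Values near 1 mean the
// batching path rarely coalesces work; larger values mean scheduling hops are being amortized.
double AvgBatchSize(const TxBatchStats& s) {
  if (s.tx_batch_schedule_calls_total == 0)
    return 0.0;
  return static_cast<double>(s.tx_batch_scheduled_items_total) / s.tx_batch_schedule_calls_total;
}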


@@ -6,6 +6,8 @@
#include <absl/strings/match.h>
#include <new>
#include "base/flags.h"
#include "base/logging.h"
#include "facade/op_status.h"
@@ -86,12 +88,32 @@ uint16_t trans_id(const Transaction* ptr) {
struct ScheduleContext {
Transaction* trans;
bool optimistic_execution = false;
std::atomic<ScheduleContext*> next{nullptr};
std::atomic_uint32_t fail_cnt{0};
ScheduleContext(Transaction* t, bool optimistic) : trans(t), optimistic_execution(optimistic) {
}
};
constexpr size_t kAvoidFalseSharingSize = 64;
struct ScheduleQ {
alignas(kAvoidFalseSharingSize) base::MPSCIntrusiveQueue<ScheduleContext> queue;
alignas(kAvoidFalseSharingSize) atomic_bool armed{false};
};
void MPSC_intrusive_store_next(ScheduleContext* dest, ScheduleContext* next_node) {
dest->next.store(next_node, std::memory_order_relaxed);
}
ScheduleContext* MPSC_intrusive_load_next(const ScheduleContext& src) {
return src.next.load(std::memory_order_acquire);
}
// of shard_num arity.
ScheduleQ* schedule_queues = nullptr;
} // namespace
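For readers unfamiliar with the hook protocol above: base::MPSCIntrusiveQueue locates the intrusive next-pointer of its node type through the two free functions defined next to the node, and the queue never owns the nodes it links. A minimal sketch of wiring a different node type the same way (the header path, the Job/Demo names, and the exact lookup rules for the hooks are assumptions; Push/Pop mirror the calls made later in this diff):

#include <atomic>

#include "base/mpsc_intrusive_queue.h"  // assumed header name for base::MPSCIntrusiveQueue

struct Job {
  unsigned id = 0;
  std::atomic<Job*> next{nullptr};  // intrusive link, traversed only by the queue
};

// The queue stores/loads the link through these free functions, mirroring the
// ScheduleContext hooks above.
void MPSC_intrusive_store_next(Job* dest, Job* next_node) {
  dest->next.store(next_node, std::memory_order_relaxed);
}

Job* MPSC_intrusive_load_next(const Job& src) {
  return src.next.load(std::memory_order_acquire);
}

void Demo() {
  base::MPSCIntrusiveQueue<Job> q;
  Job a, b;
  q.Push(&a);  // any number of producer threads may Push concurrently
  q.Push(&b);
  while (Job* j = q.Pop()) {  // exactly one consumer thread may Pop
    (void)j;  // nodes are owned by the caller; the queue never allocates or frees them
  }
}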
bool Transaction::BatonBarrier::IsClaimed() const {
@@ -139,6 +161,17 @@ Transaction::Guard::~Guard() {
tx->Refurbish();
}
void Transaction::Init(unsigned num_shards) {
DCHECK(schedule_queues == nullptr);
schedule_queues = new ScheduleQ[num_shards];
}
void Transaction::Shutdown() {
DCHECK(schedule_queues);
delete[] schedule_queues;
schedule_queues = nullptr;
}
Transaction::Transaction(const CommandId* cid) : cid_{cid} {
InitTxTime();
string_view cmd_name(cid_->name());
@@ -685,11 +718,11 @@ void Transaction::ScheduleInternal() {
// Try running immediately (during scheduling) if we're concluding and either:
// - have a single shard, and thus never have to cancel scheduling due to reordering
// - run as an idempotent command, meaning we can safely repeat the operation if scheduling fails
bool can_run_immediately = !IsGlobal() && (coordinator_state_ & COORD_CONCLUDING) &&
(unique_shard_cnt_ == 1 || (cid_->opt_mask() & CO::IDEMPOTENT));
bool optimistic_exec = !IsGlobal() && (coordinator_state_ & COORD_CONCLUDING) &&
(unique_shard_cnt_ == 1 || (cid_->opt_mask() & CO::IDEMPOTENT));
DVLOG(1) << "ScheduleInternal " << cid_->name() << " on " << unique_shard_cnt_ << " shards "
<< " immediate run: " << can_run_immediately;
<< " optimistic_execution: " << optimistic_exec;
auto is_active = [this](uint32_t i) { return IsActive(i); };
@@ -711,29 +744,40 @@ void Transaction::ScheduleInternal() {
// in the lower-level code. It's not really needed otherwise because we run inline.
// single shard schedule operation can't fail
CHECK(ScheduleInShard(EngineShard::tlocal(), can_run_immediately));
CHECK(ScheduleInShard(EngineShard::tlocal(), optimistic_exec));
run_barrier_.Dec();
break;
}
ScheduleContext schedule_ctx{this, can_run_immediately};
ScheduleContext schedule_ctx{this, optimistic_exec};
auto cb = [&schedule_ctx]() {
if (!schedule_ctx.trans->ScheduleInShard(EngineShard::tlocal(),
schedule_ctx.optimistic_execution)) {
schedule_ctx.fail_cnt.fetch_add(1, memory_order_relaxed);
if (unique_shard_cnt_ == 1) {
// Single shard optimization. Note: we could apply the same optimization
// to multi-shard transactions as well by creating a vector of ScheduleContext.
schedule_queues[unique_shard_id_].queue.Push(&schedule_ctx);
bool current_val = false;
if (schedule_queues[unique_shard_id_].armed.compare_exchange_strong(current_val, true,
memory_order_acq_rel)) {
shard_set->Add(unique_shard_id_, &Transaction::ScheduleBatchInShard);
}
schedule_ctx.trans->FinishHop();
};
} else {
auto cb = [&schedule_ctx]() {
if (!schedule_ctx.trans->ScheduleInShard(EngineShard::tlocal(),
schedule_ctx.optimistic_execution)) {
schedule_ctx.fail_cnt.fetch_add(1, memory_order_relaxed);
}
schedule_ctx.trans->FinishHop();
};
IterateActiveShards([cb](const auto& sd, ShardId i) { shard_set->Add(i, cb); });
// Add this debugging function to print more information when we experience deadlock
// during tests.
ThisFiber::PrintLocalsCallback locals([&] {
return absl::StrCat("unique_shard_cnt_: ", unique_shard_cnt_,
" run_barrier_cnt: ", run_barrier_.DEBUG_Count(), "\n");
});
}
run_barrier_.Wait();
if (schedule_ctx.fail_cnt.load(memory_order_relaxed) == 0) {
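The single-shard branch above follows a push-then-arm handoff: every coordinator pushes its ScheduleContext, but only the one that flips armed from false to true enqueues a ScheduleBatchInShard callback, so at most one drain task is pending per shard. A self-contained sketch of just that handoff, with placeholder names (PendingQueue, SubmitToShard, EnqueueForScheduling are illustrative, not Dragonfly APIs) and a trivial inline dispatcher so it compiles standalone:

#include <atomic>

struct PendingQueue {
  std::atomic<bool> armed{false};
  // The lock-free intrusive queue of pending items is elided; Push() would live here.
};

// Stand-in for shard_set->Add(sid, cb); the real code dispatches to the shard's thread.
inline void SubmitToShard(unsigned /*sid*/, void (*drain_cb)()) {
  drain_cb();
}

void EnqueueForScheduling(PendingQueue& pq, unsigned sid, void (*drain_cb)()) {
  // 1) Publish the item first (queue.Push(&schedule_ctx) in the real code), so whichever
  //    drain task ends up running is guaranteed to see it.
  // 2) Arm exactly once: only the producer that wins the false -> true transition submits
  //    the drain callback; later producers piggyback on the one already pending.
  bool expected = false;
  if (pq.armed.compare_exchange_strong(expected, true, std::memory_order_acq_rel)) {
    SubmitToShard(sid, drain_cb);
  }
}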
@@ -1115,6 +1159,45 @@ bool Transaction::ScheduleInShard(EngineShard* shard, bool execute_optimistic) {
return true;
}
void Transaction::ScheduleBatchInShard() {
EngineShard* shard = EngineShard::tlocal();
auto& stats = shard->stats();
stats.tx_batch_schedule_calls_total++;
ShardId sid = shard->shard_id();
auto& sq = schedule_queues[sid];
for (unsigned j = 0;; ++j) {
// We pull the items from the queue in a loop until we reach the stop condition.
// TODO: we may have a fairness problem here, where transactions keep getting added all the
// time and we never break from the loop. It is possible to break early, but it's not trivial
// because we must ensure that there is another ScheduleBatchInShard callback in the queue.
// This can be checked by testing whether sq.armed is true when j == 1.
while (true) {
ScheduleContext* item = sq.queue.Pop();
if (!item)
break;
if (!item->trans->ScheduleInShard(shard, item->optimistic_execution)) {
item->fail_cnt.fetch_add(1, memory_order_relaxed);
}
item->trans->FinishHop();
stats.tx_batch_scheduled_items_total++;
};
// j==1 means we already signalled that we're done with the current batch.
if (j == 1)
break;
// We signal that we're done with the current batch but then we check if there are more
// transactions to fetch in the next iteration.
// We do this to avoid the situation where we have a data race, where
// a transaction is added to the queue, we've checked that sq.armed is true and skipped
// adding the callback that fetches the transaction.
sq.armed.store(false, memory_order_release);
}
}
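The two-pass shape of ScheduleBatchInShard is the subtle part: drain the queue, clear armed, then drain once more, so an item pushed just before armed is cleared is either picked up by the second pass or triggers a fresh callback from its producer. A self-contained sketch of the same shape, with a mutex-guarded deque standing in for the lock-free intrusive queue (Item, ShardQueue and Drain are illustrative names, not Dragonfly code):

#include <atomic>
#include <deque>
#include <mutex>

struct Item {
  bool scheduled = false;
};

struct ShardQueue {
  std::mutex mu;
  std::deque<Item*> items;  // stand-in for base::MPSCIntrusiveQueue<Item>
  std::atomic<bool> armed{false};

  Item* Pop() {
    std::lock_guard lk(mu);
    if (items.empty())
      return nullptr;
    Item* it = items.front();
    items.pop_front();
    return it;
  }
};

// Mirrors ScheduleBatchInShard: drain everything, disarm, then drain one final time.
void Drain(ShardQueue& sq) {
  for (unsigned j = 0;; ++j) {
    while (Item* it = sq.Pop()) {
      it->scheduled = true;  // the real code calls ScheduleInShard and FinishHop here
    }
    if (j == 1)
      break;  // we already disarmed once; anything pushed later re-arms and gets a new callback
    // Disarm before the final pass: a producer that pushes after this store sees
    // armed == false, wins the compare-exchange, and submits a fresh callback,
    // so no item is left stranded in the queue.
    sq.armed.store(false, std::memory_order_release);
  }
}

Together with the producer-side arm step sketched earlier, this guarantees that whenever the queue is non-empty, a drain callback is either pending or running.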
bool Transaction::CancelShardCb(EngineShard* shard) {
ShardId idx = SidToId(shard->shard_id());
auto& sd = shard_data_[idx];