From 7adf3799f0a36f822f90dd350913eb36bbb82020 Mon Sep 17 00:00:00 2001 From: Roy Jacobson Date: Sun, 21 May 2023 10:10:38 +0300 Subject: [PATCH] feature(server): Bring back inline scheduling (#1130) * feat: run tx-schedule inline if the dest shard is on the same thread (#908) The optimization is applied within ScheduleSingleHop call. Signed-off-by: Roman Gershman * fix(server): Don't inline schedule when in LOADING * Fix the another pre-emption bug with inline scheduling * Better locking around journal callbacks --------- Signed-off-by: Roman Gershman Co-authored-by: Roman Gershman --- src/server/common.h | 4 ++-- src/server/debugcmd.cc | 3 +++ src/server/journal/journal.cc | 4 ++++ src/server/journal/journal.h | 1 + src/server/journal/journal_slice.cc | 13 +++++++------ src/server/journal/journal_slice.h | 8 ++++++-- src/server/server_state.cc | 19 +++++++++++++++++++ src/server/server_state.h | 2 ++ src/server/transaction.cc | 8 +++++++- 9 files changed, 51 insertions(+), 11 deletions(-) diff --git a/src/server/common.h b/src/server/common.h index 785d95a2d..e46b16f2d 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -50,9 +50,9 @@ class Transaction; class EngineShard; struct KeyLockArgs { - DbIndex db_index; + DbIndex db_index = 0; ArgSlice args; - unsigned key_step; + unsigned key_step = 1; }; // Describes key indices. diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 932ba2915..c17e4c86e 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -346,6 +346,7 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view void DebugCmd::Inspect(string_view key) { EngineShardSet& ess = *shard_set; ShardId sid = Shard(key, ess.size()); + VLOG(1) << "DebugCmd::Inspect " << key; auto cb = [&]() -> ObjInfo { auto& db_slice = EngineShard::tlocal()->db_slice(); @@ -376,7 +377,9 @@ void DebugCmd::Inspect(string_view key) { KeyLockArgs lock_args; lock_args.args = ArgSlice{&key, 1}; + lock_args.key_step = 1; lock_args.db_index = cntx_->db_index(); + if (!db_slice.CheckLock(IntentLock::EXCLUSIVE, lock_args)) { oinfo.lock_status = db_slice.CheckLock(IntentLock::SHARED, lock_args) ? ObjInfo::S : ObjInfo::X; diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index e541e85e9..55883b35d 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -86,6 +86,10 @@ void Journal::UnregisterOnChange(uint32_t id) { journal_slice.UnregisterOnChange(id); } +bool Journal::HasRegisteredCallbacks() const { + return journal_slice.HasRegisteredCallbacks(); +} + LSN Journal::GetLsn() const { return journal_slice.cur_lsn(); } diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index 2182f74b0..bd25a7bf1 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -33,6 +33,7 @@ class Journal { uint32_t RegisterOnChange(ChangeCallback cb); void UnregisterOnChange(uint32_t id); + bool HasRegisteredCallbacks() const; /* void AddCmd(TxId txid, Op opcode, Span args) { diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index 0555b3b4e..494d7186c 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -114,14 +114,15 @@ error_code JournalSlice::Close() { void JournalSlice::AddLogRecord(const Entry& entry, bool await) { DCHECK(ring_buffer_); - cb_mu_.lock_shared(); - DVLOG(2) << "AddLogRecord: run callbacks for " << entry.ToString() - << " num callbacks: " << change_cb_arr_.size(); + { + std::shared_lock lk(cb_mu_); + DVLOG(2) << "AddLogRecord: run callbacks for " << entry.ToString() + << " num callbacks: " << change_cb_arr_.size(); - for (const auto& k_v : change_cb_arr_) { - k_v.second(entry, await); + for (const auto& k_v : change_cb_arr_) { + k_v.second(entry, await); + } } - cb_mu_.unlock_shared(); // TODO: This is preparation for AOC style journaling, currently unused. RingItem item; diff --git a/src/server/journal/journal_slice.h b/src/server/journal/journal_slice.h index c811a0aff..7f2f59a96 100644 --- a/src/server/journal/journal_slice.h +++ b/src/server/journal/journal_slice.h @@ -44,6 +44,10 @@ class JournalSlice { uint32_t RegisterOnChange(ChangeCallback cb); void UnregisterOnChange(uint32_t); + bool HasRegisteredCallbacks() const { + std::shared_lock lk(cb_mu_); + return !change_cb_arr_.empty(); + } private: struct RingItem; @@ -52,8 +56,8 @@ class JournalSlice { std::unique_ptr shard_file_; std::optional> ring_buffer_; - util::SharedMutex cb_mu_; - std::vector> change_cb_arr_; + mutable util::SharedMutex cb_mu_; + std::vector> change_cb_arr_ ABSL_GUARDED_BY(cb_mu_); size_t file_offset_ = 0; LSN lsn_ = 1; diff --git a/src/server/server_state.cc b/src/server/server_state.cc index 057018a8a..f32c0c07a 100644 --- a/src/server/server_state.cc +++ b/src/server/server_state.cc @@ -13,6 +13,7 @@ extern "C" { #include "base/flags.h" #include "base/logging.h" #include "facade/conn_context.h" +#include "server/journal/journal.h" ABSL_FLAG(uint32_t, interpreter_per_thread, 10, "Lua interpreters per thread"); @@ -70,6 +71,24 @@ void ServerState::Destroy() { state_ = nullptr; } +bool ServerState::AllowInlineScheduling() const { + // We can't allow inline scheduling during a full sync, because then journaling transactions + // will be scheduled before RdbLoader::LoadItemsBuffer is finished. We can't use the regular + // locking mechanism because RdbLoader is not using transactions. + if (gstate_ == GlobalState::LOADING) + return false; + + // Journal callbacks can preempt; This means we have to disallow inline scheduling + // because then we might interleave the callbacks loop from an inlined-scheduled command + // and a normally-scheduled command. + // The problematic loop is in JournalSlice::AddLogRecord, going over all the callbacks. + + if (journal_ && journal_->HasRegisteredCallbacks()) + return false; + + return true; +} + Interpreter* ServerState::BorrowInterpreter() { return interpreter_mgr_.Get(); } diff --git a/src/server/server_state.h b/src/server/server_state.h index 65af93cd9..f5a6caae5 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -130,6 +130,8 @@ class ServerState { // public struct - to allow initialization. gstate_ = s; } + bool AllowInlineScheduling() const; + // Borrow interpreter from internal manager. Return int with ReturnInterpreter. Interpreter* BorrowInterpreter(); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 053d4fac9..e60abd491 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -688,7 +688,13 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { CHECK_GE(DecreaseRunCnt(), 1u); } }; - shard_set->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier. + + if (coordinator_index_ == unique_shard_id_ && ServerState::tlocal()->AllowInlineScheduling()) { + DVLOG(2) << "Inline scheduling a transaction"; + schedule_cb(); + } else { + shard_set->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier. + } } else { // This transaction either spans multiple shards and/or is multi.