mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
feature(server): Bring back inline scheduling (#1130)
* feat: run tx-schedule inline if the dest shard is on the same thread (#908) The optimization is applied within ScheduleSingleHop call. Signed-off-by: Roman Gershman <roman@dragonflydb.io> * fix(server): Don't inline schedule when in LOADING * Fix the another pre-emption bug with inline scheduling * Better locking around journal callbacks --------- Signed-off-by: Roman Gershman <roman@dragonflydb.io> Co-authored-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
6697478a28
commit
7adf3799f0
9 changed files with 51 additions and 11 deletions
|
@ -50,9 +50,9 @@ class Transaction;
|
||||||
class EngineShard;
|
class EngineShard;
|
||||||
|
|
||||||
struct KeyLockArgs {
|
struct KeyLockArgs {
|
||||||
DbIndex db_index;
|
DbIndex db_index = 0;
|
||||||
ArgSlice args;
|
ArgSlice args;
|
||||||
unsigned key_step;
|
unsigned key_step = 1;
|
||||||
};
|
};
|
||||||
|
|
||||||
// Describes key indices.
|
// Describes key indices.
|
||||||
|
|
|
@ -346,6 +346,7 @@ void DebugCmd::PopulateRangeFiber(uint64_t from, uint64_t len, std::string_view
|
||||||
void DebugCmd::Inspect(string_view key) {
|
void DebugCmd::Inspect(string_view key) {
|
||||||
EngineShardSet& ess = *shard_set;
|
EngineShardSet& ess = *shard_set;
|
||||||
ShardId sid = Shard(key, ess.size());
|
ShardId sid = Shard(key, ess.size());
|
||||||
|
VLOG(1) << "DebugCmd::Inspect " << key;
|
||||||
|
|
||||||
auto cb = [&]() -> ObjInfo {
|
auto cb = [&]() -> ObjInfo {
|
||||||
auto& db_slice = EngineShard::tlocal()->db_slice();
|
auto& db_slice = EngineShard::tlocal()->db_slice();
|
||||||
|
@ -376,7 +377,9 @@ void DebugCmd::Inspect(string_view key) {
|
||||||
|
|
||||||
KeyLockArgs lock_args;
|
KeyLockArgs lock_args;
|
||||||
lock_args.args = ArgSlice{&key, 1};
|
lock_args.args = ArgSlice{&key, 1};
|
||||||
|
lock_args.key_step = 1;
|
||||||
lock_args.db_index = cntx_->db_index();
|
lock_args.db_index = cntx_->db_index();
|
||||||
|
|
||||||
if (!db_slice.CheckLock(IntentLock::EXCLUSIVE, lock_args)) {
|
if (!db_slice.CheckLock(IntentLock::EXCLUSIVE, lock_args)) {
|
||||||
oinfo.lock_status =
|
oinfo.lock_status =
|
||||||
db_slice.CheckLock(IntentLock::SHARED, lock_args) ? ObjInfo::S : ObjInfo::X;
|
db_slice.CheckLock(IntentLock::SHARED, lock_args) ? ObjInfo::S : ObjInfo::X;
|
||||||
|
|
|
@ -86,6 +86,10 @@ void Journal::UnregisterOnChange(uint32_t id) {
|
||||||
journal_slice.UnregisterOnChange(id);
|
journal_slice.UnregisterOnChange(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool Journal::HasRegisteredCallbacks() const {
|
||||||
|
return journal_slice.HasRegisteredCallbacks();
|
||||||
|
}
|
||||||
|
|
||||||
LSN Journal::GetLsn() const {
|
LSN Journal::GetLsn() const {
|
||||||
return journal_slice.cur_lsn();
|
return journal_slice.cur_lsn();
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,6 +33,7 @@ class Journal {
|
||||||
|
|
||||||
uint32_t RegisterOnChange(ChangeCallback cb);
|
uint32_t RegisterOnChange(ChangeCallback cb);
|
||||||
void UnregisterOnChange(uint32_t id);
|
void UnregisterOnChange(uint32_t id);
|
||||||
|
bool HasRegisteredCallbacks() const;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
void AddCmd(TxId txid, Op opcode, Span args) {
|
void AddCmd(TxId txid, Op opcode, Span args) {
|
||||||
|
|
|
@ -114,14 +114,15 @@ error_code JournalSlice::Close() {
|
||||||
|
|
||||||
void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
|
void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
|
||||||
DCHECK(ring_buffer_);
|
DCHECK(ring_buffer_);
|
||||||
cb_mu_.lock_shared();
|
{
|
||||||
DVLOG(2) << "AddLogRecord: run callbacks for " << entry.ToString()
|
std::shared_lock lk(cb_mu_);
|
||||||
<< " num callbacks: " << change_cb_arr_.size();
|
DVLOG(2) << "AddLogRecord: run callbacks for " << entry.ToString()
|
||||||
|
<< " num callbacks: " << change_cb_arr_.size();
|
||||||
|
|
||||||
for (const auto& k_v : change_cb_arr_) {
|
for (const auto& k_v : change_cb_arr_) {
|
||||||
k_v.second(entry, await);
|
k_v.second(entry, await);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
cb_mu_.unlock_shared();
|
|
||||||
|
|
||||||
// TODO: This is preparation for AOC style journaling, currently unused.
|
// TODO: This is preparation for AOC style journaling, currently unused.
|
||||||
RingItem item;
|
RingItem item;
|
||||||
|
|
|
@ -44,6 +44,10 @@ class JournalSlice {
|
||||||
|
|
||||||
uint32_t RegisterOnChange(ChangeCallback cb);
|
uint32_t RegisterOnChange(ChangeCallback cb);
|
||||||
void UnregisterOnChange(uint32_t);
|
void UnregisterOnChange(uint32_t);
|
||||||
|
bool HasRegisteredCallbacks() const {
|
||||||
|
std::shared_lock lk(cb_mu_);
|
||||||
|
return !change_cb_arr_.empty();
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
struct RingItem;
|
struct RingItem;
|
||||||
|
@ -52,8 +56,8 @@ class JournalSlice {
|
||||||
std::unique_ptr<LinuxFile> shard_file_;
|
std::unique_ptr<LinuxFile> shard_file_;
|
||||||
std::optional<base::RingBuffer<RingItem>> ring_buffer_;
|
std::optional<base::RingBuffer<RingItem>> ring_buffer_;
|
||||||
|
|
||||||
util::SharedMutex cb_mu_;
|
mutable util::SharedMutex cb_mu_;
|
||||||
std::vector<std::pair<uint32_t, ChangeCallback>> change_cb_arr_;
|
std::vector<std::pair<uint32_t, ChangeCallback>> change_cb_arr_ ABSL_GUARDED_BY(cb_mu_);
|
||||||
|
|
||||||
size_t file_offset_ = 0;
|
size_t file_offset_ = 0;
|
||||||
LSN lsn_ = 1;
|
LSN lsn_ = 1;
|
||||||
|
|
|
@ -13,6 +13,7 @@ extern "C" {
|
||||||
#include "base/flags.h"
|
#include "base/flags.h"
|
||||||
#include "base/logging.h"
|
#include "base/logging.h"
|
||||||
#include "facade/conn_context.h"
|
#include "facade/conn_context.h"
|
||||||
|
#include "server/journal/journal.h"
|
||||||
|
|
||||||
ABSL_FLAG(uint32_t, interpreter_per_thread, 10, "Lua interpreters per thread");
|
ABSL_FLAG(uint32_t, interpreter_per_thread, 10, "Lua interpreters per thread");
|
||||||
|
|
||||||
|
@ -70,6 +71,24 @@ void ServerState::Destroy() {
|
||||||
state_ = nullptr;
|
state_ = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool ServerState::AllowInlineScheduling() const {
|
||||||
|
// We can't allow inline scheduling during a full sync, because then journaling transactions
|
||||||
|
// will be scheduled before RdbLoader::LoadItemsBuffer is finished. We can't use the regular
|
||||||
|
// locking mechanism because RdbLoader is not using transactions.
|
||||||
|
if (gstate_ == GlobalState::LOADING)
|
||||||
|
return false;
|
||||||
|
|
||||||
|
// Journal callbacks can preempt; This means we have to disallow inline scheduling
|
||||||
|
// because then we might interleave the callbacks loop from an inlined-scheduled command
|
||||||
|
// and a normally-scheduled command.
|
||||||
|
// The problematic loop is in JournalSlice::AddLogRecord, going over all the callbacks.
|
||||||
|
|
||||||
|
if (journal_ && journal_->HasRegisteredCallbacks())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
Interpreter* ServerState::BorrowInterpreter() {
|
Interpreter* ServerState::BorrowInterpreter() {
|
||||||
return interpreter_mgr_.Get();
|
return interpreter_mgr_.Get();
|
||||||
}
|
}
|
||||||
|
|
|
@ -130,6 +130,8 @@ class ServerState { // public struct - to allow initialization.
|
||||||
gstate_ = s;
|
gstate_ = s;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool AllowInlineScheduling() const;
|
||||||
|
|
||||||
// Borrow interpreter from internal manager. Return int with ReturnInterpreter.
|
// Borrow interpreter from internal manager. Return int with ReturnInterpreter.
|
||||||
Interpreter* BorrowInterpreter();
|
Interpreter* BorrowInterpreter();
|
||||||
|
|
||||||
|
|
|
@ -688,7 +688,13 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
|
||||||
CHECK_GE(DecreaseRunCnt(), 1u);
|
CHECK_GE(DecreaseRunCnt(), 1u);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
shard_set->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier.
|
|
||||||
|
if (coordinator_index_ == unique_shard_id_ && ServerState::tlocal()->AllowInlineScheduling()) {
|
||||||
|
DVLOG(2) << "Inline scheduling a transaction";
|
||||||
|
schedule_cb();
|
||||||
|
} else {
|
||||||
|
shard_set->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier.
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
// This transaction either spans multiple shards and/or is multi.
|
// This transaction either spans multiple shards and/or is multi.
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue