opt(lua): Avoid separate hops for lock & unlock on single shard execution (#1900)

Author: Shahar Mike, 2023-09-22 14:09:17 +03:00 (committed by GitHub)
parent 8cc448f6b4
commit 2091f53777
5 changed files with 78 additions and 44 deletions
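
In brief: when all keys declared by an EVAL hash to the same shard, the coordinator now performs the locking, the script execution, and the unlocking in a single hop to that shard; previously it paid a separate hop to lock the keys (StartMultiEval), further hops to run the script, and a final hop to unlock them (UnlockMulti). As a purely illustrative example (not part of the commit), any single-key script qualifies:

    EVAL "return redis.call('INCR', KEYS[1])" 1 counter

Since its only key maps to one shard, CanRunSingleShardMulti() accepts it (provided the script runs in LOCK_AHEAD mode and is not already under MULTI), and the script body executes inline inside one ScheduleSingleHop() callback.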

src/server/main_service.cc

@@ -605,27 +605,6 @@ optional<Transaction::MultiMode> DeduceExecMode(ExecEvalState state,
   return multi_mode;
 }
 
-optional<ShardId> GetRemoteShardToRunAt(const Transaction& tx) {
-  if (tx.GetMultiMode() != Transaction::LOCK_AHEAD) {
-    return nullopt;
-  }
-
-  if (tx.GetUniqueShardCnt() != 1) {
-    return nullopt;
-  }
-
-  // At this point `tx` can run on a single shard, but we only return `sid` if that shard !=
-  // current shard.
-  ShardId sid = tx.GetUniqueShard();
-  if (ServerState::tlocal()->thread_index() == sid) {
-    // Same shard, so no point in an extra Await() and a new Fiber
-    return nullopt;
-  }
-
-  return sid;
-}
-
 }  // namespace
 
 Service::Service(ProactorPool* pp)
@@ -1443,7 +1422,7 @@ void Service::CallSHA(CmdArgList args, string_view sha, Interpreter* interpreter,
   ev_args.args = args.subspan(2 + num_keys);
 
   uint64_t start = absl::GetCurrentTimeNanos();
-  EvalInternal(ev_args, interpreter, cntx);
+  EvalInternal(args, ev_args, interpreter, cntx);
   uint64_t end = absl::GetCurrentTimeNanos();
 
   ServerState::tlocal()->RecordCallLatency(sha, (end - start) / 1000);
@@ -1548,7 +1527,27 @@ std::pair<const CommandId*, CmdArgList> Service::FindCmd(CmdArgList args) const
   return {res, args.subspan(1)};
 }
 
-void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter,
+static bool CanRunSingleShardMulti(optional<ShardId> sid, const ScriptMgr::ScriptParams& params,
+                                   const Transaction& tx) {
+  if (!sid.has_value()) {
+    return false;
+  }
+
+  if (DetermineMultiMode(params) != Transaction::LOCK_AHEAD) {
+    return false;
+  }
+
+  if (tx.GetMultiMode() != Transaction::NOT_DETERMINED) {
+    // We may be running EVAL under MULTI. Currently RunSingleShardMulti() would attempt to lock
+    // keys that are already locked by MULTI. We could optimize this path as well, though.
+    return false;
+  }
+
+  return true;
+}
+
+void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpreter* interpreter,
                            ConnectionContext* cntx) {
   DCHECK(!eval_args.sha.empty());
@@ -1565,52 +1564,70 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter,
   DCHECK(!cntx->conn_state.script_info);  // we should not call eval from the script.
 
+  optional<ShardId> sid;
+
   // TODO: to determine whether the script is RO by scanning all "redis.p?call" calls
   // and checking whether all invocations consist of RO commands.
   // we can do it once during script insertion into script mgr.
   auto& sinfo = cntx->conn_state.script_info;
   sinfo = make_unique<ConnectionState::ScriptInfo>();
   for (size_t i = 0; i < eval_args.keys.size(); ++i) {
-    sinfo->keys.insert(KeyLockArgs::GetLockKey(ArgS(eval_args.keys, i)));
+    string_view key = KeyLockArgs::GetLockKey(ArgS(eval_args.keys, i));
+    sinfo->keys.insert(key);
+
+    ShardId cur_sid = Shard(key, shard_count());
+    if (i == 0) {
+      sid = cur_sid;
+    }
+    if (sid.has_value() && *sid != cur_sid) {
+      sid = nullopt;
+    }
   }
   sinfo->async_cmds_heap_limit = absl::GetFlag(FLAGS_multi_eval_squash_buffer);
 
   Transaction* tx = cntx->transaction;
   CHECK(tx != nullptr);
 
-  optional<bool> scheduled = StartMultiEval(cntx->db_index(), eval_args.keys, *params, cntx);
-  if (!scheduled) {
-    return;
-  }
-
   interpreter->SetGlobalArray("KEYS", eval_args.keys);
   interpreter->SetGlobalArray("ARGV", eval_args.args);
 
-  absl::Cleanup clean = [interpreter]() { interpreter->ResetStack(); };
-
   Interpreter::RunResult result;
-  optional<ShardId> sid = GetRemoteShardToRunAt(*tx);
-  if (tx->GetMultiMode() != Transaction::NON_ATOMIC && sid.has_value()) {
+
+  if (CanRunSingleShardMulti(sid, *params, *tx)) {
     // If script runs on a single shard, we run it remotely to save hops.
-    interpreter->SetRedisFunc([cntx, this](Interpreter::CallArgs args) {
+    interpreter->SetRedisFunc([cntx, tx, this](Interpreter::CallArgs args) {
       // Disable squashing, as we're using the squashing mechanism to run remotely.
       args.async = false;
       CallFromScript(cntx, args);
     });
 
-    boost::intrusive_ptr<Transaction> stub_tx = new Transaction{cntx->transaction};
+    ++ServerState::tlocal()->stats.eval_shardlocal_coordination_cnt;
+    boost::intrusive_ptr<Transaction> stub_tx = new Transaction{tx};
     cntx->transaction = stub_tx.get();
 
-    tx->PrepareSquashedMultiHop(registry_.Find("EVAL"), [&](ShardId id) { return id == *sid; });
-    tx->ScheduleSingleHop([&](Transaction* tx, EngineShard*) {
-      ++ServerState::tlocal()->stats.eval_shardlocal_coordination_cnt;
+    tx->PrepareMultiForScheduleSingleHop(*sid, 0, args);
+    tx->ScheduleSingleHop([&](Transaction*, EngineShard*) {
       result = interpreter->RunFunction(eval_args.sha, &error);
       return OpStatus::OK;
     });
 
     cntx->transaction = tx;
   } else {
+    optional<bool> scheduled = StartMultiEval(cntx->db_index(), eval_args.keys, *params, cntx);
+    if (!scheduled) {
+      return;
+    }
+
     ++ServerState::tlocal()->stats.eval_io_coordination_cnt;
     interpreter->SetRedisFunc(
         [cntx, this](Interpreter::CallArgs args) { CallFromScript(cntx, args); });
 
     result = interpreter->RunFunction(eval_args.sha, &error);
+
+    // Conclude the transaction.
+    if (*scheduled)
+      cntx->transaction->UnlockMulti();
   }
 
+  absl::Cleanup clean = [interpreter]() { interpreter->ResetStack(); };
+
   if (auto err = FlushEvalAsyncCmds(cntx, true); err) {
     auto err_ref = CapturingReplyBuilder::GetError(*err);
@@ -1620,10 +1637,6 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter,
   cntx->conn_state.script_info.reset();  // reset script_info
 
-  // Conclude the transaction.
-  if (*scheduled)
-    cntx->transaction->UnlockMulti();
-
   if (result == Interpreter::RUN_ERR) {
     string resp = StrCat("Error running script (call to ", eval_args.sha, "): ", error);
     return (*cntx)->SendError(resp, facade::kScriptErrType);
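
The heart of the change is the shard-fold in the key loop above: compute the shard of every declared key and collapse the results into a single optional, which becomes empty as soon as two keys disagree. Below is a self-contained sketch of that technique; the standalone names and the hash stand-in are illustrative, not Dragonfly's actual partitioning:

    #include <cstdint>
    #include <functional>
    #include <optional>
    #include <string_view>
    #include <vector>

    using ShardId = uint16_t;

    // Stand-in for the Shard(key, shard_count()) helper used in the diff above.
    ShardId Shard(std::string_view key, ShardId shard_count) {
      return static_cast<ShardId>(std::hash<std::string_view>{}(key) % shard_count);
    }

    // Returns the single shard that all keys map to, or nullopt when the keys
    // span multiple shards (or when there are no keys at all).
    std::optional<ShardId> SingleShardOf(const std::vector<std::string_view>& keys,
                                         ShardId shard_count) {
      std::optional<ShardId> sid;
      for (std::string_view key : keys) {
        ShardId cur_sid = Shard(key, shard_count);
        if (!sid.has_value()) {
          sid = cur_sid;
        } else if (*sid != cur_sid) {
          return std::nullopt;  // Keys cross shards; no single-shard fast path.
        }
      }
      return sid;
    }

Unlike this sketch, the loop in EvalInternal() cannot return early, since it must also record every key in script_info for locking; it keeps iterating and simply leaves sid empty once a mismatch is seen.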

src/server/main_service.h

@@ -154,7 +154,8 @@ class Service : public facade::ServiceInterface {
   std::optional<facade::ErrorReply> CheckKeysOwnership(const CommandId* cid, CmdArgList args,
                                                        const ConnectionContext& dfly_cntx);
 
-  void EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter, ConnectionContext* cntx);
+  void EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpreter* interpreter,
+                    ConnectionContext* cntx);
 
   void CallSHA(CmdArgList args, std::string_view sha, Interpreter* interpreter,
                ConnectionContext* cntx);

src/server/multi_test.cc

@@ -548,7 +548,8 @@ TEST_F(MultiTest, EvalOOO) {
   }
 
   auto metrics = GetMetrics();
-  EXPECT_EQ(1 + 2 * kTimes, metrics.ooo_tx_transaction_cnt);
+  EXPECT_EQ(1 + 2 * kTimes,
+            metrics.eval_io_coordination_cnt + metrics.eval_shardlocal_coordination_cnt);
 }
 
 // Run MULTI/EXEC commands in parallel, where each command is:

src/server/transaction.cc

@@ -54,7 +54,12 @@ Transaction::Transaction(const CommandId* cid) : cid_{cid} {
 
 Transaction::Transaction(const Transaction* parent)
     : multi_{make_unique<MultiData>()}, txid_{parent->txid()} {
-  multi_->mode = parent->multi_->mode;
+  if (parent->multi_) {
+    multi_->mode = parent->multi_->mode;
+  } else {
+    // Use squashing mechanism for inline execution of single-shard EVAL
+    multi_->mode = LOCK_AHEAD;
+  }
   multi_->role = SQUASHED_STUB;
 
   time_now_ms_ = parent->time_now_ms_;
@@ -405,6 +410,15 @@ string Transaction::DebugId() const {
   return StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")");
 }
 
+void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdArgList args) {
+  multi_.reset();
+  InitBase(db, args);
+  EnableShard(sid);
+  OpResult<KeyIndex> key_index = DetermineKeys(cid_, args);
+  CHECK(key_index);
+  StoreKeysInArgs(*key_index, false);
+}
+
 // Runs in the dbslice thread. Returns true if the transaction continues running in the thread.
 bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
   DCHECK_GT(run_count_.load(memory_order_relaxed), 0u);

src/server/transaction.h

@@ -306,6 +306,11 @@ class Transaction {
 
   std::string DebugId() const;
 
+  // Prepares for running ScheduleSingleHop() for a single-shard multi tx.
+  // It is safe to call ScheduleSingleHop() after calling this method, but the callback passed
+  // to it must not block.
+  void PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdArgList args);
+
   // Write a journal entry to a shard journal with the given payload. When logging a non-automatic
   // journal command, multiple journal entries may be necessary. In this case, call with set
   // multi_commands to true and call the FinishLogJournalOnShard function after logging the final