From 2091f53777241a34778b86c4f848fc3307a55e99 Mon Sep 17 00:00:00 2001 From: Shahar Mike Date: Fri, 22 Sep 2023 14:09:17 +0300 Subject: [PATCH] opt(lua): Avoid separate hops for lock & unlock on single shard execution (#1900) --- src/server/main_service.cc | 95 ++++++++++++++++++++++---------------- src/server/main_service.h | 3 +- src/server/multi_test.cc | 3 +- src/server/transaction.cc | 16 ++++++- src/server/transaction.h | 5 ++ 5 files changed, 78 insertions(+), 44 deletions(-) diff --git a/src/server/main_service.cc b/src/server/main_service.cc index f1834f688..cbd6f6ca3 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -605,27 +605,6 @@ optional DeduceExecMode(ExecEvalState state, return multi_mode; } -optional GetRemoteShardToRunAt(const Transaction& tx) { - if (tx.GetMultiMode() != Transaction::LOCK_AHEAD) { - return nullopt; - } - - if (tx.GetUniqueShardCnt() != 1) { - return nullopt; - } - - // At this point `tx` can run on a single shard, but we only return `sid` if that shard != - // current shard. - - ShardId sid = tx.GetUniqueShard(); - - if (ServerState::tlocal()->thread_index() == sid) { - // Same shard, so no point in an extra Await() and a new Fiber - return nullopt; - } - - return sid; -} } // namespace Service::Service(ProactorPool* pp) @@ -1443,7 +1422,7 @@ void Service::CallSHA(CmdArgList args, string_view sha, Interpreter* interpreter ev_args.args = args.subspan(2 + num_keys); uint64_t start = absl::GetCurrentTimeNanos(); - EvalInternal(ev_args, interpreter, cntx); + EvalInternal(args, ev_args, interpreter, cntx); uint64_t end = absl::GetCurrentTimeNanos(); ServerState::tlocal()->RecordCallLatency(sha, (end - start) / 1000); @@ -1548,7 +1527,27 @@ std::pair Service::FindCmd(CmdArgList args) const return {res, args.subspan(1)}; } -void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter, +static bool CanRunSingleShardMulti(optional sid, const ScriptMgr::ScriptParams& params, + const Transaction& tx) { + if (!sid.has_value()) { + return false; + } + + if (DetermineMultiMode(params) != Transaction::LOCK_AHEAD) { + return false; + } + + if (tx.GetMultiMode() != Transaction::NOT_DETERMINED) { + // We may be running EVAL under MULTI. Currently RunSingleShardMulti() will attempt to lock + // keys, in which case will be already locked by MULTI. We could optimize this path as well + // though. + return false; + } + + return true; +} + +void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpreter* interpreter, ConnectionContext* cntx) { DCHECK(!eval_args.sha.empty()); @@ -1565,52 +1564,70 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter, DCHECK(!cntx->conn_state.script_info); // we should not call eval from the script. + optional sid; + // TODO: to determine whether the script is RO by scanning all "redis.p?call" calls // and checking whether all invocations consist of RO commands. // we can do it once during script insertion into script mgr. auto& sinfo = cntx->conn_state.script_info; sinfo = make_unique(); for (size_t i = 0; i < eval_args.keys.size(); ++i) { - sinfo->keys.insert(KeyLockArgs::GetLockKey(ArgS(eval_args.keys, i))); + string_view key = KeyLockArgs::GetLockKey(ArgS(eval_args.keys, i)); + sinfo->keys.insert(key); + + ShardId cur_sid = Shard(key, shard_count()); + if (i == 0) { + sid = cur_sid; + } + if (sid.has_value() && *sid != cur_sid) { + sid = nullopt; + } } + sinfo->async_cmds_heap_limit = absl::GetFlag(FLAGS_multi_eval_squash_buffer); Transaction* tx = cntx->transaction; CHECK(tx != nullptr); - optional scheduled = StartMultiEval(cntx->db_index(), eval_args.keys, *params, cntx); - if (!scheduled) { - return; - } - interpreter->SetGlobalArray("KEYS", eval_args.keys); interpreter->SetGlobalArray("ARGV", eval_args.args); + absl::Cleanup clean = [interpreter]() { interpreter->ResetStack(); }; Interpreter::RunResult result; - optional sid = GetRemoteShardToRunAt(*tx); - if (tx->GetMultiMode() != Transaction::NON_ATOMIC && sid.has_value()) { + + if (CanRunSingleShardMulti(sid, *params, *tx)) { // If script runs on a single shard, we run it remotely to save hops. - interpreter->SetRedisFunc([cntx, this](Interpreter::CallArgs args) { + interpreter->SetRedisFunc([cntx, tx, this](Interpreter::CallArgs args) { // Disable squashing, as we're using the squashing mechanism to run remotely. args.async = false; CallFromScript(cntx, args); }); - boost::intrusive_ptr stub_tx = new Transaction{cntx->transaction}; + ++ServerState::tlocal()->stats.eval_shardlocal_coordination_cnt; + boost::intrusive_ptr stub_tx = new Transaction{tx}; cntx->transaction = stub_tx.get(); - tx->PrepareSquashedMultiHop(registry_.Find("EVAL"), [&](ShardId id) { return id == *sid; }); - tx->ScheduleSingleHop([&](Transaction* tx, EngineShard*) { - ++ServerState::tlocal()->stats.eval_shardlocal_coordination_cnt; + + tx->PrepareMultiForScheduleSingleHop(*sid, 0, args); + tx->ScheduleSingleHop([&](Transaction*, EngineShard*) { result = interpreter->RunFunction(eval_args.sha, &error); return OpStatus::OK; }); + cntx->transaction = tx; } else { + optional scheduled = StartMultiEval(cntx->db_index(), eval_args.keys, *params, cntx); + if (!scheduled) { + return; + } + ++ServerState::tlocal()->stats.eval_io_coordination_cnt; interpreter->SetRedisFunc( [cntx, this](Interpreter::CallArgs args) { CallFromScript(cntx, args); }); result = interpreter->RunFunction(eval_args.sha, &error); + + // Conclude the transaction. + if (*scheduled) + cntx->transaction->UnlockMulti(); } - absl::Cleanup clean = [interpreter]() { interpreter->ResetStack(); }; if (auto err = FlushEvalAsyncCmds(cntx, true); err) { auto err_ref = CapturingReplyBuilder::GetError(*err); @@ -1620,10 +1637,6 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter, cntx->conn_state.script_info.reset(); // reset script_info - // Conclude the transaction. - if (*scheduled) - cntx->transaction->UnlockMulti(); - if (result == Interpreter::RUN_ERR) { string resp = StrCat("Error running script (call to ", eval_args.sha, "): ", error); return (*cntx)->SendError(resp, facade::kScriptErrType); diff --git a/src/server/main_service.h b/src/server/main_service.h index 8931405b6..d2c11861b 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -154,7 +154,8 @@ class Service : public facade::ServiceInterface { std::optional CheckKeysOwnership(const CommandId* cid, CmdArgList args, const ConnectionContext& dfly_cntx); - void EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter, ConnectionContext* cntx); + void EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpreter* interpreter, + ConnectionContext* cntx); void CallSHA(CmdArgList args, std::string_view sha, Interpreter* interpreter, ConnectionContext* cntx); diff --git a/src/server/multi_test.cc b/src/server/multi_test.cc index f948bf789..1d49bdc2a 100644 --- a/src/server/multi_test.cc +++ b/src/server/multi_test.cc @@ -548,7 +548,8 @@ TEST_F(MultiTest, EvalOOO) { } auto metrics = GetMetrics(); - EXPECT_EQ(1 + 2 * kTimes, metrics.ooo_tx_transaction_cnt); + EXPECT_EQ(1 + 2 * kTimes, + metrics.eval_io_coordination_cnt + metrics.eval_shardlocal_coordination_cnt); } // Run MULTI/EXEC commands in parallel, where each command is: diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 28bc05f5d..1c1d1a094 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -54,7 +54,12 @@ Transaction::Transaction(const CommandId* cid) : cid_{cid} { Transaction::Transaction(const Transaction* parent) : multi_{make_unique()}, txid_{parent->txid()} { - multi_->mode = parent->multi_->mode; + if (parent->multi_) { + multi_->mode = parent->multi_->mode; + } else { + // Use squashing mechanism for inline execution of single-shard EVAL + multi_->mode = LOCK_AHEAD; + } multi_->role = SQUASHED_STUB; time_now_ms_ = parent->time_now_ms_; @@ -405,6 +410,15 @@ string Transaction::DebugId() const { return StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")"); } +void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdArgList args) { + multi_.reset(); + InitBase(db, args); + EnableShard(sid); + OpResult key_index = DetermineKeys(cid_, args); + CHECK(key_index); + StoreKeysInArgs(*key_index, false); +} + // Runs in the dbslice thread. Returns true if the transaction continues running in the thread. bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { DCHECK_GT(run_count_.load(memory_order_relaxed), 0u); diff --git a/src/server/transaction.h b/src/server/transaction.h index abaf0002d..e26a37352 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -306,6 +306,11 @@ class Transaction { std::string DebugId() const; + // Prepares for running ScheduleSingleHop() for a single-shard multi tx. + // It is safe to call ScheduleSingleHop() after calling this method, but the callback passed + // to it must not block. + void PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdArgList args); + // Write a journal entry to a shard journal with the given payload. When logging a non-automatic // journal command, multiple journal entries may be necessary. In this case, call with set // multi_commands to true and call the FinishLogJournalOnShard function after logging the final