opt(lua): Coordinate single-shard Lua evals in remote thread (#1845)

* opt(lua): !!WIP!! Coordinate single-shard Lua evals in remote thread

This removes the need for an additional (and costly) Fiber.
Shahar Mike authored on 2023-09-13 11:52:35 +03:00, committed by GitHub
parent ef0502238c
commit b91435e360
8 changed files with 48 additions and 6 deletions
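The idea in a nutshell: when every key an EVAL touches maps to one shard, the whole script run is scheduled as a single hop into that shard's thread, which (per the commit message) removes the need for an additional fiber on the coordination path; otherwise the script keeps running on the calling thread. The toy program below is a minimal, self-contained sketch of that dispatch decision only; Shard, GetUniqueShard, AwaitOnShard and RunScript are illustrative stand-ins, not Dragonfly's actual API.

// Minimal sketch (illustrative names only): decide whether a script can be
// coordinated entirely on one shard's thread.
#include <cstddef>
#include <functional>
#include <future>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

using ShardId = std::size_t;
constexpr std::size_t kNumShards = 4;

ShardId Shard(const std::string& key) {
  return std::hash<std::string>{}(key) % kNumShards;
}

// Returns the single shard all keys map to, or nullopt if they span shards.
std::optional<ShardId> GetUniqueShard(const std::vector<std::string>& keys) {
  std::optional<ShardId> sid;
  for (const auto& key : keys) {
    ShardId s = Shard(key);
    if (sid && *sid != s)
      return std::nullopt;
    sid = s;
  }
  return sid;
}

// Stand-in for "hop to shard sid's thread, run cb there, wait for it".
void AwaitOnShard(ShardId sid, const std::function<void()>& cb) {
  std::async(std::launch::async, [&] {
    std::cout << "  [shard " << sid << " thread] ";
    cb();
  }).wait();
}

void RunScript(const std::vector<std::string>& keys, const std::function<void()>& body) {
  if (std::optional<ShardId> sid = GetUniqueShard(keys)) {
    // Single-shard script: one hop, the calls inside stay shard-local.
    AwaitOnShard(*sid, body);
  } else {
    // Keys span shards: coordinate from the calling thread, hopping per call.
    std::cout << "  [caller thread] ";
    body();
  }
}

int main() {
  RunScript({"foo"}, [] { std::cout << "set foo / get foo\n"; });
  RunScript({"k1", "k2", "k3"}, [] { std::cout << "touching several keys\n"; });
}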

@@ -220,7 +220,9 @@ uint32_t EngineShard::DefragTask() {
 }

 EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap)
-    : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap),
+    : queue_(kQueueLen),
+      txq_([](const Transaction* t) { return t->txid(); }),
+      mi_resource_(heap),
       db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) {
   fiber_q_ = MakeFiber([this, index = pb->GetIndex()] {
     ThisFiber::SetName(absl::StrCat("shard_queue", index));

@@ -1582,14 +1582,30 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter,
   interpreter->SetGlobalArray("KEYS", eval_args.keys);
   interpreter->SetGlobalArray("ARGV", eval_args.args);
-  interpreter->SetRedisFunc([cntx, this](auto args) { CallFromScript(cntx, args); });

   Interpreter::RunResult result;
   optional<ShardId> sid = GetRemoteShardToRunAt(*tx);
-  if (sid.has_value()) {
+  if (tx->GetMultiMode() != Transaction::NON_ATOMIC && sid.has_value()) {
     // If script runs on a single shard, we run it remotely to save hops.
-    pp_.at(sid.value())->Await([&]() { result = interpreter->RunFunction(eval_args.sha, &error); });
+    interpreter->SetRedisFunc([cntx, this](Interpreter::CallArgs args) {
+      // Disable squashing, as we're using the squashing mechanism to run remotely.
+      args.async = false;
+      CallFromScript(cntx, args);
+    });
+
+    boost::intrusive_ptr<Transaction> stub_tx = new Transaction{cntx->transaction};
+    cntx->transaction = stub_tx.get();
+    tx->PrepareSquashedMultiHop(registry_.Find("EVAL"), [&](ShardId id) { return id == *sid; });
+    tx->ScheduleSingleHop([&](Transaction* tx, EngineShard*) {
+      ++ServerState::tlocal()->stats.eval_shardlocal_coordination_cnt;
+      result = interpreter->RunFunction(eval_args.sha, &error);
+      return OpStatus::OK;
+    });
+    cntx->transaction = tx;
   } else {
+    ++ServerState::tlocal()->stats.eval_io_coordination_cnt;
+    interpreter->SetRedisFunc(
+        [cntx, this](Interpreter::CallArgs args) { CallFromScript(cntx, args); });
     result = interpreter->RunFunction(eval_args.sha, &error);
   }

   absl::Cleanup clean = [interpreter]() { interpreter->ResetStack(); };
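In the single-shard branch above, cntx->transaction is temporarily pointed at a stub transaction built from the current one while the real transaction drives the squashed hop, and the original pointer is restored right after ScheduleSingleHop returns. Below is a minimal, self-contained sketch of that swap-and-restore pattern; Tx, ConnectionContext and RunWithStubTx are simplified stand-ins, not the actual Dragonfly types.

// Toy sketch of "swap in a stub transaction, run, restore" (illustrative
// types; Dragonfly's real Transaction/ConnectionContext are far richer).
#include <functional>
#include <iostream>
#include <memory>
#include <string>

struct Tx {
  std::string name;
};

struct ConnectionContext {
  Tx* transaction = nullptr;
};

// Runs `body` with cntx->transaction temporarily pointing at a stub derived
// from the current transaction, then restores the original pointer.
void RunWithStubTx(ConnectionContext* cntx, const std::function<void()>& body) {
  Tx* original = cntx->transaction;
  auto stub = std::make_unique<Tx>(Tx{"stub-of-" + original->name});
  cntx->transaction = stub.get();
  body();                        // calls issued here see the stub transaction
  cntx->transaction = original;  // hand control back to the coordinating tx
}

int main() {
  Tx eval_tx{"EVAL"};
  ConnectionContext cntx{&eval_tx};
  RunWithStubTx(&cntx, [&] { std::cout << "running with " << cntx.transaction->name << "\n"; });
  std::cout << "restored to " << cntx.transaction->name << "\n";
}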

@@ -378,6 +378,22 @@ TEST_F(MultiTest, Eval) {
   EXPECT_EQ(resp, "12345678912345-works");
   resp = Run({"eval", kGetScore, "1", "z1", "c"});
   EXPECT_EQ(resp, "12.5-works");
+
+  // Multiple calls in a Lua script
+  EXPECT_EQ(Run({"eval",
+                 R"(redis.call('set', 'foo', '42')
+                    return redis.call('get', 'foo'))",
+                 "1", "foo"}),
+            "42");
+
+  auto condition = [&]() { return service_->IsLocked(0, "foo"); };
+  auto fb = ExpectConditionWithSuspension(condition);
+  EXPECT_EQ(Run({"eval",
+                 R"(redis.call('set', 'foo', '42')
+                    return redis.call('get', 'foo'))",
+                 "1", "foo"}),
+            "42");
+  fb.Join();
 }

 TEST_F(MultiTest, Watch) {

@@ -1211,6 +1211,8 @@ Metrics ServerFamily::GetMetrics() const {
     result.conn_stats += ss->connection_stats;
     result.qps += uint64_t(ss->MovingSum6());
     result.ooo_tx_transaction_cnt += ss->stats.ooo_tx_cnt;
+    result.eval_io_coordination_cnt += ss->stats.eval_io_coordination_cnt;
+    result.eval_shardlocal_coordination_cnt += ss->stats.eval_shardlocal_coordination_cnt;

     service_.mutable_registry()->MergeCallStats(
         index, [&dest_map = result.cmd_stats_map](string_view name, const CmdCallStats& src) {
@@ -1384,6 +1386,8 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
     append("defrag_attempt_total", m.shard_stats.defrag_attempt_total);
     append("defrag_realloc_total", m.shard_stats.defrag_realloc_total);
     append("defrag_task_invocation_total", m.shard_stats.defrag_task_invocation_total);
+    append("eval_io_coordination_total", m.eval_io_coordination_cnt);
+    append("eval_shardlocal_coordination_total", m.eval_shardlocal_coordination_cnt);
   }

   if (should_enter("TIERED", true)) {

@@ -65,6 +65,8 @@ struct Metrics {
   size_t heap_comitted_bytes = 0;
   size_t small_string_bytes = 0;
   uint64_t ooo_tx_transaction_cnt = 0;
+  uint64_t eval_io_coordination_cnt = 0;
+  uint64_t eval_shardlocal_coordination_cnt = 0;
   uint32_t traverse_ttl_per_sec = 0;
   uint32_t delete_ttl_per_sec = 0;
   bool is_master = true;

@@ -91,6 +91,8 @@ class ServerState {  // public struct - to allow initialization.
  public:
   struct Stats {
     uint64_t ooo_tx_cnt = 0;
+    uint64_t eval_io_coordination_cnt = 0;
+    uint64_t eval_shardlocal_coordination_cnt = 0;
   };

   static ServerState* tlocal() {
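The two counters added above follow the existing ServerState pattern: each proactor thread bumps its own thread-local Stats, and GetMetrics() later folds the per-thread values into a single Metrics object that Info() prints. A minimal, self-contained sketch of that pattern follows, with plain std::thread and a mutex standing in for the proactor pool and its aggregation pass.

// Toy sketch of thread-local counters merged into a global total
// (illustrative setup, not Dragonfly's real ServerState/Metrics machinery).
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

struct Stats {
  std::uint64_t eval_io_coordination_cnt = 0;
  std::uint64_t eval_shardlocal_coordination_cnt = 0;
};

thread_local Stats tl_stats;  // cheap to bump: no sharing between threads

int main() {
  std::mutex mu;
  Stats total;

  // Each worker bumps its own thread-local counters, then merges them once,
  // mirroring how a metrics pass sums per-thread stats.
  auto worker = [&](bool shard_local, int evals) {
    for (int i = 0; i < evals; ++i) {
      if (shard_local)
        ++tl_stats.eval_shardlocal_coordination_cnt;  // script ran in the shard thread
      else
        ++tl_stats.eval_io_coordination_cnt;  // script ran in the I/O thread
    }
    std::lock_guard<std::mutex> lk(mu);
    total.eval_io_coordination_cnt += tl_stats.eval_io_coordination_cnt;
    total.eval_shardlocal_coordination_cnt += tl_stats.eval_shardlocal_coordination_cnt;
  };

  std::vector<std::thread> threads;
  threads.emplace_back(worker, true, 3);
  threads.emplace_back(worker, false, 2);
  for (auto& t : threads) t.join();

  std::cout << "eval_shardlocal_coordination_total:" << total.eval_shardlocal_coordination_cnt
            << "\n"
            << "eval_io_coordination_total:" << total.eval_io_coordination_cnt << "\n";
}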

@@ -943,7 +943,7 @@ void Transaction::ExpireBlocking(WaitKeysProvider wcb) {
 }

 string_view Transaction::Name() const {
-  return cid_->name();
+  return cid_ ? cid_->name() : "null-command";
 }

 ShardId Transaction::GetUniqueShard() const {

@@ -540,7 +540,7 @@ class Transaction {
   std::vector<uint32_t> reverse_index_;

   RunnableType* cb_ptr_ = nullptr;    // Run on shard threads
-  const CommandId* cid_;              // Underlying command
+  const CommandId* cid_ = nullptr;    // Underlying command
   std::unique_ptr<MultiData> multi_;  // Initialized when the transaction is multi/exec.

   TxId txid_{0};