diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 045e713a0..8a33e43e3 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -220,7 +220,9 @@ uint32_t EngineShard::DefragTask() { } EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t* heap) - : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap), + : queue_(kQueueLen), + txq_([](const Transaction* t) { return t->txid(); }), + mi_resource_(heap), db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) { fiber_q_ = MakeFiber([this, index = pb->GetIndex()] { ThisFiber::SetName(absl::StrCat("shard_queue", index)); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 1300986f3..6e73965c6 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1582,14 +1582,30 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter, interpreter->SetGlobalArray("KEYS", eval_args.keys); interpreter->SetGlobalArray("ARGV", eval_args.args); - interpreter->SetRedisFunc([cntx, this](auto args) { CallFromScript(cntx, args); }); Interpreter::RunResult result; optional sid = GetRemoteShardToRunAt(*tx); - if (sid.has_value()) { + if (tx->GetMultiMode() != Transaction::NON_ATOMIC && sid.has_value()) { // If script runs on a single shard, we run it remotely to save hops. - pp_.at(sid.value())->Await([&]() { result = interpreter->RunFunction(eval_args.sha, &error); }); + interpreter->SetRedisFunc([cntx, this](Interpreter::CallArgs args) { + // Disable squashing, as we're using the squashing mechanism to run remotely. + args.async = false; + CallFromScript(cntx, args); + }); + + boost::intrusive_ptr stub_tx = new Transaction{cntx->transaction}; + cntx->transaction = stub_tx.get(); + tx->PrepareSquashedMultiHop(registry_.Find("EVAL"), [&](ShardId id) { return id == *sid; }); + tx->ScheduleSingleHop([&](Transaction* tx, EngineShard*) { + ++ServerState::tlocal()->stats.eval_shardlocal_coordination_cnt; + result = interpreter->RunFunction(eval_args.sha, &error); + return OpStatus::OK; + }); + cntx->transaction = tx; } else { + ++ServerState::tlocal()->stats.eval_io_coordination_cnt; + interpreter->SetRedisFunc( + [cntx, this](Interpreter::CallArgs args) { CallFromScript(cntx, args); }); result = interpreter->RunFunction(eval_args.sha, &error); } absl::Cleanup clean = [interpreter]() { interpreter->ResetStack(); }; diff --git a/src/server/multi_test.cc b/src/server/multi_test.cc index af851bc5f..f948bf789 100644 --- a/src/server/multi_test.cc +++ b/src/server/multi_test.cc @@ -378,6 +378,22 @@ TEST_F(MultiTest, Eval) { EXPECT_EQ(resp, "12345678912345-works"); resp = Run({"eval", kGetScore, "1", "z1", "c"}); EXPECT_EQ(resp, "12.5-works"); + + // Multiple calls in a Lua script + EXPECT_EQ(Run({"eval", + R"(redis.call('set', 'foo', '42') + return redis.call('get', 'foo'))", + "1", "foo"}), + "42"); + + auto condition = [&]() { return service_->IsLocked(0, "foo"); }; + auto fb = ExpectConditionWithSuspension(condition); + EXPECT_EQ(Run({"eval", + R"(redis.call('set', 'foo', '42') + return redis.call('get', 'foo'))", + "1", "foo"}), + "42"); + fb.Join(); } TEST_F(MultiTest, Watch) { diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 3aea9e90d..22529db04 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1211,6 +1211,8 @@ Metrics ServerFamily::GetMetrics() const { result.conn_stats += ss->connection_stats; result.qps += uint64_t(ss->MovingSum6()); result.ooo_tx_transaction_cnt += ss->stats.ooo_tx_cnt; + result.eval_io_coordination_cnt += ss->stats.eval_io_coordination_cnt; + result.eval_shardlocal_coordination_cnt += ss->stats.eval_shardlocal_coordination_cnt; service_.mutable_registry()->MergeCallStats( index, [&dest_map = result.cmd_stats_map](string_view name, const CmdCallStats& src) { @@ -1384,6 +1386,8 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("defrag_attempt_total", m.shard_stats.defrag_attempt_total); append("defrag_realloc_total", m.shard_stats.defrag_realloc_total); append("defrag_task_invocation_total", m.shard_stats.defrag_task_invocation_total); + append("eval_io_coordination_total", m.eval_io_coordination_cnt); + append("eval_shardlocal_coordination_total", m.eval_shardlocal_coordination_cnt); } if (should_enter("TIERED", true)) { diff --git a/src/server/server_family.h b/src/server/server_family.h index c67cd15c2..5d386cc16 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -65,6 +65,8 @@ struct Metrics { size_t heap_comitted_bytes = 0; size_t small_string_bytes = 0; uint64_t ooo_tx_transaction_cnt = 0; + uint64_t eval_io_coordination_cnt = 0; + uint64_t eval_shardlocal_coordination_cnt = 0; uint32_t traverse_ttl_per_sec = 0; uint32_t delete_ttl_per_sec = 0; bool is_master = true; diff --git a/src/server/server_state.h b/src/server/server_state.h index d100d5e71..fa7d13dee 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -91,6 +91,8 @@ class ServerState { // public struct - to allow initialization. public: struct Stats { uint64_t ooo_tx_cnt = 0; + uint64_t eval_io_coordination_cnt = 0; + uint64_t eval_shardlocal_coordination_cnt = 0; }; static ServerState* tlocal() { diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 0a940728d..73eeee18a 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -943,7 +943,7 @@ void Transaction::ExpireBlocking(WaitKeysProvider wcb) { } string_view Transaction::Name() const { - return cid_->name(); + return cid_ ? cid_->name() : "null-command"; } ShardId Transaction::GetUniqueShard() const { diff --git a/src/server/transaction.h b/src/server/transaction.h index ac6df7409..05d4fec79 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -540,7 +540,7 @@ class Transaction { std::vector reverse_index_; RunnableType* cb_ptr_ = nullptr; // Run on shard threads - const CommandId* cid_; // Underlying command + const CommandId* cid_ = nullptr; // Underlying command std::unique_ptr multi_; // Initialized when the transaction is multi/exec. TxId txid_{0};