From 83a12b99c655c6252f1146e69084de378c381bb7 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Tue, 6 Feb 2024 13:54:14 +0300 Subject: [PATCH] fix: fix interpreter acquisition with MULTI (#2549) * fix: fix interpreter acquisition with MULTI --------- Signed-off-by: Vladislav Oleshko --- src/core/interpreter.cc | 3 ++ src/core/interpreter.h | 2 +- src/server/conn_context.cc | 1 + src/server/main_service.cc | 78 ++++++++++++++++++++++-------------- src/server/server_family.cc | 1 + src/server/server_state.cc | 9 ++++- src/server/server_state.h | 5 ++- tests/dragonfly/eval_test.py | 49 ++++++++++++++++++++++ 8 files changed, 114 insertions(+), 34 deletions(-) diff --git a/src/core/interpreter.cc b/src/core/interpreter.cc index f8c6db6ad..5fc4a7371 100644 --- a/src/core/interpreter.cc +++ b/src/core/interpreter.cc @@ -885,12 +885,15 @@ Interpreter* InterpreterManager::Get() { } waker_.await([this]() { return available_.size() > 0; }); + Interpreter* ir = available_.back(); available_.pop_back(); return ir; } void InterpreterManager::Return(Interpreter* ir) { + DCHECK_LE(storage_.data(), ir); // ensure the pointer + DCHECK_GE(storage_.data() + storage_.size(), ir); // belongs to storage_ available_.push_back(ir); waker_.notify(); } diff --git a/src/core/interpreter.h b/src/core/interpreter.h index b550710ff..043435800 100644 --- a/src/core/interpreter.h +++ b/src/core/interpreter.h @@ -146,9 +146,9 @@ class InterpreterManager { // Borrow interpreter. Always return it after usage. Interpreter* Get(); - void Return(Interpreter*); + // Clear all interpreters, keeps capacity. Waits until all are returned. 
void Reset(); private: diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index a6286fb1a..8763ecf16 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -252,6 +252,7 @@ size_t ConnectionContext::UsedMemory() const { } void ConnectionState::ExecInfo::Clear() { + DCHECK(!preborrowed_interpreter); // Must have been released properly state = EXEC_INACTIVE; body.clear(); is_write = false; diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 268e81dbe..45f819dff 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -128,20 +128,22 @@ constexpr size_t kMaxThreadSize = 1024; // Unwatch all keys for a connection and unregister from DbSlices. // Used by UNWATCH, DICARD and EXEC. -void UnwatchAllKeys(ConnectionContext* cntx) { - auto& exec_info = cntx->conn_state.exec_info; - if (!exec_info.watched_keys.empty()) { - auto cb = [&](EngineShard* shard) { - shard->db_slice().UnregisterConnectionWatches(&exec_info); - }; +void UnwatchAllKeys(ConnectionState::ExecInfo* exec_info) { + if (!exec_info->watched_keys.empty()) { + auto cb = [&](EngineShard* shard) { shard->db_slice().UnregisterConnectionWatches(exec_info); }; shard_set->RunBriefInParallel(std::move(cb)); } - exec_info.ClearWatched(); + exec_info->ClearWatched(); } void MultiCleanup(ConnectionContext* cntx) { - UnwatchAllKeys(cntx); - cntx->conn_state.exec_info.Clear(); + auto& exec_info = cntx->conn_state.exec_info; + if (auto* borrowed = exec_info.preborrowed_interpreter; borrowed) { + ServerState::tlocal()->ReturnInterpreter(borrowed); + exec_info.preborrowed_interpreter = nullptr; + } + UnwatchAllKeys(&exec_info); + exec_info.Clear(); } void DeactivateMonitoring(ConnectionContext* server_ctx) { @@ -691,13 +693,24 @@ string CreateExecDescriptor(const std::vector& stored_cmds, unsigned return result; } -// Either take the interpreter from the preborrowed multi exec transaction or borrow one. 
+// Ensures availability of an interpreter for EVAL-like commands and its automatic release. +// If it's part of MULTI, the preborrowed interpreter is returned, otherwise a new one is acquired. struct BorrowedInterpreter { explicit BorrowedInterpreter(ConnectionContext* cntx) { + // Ensure squashing ignores EVAL. We can't run on a stub context, because it doesn't have our + // preborrowed interpreter (which can't be shared on multiple threads). + CHECK(!cntx->conn_state.squashing_info); + if (auto borrowed = cntx->conn_state.exec_info.preborrowed_interpreter; borrowed) { - DCHECK_EQ(cntx->conn_state.exec_info.state, ConnectionState::ExecInfo::EXEC_RUNNING); + // Ensure a preborrowed interpreter is only set for an already running MULTI transaction. + CHECK_EQ(cntx->conn_state.exec_info.state, ConnectionState::ExecInfo::EXEC_RUNNING); + interpreter_ = borrowed; } else { + // A scheduled transaction occupies a place in the transaction queue and holds locks, + // preventing other transactions from progressing. Blocking below can deadlock! + CHECK(!cntx->transaction->IsScheduled()); + interpreter_ = ServerState::tlocal()->BorrowInterpreter(); owned_ = true; } @@ -708,6 +721,13 @@ struct BorrowedInterpreter { ServerState::tlocal()->ReturnInterpreter(interpreter_); } + // Give up ownership of the interpreter, it must be returned manually. 
+ Interpreter* Release() && { + DCHECK(owned_); + owned_ = false; + return interpreter_; + } + operator Interpreter*() { return interpreter_; } @@ -1538,7 +1558,7 @@ void Service::Watch(CmdArgList args, ConnectionContext* cntx) { } void Service::Unwatch(CmdArgList args, ConnectionContext* cntx) { - UnwatchAllKeys(cntx); + UnwatchAllKeys(&cntx->conn_state.exec_info); return cntx->SendOk(); } @@ -2007,14 +2027,16 @@ void StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo* void Service::Exec(CmdArgList args, ConnectionContext* cntx) { auto* rb = static_cast(cntx->reply_builder()); + auto& exec_info = cntx->conn_state.exec_info; + // Clean the context no matter the outcome absl::Cleanup exec_clear = [&cntx] { MultiCleanup(cntx); }; - if (!cntx->conn_state.exec_info.IsCollecting()) { + // Check basic invariants + if (!exec_info.IsCollecting()) { return rb->SendError("EXEC without MULTI"); } - auto& exec_info = cntx->conn_state.exec_info; if (IsWatchingOtherDbs(cntx->db_index(), exec_info)) { return rb->SendError("Dragonfly does not allow WATCH and EXEC on different databases"); } @@ -2028,12 +2050,17 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { } cntx->last_command_debug.exec_body_len = exec_info.body.size(); - const CommandId* const exec_cid = cntx->cid; - CmdArgVec arg_vec; + + // The transaction can contain scripts, determine their presence ahead to customize logic below. ExecEvalState state = DetermineEvalPresense(exec_info.body); - // We adjust the atomicity level of multi transaction inside StartMultiExec. i.e if multi mode is - // lock ahead and we run global script in the transaction then multi mode will be global. + // We borrow a single interpreter for all the EVALs inside. 
Returned by MultiCleanup + if (state != ExecEvalState::NONE) { + exec_info.preborrowed_interpreter = BorrowedInterpreter(cntx).Release(); + } + + // Determine the appropriate multi mode, based not only on the flag, but also on the presence of + // global commands and scripts optional multi_mode = DeduceExecMode(state, exec_info, *script_mgr()); if (!multi_mode) return rb->SendError( @@ -2059,9 +2086,6 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { SinkReplyBuilder::ReplyAggregator agg(rb); rb->StartArray(exec_info.body.size()); - if (state != ExecEvalState::NONE) - exec_info.preborrowed_interpreter = ServerState::tlocal()->BorrowInterpreter(); - if (!exec_info.body.empty()) { if (GetFlag(FLAGS_track_exec_frequencies)) { string descr = CreateExecDescriptor(exec_info.body, cntx->transaction->GetUniqueShardCnt()); @@ -2071,6 +2095,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { if (absl::GetFlag(FLAGS_multi_exec_squash) && state == ExecEvalState::NONE) { MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), cntx, this); } else { + CmdArgVec arg_vec; for (auto& scmd : exec_info.body) { VLOG(2) << "TX CMD " << scmd.Cid()->name() << " " << scmd.NumArgs(); @@ -2097,19 +2122,12 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { } } - if (exec_info.preborrowed_interpreter) { - // Use SafeTLocal() to avoid accessing the wrong thread local instance - ServerState::SafeTLocal()->ReturnInterpreter(exec_info.preborrowed_interpreter); - exec_info.preborrowed_interpreter = nullptr; - } - if (scheduled) { VLOG(1) << "Exec unlocking " << exec_info.body.size() << " commands"; cntx->transaction->UnlockMulti(); } - cntx->cid = exec_cid; - + cntx->cid = exec_cid_; VLOG(1) << "Exec completed"; } @@ -2413,7 +2431,7 @@ void Service::OnClose(facade::ConnectionContext* cntx) { DCHECK(!conn_state.subscribe_info); } - UnwatchAllKeys(server_cntx); + UnwatchAllKeys(&conn_state.exec_info); DeactivateMonitoring(server_cntx); diff --git 
a/src/server/server_family.cc b/src/server/server_family.cc index 9013d6740..207f96980 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1868,6 +1868,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("defrag_task_invocation_total", m.shard_stats.defrag_task_invocation_total); append("reply_count", reply_stats.send_stats.count); append("reply_latency_usec", reply_stats.send_stats.total_duration); + append("blocked_on_interpreter", m.coordinator_stats.blocked_on_interpreter); } if (should_enter("TIERED", true)) { diff --git a/src/server/server_state.cc b/src/server/server_state.cc index 1a75268e5..800be5dd5 100644 --- a/src/server/server_state.cc +++ b/src/server/server_state.cc @@ -48,7 +48,7 @@ auto ServerState::Stats::operator=(Stats&& other) -> Stats& { } ServerState::Stats& ServerState::Stats::Add(unsigned num_shards, const ServerState::Stats& other) { - static_assert(sizeof(Stats) == 12 * 8, "Stats size mismatch"); + static_assert(sizeof(Stats) == 13 * 8, "Stats size mismatch"); for (int i = 0; i < NUM_TX_TYPES; ++i) { this->tx_type_cnt[i] += other.tx_type_cnt[i]; @@ -63,6 +63,8 @@ ServerState::Stats& ServerState::Stats::Add(unsigned num_shards, const ServerSta this->multi_squash_exec_hop_usec += other.multi_squash_exec_hop_usec; this->multi_squash_exec_reply_usec += other.multi_squash_exec_reply_usec; + this->blocked_on_interpreter += other.blocked_on_interpreter; + if (this->tx_width_freq_arr == nullptr) { this->tx_width_freq_arr = new uint64_t[num_shards]; std::copy_n(other.tx_width_freq_arr, num_shards, this->tx_width_freq_arr); @@ -174,7 +176,10 @@ bool ServerState::IsPaused() const { } Interpreter* ServerState::BorrowInterpreter() { - return interpreter_mgr_.Get(); + stats.blocked_on_interpreter++; + auto* ptr = interpreter_mgr_.Get(); + stats.blocked_on_interpreter--; + return ptr; } void ServerState::ReturnInterpreter(Interpreter* ir) { diff --git a/src/server/server_state.h 
b/src/server/server_state.h index 1e915d1fc..6ca2ea969 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -106,6 +106,8 @@ class ServerState { // public struct - to allow initialization. uint64_t multi_squash_exec_hop_usec = 0; uint64_t multi_squash_exec_reply_usec = 0; + uint64_t blocked_on_interpreter = 0; + // Array of size of number of shards. // Each entry is how many transactions we had with this width (unique_shard_cnt). uint64_t* tx_width_freq_arr = nullptr; @@ -175,7 +177,8 @@ class ServerState { // public struct - to allow initialization. bool AllowInlineScheduling() const; - // Borrow interpreter from internal manager. Return int with ReturnInterpreter. + // Borrow interpreter from interpreter pool, return it with ReturnInterpreter. + // Will block if no interpreters are available. Use with caution! Interpreter* BorrowInterpreter(); // Return interpreter to internal manager to be re-used. diff --git a/tests/dragonfly/eval_test.py b/tests/dragonfly/eval_test.py index f687a16e4..4eac5b1af 100644 --- a/tests/dragonfly/eval_test.py +++ b/tests/dragonfly/eval_test.py @@ -1,10 +1,13 @@ import asyncio +import async_timeout from redis import asyncio as aioredis import time import json import pytest import random import itertools +import random +import string from . 
import dfly_args, dfly_multi_test_args DJANGO_CACHEOPS_SCRIPT = """ @@ -264,3 +267,49 @@ async def test_lua_auto_async(async_client: aioredis.Redis): flushes = (await async_client.info("transaction"))["eval_squashed_flushes"] assert 1 <= flushes <= 3 # all 100 commands are executed in at most 3 batches + + +""" +Ensure liveness even with only a single interpreter in scenarios where EVAL and EVAL inside multi run concurrently while also contending for keys +""" + + +@dfly_args({"proactor_threads": 3, "interpreter_per_thread": 1}) +async def test_one_interpreter(async_client: aioredis.Redis): + sha = await async_client.script_load("redis.call('GET', KEYS[1])") + all_keys = [string.ascii_lowercase[i] for i in range(5)] + total_commands = 100 + + async def run_multi(): + for _ in range(total_commands): + p = async_client.pipeline(transaction=True) + pkeys = random.choices(all_keys, k=3) + for key in pkeys: + p.evalsha(sha, 1, key) + await p.execute() + + async def run_single(): + for _ in range(total_commands): + await async_client.evalsha(sha, 1, random.choice(all_keys)) + + max_blocked = 0 + + async def measure_blocked(): + nonlocal max_blocked + while True: + max_blocked = max( + max_blocked, (await async_client.info("STATS"))["blocked_on_interpreter"] + ) + await asyncio.sleep(0.01) + + tm = [asyncio.create_task(run_multi()) for _ in range(5)] + ts = [asyncio.create_task(run_single()) for _ in range(5)] + block_measure = asyncio.create_task(measure_blocked()) + + async with async_timeout.timeout(5): + await asyncio.gather(*(tm + ts)) + + block_measure.cancel() + + # At least some of the commands were seen blocking on the interpreter + assert max_blocked > 3