fix(server): deadlock with replicaof inside multi (#4685)

* fix(server): fix deadlock when REPLICAOF is issued inside a MULTI/EXEC transaction

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2025-03-04 09:48:18 +02:00 committed by Roman Gershman
parent cf47d684a5
commit 4f9103073b
No known key found for this signature in database
GPG key ID: F25B77EAF8AEBA7A
2 changed files with 39 additions and 1 deletions

View file

@ -650,6 +650,7 @@ Transaction::MultiMode DeduceExecMode(ExecScriptUse state,
const ScriptMgr& script_mgr) {
// Check if script most LIKELY has global eval transactions
bool contains_global = false;
bool contains_admin_cmd = false;
Transaction::MultiMode multi_mode = Transaction::LOCK_AHEAD;
if (state == ExecScriptUse::SCRIPT_RUN) {
@ -670,6 +671,7 @@ Transaction::MultiMode DeduceExecMode(ExecScriptUse state,
transactional |= scmd.Cid()->IsTransactional();
}
contains_global |= scmd.Cid()->opt_mask() & CO::GLOBAL_TRANS;
contains_admin_cmd |= scmd.Cid()->opt_mask() & CO::ADMIN;
// We can't run no-key-transactional commands in lock-ahead mode currently,
// because it means we have to schedule on all shards
@ -685,9 +687,13 @@ Transaction::MultiMode DeduceExecMode(ExecScriptUse state,
if (!transactional && exec_info.watched_keys.empty())
return Transaction::NOT_DETERMINED;
if (contains_admin_cmd) {
multi_mode = Transaction::NON_ATOMIC;
}
// Atomic modes fall back to GLOBAL if they contain global commands.
if (contains_global && multi_mode == Transaction::LOCK_AHEAD)
else if (contains_global && multi_mode == Transaction::LOCK_AHEAD) {
multi_mode = Transaction::GLOBAL;
}
return multi_mode;
}

View file

@ -2799,3 +2799,35 @@ async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFa
await wait_for_replicas_state(*c_replicas)
await fill_task
# Regression test for REPLICAOF issued inside a MULTI/EXEC transaction.
# Starts two instances, fires many transactional pipelines at the first one
# concurrently (each queues DBSIZE commands and a REPLICAOF pointing at the
# second instance), and checks that some of them succeed while others are
# rejected with a ResponseError.
# NOTE(review): indentation is not preserved in this view of the diff, so the
# nesting described in the comments below is inferred from statement order --
# confirm against the real file.
# presumably runs the instances with two proactor threads -- confirm dfly_args semantics
@dfly_args({"proactor_threads": 2})
async def test_replicaof_inside_multi(df_factory):
master = df_factory.create()
replica = df_factory.create()
df_factory.start_all([master, replica])
# Helper: runs one transactional pipeline against `master`.
# Returns True when the pipeline executes without error, False when the
# server answers with a ResponseError.
async def replicate_inside_multi():
try:
c_master = master.client()
# transaction=True asks the client to execute the queued commands as one
# transaction (see the redis-py pipeline documentation).
p = c_master.pipeline(transaction=True)
for i in range(5):
p.execute_command("dbsize")
# NOTE(review): it is not visible here whether this REPLICAOF is queued once
# after the loop or on every iteration -- verify against the original file.
p.execute_command(f"replicaof localhost {replica.port}")
await p.execute()
return True
except redis.exceptions.ResponseError:
return False
# Number of concurrent transactional pipelines to launch.
MULTI_COMMANDS_TO_ISSUE = 30
replication_commands = [
asyncio.create_task(replicate_inside_multi()) for _ in range(MULTI_COMMANDS_TO_ISSUE)
]
num_successes = 0
# timeout=80 (seconds) bounds the total wait, so a hang surfaces as a timeout
# error instead of blocking the test run indefinitely.
for result in asyncio.as_completed(replication_commands, timeout=80):
# Each task yields a bool; adding it counts True as 1 and False as 0.
num_successes += await result
logging.info(f"succeses: {num_successes}")
# Expect a mix of outcomes: not every pipeline may succeed, but at least one must.
assert MULTI_COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled"
assert num_successes > 0, "At least one REPLICAOF must success"