diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 8bb0bb5fa..b2833c9fb 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -650,6 +650,7 @@ Transaction::MultiMode DeduceExecMode(ExecScriptUse state, const ScriptMgr& script_mgr) { // Check if script most LIKELY has global eval transactions bool contains_global = false; + bool contains_admin_cmd = false; Transaction::MultiMode multi_mode = Transaction::LOCK_AHEAD; if (state == ExecScriptUse::SCRIPT_RUN) { @@ -670,6 +671,7 @@ Transaction::MultiMode DeduceExecMode(ExecScriptUse state, transactional |= scmd.Cid()->IsTransactional(); } contains_global |= scmd.Cid()->opt_mask() & CO::GLOBAL_TRANS; + contains_admin_cmd |= scmd.Cid()->opt_mask() & CO::ADMIN; // We can't run no-key-transactional commands in lock-ahead mode currently, // because it means we have to schedule on all shards @@ -685,9 +687,13 @@ Transaction::MultiMode DeduceExecMode(ExecScriptUse state, if (!transactional && exec_info.watched_keys.empty()) return Transaction::NOT_DETERMINED; + if (contains_admin_cmd) { + multi_mode = Transaction::NON_ATOMIC; + } // Atomic modes fall back to GLOBAL if they contain global commands. - if (contains_global && multi_mode == Transaction::LOCK_AHEAD) + else if (contains_global && multi_mode == Transaction::LOCK_AHEAD) { multi_mode = Transaction::GLOBAL; + } return multi_mode; } diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 5d6d9a76e..836f71c82 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2799,3 +2799,35 @@ async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFa await wait_for_replicas_state(*c_replicas) await fill_task + + +@dfly_args({"proactor_threads": 2}) +async def test_replicaof_inside_multi(df_factory): + master = df_factory.create() + replica = df_factory.create() + df_factory.start_all([master, replica]) + + async def replicate_inside_multi(): + try: + c_master = master.client() + p = c_master.pipeline(transaction=True) + for i in range(5): + p.execute_command("dbsize") + p.execute_command(f"replicaof localhost {replica.port}") + await p.execute() + return True + except redis.exceptions.ResponseError: + return False + + MULTI_COMMANDS_TO_ISSUE = 30 + replication_commands = [ + asyncio.create_task(replicate_inside_multi()) for _ in range(MULTI_COMMANDS_TO_ISSUE) + ] + + num_successes = 0 + for result in asyncio.as_completed(replication_commands, timeout=80): + num_successes += await result + + logging.info(f"succeses: {num_successes}") + assert MULTI_COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled" + assert num_successes > 0, "At least one REPLICAOF must success"