tests(replication): Test script replication (#960)

Script replication test
This commit is contained in:
Vladislav 2023-03-20 22:57:52 +03:00 committed by GitHub
parent 23e055e675
commit 6d3a191a5f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23

View file

@ -679,3 +679,76 @@ async def test_expiry(df_local_factory, n_keys=1000):
await asyncio.sleep(1.0)
res = await c_replica.mget(k for k, _ in gen_test_data(n_keys))
assert all(v is None for v in res)
"""
Test script replication.
Fill multiple lists with values and rotate them one by one with LMOVE until they're at the same place again.
"""
# t_master, t_replicas, num_ops, num_keys, num_parallel, flags
script_cases = [
(4, [4, 4, 4], 50, 5, 5, ""),
(4, [4, 4, 4], 50, 5, 5, "disable-atomicity"),
]
script_test_s1 = """
{flags}
local N = ARGV[1]
-- fill each list with its k value
for i, k in pairs(KEYS) do
for j = 1, N do
redis.call('LPUSH', k, i-1)
end
end
-- rotate #KEYS times
for l = 1, #KEYS do
for j = 1, N do
for i, k in pairs(KEYS) do
redis.call('LMOVE', k, KEYS[i%#KEYS+1], 'LEFT', 'RIGHT')
end
end
end
return 'OK'
"""
@pytest.mark.asyncio
@pytest.mark.parametrize("t_master, t_replicas, num_ops, num_keys, num_par, flags", script_cases)
async def test_scripts(df_local_factory, t_master, t_replicas, num_ops, num_keys, num_par, flags):
master = df_local_factory.create(
port=BASE_PORT, proactor_threads=t_master)
replicas = [df_local_factory.create(
port=BASE_PORT+i+1, proactor_threads=t) for i, t in enumerate(t_replicas)]
df_local_factory.start_all([master] + replicas)
c_master = aioredis.Redis(port=master.port)
c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]
for c_replica in c_replicas:
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await wait_available_async(c_replica)
script = script_test_s1.format(
flags=f'#!lua flags={flags}' if flags else '')
sha = await c_master.script_load(script)
key_sets = [
[f'{i}-{j}' for j in range(num_keys)] for i in range(num_par)
]
rsps = await asyncio.gather(*(c_master.evalsha(sha, len(keys), *keys, num_ops) for keys in key_sets))
assert rsps == [b'OK'] * num_par
await check_all_replicas_finished(c_replicas, c_master)
for c_replica in c_replicas:
for key_set in key_sets:
for j, k in enumerate(key_set):
l = await c_replica.lrange(k, 0, -1)
assert l == [f'{j}'.encode()] * num_ops