Add basic replicaiton from redis test (#895)

Signed-off-by: ashotland <ari@dragonflydb.io>
This commit is contained in:
ashotland 2023-03-01 21:50:51 +02:00 committed by GitHub
parent ac280529cb
commit ccc784d9c4
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 88 additions and 9 deletions

View file

@ -0,0 +1,79 @@
import time
import pytest
import asyncio
import aioredis
import subprocess
from .utility import *
class RedisServer:
def __init__(self):
self.port = 5555
self.proc = None
def start(self):
self.proc = subprocess.Popen(["redis-server-6.2.11",
f"--port {self.port}",
"--save ''",
"--appendonly no",
"--protected-mode no",
"--repl-diskless-sync yes",
"--repl-diskless-sync-delay 0"])
print(self.proc.args)
def stop(self):
self.proc.terminate()
try:
self.proc.wait(timeout=10)
except Exception as e:
pass
@pytest.fixture(scope="function")
def redis_server() -> RedisServer:
s = RedisServer()
s.start()
time.sleep(1)
yield s
s.stop()
@pytest.mark.asyncio
async def test_replication_full_sync(df_local_factory, df_seeder_factory, redis_server):
c_master = aioredis.Redis(port=redis_server.port)
assert await c_master.ping()
seeder = df_seeder_factory.create(port=redis_server.port, keys=100, dbcount=1, unsupported_types=[ValueType.JSON])
await seeder.run(target_deviation=0.1)
replica = df_local_factory.create(port=redis_server.port + 1)
replica.start()
c_replica = aioredis.Redis(port=replica.port)
assert await c_replica.ping()
await c_replica.execute_command("REPLICAOF", "localhost", redis_server.port)
await wait_available_async(c_replica)
await asyncio.sleep(5)
capture = await seeder.capture()
assert await seeder.compare(capture, port=replica.port)
@pytest.mark.asyncio
async def test_replication_stable_sync(df_local_factory, df_seeder_factory, redis_server):
c_master = aioredis.Redis(port=redis_server.port)
assert await c_master.ping()
replica = df_local_factory.create(port=redis_server.port + 1)
replica.start()
c_replica = aioredis.Redis(port=replica.port)
assert await c_replica.ping()
await c_replica.execute_command("REPLICAOF", "localhost", redis_server.port)
await wait_available_async(c_replica)
seeder = df_seeder_factory.create(port=redis_server.port, keys=100, dbcount=1, unsupported_types=[ValueType.JSON])
await seeder.run(target_ops=100)
await asyncio.sleep(5)
capture = await seeder.capture()
assert await seeder.compare(capture, port=replica.port)

View file

@ -66,19 +66,16 @@ class ValueType(Enum):
ZSET = 4
JSON = 5
@staticmethod
def randomize():
return random.choice([t for t in ValueType])
class CommandGenerator:
"""Class for generating complex command sequences"""
def __init__(self, target_keys, val_size, batch_size, max_multikey):
def __init__(self, target_keys, val_size, batch_size, max_multikey, unsupported_types=[]):
self.key_cnt_target = target_keys
self.val_size = val_size
self.batch_size = min(batch_size, target_keys)
self.max_multikey = max_multikey
self.unsupported_types = unsupported_types
# Key management
self.key_sets = [set() for _ in ValueType]
@ -105,12 +102,15 @@ class CommandGenerator:
self.set_for_type(t).add(k)
return k
def random_type(self):
return random.choice([t for t in ValueType if (t not in self.unsupported_types)])
def randomize_nonempty_set(self):
"""Return random non-empty set and its type"""
if not any(self.key_sets):
return None, None
t = ValueType.randomize()
t = self.random_type()
s = self.set_for_type(t)
if len(s) == 0:
@ -229,7 +229,7 @@ class CommandGenerator:
Generate command that grows keyset: Initialize key of random type with filler value.
"""
# TODO: Implement COPY in Dragonfly.
t = ValueType.randomize()
t = self.random_type()
if t == ValueType.STRING:
count = random.randint(1, self.max_multikey)
else:
@ -339,9 +339,9 @@ class DflySeeder:
assert await seeder.compare(capture, port=1112)
"""
def __init__(self, port=6379, keys=1000, val_size=50, batch_size=100, max_multikey=5, dbcount=1, multi_transaction_probability=0.3, log_file=None):
def __init__(self, port=6379, keys=1000, val_size=50, batch_size=100, max_multikey=5, dbcount=1, multi_transaction_probability=0.3, log_file=None, unsupported_types=[]):
self.gen = CommandGenerator(
keys, val_size, batch_size, max_multikey
keys, val_size, batch_size, max_multikey, unsupported_types
)
self.port = port
self.dbcount = dbcount