test(cluster): Enable seeder to work against a Dragonfly cluster (#2462)

This commit is contained in:
Shahar Mike 2024-01-24 20:02:04 +02:00 committed by GitHub
parent aeb2b00ac8
commit e6f418575b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 25 additions and 10 deletions

View file

@ -561,7 +561,10 @@ async def test_cluster_blocking_command(df_server):
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_native_client(df_local_factory: DflyInstanceFactory):
async def test_cluster_native_client(
df_local_factory: DflyInstanceFactory,
df_seeder_factory: DflySeederFactory,
):
# Start and configure cluster with 3 masters and 3 replicas
masters = [
df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
@ -647,6 +650,9 @@ async def test_cluster_native_client(df_local_factory: DflyInstanceFactory):
"""
await push_config(config, c_masters_admin + c_replicas_admin)
seeder = df_seeder_factory.create(port=masters[0].port, cluster_mode=True)
await seeder.run(target_deviation=0.1)
client = aioredis.RedisCluster(decode_responses=True, host="localhost", port=masters[0].port)
assert await client.set("key0", "value") == True

View file

@ -194,7 +194,7 @@ class CommandGenerator:
key, _ = self.randomize_key(pop=True)
if key == None:
return None, 0
return f"PEXPIRE k{key} {random.randint(0, 50)}", -1
return ("PEXPIRE", f"k{key}", f"{random.randint(0, 50)}"), -1
else:
keys_gen = (
self.randomize_key(pop=True) for _ in range(random.randint(1, self.max_multikey))
@ -203,7 +203,7 @@ class CommandGenerator:
if len(keys) == 0:
return None, 0
return "DEL " + " ".join(keys), -len(keys)
return ("DEL", *keys), -len(keys)
UPDATE_ACTIONS = [
("APPEND {k} {val}", ValueType.STRING),
@ -228,7 +228,7 @@ class CommandGenerator:
cmd, t = random.choice(self.UPDATE_ACTIONS)
k, _ = self.randomize_key(t)
val = "".join(random.choices(string.ascii_letters, k=3))
return cmd.format(k=f"k{k}", val=val) if k is not None else None, 0
return cmd.format(k=f"k{k}", val=val).split() if k is not None else None, 0
GROW_ACTINONS = {
ValueType.STRING: "MSET",
@ -363,7 +363,14 @@ class DflySeeder:
log_file=None,
unsupported_types=[],
stop_on_failure=True,
cluster_mode=False,
):
if cluster_mode:
max_multikey = 1
multi_transaction_probability = 0
unsupported_types = [ValueType.JSON] # Cluster aio client doesn't support JSON
self.cluster_mode = cluster_mode
self.gen = CommandGenerator(keys, val_size, batch_size, max_multikey, unsupported_types)
self.port = port
self.dbcount = dbcount
@ -499,7 +506,10 @@ class DflySeeder:
return submitted
async def _executor_task(self, db, queue):
client = aioredis.Redis(port=self.port, db=db)
if self.cluster_mode:
client = aioredis.RedisCluster(host="localhost", port=self.port, db=db)
else:
client = aioredis.Redis(port=self.port, db=db)
while True:
tx_data = await queue.get()
@ -509,10 +519,7 @@ class DflySeeder:
pipe = client.pipeline(transaction=tx_data[1])
for cmd in tx_data[0]:
if isinstance(cmd, str):
pipe.execute_command(cmd)
else:
pipe.execute_command(*cmd)
pipe.execute_command(*cmd)
try:
await pipe.execute()
@ -522,7 +529,9 @@ class DflySeeder:
except Exception as e:
raise SystemExit(e)
queue.task_done()
await client.connection_pool.disconnect()
await client.close()
if not self.cluster_mode:
await client.connection_pool.disconnect()
CAPTURE_COMMANDS = {
ValueType.STRING: lambda pipe, k: pipe.get(k),