mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
chore(tiering): More advanced tiering tests (#3201)
* chore(tiering): More advanced tiering tests * fix: fixes
This commit is contained in:
parent
f01aa2d76b
commit
40ede6f61a
1 changed files with 69 additions and 27 deletions
|
@ -1,39 +1,81 @@
|
|||
from . import dfly_args
|
||||
|
||||
import async_timeout
|
||||
import asyncio
|
||||
import itertools
|
||||
import pytest
|
||||
import random
|
||||
import redis.asyncio as aioredis
|
||||
|
||||
BASIC_ARGS = {"port": 6379, "proactor_threads": 1, "tiered_prefix": "/tmp/tiering_test_backing"}
|
||||
from . import dfly_args
|
||||
from .seeder import StaticSeeder
|
||||
|
||||
|
||||
# remove once proudct requirments are tested
|
||||
BASIC_ARGS = {"port": 6379, "proactor_threads": 4, "tiered_prefix": "/tmp/tiering_test_backing"}
|
||||
|
||||
|
||||
@pytest.mark.opt_only
|
||||
@dfly_args(BASIC_ARGS)
|
||||
async def test_tiering_simple(async_client: aioredis.Redis):
|
||||
fill_script = """#!lua flags=disable-atomicity
|
||||
for i = 1, 100 do
|
||||
redis.call('SET', 'k' .. i, string.rep('a', 3000))
|
||||
end
|
||||
async def test_basic_memory_usage(async_client: aioredis.Redis):
|
||||
"""
|
||||
Loading 1GB of mixed size strings (256b-16kb) will keep most of them on disk and thus RAM remains almost unused
|
||||
"""
|
||||
|
||||
# Store 100 entries
|
||||
await async_client.eval(fill_script, 0)
|
||||
seeder = StaticSeeder(
|
||||
key_target=200_000, data_size=2048, variance=8, samples=100, types=["STRING"]
|
||||
)
|
||||
await seeder.run(async_client)
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
# Wait for all to be offloaded
|
||||
with async_timeout.timeout(1):
|
||||
info = await async_client.info("TIERED")
|
||||
print(info)
|
||||
while info["tiered_total_stashes"] != 100:
|
||||
info = await async_client.info("TIERED")
|
||||
await asyncio.sleep(0.1)
|
||||
assert 3000 * 100 <= info["tiered_allocated_bytes"] <= 4096 * 100
|
||||
info = await async_client.info("ALL")
|
||||
assert info["num_entries"] == 200_000
|
||||
|
||||
# Fetch back
|
||||
for key in (f"k{i}" for i in range(1, 100 + 1)):
|
||||
assert len(await async_client.execute_command("GET", key)) == 3000
|
||||
assert (await async_client.info("TIERED"))["tiered_total_fetches"] == 100
|
||||
assert info["tiered_entries"] > 195_000 # some remain in unfilled small bins
|
||||
assert (
|
||||
info["tiered_allocated_bytes"] > 195_000 * 2048 * 0.8
|
||||
) # 0.8 just to be sure because it fluctuates due to variance
|
||||
|
||||
# Wait to be deleted
|
||||
with async_timeout.timeout(1):
|
||||
while (await async_client.info("TIERED"))["tiered_allocated_bytes"] > 0:
|
||||
await asyncio.sleep(0.1)
|
||||
assert info["used_memory"] < 50 * 1024 * 1024
|
||||
assert (
|
||||
info["used_memory_rss"] < 500 * 1024 * 1024
|
||||
) # the grown table itself takes up lots of space
|
||||
|
||||
|
||||
@pytest.mark.opt_only
|
||||
@dfly_args(
|
||||
{
|
||||
**BASIC_ARGS,
|
||||
"maxmemory": "1G",
|
||||
"tiered_offload_threshold": "0.0",
|
||||
"tiered_storage_write_depth": 1000,
|
||||
}
|
||||
)
|
||||
async def test_mixed_append(async_client: aioredis.Redis):
|
||||
"""
|
||||
Issue conflicting mixed APPEND calls for a limited subset of keys with aggressive offloading in the background.
|
||||
Make sure no appends were lost
|
||||
"""
|
||||
|
||||
# Generate operations and shuffle them, key number `k` will have `k` append operations
|
||||
key_range = list(range(100, 300))
|
||||
ops = list(itertools.chain(*map(lambda k: itertools.repeat(k, k), key_range)))
|
||||
random.shuffle(ops)
|
||||
|
||||
# Split list into n workers and run it
|
||||
async def run(sub_ops):
|
||||
p = async_client.pipeline(transaction=False)
|
||||
for k in sub_ops:
|
||||
p.append(f"k{k}", 10 * "x")
|
||||
await p.execute()
|
||||
|
||||
n = 20
|
||||
await asyncio.gather(*(run(ops[i::n]) for i in range(n)))
|
||||
|
||||
info = await async_client.info("tiered")
|
||||
assert info["tiered_entries"] > len(key_range) / 5
|
||||
|
||||
# Verify lengths
|
||||
p = async_client.pipeline(transaction=False)
|
||||
for k in key_range:
|
||||
p.strlen(f"k{k}")
|
||||
res = await p.execute()
|
||||
|
||||
assert res == [10 * k for k in key_range]
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue