feat(pytest): Gen2 seeder, part 1 (#2556)

* feat(pytest): Gen2 seeder

Implement a new seeder that uses Lua scripts to improve performance

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Vladislav 2024-02-09 19:20:25 +03:00 committed by GitHub
parent 06d88ddc88
commit 881edb501e
11 changed files with 535 additions and 1 deletions


@@ -4,11 +4,13 @@
#include "core/interpreter.h"
#include <absl/base/casts.h>
#include <absl/container/fixed_array.h>
#include <absl/strings/str_cat.h>
#include <absl/time/clock.h>
#include <mimalloc.h>
#include <openssl/evp.h>
#include <xxhash.h>
#include <cstring>
#include <optional>
@@ -303,6 +305,41 @@ void ToHex(const uint8_t* src, char* dest) {
  dest[40] = '\0';
}
int DragonflyHashCommand(lua_State* lua) {
  int argc = lua_gettop(lua);
  if (argc != 2) {
    lua_pushstring(lua, "wrong number of arguments");
    return lua_error(lua);
  }

  XXH64_hash_t hash = absl::bit_cast<XXH64_hash_t>(lua_tointeger(lua, 1));
  auto update_hash = [&hash](string_view sv) { hash = XXH64(sv.data(), sv.length(), hash); };

  auto digest_value = [&hash, &update_hash, lua](int pos) {
    if (int type = lua_type(lua, pos); type == LUA_TSTRING) {
      const char* str = lua_tostring(lua, pos);
      update_hash(string_view{str, strlen(str)});
    } else {
      CHECK_EQ(type, LUA_TNUMBER) << "Only strings and integers can be hashed";
      update_hash(to_string(lua_tointeger(lua, pos)));
    }
  };

  if (lua_type(lua, 2) == LUA_TTABLE) {
    lua_pushnil(lua);
    while (lua_next(lua, 2) != 0) {
      digest_value(-2);  // key, included for correct hashing
      digest_value(-1);  // value
      lua_pop(lua, 1);
    }
  } else {
    digest_value(2);
  }

  lua_pushinteger(lua, absl::bit_cast<lua_Integer>(hash));
  return 1;
}

int RedisSha1Command(lua_State* lua) {
  int argc = lua_gettop(lua);
  if (argc != 1) {
@@ -387,6 +424,17 @@ Interpreter::Interpreter() {
  *ptr = this;
  // SaveOnRegistry(lua_, kInstanceKey, this);

  /* Register the dragonfly commands table and fields */
  lua_newtable(lua_);

  lua_pushstring(lua_, "ihash");
  lua_pushcfunction(lua_, DragonflyHashCommand);
  lua_settable(lua_, -3);

  /* Finally set the table as the 'dragonfly' global var. */
  lua_setglobal(lua_, "dragonfly");

  CHECK(lua_checkstack(lua_, 64));

  /* Register the redis commands table and fields */
  lua_newtable(lua_);
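
For context (not part of the diff): once registered, the helper is reachable from any script through the `dragonfly` global. A minimal sketch, assuming a redis-py async client, of exercising it via EVAL:

```python
import redis.asyncio as aioredis

async def ihash_smoke(client: aioredis.Redis):
    # dragonfly.ihash(seed, value) chains an XXH64 hash over value
    h1 = await client.eval("return dragonfly.ihash(0, 'abc')", 0)
    h2 = await client.eval("return dragonfly.ihash(0, 'abc')", 0)
    assert h1 == h2  # deterministic for identical input
```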


@@ -319,7 +319,7 @@ class DflyInstanceFactory:
        args = {**self.args, **kwargs}
        args.setdefault("dbfilename", "")
        vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1"
-       args.setdefault("vmodule", vmod)
+       # args.setdefault("vmodule", vmod)
        for k, v in args.items():
            args[k] = v.format(**self.params.env) if isinstance(v, str) else v


@@ -23,3 +23,4 @@ redis-om==0.2.1
pytest-emoji==0.2.0
pytest-icdiff==0.8
pytest-timeout==2.2.0
asyncio==3.4.3


@@ -0,0 +1,65 @@
## Seeder library
Please use the testing framework's factories to obtain proper seeder instances!
### 1. Filling data
The seeder tries to maintain a specific number of keys, quickly filling or emptying the instance to reach its target. Once the target is reached, it also issues modification commands, trying to maintain an equilibrium with a mixed load.
```python
# Configure how many keys we want
s = Seeder(key_target=10_000)
# Fill instance with keys until it's 10k +- 1%
# Will create many new keys with data and reach equilibrium
await s.run(client, target_deviation=0.01)
assert abs(await client.dbsize() - 10_000) <= 100
# Run 5k operations, balanced mix of create/delete/modify
await s.run(client, target_ops=5000)
# Now we want only 500 keys, issue many deletes
s.change_key_target(500)
await s.run(client, target_deviation=0.01)
```
### 2. Checking consistency
Use `Seeder.capture()` to calculate "state hashes" based on all the data inside an instance. Equal data produces equal hashes (equal hashes don't guarantee equal data, but what are the odds...).
```python
# Fill master with 10k (+- 1%) keys
s = Seeder(key_target=10_000)
await s.run(master, target_deviation=0.01)
# "Replicate" or other operations
replicate(master, replica)
# Ensure master and replica have same state hashes
master_hashes, replica_hashes = await asyncio.gather(
    Seeder.capture(master),  # note it's a static method
    Seeder.capture(replica),
)
assert master_hashes == replica_hashes
```
### 3. Working with load
A seeder's `run(client)` can be called without any target. In that case it runs indefinitely and can only be stopped explicitly:
```python
# Fill instance with keys
s = Seeder()
await s.run(client, target_deviation=0.01)
# Start seeder without target
# Because the instance reached its key target, the seeder
# will issue a balanced mix of modifications/additions/deletions
seeding_task = asyncio.create_task(s.run(client))
# Do operations under fuzzy load
save(client)
await s.stop(client) # request stop, no immediate effect
await seeding_task # wait for actual stop and cleanup
```
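
Putting it all together, a hedged end-to-end sketch of a consistency check under live load (assumes two connected clients `master` and `replica` and a hypothetical `wait_for_replication` helper):

```python
import asyncio

async def consistency_under_load(master, replica):
    s = Seeder(key_target=10_000)
    await s.run(master, target_deviation=0.01)  # reach the key target first

    task = asyncio.create_task(s.run(master))  # fuzzy load, no target
    await asyncio.sleep(2.0)
    await s.stop(master)
    await task

    await wait_for_replication(master, replica)  # hypothetical helper
    hashes = await asyncio.gather(Seeder.capture(master), Seeder.capture(replica))
    assert hashes[0] == hashes[1]
```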


@@ -0,0 +1,117 @@
import asyncio
import random
import re
import typing
import redis.asyncio as aioredis
from dataclasses import dataclass

try:
    from importlib import resources as impresources
except ImportError:
    # CI runs on python < 3.8
    import importlib_resources as impresources


class SeederBase:
    UID_COUNTER = 1  # multiple generators should not conflict on keys
    CACHED_SCRIPTS = {}
    TYPES = ["string"]

    @classmethod
    async def capture(clz, client: aioredis.Redis) -> typing.List[int]:
        """Generate hash capture for all data stored in the instance pointed to by client"""
        sha = await client.script_load(clz._load_script("hash"))
        return await asyncio.gather(*(client.evalsha(sha, 0, data_type) for data_type in clz.TYPES))

    @classmethod
    def _next_id(clz):
        clz.UID_COUNTER += 1
        return clz.UID_COUNTER

    @staticmethod
    def _read_file(fname):
        try:
            script_file = impresources.files(__package__) / fname
            with script_file.open("rt") as f:
                return f.read()
        except AttributeError:
            return impresources.read_text(__package__, fname)

    @classmethod
    def _load_script(clz, fname):
        if fname in clz.CACHED_SCRIPTS:
            return clz.CACHED_SCRIPTS[fname]

        script = clz._read_file(f"script-{fname}.lua")
        requested = re.findall(r"-- import:(.*?) --", script)
        for request in requested:
            lib = clz._read_file(f"script-{request}.lua")
            script = script.replace(f"-- import:{request} --", lib)

        clz.CACHED_SCRIPTS[fname] = script
        return script


class Seeder(SeederBase):
    @dataclass
    class Unit:
        prefix: str
        type: str
        counter: int
        stop_key: str

    units: typing.List[Unit]

    def __init__(self, units=10, key_target=10_000, data_size=10):
        self.uid = Seeder._next_id()
        self.key_target = key_target
        self.data_size = data_size
        self.units = [
            Seeder.Unit(
                prefix=f"k-s{self.uid}u{i}-",
                type=random.choice(Seeder.TYPES),
                counter=0,
                stop_key=f"_s{self.uid}u{i}-stop",
            )
            for i in range(units)
        ]

    async def run(self, client: aioredis.Redis, target_ops=None, target_deviation=None):
        """Run seeder until one of the targets is reached, or until stopped if none are set"""
        using_stopkey = target_ops is None and target_deviation is None
        args = [
            self.key_target / len(self.units),
            target_ops if target_ops is not None else 0,
            target_deviation if target_deviation is not None else -1,
            self.data_size,
        ]

        sha = await client.script_load(Seeder._load_script("generate"))
        await asyncio.gather(
            *(self._run_unit(client, sha, unit, using_stopkey, args) for unit in self.units)
        )

    async def stop(self, client: aioredis.Redis):
        """Request the seeder to stop if it's running without a target; the future returned from run() must still be awaited"""
        await asyncio.gather(*(client.set(unit.stop_key, "X") for unit in self.units))

    def change_key_target(self, target: int):
        """Change the key target, applied only on subsequent runs"""
        self.key_target = max(target, 100)  # math breaks with low values

    @staticmethod
    async def _run_unit(client: aioredis.Redis, sha: str, unit: Unit, using_stopkey, args):
        await client.delete(unit.stop_key)

        args = [
            unit.prefix,
            unit.type,
            unit.counter,
            unit.stop_key if using_stopkey else "",
        ] + args

        unit.counter = await client.evalsha(sha, 0, *args)
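
For illustration, a self-contained sketch of the `-- import:<name> --` splicing performed by `_load_script` above (the script names and contents below are made up):

```python
import re

# hypothetical in-memory stand-ins for the script-*.lua files
SCRIPTS = {
    "script-dep.lua": "local function helper() return 1 end",
    "script-main.lua": "-- import:dep --\nreturn helper()",
}

def load(fname: str) -> str:
    script = SCRIPTS[fname]
    # every "-- import:x --" marker is replaced by the content of script-x.lua
    for request in re.findall(r"-- import:(.*?) --", script):
        script = script.replace(f"-- import:{request} --", SCRIPTS[f"script-{request}.lua"])
    return script

print(load("script-main.lua"))
# local function helper() return 1 end
# return helper()
```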


@@ -0,0 +1,126 @@
#!lua flags=disable-atomicity
--[[
Script for quickly generating various data
]] --

-- import:genlib --
-- import:utillib --

-- inputs: unit identifiers
local prefix = ARGV[1]
local type = ARGV[2]
local key_counter = tonumber(ARGV[3])
local stop_key = ARGV[4]

-- inputs: task specific
local key_target = tonumber(ARGV[5])
local total_ops = tonumber(ARGV[6])
local min_dev = tonumber(ARGV[7])
local data_size = tonumber(ARGV[8])

-- collect all keys belonging to this script
-- assumes exclusive ownership
local keys = LU_collect_keys(prefix, type)

local addfunc = LG_funcs['add_' .. type]
local modfunc = LG_funcs['mod_' .. type]

local function action_add()
    local key = prefix .. tostring(key_counter)
    key_counter = key_counter + 1
    table.insert(keys, key)
    addfunc(key, data_size)
end

local function action_mod()
    local key = keys[math.random(#keys)]
    modfunc(key, data_size)
end

local function action_del()
    local key_idx = math.random(#keys)
    keys[key_idx], keys[#keys] = keys[#keys], keys[key_idx]
    local key = table.remove(keys)
    redis.acall('DEL', key)
end

-- set equilibrium point as key target, see intensity calculations below
local real_target = key_target
key_target = key_target / 0.956

-- cumulative probabilities: [add, add + delete, modify = 1 - (add + delete)]
local p_add = 0
local p_del = 0

local counter = 0
while true do
    counter = counter + 1

    -- break if we reached target ops
    if total_ops > 0 and counter > total_ops then
        break
    end

    -- debug output for small targets
    if key_target < 100 and min_dev > 0 then
        print(real_target, key_target, math.abs(#keys - real_target) / real_target)
        print()
    end

    -- break if we reached our target deviation
    if min_dev > 0 and math.abs(#keys - real_target) / real_target < min_dev then
        break
    end

    -- break if the stop key was set (checked only every 100 ops to not slow down)
    if stop_key ~= '' and counter % 100 == 0 and redis.call('EXISTS', stop_key) then
        break
    end

    -- fast path: if we have less than half of the target, always grow
    if #keys * 2 < key_target then
        action_add()
        goto continue
    end

    -- update probabilities only every 10 iterations
    if counter % 10 == 0 then
        -- calculate intensities (not normalized probabilities)
        -- please see the attached plots in the PR to understand convergence
        -- the add intensity is monotonically decreasing as the key count grows,
        -- the delete intensity is monotonically increasing as the key count grows,
        -- the point where the intensities are equal is the equilibrium point,
        -- based on the formulas it's ~0.956 * key_target
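        -- (derivation, added for clarity: with x = #keys / key_target, setting
        -- 1 - x^16 = x^16 gives x^16 = 1/2, i.e. x = 2^(-1/16) ≈ 0.956; dividing
        -- key_target by 0.956 above therefore places the equilibrium on the real target)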
        local i_add = math.max(0, 1 - (#keys / key_target) ^ 16)
        local i_del = (#keys / key_target) ^ 16

        -- we are only interested in large amounts of modification commands when we are in
        -- equilibrium, where there are no low intensities
        local i_mod = math.max(0, 7 * math.min(i_add, i_del) ^ 3)

        -- transform intensities to [0, 1] probability ranges
        local sum = i_add + i_del + i_mod
        p_add = i_add / sum
        p_del = p_add + i_del / sum
    end

    -- generate random action
    local p = math.random()
    if p < p_add then
        action_add()
    elseif p < p_del then
        action_del()
    else
        action_mod()
    end

    ::continue::
end

-- clear the stop key
if stop_key ~= '' then
    redis.call('DEL', stop_key)
end

return key_counter


@@ -0,0 +1,20 @@
local LG_funcs = {}

-- strings

function LG_funcs.add_string(key, dsize)
    local char = string.char(math.random(65, 90))
    redis.apcall('SET', key, string.rep(char, dsize))
end

function LG_funcs.mod_string(key, dsize)
    -- APPEND and SETRANGE are the only modifying operations for strings,
    -- issue APPEND rarely so the data doesn't grow too much
    if math.random() < 0.05 then
        redis.apcall('APPEND', key, '+')
    else
        local char = string.char(math.random(65, 90))
        local replacement = string.rep(char, math.random(0, dsize / 2))
        redis.apcall('SETRANGE', key, math.random(0, dsize / 2), replacement)
    end
end


@@ -0,0 +1,34 @@
#!lua flags=disable-atomicity
--[[
Script for quickly computing a single 64-bit hash for all keys of the types specified in ARGV[].
Keys of every type are sorted lexicographically to ensure a consistent order.
]] --

-- import:hashlib --
-- import:utillib --

-- inputs
local requested_types = ARGV

local OUT_HASH = 0

local function process(type)
    local keys = LU_collect_keys('', type)
    local hfunc = LH_funcs[type]

    -- sort to provide a consistent order
    table.sort(keys)
    for _, key in ipairs(keys) do
        -- add key to hash
        OUT_HASH = dragonfly.ihash(OUT_HASH, key)
        -- hand hash over to the type-specific callback
        OUT_HASH = hfunc(key, OUT_HASH)
    end
end

for _, type in ipairs(requested_types) do
    process(type)
end

return OUT_HASH
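
For intuition, a hedged Python sketch (assuming the third-party `xxhash` package) of the chained digest this script computes for string keys: each datum is hashed with the running hash as the XXH64 seed, so order matters, which is why the keys are sorted first:

```python
import xxhash

def chain(h: int, s: str) -> int:
    # mirrors hash = XXH64(data, len, seed) in DragonflyHashCommand
    return xxhash.xxh64(s.encode(), seed=h).intdigest()

h = 0
for key, value in sorted({"k2": "v2", "k1": "v1"}.items()):
    h = chain(h, key)    # OUT_HASH = dragonfly.ihash(OUT_HASH, key)
    h = chain(h, value)  # LH_funcs.string hashes the GET result
print(h)  # identical data always yields an identical capture value
```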


@@ -0,0 +1,39 @@
local LH_funcs = {}

function LH_funcs.string(key, hash)
    -- add value to hash
    return dragonfly.ihash(hash, redis.call('GET', key))
end

function LH_funcs.list(key, hash)
    -- add values to hash
    return dragonfly.ihash(hash, redis.call('LRANGE', key, 0, -1))
end

function LH_funcs.set(key, hash)
    -- add values to hash, sort beforehand to avoid ambiguity
    local items = redis.call('SMEMBERS', key)
    table.sort(items)
    return dragonfly.ihash(hash, items)
end

function LH_funcs.zset(key, hash)
    -- add values to hash, ZRANGE always returns sorted values
    return dragonfly.ihash(hash, redis.call('ZRANGE', key, 0, -1, 'WITHSCORES'))
end

function LH_funcs.hash(key, hash)
    -- add values to hash, first convert to key-value pairs and sort
    local items = redis.call('HGETALL', key)
    local paired_items = {}
    for i = 1, #items, 2 do
        table.insert(paired_items, items[i] .. '->' .. items[i + 1])
    end
    table.sort(paired_items)
    return dragonfly.ihash(hash, paired_items)
end

function LH_funcs.json(key, hash)
    -- add value to hash, note that JSON.GET returns just a string
    return dragonfly.ihash(hash, redis.call('JSON.GET', key))
end


@@ -0,0 +1,15 @@
-- collect all keys of a specific type with a specific prefix into a table. Uses SCAN --
local function LU_collect_keys(prefix, type)
    local pattern = prefix .. "*"
    local cursor = "0"
    local keys = {}

    repeat
        local result = redis.call("SCAN", cursor, "COUNT", 500, "TYPE", type, "MATCH", pattern)
        cursor = result[1]
        local scan_keys = result[2]
        for _, key in ipairs(scan_keys) do
            table.insert(keys, key)
        end
    until cursor == "0"

    return keys
end
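
A rough redis-py equivalent of the cursor loop above, for illustration only (assumes the async client; SCAN is repeated until the server returns cursor 0):

```python
import redis.asyncio as aioredis

async def collect_keys(client: aioredis.Redis, prefix: str, type_: str) -> list:
    keys, cursor = [], 0
    while True:
        # SCAN returns a new cursor and a batch of matching keys
        cursor, batch = await client.scan(cursor=cursor, match=prefix + "*", count=500, _type=type_)
        keys.extend(batch)
        if cursor == 0:
            return keys
```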


@@ -0,0 +1,69 @@
import asyncio
import async_timeout
import string
import random
from redis import asyncio as aioredis
from . import dfly_args
from .seeder import Seeder


@dfly_args({"proactor_threads": 4})
async def test_seeder_key_target(async_client: aioredis.Redis):
    """Ensure the seeder reaches its key targets"""
    s = Seeder(units=random.randint(4, 12), key_target=5000)

    # Ensure the test finishes in reasonable time
    async with async_timeout.timeout(1 + 4):
        # Fill with 5k keys, 1% deviation = 50
        await s.run(async_client, target_deviation=0.01)
        assert abs(await async_client.dbsize() - 5000) <= 50

        # Run 1k ops, ensure the key count stays more or less the same
        await s.run(async_client, target_ops=1000)
        assert abs(await async_client.dbsize() - 5000) <= 100

        # Run for one second until stopped
        task = asyncio.create_task(s.run(async_client))
        await asyncio.sleep(1.0)
        await s.stop(async_client)
        await task

        # Change the key target; 100 is the actual minimum because "math breaks"
        s.change_key_target(0)
        await s.run(async_client, target_deviation=0.5)  # use loose precision with low values
        assert await async_client.dbsize() < 200


@dfly_args({"proactor_threads": 4})
async def test_seeder_capture(async_client: aioredis.Redis):
    """Ensure the same data produces the same state hashes"""

    async def set_data():
        p = async_client.pipeline()
        p.mset(mapping={f"string{i}": f"{i}" for i in range(100)})
        # uncomment when the seeder supports more than strings
        # p.lpush("list1", *list(string.ascii_letters))
        # p.sadd("set1", *list(string.ascii_letters))
        # p.hset("hash1", mapping={f"{i}": l for i, l in enumerate(string.ascii_letters)})
        # p.zadd("zset1", mapping={l: i for i, l in enumerate(string.ascii_letters)})
        # p.json().set("json1", ".", {"a": [1, 2, 3], "b": {"c": 1, "d": 2, "e": [5, 6]}})
        await p.execute()

    # Capture with filled data
    await set_data()
    c1 = await Seeder.capture(async_client)

    # Check hashes are 0 without data
    await async_client.flushall()
    assert all(h == 0 for h in (await Seeder.capture(async_client)))

    # Check setting the same data results in the same hashes
    await set_data()
    c2 = await Seeder.capture(async_client)
    assert c1 == c2

    # Check changing the data gives different hashes
    # await async_client.lpush("list1", "NEW")
    await async_client.append("string1", "MORE-DATA")
    c3 = await Seeder.capture(async_client)
    assert c1 != c3