mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 02:15:45 +02:00
fix(seeder): Support stream types in the Seeder (#4574)
This commit is contained in:
parent
023aa7c89e
commit
66bce2b51b
4 changed files with 33 additions and 6 deletions
|
@ -524,9 +524,7 @@ error_code RdbSerializer::SaveStreamObject(const PrimeValue& pv) {
|
||||||
RETURN_ON_ERR(SaveString((uint8_t*)ri.key, ri.key_len));
|
RETURN_ON_ERR(SaveString((uint8_t*)ri.key, ri.key_len));
|
||||||
RETURN_ON_ERR(SaveString(lp, lp_bytes));
|
RETURN_ON_ERR(SaveString(lp, lp_bytes));
|
||||||
|
|
||||||
const FlushState flush_state =
|
FlushIfNeeded(FlushState::kFlushMidEntry);
|
||||||
(i + 1 < rax_size) ? FlushState::kFlushMidEntry : FlushState::kFlushEndEntry;
|
|
||||||
FlushIfNeeded(flush_state);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
std::move(stop_listpacks_rax).Invoke();
|
std::move(stop_listpacks_rax).Invoke();
|
||||||
|
@ -595,6 +593,8 @@ error_code RdbSerializer::SaveStreamObject(const PrimeValue& pv) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
FlushIfNeeded(FlushState::kFlushEndEntry);
|
||||||
|
|
||||||
return error_code{};
|
return error_code{};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,8 +18,7 @@ except ImportError:
|
||||||
class SeederBase:
|
class SeederBase:
|
||||||
UID_COUNTER = 1 # multiple generators should not conflict on keys
|
UID_COUNTER = 1 # multiple generators should not conflict on keys
|
||||||
CACHED_SCRIPTS = {}
|
CACHED_SCRIPTS = {}
|
||||||
DEFAULT_TYPES = ["STRING", "LIST", "SET", "HASH", "ZSET", "JSON"]
|
DEFAULT_TYPES = ["STRING", "LIST", "SET", "HASH", "ZSET", "JSON", "STREAM"]
|
||||||
BIG_VALUE_TYPES = ["LIST", "SET", "HASH", "ZSET"]
|
|
||||||
|
|
||||||
def __init__(self, types: typing.Optional[typing.List[str]] = None):
|
def __init__(self, types: typing.Optional[typing.List[str]] = None):
|
||||||
self.uid = SeederBase.UID_COUNTER
|
self.uid = SeederBase.UID_COUNTER
|
||||||
|
|
|
@ -192,6 +192,34 @@ function LG_funcs.mod_json(key, dbsize)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
-- streams
|
||||||
|
-- store sequences of timestamped events
|
||||||
|
|
||||||
|
function LG_funcs.add_stream(key)
|
||||||
|
local entries = {}
|
||||||
|
|
||||||
|
local limit = LG_funcs.csize
|
||||||
|
local blobs = randstr_sequence()
|
||||||
|
|
||||||
|
for i = 1, limit do
|
||||||
|
table.insert(entries, tostring(i))
|
||||||
|
table.insert(entries, blobs[i])
|
||||||
|
end
|
||||||
|
|
||||||
|
redis.apcall('XADD', key, '*', unpack(entries))
|
||||||
|
end
|
||||||
|
|
||||||
|
function LG_funcs.mod_stream(key)
|
||||||
|
local action = math.random(1, 3)
|
||||||
|
if action <= 2 then
|
||||||
|
local size = LG_funcs.csize * 2
|
||||||
|
redis.apcall('XADD', key, '*', math.random(0, size), randstr())
|
||||||
|
else
|
||||||
|
local maxlen = math.random(0, 100)
|
||||||
|
redis.apcall('XTRIM', key, 'MAXLEN', '~', maxlen)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
function LG_funcs.get_huge_entries()
|
function LG_funcs.get_huge_entries()
|
||||||
return huge_entries
|
return huge_entries
|
||||||
end
|
end
|
||||||
|
|
|
@ -13,7 +13,7 @@ async def test_static_seeder(async_client: aioredis.Redis):
|
||||||
s = StaticSeeder(key_target=10_000, data_size=100)
|
s = StaticSeeder(key_target=10_000, data_size=100)
|
||||||
await s.run(async_client)
|
await s.run(async_client)
|
||||||
|
|
||||||
assert abs(await async_client.dbsize() - 10_000) <= 50
|
assert abs(await async_client.dbsize() - 10_000) <= 70
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"proactor_threads": 4})
|
@dfly_args({"proactor_threads": 4})
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue