diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 18bf45448..f904fe4bf 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -524,9 +524,7 @@ error_code RdbSerializer::SaveStreamObject(const PrimeValue& pv) { RETURN_ON_ERR(SaveString((uint8_t*)ri.key, ri.key_len)); RETURN_ON_ERR(SaveString(lp, lp_bytes)); - const FlushState flush_state = - (i + 1 < rax_size) ? FlushState::kFlushMidEntry : FlushState::kFlushEndEntry; - FlushIfNeeded(flush_state); + FlushIfNeeded(FlushState::kFlushMidEntry); } std::move(stop_listpacks_rax).Invoke(); @@ -595,6 +593,8 @@ error_code RdbSerializer::SaveStreamObject(const PrimeValue& pv) { } } + FlushIfNeeded(FlushState::kFlushEndEntry); + return error_code{}; } diff --git a/tests/dragonfly/seeder/__init__.py b/tests/dragonfly/seeder/__init__.py index 78c51f383..a10e1a980 100644 --- a/tests/dragonfly/seeder/__init__.py +++ b/tests/dragonfly/seeder/__init__.py @@ -18,8 +18,7 @@ except ImportError: class SeederBase: UID_COUNTER = 1 # multiple generators should not conflict on keys CACHED_SCRIPTS = {} - DEFAULT_TYPES = ["STRING", "LIST", "SET", "HASH", "ZSET", "JSON"] - BIG_VALUE_TYPES = ["LIST", "SET", "HASH", "ZSET"] + DEFAULT_TYPES = ["STRING", "LIST", "SET", "HASH", "ZSET", "JSON", "STREAM"] def __init__(self, types: typing.Optional[typing.List[str]] = None): self.uid = SeederBase.UID_COUNTER diff --git a/tests/dragonfly/seeder/script-genlib.lua b/tests/dragonfly/seeder/script-genlib.lua index e284050a3..ee129baab 100644 --- a/tests/dragonfly/seeder/script-genlib.lua +++ b/tests/dragonfly/seeder/script-genlib.lua @@ -192,6 +192,34 @@ function LG_funcs.mod_json(key, dbsize) end end +-- streams +-- store sequences of timestamped events + +function LG_funcs.add_stream(key) + local entries = {} + + local limit = LG_funcs.csize + local blobs = randstr_sequence() + + for i = 1, limit do + table.insert(entries, tostring(i)) + table.insert(entries, blobs[i]) + end + + redis.apcall('XADD', key, '*', unpack(entries)) +end + +function LG_funcs.mod_stream(key) + local action = math.random(1, 3) + if action <= 2 then + local size = LG_funcs.csize * 2 + redis.apcall('XADD', key, '*', math.random(0, size), randstr()) + else + local maxlen = math.random(0, 100) + redis.apcall('XTRIM', key, 'MAXLEN', '~', maxlen) + end +end + function LG_funcs.get_huge_entries() return huge_entries end diff --git a/tests/dragonfly/seeder_test.py b/tests/dragonfly/seeder_test.py index 3d35242da..ed8456d8e 100644 --- a/tests/dragonfly/seeder_test.py +++ b/tests/dragonfly/seeder_test.py @@ -13,7 +13,7 @@ async def test_static_seeder(async_client: aioredis.Redis): s = StaticSeeder(key_target=10_000, data_size=100) await s.run(async_client) - assert abs(await async_client.dbsize() - 10_000) <= 50 + assert abs(await async_client.dbsize() - 10_000) <= 70 @dfly_args({"proactor_threads": 4})