From ed11c8d3a44f64e9f19e37b99f82f83be1a86a11 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Mon, 30 Sep 2024 09:54:02 +0300 Subject: [PATCH] chore: allow config set notify_keyspace_events (#3790) We do not allow notify_keyspace_events to be set at runtime via config set command. * allow notify_keyspace_events in config set command * add tests --------- Signed-off-by: kostas --- src/server/db_slice.cc | 4 +++ src/server/db_slice.h | 4 +++ src/server/main_service.cc | 19 +++++++++++ tests/dragonfly/connection_test.py | 51 +++++++++++++++++++++++++----- 4 files changed, 70 insertions(+), 8 deletions(-) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 0cc30214e..87198b6c3 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -1454,6 +1454,10 @@ void DbSlice::ResetEvents() { events_ = {}; } +void DbSlice::SetNotifyKeyspaceEvents(std::string_view notify_keyspace_events) { + expired_keys_events_recording_ = !notify_keyspace_events.empty(); +} + void DbSlice::SendInvalidationTrackingMessage(std::string_view key) { if (client_tracking_map_.empty()) return; diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 546faac2b..16799530a 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -511,6 +511,10 @@ class DbSlice { return pt->Traverse(cursor, std::forward(cb)); } + // Does not check for non supported events. Callers must parse the string and reject it + // if it's not empty and not EX. + void SetNotifyKeyspaceEvents(std::string_view notify_keyspace_events); + private: void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key); void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 3e4724c45..7ea727f6c 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -931,6 +931,25 @@ void Service::Init(util::AcceptServer* acceptor, std::vector config_registry.RegisterMutable("table_growth_margin"); config_registry.RegisterMutable("tcp_keepalive"); + config_registry.RegisterMutable( + "notify_keyspace_events", [pool = &pp_](const absl::CommandLineFlag& flag) { + auto res = flag.TryGet(); + if (!res.has_value() || (!res->empty() && !absl::EqualsIgnoreCase(*res, "EX"))) { + return false; + } + + pool->AwaitBrief([&res](unsigned, auto*) { + auto* shard = EngineShard::tlocal(); + if (shard) { + auto shard_id = shard->shard_id(); + auto& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id); + db_slice.SetNotifyKeyspaceEvents(*res); + } + }); + + return true; + }); + serialization_max_chunk_size = GetFlag(FLAGS_serialization_max_chunk_size); uint32_t shard_num = GetFlag(FLAGS_num_shards); if (shard_num == 0 || shard_num > pp_.size()) { diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index 4d9c124c2..83f5bab7f 100755 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -411,20 +411,15 @@ async def test_subscribers_with_active_publisher(df_server: DflyInstance, max_co await async_pool.disconnect() -@dfly_args({"notify_keyspace_events": "Ex"}) -async def test_keyspace_events(async_client: aioredis.Redis): - pclient = async_client.pubsub() - await pclient.subscribe("__keyevent@0__:expired") - +async def produce_expiring_keys(async_client: aioredis.Redis): keys = [] for i in range(10, 50): keys.append(f"k{i}") await async_client.set(keys[-1], "X", px=200 + i * 10) + return keys - # We don't support immediate expiration: - # keys += ['immediate'] - # await async_client.set(keys[-1], 'Y', exat=123) # expired 50 years ago +async def collect_expiring_events(pclient, keys): events = [] async for message in pclient.listen(): if message["type"] == "subscribe": @@ -433,10 +428,50 @@ async def test_keyspace_events(async_client: aioredis.Redis): events.append(message) if len(events) >= len(keys): break + return events + + +@dfly_args({"notify_keyspace_events": "Ex"}) +async def test_keyspace_events(async_client: aioredis.Redis): + pclient = async_client.pubsub() + await pclient.subscribe("__keyevent@0__:expired") + + keys = await produce_expiring_keys(async_client) + + # We don't support immediate expiration: + # keys += ['immediate'] + # await async_client.set(keys[-1], 'Y', exat=123) # expired 50 years ago + + events = await collect_expiring_events(pclient, keys) assert set(ev["data"] for ev in events) == set(keys) +async def test_keyspace_events_config_set(async_client: aioredis.Redis): + # nonsense does not make sense as argument, we only accept ex or empty string + with pytest.raises((ResponseError)): + await async_client.config_set("notify_keyspace_events", "nonsense") + + await async_client.config_set("notify_keyspace_events", "ex") + pclient = async_client.pubsub() + await pclient.subscribe("__keyevent@0__:expired") + + keys = await produce_expiring_keys(async_client) + + events = await collect_expiring_events(pclient, keys) + + assert set(ev["data"] for ev in events) == set(keys) + + keys = await produce_expiring_keys(async_client) + await async_client.config_set("notify_keyspace_events", "") + try: + async with async_timeout.timeout(1): + await collect_expiring_events(pclient, keys) + assert False + except: + pass + + async def test_big_command(df_server, size=8 * 1024): reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port)