mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
chore: allow config set notify_keyspace_events (#3790)
We do not allow notify_keyspace_events to be set at runtime via config set command. * allow notify_keyspace_events in config set command * add tests --------- Signed-off-by: kostas <kostas@dragonflydb.io>
This commit is contained in:
parent
5d64e1471a
commit
ed11c8d3a4
4 changed files with 70 additions and 8 deletions
|
@ -1454,6 +1454,10 @@ void DbSlice::ResetEvents() {
|
||||||
events_ = {};
|
events_ = {};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void DbSlice::SetNotifyKeyspaceEvents(std::string_view notify_keyspace_events) {
|
||||||
|
expired_keys_events_recording_ = !notify_keyspace_events.empty();
|
||||||
|
}
|
||||||
|
|
||||||
void DbSlice::SendInvalidationTrackingMessage(std::string_view key) {
|
void DbSlice::SendInvalidationTrackingMessage(std::string_view key) {
|
||||||
if (client_tracking_map_.empty())
|
if (client_tracking_map_.empty())
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -511,6 +511,10 @@ class DbSlice {
|
||||||
return pt->Traverse(cursor, std::forward<Cb>(cb));
|
return pt->Traverse(cursor, std::forward<Cb>(cb));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Does not check for non supported events. Callers must parse the string and reject it
|
||||||
|
// if it's not empty and not EX.
|
||||||
|
void SetNotifyKeyspaceEvents(std::string_view notify_keyspace_events);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key);
|
void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key);
|
||||||
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);
|
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);
|
||||||
|
|
|
@ -931,6 +931,25 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
|
||||||
config_registry.RegisterMutable("table_growth_margin");
|
config_registry.RegisterMutable("table_growth_margin");
|
||||||
config_registry.RegisterMutable("tcp_keepalive");
|
config_registry.RegisterMutable("tcp_keepalive");
|
||||||
|
|
||||||
|
config_registry.RegisterMutable(
|
||||||
|
"notify_keyspace_events", [pool = &pp_](const absl::CommandLineFlag& flag) {
|
||||||
|
auto res = flag.TryGet<std::string>();
|
||||||
|
if (!res.has_value() || (!res->empty() && !absl::EqualsIgnoreCase(*res, "EX"))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
pool->AwaitBrief([&res](unsigned, auto*) {
|
||||||
|
auto* shard = EngineShard::tlocal();
|
||||||
|
if (shard) {
|
||||||
|
auto shard_id = shard->shard_id();
|
||||||
|
auto& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id);
|
||||||
|
db_slice.SetNotifyKeyspaceEvents(*res);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
|
||||||
serialization_max_chunk_size = GetFlag(FLAGS_serialization_max_chunk_size);
|
serialization_max_chunk_size = GetFlag(FLAGS_serialization_max_chunk_size);
|
||||||
uint32_t shard_num = GetFlag(FLAGS_num_shards);
|
uint32_t shard_num = GetFlag(FLAGS_num_shards);
|
||||||
if (shard_num == 0 || shard_num > pp_.size()) {
|
if (shard_num == 0 || shard_num > pp_.size()) {
|
||||||
|
|
|
@ -411,20 +411,15 @@ async def test_subscribers_with_active_publisher(df_server: DflyInstance, max_co
|
||||||
await async_pool.disconnect()
|
await async_pool.disconnect()
|
||||||
|
|
||||||
|
|
||||||
@dfly_args({"notify_keyspace_events": "Ex"})
|
async def produce_expiring_keys(async_client: aioredis.Redis):
|
||||||
async def test_keyspace_events(async_client: aioredis.Redis):
|
|
||||||
pclient = async_client.pubsub()
|
|
||||||
await pclient.subscribe("__keyevent@0__:expired")
|
|
||||||
|
|
||||||
keys = []
|
keys = []
|
||||||
for i in range(10, 50):
|
for i in range(10, 50):
|
||||||
keys.append(f"k{i}")
|
keys.append(f"k{i}")
|
||||||
await async_client.set(keys[-1], "X", px=200 + i * 10)
|
await async_client.set(keys[-1], "X", px=200 + i * 10)
|
||||||
|
return keys
|
||||||
|
|
||||||
# We don't support immediate expiration:
|
|
||||||
# keys += ['immediate']
|
|
||||||
# await async_client.set(keys[-1], 'Y', exat=123) # expired 50 years ago
|
|
||||||
|
|
||||||
|
async def collect_expiring_events(pclient, keys):
|
||||||
events = []
|
events = []
|
||||||
async for message in pclient.listen():
|
async for message in pclient.listen():
|
||||||
if message["type"] == "subscribe":
|
if message["type"] == "subscribe":
|
||||||
|
@ -433,10 +428,50 @@ async def test_keyspace_events(async_client: aioredis.Redis):
|
||||||
events.append(message)
|
events.append(message)
|
||||||
if len(events) >= len(keys):
|
if len(events) >= len(keys):
|
||||||
break
|
break
|
||||||
|
return events
|
||||||
|
|
||||||
|
|
||||||
|
@dfly_args({"notify_keyspace_events": "Ex"})
|
||||||
|
async def test_keyspace_events(async_client: aioredis.Redis):
|
||||||
|
pclient = async_client.pubsub()
|
||||||
|
await pclient.subscribe("__keyevent@0__:expired")
|
||||||
|
|
||||||
|
keys = await produce_expiring_keys(async_client)
|
||||||
|
|
||||||
|
# We don't support immediate expiration:
|
||||||
|
# keys += ['immediate']
|
||||||
|
# await async_client.set(keys[-1], 'Y', exat=123) # expired 50 years ago
|
||||||
|
|
||||||
|
events = await collect_expiring_events(pclient, keys)
|
||||||
|
|
||||||
assert set(ev["data"] for ev in events) == set(keys)
|
assert set(ev["data"] for ev in events) == set(keys)
|
||||||
|
|
||||||
|
|
||||||
|
async def test_keyspace_events_config_set(async_client: aioredis.Redis):
|
||||||
|
# nonsense does not make sense as argument, we only accept ex or empty string
|
||||||
|
with pytest.raises((ResponseError)):
|
||||||
|
await async_client.config_set("notify_keyspace_events", "nonsense")
|
||||||
|
|
||||||
|
await async_client.config_set("notify_keyspace_events", "ex")
|
||||||
|
pclient = async_client.pubsub()
|
||||||
|
await pclient.subscribe("__keyevent@0__:expired")
|
||||||
|
|
||||||
|
keys = await produce_expiring_keys(async_client)
|
||||||
|
|
||||||
|
events = await collect_expiring_events(pclient, keys)
|
||||||
|
|
||||||
|
assert set(ev["data"] for ev in events) == set(keys)
|
||||||
|
|
||||||
|
keys = await produce_expiring_keys(async_client)
|
||||||
|
await async_client.config_set("notify_keyspace_events", "")
|
||||||
|
try:
|
||||||
|
async with async_timeout.timeout(1):
|
||||||
|
await collect_expiring_events(pclient, keys)
|
||||||
|
assert False
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
async def test_big_command(df_server, size=8 * 1024):
|
async def test_big_command(df_server, size=8 * 1024):
|
||||||
reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port)
|
reader, writer = await asyncio.open_connection("127.0.0.1", df_server.port)
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue