chore: Hide replicas from CLUSTER subcmds in managed mode (#4174)

* chore: Hide replicas from `CLUSTER` subcmds in managed mode

Part of #4173 (see for context)

* server.client()
This commit is contained in:
Shahar Mike 2024-11-24 15:10:32 +02:00 committed by GitHub
parent e05363995f
commit 6a7f345bc5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 42 additions and 16 deletions

View file

@ -33,6 +33,9 @@ ABSL_FLAG(std::string, cluster_node_id, "",
"ID within a cluster, used for slot assignment. MUST be unique. If empty, uses master "
"replication ID (random string)");
ABSL_FLAG(bool, managed_service_info, false,
"Hides some implementation details from users when true (i.e. in managed service env)");
ABSL_DECLARE_FLAG(int32_t, port);
ABSL_DECLARE_FLAG(uint16_t, announce_port);
@ -122,10 +125,12 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) co
info.master = {.id = id_, .ip = preferred_endpoint, .port = preferred_port};
for (const auto& replica : server_family_->GetDflyCmd()->GetReplicasRoleInfo()) {
info.replicas.push_back({.id = replica.id,
.ip = replica.address,
.port = static_cast<uint16_t>(replica.listening_port)});
if (cntx->conn()->IsPrivileged() || !absl::GetFlag(FLAGS_managed_service_info)) {
for (const auto& replica : server_family_->GetDflyCmd()->GetReplicasRoleInfo()) {
info.replicas.push_back({.id = replica.id,
.ip = replica.address,
.port = static_cast<uint16_t>(replica.listening_port)});
}
}
} else {
// TODO: We currently don't save the master's ID in the replica

View file

@ -234,7 +234,7 @@ def verify_slots_result(port: int, answer: list, replicas) -> bool:
info = answer[2]
assert len(info) == 3
ip_addr = str(info[0], "utf-8")
ip_addr = info[0]
assert is_local_host(ip_addr)
assert info[1] == port
@ -244,7 +244,7 @@ def verify_slots_result(port: int, answer: list, replicas) -> bool:
replica = replicas[i - 3]
rep_info = answer[i]
assert len(rep_info) == 3
ip_addr = str(rep_info[0], "utf-8")
ip_addr = rep_info[0]
assert is_local_host(ip_addr)
assert rep_info[1] == replica.port
assert rep_info[2] == replica.id
@ -252,21 +252,21 @@ def verify_slots_result(port: int, answer: list, replicas) -> bool:
return True
@dfly_args({"proactor_threads": 4, "cluster_mode": "emulated"})
# --managed_service_info means that Dragonfly is running in a managed service, so some details
# are hidden from users, see https://github.com/dragonflydb/dragonfly/issues/4173
@dfly_args({"proactor_threads": 4, "cluster_mode": "emulated", "managed_service_info": "true"})
async def test_emulated_cluster_with_replicas(df_factory):
master = df_factory.create(port=BASE_PORT)
master = df_factory.create(port=BASE_PORT, admin_port=BASE_PORT + 1000)
replicas = [df_factory.create(port=BASE_PORT + i, logtostdout=True) for i in range(1, 3)]
df_factory.start_all([master, *replicas])
c_master = aioredis.Redis(port=master.port)
master_id = (await c_master.execute_command("CLUSTER MYID")).decode("utf-8")
c_master = master.client()
c_master_admin = master.admin_client()
master_id = await c_master.execute_command("CLUSTER MYID")
c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]
replica_ids = [
(await c_replica.execute_command("CLUSTER MYID")).decode("utf-8")
for c_replica in c_replicas
]
c_replicas = [replica.client() for replica in replicas]
replica_ids = [(await c_replica.execute_command("CLUSTER MYID")) for c_replica in c_replicas]
for replica, c_replica in zip(replicas, c_replicas):
res = await c_replica.execute_command("CLUSTER SLOTS")
@ -279,7 +279,7 @@ async def test_emulated_cluster_with_replicas(df_factory):
# Connect replicas to master
for replica, c_replica in zip(replicas, c_replicas):
rc = await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
assert str(rc, "utf-8") == "OK"
assert rc == "OK"
await asyncio.sleep(0.5)
@ -290,6 +290,13 @@ async def test_emulated_cluster_with_replicas(df_factory):
)
res = await c_master.execute_command("CLUSTER SLOTS")
assert verify_slots_result(
port=master.port,
answer=res[0],
replicas=[],
)
res = await c_master_admin.execute_command("CLUSTER SLOTS")
assert verify_slots_result(
port=master.port,
answer=res[0],
@ -308,6 +315,20 @@ async def test_emulated_cluster_with_replicas(df_factory):
"node_id": master_id,
"slots": [["0", "16383"]],
},
}
assert await c_master_admin.execute_command("CLUSTER NODES") == {
f"127.0.0.1:{master.port}": {
"connected": True,
"epoch": "0",
"flags": "myself,master",
"last_ping_sent": "0",
"last_pong_rcvd": "0",
"master_id": "-",
"migrations": [],
"node_id": master_id,
"slots": [["0", "16383"]],
},
f"127.0.0.1:{replicas[0].port}": {
"connected": True,
"epoch": "0",