From 3891efac2c68ad6b92ae571d323f6bea6e946562 Mon Sep 17 00:00:00 2001 From: Borys Date: Tue, 16 Jul 2024 14:17:28 +0300 Subject: [PATCH] fix: forbid DFLYCLUSTER commads set for emulated cluster mode (#3307) * fix: forbid DFLYCLUSTER commads set for emulated cluster mode * feat: add CLUSTER MYID and remove DFLYCLUSTER MYID * fix(test): __del__ method in python can't be async * fix: crash and test_replicate_disconnect_cluster --- src/server/cluster/cluster_family.cc | 32 ++++++++++++++--------- src/server/cluster/cluster_family.h | 2 +- src/server/cluster/cluster_family_test.cc | 11 +++++--- tests/dragonfly/cluster_test.py | 31 +++++++++++----------- tests/dragonfly/proxy.py | 3 --- 5 files changed, 44 insertions(+), 35 deletions(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 633bddcce..a5312f51d 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -369,8 +369,18 @@ void ClusterFamily::Cluster(CmdArgList args, ConnectionContext* cntx) { return cntx->SendError(kClusterDisabled); } + if (sub_cmd == "KEYSLOT") { + return KeySlot(args, cntx); + } + + if (args.size() > 1) { + return cntx->SendError(WrongNumArgsError(absl::StrCat("CLUSTER ", sub_cmd))); + } + if (sub_cmd == "HELP") { return ClusterHelp(cntx); + } else if (sub_cmd == "MYID") { + return ClusterMyId(cntx); } else if (sub_cmd == "SHARDS") { return ClusterShards(cntx); } else if (sub_cmd == "SLOTS") { @@ -379,8 +389,6 @@ void ClusterFamily::Cluster(CmdArgList args, ConnectionContext* cntx) { return ClusterNodes(cntx); } else if (sub_cmd == "INFO") { return ClusterInfo(cntx); - } else if (sub_cmd == "KEYSLOT") { - return KeySlot(args, cntx); } else { return cntx->SendError(facade::UnknownSubCmd(sub_cmd, "CLUSTER"), facade::kSyntaxErrType); } @@ -401,11 +409,15 @@ void ClusterFamily::ReadWrite(CmdArgList args, ConnectionContext* cntx) { } void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) { - if (!IsClusterEnabledOrEmulated()) { - return cntx->SendError(kClusterDisabled); + if (!(IsClusterEnabled() || (IsClusterEmulated() && cntx->journal_emulated))) { + return cntx->SendError("Cluster is disabled. Use --cluster_mode=yes to enable."); } - VLOG(2) << "Got DFLYCLUSTER command (" << cntx->conn()->GetClientId() << "): " << args; + if (cntx->conn()) { + VLOG(2) << "Got DFLYCLUSTER command (" << cntx->conn()->GetClientId() << "): " << args; + } else { + VLOG(2) << "Got DFLYCLUSTER command (NO_CLIENT_ID): " << args; + } ToUpper(&args[0]); string_view sub_cmd = ArgS(args, 0); @@ -414,8 +426,6 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) { return DflyClusterGetSlotInfo(args, cntx); } else if (sub_cmd == "CONFIG") { return DflyClusterConfig(args, cntx); - } else if (sub_cmd == "MYID") { - return DflyClusterMyId(args, cntx); } else if (sub_cmd == "FLUSHSLOTS") { return DflyClusterFlushSlots(args, cntx); } else if (sub_cmd == "SLOT-MIGRATION-STATUS") { @@ -425,12 +435,8 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) { return cntx->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType); } -void ClusterFamily::DflyClusterMyId(CmdArgList args, ConnectionContext* cntx) { - if (!args.empty()) { - return cntx->SendError(WrongNumArgsError("DFLYCLUSTER MYID")); - } - auto* rb = static_cast(cntx->reply_builder()); - rb->SendBulkString(id_); +void ClusterFamily::ClusterMyId(ConnectionContext* cntx) { + cntx->SendSimpleString(id_); } namespace { diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index 7e2bd4a1e..1b563d5fa 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -46,6 +46,7 @@ class ClusterFamily { void ClusterSlots(ConnectionContext* cntx); void ClusterNodes(ConnectionContext* cntx); void ClusterInfo(ConnectionContext* cntx); + void ClusterMyId(ConnectionContext* cntx); void KeySlot(CmdArgList args, ConnectionContext* cntx); @@ -56,7 +57,6 @@ class ClusterFamily { void DflyCluster(CmdArgList args, ConnectionContext* cntx); void DflyClusterConfig(CmdArgList args, ConnectionContext* cntx); void DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx); - void DflyClusterMyId(CmdArgList args, ConnectionContext* cntx); void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx); private: // Slots migration section diff --git a/src/server/cluster/cluster_family_test.cc b/src/server/cluster/cluster_family_test.cc index 868ecea6a..11224d035 100644 --- a/src/server/cluster/cluster_family_test.cc +++ b/src/server/cluster/cluster_family_test.cc @@ -34,7 +34,7 @@ class ClusterFamilyTest : public BaseFamilyTest { static constexpr string_view kInvalidConfiguration = "Invalid cluster configuration"; string GetMyId() { - return RunPrivileged({"dflycluster", "myid"}).GetString(); + return Run({"cluster", "myid"}).GetString(); } void ConfigSingleNodeCluster(string id) { @@ -735,8 +735,13 @@ TEST_F(ClusterFamilyEmulatedTest, ClusterSlots) { } TEST_F(ClusterFamilyEmulatedTest, ClusterNodes) { - EXPECT_THAT(Run({"cluster", "nodes"}), - GetMyId() + " fake-host:6379@6379 myself,master - 0 0 0 connected 0-16383\n"); + auto res = Run({"cluster", "nodes"}); + EXPECT_THAT(res, GetMyId() + " fake-host:6379@6379 myself,master - 0 0 0 connected 0-16383\n"); +} + +TEST_F(ClusterFamilyEmulatedTest, ForbidenCommands) { + auto res = Run({"DFLYCLUSTER", "GETSLOTINFO", "SLOTS", "1"}); + EXPECT_THAT(res, ErrArg("Cluster is disabled. Use --cluster_mode=yes to enable.")); } } // namespace diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 42e6507ae..a2a2db48c 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -98,14 +98,15 @@ class NodeInfo: async def create_node_info(instance): - admin_client = instance.admin_client() + client = instance.client() + node_id = await get_node_id(client) ninfo = NodeInfo( instance=instance, - client=instance.client(), - admin_client=admin_client, + client=client, + admin_client=instance.admin_client(), slots=[], migrations=[], - id=await get_node_id(admin_client), + id=node_id, ) return ninfo @@ -169,8 +170,8 @@ def key_slot(key_str) -> int: return crc_hqx(key, 0) % 16384 -async def get_node_id(admin_connection): - id = await admin_connection.execute_command("DFLYCLUSTER MYID") +async def get_node_id(connection): + id = await connection.execute_command("CLUSTER MYID") assert isinstance(id, str) return id @@ -257,11 +258,11 @@ async def test_emulated_cluster_with_replicas(df_factory): df_factory.start_all([master, *replicas]) c_master = aioredis.Redis(port=master.port) - master_id = (await c_master.execute_command("dflycluster myid")).decode("utf-8") + master_id = (await c_master.execute_command("CLUSTER MYID")).decode("utf-8") c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas] replica_ids = [ - (await c_replica.execute_command("dflycluster myid")).decode("utf-8") + (await c_replica.execute_command("CLUSTER MYID")).decode("utf-8") for c_replica in c_replicas ] @@ -403,7 +404,7 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory): c_nodes = [node.client() for node in nodes] c_nodes_admin = [node.admin_client() for node in nodes] - node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes_admin)) + node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes)) config = f""" [ @@ -529,8 +530,8 @@ async def test_cluster_replica_sets_non_owned_keys(df_factory: DflyInstanceFacto df_factory.start_all([master, replica]) async with master.client() as c_master, master.admin_client() as c_master_admin, replica.client() as c_replica, replica.admin_client() as c_replica_admin: - master_id = await get_node_id(c_master_admin) - replica_id = await get_node_id(c_replica_admin) + master_id = await get_node_id(c_master) + replica_id = await get_node_id(c_replica) config = f""" [ @@ -640,11 +641,11 @@ async def test_cluster_flush_slots_after_config_change(df_factory: DflyInstanceF c_master = master.client() c_master_admin = master.admin_client() - master_id = await get_node_id(c_master_admin) + master_id = await get_node_id(c_master) c_replica = replica.client() c_replica_admin = replica.admin_client() - replica_id = await get_node_id(c_replica_admin) + replica_id = await get_node_id(c_replica) config = f""" [ @@ -748,7 +749,7 @@ async def test_cluster_blocking_command(df_server): config = [ { "slot_ranges": [{"start": 0, "end": 8000}], - "master": {"id": await get_node_id(c_master_admin), "ip": "10.0.0.1", "port": 7000}, + "master": {"id": await get_node_id(c_master), "ip": "10.0.0.1", "port": 7000}, "replicas": [], }, { @@ -820,7 +821,7 @@ async def test_cluster_native_client( c_replicas = [replica.client() for replica in replicas] await asyncio.gather(*(wait_available_async(c) for c in c_replicas)) c_replicas_admin = [replica.admin_client() for replica in replicas] - replica_ids = await asyncio.gather(*(get_node_id(c) for c in c_replicas_admin)) + replica_ids = await asyncio.gather(*(get_node_id(c) for c in c_replicas)) config = f""" [ diff --git a/tests/dragonfly/proxy.py b/tests/dragonfly/proxy.py index 5ec850a08..fc81b4f63 100644 --- a/tests/dragonfly/proxy.py +++ b/tests/dragonfly/proxy.py @@ -11,9 +11,6 @@ class Proxy: self.stop_connections = [] self.server = None - async def __del__(self): - await self.close() - async def handle(self, reader, writer): remote_reader, remote_writer = await asyncio.open_connection( self.remote_host, self.remote_port