mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 02:15:45 +02:00
fix: forbid DFLYCLUSTER commads set for emulated cluster mode (#3307)
* fix: forbid DFLYCLUSTER commads set for emulated cluster mode * feat: add CLUSTER MYID and remove DFLYCLUSTER MYID * fix(test): __del__ method in python can't be async * fix: crash and test_replicate_disconnect_cluster
This commit is contained in:
parent
22756eeb81
commit
3891efac2c
5 changed files with 44 additions and 35 deletions
|
@ -369,8 +369,18 @@ void ClusterFamily::Cluster(CmdArgList args, ConnectionContext* cntx) {
|
||||||
return cntx->SendError(kClusterDisabled);
|
return cntx->SendError(kClusterDisabled);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (sub_cmd == "KEYSLOT") {
|
||||||
|
return KeySlot(args, cntx);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (args.size() > 1) {
|
||||||
|
return cntx->SendError(WrongNumArgsError(absl::StrCat("CLUSTER ", sub_cmd)));
|
||||||
|
}
|
||||||
|
|
||||||
if (sub_cmd == "HELP") {
|
if (sub_cmd == "HELP") {
|
||||||
return ClusterHelp(cntx);
|
return ClusterHelp(cntx);
|
||||||
|
} else if (sub_cmd == "MYID") {
|
||||||
|
return ClusterMyId(cntx);
|
||||||
} else if (sub_cmd == "SHARDS") {
|
} else if (sub_cmd == "SHARDS") {
|
||||||
return ClusterShards(cntx);
|
return ClusterShards(cntx);
|
||||||
} else if (sub_cmd == "SLOTS") {
|
} else if (sub_cmd == "SLOTS") {
|
||||||
|
@ -379,8 +389,6 @@ void ClusterFamily::Cluster(CmdArgList args, ConnectionContext* cntx) {
|
||||||
return ClusterNodes(cntx);
|
return ClusterNodes(cntx);
|
||||||
} else if (sub_cmd == "INFO") {
|
} else if (sub_cmd == "INFO") {
|
||||||
return ClusterInfo(cntx);
|
return ClusterInfo(cntx);
|
||||||
} else if (sub_cmd == "KEYSLOT") {
|
|
||||||
return KeySlot(args, cntx);
|
|
||||||
} else {
|
} else {
|
||||||
return cntx->SendError(facade::UnknownSubCmd(sub_cmd, "CLUSTER"), facade::kSyntaxErrType);
|
return cntx->SendError(facade::UnknownSubCmd(sub_cmd, "CLUSTER"), facade::kSyntaxErrType);
|
||||||
}
|
}
|
||||||
|
@ -401,11 +409,15 @@ void ClusterFamily::ReadWrite(CmdArgList args, ConnectionContext* cntx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
|
void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
|
||||||
if (!IsClusterEnabledOrEmulated()) {
|
if (!(IsClusterEnabled() || (IsClusterEmulated() && cntx->journal_emulated))) {
|
||||||
return cntx->SendError(kClusterDisabled);
|
return cntx->SendError("Cluster is disabled. Use --cluster_mode=yes to enable.");
|
||||||
}
|
}
|
||||||
|
|
||||||
VLOG(2) << "Got DFLYCLUSTER command (" << cntx->conn()->GetClientId() << "): " << args;
|
if (cntx->conn()) {
|
||||||
|
VLOG(2) << "Got DFLYCLUSTER command (" << cntx->conn()->GetClientId() << "): " << args;
|
||||||
|
} else {
|
||||||
|
VLOG(2) << "Got DFLYCLUSTER command (NO_CLIENT_ID): " << args;
|
||||||
|
}
|
||||||
|
|
||||||
ToUpper(&args[0]);
|
ToUpper(&args[0]);
|
||||||
string_view sub_cmd = ArgS(args, 0);
|
string_view sub_cmd = ArgS(args, 0);
|
||||||
|
@ -414,8 +426,6 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
|
||||||
return DflyClusterGetSlotInfo(args, cntx);
|
return DflyClusterGetSlotInfo(args, cntx);
|
||||||
} else if (sub_cmd == "CONFIG") {
|
} else if (sub_cmd == "CONFIG") {
|
||||||
return DflyClusterConfig(args, cntx);
|
return DflyClusterConfig(args, cntx);
|
||||||
} else if (sub_cmd == "MYID") {
|
|
||||||
return DflyClusterMyId(args, cntx);
|
|
||||||
} else if (sub_cmd == "FLUSHSLOTS") {
|
} else if (sub_cmd == "FLUSHSLOTS") {
|
||||||
return DflyClusterFlushSlots(args, cntx);
|
return DflyClusterFlushSlots(args, cntx);
|
||||||
} else if (sub_cmd == "SLOT-MIGRATION-STATUS") {
|
} else if (sub_cmd == "SLOT-MIGRATION-STATUS") {
|
||||||
|
@ -425,12 +435,8 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
|
||||||
return cntx->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType);
|
return cntx->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType);
|
||||||
}
|
}
|
||||||
|
|
||||||
void ClusterFamily::DflyClusterMyId(CmdArgList args, ConnectionContext* cntx) {
|
void ClusterFamily::ClusterMyId(ConnectionContext* cntx) {
|
||||||
if (!args.empty()) {
|
cntx->SendSimpleString(id_);
|
||||||
return cntx->SendError(WrongNumArgsError("DFLYCLUSTER MYID"));
|
|
||||||
}
|
|
||||||
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
|
||||||
rb->SendBulkString(id_);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
|
@ -46,6 +46,7 @@ class ClusterFamily {
|
||||||
void ClusterSlots(ConnectionContext* cntx);
|
void ClusterSlots(ConnectionContext* cntx);
|
||||||
void ClusterNodes(ConnectionContext* cntx);
|
void ClusterNodes(ConnectionContext* cntx);
|
||||||
void ClusterInfo(ConnectionContext* cntx);
|
void ClusterInfo(ConnectionContext* cntx);
|
||||||
|
void ClusterMyId(ConnectionContext* cntx);
|
||||||
|
|
||||||
void KeySlot(CmdArgList args, ConnectionContext* cntx);
|
void KeySlot(CmdArgList args, ConnectionContext* cntx);
|
||||||
|
|
||||||
|
@ -56,7 +57,6 @@ class ClusterFamily {
|
||||||
void DflyCluster(CmdArgList args, ConnectionContext* cntx);
|
void DflyCluster(CmdArgList args, ConnectionContext* cntx);
|
||||||
void DflyClusterConfig(CmdArgList args, ConnectionContext* cntx);
|
void DflyClusterConfig(CmdArgList args, ConnectionContext* cntx);
|
||||||
void DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx);
|
void DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx);
|
||||||
void DflyClusterMyId(CmdArgList args, ConnectionContext* cntx);
|
|
||||||
void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx);
|
void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx);
|
||||||
|
|
||||||
private: // Slots migration section
|
private: // Slots migration section
|
||||||
|
|
|
@ -34,7 +34,7 @@ class ClusterFamilyTest : public BaseFamilyTest {
|
||||||
static constexpr string_view kInvalidConfiguration = "Invalid cluster configuration";
|
static constexpr string_view kInvalidConfiguration = "Invalid cluster configuration";
|
||||||
|
|
||||||
string GetMyId() {
|
string GetMyId() {
|
||||||
return RunPrivileged({"dflycluster", "myid"}).GetString();
|
return Run({"cluster", "myid"}).GetString();
|
||||||
}
|
}
|
||||||
|
|
||||||
void ConfigSingleNodeCluster(string id) {
|
void ConfigSingleNodeCluster(string id) {
|
||||||
|
@ -735,8 +735,13 @@ TEST_F(ClusterFamilyEmulatedTest, ClusterSlots) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(ClusterFamilyEmulatedTest, ClusterNodes) {
|
TEST_F(ClusterFamilyEmulatedTest, ClusterNodes) {
|
||||||
EXPECT_THAT(Run({"cluster", "nodes"}),
|
auto res = Run({"cluster", "nodes"});
|
||||||
GetMyId() + " fake-host:6379@6379 myself,master - 0 0 0 connected 0-16383\n");
|
EXPECT_THAT(res, GetMyId() + " fake-host:6379@6379 myself,master - 0 0 0 connected 0-16383\n");
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(ClusterFamilyEmulatedTest, ForbidenCommands) {
|
||||||
|
auto res = Run({"DFLYCLUSTER", "GETSLOTINFO", "SLOTS", "1"});
|
||||||
|
EXPECT_THAT(res, ErrArg("Cluster is disabled. Use --cluster_mode=yes to enable."));
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
|
@ -98,14 +98,15 @@ class NodeInfo:
|
||||||
|
|
||||||
|
|
||||||
async def create_node_info(instance):
|
async def create_node_info(instance):
|
||||||
admin_client = instance.admin_client()
|
client = instance.client()
|
||||||
|
node_id = await get_node_id(client)
|
||||||
ninfo = NodeInfo(
|
ninfo = NodeInfo(
|
||||||
instance=instance,
|
instance=instance,
|
||||||
client=instance.client(),
|
client=client,
|
||||||
admin_client=admin_client,
|
admin_client=instance.admin_client(),
|
||||||
slots=[],
|
slots=[],
|
||||||
migrations=[],
|
migrations=[],
|
||||||
id=await get_node_id(admin_client),
|
id=node_id,
|
||||||
)
|
)
|
||||||
return ninfo
|
return ninfo
|
||||||
|
|
||||||
|
@ -169,8 +170,8 @@ def key_slot(key_str) -> int:
|
||||||
return crc_hqx(key, 0) % 16384
|
return crc_hqx(key, 0) % 16384
|
||||||
|
|
||||||
|
|
||||||
async def get_node_id(admin_connection):
|
async def get_node_id(connection):
|
||||||
id = await admin_connection.execute_command("DFLYCLUSTER MYID")
|
id = await connection.execute_command("CLUSTER MYID")
|
||||||
assert isinstance(id, str)
|
assert isinstance(id, str)
|
||||||
return id
|
return id
|
||||||
|
|
||||||
|
@ -257,11 +258,11 @@ async def test_emulated_cluster_with_replicas(df_factory):
|
||||||
df_factory.start_all([master, *replicas])
|
df_factory.start_all([master, *replicas])
|
||||||
|
|
||||||
c_master = aioredis.Redis(port=master.port)
|
c_master = aioredis.Redis(port=master.port)
|
||||||
master_id = (await c_master.execute_command("dflycluster myid")).decode("utf-8")
|
master_id = (await c_master.execute_command("CLUSTER MYID")).decode("utf-8")
|
||||||
|
|
||||||
c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]
|
c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]
|
||||||
replica_ids = [
|
replica_ids = [
|
||||||
(await c_replica.execute_command("dflycluster myid")).decode("utf-8")
|
(await c_replica.execute_command("CLUSTER MYID")).decode("utf-8")
|
||||||
for c_replica in c_replicas
|
for c_replica in c_replicas
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -403,7 +404,7 @@ async def test_cluster_slot_ownership_changes(df_factory: DflyInstanceFactory):
|
||||||
c_nodes = [node.client() for node in nodes]
|
c_nodes = [node.client() for node in nodes]
|
||||||
c_nodes_admin = [node.admin_client() for node in nodes]
|
c_nodes_admin = [node.admin_client() for node in nodes]
|
||||||
|
|
||||||
node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes_admin))
|
node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes))
|
||||||
|
|
||||||
config = f"""
|
config = f"""
|
||||||
[
|
[
|
||||||
|
@ -529,8 +530,8 @@ async def test_cluster_replica_sets_non_owned_keys(df_factory: DflyInstanceFacto
|
||||||
df_factory.start_all([master, replica])
|
df_factory.start_all([master, replica])
|
||||||
|
|
||||||
async with master.client() as c_master, master.admin_client() as c_master_admin, replica.client() as c_replica, replica.admin_client() as c_replica_admin:
|
async with master.client() as c_master, master.admin_client() as c_master_admin, replica.client() as c_replica, replica.admin_client() as c_replica_admin:
|
||||||
master_id = await get_node_id(c_master_admin)
|
master_id = await get_node_id(c_master)
|
||||||
replica_id = await get_node_id(c_replica_admin)
|
replica_id = await get_node_id(c_replica)
|
||||||
|
|
||||||
config = f"""
|
config = f"""
|
||||||
[
|
[
|
||||||
|
@ -640,11 +641,11 @@ async def test_cluster_flush_slots_after_config_change(df_factory: DflyInstanceF
|
||||||
|
|
||||||
c_master = master.client()
|
c_master = master.client()
|
||||||
c_master_admin = master.admin_client()
|
c_master_admin = master.admin_client()
|
||||||
master_id = await get_node_id(c_master_admin)
|
master_id = await get_node_id(c_master)
|
||||||
|
|
||||||
c_replica = replica.client()
|
c_replica = replica.client()
|
||||||
c_replica_admin = replica.admin_client()
|
c_replica_admin = replica.admin_client()
|
||||||
replica_id = await get_node_id(c_replica_admin)
|
replica_id = await get_node_id(c_replica)
|
||||||
|
|
||||||
config = f"""
|
config = f"""
|
||||||
[
|
[
|
||||||
|
@ -748,7 +749,7 @@ async def test_cluster_blocking_command(df_server):
|
||||||
config = [
|
config = [
|
||||||
{
|
{
|
||||||
"slot_ranges": [{"start": 0, "end": 8000}],
|
"slot_ranges": [{"start": 0, "end": 8000}],
|
||||||
"master": {"id": await get_node_id(c_master_admin), "ip": "10.0.0.1", "port": 7000},
|
"master": {"id": await get_node_id(c_master), "ip": "10.0.0.1", "port": 7000},
|
||||||
"replicas": [],
|
"replicas": [],
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
@ -820,7 +821,7 @@ async def test_cluster_native_client(
|
||||||
c_replicas = [replica.client() for replica in replicas]
|
c_replicas = [replica.client() for replica in replicas]
|
||||||
await asyncio.gather(*(wait_available_async(c) for c in c_replicas))
|
await asyncio.gather(*(wait_available_async(c) for c in c_replicas))
|
||||||
c_replicas_admin = [replica.admin_client() for replica in replicas]
|
c_replicas_admin = [replica.admin_client() for replica in replicas]
|
||||||
replica_ids = await asyncio.gather(*(get_node_id(c) for c in c_replicas_admin))
|
replica_ids = await asyncio.gather(*(get_node_id(c) for c in c_replicas))
|
||||||
|
|
||||||
config = f"""
|
config = f"""
|
||||||
[
|
[
|
||||||
|
|
|
@ -11,9 +11,6 @@ class Proxy:
|
||||||
self.stop_connections = []
|
self.stop_connections = []
|
||||||
self.server = None
|
self.server = None
|
||||||
|
|
||||||
async def __del__(self):
|
|
||||||
await self.close()
|
|
||||||
|
|
||||||
async def handle(self, reader, writer):
|
async def handle(self, reader, writer):
|
||||||
remote_reader, remote_writer = await asyncio.open_connection(
|
remote_reader, remote_writer = await asyncio.open_connection(
|
||||||
self.remote_host, self.remote_port
|
self.remote_host, self.remote_port
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue