mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
feat(cluster_mgr): Add support for remote Dragonfly servers (#2671)
* WIP: `cluster_mgr.py` to work with remote targets * Documentation * No admin port * Support different hostname move/migrate * Fix migrate bug * Fix typo in --help * fix test * self.update_id()
This commit is contained in:
parent
7e4527098b
commit
54cb7d5cd0
3 changed files with 140 additions and 87 deletions
|
@ -42,7 +42,6 @@ constexpr char kIdNotFound[] = "syncid not found";
|
|||
|
||||
constexpr string_view kClusterDisabled =
|
||||
"Cluster is disabled. Enabled via passing --cluster_mode=emulated|yes";
|
||||
constexpr string_view kDflyClusterCmdPort = "DflyCluster command allowed only under admin port";
|
||||
|
||||
thread_local shared_ptr<ClusterConfig> tl_cluster_config;
|
||||
|
||||
|
@ -383,10 +382,6 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
|
|||
return cntx->SendError(kClusterDisabled);
|
||||
}
|
||||
|
||||
if (cntx->conn() && !cntx->conn()->IsPrivileged()) {
|
||||
return cntx->SendError(kDflyClusterCmdPort);
|
||||
}
|
||||
|
||||
ToUpper(&args[0]);
|
||||
string_view sub_cmd = ArgS(args, 0);
|
||||
args.remove_prefix(1); // remove subcommand name
|
||||
|
|
|
@ -38,29 +38,6 @@ class ClusterFamilyTest : public BaseFamilyTest {
|
|||
}
|
||||
};
|
||||
|
||||
TEST_F(ClusterFamilyTest, DflyClusterOnlyOnAdminPort) {
|
||||
string config = R"json(
|
||||
[
|
||||
{
|
||||
"slot_ranges": [
|
||||
{
|
||||
"start": 0,
|
||||
"end": 16383
|
||||
}
|
||||
],
|
||||
"master": {
|
||||
"id": "abcd1234",
|
||||
"ip": "10.0.0.1",
|
||||
"port": 7000
|
||||
},
|
||||
"replicas": []
|
||||
}
|
||||
])json";
|
||||
EXPECT_EQ(RunPrivileged({"dflycluster", "config", config}), "OK");
|
||||
EXPECT_THAT(Run({"dflycluster", "config", config}),
|
||||
ErrArg("DflyCluster command allowed only under admin port"));
|
||||
}
|
||||
|
||||
TEST_F(ClusterFamilyTest, ClusterConfigInvalidJSON) {
|
||||
EXPECT_THAT(RunPrivileged({"dflycluster", "config", "invalid JSON"}),
|
||||
ErrArg("Invalid JSON cluster config"));
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
#!/usr/bin/env python3
|
||||
import argparse
|
||||
from argparse import RawTextHelpFormatter
|
||||
import json
|
||||
import math
|
||||
import redis
|
||||
|
@ -12,15 +13,19 @@ To install: pip install -r requirements.txt
|
|||
|
||||
|
||||
class Node:
|
||||
def __init__(self, port):
|
||||
def __init__(self, host, port):
|
||||
self.id = ""
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.admin_port = port + 10_000
|
||||
|
||||
def update_id(node):
|
||||
node.id = send_command(node, ["dflycluster", "myid"])
|
||||
print(f"- ID {node.id}")
|
||||
|
||||
|
||||
class Master:
|
||||
def __init__(self, port):
|
||||
self.node = Node(port)
|
||||
def __init__(self, host, port):
|
||||
self.node = Node(host, port)
|
||||
self.replicas = []
|
||||
|
||||
|
||||
|
@ -31,7 +36,6 @@ def start_node(node, threads):
|
|||
[
|
||||
"../build-opt/dragonfly",
|
||||
f"--port={node.port}",
|
||||
f"--admin_port={node.admin_port}",
|
||||
"--cluster_mode=yes",
|
||||
f"--proactor_threads={threads}",
|
||||
"--dbfilename=",
|
||||
|
@ -42,8 +46,8 @@ def start_node(node, threads):
|
|||
)
|
||||
|
||||
|
||||
def send_command(node, command):
|
||||
client = redis.Redis(decode_responses=True, host="localhost", port=node.admin_port)
|
||||
def send_command(node, command, print_errors=True):
|
||||
client = redis.Redis(decode_responses=True, host=node.host, port=node.port)
|
||||
|
||||
for i in range(0, 5):
|
||||
try:
|
||||
|
@ -51,16 +55,14 @@ def send_command(node, command):
|
|||
client.close()
|
||||
return result
|
||||
except Exception as e:
|
||||
print(e)
|
||||
if print_errors:
|
||||
print(e)
|
||||
time.sleep(0.1 * i)
|
||||
|
||||
print(f"Unable to run command {command} against localhost:{node.admin_port} after 5 attempts!")
|
||||
if print_errors:
|
||||
print(f"Unable to run command {command} against {node.host}:{node.port} after 5 attempts!")
|
||||
|
||||
|
||||
def update_id(node):
|
||||
id = send_command(node, ["dflycluster", "myid"])
|
||||
node.id = id
|
||||
print(f"- ID for {node.port}: {id}")
|
||||
return Exception()
|
||||
|
||||
|
||||
def build_config_from_list(masters):
|
||||
|
@ -68,7 +70,7 @@ def build_config_from_list(masters):
|
|||
slots_per_node = math.floor(total_slots / len(masters))
|
||||
|
||||
def build_node(node):
|
||||
return {"id": node.id, "ip": "localhost", "port": node.port}
|
||||
return {"id": node.id, "ip": node.host, "port": node.port}
|
||||
|
||||
config = []
|
||||
for i, master in enumerate(masters):
|
||||
|
@ -87,9 +89,11 @@ def build_config_from_list(masters):
|
|||
def get_nodes_from_config(config):
|
||||
nodes = []
|
||||
for shard in config:
|
||||
nodes.append(Node(shard["master"]["port"]))
|
||||
nodes.append(Node(shard["master"]["ip"], shard["master"]["port"]))
|
||||
for replica in shard["replicas"]:
|
||||
nodes.append(Node(replica["port"]))
|
||||
nodes.append(Node(replica["ip"], replica["port"]))
|
||||
for node in nodes:
|
||||
node.update_id()
|
||||
return nodes
|
||||
|
||||
|
||||
|
@ -103,7 +107,7 @@ def push_config(config):
|
|||
push_to_node(node, config)
|
||||
|
||||
|
||||
def create(args):
|
||||
def create_locally(args):
|
||||
print(f"Setting up a Dragonfly cluster:")
|
||||
print(f"- Master nodes: {args.num_masters}")
|
||||
print(f"- Ports: {args.first_port}...{args.first_port + args.num_masters - 1}")
|
||||
|
@ -113,10 +117,10 @@ def create(args):
|
|||
next_port = args.first_port
|
||||
masters = []
|
||||
for i in range(args.num_masters):
|
||||
master = Master(next_port)
|
||||
master = Master("localhost", next_port)
|
||||
next_port += 1
|
||||
for j in range(args.replicas_per_master):
|
||||
replica = Node(next_port)
|
||||
replica = Node("localhost", next_port)
|
||||
master.replicas.append(replica)
|
||||
next_port += 1
|
||||
masters.append(master)
|
||||
|
@ -136,13 +140,13 @@ def create(args):
|
|||
print("Configuring replication...")
|
||||
for master in masters:
|
||||
for replica in master.replicas:
|
||||
response = send_command(replica, ["replicaof", "localhost", master.node.port])
|
||||
response = send_command(replica, ["replicaof", master.node.host, master.node.port])
|
||||
print(f"- {replica.port} replicating {master.node.port}: {response}")
|
||||
print()
|
||||
|
||||
print(f"Getting IDs...")
|
||||
for n in nodes:
|
||||
update_id(n)
|
||||
n.update_id()
|
||||
print()
|
||||
|
||||
config = build_config_from_list(masters)
|
||||
|
@ -151,6 +155,25 @@ def create(args):
|
|||
print()
|
||||
|
||||
|
||||
def config_single_remote(args):
|
||||
print(
|
||||
f"Configuring remote Dragonfly {args.target_host}:{args.target_port} to be a single-server cluster"
|
||||
)
|
||||
|
||||
master = Master(args.target_host, args.target_port)
|
||||
master.node.update_id()
|
||||
|
||||
test = send_command(master.node, ["get", "x"], print_errors=False)
|
||||
if type(test) is not Exception:
|
||||
print("Node either not found or already configured")
|
||||
exit(-1)
|
||||
|
||||
config = build_config_from_list([master])
|
||||
print(f"Pushing config:\n{config}\n")
|
||||
push_config(config)
|
||||
print()
|
||||
|
||||
|
||||
def build_config_from_existing(args):
|
||||
def list_to_dict(l):
|
||||
return {l[i]: l[i + 1] for i in range(0, len(l), 2)}
|
||||
|
@ -165,7 +188,7 @@ def build_config_from_existing(args):
|
|||
slots.append({"start": slot_list[i], "end": slot_list[i + 1]})
|
||||
return slots
|
||||
|
||||
client = redis.Redis(decode_responses=True, host="localhost", port=args.first_port)
|
||||
client = redis.Redis(decode_responses=True, host=args.target_host, port=args.target_port)
|
||||
existing = client.execute_command("cluster", "shards")
|
||||
config = []
|
||||
for shard_list in existing:
|
||||
|
@ -180,10 +203,10 @@ def build_config_from_existing(args):
|
|||
return config
|
||||
|
||||
|
||||
def find_node(config, port):
|
||||
def find_node(config, host, port):
|
||||
new_owner = None
|
||||
for shard in config:
|
||||
if shard["master"]["port"] == port:
|
||||
if shard["master"]["ip"] == host and shard["master"]["port"] == port:
|
||||
new_owner = shard
|
||||
break
|
||||
else:
|
||||
|
@ -192,9 +215,22 @@ def find_node(config, port):
|
|||
return new_owner
|
||||
|
||||
|
||||
def attach(args):
|
||||
print(f"Attaching remote Dragonfly {args.attach_host}:{args.attach_port} to cluster")
|
||||
newcomer = Master(args.attach_host, args.attach_port)
|
||||
newcomer.node.update_id()
|
||||
|
||||
newcomer_config = build_config_from_list([newcomer])
|
||||
newcomer_config[0]["slot_ranges"] = []
|
||||
config = build_config_from_existing(args)
|
||||
print(f"Pushing config:\n{config}\n")
|
||||
push_config([*config, newcomer_config[0]])
|
||||
print()
|
||||
|
||||
|
||||
def move(args):
|
||||
config = build_config_from_existing(args)
|
||||
new_owner = find_node(config, args.target_port)
|
||||
new_owner = find_node(config, args.target_host, args.target_port)
|
||||
|
||||
def remove_slot(slot, from_range, from_shard):
|
||||
if from_range["start"] == slot:
|
||||
|
@ -271,21 +307,23 @@ def move(args):
|
|||
|
||||
def migrate(args):
|
||||
config = build_config_from_existing(args)
|
||||
target = find_node(config, args.target_port)
|
||||
target_node = Node(target["master"]["port"])
|
||||
target = find_node(config, args.target_host, args.target_port)
|
||||
target_node = Node(target["master"]["ip"], target["master"]["port"])
|
||||
target_node.update_id()
|
||||
|
||||
# Find source node
|
||||
source = None
|
||||
for node in config:
|
||||
slots = node["slot_ranges"]
|
||||
for slot in slots:
|
||||
if slot["start"] >= args.slot_start and slot["end"] <= args.slot_end:
|
||||
if slot["start"] <= args.slot_start and slot["end"] >= args.slot_end:
|
||||
source = node
|
||||
break
|
||||
if source == None:
|
||||
print("Unsupported slot range migration (currently only 1-node migration supported)")
|
||||
exit(-1)
|
||||
source_node = Node(source["master"]["port"])
|
||||
source_node = Node(source["master"]["ip"], source["master"]["port"])
|
||||
source_node.update_id()
|
||||
|
||||
# do migration
|
||||
sync_id = send_command(
|
||||
|
@ -293,8 +331,8 @@ def migrate(args):
|
|||
[
|
||||
"DFLYCLUSTER",
|
||||
"START-SLOT-MIGRATION",
|
||||
"127.0.0.1",
|
||||
source["master"]["port"] + 10_000,
|
||||
source_node.host,
|
||||
source_node.port,
|
||||
args.slot_start,
|
||||
args.slot_end,
|
||||
],
|
||||
|
@ -330,30 +368,60 @@ def shutdown(args):
|
|||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="""
|
||||
Local Cluster Manager
|
||||
Dragonfly Manual Cluster Manager
|
||||
|
||||
This tool helps managing a Dragonfly cluster manually.
|
||||
Cluster can either be local or remote:
|
||||
- Starting Dragonfly instances must be done locally, binary is assumed to be under ../build-opt
|
||||
- Remote Dragonflies must already be started, and initialized with `--cluster_mode=yes`
|
||||
|
||||
Example usage:
|
||||
|
||||
Create a 3+3 nodes cluster:
|
||||
./cluster_mgr.py --action=create --num_masters=3 --replicas_per_master=1
|
||||
Create a 3 node cluster locally:
|
||||
./cluster_mgr.py --action=create_locally --num_masters=3
|
||||
This will create 3 Dragonfly processes with ports 7001-7003.
|
||||
Ports can be overridden with `--first_port`.
|
||||
|
||||
Connect to cluster and print current config:
|
||||
./cluster_mgr.py --action=print
|
||||
Create a 6 node cluster locally, 3 of them masters with 1 replica each:
|
||||
./cluster_mgr.py --action=create_locally --num_masters=3 --replicas_per_master=1
|
||||
|
||||
Connect to existing cluster and print current config:
|
||||
./cluster_mgr.py --action=print_config
|
||||
This will connect to localhost:6379 by default. Override with `--target_host` and `--target_port`
|
||||
|
||||
Configure an existing Dragonfly server to be a standalone cluster (owning all slots):
|
||||
./cluster_mgr.py --action=config_single_remote
|
||||
This connects to an *existing* Dragonfly server, and pushes a config telling it to own all slots.
|
||||
This will connect to localhost:6379 by default. Override with `--target_host` and `--target_port`
|
||||
|
||||
Attach an existing Dragonfly server to an existing cluster (owning no slots):
|
||||
./cluster_mgr.py --action=attach --attach_host=HOST --attach_port=PORT
|
||||
This will connect to existing cluster present at localhost:6379 by default. Override with
|
||||
`--target_host` and `--target_port`
|
||||
|
||||
To set up a new cluster - start the servers and then use
|
||||
./cluster_mgr.py --action=config_single_remote ...
|
||||
./cluster_mgr.py --action=attach ...
|
||||
And repeat `--action=attach` for all servers.
|
||||
Afterwards, distribute the slots between the servers as desired with `--action=move` or
|
||||
`--action=migrate`
|
||||
|
||||
Connect to cluster and move slots 10-20 to target:
|
||||
./cluster_mgr.py --action=move --slot_start=10 --slot_end=20 --target_host=X --target_port=X
|
||||
|
||||
Migrate slots 10-20 to target:
|
||||
./cluster_mgr.py --action=migrate --slot_start=10 --slot_end=20 --target_host=X --target_port=X
|
||||
|
||||
Connect to cluster and shutdown all nodes:
|
||||
./cluster_mgr.py --action=shutdown
|
||||
|
||||
Connect to cluster and move slots 10-20 to master with port 7002:
|
||||
./cluster_mgr.py --action=move --slot_start=10 --slot_end=20 --new_owner=7002
|
||||
|
||||
Migrate slots 10-20 to master with port 7002
|
||||
./cluster_mgr.py --action=migrate --slot_start=10 --slot_end=20 --new_owner=7002
|
||||
"""
|
||||
WARNING: Be careful! This will close all Dragonfly servers connected to the cluster.
|
||||
""",
|
||||
formatter_class=RawTextHelpFormatter,
|
||||
)
|
||||
parser.add_argument(
|
||||
"--action",
|
||||
default="",
|
||||
help="Which action to take? create: start a new instance, move=move slots",
|
||||
help="Which action to take? See `--help`",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--num_masters", type=int, default=3, help="Number of master nodes in cluster"
|
||||
|
@ -363,28 +431,41 @@ Migrate slots 10-20 to master with port 7002
|
|||
)
|
||||
parser.add_argument("--first_port", type=int, default=7001, help="First master's port")
|
||||
parser.add_argument("--threads", type=int, default=2, help="Threads per node")
|
||||
parser.add_argument("--slot_start", type=int, default=0, help="First slot to move (inclusive)")
|
||||
parser.add_argument("--slot_end", type=int, default=100, help="Last slot to move (inclusive)")
|
||||
parser.add_argument(
|
||||
"--target_port",
|
||||
type=int,
|
||||
default=0,
|
||||
help="Master port to take ownership over slots in range " "[--slot_start, --slot_end]",
|
||||
"--slot_start", type=int, default=0, help="First slot to move / migrate (inclusive)"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--slot_end", type=int, default=100, help="Last slot to move / migrate (inclusive)"
|
||||
)
|
||||
parser.add_argument("--target_host", default="localhost", help="Master host/ip")
|
||||
parser.add_argument("--target_port", type=int, default=6379, help="Master port")
|
||||
parser.add_argument(
|
||||
"--attach_host", default="localhost", help="New cluster node master host/ip"
|
||||
)
|
||||
parser.add_argument(
|
||||
"--attach_port", type=int, default=6379, help="New cluster node master port"
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
actions = {
|
||||
"create": create,
|
||||
"shutdown": shutdown,
|
||||
"move": move,
|
||||
"print": print_config,
|
||||
"migrate": migrate,
|
||||
}
|
||||
actions = dict(
|
||||
[
|
||||
(f.__name__, f)
|
||||
for f in [
|
||||
create_locally,
|
||||
shutdown,
|
||||
config_single_remote,
|
||||
attach,
|
||||
move,
|
||||
print_config,
|
||||
migrate,
|
||||
]
|
||||
]
|
||||
)
|
||||
action = actions.get(args.action.lower())
|
||||
if action:
|
||||
action(args)
|
||||
else:
|
||||
print(f'Error - unknown action "{args.action}"')
|
||||
print(f'Error - unknown action "{args.action}". See --help')
|
||||
exit(-1)
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue