diff --git a/tools/cluster_mgr.py b/tools/cluster_mgr.py index 4cfa3b049..4530d8902 100755 --- a/tools/cluster_mgr.py +++ b/tools/cluster_mgr.py @@ -1,9 +1,10 @@ #!/usr/bin/env python3 + import argparse from argparse import RawTextHelpFormatter import json import math -from typing import Iterable +from typing import Iterable, List import redis import subprocess import time @@ -28,10 +29,16 @@ class Node: node.id = send_command(node, ["cluster", "myid"]) print(f"- ID {node.id}") + def __repr__(self): + return f"{self.host}:{self.port}/{self.id}" -class Master: + def to_dict(self): + return {"id": self.id, "ip": self.host, "port": self.port} + + +class Master(Node): def __init__(self, host, port): - self.node = Node(host, port) + Node.__init__(self, host, port) self.replicas = [] @@ -89,7 +96,7 @@ class SlotRange: def __repr__(self): return f"({self.start}-{self.end})" - def merge(self, other): + def merge(self, other: "SlotRange"): if self.end + 1 == other.start: self.end = other.end return True @@ -101,7 +108,7 @@ class SlotRange: def contains(self, slot_id): return self.start <= slot_id <= self.end - def split(self, slot_id): + def remove(self, slot_id): assert self.contains(slot_id) if self.start < self.end: @@ -117,16 +124,12 @@ class SlotRange: # Custom JSON encoder to handle SlotRange objects class ClusterConfigEncoder(json.JSONEncoder): def default(self, obj): - if isinstance(obj, SlotRange): + if isinstance(obj, SlotRange) or isinstance(obj, Node): return obj.to_dict() return super().default(obj) -def build_node(node): - return {"id": node.id, "ip": node.host, "port": node.port} - - -def build_config_from_list(masters): +def build_config_from_list(masters: List[Master]): total_slots = 16384 slots_per_node = math.floor(total_slots / len(masters)) @@ -135,8 +138,8 @@ def build_config_from_list(masters): slot_range = SlotRange(i * slots_per_node, (i + 1) * slots_per_node - 1) c = { "slot_ranges": [slot_range], - "master": build_node(master.node), - "replicas": [build_node(replica) for replica in master.replicas], + "master": master, + "replicas": master.replicas, } config.append(c) @@ -148,9 +151,10 @@ def build_config_from_list(masters): def get_nodes_from_config(config): nodes = [] for shard in config: - nodes.append(Node(shard["master"]["ip"], shard["master"]["port"])) + nodes.append(shard["master"]) for replica in shard["replicas"]: - nodes.append(Node(replica["ip"], replica["port"])) + nodes.append(replica) + for node in nodes: node.update_id() return nodes @@ -187,7 +191,7 @@ def create_locally(args): nodes = [] for master in masters: - nodes.append(master.node) + nodes.append(master) for replica in master.replicas: nodes.append(replica) @@ -195,13 +199,14 @@ def create_locally(args): for node in nodes: start_node(node, args.dragonfly_bin, args.threads) print() + time.sleep(0.5) if args.replicas_per_master > 0: print("Configuring replication...") for master in masters: for replica in master.replicas: - response = send_command(replica, ["replicaof", master.node.host, master.node.port]) - print(f"- {replica.port} replicating {master.node.port}: {response}") + response = send_command(replica, ["replicaof", master.host, master.port]) + print(f"- {replica.port} replicating {master.port}: {response}") print() print(f"Getting IDs...") @@ -221,9 +226,9 @@ def config_single_remote(args): ) master = Master(args.target_host, args.target_port) - master.node.update_id() + master.update_id() - test = send_command(master.node, ["get", "x"], print_errors=False) + test = send_command(master, ["get", "x"], print_errors=False) if type(test) is not Exception: die_with_err("Node either not found or already configured") @@ -239,7 +244,9 @@ def build_config_from_existing(args): def build_node(node_list): d = list_to_dict(node_list) - return {"id": d["id"], "ip": d["endpoint"], "port": d["port"]} + node = Node(d["endpoint"], d["port"]) + node.id = d["id"] + return node def build_slots(slot_list): slots = [] @@ -267,7 +274,7 @@ def build_config_from_existing(args): def find_master(config, host, port, die_if_not_found=True): new_owner = None for shard in config: - if shard["master"]["ip"] == host and shard["master"]["port"] == port: + if shard["master"].host == host and shard["master"].port == port: new_owner = shard break @@ -278,10 +285,10 @@ def find_master(config, host, port, die_if_not_found=True): def find_replica(config, host, port): - for master in config: - for replica in master["replicas"]: - if replica["ip"] == host and replica["port"] == port: - return replica, master + for shard in config: + for replica in shard["replicas"]: + if replica.host == host and replica.port == port: + return replica, shard die_with_err("Can't find target node") @@ -299,20 +306,19 @@ def attach(args): die_with_err("Node is not a replica of target") newcomer.update_id() - newcomer_node = build_node(newcomer) config = build_config_from_existing(args) master_node = find_master(config, args.target_host, args.target_port) - master_node["replicas"].append(newcomer_node) + master_node["replicas"].append(newcomer) print(f"Pushing config:\n{config}\n") push_config(config) else: newcomer = Master(args.attach_host, args.attach_port) - replica_resp = send_command(newcomer.node, ["info", "replication"]) + replica_resp = send_command(newcomer, ["info", "replication"]) if replica_resp["role"] != "master": die_with_err("Node is not in master mode") - newcomer.node.update_id() + newcomer.update_id() newcomer_config = build_config_from_list([newcomer]) newcomer_config[0]["slot_ranges"] = [] @@ -361,13 +367,13 @@ def move(args): config = build_config_from_existing(args) new_owner = find_master(config, args.target_host, args.target_port) - def remove_slot(slot, from_range: SlotRange, from_shard): - left, right = from_range.split(slot) + def remove_slot(slot_id, from_range: SlotRange, slot_ranges: list): + slot_ranges.remove(from_range) + left, right = from_range.remove(slot_id) if left: - from_shard["slot_ranges"].append(left) + slot_ranges.append(left) if right: - from_shard["slot_ranges"].append(right) - from_shard["slot_ranges"].remove(from_range) + slot_ranges.append(right) def add_slot(slot, to_shard): slot_range = SlotRange(slot, slot) @@ -378,8 +384,6 @@ def move(args): def find_slot(slot, config): for shard in config: - if shard == new_owner: - continue for slot_range in shard["slot_ranges"]: if slot_range.contains(slot): return shard, slot_range @@ -398,7 +402,7 @@ def move(args): shard, slot_range = find_slot(slot, config) if shard == None or shard == new_owner: continue - remove_slot(slot, slot_range, shard) + remove_slot(slot, slot_range, shard["slot_ranges"]) add_slot(slot, new_owner) for shard in config: @@ -412,7 +416,7 @@ def move(args): def migrate(args): config = build_config_from_existing(args) target = find_master(config, args.target_host, args.target_port) - target_node = Node(target["master"]["ip"], target["master"]["port"]) + target_node = target["master"] target_node.update_id() # Find source node @@ -454,6 +458,25 @@ def migrate(args): move(args) +def populate(args): + config = build_config_from_existing(args) + for shard in config: + master = shard["master"] + slot_ranges = shard["slot_ranges"] + for slot_range in slot_ranges: + cmd = [ + "debug", + "populate", + str(args.size), + "key", + str(args.valsize), + "SLOTS", + str(slot_range.start), + str(slot_range.end), + ] + send_command(master, cmd) + + def print_config(args): config = build_config_from_existing(args) print(json.dumps(config, indent=2, cls=ClusterConfigEncoder)) @@ -569,6 +592,13 @@ WARNING: Be careful! This will close all Dragonfly servers connected to the clus parser.add_argument( "--dragonfly_bin", default="../build-opt/dragonfly", help="Dragonfly binary path" ) + parser.add_argument( + "--size", type=int, default=1000000, help="Number of keys to populate in each slotrange" + ) + parser.add_argument( + "--valsize", type=int, default=16, help="Value size for each key during population" + ) + args = parser.parse_args() actions = dict( @@ -584,6 +614,7 @@ WARNING: Be careful! This will close all Dragonfly servers connected to the clus move, print_config, migrate, + populate, ] ] )