feat(cluster_mgr): add populate command (#4816)

* feat(cluster_mgr): add populate command

We further simplify the code around the cluster config.
Also, add a `populate` action that fills every slot range in the cluster
using the "debug populate" command. The `--size` and `--valsize` arguments are also added.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

* chore: fixes

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2025-03-25 10:47:10 +02:00 committed by GitHub
parent a9ecee2ba5
commit 2a3a1567b9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -1,9 +1,10 @@
#!/usr/bin/env python3
import argparse
from argparse import RawTextHelpFormatter
import json
import math
from typing import Iterable
from typing import Iterable, List
import redis
import subprocess
import time
@ -28,10 +29,16 @@ class Node:
node.id = send_command(node, ["cluster", "myid"])
print(f"- ID {node.id}")
def __repr__(self):
return f"{self.host}:{self.port}/{self.id}"
class Master:
def to_dict(self):
return {"id": self.id, "ip": self.host, "port": self.port}
class Master(Node):
def __init__(self, host, port):
self.node = Node(host, port)
Node.__init__(self, host, port)
self.replicas = []
@ -89,7 +96,7 @@ class SlotRange:
def __repr__(self):
return f"({self.start}-{self.end})"
def merge(self, other):
def merge(self, other: "SlotRange"):
if self.end + 1 == other.start:
self.end = other.end
return True
@ -101,7 +108,7 @@ class SlotRange:
def contains(self, slot_id):
return self.start <= slot_id <= self.end
def split(self, slot_id):
def remove(self, slot_id):
assert self.contains(slot_id)
if self.start < self.end:
@ -117,16 +124,12 @@ class SlotRange:
# Custom JSON encoder to handle SlotRange objects
class ClusterConfigEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, SlotRange):
if isinstance(obj, SlotRange) or isinstance(obj, Node):
return obj.to_dict()
return super().default(obj)
def build_node(node):
return {"id": node.id, "ip": node.host, "port": node.port}
def build_config_from_list(masters):
def build_config_from_list(masters: List[Master]):
total_slots = 16384
slots_per_node = math.floor(total_slots / len(masters))
@ -135,8 +138,8 @@ def build_config_from_list(masters):
slot_range = SlotRange(i * slots_per_node, (i + 1) * slots_per_node - 1)
c = {
"slot_ranges": [slot_range],
"master": build_node(master.node),
"replicas": [build_node(replica) for replica in master.replicas],
"master": master,
"replicas": master.replicas,
}
config.append(c)
@ -148,9 +151,10 @@ def build_config_from_list(masters):
def get_nodes_from_config(config):
nodes = []
for shard in config:
nodes.append(Node(shard["master"]["ip"], shard["master"]["port"]))
nodes.append(shard["master"])
for replica in shard["replicas"]:
nodes.append(Node(replica["ip"], replica["port"]))
nodes.append(replica)
for node in nodes:
node.update_id()
return nodes
@ -187,7 +191,7 @@ def create_locally(args):
nodes = []
for master in masters:
nodes.append(master.node)
nodes.append(master)
for replica in master.replicas:
nodes.append(replica)
@ -195,13 +199,14 @@ def create_locally(args):
for node in nodes:
start_node(node, args.dragonfly_bin, args.threads)
print()
time.sleep(0.5)
if args.replicas_per_master > 0:
print("Configuring replication...")
for master in masters:
for replica in master.replicas:
response = send_command(replica, ["replicaof", master.node.host, master.node.port])
print(f"- {replica.port} replicating {master.node.port}: {response}")
response = send_command(replica, ["replicaof", master.host, master.port])
print(f"- {replica.port} replicating {master.port}: {response}")
print()
print(f"Getting IDs...")
@ -221,9 +226,9 @@ def config_single_remote(args):
)
master = Master(args.target_host, args.target_port)
master.node.update_id()
master.update_id()
test = send_command(master.node, ["get", "x"], print_errors=False)
test = send_command(master, ["get", "x"], print_errors=False)
if type(test) is not Exception:
die_with_err("Node either not found or already configured")
@ -239,7 +244,9 @@ def build_config_from_existing(args):
def build_node(node_list):
d = list_to_dict(node_list)
return {"id": d["id"], "ip": d["endpoint"], "port": d["port"]}
node = Node(d["endpoint"], d["port"])
node.id = d["id"]
return node
def build_slots(slot_list):
slots = []
@ -267,7 +274,7 @@ def build_config_from_existing(args):
def find_master(config, host, port, die_if_not_found=True):
new_owner = None
for shard in config:
if shard["master"]["ip"] == host and shard["master"]["port"] == port:
if shard["master"].host == host and shard["master"].port == port:
new_owner = shard
break
@ -278,10 +285,10 @@ def find_master(config, host, port, die_if_not_found=True):
def find_replica(config, host, port):
for master in config:
for replica in master["replicas"]:
if replica["ip"] == host and replica["port"] == port:
return replica, master
for shard in config:
for replica in shard["replicas"]:
if replica.host == host and replica.port == port:
return replica, shard
die_with_err("Can't find target node")
@ -299,20 +306,19 @@ def attach(args):
die_with_err("Node is not a replica of target")
newcomer.update_id()
newcomer_node = build_node(newcomer)
config = build_config_from_existing(args)
master_node = find_master(config, args.target_host, args.target_port)
master_node["replicas"].append(newcomer_node)
master_node["replicas"].append(newcomer)
print(f"Pushing config:\n{config}\n")
push_config(config)
else:
newcomer = Master(args.attach_host, args.attach_port)
replica_resp = send_command(newcomer.node, ["info", "replication"])
replica_resp = send_command(newcomer, ["info", "replication"])
if replica_resp["role"] != "master":
die_with_err("Node is not in master mode")
newcomer.node.update_id()
newcomer.update_id()
newcomer_config = build_config_from_list([newcomer])
newcomer_config[0]["slot_ranges"] = []
@ -361,13 +367,13 @@ def move(args):
config = build_config_from_existing(args)
new_owner = find_master(config, args.target_host, args.target_port)
def remove_slot(slot, from_range: SlotRange, from_shard):
left, right = from_range.split(slot)
def remove_slot(slot_id, from_range: SlotRange, slot_ranges: list):
slot_ranges.remove(from_range)
left, right = from_range.remove(slot_id)
if left:
from_shard["slot_ranges"].append(left)
slot_ranges.append(left)
if right:
from_shard["slot_ranges"].append(right)
from_shard["slot_ranges"].remove(from_range)
slot_ranges.append(right)
def add_slot(slot, to_shard):
slot_range = SlotRange(slot, slot)
@ -378,8 +384,6 @@ def move(args):
def find_slot(slot, config):
for shard in config:
if shard == new_owner:
continue
for slot_range in shard["slot_ranges"]:
if slot_range.contains(slot):
return shard, slot_range
@ -398,7 +402,7 @@ def move(args):
shard, slot_range = find_slot(slot, config)
if shard == None or shard == new_owner:
continue
remove_slot(slot, slot_range, shard)
remove_slot(slot, slot_range, shard["slot_ranges"])
add_slot(slot, new_owner)
for shard in config:
@ -412,7 +416,7 @@ def move(args):
def migrate(args):
config = build_config_from_existing(args)
target = find_master(config, args.target_host, args.target_port)
target_node = Node(target["master"]["ip"], target["master"]["port"])
target_node = target["master"]
target_node.update_id()
# Find source node
@ -454,6 +458,25 @@ def migrate(args):
move(args)
def populate(args):
    """Send a "debug populate" command for every slot range in the cluster.

    The cluster layout is obtained from build_config_from_existing(args).
    For each shard, every entry of its "slot_ranges" list produces one
    command addressed to that shard's "master":

        debug populate <args.size> key <args.valsize> SLOTS <start> <end>

    The response of each command is not inspected here.
    """
    for shard in build_config_from_existing(args):
        target = shard["master"]
        for span in shard["slot_ranges"]:
            # All arguments are sent as strings, in the order the command expects.
            populate_cmd = ["debug", "populate", str(args.size), "key", str(args.valsize)]
            populate_cmd += ["SLOTS", str(span.start), str(span.end)]
            send_command(target, populate_cmd)
def print_config(args):
config = build_config_from_existing(args)
print(json.dumps(config, indent=2, cls=ClusterConfigEncoder))
@ -569,6 +592,13 @@ WARNING: Be careful! This will close all Dragonfly servers connected to the clus
parser.add_argument(
"--dragonfly_bin", default="../build-opt/dragonfly", help="Dragonfly binary path"
)
parser.add_argument(
"--size", type=int, default=1000000, help="Number of keys to populate in each slotrange"
)
parser.add_argument(
"--valsize", type=int, default=16, help="Value size for each key during population"
)
args = parser.parse_args()
actions = dict(
@ -584,6 +614,7 @@ WARNING: Be careful! This will close all Dragonfly servers connected to the clus
move,
print_config,
migrate,
populate,
]
]
)