From bfb1b3b624e45c8b9ace34b499588eb6c4b20178 Mon Sep 17 00:00:00 2001 From: Borys Date: Wed, 29 Nov 2023 13:38:13 +0200 Subject: [PATCH] Start slot migration (#2218) * feat: add new command START-SLOT-MIGRATION --- src/facade/cmd_arg_parser.cc | 11 ++- src/server/CMakeLists.txt | 3 +- src/server/cluster/cluster_family.cc | 76 ++++++++++++++++++- src/server/cluster/cluster_family.h | 4 + src/server/cluster/cluster_slot_migration.cc | 80 ++++++++++++++++++++ src/server/cluster/cluster_slot_migration.h | 24 ++++++ src/server/protocol_client.cc | 2 +- src/server/protocol_client.h | 3 +- src/server/replica.cc | 4 +- tests/dragonfly/cluster_test.py | 63 +++++++++++++++ 10 files changed, 263 insertions(+), 7 deletions(-) create mode 100644 src/server/cluster/cluster_slot_migration.cc create mode 100644 src/server/cluster/cluster_slot_migration.h diff --git a/src/facade/cmd_arg_parser.cc b/src/facade/cmd_arg_parser.cc index 01665cf49..22cda2180 100644 --- a/src/facade/cmd_arg_parser.cc +++ b/src/facade/cmd_arg_parser.cc @@ -43,9 +43,16 @@ template T CmdArgParser::Num(size_t idx) { } else if constexpr (std::is_same_v) { if (absl::SimpleAtod(arg, &out)) return out; - } else if constexpr (std::is_integral_v) { + } else if constexpr (std::is_integral_v && sizeof(T) >= sizeof(int32_t)) { if (absl::SimpleAtoi(arg, &out)) return out; + } else if constexpr (std::is_integral_v && sizeof(T) < sizeof(int32_t)) { + int32_t tmp; + if (absl::SimpleAtoi(arg, &tmp)) { + out = tmp; // out can not store the whole tmp + if (tmp == out) + return out; + } } Report(INVALID_INT, idx); @@ -58,6 +65,8 @@ template uint64_t CmdArgParser::Num(size_t); template int64_t CmdArgParser::Num(size_t); template uint32_t CmdArgParser::Num(size_t); template int32_t CmdArgParser::Num(size_t); +template uint16_t CmdArgParser::Num(size_t); +template int16_t CmdArgParser::Num(size_t); ErrorReply CmdArgParser::ErrorInfo::MakeReply() const { switch (type) { diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 9f9a1caaa..01be21810 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -39,7 +39,8 @@ add_library(dragonfly_lib engine_shard_set.cc channel_store.cc command_registry. set_family.cc stream_family.cc string_family.cc zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc - cluster/cluster_family.cc acl/user.cc acl/user_registry.cc acl/acl_family.cc + cluster/cluster_family.cc cluster/cluster_slot_migration.cc + acl/user.cc acl/user_registry.cc acl/acl_family.cc acl/validator.cc acl/helpers.cc) cxx_link(dfly_transaction dfly_core strings_lib TRDP::fast_float) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 9dcce7987..16093e0ea 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -12,9 +12,11 @@ #include "base/flags.h" #include "base/logging.h" #include "core/json_object.h" +#include "facade/cmd_arg_parser.h" #include "facade/dragonfly_connection.h" #include "facade/error.h" #include "server/acl/acl_commands_def.h" +#include "server/cluster/cluster_slot_migration.h" #include "server/command_registry.h" #include "server/conn_context.h" #include "server/dflycmd.h" @@ -392,6 +394,8 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) { return DflyClusterMyId(args, cntx); } else if (sub_cmd == "FLUSHSLOTS") { return DflyClusterFlushSlots(args, cntx); + } else if (sub_cmd == "START-SLOT-MIGRATION") { + return DflyClusterStartSlotMigration(args, cntx); } return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType); @@ -589,6 +593,73 @@ void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cn return rb->SendOk(); } +void ClusterFamily::DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx) { + SinkReplyBuilder* rb = cntx->reply_builder(); + + args.remove_prefix(1); // Removes "START-SLOT-MIGRATION" subcmd string + + CmdArgParser parser(args); + auto [host_ip, port] = parser.Next(); + std::vector slots; + do { + auto [slot_start, slot_end] = parser.Next(); + slots.emplace_back(SlotRange{slot_start, slot_end}); + } while (parser.HasNext()); + + if (auto err = parser.Error(); err) + return (*cntx)->SendError(err->MakeReply()); + + ClusterSlotMigration node(std::string(host_ip), port, slots); + node.Start(cntx); + + return rb->SendOk(); +} + +void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) { + ToUpper(&args[0]); + string_view sub_cmd = ArgS(args, 0); + args.remove_prefix(1); + if (sub_cmd == "CONF") { + MigrationConf(args, cntx); + } else { + (*cntx)->SendError(facade::UnknownSubCmd(sub_cmd, "DFLYMIGRATE"), facade::kSyntaxErrType); + } +} + +void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) { + VLOG(1) << "Create slot migration config"; + CmdArgParser parser{args}; + auto port = parser.Next(); + (void)port; // we need it for the next step + + std::vector slots; + do { + auto [slot_start, slot_end] = parser.Next(); + slots.emplace_back(SlotRange{slot_start, slot_end}); + } while (parser.HasNext()); + + if (!tl_cluster_config) { + (*cntx)->SendError(kClusterNotConfigured); + return; + } + + for (const auto& migration_range : slots) { + for (auto i = migration_range.start; i <= migration_range.end; ++i) { + if (!tl_cluster_config->IsMySlot(i)) { + VLOG(1) << "Invalid migration slot " << i << " in range " << migration_range.start << ':' + << migration_range.end; + (*cntx)->SendError("Invalid slots range"); + return; + } + } + } + + cntx->conn()->SetName("slot_migration_ctrl"); + + (*cntx)->SendLong(shard_set->size()); + return; +} + using EngineFunc = void (ClusterFamily::*)(CmdArgList args, ConnectionContext* cntx); inline CommandId::Handler HandlerFunc(ClusterFamily* se, EngineFunc f) { @@ -603,6 +674,7 @@ constexpr uint32_t kCluster = SLOW; constexpr uint32_t kDflyCluster = ADMIN | SLOW; constexpr uint32_t kReadOnly = FAST | CONNECTION; constexpr uint32_t kReadWrite = FAST | CONNECTION; +constexpr uint32_t kDflyMigrate = ADMIN | SLOW | DANGEROUS; } // namespace acl void ClusterFamily::Register(CommandRegistry* registry) { @@ -612,7 +684,9 @@ void ClusterFamily::Register(CommandRegistry* registry) { acl::kDflyCluster} .HFUNC(DflyCluster) << CI{"READONLY", CO::READONLY, 1, 0, 0, acl::kReadOnly}.HFUNC(ReadOnly) - << CI{"READWRITE", CO::READONLY, 1, 0, 0, acl::kReadWrite}.HFUNC(ReadWrite); + << CI{"READWRITE", CO::READONLY, 1, 0, 0, acl::kReadWrite}.HFUNC(ReadWrite) + << CI{"DFLYMIGRATE", CO::ADMIN | CO::HIDDEN, -1, 0, 0, acl::kDflyMigrate}.HFUNC( + DflyMigrate); } } // namespace dfly diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index 6a60f3bc7..466214d25 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -45,6 +45,10 @@ class ClusterFamily { void DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx); void DflyClusterMyId(CmdArgList args, ConnectionContext* cntx); void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx); + void DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx); + void DflyMigrate(CmdArgList args, ConnectionContext* cntx); + + void MigrationConf(CmdArgList args, ConnectionContext* cntx); ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const; diff --git a/src/server/cluster/cluster_slot_migration.cc b/src/server/cluster/cluster_slot_migration.cc new file mode 100644 index 000000000..878b85882 --- /dev/null +++ b/src/server/cluster/cluster_slot_migration.cc @@ -0,0 +1,80 @@ +// Copyright 2023, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// +#include "server/cluster/cluster_slot_migration.h" + +#include + +#include "base/logging.h" +#include "server/error.h" +#include "server/main_service.h" + +ABSL_FLAG(int, source_connect_timeout_ms, 20000, + "Timeout for establishing connection to a source node"); + +ABSL_DECLARE_FLAG(int32_t, port); + +namespace dfly { + +using namespace std; +using namespace facade; +using absl::GetFlag; + +ClusterSlotMigration::ClusterSlotMigration(string host_ip, uint16_t port, + std::vector slots) + : ProtocolClient(move(host_ip), port), slots_(std::move(slots)) { +} + +ClusterSlotMigration::~ClusterSlotMigration() { +} + +error_code ClusterSlotMigration::Start(ConnectionContext* cntx) { + VLOG(1) << "Starting slot migration"; + + auto check_connection_error = [this, &cntx](error_code ec, const char* msg) -> error_code { + if (ec) { + (*cntx)->SendError(absl::StrCat(msg, ec.message())); + } + return ec; + }; + + VLOG(1) << "Resolving host DNS"; + error_code ec = ResolveHostDns(); + RETURN_ON_ERR(check_connection_error(ec, "could not resolve host dns")); + + VLOG(1) << "Connecting to source"; + ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_); + RETURN_ON_ERR(check_connection_error(ec, "couldn't connect to source")); + + VLOG(1) << "Greeting"; + ec = Greet(); + RETURN_ON_ERR(check_connection_error(ec, "couldn't greet source ")); + + (*cntx)->SendOk(); + + return {}; +} + +error_code ClusterSlotMigration::Greet() { + ResetParser(false); + VLOG(1) << "greeting message handling"; + RETURN_ON_ERR(SendCommandAndReadResponse("PING")); + PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("PONG")); + + auto port = absl::GetFlag(FLAGS_port); + auto cmd = absl::StrCat("DFLYMIGRATE CONF ", port); + for (const auto& s : slots_) { + absl::StrAppend(&cmd, " ", s.start, " ", s.end); + } + VLOG(1) << "Migration command: " << cmd; + RETURN_ON_ERR(SendCommandAndReadResponse(cmd)); + // Response is: num_shards + if (!CheckRespFirstTypes({RespExpr::INT64})) + return make_error_code(errc::bad_message); + + souce_shards_num_ = get(LastResponseArgs()[0].u); + + return error_code{}; +} + +} // namespace dfly diff --git a/src/server/cluster/cluster_slot_migration.h b/src/server/cluster/cluster_slot_migration.h new file mode 100644 index 000000000..1dbe1eb64 --- /dev/null +++ b/src/server/cluster/cluster_slot_migration.h @@ -0,0 +1,24 @@ +// Copyright 2023, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// +#pragma once + +#include "server/protocol_client.h" + +namespace dfly { + +class ClusterSlotMigration : ProtocolClient { + public: + ClusterSlotMigration(std::string host_ip, uint16_t port, + std::vector slots); + ~ClusterSlotMigration(); + + std::error_code Start(ConnectionContext* cntx); + + private: + std::error_code Greet(); + std::vector slots_; + size_t souce_shards_num_ = 0; +}; + +} // namespace dfly diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc index 2dbf95a87..3ea8fff1f 100644 --- a/src/server/protocol_client.cc +++ b/src/server/protocol_client.cc @@ -212,7 +212,7 @@ ProtocolClient::~ProtocolClient() { #endif } -error_code ProtocolClient::ResolveMasterDns() { +error_code ProtocolClient::ResolveHostDns() { char ip_addr[INET6_ADDRSTRLEN]; int resolve_res = ResolveDns(server_context_.host, ip_addr); if (resolve_res != 0) { diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h index e553782e8..f57e3482b 100644 --- a/src/server/protocol_client.h +++ b/src/server/protocol_client.h @@ -4,6 +4,7 @@ #pragma once #include +#include #include #include @@ -63,7 +64,7 @@ class ProtocolClient { // the DNS resolution step. explicit ProtocolClient(ServerContext context); - std::error_code ResolveMasterDns(); // Resolve master dns + std::error_code ResolveHostDns(); // Connect to master and authenticate if needed. std::error_code ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms, Context* cntx); diff --git a/src/server/replica.cc b/src/server/replica.cc index 5c504d3cb..f4f629d7c 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -101,7 +101,7 @@ error_code Replica::Start(ConnectionContext* cntx) { // 1. Resolve dns. VLOG(1) << "Resolving master DNS"; - error_code ec = ResolveMasterDns(); + error_code ec = ResolveHostDns(); RETURN_ON_ERR(check_connection_error(ec, "could not resolve master dns")); // 2. Connect socket. @@ -179,7 +179,7 @@ void Replica::MainReplicationFb() { if (is_paused_) continue; - ec = ResolveMasterDns(); + ec = ResolveHostDns(); if (ec) { LOG(ERROR) << "Error resolving dns to " << server().host << " " << ec; continue; diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 416395d16..65f863583 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -691,3 +691,66 @@ async def test_cluster_native_client(df_local_factory: DflyInstanceFactory): await test_random_keys() await client.close() + + +@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) +async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory): + # Check slot migration from one node to another + nodes = [ + df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) + for i in range(2) + ] + + df_local_factory.start_all(nodes) + + c_nodes = [node.client() for node in nodes] + c_nodes_admin = [node.admin_client() for node in nodes] + + node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes_admin)) + + config = f""" + [ + {{ + "slot_ranges": [ + {{ + "start": 0, + "end": LAST_SLOT_CUTOFF + }} + ], + "master": {{ + "id": "{node_ids[0]}", + "ip": "localhost", + "port": {nodes[0].port} + }}, + "replicas": [] + }}, + {{ + "slot_ranges": [ + {{ + "start": NEXT_SLOT_CUTOFF, + "end": 16383 + }} + ], + "master": {{ + "id": "{node_ids[1]}", + "ip": "localhost", + "port": {nodes[1].port} + }}, + "replicas": [] + }} + ] + """ + + await push_config( + config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"), + c_nodes_admin, + ) + + res = await c_nodes_admin[1].execute_command( + "DFLYCLUSTER", "START-SLOT-MIGRATION", "127.0.0.1", str(nodes[0].admin_port), "5200", "5259" + ) + + assert "OK" == res + + await c_nodes_admin[0].close() + await c_nodes_admin[1].close()