Start slot migration (#2218)

* feat: add new command START-SLOT-MIGRATION
This commit is contained in:
Borys 2023-11-29 13:38:13 +02:00 committed by GitHub
parent 43431d1986
commit bfb1b3b624
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 263 additions and 7 deletions

View file

@ -43,9 +43,16 @@ template <typename T> T CmdArgParser::Num(size_t idx) {
} else if constexpr (std::is_same_v<T, double>) {
if (absl::SimpleAtod(arg, &out))
return out;
} else if constexpr (std::is_integral_v<T>) {
} else if constexpr (std::is_integral_v<T> && sizeof(T) >= sizeof(int32_t)) {
if (absl::SimpleAtoi(arg, &out))
return out;
} else if constexpr (std::is_integral_v<T> && sizeof(T) < sizeof(int32_t)) {
int32_t tmp;
if (absl::SimpleAtoi(arg, &tmp)) {
out = tmp; // out can not store the whole tmp
if (tmp == out)
return out;
}
}
Report(INVALID_INT, idx);
@ -58,6 +65,8 @@ template uint64_t CmdArgParser::Num<uint64_t>(size_t);
template int64_t CmdArgParser::Num<int64_t>(size_t);
template uint32_t CmdArgParser::Num<uint32_t>(size_t);
template int32_t CmdArgParser::Num<int32_t>(size_t);
template uint16_t CmdArgParser::Num<uint16_t>(size_t);
template int16_t CmdArgParser::Num<int16_t>(size_t);
ErrorReply CmdArgParser::ErrorInfo::MakeReply() const {
switch (type) {

View file

@ -39,7 +39,8 @@ add_library(dragonfly_lib engine_shard_set.cc channel_store.cc command_registry.
set_family.cc stream_family.cc string_family.cc
zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc
top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc
cluster/cluster_family.cc acl/user.cc acl/user_registry.cc acl/acl_family.cc
cluster/cluster_family.cc cluster/cluster_slot_migration.cc
acl/user.cc acl/user_registry.cc acl/acl_family.cc
acl/validator.cc acl/helpers.cc)
cxx_link(dfly_transaction dfly_core strings_lib TRDP::fast_float)

View file

@ -12,9 +12,11 @@
#include "base/flags.h"
#include "base/logging.h"
#include "core/json_object.h"
#include "facade/cmd_arg_parser.h"
#include "facade/dragonfly_connection.h"
#include "facade/error.h"
#include "server/acl/acl_commands_def.h"
#include "server/cluster/cluster_slot_migration.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/dflycmd.h"
@ -392,6 +394,8 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
return DflyClusterMyId(args, cntx);
} else if (sub_cmd == "FLUSHSLOTS") {
return DflyClusterFlushSlots(args, cntx);
} else if (sub_cmd == "START-SLOT-MIGRATION") {
return DflyClusterStartSlotMigration(args, cntx);
}
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType);
@ -589,6 +593,73 @@ void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cn
return rb->SendOk();
}
void ClusterFamily::DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx) {
SinkReplyBuilder* rb = cntx->reply_builder();
args.remove_prefix(1); // Removes "START-SLOT-MIGRATION" subcmd string
CmdArgParser parser(args);
auto [host_ip, port] = parser.Next<std::string_view, uint16_t>();
std::vector<SlotRange> slots;
do {
auto [slot_start, slot_end] = parser.Next<SlotId, SlotId>();
slots.emplace_back(SlotRange{slot_start, slot_end});
} while (parser.HasNext());
if (auto err = parser.Error(); err)
return (*cntx)->SendError(err->MakeReply());
ClusterSlotMigration node(std::string(host_ip), port, slots);
node.Start(cntx);
return rb->SendOk();
}
void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[0]);
string_view sub_cmd = ArgS(args, 0);
args.remove_prefix(1);
if (sub_cmd == "CONF") {
MigrationConf(args, cntx);
} else {
(*cntx)->SendError(facade::UnknownSubCmd(sub_cmd, "DFLYMIGRATE"), facade::kSyntaxErrType);
}
}
void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Create slot migration config";
CmdArgParser parser{args};
auto port = parser.Next<uint16_t>();
(void)port; // we need it for the next step
std::vector<ClusterConfig::SlotRange> slots;
do {
auto [slot_start, slot_end] = parser.Next<SlotId, SlotId>();
slots.emplace_back(SlotRange{slot_start, slot_end});
} while (parser.HasNext());
if (!tl_cluster_config) {
(*cntx)->SendError(kClusterNotConfigured);
return;
}
for (const auto& migration_range : slots) {
for (auto i = migration_range.start; i <= migration_range.end; ++i) {
if (!tl_cluster_config->IsMySlot(i)) {
VLOG(1) << "Invalid migration slot " << i << " in range " << migration_range.start << ':'
<< migration_range.end;
(*cntx)->SendError("Invalid slots range");
return;
}
}
}
cntx->conn()->SetName("slot_migration_ctrl");
(*cntx)->SendLong(shard_set->size());
return;
}
using EngineFunc = void (ClusterFamily::*)(CmdArgList args, ConnectionContext* cntx);
inline CommandId::Handler HandlerFunc(ClusterFamily* se, EngineFunc f) {
@ -603,6 +674,7 @@ constexpr uint32_t kCluster = SLOW;
constexpr uint32_t kDflyCluster = ADMIN | SLOW;
constexpr uint32_t kReadOnly = FAST | CONNECTION;
constexpr uint32_t kReadWrite = FAST | CONNECTION;
constexpr uint32_t kDflyMigrate = ADMIN | SLOW | DANGEROUS;
} // namespace acl
void ClusterFamily::Register(CommandRegistry* registry) {
@ -612,7 +684,9 @@ void ClusterFamily::Register(CommandRegistry* registry) {
acl::kDflyCluster}
.HFUNC(DflyCluster)
<< CI{"READONLY", CO::READONLY, 1, 0, 0, acl::kReadOnly}.HFUNC(ReadOnly)
<< CI{"READWRITE", CO::READONLY, 1, 0, 0, acl::kReadWrite}.HFUNC(ReadWrite);
<< CI{"READWRITE", CO::READONLY, 1, 0, 0, acl::kReadWrite}.HFUNC(ReadWrite)
<< CI{"DFLYMIGRATE", CO::ADMIN | CO::HIDDEN, -1, 0, 0, acl::kDflyMigrate}.HFUNC(
DflyMigrate);
}
} // namespace dfly

View file

@ -45,6 +45,10 @@ class ClusterFamily {
void DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx);
void DflyClusterMyId(CmdArgList args, ConnectionContext* cntx);
void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx);
void DflyClusterStartSlotMigration(CmdArgList args, ConnectionContext* cntx);
void DflyMigrate(CmdArgList args, ConnectionContext* cntx);
void MigrationConf(CmdArgList args, ConnectionContext* cntx);
ClusterConfig::ClusterShard GetEmulatedShardInfo(ConnectionContext* cntx) const;

View file

@ -0,0 +1,80 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/cluster/cluster_slot_migration.h"
#include <absl/flags/flag.h>
#include "base/logging.h"
#include "server/error.h"
#include "server/main_service.h"
ABSL_FLAG(int, source_connect_timeout_ms, 20000,
"Timeout for establishing connection to a source node");
ABSL_DECLARE_FLAG(int32_t, port);
namespace dfly {
using namespace std;
using namespace facade;
using absl::GetFlag;
ClusterSlotMigration::ClusterSlotMigration(string host_ip, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots)
: ProtocolClient(move(host_ip), port), slots_(std::move(slots)) {
}
ClusterSlotMigration::~ClusterSlotMigration() {
}
error_code ClusterSlotMigration::Start(ConnectionContext* cntx) {
VLOG(1) << "Starting slot migration";
auto check_connection_error = [this, &cntx](error_code ec, const char* msg) -> error_code {
if (ec) {
(*cntx)->SendError(absl::StrCat(msg, ec.message()));
}
return ec;
};
VLOG(1) << "Resolving host DNS";
error_code ec = ResolveHostDns();
RETURN_ON_ERR(check_connection_error(ec, "could not resolve host dns"));
VLOG(1) << "Connecting to source";
ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_);
RETURN_ON_ERR(check_connection_error(ec, "couldn't connect to source"));
VLOG(1) << "Greeting";
ec = Greet();
RETURN_ON_ERR(check_connection_error(ec, "couldn't greet source "));
(*cntx)->SendOk();
return {};
}
error_code ClusterSlotMigration::Greet() {
ResetParser(false);
VLOG(1) << "greeting message handling";
RETURN_ON_ERR(SendCommandAndReadResponse("PING"));
PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("PONG"));
auto port = absl::GetFlag(FLAGS_port);
auto cmd = absl::StrCat("DFLYMIGRATE CONF ", port);
for (const auto& s : slots_) {
absl::StrAppend(&cmd, " ", s.start, " ", s.end);
}
VLOG(1) << "Migration command: " << cmd;
RETURN_ON_ERR(SendCommandAndReadResponse(cmd));
// Response is: num_shards
if (!CheckRespFirstTypes({RespExpr::INT64}))
return make_error_code(errc::bad_message);
souce_shards_num_ = get<int64_t>(LastResponseArgs()[0].u);
return error_code{};
}
} // namespace dfly

View file

@ -0,0 +1,24 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include "server/protocol_client.h"
namespace dfly {
class ClusterSlotMigration : ProtocolClient {
public:
ClusterSlotMigration(std::string host_ip, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots);
~ClusterSlotMigration();
std::error_code Start(ConnectionContext* cntx);
private:
std::error_code Greet();
std::vector<ClusterConfig::SlotRange> slots_;
size_t souce_shards_num_ = 0;
};
} // namespace dfly

View file

@ -212,7 +212,7 @@ ProtocolClient::~ProtocolClient() {
#endif
}
error_code ProtocolClient::ResolveMasterDns() {
error_code ProtocolClient::ResolveHostDns() {
char ip_addr[INET6_ADDRSTRLEN];
int resolve_res = ResolveDns(server_context_.host, ip_addr);
if (resolve_res != 0) {

View file

@ -4,6 +4,7 @@
#pragma once
#include <absl/container/inlined_vector.h>
#include <absl/strings/escaping.h>
#include <boost/fiber/barrier.hpp>
#include <queue>
@ -63,7 +64,7 @@ class ProtocolClient {
// the DNS resolution step.
explicit ProtocolClient(ServerContext context);
std::error_code ResolveMasterDns(); // Resolve master dns
std::error_code ResolveHostDns();
// Connect to master and authenticate if needed.
std::error_code ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms, Context* cntx);

View file

@ -101,7 +101,7 @@ error_code Replica::Start(ConnectionContext* cntx) {
// 1. Resolve dns.
VLOG(1) << "Resolving master DNS";
error_code ec = ResolveMasterDns();
error_code ec = ResolveHostDns();
RETURN_ON_ERR(check_connection_error(ec, "could not resolve master dns"));
// 2. Connect socket.
@ -179,7 +179,7 @@ void Replica::MainReplicationFb() {
if (is_paused_)
continue;
ec = ResolveMasterDns();
ec = ResolveHostDns();
if (ec) {
LOG(ERROR) << "Error resolving dns to " << server().host << " " << ec;
continue;

View file

@ -691,3 +691,66 @@ async def test_cluster_native_client(df_local_factory: DflyInstanceFactory):
await test_random_keys()
await client.close()
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
# Check slot migration from one node to another
nodes = [
df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
for i in range(2)
]
df_local_factory.start_all(nodes)
c_nodes = [node.client() for node in nodes]
c_nodes_admin = [node.admin_client() for node in nodes]
node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes_admin))
config = f"""
[
{{
"slot_ranges": [
{{
"start": 0,
"end": LAST_SLOT_CUTOFF
}}
],
"master": {{
"id": "{node_ids[0]}",
"ip": "localhost",
"port": {nodes[0].port}
}},
"replicas": []
}},
{{
"slot_ranges": [
{{
"start": NEXT_SLOT_CUTOFF,
"end": 16383
}}
],
"master": {{
"id": "{node_ids[1]}",
"ip": "localhost",
"port": {nodes[1].port}
}},
"replicas": []
}}
]
"""
await push_config(
config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
c_nodes_admin,
)
res = await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "START-SLOT-MIGRATION", "127.0.0.1", str(nodes[0].admin_port), "5200", "5259"
)
assert "OK" == res
await c_nodes_admin[0].close()
await c_nodes_admin[1].close()