mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
feat: allow finish and start migration in the same config (#4486)
This commit is contained in:
parent
91435bc6af
commit
010166525e
2 changed files with 73 additions and 8 deletions
|
@ -896,11 +896,12 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {
|
||||||
if (auto err = parser.Error(); err)
|
if (auto err = parser.Error(); err)
|
||||||
return builder->SendError(err->MakeReply());
|
return builder->SendError(err->MakeReply());
|
||||||
|
|
||||||
|
SlotRanges slot_ranges(std::move(slots));
|
||||||
|
|
||||||
const auto& incoming_migrations = cluster_config()->GetIncomingMigrations();
|
const auto& incoming_migrations = cluster_config()->GetIncomingMigrations();
|
||||||
bool found = any_of(incoming_migrations.begin(), incoming_migrations.end(),
|
bool found = any_of(incoming_migrations.begin(), incoming_migrations.end(),
|
||||||
[source_id = source_id](const MigrationInfo& info) {
|
[&source_id, &slot_ranges](const MigrationInfo& info) {
|
||||||
// TODO: also compare slot ranges (in an order-agnostic way)
|
return info.node_info.id == source_id && info.slot_ranges == slot_ranges;
|
||||||
return info.node_info.id == source_id;
|
|
||||||
});
|
});
|
||||||
if (!found) {
|
if (!found) {
|
||||||
VLOG(1) << "Unrecognized incoming migration from " << source_id;
|
VLOG(1) << "Unrecognized incoming migration from " << source_id;
|
||||||
|
@ -914,7 +915,7 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {
|
||||||
LOG_IF(WARNING, was_removed) << "Reinit issued for migration from:" << source_id;
|
LOG_IF(WARNING, was_removed) << "Reinit issued for migration from:" << source_id;
|
||||||
|
|
||||||
incoming_migrations_jobs_.emplace_back(make_shared<IncomingSlotMigration>(
|
incoming_migrations_jobs_.emplace_back(make_shared<IncomingSlotMigration>(
|
||||||
string(source_id), &server_family_->service(), SlotRanges(std::move(slots)), flows_num));
|
string(source_id), &server_family_->service(), std::move(slot_ranges), flows_num));
|
||||||
|
|
||||||
return builder->SendOk();
|
return builder->SendOk();
|
||||||
}
|
}
|
||||||
|
@ -966,16 +967,17 @@ void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id,
|
||||||
bool is_migration_valid = false;
|
bool is_migration_valid = false;
|
||||||
if (is_incoming) {
|
if (is_incoming) {
|
||||||
for (const auto& mj : incoming_migrations_jobs_) {
|
for (const auto& mj : incoming_migrations_jobs_) {
|
||||||
if (mj->GetSourceID() == node_id) {
|
if (mj->GetSourceID() == node_id && slots == mj->GetSlots()) {
|
||||||
// TODO add compare for slots
|
|
||||||
is_migration_valid = true;
|
is_migration_valid = true;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (const auto& mj : outgoing_migration_jobs_) {
|
for (const auto& mj : outgoing_migration_jobs_) {
|
||||||
if (mj->GetMigrationInfo().node_info.id == node_id) {
|
if (mj->GetMigrationInfo().node_info.id == node_id &&
|
||||||
// TODO add compare for slots
|
mj->GetMigrationInfo().slot_ranges == slots) {
|
||||||
is_migration_valid = true;
|
is_migration_valid = true;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2140,6 +2140,69 @@ async def test_cluster_migration_while_seeding(
|
||||||
assert extract_int_after_prefix("buckets on_db_update ", line) > 0
|
assert extract_int_after_prefix("buckets on_db_update ", line) > 0
|
||||||
|
|
||||||
|
|
||||||
|
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_cluster_migrations_sequence(
|
||||||
|
df_factory: DflyInstanceFactory, df_seeder_factory: DflySeederFactory
|
||||||
|
):
|
||||||
|
instances = [
|
||||||
|
df_factory.create(port=next(next_port), admin_port=next(next_port)) for _ in range(2)
|
||||||
|
]
|
||||||
|
df_factory.start_all(instances)
|
||||||
|
|
||||||
|
nodes = [await create_node_info(instance) for instance in instances]
|
||||||
|
nodes[0].slots = [(0, 16383)]
|
||||||
|
nodes[1].slots = []
|
||||||
|
|
||||||
|
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||||
|
|
||||||
|
logging.debug("Seeding cluster")
|
||||||
|
seeder = df_seeder_factory.create(
|
||||||
|
keys=10_000, port=instances[0].port, cluster_mode=True, mirror_to_fake_redis=True
|
||||||
|
)
|
||||||
|
await seeder.run(target_deviation=0.1)
|
||||||
|
|
||||||
|
seed = asyncio.create_task(seeder.run())
|
||||||
|
await asyncio.sleep(1)
|
||||||
|
|
||||||
|
slot_step = 500
|
||||||
|
nodes[0].migrations = [
|
||||||
|
MigrationInfo("127.0.0.1", instances[1].admin_port, [(0, slot_step - 1)], nodes[1].id)
|
||||||
|
]
|
||||||
|
logging.debug("Migrating slots")
|
||||||
|
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||||
|
|
||||||
|
for i in range(slot_step, 16301, slot_step):
|
||||||
|
logging.debug("Waiting for migration to finish")
|
||||||
|
await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", timeout=10)
|
||||||
|
|
||||||
|
nodes[0].slots = [(i, 16383)]
|
||||||
|
nodes[1].slots = [(0, i - 1)]
|
||||||
|
end_slot = min(i + slot_step - 1, 16383)
|
||||||
|
nodes[0].migrations = [
|
||||||
|
MigrationInfo("127.0.0.1", instances[1].admin_port, [(i, end_slot)], nodes[1].id)
|
||||||
|
]
|
||||||
|
|
||||||
|
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||||
|
|
||||||
|
logging.debug("Waiting for migration to finish")
|
||||||
|
await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", timeout=10)
|
||||||
|
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||||
|
|
||||||
|
logging.debug("Finalizing migration")
|
||||||
|
nodes[0].slots = []
|
||||||
|
nodes[1].slots = [(0, 16383)]
|
||||||
|
nodes[0].migrations = []
|
||||||
|
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||||
|
|
||||||
|
logging.debug("stop seeding")
|
||||||
|
seeder.stop()
|
||||||
|
await seed
|
||||||
|
|
||||||
|
capture = await seeder.capture_fake_redis()
|
||||||
|
assert await seeder.compare(capture, instances[1].port)
|
||||||
|
|
||||||
|
|
||||||
def parse_lag(replication_info: str):
|
def parse_lag(replication_info: str):
|
||||||
lags = re.findall("lag=([0-9]+)\r\n", replication_info)
|
lags = re.findall("lag=([0-9]+)\r\n", replication_info)
|
||||||
assert len(lags) == 1
|
assert len(lags) == 1
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue