feat: add ability to reapply config with migration #2924 (#2926)

* feat: add ability to reapply config with migration #2924
Borys 2024-04-19 16:21:54 +03:00 committed by GitHub
parent 7666aae6dc
commit 9a6a9ec198
4 changed files with 115 additions and 39 deletions

src/server/cluster/cluster_config.cc View file

@@ -338,10 +338,11 @@ shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
   return CreateFromConfig(my_id, config.value());
 }
 
-std::shared_ptr<ClusterConfig> ClusterConfig::CloneWithChanges(const std::vector<SlotRange>& slots,
-                                                               bool enable) const {
+std::shared_ptr<ClusterConfig> ClusterConfig::CloneWithChanges(
+    const SlotRanges& enable_slots, const SlotRanges& disable_slots) const {
   auto new_config = std::make_shared<ClusterConfig>(*this);
-  new_config->my_slots_.Set(slots, enable);
+  new_config->my_slots_.Set(enable_slots, true);
+  new_config->my_slots_.Set(disable_slots, false);
   return new_config;
 }
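The signature change above replaces the single (slots, enable) pair with two explicit slot-range lists, so one clone can both take and release ownership. Here is a minimal Python model of the new semantics, using toy stand-ins for ClusterConfig and SlotSet rather than Dragonfly's real C++ types:

import copy

KEY_SLOT_COUNT = 16384  # Redis-cluster slot space; Dragonfly uses the same


class ToyConfig:
    # Toy stand-in for ClusterConfig/SlotSet; ranges are inclusive (start, end) pairs.
    def __init__(self):
        self.my_slots = [False] * KEY_SLOT_COUNT

    def set_ranges(self, ranges, value):
        for start, end in ranges:
            for slot in range(start, end + 1):
                self.my_slots[slot] = value

    def clone_with_changes(self, enable_slots, disable_slots):
        # Mirrors ClusterConfig::CloneWithChanges: copy, enable, then disable.
        new_config = copy.deepcopy(self)
        new_config.set_ranges(enable_slots, True)
        new_config.set_ranges(disable_slots, False)
        return new_config


cfg = ToyConfig()
cfg.set_ranges([(0, 8000)], True)
updated = cfg.clone_with_changes(enable_slots=[(8001, 9000)], disable_slots=[(0, 100)])
assert updated.my_slots[8500] and not updated.my_slots[50]
assert cfg.my_slots[50]  # the original config is untouched

Note that disable_slots is applied last, in both the C++ and this model, so a slot listed in both ranges ends up disabled.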

src/server/cluster/cluster_config.h View file

@@ -77,8 +77,8 @@ class ClusterConfig {
   static std::shared_ptr<ClusterConfig> CreateFromConfig(std::string_view my_id,
                                                          std::string_view json_config);
 
-  std::shared_ptr<ClusterConfig> CloneWithChanges(const std::vector<SlotRange>& slots,
-                                                  bool enable) const;
+  std::shared_ptr<ClusterConfig> CloneWithChanges(const SlotRanges& enable_slots,
+                                                  const SlotRanges& disable_slots) const;
 
   // Returns true if the slot of the key is owned by this node.
   bool IsMySlot(SlotId id) const;

src/server/cluster/cluster_family.cc View file

@@ -494,15 +494,40 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
   lock_guard gu(set_config_mu);
-  lock_guard config_update_lk(
-      config_update_mu_);  // to prevent a simultaneous config update from an outgoing migration
-  // TODO we shouldn't provide cntx into StartSlotMigrations
-  if (!StartSlotMigrations(new_config->GetNewOutgoingMigrations(tl_cluster_config), cntx)) {
-    return cntx->SendError("Can't start the migration");
-  }
   RemoveOutgoingMigrations(new_config->GetFinishedOutgoingMigrations(tl_cluster_config));
   RemoveIncomingMigrations(new_config->GetFinishedIncomingMigrations(tl_cluster_config));
+  lock_guard config_update_lk(
+      config_update_mu_);  // to prevent a simultaneous config update from an outgoing migration
+
+  SlotRanges enable_slots, disable_slots;
+  {
+    std::lock_guard lk(migration_mu_);
+    // If a migration's state changes concurrently, the config change is applied after
+    // set_config_mu is unlocked; applying the same changes twice is not a problem.
+    for (const auto& m : incoming_migrations_jobs_) {
+      if (m->GetState() == MigrationState::C_FINISHED) {
+        const auto& slots = m->GetSlots();
+        enable_slots.insert(enable_slots.end(), slots.begin(), slots.end());
+      }
+    }
+    for (const auto& m : outgoing_migration_jobs_) {
+      if (m->GetState() == MigrationState::C_FINISHED) {
+        const auto& slots = m->GetSlots();
+        disable_slots.insert(disable_slots.end(), slots.begin(), slots.end());
+      }
+    }
+  }
+  new_config = new_config->CloneWithChanges(enable_slots, disable_slots);
+
+  // TODO we shouldn't provide cntx into StartSlotMigrations
+  if (!StartSlotMigrations(new_config->GetNewOutgoingMigrations(tl_cluster_config), cntx)) {
+    // TODO it shouldn't be an error
+    return cntx->SendError("Can't start the migration");
+  }
 
   SlotSet before = tl_cluster_config ? tl_cluster_config->GetOwnedSlots() : SlotSet(true);
 
   // Ignore blocked commands because we filter them with CancelBlockingOnThread
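The added block is the heart of the reapply fix: before the new config is installed, slots of migrations that already reached C_FINISHED are folded back in. Incoming migrations re-enable their slots and outgoing ones disable theirs, so pushing the same config a second time converges to the same ownership instead of restarting migrations. A rough Python model of that collection step follows; only C_FINISHED comes from the diff, the other enum members and type names are placeholders:

from dataclasses import dataclass
from enum import Enum


class MigrationState(Enum):
    C_CONNECTING = 0  # placeholder state, not taken from the diff
    C_SYNC = 1        # placeholder state, not taken from the diff
    C_FINISHED = 2


@dataclass
class ToyMigration:
    state: MigrationState
    slots: list  # inclusive (start, end) slot ranges


def collect_finished_slot_changes(incoming, outgoing):
    # Mirrors the loops added to DflyClusterConfig: finished incoming
    # migrations enable their slots, finished outgoing ones disable theirs.
    enable_slots, disable_slots = [], []
    for m in incoming:
        if m.state is MigrationState.C_FINISHED:
            enable_slots.extend(m.slots)
    for m in outgoing:
        if m.state is MigrationState.C_FINISHED:
            disable_slots.extend(m.slots)
    return enable_slots, disable_slots


incoming = [ToyMigration(MigrationState.C_FINISHED, [(6000, 8000)])]
outgoing = [ToyMigration(MigrationState.C_SYNC, [(100, 200)])]
assert collect_finished_slot_changes(incoming, outgoing) == ([(6000, 8000)], [])

Because Set(..., true/false) is idempotent, a migration state change racing with this scan only means the same ranges get applied twice, which the in-code comment calls out as harmless.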
@@ -851,7 +876,8 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
 void ClusterFamily::UpdateConfig(const std::vector<SlotRange>& slots, bool enable) {
   lock_guard gu(config_update_mu_);
 
-  auto new_config = tl_cluster_config->CloneWithChanges(slots, enable);
+  auto new_config = enable ? tl_cluster_config->CloneWithChanges(slots, {})
+                           : tl_cluster_config->CloneWithChanges({}, slots);
 
   shard_set->pool()->AwaitFiberOnAll(
       [&new_config](util::ProactorBase* pb) { tl_cluster_config = new_config; });
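UpdateConfig keeps its old (slots, enable) shape for existing callers and adapts it to the new two-list API with a ternary. In the toy Python model from the earlier sketch, the same adapter is simply:

def update_config(config, slots, enable):
    # Adapter in the spirit of ClusterFamily::UpdateConfig, reusing ToyConfig
    # from the sketch above: a legacy (slots, enable) call maps onto the
    # (enable_slots, disable_slots) pair, leaving the other list empty.
    return (config.clone_with_changes(slots, []) if enable
            else config.clone_with_changes([], slots))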

tests/dragonfly/cluster_test.py View file

@@ -1135,6 +1135,20 @@ class NodeInfo:
     id: str
 
 
+async def create_node_info(instance):
+    admin_client = instance.admin_client()
+    ninfo = NodeInfo(
+        instance=instance,
+        client=instance.client(),
+        admin_client=admin_client,
+        slots=[],
+        next_slots=[],
+        migrations=[],
+        id=await get_node_id(admin_client),
+    )
+    return ninfo
+
+
 def generate_config(nodes):
     return [
         {
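One detail the helper fixes in passing: the old inline construction called instance.admin_client() twice, once for the admin_client field and once inside get_node_id, creating a second client just for the id lookup; create_node_info builds one admin client and reuses it. A self-contained sketch of that factory-coroutine pattern, with hypothetical FakeInstance and fake_get_node_id stand-ins for the test fixtures:

import asyncio
from dataclasses import dataclass, field


@dataclass
class FakeInstance:  # hypothetical stand-in for the DflyInstance fixture
    port: int

    def admin_client(self):
        return f"client:{self.port}"


async def fake_get_node_id(client):  # hypothetical stand-in for get_node_id
    await asyncio.sleep(0)  # pretend to query the node for its id
    return f"id-of-{client}"


@dataclass
class ToyNodeInfo:
    admin_client: str
    id: str
    slots: list = field(default_factory=list)


async def create_node_info(instance):
    admin_client = instance.admin_client()  # one client, reused for the id lookup
    return ToyNodeInfo(admin_client=admin_client,
                       id=await fake_get_node_id(admin_client))


async def main():
    nodes = [await create_node_info(FakeInstance(7000 + i)) for i in range(2)]
    assert nodes[0].id == "id-of-client:7000"


asyncio.run(main())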
@@ -1185,18 +1199,7 @@ async def test_cluster_fuzzymigration(
     ]
     df_local_factory.start_all(instances)
 
-    nodes = [
-        NodeInfo(
-            instance=instance,
-            client=instance.client(),
-            admin_client=instance.admin_client(),
-            slots=[],
-            next_slots=[],
-            migrations=[],
-            id=await get_node_id(instance.admin_client()),
-        )
-        for instance in instances
-    ]
+    nodes = [(await create_node_info(instance)) for instance in instances]
 
     # Generate equally sized ranges and distribute by nodes
     step = 16400 // segments
@@ -1321,7 +1324,7 @@
 
 @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
-async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory):
+async def test_cluster_config_reapply(df_local_factory: DflyInstanceFactory):
     """Check data migration from one node to another."""
     instances = [
         df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
@@ -1329,18 +1332,7 @@ async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory):
     ]
     df_local_factory.start_all(instances)
 
-    nodes = [
-        NodeInfo(
-            instance=instance,
-            client=instance.client(),
-            admin_client=instance.admin_client(),
-            slots=[],
-            next_slots=[],
-            migrations=[],
-            id=await get_node_id(instance.admin_client()),
-        )
-        for instance in instances
-    ]
+    nodes = [await create_node_info(instance) for instance in instances]
 
     nodes[0].slots = [(0, 8000)]
     nodes[1].slots = [(8001, 16383)]
@@ -1352,7 +1344,64 @@ async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory):
     assert [SIZE, 0] == [await node.admin_client.dbsize() for node in nodes]
 
     nodes[0].migrations = [
-        MigrationInfo("127.0.0.1", instances[1].port, [(6000, 8000)], nodes[1].id)
+        MigrationInfo("127.0.0.1", instances[1].admin_port, [(6000, 8000)], nodes[1].id)
     ]
+
+    logging.debug("Migrating slots 6000-8000")
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    while "FINISHED" not in await nodes[1].admin_client.execute_command(
+        "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id
+    ):
+        logging.debug("SLOT-MIGRATION-STATUS is not FINISHED")
+        await asyncio.sleep(0.05)
+
+    assert await nodes[0].admin_client.dbsize() == SIZE
+    assert await nodes[1].admin_client.dbsize() == SIZE
+
+    logging.debug("Reapply config with migration")
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+    await asyncio.sleep(0.1)
+
+    assert await nodes[0].admin_client.dbsize() == SIZE
+    assert await nodes[1].admin_client.dbsize() == SIZE
+
+    logging.debug("Finalizing migration")
+    nodes[0].migrations = []
+    nodes[0].slots = [(0, 6000)]
+    nodes[1].slots = [(6001, 16383)]
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+    logging.debug("Migration finalized")
+
+    await asyncio.sleep(0.1)
+    assert [0, SIZE] == [await node.admin_client.dbsize() for node in nodes]
+
+    for i in range(SIZE):
+        assert str(i) == await nodes[1].admin_client.get(f"{{key50}}:{i}")
+
+    await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])
+
+
+@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
+async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory):
+    """Check data migration from one node to another."""
+    instances = [
+        df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
+        for i in range(2)
+    ]
+    df_local_factory.start_all(instances)
+    nodes = [await create_node_info(instance) for instance in instances]
+
+    nodes[0].slots = [(0, 8000)]
+    nodes[1].slots = [(8001, 16383)]
+
+    logging.debug("Pushing data to slot 6XXX")
+    SIZE = 10_000
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+    for i in range(SIZE):
+        assert await nodes[0].admin_client.set(f"{{key50}}:{i}", i)  # key50 belongs to slot 6686
+    assert [SIZE, 0] == [await node.admin_client.dbsize() for node in nodes]
+
+    nodes[0].migrations = [
+        MigrationInfo("127.0.0.1", instances[1].admin_port, [(6000, 8000)], nodes[1].id)
+    ]
 
     logging.debug("Migrating slots 6000-8000")
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
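Both tests wait for the migration with a bare poll on DFLYCLUSTER SLOT-MIGRATION-STATUS, which hangs forever if a migration stalls. Below is a sketch of the same polling pattern with a deadline; the helper name and the timeout are suggestions, not part of this PR:

import asyncio
import time


async def wait_for_migration_finish(admin_client, node_id, timeout=30.0):
    # Poll DFLYCLUSTER SLOT-MIGRATION-STATUS, as the tests do, but give up
    # after `timeout` seconds so a stuck migration fails fast instead of
    # hanging the test run.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = await admin_client.execute_command(
            "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_id
        )
        if "FINISHED" in status:
            return
        await asyncio.sleep(0.05)
    raise TimeoutError(f"migration from {node_id} not FINISHED after {timeout}s")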
@@ -1373,7 +1422,7 @@ async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory):
     logging.debug("Reissuing migration")
     nodes[0].migrations.append(
-        MigrationInfo("127.0.0.1", instances[1].port, [(6001, 8000)], nodes[1].id)
+        MigrationInfo("127.0.0.1", instances[1].admin_port, [(6001, 8000)], nodes[1].id)
     )
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
     await asyncio.sleep(0.1)