chore: cancel slot migrations on shutdown (#3405)

Vladislav, 2024-07-30 12:47:58 +03:00, committed by GitHub
parent e464990643
commit f536f8afbd
6 changed files with 48 additions and 2 deletions


@@ -300,6 +300,13 @@ std::shared_ptr<ClusterConfig> ClusterConfig::CloneWithChanges(
  return new_config;
}

std::shared_ptr<ClusterConfig> ClusterConfig::CloneWithoutMigrations() const {
  auto new_config = std::make_shared<ClusterConfig>(*this);
  new_config->my_incoming_migrations_.clear();
  new_config->my_outgoing_migrations_.clear();
  return new_config;
}

bool ClusterConfig::IsMySlot(SlotId id) const {
  if (id > cluster::kMaxSlotNum) {
    DCHECK(false) << "Requesting a non-existing slot id " << id;

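CloneWithoutMigrations reuses the snapshot idiom of CloneWithChanges just above: copy-construct the current immutable config, mutate only the copy, and hand out a fresh shared_ptr, so threads still holding the old pointer keep a consistent view. A minimal standalone sketch of the copy-then-clear pattern, with Config and MigrationInfo as illustrative stand-ins for the real types:

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <vector>

struct MigrationInfo {};  // stand-in for the real migration descriptor

class Config {
 public:
  void AddIncoming(MigrationInfo m) { incoming_.push_back(m); }

  // Copy the snapshot, then strip migration state from the copy only.
  std::shared_ptr<Config> CloneWithoutMigrations() const {
    auto clone = std::make_shared<Config>(*this);
    clone->incoming_.clear();
    clone->outgoing_.clear();
    return clone;
  }

  std::size_t NumIncoming() const { return incoming_.size(); }

 private:
  std::vector<MigrationInfo> incoming_;
  std::vector<MigrationInfo> outgoing_;
};

int main() {
  auto cfg = std::make_shared<Config>();
  cfg->AddIncoming({});

  auto stripped = cfg->CloneWithoutMigrations();
  assert(cfg->NumIncoming() == 1);       // original snapshot untouched
  assert(stripped->NumIncoming() == 0);  // clone carries no migrations
}
```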

@@ -26,6 +26,8 @@ class ClusterConfig {
  std::shared_ptr<ClusterConfig> CloneWithChanges(const SlotRanges& enable_slots,
                                                  const SlotRanges& disable_slots) const;

  std::shared_ptr<ClusterConfig> CloneWithoutMigrations() const;

  // Returns true if this node owns the given slot.
  bool IsMySlot(SlotId id) const;
  bool IsMySlot(std::string_view key) const;


@@ -79,6 +79,21 @@ ClusterConfig* ClusterFamily::cluster_config() {
  return tl_cluster_config.get();
}

void ClusterFamily::Shutdown() {
  shard_set->pool()->at(0)->Await([this] {
    lock_guard lk(set_config_mu);
    if (!tl_cluster_config)
      return;

    auto empty_config = tl_cluster_config->CloneWithoutMigrations();
    RemoveOutgoingMigrations(empty_config, tl_cluster_config);
    RemoveIncomingMigrations(empty_config->GetFinishedIncomingMigrations(tl_cluster_config));

    DCHECK(outgoing_migration_jobs_.empty());
    DCHECK(incoming_migrations_jobs_.empty());
  });
}

ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const {
  ClusterShardInfo info{.slot_ranges = SlotRanges({{.start = 0, .end = kMaxSlotNum}}),
                        .master = {},

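Note how Shutdown avoids a dedicated cancel-all path: it clones a config with no migrations and feeds it to the existing reconciliation helpers, so every in-flight migration looks stale and gets cancelled, all while holding the same set_config_mu that serializes config updates. A rough sketch of this diff-against-empty idea, with the migration type and helper name being illustrative, not Dragonfly's:

```cpp
#include <algorithm>
#include <iostream>
#include <string>
#include <vector>

// Illustrative: migrations identified by target node id.
using Migrations = std::vector<std::string>;

// Cancel every migration present in `current` but absent from `target`.
// With an empty target, this cancels everything -- the shutdown case.
void RemoveStaleMigrations(const Migrations& target, Migrations* current) {
  current->erase(
      std::remove_if(current->begin(), current->end(),
                     [&](const std::string& m) {
                       bool stale =
                           std::find(target.begin(), target.end(), m) == target.end();
                       if (stale)
                         std::cout << "cancelling migration to " << m << "\n";
                       return stale;
                     }),
      current->end());
}

int main() {
  Migrations outgoing = {"node-a", "node-b"};
  RemoveStaleMigrations(/*target=*/{}, &outgoing);  // diff against "no migrations"
  // outgoing is now empty, mirroring the DCHECKs in ClusterFamily::Shutdown.
}
```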

@@ -28,6 +28,8 @@ class ClusterFamily {
  void Register(CommandRegistry* registry);

  void Shutdown();

  // Returns a thread-local pointer.
  static ClusterConfig* cluster_config();


@@ -910,7 +910,9 @@ void Service::Shutdown() {
  config_registry.Reset();

  // to shutdown all the runtime components that depend on EngineShard
  cluster_family_.Shutdown();
  server_family_.Shutdown();
  StringFamily::Shutdown();
  GenericFamily::Shutdown();

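The placement inside Service::Shutdown matters: cluster_family_.Shutdown() runs before server_family_ and the per-type families, while the proactor pool it awaits on is still alive. A toy sketch of that dependents-first ordering, with illustrative component names:

```cpp
#include <iostream>

// Illustrative components: cluster work (slot migrations) runs on top of the
// engine shards, so it must be wound down first, mirroring Service::Shutdown.
struct Shards {
  void Shutdown() { std::cout << "engine shards stopped\n"; }
};

struct Cluster {
  void Shutdown() {
    // Cancel in-flight slot migrations while the shards are still usable.
    std::cout << "slot migrations cancelled\n";
  }
};

int main() {
  Cluster cluster;
  Shards shards;
  cluster.Shutdown();  // dependents first...
  shards.Shutdown();   // ...then the layer they depend on
}
```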

@@ -1117,8 +1117,9 @@ async def test_cluster_flushall_during_migration(
    await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])


@pytest.mark.parametrize("interrupt", [False, True])
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_data_migration(df_factory: DflyInstanceFactory, interrupt: bool):
    # Check data migration from one node to another
    instances = [
        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
@@ -1145,6 +1146,23 @@ async def test_cluster_data_migration(df_factory: DflyInstanceFactory):
    logging.debug("Start migration")
    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

    if interrupt:  # Test that nodes shut down cleanly with a pending migration
        await asyncio.sleep(random.random())

        # Stop a randomly chosen instance and keep the other one
        stop = random.getrandbits(1)
        keep = 1 - stop
        nodes[stop].instance.stop()

        slots = await nodes[keep].admin_client.execute_command("CLUSTER SLOTS")
        slots.sort(key=lambda cfg: cfg[0])
        assert 0 in slots[0] and 9000 in slots[0]
        assert 9001 in slots[1] and 16383 in slots[1]

        await close_clients(*[n.client for n in nodes], *[n.admin_client for n in nodes])
        return

    await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED")

    for i in range(20, 22):