From 7860a169d9bcb464797d0b7e741aa894b9ccfb9f Mon Sep 17 00:00:00 2001
From: Shahar Mike
Date: Sun, 5 Jan 2025 16:28:45 +0200
Subject: [PATCH] feat: Yield inside huge values migration serialization
 (#4197)

* feat: Yield inside huge values migration serialization

With #4144 we break huge values slot migration into multiple commands. This
PR adds yields between those commands.

It also adds a test that checks that modifying huge values while a migration
is ongoing works well, and that RSS doesn't grow too much while doing so.

Fixes #4100
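The heart of the change is the callback that RestoreStreamer::WriteEntry() passes
to CmdSerializer: it now throttles after every write, so the fiber may yield
between the commands that make up a single huge value. Roughly (annotated excerpt
of the streamer.cc hunk below):

```cpp
CmdSerializer serializer(
    [&](std::string s) {
      Write(std::move(s));   // enqueue one serialized command
      ThrottleIfNeeded();    // may suspend the fiber here, between commands
    },
    ServerState::tlocal()->serialization_max_chunk_size);
serializer.SerializeEntry(key, pk, pv, expire_ms);
```

Because WriteBucket() can now preempt, Run() and OnDbChange() are serialized
against each other with the new big_value_mu_, so a bucket is not modified while
it is being written.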
---
 src/server/journal/streamer.cc     | 16 ++++--
 src/server/journal/streamer.h      |  1 +
 tests/dragonfly/cluster_test.py    | 81 ++++++++++++++++++++++++++++--
 tests/dragonfly/requirements.txt   |  1 +
 tests/dragonfly/seeder/__init__.py | 10 ++--
 tests/dragonfly/seeder_test.py     | 21 ++++++++
 tests/dragonfly/utility.py         | 34 ++++++++++---
 7 files changed, 148 insertions(+), 16 deletions(-)

diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index 91480a181..c2df86b39 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -218,6 +218,12 @@ void RestoreStreamer::Run() {
     if (fiber_cancelled_)  // Could have been cancelled in above call too
       return;
 
+    std::lock_guard guard(big_value_mu_);
+
+    // Locking this never preempts. See snapshot.cc for why we need it.
+    auto* blocking_counter = db_slice_->BlockingCounter();
+    std::lock_guard blocking_counter_guard(*blocking_counter);
+
     WriteBucket(it);
   });
 
@@ -281,7 +287,6 @@ bool RestoreStreamer::ShouldWrite(cluster::SlotId slot_id) const {
 
 void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
   if (it.GetVersion() < snapshot_version_) {
-    FiberAtomicGuard fg;
     it.SetVersion(snapshot_version_);
     string key_buffer;  // we can reuse it
     for (; !it.is_done(); ++it) {
@@ -302,6 +307,7 @@ void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
 }
 
 void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
+  std::lock_guard guard(big_value_mu_);
   DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0";
 
   PrimeTable* table = db_slice_->GetTables(0).first;
@@ -319,8 +325,12 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req
 
 void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
                                  uint64_t expire_ms) {
-  CmdSerializer serializer([&](std::string s) { Write(std::move(s)); },
-                           ServerState::tlocal()->serialization_max_chunk_size);
+  CmdSerializer serializer(
+      [&](std::string s) {
+        Write(std::move(s));
+        ThrottleIfNeeded();
+      },
+      ServerState::tlocal()->serialization_max_chunk_size);
   serializer.SerializeEntry(key, pk, pv, expire_ms);
 }
 
diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h
index a18615a05..907b6e65e 100644
--- a/src/server/journal/streamer.h
+++ b/src/server/journal/streamer.h
@@ -112,6 +112,7 @@ class RestoreStreamer : public JournalStreamer {
   cluster::SlotSet my_slots_;
   bool fiber_cancelled_ = false;
   bool snapshot_finished_ = false;
+  ThreadLocalMutex big_value_mu_;
 };
 
 }  // namespace dfly
diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py
index b90d48665..a7262aa5d 100644
--- a/tests/dragonfly/cluster_test.py
+++ b/tests/dragonfly/cluster_test.py
@@ -14,8 +14,7 @@ from .replication_test import check_all_replicas_finished
 from redis.cluster import RedisCluster
 from redis.cluster import ClusterNode
 from .proxy import Proxy
-from .seeder import SeederBase
-from .seeder import StaticSeeder
+from .seeder import Seeder, SeederBase, StaticSeeder
 from . import dfly_args
 
@@ -33,6 +32,11 @@ def monotonically_increasing_port_number():
 next_port = monotonically_increasing_port_number()
 
 
+async def get_memory(client, field):
+    info = await client.info("memory")
+    return info[field]
+
+
 class RedisClusterNode:
     def __init__(self, port):
         self.port = port
@@ -1981,6 +1985,7 @@ async def test_cluster_migration_cancel(df_factory: DflyInstanceFactory):
 @dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
 @pytest.mark.asyncio
+@pytest.mark.opt_only
 async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory):
     instances = [
         df_factory.create(port=next(next_port), admin_port=next(next_port)) for i in range(2)
     ]
@@ -1995,7 +2000,7 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
 
     logging.debug("Generating huge containers")
     seeder = StaticSeeder(
-        key_target=10,
+        key_target=100,
         data_size=10_000_000,
         collection_size=10_000,
         variance=1,
@@ -2005,6 +2010,8 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
     await seeder.run(nodes[0].client)
     source_data = await StaticSeeder.capture(nodes[0].client)
 
+    mem_before = await get_memory(nodes[0].client, "used_memory_rss")
+
     nodes[0].migrations = [
         MigrationInfo("127.0.0.1", instances[1].admin_port, [(0, 16383)], nodes[1].id)
     ]
@@ -2017,6 +2024,74 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
     target_data = await StaticSeeder.capture(nodes[1].client)
     assert source_data == target_data
 
+    # Get peak memory, because migration removes the data
+    mem_after = await get_memory(nodes[0].client, "used_memory_peak_rss")
+    logging.debug(f"Memory before {mem_before} after {mem_after}")
+    assert mem_after < mem_before * 1.1
+
+
+@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
+@pytest.mark.parametrize("chunk_size", [1_000_000, 30])
+@pytest.mark.asyncio
+async def test_cluster_migration_while_seeding(
+    df_factory: DflyInstanceFactory, df_seeder_factory: DflySeederFactory, chunk_size
+):
+    instances = [
+        df_factory.create(
+            port=next(next_port),
+            admin_port=next(next_port),
+            serialization_max_chunk_size=chunk_size,
+        )
+        for _ in range(2)
+    ]
+    df_factory.start_all(instances)
+
+    nodes = [await create_node_info(instance) for instance in instances]
+    nodes[0].slots = [(0, 16383)]
+    nodes[1].slots = []
+    client0 = nodes[0].client
+    client1 = nodes[1].client
+
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    logging.debug("Seeding cluster")
+    seeder = df_seeder_factory.create(
+        keys=10_000, port=instances[0].port, cluster_mode=True, mirror_to_fake_redis=True
+    )
+    await seeder.run(target_deviation=0.1)
+
+    seed = asyncio.create_task(seeder.run())
+    await asyncio.sleep(1)
+
+    nodes[0].migrations = [
+        MigrationInfo("127.0.0.1", instances[1].admin_port, [(0, 16383)], nodes[1].id)
+    ]
+    logging.debug("Migrating slots")
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    logging.debug("Waiting for migration to finish")
+    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", timeout=300)
+    logging.debug("Migration finished")
+
+    logging.debug("Finalizing migration")
+    nodes[0].slots = []
+    nodes[1].slots = [(0, 16383)]
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    await asyncio.sleep(1)  # Let seeder feed dest before migration finishes
+
+    seeder.stop()
+    await seed
+    logging.debug("Seeding finished")
finished") + + assert ( + await get_memory(client0, "used_memory_peak_rss") + < await get_memory(client0, "used_memory_rss") * 1.1 + ) + + capture = await seeder.capture_fake_redis() + assert await seeder.compare(capture, instances[1].port) + def parse_lag(replication_info: str): lags = re.findall("lag=([0-9]+)\r\n", replication_info) diff --git a/tests/dragonfly/requirements.txt b/tests/dragonfly/requirements.txt index cfbbd8262..25fb8f69e 100644 --- a/tests/dragonfly/requirements.txt +++ b/tests/dragonfly/requirements.txt @@ -25,3 +25,4 @@ pytest-emoji==0.2.0 pytest-icdiff==0.8 pytest-timeout==2.2.0 asyncio==3.4.3 +fakeredis[json]==2.26.2 diff --git a/tests/dragonfly/seeder/__init__.py b/tests/dragonfly/seeder/__init__.py index 4154e4d1c..351fc38a8 100644 --- a/tests/dragonfly/seeder/__init__.py +++ b/tests/dragonfly/seeder/__init__.py @@ -177,14 +177,16 @@ class Seeder(SeederBase): ] sha = await client.script_load(Seeder._load_script("generate")) - await asyncio.gather( - *(self._run_unit(client, sha, unit, using_stopkey, args) for unit in self.units) - ) + for unit in self.units: + # Must be serial, otherwise cluster clients throws an exception + await self._run_unit(client, sha, unit, using_stopkey, args) async def stop(self, client: aioredis.Redis): """Request seeder seeder if it's running without a target, future returned from start() must still be awaited""" - await asyncio.gather(*(client.set(unit.stop_key, "X") for unit in self.units)) + for unit in self.units: + # Must be serial, otherwise cluster clients throws an exception + await client.set(unit.stop_key, "X") def change_key_target(self, target: int): """Change key target, applied only on succeeding runs""" diff --git a/tests/dragonfly/seeder_test.py b/tests/dragonfly/seeder_test.py index 61557270f..3d35242da 100644 --- a/tests/dragonfly/seeder_test.py +++ b/tests/dragonfly/seeder_test.py @@ -4,6 +4,8 @@ import string from redis import asyncio as aioredis from . 
 from .seeder import Seeder, StaticSeeder
+from .instance import DflyInstanceFactory, DflyInstance
+from .utility import *
 
 
 @dfly_args({"proactor_threads": 4})
@@ -114,3 +116,22 @@ async def test_seeder_capture(async_client: aioredis.Redis):
     # Do another change
     await async_client.spop("set1")
     assert capture != await Seeder.capture(async_client)
+
+
+@pytest.mark.asyncio
+@dfly_args({"proactor_threads": 2})
+async def test_seeder_fake_redis(
+    df_factory: DflyInstanceFactory, df_seeder_factory: DflySeederFactory
+):
+    instance = df_factory.create()
+    df_factory.start_all([instance])
+
+    seeder = df_seeder_factory.create(
+        keys=100, port=instance.port, unsupported_types=[ValueType.JSON], mirror_to_fake_redis=True
+    )
+
+    await seeder.run(target_ops=5_000)
+
+    capture = await seeder.capture_fake_redis()
+
+    assert await seeder.compare(capture, instance.port)
diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py
index 197d2a3d0..40f8ce1dd 100644
--- a/tests/dragonfly/utility.py
+++ b/tests/dragonfly/utility.py
@@ -14,6 +14,7 @@ import json
 import subprocess
 import pytest
 import os
+import fakeredis
 
 from typing import Iterable, Union
 from enum import Enum
@@ -271,7 +272,7 @@ class CommandGenerator:
         ("LPUSH {k} {val}", ValueType.LIST),
         ("LPOP {k}", ValueType.LIST),
         ("SADD {k} {val}", ValueType.SET),
-        ("SPOP {k}", ValueType.SET),
+        # ("SPOP {k}", ValueType.SET),  # Disabled because it is inconsistent
         ("HSETNX {k} v0 {val}", ValueType.HSET),
         ("HINCRBY {k} v1 1", ValueType.HSET),
         ("ZPOPMIN {k} 1", ValueType.ZSET),
@@ -423,6 +424,7 @@ class DflySeeder:
         unsupported_types=[],
         stop_on_failure=True,
         cluster_mode=False,
+        mirror_to_fake_redis=False,
     ):
         if cluster_mode:
             max_multikey = 1
@@ -436,11 +438,16 @@ class DflySeeder:
         self.multi_transaction_probability = multi_transaction_probability
         self.stop_flag = False
         self.stop_on_failure = stop_on_failure
+        self.fake_redis = None
 
         self.log_file = log_file
         if self.log_file is not None:
             open(self.log_file, "w").close()
 
+        if mirror_to_fake_redis:
+            logging.debug("Creating FakeRedis instance")
+            self.fake_redis = fakeredis.FakeAsyncRedis()
+
     async def run(self, target_ops=None, target_deviation=None):
         """
         Run a seeding cycle on all dbs either until stop(), a fixed number of commands (target_ops)
@@ -474,6 +481,14 @@ class DflySeeder:
         """Reset internal state. Needs to be called after flush or restart"""
         self.gen.reset()
 
+    async def capture_fake_redis(self):
+        keys = sorted(list(self.gen.keys_and_types()))
+        # TODO: support multiple databases
+        assert self.dbcount == 1
+        assert self.fake_redis != None
+        capture = DataCapture(await self._capture_entries(self.fake_redis, keys))
+        return [capture]
+
     async def capture(self, port=None):
         """Create DataCapture for all dbs"""
 
@@ -588,12 +603,19 @@ class DflySeeder:
                     queue.task_done()
                     break
 
-            pipe = client.pipeline(transaction=tx_data[1])
-            for cmd in tx_data[0]:
-                pipe.execute_command(*cmd)
-
             try:
-                await pipe.execute()
+                if self.fake_redis is None:
+                    pipe = client.pipeline(transaction=tx_data[1])
+                    for cmd in tx_data[0]:
+                        pipe.execute_command(*cmd)
+                    await pipe.execute()
+                else:
+                    # To mirror consistently to Fake Redis we must only send to it successful
+                    # commands. We can't use pipes because they might succeed partially.
+                    for cmd in tx_data[0]:
+                        dfly_resp = await client.execute_command(*cmd)
+                        fake_resp = await self.fake_redis.execute_command(*cmd)
+                        assert dfly_resp == fake_resp
             except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError) as e:
                 if self.stop_on_failure:
                     await self._close_client(client)