diff --git a/src/facade/reply_capture.cc b/src/facade/reply_capture.cc
index 02d00356e..363879312 100644
--- a/src/facade/reply_capture.cc
+++ b/src/facade/reply_capture.cc
@@ -65,6 +65,7 @@ CapturingReplyBuilder::Payload CapturingReplyBuilder::Take() {
   CHECK(stack_.empty());
   Payload pl = std::move(current_);
   current_ = monostate{};
+  ConsumeLastError();
   return pl;
 }
 
diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc
index 29089a5be..8323fc4e5 100644
--- a/src/server/db_slice.cc
+++ b/src/server/db_slice.cc
@@ -717,6 +717,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
     PrimeTable* table = GetTables(db_index).first;
 
     auto iterate_bucket = [&](DbIndex db_index, PrimeTable::bucket_iterator it) {
+      it.AdvanceIfNotOccupied();
       while (!it.is_done()) {
         del_entry_cb(it);
         ++it;
@@ -724,7 +725,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
     };
 
     if (const PrimeTable::bucket_iterator* bit = req.update()) {
-      if (bit->GetVersion() < next_version) {
+      if (!bit->is_done() && bit->GetVersion() < next_version) {
         iterate_bucket(db_index, *bit);
       }
     } else {
diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index 9dd2d9472..6cfacaa80 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -297,7 +297,7 @@ bool RestoreStreamer::ShouldWrite(SlotId slot_id) const {
 bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
   bool written = false;
 
-  if (it.GetVersion() < snapshot_version_) {
+  if (!it.is_done() && it.GetVersion() < snapshot_version_) {
     stats_.buckets_written++;
 
     it.SetVersion(snapshot_version_);
diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index c6c64261a..a7d8812c5 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -390,7 +390,7 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
   const PrimeTable::bucket_iterator* bit = req.update();
 
   if (bit) {
-    if (bit->GetVersion() < snapshot_version_) {
+    if (!bit->is_done() && bit->GetVersion() < snapshot_version_) {
       stats_.side_saved += SerializeBucket(db_index, *bit);
     }
   } else {
diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py
index d2be391c6..a207d3c2b 100644
--- a/tests/dragonfly/cluster_test.py
+++ b/tests/dragonfly/cluster_test.py
@@ -2763,3 +2763,101 @@ async def test_migration_one_after_another(df_factory: DflyInstanceFactory, df_s
     dbsize_node2 = await nodes[2].client.dbsize()
     assert dbsize_node1 + dbsize_node2 == dbsize_node0
     assert dbsize_node2 > 0 and dbsize_node1 > 0
+
+
+"""
+Test a cluster node distributing its slots to 3 other nodes.
+In this test we randomize the slot ranges that are migrated to each node.
+For each migration we start it, wait for it to finish, and once it has finished we push the migration finalization config.
+"""
+
+
+@pytest.mark.slow
+@pytest.mark.exclude_epoll
+@pytest.mark.asyncio
+@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
+async def test_migration_rebalance_node(df_factory: DflyInstanceFactory, df_seeder_factory):
+    # 1. Create a cluster of 4 nodes with all slots allocated to the first node.
+    instances = [
+        df_factory.create(
+            port=next(next_port),
+            admin_port=next(next_port),
+            vmodule="outgoing_slot_migration=2,cluster_family=2,incoming_slot_migration=2,streamer=2",
+        )
+        for i in range(4)
+    ]
+    df_factory.start_all(instances)
+
+    def create_random_ranges():
+        # Generate 2 random breakpoints within the range
+        breakpoints = sorted(random.sample(range(1, 16382), 2))
+        ranges = [
+            (0, breakpoints[0] - 1),
+            (breakpoints[0], breakpoints[1] - 1),
+            (breakpoints[1], 16383),
+        ]
+        return ranges
+
+    # Create 3 random ranges from 0 to 16383
+    random_ranges = create_random_ranges()
+
+    nodes = [(await create_node_info(instance)) for instance in instances]
+    nodes[0].slots = random_ranges
+    nodes[1].slots = []
+    nodes[2].slots = []
+    nodes[3].slots = []
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    key_num = 100000
+    logging.debug(f"DEBUG POPULATE first node with number of keys: {key_num}")
+    await StaticSeeder(key_target=key_num, data_size=100).run(nodes[0].client)
+    dbsize_node0 = await nodes[0].client.dbsize()
+    assert dbsize_node0 > (key_num * 0.95)
+
+    logging.debug("start seeding")
+    # Running the seeder in pipeline mode while finalizing migrations leads to errors.
+    # TODO: I believe that changing the seeder to generate pipelined commands only for a specific slot will fix the problem.
+    seeder = df_seeder_factory.create(
+        keys=50_000, port=instances[0].port, cluster_mode=True, pipeline=False
+    )
+    await seeder.run(target_deviation=0.1)
+    seed = asyncio.create_task(seeder.run())
+
+    migration_info = [
+        MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [random_ranges[0]], nodes[1].id),
+        MigrationInfo("127.0.0.1", nodes[2].instance.admin_port, [random_ranges[1]], nodes[2].id),
+        MigrationInfo("127.0.0.1", nodes[3].instance.admin_port, [random_ranges[2]], nodes[3].id),
+    ]
+
+    nodes_lock = asyncio.Lock()
+
+    async def do_migration(index):
+        await asyncio.sleep(random.randint(1, 10) / 5)
+        async with nodes_lock:
+            logging.debug(f"Start migration from node {index}")
+            nodes[0].migrations.append(migration_info[index - 1])
+            await push_config(
+                json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]
+            )
+
+        logging.debug(f"wait migration from node {index}")
+        await wait_for_status(nodes[0].admin_client, nodes[index].id, "FINISHED", timeout=50)
+        await wait_for_status(nodes[index].admin_client, nodes[0].id, "FINISHED", timeout=50)
+        logging.debug(f"finished migration from node {index}")
+        await asyncio.sleep(random.randint(1, 5) / 5)
+        async with nodes_lock:
+            logging.debug(f"Finalize migration from node {index}")
+            nodes[index].slots = migration_info[index - 1].slots
+            nodes[0].slots.remove(migration_info[index - 1].slots[0])
+            nodes[0].migrations.remove(migration_info[index - 1])
+            await push_config(
+                json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]
+            )
+
+    all_migrations = [asyncio.create_task(do_migration(i)) for i in range(1, 4)]
+    for migration in all_migrations:
+        await migration
+
+    logging.debug("stop seeding")
+    seeder.stop()
+    await seed
diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py
index 8855d7bcc..fc661130d 100644
--- a/tests/dragonfly/utility.py
+++ b/tests/dragonfly/utility.py
@@ -425,6 +425,7 @@
         stop_on_failure=True,
         cluster_mode=False,
         mirror_to_fake_redis=False,
+        pipeline=True,
     ):
         if cluster_mode:
             max_multikey = 1
@@ -439,6 +440,7 @@
         self.stop_flag = False
         self.stop_on_failure = stop_on_failure
         self.fake_redis = None
+        self.use_pipeline = pipeline
 
         self.log_file = log_file
         if self.log_file is not None:
@@ -447,6 +449,7 @@
         if mirror_to_fake_redis:
             logging.debug("Creating FakeRedis instance")
             self.fake_redis = fakeredis.FakeAsyncRedis()
+            self.use_pipeline = False
 
     async def run(self, target_ops=None, target_deviation=None):
         """
@@ -604,18 +607,19 @@
                 break
 
             try:
-                if self.fake_redis is None:
+                if self.use_pipeline:
                     pipe = client.pipeline(transaction=tx_data[1])
                     for cmd in tx_data[0]:
                         pipe.execute_command(*cmd)
                     await pipe.execute()
                 else:
-                    # To mirror consistently to Fake Redis we must only send to it successful
-                    # commands. We can't use pipes because they might succeed partially.
                     for cmd in tx_data[0]:
                         dfly_resp = await client.execute_command(*cmd)
-                        fake_resp = await self.fake_redis.execute_command(*cmd)
-                        assert dfly_resp == fake_resp
+                        # To mirror consistently to Fake Redis we must only send to it successful
+                        # commands. We can't use pipes because they might succeed partially.
+                        if self.fake_redis is not None:
+                            fake_resp = await self.fake_redis.execute_command(*cmd)
+                            assert dfly_resp == fake_resp
             except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError) as e:
                 if self.stop_on_failure:
                     await self._close_client(client)