diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index ac6adf198..2495eb8c7 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -83,6 +83,8 @@ void RestoreStreamer::Start(io::Sink* dest, bool send_lsn) { bool written = false; cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) { + db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/, + DbSlice::Iterator::FromPrime(it), snapshot_version_); if (WriteBucket(it)) { written = true; } diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index c25887ff7..19757bf99 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1177,11 +1177,6 @@ async def test_cluster_fuzzymigration( seeder = df_seeder_factory.create(keys=keys, port=nodes[0].instance.port, cluster_mode=True) await seeder.run(target_deviation=0.1) - fill_task = asyncio.create_task(seeder.run()) - - # some time fo seeder - await asyncio.sleep(0.5) - # Counter that pushes values to a list async def list_counter(key, client: aioredis.RedisCluster): for i in itertools.count(start=1): @@ -1197,9 +1192,6 @@ async def test_cluster_fuzzymigration( for key, conn in zip(counter_keys, counter_connections) ] - seeder.stop() - await fill_task - # Generate capture, capture ignores counter keys capture = await seeder.capture() @@ -1237,6 +1229,7 @@ async def test_cluster_fuzzymigration( keeping = node.slots[num_outgoing:] node.next_slots.extend(keeping) + logging.debug("start migrations") await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) iterations = 0 @@ -1244,7 +1237,7 @@ async def test_cluster_fuzzymigration( is_all_finished = True for node in nodes: states = await node.admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") - print(states) + logging.debug(states) is_all_finished = is_all_finished and ( all("FINISHED" in s for s in states) or states == "NO_STATE" ) @@ -1257,22 +1250,16 @@ async def test_cluster_fuzzymigration( await asyncio.sleep(0.1) - # Stop counters for counter in counters: counter.cancel() # clean migrations for node in nodes: node.migrations = [] - - # TODO this config should be pushed with new slots - # Push new config - await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) - - # Transfer nodes - for node in nodes: node.slots = node.next_slots - node.new_slots = [] + + logging.debug("remove finished migrations") + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) # Check counter consistency cluster_client = aioredis.RedisCluster(host="localhost", port=nodes[0].instance.port)