mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
* fix: fix RestoreStreamer to prevent bucket skipping #2830
This commit is contained in:
parent
bcbcc5a2c6
commit
7606af706f
2 changed files with 7 additions and 18 deletions
|
@ -83,6 +83,8 @@ void RestoreStreamer::Start(io::Sink* dest, bool send_lsn) {
|
||||||
|
|
||||||
bool written = false;
|
bool written = false;
|
||||||
cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) {
|
cursor = pt->Traverse(cursor, [&](PrimeTable::bucket_iterator it) {
|
||||||
|
db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
|
||||||
|
DbSlice::Iterator::FromPrime(it), snapshot_version_);
|
||||||
if (WriteBucket(it)) {
|
if (WriteBucket(it)) {
|
||||||
written = true;
|
written = true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1177,11 +1177,6 @@ async def test_cluster_fuzzymigration(
|
||||||
seeder = df_seeder_factory.create(keys=keys, port=nodes[0].instance.port, cluster_mode=True)
|
seeder = df_seeder_factory.create(keys=keys, port=nodes[0].instance.port, cluster_mode=True)
|
||||||
await seeder.run(target_deviation=0.1)
|
await seeder.run(target_deviation=0.1)
|
||||||
|
|
||||||
fill_task = asyncio.create_task(seeder.run())
|
|
||||||
|
|
||||||
# some time fo seeder
|
|
||||||
await asyncio.sleep(0.5)
|
|
||||||
|
|
||||||
# Counter that pushes values to a list
|
# Counter that pushes values to a list
|
||||||
async def list_counter(key, client: aioredis.RedisCluster):
|
async def list_counter(key, client: aioredis.RedisCluster):
|
||||||
for i in itertools.count(start=1):
|
for i in itertools.count(start=1):
|
||||||
|
@ -1197,9 +1192,6 @@ async def test_cluster_fuzzymigration(
|
||||||
for key, conn in zip(counter_keys, counter_connections)
|
for key, conn in zip(counter_keys, counter_connections)
|
||||||
]
|
]
|
||||||
|
|
||||||
seeder.stop()
|
|
||||||
await fill_task
|
|
||||||
|
|
||||||
# Generate capture, capture ignores counter keys
|
# Generate capture, capture ignores counter keys
|
||||||
capture = await seeder.capture()
|
capture = await seeder.capture()
|
||||||
|
|
||||||
|
@ -1237,6 +1229,7 @@ async def test_cluster_fuzzymigration(
|
||||||
keeping = node.slots[num_outgoing:]
|
keeping = node.slots[num_outgoing:]
|
||||||
node.next_slots.extend(keeping)
|
node.next_slots.extend(keeping)
|
||||||
|
|
||||||
|
logging.debug("start migrations")
|
||||||
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||||
|
|
||||||
iterations = 0
|
iterations = 0
|
||||||
|
@ -1244,7 +1237,7 @@ async def test_cluster_fuzzymigration(
|
||||||
is_all_finished = True
|
is_all_finished = True
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
states = await node.admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
|
states = await node.admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
|
||||||
print(states)
|
logging.debug(states)
|
||||||
is_all_finished = is_all_finished and (
|
is_all_finished = is_all_finished and (
|
||||||
all("FINISHED" in s for s in states) or states == "NO_STATE"
|
all("FINISHED" in s for s in states) or states == "NO_STATE"
|
||||||
)
|
)
|
||||||
|
@ -1257,22 +1250,16 @@ async def test_cluster_fuzzymigration(
|
||||||
|
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(0.1)
|
||||||
|
|
||||||
# Stop counters
|
|
||||||
for counter in counters:
|
for counter in counters:
|
||||||
counter.cancel()
|
counter.cancel()
|
||||||
|
|
||||||
# clean migrations
|
# clean migrations
|
||||||
for node in nodes:
|
for node in nodes:
|
||||||
node.migrations = []
|
node.migrations = []
|
||||||
|
|
||||||
# TODO this config should be pushed with new slots
|
|
||||||
# Push new config
|
|
||||||
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
|
||||||
|
|
||||||
# Transfer nodes
|
|
||||||
for node in nodes:
|
|
||||||
node.slots = node.next_slots
|
node.slots = node.next_slots
|
||||||
node.new_slots = []
|
|
||||||
|
logging.debug("remove finished migrations")
|
||||||
|
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||||
|
|
||||||
# Check counter consistency
|
# Check counter consistency
|
||||||
cluster_client = aioredis.RedisCluster(host="localhost", port=nodes[0].instance.port)
|
cluster_client = aioredis.RedisCluster(host="localhost", port=nodes[0].instance.port)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue