fix cluster: migration crash fix (#4508)

* fix cluster: migration crash fix

Signed-off-by: adi_holden <adi@dragonflydb.io>
adiholden 2025-01-27 10:39:11 +02:00 committed by GitHub
parent 3fdd9877f4
commit b4ef0a06e2
6 changed files with 112 additions and 8 deletions
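
All four C++ hunks below share one pattern: a PrimeTable::bucket_iterator that reaches a change callback during slot migration can already be "done" (it no longer points at an occupied slot, for example because the bucket was emptied in the meantime), and reading its version in that state is what appears to have crashed migrations. The fix guards each version check with !is_done() (in DbSlice::FlushSlotsFb, RestoreStreamer::WriteBucket and SliceSnapshot::OnDbChange), advances the flush iterator to the first occupied slot, consumes the last captured error when CapturingReplyBuilder::Take() hands back its payload, and adds a rebalance regression test plus a pipeline toggle for the Python seeder. The snippet below is only a minimal, self-contained sketch of the guard pattern; BucketIterator here is a hypothetical stand-in, not Dragonfly's real iterator class.

```cpp
#include <cstdint>
#include <iostream>
#include <optional>

// Hypothetical stand-in for PrimeTable::bucket_iterator, used only to
// illustrate the guard pattern; the real class lives in Dragonfly's dash table.
struct BucketIterator {
  std::optional<uint64_t> version;  // empty == "done": no occupied slot behind it

  bool is_done() const { return !version.has_value(); }
  uint64_t GetVersion() const { return *version; }  // must not be called when done
};

// Shape of the callbacks before the fix: the version is read unconditionally,
// which is invalid when the iterator is already done.
void OnChangeBefore(const BucketIterator& it, uint64_t snapshot_version) {
  if (it.GetVersion() < snapshot_version) {
    std::cout << "serialize bucket\n";
  }
}

// Shape after the fix, mirroring the hunks below: skip done iterators
// before touching the version.
void OnChangeAfter(const BucketIterator& it, uint64_t snapshot_version) {
  if (!it.is_done() && it.GetVersion() < snapshot_version) {
    std::cout << "serialize bucket\n";
  }
}

int main() {
  BucketIterator emptied_bucket{};  // all entries already deleted
  BucketIterator live_bucket{1};    // still holds versioned data

  OnChangeAfter(emptied_bucket, 5);  // safely skipped instead of crashing
  OnChangeAfter(live_bucket, 5);     // serialized as before
  return 0;
}
```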


@@ -65,6 +65,7 @@ CapturingReplyBuilder::Payload CapturingReplyBuilder::Take() {
   CHECK(stack_.empty());
   Payload pl = std::move(current_);
   current_ = monostate{};
+  ConsumeLastError();
   return pl;
 }


@@ -717,6 +717,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
     PrimeTable* table = GetTables(db_index).first;
     auto iterate_bucket = [&](DbIndex db_index, PrimeTable::bucket_iterator it) {
+      it.AdvanceIfNotOccupied();
       while (!it.is_done()) {
         del_entry_cb(it);
         ++it;
@@ -724,7 +725,7 @@ void DbSlice::FlushSlotsFb(const cluster::SlotSet& slot_ids) {
     };
     if (const PrimeTable::bucket_iterator* bit = req.update()) {
-      if (bit->GetVersion() < next_version) {
+      if (!bit->is_done() && bit->GetVersion() < next_version) {
         iterate_bucket(db_index, *bit);
       }
     } else {


@@ -297,7 +297,7 @@ bool RestoreStreamer::ShouldWrite(SlotId slot_id) const {
 bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
   bool written = false;
-  if (it.GetVersion() < snapshot_version_) {
+  if (!it.is_done() && it.GetVersion() < snapshot_version_) {
     stats_.buckets_written++;
     it.SetVersion(snapshot_version_);


@@ -390,7 +390,7 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
   const PrimeTable::bucket_iterator* bit = req.update();
   if (bit) {
-    if (bit->GetVersion() < snapshot_version_) {
+    if (!bit->is_done() && bit->GetVersion() < snapshot_version_) {
       stats_.side_saved += SerializeBucket(db_index, *bit);
     }
   } else {


@@ -2763,3 +2763,101 @@ async def test_migration_one_after_another(df_factory: DflyInstanceFactory, df_s
     dbsize_node2 = await nodes[2].client.dbsize()
     assert dbsize_node1 + dbsize_node2 == dbsize_node0
     assert dbsize_node2 > 0 and dbsize_node1 > 0
+
+
+"""
+Test a cluster node distributing its slots into 3 other nodes.
+In this test we randomize the slot ranges that are migrated to each node.
+For each migration we start the migration, wait for it to finish, and once it is finished we send the migration finalization config.
+"""
+
+
+@pytest.mark.slow
+@pytest.mark.exclude_epoll
+@pytest.mark.asyncio
+@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
+async def test_migration_rebalance_node(df_factory: DflyInstanceFactory, df_seeder_factory):
+    # 1. Create a cluster of 4 nodes with all slots allocated to the first node.
+    instances = [
+        df_factory.create(
+            port=next(next_port),
+            admin_port=next(next_port),
+            vmodule="outgoing_slot_migration=2,cluster_family=2,incoming_slot_migration=2,streamer=2",
+        )
+        for i in range(4)
+    ]
+    df_factory.start_all(instances)
+
+    def create_random_ranges():
+        # Generate 2 random breakpoints within the slot range
+        breakpoints = sorted(random.sample(range(1, 16382), 2))
+        ranges = [
+            (0, breakpoints[0] - 1),
+            (breakpoints[0], breakpoints[1] - 1),
+            (breakpoints[1], 16383),
+        ]
+        return ranges
+
+    # Create 3 random ranges covering 0 to 16383
+    random_ranges = create_random_ranges()
+
+    nodes = [(await create_node_info(instance)) for instance in instances]
+    nodes[0].slots = random_ranges
+    nodes[1].slots = []
+    nodes[2].slots = []
+    nodes[3].slots = []
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    key_num = 100000
+    logging.debug(f"DEBUG POPULATE first node with number of keys: {key_num}")
+    await StaticSeeder(key_target=key_num, data_size=100).run(nodes[0].client)
+    dbsize_node0 = await nodes[0].client.dbsize()
+    assert dbsize_node0 > (key_num * 0.95)
+
+    logging.debug("start seeding")
+    # Running the seeder in pipeline mode while finalizing migrations leads to errors.
+    # TODO: changing the seeder to generate pipeline commands only for a specific slot should fix the problem.
+    seeder = df_seeder_factory.create(
+        keys=50_000, port=instances[0].port, cluster_mode=True, pipeline=False
+    )
+    await seeder.run(target_deviation=0.1)
+    seed = asyncio.create_task(seeder.run())
+
+    migration_info = [
+        MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [random_ranges[0]], nodes[1].id),
+        MigrationInfo("127.0.0.1", nodes[2].instance.admin_port, [random_ranges[1]], nodes[2].id),
+        MigrationInfo("127.0.0.1", nodes[3].instance.admin_port, [random_ranges[2]], nodes[3].id),
+    ]
+
+    # Serializes the concurrent config pushes issued by the three migration tasks.
+    nodes_lock = asyncio.Lock()
+
+    async def do_migration(index):
+        await asyncio.sleep(random.randint(1, 10) / 5)
+        async with nodes_lock:
+            logging.debug(f"Start migration from node {index}")
+            nodes[0].migrations.append(migration_info[index - 1])
+            await push_config(
+                json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]
+            )
+        logging.debug(f"wait migration from node {index}")
+        await wait_for_status(nodes[0].admin_client, nodes[index].id, "FINISHED", timeout=50)
+        await wait_for_status(nodes[index].admin_client, nodes[0].id, "FINISHED", timeout=50)
+        logging.debug(f"finished migration from node {index}")
+        await asyncio.sleep(random.randint(1, 5) / 5)
+        async with nodes_lock:
+            logging.debug(f"Finalize migration from node {index}")
+            nodes[index].slots = migration_info[index - 1].slots
+            nodes[0].slots.remove(migration_info[index - 1].slots[0])
+            nodes[0].migrations.remove(migration_info[index - 1])
+            await push_config(
+                json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]
+            )
+
+    all_migrations = [asyncio.create_task(do_migration(i)) for i in range(1, 4)]
+    for migration in all_migrations:
+        await migration
+
+    logging.debug("stop seeding")
+    seeder.stop()
+    await seed


@@ -425,6 +425,7 @@ class DflySeeder:
         stop_on_failure=True,
         cluster_mode=False,
         mirror_to_fake_redis=False,
+        pipeline=True,
     ):
         if cluster_mode:
             max_multikey = 1
@@ -439,6 +440,7 @@ class DflySeeder:
         self.stop_flag = False
         self.stop_on_failure = stop_on_failure
         self.fake_redis = None
+        self.use_pipeline = pipeline
         self.log_file = log_file
         if self.log_file is not None:
@@ -447,6 +449,7 @@ class DflySeeder:
         if mirror_to_fake_redis:
             logging.debug("Creating FakeRedis instance")
             self.fake_redis = fakeredis.FakeAsyncRedis()
+            self.use_pipeline = False

     async def run(self, target_ops=None, target_deviation=None):
         """
@@ -604,18 +607,19 @@ class DflySeeder:
                 break
             try:
-                if self.fake_redis is None:
+                if self.use_pipeline:
                     pipe = client.pipeline(transaction=tx_data[1])
                     for cmd in tx_data[0]:
                         pipe.execute_command(*cmd)
                     await pipe.execute()
                 else:
-                    # To mirror consistently to Fake Redis we must only send to it successful
-                    # commands. We can't use pipes because they might succeed partially.
                     for cmd in tx_data[0]:
                         dfly_resp = await client.execute_command(*cmd)
-                        fake_resp = await self.fake_redis.execute_command(*cmd)
-                        assert dfly_resp == fake_resp
+                        # To mirror consistently to Fake Redis we must only send to it successful
+                        # commands. We can't use pipes because they might succeed partially.
+                        if self.fake_redis is not None:
+                            fake_resp = await self.fake_redis.execute_command(*cmd)
+                            assert dfly_resp == fake_resp
             except (redis.exceptions.ConnectionError, redis.exceptions.ResponseError) as e:
                 if self.stop_on_failure:
                     await self._close_client(client)