fix(cluster): crash in cluster migration (#4495)

fix crash in cluster migration

Signed-off-by: adi_holden <adi@dragonflydb.io>

Parent: 451da72c41
Commit: d6adedb066

4 changed files with 87 additions and 4 deletions
@@ -409,6 +409,13 @@ class DashTable<_Key, _Value, Policy>::Iterator {
     return *this;
   }
 
+  Iterator& AdvanceIfNotOccupied() {
+    if (!IsOccupied()) {
+      this->operator++();
+    }
+    return *this;
+  }
+
   IteratorPairType operator->() const {
     auto* seg = owner_->segment_[seg_id_];
     return {seg->Key(bucket_id_, slot_id_), seg->Value(bucket_id_, slot_id_)};
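The call sites changed below invoke this helper before dereferencing the iterator, which suggests the crash involved a bucket iterator handed to a loop while positioned on an unoccupied slot. Below is a minimal, self-contained sketch of that contract; ToyBucket, begin_at() and the slot layout are invented for illustration and are not Dragonfly's DashTable, only the AdvanceIfNotOccupied()/IsOccupied() shape mirrors the diff.

// Toy sketch only, not Dragonfly's DashTable: ToyBucket and begin_at() are made up
// for illustration. It demonstrates the contract of AdvanceIfNotOccupied(): leave
// an iterator that already points at an occupied slot alone, otherwise move it to
// the next occupied slot (or the end) so the caller never dereferences an empty one.
#include <array>
#include <cstddef>
#include <iostream>
#include <optional>
#include <string>

struct ToyBucket {
  std::array<std::optional<std::string>, 4> slots;

  struct Iterator {
    ToyBucket* owner;
    std::size_t idx;

    bool is_done() const { return idx >= owner->slots.size(); }
    bool IsOccupied() const { return !is_done() && owner->slots[idx].has_value(); }

    Iterator& operator++() {
      // Step forward until we hit an occupied slot or run past the end.
      do {
        ++idx;
      } while (!is_done() && !IsOccupied());
      return *this;
    }

    // Mirrors the helper added in this commit.
    Iterator& AdvanceIfNotOccupied() {
      if (!IsOccupied()) {
        this->operator++();
      }
      return *this;
    }

    const std::string& operator*() const { return *owner->slots[idx]; }
  };

  Iterator begin_at(std::size_t idx) { return Iterator{this, idx}; }
};

int main() {
  ToyBucket bucket;
  bucket.slots[2] = "key2";

  // Slot 0 is empty: without the guard the first dereference would touch an
  // empty slot; with it the loop starts at the first occupied slot.
  for (auto it = bucket.begin_at(0).AdvanceIfNotOccupied(); !it.is_done(); ++it) {
    std::cout << *it << '\n';  // prints "key2"
  }
  return 0;
}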
@@ -231,7 +231,7 @@ void RestoreStreamer::Run() {
       ThisFiber::Yield();
       last_yield = 0;
     }
-  } while (cursor);
+  } while (cursor && !fiber_cancelled_);
 
   VLOG(1) << "RestoreStreamer finished loop of " << my_slots_.ToSlotRanges().ToString()
           << ", shard " << db_slice_->shard_id() << ". Buckets looped " << stats_.buckets_loop;
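This hunk makes the traversal in RestoreStreamer::Run() stop as soon as fiber_cancelled_ is set, instead of only when the cursor is exhausted. A minimal sketch of that loop shape follows; Cursor, TraverseChunk() and the in-loop cancel trigger are invented for illustration (not Dragonfly APIs), and only the `} while (cursor && !cancelled)` shape mirrors the change.

// Minimal sketch of a cancellable, cursor-driven traversal loop. Cursor,
// TraverseChunk() and the cancel trigger below are invented for illustration;
// only the loop shape (} while (cursor && !cancelled)) mirrors the change here.
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <iostream>
#include <vector>

using Cursor = std::uint64_t;  // 0 means "traversal finished" in this toy model.

// Visits one chunk starting at `cursor` and returns the next cursor (0 at the end).
Cursor TraverseChunk(const std::vector<int>& data, Cursor cursor, int* visited) {
  constexpr Cursor kChunk = 4;
  Cursor end = std::min<Cursor>(cursor + kChunk, data.size());
  *visited += static_cast<int>(end - cursor);
  return end == data.size() ? 0 : end;
}

int main() {
  std::vector<int> data(10, 1);
  std::atomic_bool cancelled{false};
  int visited = 0;

  Cursor cursor = 0;
  do {
    cursor = TraverseChunk(data, cursor, &visited);
    // A real streamer yields to other fibers between chunks; a cancellation can
    // arrive while it is suspended, so the flag must be re-checked on each pass.
    if (visited >= 8) {
      cancelled = true;  // simulate an external cancel request
    }
  } while (cursor && !cancelled);

  std::cout << "visited " << visited << " of " << data.size() << " entries\n";
  return 0;
}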
@@ -302,7 +302,7 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
 
     it.SetVersion(snapshot_version_);
     string key_buffer;  // we can reuse it
-    for (; !it.is_done(); ++it) {
+    for (it.AdvanceIfNotOccupied(); !it.is_done(); ++it) {
      const auto& pv = it->second;
      string_view key = it->first.GetSlice(&key_buffer);
      if (ShouldWrite(key)) {
@@ -291,11 +291,10 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
   it.SetVersion(snapshot_version_);
   unsigned result = 0;
 
-  while (!it.is_done()) {
+  for (it.AdvanceIfNotOccupied(); !it.is_done(); ++it) {
     ++result;
     // might preempt due to big value serialization.
     SerializeEntry(db_index, it->first, it->second);
-    ++it;
   }
   serialize_bucket_running_ = false;
   return result;
@@ -2685,3 +2685,80 @@ async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_see
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
     assert (await StaticSeeder.capture(nodes[1].client)) == start_capture
+
+
+"""
+Test a cluster node distributing its slots into 2 other nodes.
+In this test we start migrating to the second node only after the first migration has finished,
+to reproduce the bug found in issue #4455.
+"""
+
+
+@pytest.mark.asyncio
+@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
+async def test_migration_one_after_another(df_factory: DflyInstanceFactory, df_seeder_factory):
+    # 1. Create a cluster of 3 nodes with all slots allocated to the first node.
+    instances = [
+        df_factory.create(
+            port=next(next_port),
+            admin_port=next(next_port),
+            vmodule="outgoing_slot_migration=2,cluster_family=2,incoming_slot_migration=2,streamer=2",
+        )
+        for i in range(3)
+    ]
+    df_factory.start_all(instances)
+
+    nodes = [(await create_node_info(instance)) for instance in instances]
+    nodes[0].slots = [(0, 16383)]
+    nodes[1].slots = []
+    nodes[2].slots = []
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    logging.debug("DEBUG POPULATE first node")
+    key_num = 100000
+    await StaticSeeder(key_target=key_num, data_size=100).run(nodes[0].client)
+    dbsize_node0 = await nodes[0].client.dbsize()
+    assert dbsize_node0 > (key_num * 0.95)
+
+    # 2. Start migrating part of the slots from the first node to the second.
+    logging.debug("Start first migration")
+    nodes[0].migrations.append(
+        MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16300)], nodes[1].id)
+    )
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    # 3. Wait for the migration to finish.
+    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", timeout=50)
+    await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED", timeout=50)
+
+    nodes[0].migrations = []
+    nodes[0].slots = [(16301, 16383)]
+    nodes[1].slots = [(0, 16300)]
+    nodes[2].slots = []
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    # 4. Start migrating the remaining slots from the first node to the third node.
+    logging.debug("Start second migration")
+    nodes[0].migrations.append(
+        MigrationInfo("127.0.0.1", nodes[2].instance.admin_port, [(16301, 16383)], nodes[2].id)
+    )
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    # 5. Wait for the migration to finish.
+    await wait_for_status(nodes[0].admin_client, nodes[2].id, "FINISHED", timeout=10)
+    await wait_for_status(nodes[2].admin_client, nodes[0].id, "FINISHED", timeout=10)
+
+    nodes[0].migrations = []
+    nodes[0].slots = []
+    nodes[1].slots = [(0, 16300)]
+    nodes[2].slots = [(16301, 16383)]
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    # 6. Check that all data was migrated.
+    # Using dbsize to check that all the data was migrated to the other nodes.
+    # Note: we cannot use the seeder capture because we migrate the data to 2 different nodes.
+    # TODO: improve the migration correctness check by running the seeder capture on a slot range (requires changes in the capture script).
+    dbsize_node1 = await nodes[1].client.dbsize()
+    dbsize_node2 = await nodes[2].client.dbsize()
+    assert dbsize_node1 + dbsize_node2 == dbsize_node0
+    assert dbsize_node2 > 0 and dbsize_node1 > 0