mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
fix: cancel blocking command during migration finalization (#4904)
* fix: cancel blocking command during migration finalization
This commit is contained in:
parent
d57a581303
commit
150357d960
2 changed files with 52 additions and 1 deletions
|
@ -1006,10 +1006,17 @@ void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id,
|
|||
auto new_config = is_incoming ? ClusterConfig::Current()->CloneWithChanges(slots, {})
|
||||
: ClusterConfig::Current()->CloneWithChanges({}, slots);
|
||||
|
||||
auto blocking_filter = [&new_config](ArgSlice keys) {
|
||||
bool moved = any_of(keys.begin(), keys.end(), [&](auto k) { return !new_config->IsMySlot(k); });
|
||||
return moved ? OpStatus::KEY_MOVED : OpStatus::OK;
|
||||
};
|
||||
// we don't need to use DispatchTracker here because for IncomingMigration we don't have
|
||||
// connections that should be tracked and for Outgoing migration we do it under Pause
|
||||
server_family_->service().proactor_pool().AwaitFiberOnAll(
|
||||
[&new_config](util::ProactorBase*) { ClusterConfig::SetCurrent(new_config); });
|
||||
[this, &new_config, &blocking_filter](util::ProactorBase*) {
|
||||
server_family_->CancelBlockingOnThread(blocking_filter);
|
||||
ClusterConfig::SetCurrent(new_config);
|
||||
});
|
||||
DCHECK(ClusterConfig::Current() != nullptr);
|
||||
VLOG(1) << "Config is updated for slots ranges: " << slots.ToString() << " for " << MyID()
|
||||
<< " : " << node_id;
|
||||
|
|
|
@ -3172,3 +3172,47 @@ async def test_readonly_replication(
|
|||
|
||||
# This behavior can be changed in the future
|
||||
assert await r1_node.client.execute_command("GET Y") == None
|
||||
|
||||
|
||||
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
async def test_cancel_blocking_cmd_during_mygration_finalization(df_factory: DflyInstanceFactory):
    # A command blocked on a key (BLPOP) must be cancelled with a MOVED error
    # once the slot owning that key finishes migrating to another node.
    instances = [df_factory.create(port=next(next_port)) for _ in range(2)]
    df_factory.start_all(instances)

    c_nodes = [inst.client() for inst in instances]

    nodes = [await create_node_info(inst) for inst in instances]
    # Node 0 initially owns every slot; node 1 owns none.
    nodes[0].slots = [(0, 16383)]
    nodes[1].slots = []

    await push_config(json.dumps(generate_config(nodes)), [n.client for n in nodes])

    logging.debug("Start blpop task")
    blpop_task = asyncio.create_task(c_nodes[0].blpop("list", 0))

    # Give the command time to actually block before starting the migration.
    await asyncio.sleep(0.5)

    assert not blpop_task.done()

    # Migrate all slots from node 0 to node 1.
    nodes[0].migrations.append(
        MigrationInfo("127.0.0.1", nodes[1].instance.port, [(0, 16383)], nodes[1].id)
    )
    await push_config(json.dumps(generate_config(nodes)), [n.client for n in nodes])

    await wait_for_status(nodes[0].client, nodes[1].id, "FINISHED")

    # The blocked BLPOP must now fail with a MOVED redirection, not hang.
    with pytest.raises(aioredis.ResponseError) as e_info:
        await blpop_task
    assert "MOVED" in str(e_info.value)

    # The key was never created, so it must not exist on the new owner.
    assert await c_nodes[1].type("list") == "none"

    # Finalize: clear the migration record and hand ownership to node 1.
    nodes[0].migrations = []
    nodes[0].slots = []
    nodes[1].slots = [(0, 16383)]

    logging.debug("remove finished migrations")
    await push_config(json.dumps(generate_config(nodes)), [n.client for n in nodes])

    assert await c_nodes[1].type("list") == "none"
|
|
Loading…
Add table
Add a link
Reference in a new issue