Partition Pools for Efficient Resource Management in Concurrent Parallel Workflows

This example shows how to use pool partitions to effectively manage and optimize resource allocation in concurrent parallel workflows.

Pool partitions allow you to execute multiple workflows simultaneously without interference, enabling precise control over resource usage for each workflow. By assigning specific computations such as parfor, parfeval, and spmd to designated pool partitions, you can ensure that each workflow operates independently and efficiently.

This method is particularly advantageous for organizing parallel pipeline workflows, where different stages can have varying resource needs. In such scenarios, pool partitions facilitate smooth data flow through the pipeline by tailoring resource allocation to match the requirements of each stage. Additionally, pool partitions are beneficial when running separate workflows concurrently, as they help manage and limit the resources each workflow consumes.

This example illustrates the use of pool partitions in executing a simulation parallel pipeline with multiple GPUs alongside a Monte Carlo path planning simulation. You can adapt this approach to manage the execution of various workflows concurrently to ensure optimal resource distribution and workflow performance.

Create Pool Partitions

Start a parallel pool with 16 workers. For this example, the myCluster profile requests a parallel pool with 16 workers from a remote cluster, where each host has eight workers and two GPUs.

pool = parpool("myCluster",16);

Starting parallel pool (parpool) using the 'myCluster' profile ...
Connected to parallel pool with 16 workers.

Create pool partitions tailored to the requirements of different stages in the simulation pipeline using the partition function.

Use the partition function to create a pool partition with one worker for each available GPU for best performance. If the pool has no GPUs, the partition function returns gpuPool as an empty parallel.Pool object and the GPU computations run on the client.

[gpuPool,cpuPool] = partition(pool,"MaxNumWorkersPerGPU",1);

Using the remaining workers in the cpuPool partition, create two additional partitions: one with four workers and another with the rest of the workers in the cpuPool partition. You need at least five workers to run the simulation pipeline and the parfor-loop in parallel. If you do not have enough workers, reduce the number of workers you assign to the cpuLidarPool partition.

cpuWorkers = cpuPool.Workers;
[cpuLidarPool,cpuOtherPool] = partition(cpuPool,"Workers",cpuWorkers(1:4));
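As a rough check of the partition sizes you can expect, the following sketch works through the arithmetic for the cluster described above. The host count and all variable names are assumptions derived from the description (16 workers, with eight workers and two GPUs per host). They are not values returned by the partition function.

```matlab
% Hypothetical cluster description taken from the text above
numWorkers = 16;
workersPerHost = 8;
gpusPerHost = 2;
maxWorkersPerGPU = 1;

numHosts = numWorkers/workersPerHost;                    % Number of hosts
numGpuWorkers = numHosts*gpusPerHost*maxWorkersPerGPU;   % Expected gpuPool size
numCpuWorkers = numWorkers - numGpuWorkers;              % Expected cpuPool size

% Keep at least one CPU worker free for the parfor-loop
numLidarWorkers = min(4,numCpuWorkers - 1);
numOtherWorkers = numCpuWorkers - numLidarWorkers;

fprintf("gpuPool: %d, cpuLidarPool: %d, cpuOtherPool: %d\n", ...
    numGpuWorkers,numLidarWorkers,numOtherWorkers);
```

On a real pool, you can compare these numbers against the length of the Workers property of each partition.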

This graphic illustrates how the simulation pipeline stages and the Monte Carlo path planning trials use the pool partitions.

Flowchart illustrating simulation pipeline stages and Monte Carlo path planning trials. The simulation pipeline includes four stages: Stage 1 uses 'parfeval' to generate scan parameters using the 'cpuLidarPool' partition, then sends data to a pollable data queue. Stage 2 computes ray-triangle intersections on the 'gpuPool' partition, then sends data to pollable data queues 2 and 3. Stage 3 sorts frames using 'cpuLidarPool' partition, then sends the data to a data queue for client display. Stage 4 collects cloud point data on the 'cpuLidarPool' partition. The Monte Carlo path planning trials use 'parfor' to find optimal paths with Particle Swarm Optimization using the 'cpuOtherPool' partition.

Perform Simulation Pipeline on Pool Partitions

In this example, the parallel simulation pipeline models a lidar scanning system using ray tracing. Lidar sensors, similar to radar and sonar, measure distances by emitting laser pulses that reflect off objects, allowing them to perceive the structure of their surroundings. By implementing ray tracing algorithms and performing intersection calculations on multiple GPUs, you can simulate a lidar scanning system to gather information about nearby structures in a scene and generate a point cloud map of the environment.

The pipeline calculates intersections between the triangulated scene and laser rays to determine how much light hits the object surfaces. To speed up these calculations, use the Möller-Trumbore algorithm [1], modified to run on the GPU with arrayfun.
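To make the intersection test concrete, here is a minimal CPU-only sketch of the Möller-Trumbore test for a single ray and a single triangle. It is not the rayTriangleIntersection supporting function used in this example. The ray, the triangle vertices, and the variable names are illustrative assumptions.

```matlab
% Hypothetical ray and triangle
rayOrigin = [0 0 -1];
rayDirection = [0 0 1];
v0 = [-1 -1 0];
v1 = [1 -1 0];
v2 = [0 1 0];

% Triangle edges that share vertex v0
edge1 = v1 - v0;
edge2 = v2 - v0;

% Determinant; a value near zero means the ray is parallel to the triangle
pvec = cross(rayDirection,edge2);
a = dot(edge1,pvec);
isParallel = abs(a) < eps;

% Barycentric coordinates (u,v) and distance t along the ray
tvec = rayOrigin - v0;
u = dot(tvec,pvec)/a;
qvec = cross(tvec,edge1);
v = dot(rayDirection,qvec)/a;
t = dot(edge2,qvec)/a;

% The ray hits the triangle if the point lies inside it and in front of the origin
hit = ~isParallel && u >= 0 && v >= 0 && (u + v) <= 1 && t > 0;
intersectionPoint = rayOrigin + t*rayDirection;
```

In the pipeline, the same kind of test is evaluated for every combination of ray and triangle, which is why a GPU implementation pays off.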

To further enhance simulation speed, execute the simulation stages as a parallel pipeline with multiple parfeval computations. To ensure that the different stages run on the correct workers, run the parfeval computations on the gpuPool and cpuLidarPool partitions.

Initialize Simulation Environment and Parameters

Begin by setting up a scene composed of multiple objects with triangulated surfaces. Specify the initial light position and movement parameters using the initializeParameters function, which is defined at the end of this example.

params = initializeParameters;

Next, visualize the scene using the plotScene function, also defined at the end of this example.

plotScene(params);
view([-110.88 31.50])

Prepare Data Queues and Define Callback Functions

To facilitate data transfer between different workers during the simulation, use a mixture of PollableDataQueue and DataQueue objects.

Create three PollableDataQueue objects with Destination set to "any" for the simulation pipeline. The first worker in the pipeline generates input parameters for each scan and sends the parameters to the stage2InputQueue pollable data queue for processing. One of the GPU workers then calculates the ray-triangle intersections and sends the results to the stage3SortQueue and stage4CloudQueue pollable data queues for further processing.

stage2InputQueue = parallel.pool.PollableDataQueue(Destination="any"); % Pollable data queue 1
stage3SortQueue = parallel.pool.PollableDataQueue(Destination="any"); % Pollable data queue 2
stage4CloudQueue = parallel.pool.PollableDataQueue(Destination="any"); % Pollable data queue 3

In this example, you run multiple, short parfeval computations on the gpuPool partition to calculate the intersections for each scan. In this way, you can interleave other parallel work on the same gpuPool partition if a worker is free. Create a DataQueue object with the name stage2TriggerQueue and use the afterEach function to define a function to run each time the stage2TriggerQueue data queue object receives data.

When the input parameters are ready, the worker running stage 1 sends a message to stage2TriggerQueue. After stage2TriggerQueue receives data, it automatically submits a parfeval computation to run the findIntersections function on the gpuPool partition. Note that the input data is already in the stage2InputQueue pollable data queue. The findIntersections function is defined at the end of the example.

stage2TriggerQueue = parallel.pool.DataQueue;
afterEach(stage2TriggerQueue,@(call) ...
    parfeval(gpuPool,@findIntersections,0,stage2InputQueue,stage3SortQueue,stage4CloudQueue));

Prepare and initialize plots to visualize the intermediate scan data from the workers. The prepareScanningPlot function is defined at the end of this example.

[fig,s,rays] = prepareScanningPlot(params);

To track the progress of the scans on the client, create a DataQueue object with the name displayQueue. Use the afterEach function to run the displayScanFrames function when workers send data to the displayQueue data queue object. The displayScanFrames function is defined at the end of this example.

displayQueue = parallel.pool.DataQueue;
afterEach(displayQueue,@(data) displayScanFrames(data,s,rays));

Start Simulation Pipeline

Stage 1 and Stage 2

For stage 1 of the pipeline, use a worker from the cpuLidarPool partition to run the addParamsToQueue function in the background with parfeval. The addParamsToQueue function continuously generates input parameters for each scan and sends them to the stage2InputQueue pollable data queue object. It also sends a message to the stage2TriggerQueue data queue object to trigger a parfeval computation on the gpuPool partition for stage 2 of the pipeline.

When the worker generates all the scan input parameters, it closes the stage2InputQueue to signal to the workers in the next stage that there is no more data to send. The addParamsToQueue function is defined at the end of this example.

fgenerate = parfeval(cpuLidarPool,@addParamsToQueue,0,params,stage2InputQueue,stage2TriggerQueue);

Stage 3

Use a worker from the cpuLidarPool partition to run the sortFrames helper function in the background with parfeval. The sortFrames function repeatedly polls the stage3SortQueue pollable data queue for new frame data, establishes a buffer for the frames, and sends them to the client in the correct sequence using the displayQueue data queue. The sortFrames function stops execution when a worker from the previous stage closes the stage3SortQueue pollable data queue. The sortFrames function is attached to this example as a supporting file.

fSort = parfeval(cpuLidarPool,@sortFrames,0,stage3SortQueue,displayQueue);
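Because several GPU workers process scans concurrently, frames can arrive out of order. The sortFrames supporting file is not listed here, so the following is only a sketch of one way a reorder buffer could work. The arrival order, the frame structure, and the variable names are assumptions for illustration.

```matlab
% Hypothetical arrival order of frame numbers from the GPU workers
arrivalOrder = [2 1 4 3 5];

% Buffer frames by frame number until the next expected frame is available
buffer = containers.Map('KeyType','double','ValueType','any');
nextFrame = 1;
displayed = [];

for fN = arrivalOrder
    buffer(fN) = struct("fN",fN);

    % Release every frame that is now in sequence
    while isKey(buffer,nextFrame)
        frame = buffer(nextFrame);
        displayed(end+1) = frame.fN; %#ok<SAGROW>
        remove(buffer,nextFrame);
        nextFrame = nextFrame + 1;
    end
end

disp(displayed)
```

In the pipeline, the release step would send each frame to the displayQueue data queue instead of appending the frame number to a vector.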

Stage 4

Run two instances of the collectCloudPointData function using workers from the cpuLidarPool partition to generate point cloud data. The collectCloudPointData function repeatedly polls the stage4CloudQueue pollable data queue for new intersection data and stops execution when a worker from the previous stage closes the stage4CloudQueue pollable data queue. The collectCloudPointData function is defined at the end of this example.

fCloud(1) = parfeval(cpuLidarPool,@collectCloudPointData,1,stage4CloudQueue);
fCloud(2) = parfeval(cpuLidarPool,@collectCloudPointData,1,stage4CloudQueue);

Finally, make the figure visible to display the progress of the lidar simulation.
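The command for this step does not appear in this listing. As a sketch, assuming fig is a figure handle created with Visible set to "off" (as prepareScanningPlot does), you can show the figure by setting its Visible property. The figure created here is a stand-in so that the snippet runs on its own.

```matlab
% Stand-in for the figure handle returned by prepareScanningPlot
fig = figure(Visible="off");

% Show the figure so that the scan progress is displayed
fig.Visible = "on";
```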

Perform Path Planning on Remaining Pool Partition

While the lidar simulation pipeline runs, you can use the remaining cpuOtherPool partition to perform additional computations. For example, you can use a parfor-loop to run multiple trials of a path planning algorithm using Particle Swarm Optimization (PSO). The goal is to find an optimal path from a starting position to a target position while avoiding objects in the scene.

Define Environment and PSO Parameters

Set up the environment parameters, including the starting and target positions, using the same scene from the simulation pipeline. Define the radius of the objects.

startPos = [25 25];
targetPos = [-30 -20];
objectRadius = ones(size(params.objectPositions,1),1)*2;
obstacles = [params.objectPositions objectRadius];
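Each row of obstacles holds an x-position, a y-position, and a radius. The following sketch shows one way to test whether a point lies inside any obstacle. The isInsideObstacle helper is a hypothetical name and is not part of the planPathPSO supporting file. The object positions are copied from the initializeParameters function.

```matlab
% Obstacle definition copied from this example
objectPositions = [-20 -20;20 -20;-20 20;20 20;-10 -5;5 5];
objectRadius = ones(size(objectPositions,1),1)*2;
obstacles = [objectPositions objectRadius];

% Hypothetical helper: true if point p is closer to any center than its radius
isInsideObstacle = @(p) any(vecnorm(obstacles(:,1:2) - p,2,2) < obstacles(:,3));

startPos = [25 25];
startBlocked = isInsideObstacle(startPos);   % Start position is clear
nearObject = isInsideObstacle([5 6]);        % One unit away from the object at [5 5]
```

A path planner can use a check like this to penalize candidate paths that pass through an object.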

Configure the parameters for the PSO algorithm, such as the number of particles, iterations, and trials. Restrict the problem to two dimensions.

numParticles = 1000;
numIterations = 300;
dim = 2;
numTrials = 100;
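To show how these parameters drive the algorithm, here is a minimal PSO sketch that minimizes a simple quadratic cost. It is not the planPathPSO supporting file. The objective function, the inertia weight w, the acceleration coefficients c1 and c2, and the smaller particle and iteration counts are assumptions chosen to keep the sketch short.

```matlab
rng(0);                              % Make the run repeatable
objective = @(p) sum(p.^2,2);        % Hypothetical cost, one row per particle
numParticles = 30;
numIterations = 100;
dim = 2;
w = 0.7; c1 = 1.5; c2 = 1.5;         % Assumed PSO coefficients

% Initialize positions, velocities, and personal and global bests
pos = -5 + 10*rand(numParticles,dim);
vel = zeros(numParticles,dim);
pBestPos = pos;
pBestScore = objective(pos);
[gBestScore,idx] = min(pBestScore);
gBestPos = pBestPos(idx,:);
initialBest = gBestScore;

for iter = 1:numIterations
    % Pull each particle toward its own best and the swarm best
    r1 = rand(numParticles,dim);
    r2 = rand(numParticles,dim);
    vel = w*vel + c1*r1.*(pBestPos - pos) + c2*r2.*(gBestPos - pos);
    pos = pos + vel;

    % Update personal bests where the new position is better
    score = objective(pos);
    improved = score < pBestScore;
    pBestPos(improved,:) = pos(improved,:);
    pBestScore(improved) = score(improved);

    % Update the global best only if a particle improved on it
    [candidate,idx] = min(pBestScore);
    if candidate < gBestScore
        gBestScore = candidate;
        gBestPos = pBestPos(idx,:);
    end
end
```

Because each trial starts from random positions, running many independent trials and keeping the best result is a natural fit for a parfor-loop.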

Perform Path Planning Trials

Prepare to record the best path and scores for each trial. Use the parfor function to parallelize the trials for efficiency. To run the parfor-loop on the cpuOtherPool partition, specify the pool object as the second argument to the parfor function. The planPathPSO helper function is attached to this example as a supporting file.

parfor(trial = 1:numTrials,cpuOtherPool)
    [bestPath(trial,:,:),scores(trial)] = planPathPSO(startPos,targetPos, ...
        obstacles,numParticles,dim,numIterations);
end

Identify the trial with the minimum score, which corresponds to the best path found by the PSO algorithm.

[~,bestInd] = min(scores);
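The following sketch illustrates the bookkeeping around the trials: preallocating the outputs, filling one slice per trial, and selecting the best trial. It uses a serial for-loop and a dummy planner so that it runs without a parallel pool. The numWaypoints value and the dummyPlanner function are hypothetical stand-ins for the path size and for planPathPSO.

```matlab
numTrials = 5;
numWaypoints = 4;    % Hypothetical number of points per path
dim = 2;

% Preallocate one path and one score per trial
bestPath = zeros(numTrials,numWaypoints,dim);
scores = zeros(numTrials,1);

% Dummy planner that returns a path and a score; trial 3 scores best
dummyPlanner = @(trial) deal(trial*ones(numWaypoints,dim),abs(trial - 3));

for trial = 1:numTrials
    [bestPath(trial,:,:),scores(trial)] = dummyPlanner(trial);
end

% Select the trial with the lowest score and extract its path
[~,bestInd] = min(scores);
selectedPath = squeeze(bestPath(bestInd,:,:));
```

Indexing the outputs by the loop variable, as in bestPath(trial,:,:), is the same slicing pattern that the parfor-loop above relies on.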

Visualize the environment, obstacles, and the best path on a plot. The plotBestPath function is defined at the end of the example.

plotBestPath(obstacles,startPos,bestPath(bestInd,:,:),targetPos);

Visualize Point Cloud from Lidar Simulation

With the lidar simulation pipeline complete, you can retrieve the results from the fCloud parfeval computations.

pointCloud = fetchOutputs(fCloud);

Visualize the aggregated lidar point cloud using the plotPointCloud function, which is defined at the end of the example. The point cloud visualization shows the outline of the objects in the scene.

plotPointCloud(pointCloud,params);

References

[1] Möller, Tomas, and Ben Trumbore. "Fast, Minimum Storage Ray-Triangle Intersection." Journal of Graphics Tools 2, no. 1 (January 1997): 21–28. https://doi.org/10.1080/10867651.1997.10487468.

Local Supporting Functions

addParamsToQueue - Simulation Pipeline Stage 1

The addParamsToQueue function generates input parameters for each scan and manages their addition to a data queue for processing. For each step, it calculates the origin of the light source as it moves across the surface of the object, assigns a scan number, and sends this data to the stage2InputQueue pollable data queue. It also signals a parfeval computation to be scheduled on the GPU pool by sending a message to the stage2TriggerQueue data queue. To prevent the input queue from becoming overloaded, the function pauses while the queue length exceeds a specified threshold. After generating parameters for all the scans, it closes the stage2InputQueue and sends a final message to the stage2TriggerQueue to close the stage 2 pollable data queues. The getScanParam helper function is attached to this example as a supporting file.

function addParamsToQueue(params,stage2InputQueue,stage2TriggerQueue)
fN = 0;

numScans = size(params.lidarOrigins,1);

for scan = 1:numScans
    % Set lidar origin (position)
    lidarOrigin = params.lidarOrigins(scan,:);

    for t = 1:params.totalTimeSteps
        data = getScanParam(params,t,lidarOrigin);
        fN = fN + 1;

        % Send scan number and input data to the data queue
        data.fN = fN;
        send(stage2InputQueue,data);

        % Submit parfeval computation on the GPU pool
        send(stage2TriggerQueue,"findIntersections");

        pause(0.5); % Pause to improve visualization

        % Perform some queue management to reduce strain on queue
        while stage2InputQueue.QueueLength > 10
            pause(1);
        end
    end
end

% Close queue after generating all input parameters
close(stage2InputQueue);

% Submit parfeval computation to close stage 2 queues
send(stage2TriggerQueue,"findIntersections");
end

findIntersections - Simulation Pipeline Stage 2

The findIntersections function processes input parameters from a queue to calculate intersections between light rays and triangles of a 3-D object using GPU acceleration. It retrieves data from the stage2InputQueue pollable data queue, converts the ray origin and directions to gpuArray objects, and uses arrayfun to perform ray-triangle intersection calculations on the GPU. It then gathers the results, extracts the first intersection points, and updates the data structure with the number of hits. Finally, it sends the scan number and updated data to the stage3SortQueue pollable data queue and the intersection points to the stage4CloudQueue pollable data queue. When no data is available in the stage2InputQueue and poll returns OK as false because the stage2InputQueue pollable data queue is closed, the function closes the stage3SortQueue and stage4CloudQueue to indicate the end of processing. The rayTriangleIntersection helper function is attached to this example as a supporting file.

function findIntersections(stage2InputQueue,stage3SortQueue,stage4CloudQueue)
[data,OK] = poll(stage2InputQueue,inf);
if OK
    rayOriginsGPU = gpuArray(data.lidarOrigin);
    rayDirectionsGPU = gpuArray(data.rayDirections);

    % Convert to gpuArray and calculate ray triangle intersections on the GPU
    [Hits,intXs,intYs,intZs,intersectionDistances] = arrayfun(@rayTriangleIntersection, ...
        rayOriginsGPU(:,1),rayOriginsGPU(:,2),rayOriginsGPU(:,3), ...
        rayDirectionsGPU(:,1),rayDirectionsGPU(:,2),rayDirectionsGPU(:,3), ...
        data.A(:,1)',data.B(:,1)',data.C(:,1)',...
        data.A(:,2)',data.B(:,2)',data.C(:,2)', ...
        data.A(:,3)',data.B(:,3)',data.C(:,3)');

    data.rayDirections = [];

    % Find the closest intersection for each ray
    [~,minIndices] = min(abs(intersectionDistances),[],2,"omitnan");

    % Extract the first intersection points using the indices
    numRays = size(intXs,1);
    X = intXs(sub2ind(size(intXs),(1:numRays)',minIndices));
    Y = intYs(sub2ind(size(intYs),(1:numRays)',minIndices));
    Z = intZs(sub2ind(size(intZs),(1:numRays)',minIndices));

    % Add intersection values to data structure
    data.numHits = gather(sum(Hits,1));

    % Send scan number and scan data to next worker to sort for display
    output{1} = data.fN;
    output{2} = data;
    send(stage3SortQueue,output);

    % Send intersection points to next worker
    cloudData.intXs = gather(X);
    cloudData.intYs = gather(Y);
    cloudData.intZs = gather(Z);
    send(stage4CloudQueue,cloudData);

elseif ~OK
    close(stage3SortQueue);
    close(stage4CloudQueue);
end
end
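The closest-intersection step in findIntersections can be tried on the CPU with a small matrix. In this sketch, each row is a ray and each column is a triangle. The distance and coordinate values are made up for illustration, and NaN marks a ray that misses a triangle.

```matlab
% Hypothetical distances and x-coordinates (rows: rays, columns: triangles)
intersectionDistances = [NaN 2.5 1.0; 4.0 NaN NaN; NaN NaN NaN];
intXs = [NaN 10 20; 30 NaN NaN; NaN NaN NaN];

% Column index of the nearest triangle for each ray, ignoring misses
[~,minIndices] = min(abs(intersectionDistances),[],2,"omitnan");

% Pick the x-coordinate of the nearest hit for each ray
numRays = size(intXs,1);
X = intXs(sub2ind(size(intXs),(1:numRays)',minIndices));
```

The third ray misses every triangle, so its selected coordinate stays NaN. The collectCloudPointData function later filters out such entries.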

collectCloudPointData - Simulation Pipeline Stage 4

The collectCloudPointData function processes data from the ray-triangle intersection computation to collect and refine 3-D intersection points. It continuously polls a queue for new data, filters out invalid entries, downsamples the data, and rounds the coordinates to a specified precision. When no data is available in the stage4CloudQueue and poll returns OK as false because a worker in the previous stage closed the stage4CloudQueue pollable data queue, the function stops execution and returns the point cloud data.

function pointCloud = collectCloudPointData(stage4CloudQueue)
intersectionPoints = [];
OK = true;
while OK
    [cloudData,OK] = poll(stage4CloudQueue,inf);
    if ~isempty(cloudData)
        % Flatten the matrices
        X_flat = cloudData.intXs(:);
        Y_flat = cloudData.intYs(:);
        Z_flat = cloudData.intZs(:);

        % Create a logical mask for valid intersections
        validMask = ~isnan(X_flat);

        % Filter the points using the valid mask
        validX = X_flat(validMask);
        validY = Y_flat(validMask);
        validZ = Z_flat(validMask);

        % Combine the valid points into a single matrix
        thisIntersectionPoints = [validX validY validZ];
        % Downsample by selecting every nth point
        n = 5; % Downsample factor
        thisIntersectionPoints = thisIntersectionPoints(1:n:end,:);

        % Round coordinates to reduce precision
        precisionFactor = 0.01;
        thisIntersectionPoints = round(thisIntersectionPoints/precisionFactor)*precisionFactor;

        intersectionPoints = [intersectionPoints;thisIntersectionPoints];
        pointCloud = intersectionPoints;

    elseif ~OK
        pointCloud = intersectionPoints;
        return
    end
end
end
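The filtering, downsampling, and rounding steps in collectCloudPointData can be checked in isolation. This sketch applies them to a small made-up set of points and uses a downsample factor of 2 instead of 5 so that the effect is visible on five points.

```matlab
% Hypothetical intersection coordinates; NaN marks a ray with no hit
cloudData.intXs = [0.123; NaN; 1.006; 2.499; 3.001];
cloudData.intYs = [1.111; NaN; 2.222; 3.333; 4.444];
cloudData.intZs = [0.504; NaN; 0.25; 0.746; 0.0625];

% Keep only the rows with a valid intersection
validMask = ~isnan(cloudData.intXs(:));
points = [cloudData.intXs(validMask) cloudData.intYs(validMask) cloudData.intZs(validMask)];

% Downsample by keeping every nth point
n = 2;
points = points(1:n:end,:);

% Round coordinates to the nearest multiple of the precision factor
precisionFactor = 0.01;
points = round(points/precisionFactor)*precisionFactor;
```

Four of the five points are valid, and the downsampling keeps the first and third of them.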

initializeParameters

The initializeParameters function initializes and returns a structure containing parameters for the lidar scanning system simulation. It sets the number of rays per revolution, the number of vertical layers, the field of view of the sensor and the origin positions for the lidar scans. It also defines the surface profile for the objects in the scene and uses the createTriangulatedSurfaces helper function to generate the triangulated surface data. The createTriangulatedSurfaces helper function is attached to this example as a supporting file.

function params = initializeParameters
% Define the object and create the triangulated surface
params.surfProfile = 80;
params.objectPositions = [-20 -20;20 -20;-20 20;20 20;-10 -5;5 5];
[params.A,params.B,params.C,params.indices] = createTriangulatedSurfaces(params.surfProfile, ...
    params.objectPositions);

% LiDAR Parameters
params.numRaysPerRevolution = 1000; % Number of rays in one complete horizontal revolution
params.numLayers = 40; % Number of vertical layers
params.verticalFOV = 60; % Vertical field of view in degrees

% Simulation Parameters
params.angularSpeed = 10; % Degrees per time step
params.totalTimeSteps = 36;
params.lidarOrigins = single([0,0,1;0,0,2;0,0,3]); % Set lidar origin (position)
end
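The getScanParam supporting file, which turns these parameters into ray directions, is not listed in this example. As a rough illustration only, the following sketch shows one plausible way to build unit direction vectors from a number of rays per revolution, a number of vertical layers, and a vertical field of view. The angle layout and the small parameter values are assumptions and may differ from what getScanParam does.

```matlab
% Small hypothetical values so that the result is easy to inspect
numRaysPerRevolution = 8;
numLayers = 3;
verticalFOV = 60;    % Degrees

% Evenly spaced azimuth angles over one revolution and elevation angles over the FOV
azimuth = (0:numRaysPerRevolution-1)*360/numRaysPerRevolution;
elevation = linspace(-verticalFOV/2,verticalFOV/2,numLayers);
[AZ,EL] = meshgrid(azimuth,elevation);

% Convert spherical angles to unit direction vectors, one row per ray
rayDirections = [cosd(EL(:)).*cosd(AZ(:)), ...
    cosd(EL(:)).*sind(AZ(:)), ...
    sind(EL(:))];
```

With the values in initializeParameters, the same layout would produce 1000 × 40 rays per scan.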

displayScanFrames

The displayScanFrames function updates a visualization of lidar scan data by modifying the surface plot and light source based on new data.

function displayScanFrames(data,s,rays)
s(1).UserData = s(1).UserData + data.numHits;
indices = s(2).UserData;

% Normalize hits to range between 1 and 256
normalizedHits = rescale(s(1).UserData,1,256);

% Update surface plot with new intensities
colors = repmat(normalizedHits',1,3);

for idx = 1:length(indices)
    rowStart = indices(idx,1);
    rowEnd = indices(idx,2);
    s(idx).CData = colors(rowStart:rowEnd,:);
end

% Update origin of light source
for idx = 1:size(data.rayX,1)
    rays(idx).XData = data.rayX(idx,:);
    rays(idx).YData = data.rayY(idx,:);
    rays(idx).ZData = data.rayZ(idx,:);
end
drawnow limitrate nocallbacks;
end
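The normalization step in displayScanFrames maps the accumulated hit counts onto a fixed range before they are used as color data. This sketch applies the same two calls to a made-up row vector of hit counts.

```matlab
% Hypothetical accumulated hit counts, one entry per surface row
accumulatedHits = [0 5 10];

% Map the counts linearly onto the range 1 to 256
normalizedHits = rescale(accumulatedHits,1,256);

% Replicate the values into three columns, one row per surface row
colors = repmat(normalizedHits',1,3);
```

Because the counts are rescaled on every update, the colors always span the full range regardless of how many scans have completed.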

prepareScanningPlot

The prepareScanningPlot function sets up a figure and axes to visualize the intersections of light rays with the object. A secondary axes plots the position of the light source as a red marker.

function [fig,s,rays] = prepareScanningPlot(params)
fig = figure(Position=[933 313 582 483],Visible="off");

% Create axes for surface
colors = zeros(size(params.A));

hold on
for idx = 1:length(params.indices)
    rowStart = params.indices(idx,1);
    rowEnd = params.indices(idx,2);
    s(idx) = surf(params.A(rowStart:rowEnd,:), ...
        params.B(rowStart:rowEnd,:), ...
        params.C(rowStart:rowEnd,:), ...
        colors(rowStart:rowEnd,:), ...
        EdgeColor="none");
end
hold off
view([-110.88 31.50])

title("Intersections of Rays with Objects"); % Add title and axes labels
axis square; axis off
colormap turbo;
c = colorbar;
c.Label.String = "Ray Intensity";
view(160,28);
s(1).UserData = zeros(1,size(params.A,1));
s(2).UserData = params.indices;

hold on
for idx = 1:3
    rays(idx) = plot3([NaN NaN],[NaN NaN],[NaN NaN],"r",LineWidth=3);
end
rays(1).MarkerIndices = 1;
rays(1).Marker = "square";
rays(1).MarkerSize = 8;
rays(1).MarkerFaceColor = "r";
rays(1).DisplayName = "Sensor position";
hold off
axis off
legend(rays(1),Location="southwest")
end

plotScene

The plotScene function displays the scene for the lidar simulation.

function plotScene(params)
figure;
hold on
for idx = 1:length(params.indices)
    rowStart = params.indices(idx,1);
    rowEnd = params.indices(idx,2);
    surf(params.A(rowStart:rowEnd,:), ...
        params.B(rowStart:rowEnd,:), ...
        params.C(rowStart:rowEnd,:));
end
hold off
view(3)
axis square
end

plotPointCloud

The plotPointCloud function displays the accumulated point cloud data from the lidar simulation pipeline.

function plotPointCloud(pointCloud,params)
figure;
scatter3(pointCloud(:,1),pointCloud(:,2),pointCloud(:,3),1,"filled");
hold on
scatter3(params.lidarOrigins(:,1),params.lidarOrigins(:,2),params.lidarOrigins(:,3),"filled");
hold off
xlabel("X");
ylabel("Y");
zlabel("Z");
title("Lidar Point Cloud");
legend("Point cloud","Sensor position",Location="bestoutside");
axis equal;
grid on;
end

plotBestPath

The plotBestPath function displays the environment, obstacles, and the best path from the PSO trials.

function plotBestPath(obstacles,startPos,bestPath,targetPos)
figure;
hold on;
scatter(obstacles(:,1),obstacles(:,2),100);
plot(startPos(1),startPos(2),"go","MarkerSize",10);
plot(targetPos(1),targetPos(2),"rx","MarkerSize",10);
plot(bestPath(1,:,1),bestPath(1,:,2),"b--");
legend("Obstacles","Start","Target","Best Path",Location="bestoutside");
title("Robot Path using PSO");
xlabel("X Position");
ylabel("Y Position");
hold off;
end

See Also

partition
