Perform Data Acquisition and Processing on Pool Workers - MATLAB & Simulink (original) (raw)

Main Content

This example shows how to implement a parallel data acquisition and processing pipeline on an interactive parallel pool. The pipeline uses PollableDataQueue objects to facilitate data transfer between workers in the parallel pool.

Starting in R2025a, you can create PollableDataQueue objects that the client or any worker can poll for data. Use this type of PollableDataQueue object to transfer data or messages between workers in pipeline workflows or in applications where multiple workers need access to a single PollableDataQueue object.

This example demonstrates how to parallelize an image acquisition and processing pipeline. You can adapt this approach to accelerate any pipeline workflow.

Parallel Data Processing Pipeline

In this example, you capture streaming images of a simulated pendulum in motion. You then perform real-time image processing and analysis on each frame to determine the center of the pendulum. In this setup, one worker from the parallel pool acquires image data from a simulation and passes the data to a PollableDataQueue object. Two other workers poll this queue to receive and process the data and then send it to another PollableDataQueue object. A fourth worker polls this second queue to receive and analyze the data, finally sending it to a DataQueue object with a callback function that displays the results on the client.

Flowchart illustrating parallel data acquisition and processing pipeline stages. The pipeline includes four stages: Stage 1 consists of one worker that captures the images of the pendulum in motion, then sends data to a pollable data queue. Stage 2 consists of two workers that poll pollable data queue 1 for an image, process the image, then send the processed image to pollable data queues 2. Stage 3 consists of one worker that analyses the image to determine the center of the pendulum, then sends the results to a data queue for display on the client.

Set Up Parallel Environment and Data Queues

Start a parallel pool with four workers.

pool = parpool("Processes",4);

Starting parallel pool (parpool) using the 'Processes' profile ... Connected to parallel pool with 4 workers.

Prepare and initialize plots to visualize data from the workers. The prepareDisplay function is defined at the end of this example.

[fig,p,himage] = prepareDisplay;

To enable data transfer between the workers, create PollableDataQueue objects with the Destination argument set to "any". When you set the Destination argument to "any", the client or any worker can poll the resulting queue for data.

Create two PollableDataQueue objects with Destination set to "any" for the data acquisition and processing stages of the pipeline.

acquisitionToProcessingPdq = parallel.pool.PollableDataQueue(destination="any"); processingToAnalysisPdq = parallel.pool.PollableDataQueue(destination="any");

Create an additional PollableDataQueue object to enable the client to send messages to the worker performing the data acquisition.

stopSignalPdq = parallel.pool.PollableDataQueue(destination="any");

To visualize the processed images and result data on the client, create a DataQueue object, displayResultsDq. Use the afterEach function to run the displayOnClient function when workers send data to the displayResultsDq object. The displayOnClient function is defined at the end of this example.

displayResultsDq = parallel.pool.DataQueue; afterEach(displayResultsDq,@(results) displayOnClient(p,himage,results));

Define Functions for Parallel Pipeline

To manage data flow and smoothly stop computations in the pipeline, define a different function for each stage: data acquisition, data processing, and data analysis. Each function uses specific queues to facilitate data transfer and communication between workers.

Data Acquisition Stage

In the acquireData function, a worker captures data and sends it to the acquisitionToProcessingPdq queue. The worker continuously acquires data until it receives a stop signal from the stopSignalPdq queue, then it closes the acquisitionToProcessingPdq queue. The worker simulates data acquisition at a specified rate using the generateFrames function. The generateFrames function is attached to this example as a supporting file.

function acquireData(stopSignalPdq,acquisitionToProcessingPdq) while isempty(poll(stopSignalPdq)) rawData = generateFrames(10); send(acquisitionToProcessingPdq,rawData); end close(acquisitionToProcessingPdq); clear generateFrames send(stopSignalPdq,"Data acquisition stopped")

end

Data Processing Stage

In the processData function, a worker polls the acquisitionToProcessingPdq queue for new data, processes it, and sends the results to the processingToAnalysisPdq queue. The loop continues until the worker running the data acquisition stage closes the acquisitionToProcessingPdq queue. When the acquisitionToProcessingPdq queue is closed and no data is available in the queue, poll returns the status indicator OK as false. The worker then stops waiting for data and closes the processingToAnalysisPdq queue. The processFrames function is attached to this example as a supporting file.

function processData(acquisitionToProcessingPdq,processingToAnalysisPdq) OK = true; while OK [rawFrame,OK] = poll(acquisitionToProcessingPdq,Inf); if OK processedFrames = processFrames(rawFrame); send(processingToAnalysisPdq,processedFrames); end end close(processingToAnalysisPdq); end

Final Data Analysis Stage

In the analyzeData function, a worker polls the processingToAnalysisPdq queue for processed data, analyzes it, and sends the results to the displayResultsDq queue. The loop continues until a worker running the previous stage closes the processingToAnalysisPdq queue and the worker drains the queue. When the processingToAnalysisPdq queue is closed and no data is available in the queue, poll returns the status indicator OK as false. The worker then stops waiting for data. The findPendulumCenters function is attached to this example as a supporting file.

function allCentroids = analyzeData(processingToAnalysisPdq,displayResultsDq) idx = 0; OK = true; while OK [processedFrames,OK] = poll(processingToAnalysisPdq,Inf); if OK idx = idx+1; results = findPendulumCenters(processedFrames); allCentroids(idx,:) = results.centroids; send(displayResultsDq,results); end end end

Start and Stop Data Acquisition and Analysis

To execute a different function on each worker, use the parfeval function. parfeval allows you to run tasks asynchronously without blocking MATLABĀ®.

Instruct a worker to begin acquiring data.

captureF = parfeval(@acquireData,0,stopSignalPdq,acquisitionToProcessingPdq);

Instruct two workers to execute the data processing function.

processFOne = parfeval(@processData,0,acquisitionToProcessingPdq,processingToAnalysisPdq); processFTwo = parfeval(@processData,0,acquisitionToProcessingPdq,processingToAnalysisPdq);

Instruct the final worker to perform data analysis and send the results to the client.

analyzeF = parfeval(@analyzeData,1,processingToAnalysisPdq,displayResultsDq);

This figure displays the input frame, processed frame, and analysis results from the workers.

The parfeval function does not block MATLAB, so you can continue working while computations take place. The workers process the different stages of the pipeline in parallel and send intermediate results to the client as soon as they become available.

Collect and analyze data for a fixed amount of time.

Send a message to the stopSignalPdq queue. The worker running the data acquisition periodically polls this queue for messages. When the worker receives a message, the worker stops data acquisition and closes the acquisitionToProcessingPdq queue.

send(stopSignalPdq,"stop");

Wait for the last worker in the pipeline to complete its parfeval computation.

Confirm that the data acquisition worker successfully stopped acquiring data.

status = poll(stopSignalPdq)

status = "Data acquisition stopped"

Use the fetchOutputs function to retrieve the results from the analyzeF future object. Calculate the pendulum length by fitting a circle through the pendulum centers and plot the results. For more details, see the calculateAndPlotLength function attached to this example as a supporting file.

allCentroids = fetchOutputs(analyzeF); calculateAndPlotLength(allCentroids);

Supporting Functions

displayOnClient

The displayOnClient function updates a figure plotting the detected pendulum centroids and updating the image frames. It appends the new centroid coordinates to the plot and refreshes the pendulum images with the latest input frame, region of interest (ROI), and processed frame.

function displayOnClient(p,himage,results) centroids = results.centroids; p.XData = [p.XData centroids(1)]; p.YData = [p.YData centroids(2)];

himage(1).CData = results.inputFrame; himage(2).CData = results.roi; himage(3).CData = results.processedFrame; drawnow limitrate nocallbacks; end

prepareDisplay

The prepareDisplay function sets up a figure window with a tiled layout to display images and plots related to pendulum tracking. The function creates a plot for tracking pendulum centers and initializes image placeholders for displaying various stages of image processing.

function [fig,p,himage] = prepareDisplay fig = figure(Name="Images from Camera",Visible="off"); tiledlayout(fig,3,3) nexttile(4,[2 3]) p = plot(NaN,NaN,"m."); axis ij; axis equal; xlabel("x"); ylabel("y"); title("Pendulum Centers");

himage = gobjects(1,3); titleStr = ["Pendulum Simulation","Cropped Region","Segmented Pendulum"]; for n = 1:3 nexttile(n) himage(n) = imshow(rand(480,640)); title(titleStr(n)) end end

See Also

Functions

Objects

Topics