DataQueue - Send and listen for data between client and workers - MATLAB

parallel.pool.DataQueue

Send and listen for data between client and workers

Description

A DataQueue object enables asynchronous and automatic processing of data or messages sent between workers and client in a parallel pool while a computation is carried out. For example, you can send intermediate values to the client and automatically calculate the progress of the computation.

To send data from a parallel pool worker back to the client, first create a DataQueue object at the client. Pass this DataQueue into a parfor-loop or other parallel language construct, such as spmd. From the workers, call send to send data back to the client. At the client, specify a function to automatically process the data received by using afterEach.

Creation

Syntax

q = parallel.pool.DataQueue

Description

`q` = parallel.pool.DataQueue creates an object that you can use to send or listen for messages (or data) between the client and workers. Create the DataQueue on the worker or client where you want to receive the data.

Properties

QueueLength — Number of items currently held on the queue

zero or positive integer

This property is read-only.

The number of items of data waiting to be removed from the queue, specified as a zero or positive integer. The value is 0 or a positive integer on the worker or client that created the DataQueue instance. If the client creates the DataQueue instance, the value is 0 on all workers. If a worker creates the DataQueue, the value is 0 on the client and all other workers.

Object Functions

afterEach - Define a function to call when new data is received on a data queue
send - Send data between clients and workers using a data queue

Examples

Send a Message in a parfor-Loop, and Dispatch the Message on the Queue

Construct a DataQueue, and call afterEach.

q = parallel.pool.DataQueue;
afterEach(q, @disp);

Start a parfor-loop, and send a message. The pending message is passed to the afterEach function, in this example @disp.

parfor i = 1:3
    send(q, i);
end

For more details on listening for data using a DataQueue, see afterEach.
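As a further sketch, a message can carry more than one value, for example the loop index together with a result. The message layout [i, i^2] and the anonymous callback below are illustrative assumptions, not part of the example above; the order in which the lines print can vary between runs.

```matlab
% Hypothetical variant: send the iteration index and a computed value together.
q = parallel.pool.DataQueue;

% The callback runs on the client each time a message arrives.
afterEach(q, @(msg) fprintf('Iteration %d produced %d\n', msg(1), msg(2)));

parfor i = 1:4
    send(q, [i, i^2]);   % msg(1) is the index, msg(2) is the value
end
```

Packing the index into the message lets the client tell which iteration a value came from, because parfor iterations do not run in a fixed order.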

Find Length of DataQueue

When you send a message to a DataQueue object, the message waits in the queue until it is processed by a listener. Each message adds 1 to the queue length. In this example, you use the QueueLength property to find the length of a DataQueue object.

When a client or worker creates a DataQueue object, any messages that are sent to the queue are held in the memory of that client or worker. If the client creates a DataQueue object, the QueueLength property on all workers is 0. In this example, you create a DataQueue object on the client, and send data from a worker.

First, create a parallel pool with one worker.
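The command that opens the pool is not shown here. A minimal sketch, assuming the 'Processes' profile named in the startup message below and that no pool is already open:

```matlab
% Assumption: no parallel pool is currently open (parpool errors otherwise).
% Open a process-based pool with exactly one worker.
pool = parpool('Processes', 1);
```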

Starting parallel pool (parpool) using the 'Processes' profile ... Connected to parallel pool with 1 workers.

Then, create a DataQueue.

q = parallel.pool.DataQueue

q = DataQueue with properties:

QueueLength: 0

A newly created DataQueue has an empty queue. You can use parfor to find q.QueueLength on the worker. Find the queue length on the client, and the queue length on the worker.

fprintf('On the client: %i\n', q.QueueLength)

parfor i = 1
    fprintf('On the worker: %i\n', q.QueueLength)
end

As the queue is empty, the QueueLength is 0 for both the client and the worker. Next, send a message to the queue from the worker. Then, use the QueueLength property to find the length of the queue.

% Send a message first
parfor i = 1
    send(q, 'A message');
end

% Find the length
fprintf('On the client: %i\n', q.QueueLength)

parfor i = 1
    fprintf('On the worker: %i\n', q.QueueLength)
end

The QueueLength property is 1 on the client, and 0 on the worker. Create a listener to process the queue by immediately displaying the data.

el = afterEach(q, @disp);

Wait until the queue is empty, then delete the listener.

while q.QueueLength > 0
    pause(0.1);
end
delete(el);

Use the QueueLength property to find the length of the queue.

fprintf('On the client: %i\n', q.QueueLength)

QueueLength is 0 because the queue processing is complete.

Use a DataQueue Object and parfor to Update a Wait Bar

In this example, you use a DataQueue to update a wait bar with the progress of a parfor-loop.

When you create a parfor-loop, you offload each iteration to workers in a parallel pool. Information is only returned from the workers when the parfor-loop completes. You can use a DataQueue to update a wait bar at the end of each iteration.

When you update a wait bar with the progress of your parfor-loop, the client must record information about how many iterations remain.

The helper function parforWaitbar, defined at the end of this example, updates a wait bar. The function uses persistent to store information about the number of remaining iterations.

Use waitbar to create a wait bar, w.

w = waitbar(0,'Please wait ...');

Create a DataQueue, D. Then use afterEach to run parforWaitbar after messages are sent to the DataQueue.

% Create DataQueue and listener
D = parallel.pool.DataQueue;
afterEach(D,@parforWaitbar);

Set the number of iterations for your parfor-loop, N. Use the wait bar w and the number of iterations N to initialize the function parforWaitbar.

At the end of each iteration of the parfor-loop, the client runs parforWaitbar and incrementally updates the wait bar.

N = 100;
parforWaitbar(w,N)

The function parforWaitbar uses persistent variables to store the number of completed iterations on the client. No information is required from the workers.

Run a parfor-loop with N iterations. For this example, use pause and rand to simulate some work. After each iteration, use send to send a message to the DataQueue. When a message is sent to the DataQueue, the wait bar updates. Because no information is required from the workers, send an empty message to avoid unnecessary data transfer.

After the parfor-loop completes, use delete to close the wait bar.

parfor i = 1:N
    pause(rand)
    send(D,[]);
end

delete(w);

Define the helper function parforWaitbar. When you run parforWaitbar with two input arguments, the function initializes three persistent variables (count, h, and N). When you run parforWaitbar with one input argument, the wait bar updates.

function parforWaitbar(waitbarHandle,iterations)
persistent count h N

if nargin == 2
    % Initialize
    
    count = 0;
    h = waitbarHandle;
    N = iterations;
else
    % Update the waitbar
    
    % Check whether the handle is a reference to a deleted object
    if isvalid(h)
        count = count + 1;
        waitbar(count / N,h);
    end
end

end

[Figure: wait bar indicating roughly one-third completion.]

Plot During Parameter Sweep with parfeval

This example shows how to perform a parallel parameter sweep with parfeval and send results back during computations with a DataQueue object.

parfeval does not block MATLAB, so you can continue working while computations take place.

The example performs a parameter sweep on the Lorenz system of ordinary differential equations, on the parameters σ and ρ, and shows the chaotic nature of this system.

$$\frac{d}{dt}x = \sigma(y - x), \qquad \frac{d}{dt}y = x(\rho - z) - y, \qquad \frac{d}{dt}z = xy - \beta z$$

Set Up Parallel Environment

Create a parallel pool of thread workers by using the parpool function.
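The call itself is not shown here. A minimal sketch, assuming no pool is already open; the worker count in the startup message depends on the machine:

```matlab
% Assumption: no parallel pool is currently open (parpool errors otherwise).
% Open a thread-based pool with the default number of workers.
pool = parpool("Threads");
```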

Starting parallel pool (parpool) using the 'Threads' profile ... Connected to parallel pool with 6 workers.

Create Parameter Grid

Define the range of parameters that you want to explore in the parameter sweep.

gridSize = 40;
sigma = linspace(5,45,gridSize);
rho = linspace(50,100,gridSize);
beta = 8/3;

Create a 2-D grid of parameters by using the meshgrid function.

[rho,sigma] = meshgrid(rho,sigma);

Perform Parallel Parameter Sweep

After you define the parameters, you can perform the parallel parameter sweep.

To visualize the interim results of the parameter sweep, create a surface plot. Note that initializing the Z component of the surface with NaN creates an empty plot.

figure;
surface = surf(rho,sigma,NaN(size(sigma)));
xlabel('\rho','Interpreter','Tex')
ylabel('\sigma','Interpreter','Tex')

To send interim data from the workers, create a DataQueue object. Set up a function that updates the surface plot each time a worker sends data by using the afterEach function. The updatePlot function is a supporting function defined at the end of the example.

Q = parallel.pool.DataQueue;
afterEach(Q,@(data) updatePlot(surface,data));

parfeval works more efficiently when you distribute the workload. To distribute the workload, group the parameters to explore into partitions. For this example, split into uniform partitions of size step by using the colon operator (:). The resulting array partitions contains the boundaries of the partitions. Note that you must add the end point of the last partition.

step = 100;
partitions = [1:step:numel(sigma),numel(sigma)+1]

partitions = 1×17

       1         101         201         301         401         501         601         701         801         901        1001        1101        1201        1301        1401        1501        1601
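To see how the boundary array maps to index ranges, consider a much smaller case. The values numElements = 10 and step = 4 below are hypothetical and chosen only so that the last partition is shorter than the others.

```matlab
% Hypothetical small case: 10 elements split into partitions of size 4.
numElements = 10;
step = 4;

% Start index of each partition, plus one past the final element.
partitions = [1:step:numElements, numElements+1];

% Partition k covers indices partitions(k) to partitions(k+1)-1.
for k = 1:numel(partitions)-1
    first = partitions(k);
    last = partitions(k+1) - 1;
    fprintf('Partition %d covers indices %d to %d\n', k, first, last);
end
```

Here partitions is [1 5 9 11], so the three partitions cover indices 1 to 4, 5 to 8, and 9 to 10. Without the appended end point, the final partition would have no upper boundary.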

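For best performance, try to split into partitions that are large enough that the computation time is large compared to the overhead of scheduling the partition, yet small enough that there are enough partitions to keep all workers busy.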

To represent function executions on parallel workers and hold their results, use future objects.

f(1:numel(partitions)-1) = parallel.FevalFuture;

Offload computations to parallel workers by using the parfeval function. parameterSweep is a helper function defined at the end of this script that solves the Lorenz system on a partition of the parameters to explore. It has one output argument, so you must specify 1 as the number of outputs in parfeval.

for ii = 1:numel(partitions)-1
    f(ii) = parfeval(@parameterSweep,1,partitions(ii),partitions(ii+1),sigma,rho,beta,Q);
end

parfeval does not block MATLAB, so you can continue working while computations take place. The workers compute in parallel and send intermediate results through the DataQueue as soon as they become available.

If you want to block MATLAB until parfeval completes, use the wait function on the future objects. Using the wait function is useful when subsequent code depends on the completion of parfeval.
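A minimal, self-contained sketch of blocking on an array of futures. The variable name futures and the calls to magic are stand-ins chosen for illustration and are not part of this example; for the sweep above, the corresponding call would presumably be wait(f).

```matlab
% Stand-in workload (assumption): three small parfeval tasks using magic.
futures(1:3) = parallel.FevalFuture;
for k = 1:3
    futures(k) = parfeval(@magic, 1, k+2);
end

% Block the client until every future has finished.
wait(futures);

% Collect the state reported by each future after the wait.
states = {futures.State};
disp(states)
```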

After parfeval finishes the computations, wait finishes and you can execute more code. For example, plot a selection of the Lorenz system solutions. Use the fetchOutputs function to retrieve the results stored in the future objects.

results = fetchOutputs(f);
idxs = randperm(numel(results),4);
figure
for n = 1:numel(idxs)
    nexttile
    a = results{idxs(n)};
    plot3(a(:,1),a(:,2),a(:,3))
    grid on
    xlabel("x")
    ylabel("y")
    zlabel("z")
    title("Lorenz System Solution", ...
        "\rho = "+ num2str(rho(idxs(n)),'%5.2f') + " \sigma = "+ num2str(sigma(idxs(n)),'%5.2f'),Interpreter="tex")
end

If your parameter sweep needs more computational resources and you have access to a cluster, you can scale up your parfeval computations. For more information, see Scale Up from Desktop to Cluster.

Define Helper Functions

Define a helper function that solves the Lorenz system on a partition of the parameters to explore. Send intermediate results to the MATLAB client by using the send function on the DataQueue object.

function results = parameterSweep(first,last,sigma,rho,beta,Q)
    results = cell(last-first,1);
    for ii = first:last-1
        % Right-hand side of the Lorenz system for parameter pair ii, with a = [x; y; z]
        lorenzSystem = @(t,a) [sigma(ii)*(a(2) - a(1)); a(1)*(rho(ii) - a(3)) - a(2); a(1)*a(2) - beta*a(3)];
        [t,a] = ode45(lorenzSystem,[0 100],[1 1 1]);
        % Send the linear index and the final z value to the client
        send(Q,[ii,a(end,3)]);
        results{ii-first+1} = a;
    end
end

Define another helper function that updates the surface plot when new data arrives.

function updatePlot(surface,data)
    surface.ZData(data(1)) = data(2);
    drawnow('limitrate');
end
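updatePlot relies on linear indexing: each message is [ii, value], where ii is a linear index into the parameter grid and therefore also into ZData, which has the same size. A small sketch with a hypothetical 3-by-4 matrix Z standing in for surface.ZData:

```matlab
% Hypothetical stand-in for surface.ZData: a 3-by-4 grid with no data yet.
Z = NaN(3,4);

% Hypothetical message from a worker: linear index 5, value 2.5.
data = [5, 2.5];

% Same assignment pattern as in updatePlot.
Z(data(1)) = data(2);

% MATLAB stores matrices column by column, so linear index 5 is row 2, column 2.
[row,col] = ind2sub(size(Z), data(1));
```

Because the index travels with the value, the client can fill the grid in whatever order the workers finish.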

Tips

Extended Capabilities

Thread-Based Environment

Run code in the background using MATLAB® backgroundPool or accelerate code with Parallel Computing Toolbox™ ThreadPool.

This function fully supports thread-based environments. For more information, see Run MATLAB Functions in Thread-Based Environment.

Version History

Introduced in R2017a