Send Messages to Workers Using Pollable Data Queues

This example shows how to use pollable data queues to send data or instructions to workers during asynchronous function evaluations with parfeval.

You can use PollableDataQueue objects to transfer data and messages between the client and workers in an interactive parallel pool. By default, a PollableDataQueue object sends the data only to the client or worker that creates the PollableDataQueue object. However, starting in R2025a, you can also create a type of PollableDataQueue object that allows the client or any worker in the pool to poll and receive data.

This example demonstrates how to prepare workers to receive data or instructions that you send from the client using a PollableDataQueue object. You also use the PollableDataQueue object to gracefully stop a parfeval computation on a worker. You can adapt this approach for any application that requires sending additional instructions to a worker during an asynchronous parfeval computation. To see an example of controlling a hardware device using a PollableDataQueue object, see Control Hardware and Acquire Data in Parallel.

Start a parallel pool with one thread worker.

pool = parpool("Threads",1);

Starting parallel pool (parpool) using the 'Threads' profile ...
Connected to parallel pool with 1 workers.

To enable the client to send data to the worker, create a PollableDataQueue object with the Destination argument set to "any". By default, a queue created on the client delivers data only to the client. A queue created with Destination="any" allows both the client and the worker to send and receive messages.

To simplify the communication between the client and worker, create two PollableDataQueue objects. The workerToClient queue sends messages from the worker to the client, while the clientToWorker queue sends messages from the client to the worker.

workerToClient = parallel.pool.PollableDataQueue(Destination="any");
clientToWorker = parallel.pool.PollableDataQueue(Destination="any");

Define the function processData, which runs on the worker. The processData function waits for data from the client, processes it, and sends status updates back to the client. The function stops when it receives the "stop" message.

function out = processData(workerToClient,clientToWorker)
    out = 0;
    % Tell the client that the worker is ready
    send(workerToClient,"Ready to receive data.");
    while true
        % Wait for a message
        data = poll(clientToWorker,Inf);
        if strcmp(data,"stop")
            send(workerToClient,"Stopped processing data on worker.")
            return
        else
            % data is a two-element vector: [index value]
            response = sprintf("Data %d received.",data(1));
            send(workerToClient,response);
            % Accumulate the value and simulate processing time
            out = out+data(2);
            pause(1);
        end
    end
end

Use parfeval to execute the processData function and prepare the worker to start waiting for messages from the client. parfeval computes the processData function asynchronously on a worker and does not block the client.

future = parfeval(@processData,1,workerToClient,clientToWorker);

Poll the workerToClient queue to receive the initial status message from the worker.

status = poll(workerToClient,inf)

status = "Ready to receive data."
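Polling with an infinite timeout blocks the client until a message arrives. As a minimal sketch that is not part of the original example, you can instead poll with a finite timeout and use the second output of poll to check whether any data was returned. The queue name demoQueue, the 0.5-second timeout, and the other variable names are illustrative assumptions.

```matlab
% Sketch only: the 0.5-second timeout and the variable names are illustrative.
% A default queue delivers data to the process that creates it (here, the client).
demoQueue = parallel.pool.PollableDataQueue;

% Nothing has been sent yet, so this poll gives up after 0.5 seconds.
[firstMsg,firstOk] = poll(demoQueue,0.5);
if ~firstOk
    disp("No message arrived within the timeout.")
end

% Send a message from the client to its own queue, then poll again.
send(demoQueue,"Ready to receive data.");
[secondMsg,secondOk] = poll(demoQueue,0.5);
if secondOk
    disp(secondMsg)
end
```

Applied to the workerToClient queue, the same pattern would let the client inspect the Error property of the future when a poll times out, rather than waiting indefinitely for a worker that has already failed.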

In a loop, send data to the worker using the clientToWorker queue and poll the workerToClient queue for confirmation before sending the next data point.

for idx = 1:5
    send(clientToWorker,[idx rand]);
    status = poll(workerToClient,inf)
end

status = "Data 1 received."

status = "Data 2 received."

status = "Data 3 received."

status = "Data 4 received."

status = "Data 5 received."

To stop the parfeval computation and terminate the processing loop on the worker, send a "stop" message to the clientToWorker queue. If more than one worker is receiving data from the queue, close the queue using the close function instead of sending multiple "stop" signals.

send(clientToWorker,"stop");

Poll for the final status message, wait for the parfeval computation to complete, and retrieve the accumulated result using fetchOutputs.

status = poll(workerToClient,inf)

status = "Stopped processing data on worker."

wait(future)
out = fetchOutputs(future)
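The earlier note about closing the queue when several workers share it can be sketched as follows. This is a hypothetical variation, not part of the original example: the names workQueue and sumUntilClosed are invented for illustration, and the sketch assumes that poll returns false in its second output once the queue is closed and empty. Check that behavior against the PollableDataQueue documentation for your release before relying on it.

```matlab
% Sketch only: workQueue and sumUntilClosed are hypothetical names.
% parfeval runs on the current parallel pool.
workQueue = parallel.pool.PollableDataQueue(Destination="any");

% Start two asynchronous evaluations that share the same queue.
futures(1:2) = parallel.FevalFuture;
for w = 1:2
    futures(w) = parfeval(@sumUntilClosed,1,workQueue);
end

% Send six values; each value is received by only one evaluation.
for value = 1:6
    send(workQueue,value);
end

% Close the queue once instead of sending one "stop" message per worker.
close(workQueue);

% Each evaluation returns its partial sum; together they should add up to 21.
totals = fetchOutputs(futures);
disp(sum(totals))

function total = sumUntilClosed(queue)
    total = 0;
    while true
        [value,ok] = poll(queue,Inf);
        if ~ok
            % Assumption: poll reports false once the queue is closed and empty.
            return
        end
        total = total + value;
    end
end
```

Closing the queue avoids having to know how many workers are listening, which a per-worker "stop" message would require.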
