Send Messages to Workers Using Pollable Data Queues - MATLAB & Simulink (original) (raw)
Main Content
This example shows how to use pollable data queues to send data or instructions to workers during asynchronous function evaluations with parfeval
.
You can use PollableDataQueue
objects to transfer data and messages between the client and workers in an interactive parallel pool. By default, a PollableDataQueue
object sends the data only to the client or worker that creates the PollableDataQueue
object. However, starting in R2025a, you can also create a type of PollableDataQueue
object that allows the client or any worker in the pool to poll and receive data.
This example demonstrates how to prepare workers to receive data or instructions you send from the client using a PollableDataQueue
object. You also use the PollableDataQueue
object to smoothly stop a parfeval
computation on a worker. You can adapt this approach for any application that requires sending additional instructions to a worker during an asynchronous parfeval
computation. To see an example of controlling a hardware device using a PollableDataQueue
object, see Control Hardware and Acquire Data in Parallel.
Start a parallel pool with one thread worker.
pool = parpool("Threads",1);
Starting parallel pool (parpool) using the 'Threads' profile ... Connected to parallel pool with 1 workers.
To enable communication from the worker to the client, create a PollableDataQueue
object with the Destination
argument set to "any"
. This type of PollableDataQueue
object allows both the client and worker to send and receive messages.
To simplify the communication between the client and worker, create two PollableDataQueue
objects. The workerToClient
queue sends messages from the worker to the client, while the clientToWorker
queue sends messages from the client to the worker.
workerToClient = parallel.pool.PollableDataQueue(Destination="any"); clientToWorker = parallel.pool.PollableDataQueue(Destination="any");
Define the function processData
, which runs on the worker. The processData
function waits for data from the client, processes it, and sends status updates back to the client. The function stops when it receives the "stop"
message.
function out = processData(workerToClient,clientToWorker) out = 0; send(workerToClient,"Ready to receive data."); while true % Wait for a message data = poll(clientToWorker,Inf); if strcmp(data,"stop") send(workerToClient,"Stopped processing data on worker.") return else response = sprintf("Data %d received.",data(1)); send(workerToClient,response); out = out+data(2); pause(1); end end end
Use parfeval
to execute the processData
function and prepare the worker to start waiting for messages from the client. parfeval
computes the processData
function asynchronously on a worker and does not block the client.
future = parfeval(@processData,1,workerToClient,clientToWorker);
Poll the workerToClient
queue to receive the initial status message from the worker.
status = poll(workerToClient,inf)
status = "Ready to receive data."
In a loop, send data to the worker using the clientToWorker
queue and poll the workerToClient
queue for confirmation before sending the next data point.
for idx = 1:5 send(clientToWorker,[idx rand]); status = poll(workerToClient,inf) end
status = "Data 1 received."
status = "Data 2 received."
status = "Data 3 received."
status = "Data 4 received."
status = "Data 5 received."
To stop the parfeval
computation and terminate the processing loop on the worker, send a "stop"
message to the clientToWorker
queue. If more than one worker is receiving data from the queue, close the queue using the close function instead of sending multiple "stop"
signals.
send(clientToWorker,"stop");
Poll for the final status message, wait for the parfeval
computation to complete, and retrieve the accumulated result using fetchOutputs
.
status = poll(workerToClient,inf)
status = "Stopped processing data on worker."
wait(future) out = fetchOutputs(future)