PollableDataQueue - Send and poll data between client and workers - MATLAB

parallel.pool.PollableDataQueue

Send and poll data between client and workers

Description

A PollableDataQueue object enables synchronous sending and polling for data or messages between the client and workers in a parallel pool during a computation. For example, from a worker, you can send intermediate values to the client or another worker and use the values in another computation.

Unlike all other handle objects, PollableDataQueue and DataQueue objects remain connected when you transfer them.

Creation

Syntax

Description

`q` = parallel.pool.PollableDataQueue creates a PollableDataQueue object that you can use to send and poll for data between the client and workers. The resulting PollableDataQueue object can be polled only by the client or worker that creates it. Create the PollableDataQueue on the worker or client where you want to receive the data.

`q` = parallel.pool.PollableDataQueue(Destination=`destination`) sets the destination behavior of the PollableDataQueue object. (since R2025a)

If you want the client or any worker to be able to poll the PollableDataQueue object to receive data, set Destination="any".

Input Arguments

Since R2025a

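Destination behavior of the queue, specified as "creator" (the default), which allows only the client or worker that creates the queue to poll it, or "any", which allows the client or any worker in the pool to poll it.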

Properties

Since R2025a

This property is read-only after you close the queue using the close object function.

Queue closure state, represented as true when the queue is closed or false when it is open.

Data Types: logical

This property is read-only.

The number of items of data currently held in the queue that a worker or the client can potentially poll to receive, represented as zero or a positive integer.

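The destination behavior of the queue, set using the Destination name-value argument, determines the QueueLength property value. With Destination set to "creator", the value can be nonzero only on the client or worker that created the queue and is 0 everywhere else. With Destination set to "any", the client and the workers report the same value.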

Before R2025a: The QueueLength property value is 0 or a positive integer on the worker or client that creates the PollableDataQueue object. If the client creates the PollableDataQueue object, the value is 0 on all workers. If a worker creates the PollableDataQueue, the value is 0 on the client and all other workers.

Object Functions

close Close pollable data queue
poll Retrieve data sent to pollable data queue
send Send data between clients and workers using a data queue

Examples

Create a PollableDataQueue object.

p = parallel.pool.PollableDataQueue;

Run a parfor-loop, and send a message, such as data with the value 1.

parfor idx = 1
    send(p,idx);
end

Poll for the result.
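A minimal sketch of the polling step, assuming the queue p from the previous steps and that the parfor-loop has already finished. The variable names data and gotData are illustrative. The second output of poll reports whether any data was available.

```matlab
% Assumes p is the PollableDataQueue created above and that the
% parfor-loop has already sent its value.
[data,gotData] = poll(p);   % no timeout given, so poll returns immediately

if gotData
    fprintf("Received: %d\n",data);
else
    disp("No data was available in the queue.");
end
```

With the loop above, which sends the value of idx once, the received value should be 1.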

For more details on polling for data using a PollableDataQueue object, see poll.

Since R2025a

Use a PollableDataQueue object with Destination set to "any" to send messages from the client to multiple workers in a parallel pool.

Start a pool of four thread workers.

numWorkers = 4;
pool = parpool("Threads",numWorkers);

Starting parallel pool (parpool) using the 'Threads' profile ...
Connected to parallel pool with 4 workers.

Create two PollableDataQueue objects: a queue named workerPdq, created with Destination set to "any", to send messages to the workers, and a queue named clientPdq to receive messages back from the workers.

workerPdq = parallel.pool.PollableDataQueue(Destination="any");
clientPdq = parallel.pool.PollableDataQueue;

Use parfevalOnAll to execute the analyzeMessage helper function on all workers. Pass the workerPdq and clientPdq queues as arguments to the function.

parfevalOnAll(@analyzeMessage,0,workerPdq,clientPdq);

Send a personalized message to each worker through the workerPdq queue.

for idx = 1:numWorkers
    send(workerPdq,compose("Hello, Worker %d!",idx));
end

Poll the clientPdq queue to receive messages from the workers. Use inf to wait indefinitely for each message.

for idx = 1:numWorkers
    poll(clientPdq,inf)
end

ans = "Worker 1 received message!"

ans = "Worker 2 received message!"

ans = "Worker 3 received message!"

ans = "Worker 4 received message!"

Define the helper function analyzeMessage that each worker executes. The function polls the inQueue queue for a message and extracts the worker number. The function then sends a confirmation message back to the outQueue queue.

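function analyzeMessage(inQueue,outQueue)
    % Wait up to 2 seconds for a message from the client.
    message = poll(inQueue,2);
    % Extract the worker number from the message text.
    workerNum = sscanf(message,"Hello, Worker %u");
    % Send a confirmation back to the client.
    send(outQueue,compose("Worker %d received message!",workerNum));
    pause(2)
end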

When you send a message to a PollableDataQueue object, the message waits in the queue. Each message adds 1 to the queue length. When you use poll, one message is collected from the queue. In this example, you use the QueueLength property to find the length of a PollableDataQueue object and observe how the Destination argument affects it.

First, create a parallel pool with one worker.
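A minimal sketch of a command that starts such a pool, assuming the Processes profile named in the output that follows. The exact command is an assumption, as is the cleanup of any pool that is already running.

```matlab
% Assumed command: start a process-based pool with exactly one worker.
% Only one pool can be open at a time, so shut down any existing pool first.
delete(gcp("nocreate"));
pool = parpool("Processes",1);
```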

Starting parallel pool (parpool) using the 'Processes' profile ...
Connected to parallel pool with 1 workers.

Create a PollableDataQueue object. By default, the parallel.pool.PollableDataQueue function creates a PollableDataQueue object with the Destination set to "creator". This type of PollableDataQueue object allows only the client or worker that creates the queue to poll the object for data.

queue = parallel.pool.PollableDataQueue

queue =
  PollableDataQueue with properties:

      QueueLength: 0
         IsClosed: false

Initially, the queue is empty. Check the queue length on the client and the worker. The QueueLength property value is 0 for both the client and the worker.

fprintf("Queue length on the client: %i\n",queue.QueueLength)

Queue length on the client: 0

parfor idx = 1
    fprintf("Queue length on the worker: %i\n",queue.QueueLength)
end

Queue length on the worker: 0

Next, send a message to the queue from the worker. Then, use the QueueLength property to find the length of the queue. With Destination set to "creator", the QueueLength property value is 1 on the client (which created the queue) and 0 on the worker.

parfor idx = 1
    send(queue,"A message");
end
fprintf("Queue length on the client: %i\n",queue.QueueLength)

Queue length on the client: 1

parfor idx = 1
    fprintf("Queue length on the worker: %i\n",queue.QueueLength)
end

Queue length on the worker: 0

Use poll to retrieve the message from the queue.
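A minimal sketch of this step, assuming the variable queue from the preceding steps still holds the single message sent from the worker. The variable name message is illustrative.

```matlab
% Assumes queue is the "creator" queue from the preceding steps.
% The client created the queue, so the client is the one that polls it.
message = poll(queue)
```

Given the earlier send call, the retrieved value should be the string "A message".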

Check the length of the queue again. The QueueLength property value is now 0 because you have removed a message.

fprintf("Queue length on the client: %i\n",queue.QueueLength)

Queue length on the client: 0

Create a PollableDataQueue object with Destination set to "any". This command creates a PollableDataQueue object that the client or any worker in the pool can poll to receive data.

queueAny = parallel.pool.PollableDataQueue(Destination="any")

queueAny =
  PollableDataQueue with properties:

      QueueLength: 0
         IsClosed: false

Send a message to this queue.

parfor idx = 1
    send(queueAny,"Another message");
end

Check the queue length. With Destination set to "any", both the client and the worker show a QueueLength property value of 1, demonstrating that the client or worker can poll the queue to receive data.

fprintf("Queue length on the client: %i\n", queueAny.QueueLength);

Queue length on the client: 1

parfor idx = 1
    fprintf("Queue length on the worker: %i\n",queueAny.QueueLength);
end

Queue length on the worker: 1

Finally, retrieve the message from the queue and check the queue length. The QueueLength property value is 0 because the queue processing is complete.
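A minimal sketch of the retrieval step, assuming the variable queueAny from the preceding steps still holds the one message sent to it. Here the client polls; with Destination set to "any", a worker could poll instead.

```matlab
% Assumes queueAny is the Destination="any" queue from the preceding steps.
% Polling removes the message, so the queue is empty afterward.
message = poll(queueAny)
```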

fprintf("Queue length on the client: %i\n",queueAny.QueueLength);

Queue length on the client: 0

parfor idx = 1
    fprintf("Queue length on the worker: %i\n",queueAny.QueueLength);
end

Queue length on the worker: 0

Tips

Extended Capabilities

Version History

Introduced in R2017a

Use the close object function to close a PollableDataQueue object. When you close a PollableDataQueue, you change the IsClosed property to true and you can no longer send data using the PollableDataQueue object.
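A minimal sketch of closing a queue, assuming a release that provides close and the IsClosed property (the property is marked as since R2025a). How a rejected send is reported is not stated here, so the sketch assumes it raises an error and guards the call with try/catch.

```matlab
% Sketch only: requires a release that provides close and IsClosed.
q = parallel.pool.PollableDataQueue;
disp(q.IsClosed)    % false for a newly created queue

close(q);
disp(q.IsClosed)    % true after closing

% Assumption: sending to a closed queue raises an error.
try
    send(q,"too late");
catch err
    disp("send failed: " + err.message);
end
```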

You can specify the destination behavior of a PollableDataQueue object using the Destination name-value argument. For example, to create a PollableDataQueue object that can send messages or data to any worker or client in the pool, set the Destination name-value argument to "any".

You can create a PollableDataQueue object on a worker and transfer it to another worker via the client to transfer data or messages between workers.
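One way to arrange such a transfer is sketched here, assuming a pool with at least two workers so that the receiving and sending tasks can run at the same time. The function names receiver and sender, the variable names, and the 30-second timeout are all hypothetical choices for illustration.

```matlab
pool = gcp;                                    % assumes at least two workers
clientQ = parallel.pool.PollableDataQueue;     % created on the client

% The receiving worker creates its own queue and hands it to the client.
fRecv = parfeval(pool,@receiver,1,clientQ);
workerQ = poll(clientQ,inf);                   % wait for the worker's queue

% The client forwards that queue to a second task, which sends to it.
fSend = parfeval(pool,@sender,0,workerQ);
reply = fetchOutputs(fRecv)

function out = receiver(clientQ)
    workerQ = parallel.pool.PollableDataQueue; % only this worker can poll it
    send(clientQ,workerQ);                     % transfer the queue via the client
    out = poll(workerQ,30);                    % wait up to 30 s for data
end

function sender(workerQ)
    send(workerQ,"Hello from another worker");
end
```

The finite timeout in receiver keeps the task from blocking forever if the sender never runs, for example on a pool with a single worker.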