celery.app.control — Celery 5.5.2 documentation

This document describes the current stable version of Celery (5.5).

Worker Remote Control Client.

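Client for worker remote control commands. The server implementation is in celery.worker.control. There are two types of remote control commands: inspect commands, which have no side effects and usually just return a value found in the worker (accessible through the Inspect class), and control commands, which perform side effects such as adding a new queue to consume from (accessible through the Control class).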

class celery.app.control.Control(app=None)[source]

Worker remote control client.

class Mailbox(namespace, type='direct', connection=None, clock=None, accept=None, serializer=None, producer_pool=None, queue_ttl=None, queue_expires=None, reply_queue_ttl=None, reply_queue_expires=10.0)

Process Mailbox.

Node(hostname=None, state=None, channel=None, handlers=None)

abcast(command, kwargs=None)

accept = ['json']

Only accepts json messages by default.

call(destination, command, kwargs=None, timeout=None, callback=None, channel=None)

cast(destination, command, kwargs=None)

connection = None

Connection (if bound).

exchange = None

mailbox exchange (init by constructor).

exchange_fmt = '%s.pidbox'

get_queue(hostname)

get_reply_queue()

multi_call(command, kwargs=None, timeout=1, limit=None, callback=None, channel=None)

namespace = None

Name of application.

node_cls

alias of Node

property oid

producer_or_acquire(producer=None, channel=None)

property producer_pool

reply_exchange = None

exchange to send replies to.

reply_exchange_fmt = 'reply.%s.pidbox'

property reply_queue

serializer = None

Message serializer

type = 'direct'

Exchange type (usually direct, or fanout for broadcast).

add_consumer(queue, exchange=None, exchange_type='direct', routing_key=None, options=None, destination=None, **kwargs)[source]

Tell all (or specific) workers to start consuming from a new queue.

Only the queue name is required: if only the queue is specified, the exchange and routing key are set to the same name (as automatic queues do).

Note

This command does not respect the default queue/exchange options in the configuration.

Parameters:

See also

broadcast() for supported keyword arguments.
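The naming rule above can be illustrated with a minimal sketch. The helper name start_consuming and the queue name are hypothetical; control is assumed to be the app.control attribute of a configured Celery app, and reply=True is assumed to be forwarded to broadcast() so that worker replies are returned.

```python
def start_consuming(control, queue, workers=None):
    """Ask all workers (or only `workers`) to consume from `queue`.

    `control` is assumed to be `app.control` of a configured Celery app.
    """
    # Only the queue name is passed, so the exchange and routing key
    # take the same name as the queue.
    return control.add_consumer(queue, destination=workers, reply=True)
```

A call might look like start_consuming(app.control, 'priority.high', workers=['celery@node1']), where both the queue and the worker name are made up for illustration.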

autoscale(max, min, destination=None, **kwargs)[source]

Change worker(s) autoscale setting.

See also

Supports the same arguments as broadcast().

broadcast(command, arguments=None, destination=None, connection=None, reply=False, timeout=1.0, limit=None, callback=None, channel=None, pattern=None, matcher=None, **extra_kwargs)[source]

Broadcast a control command to the celery workers.

Parameters:

cancel_consumer(queue, destination=None, **kwargs)[source]

Tell all (or specific) workers to stop consuming from queue.

See also

Supports the same arguments as broadcast().

disable_events(destination=None, **kwargs)[source]

Tell all (or specific) workers to disable events.

See also

Supports the same arguments as broadcast().

discard_all(connection=None)

Discard all waiting tasks.

This will ignore all tasks waiting for execution, and they will be deleted from the messaging server.

Parameters:

connection (kombu.Connection) – Optional specific connection instance to use. If not provided a connection will be acquired from the connection pool.

Returns:

the number of tasks discarded.

Return type:

int

election(id, topic, action=None, connection=None)[source]

enable_events(destination=None, **kwargs)[source]

Tell all (or specific) workers to enable events.

See also

Supports the same arguments as broadcast().

heartbeat(destination=None, **kwargs)[source]

Tell worker(s) to send a heartbeat immediately.

See also

Supports the same arguments as broadcast().

property inspect

Create new Inspect instance.

ping(destination=None, timeout=1.0, **kwargs)[source]

Ping all (or specific) workers.

>>> app.control.ping()
[{'celery@node1': {'ok': 'pong'}}, {'celery@node2': {'ok': 'pong'}}]
>>> app.control.ping(destination=['celery@node2'])
[{'celery@node2': {'ok': 'pong'}}]

Returns:

List of {HOSTNAME: {'ok': 'pong'}} dictionaries.

Return type:

List[Dict]

See also

broadcast() for supported keyword arguments.
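Because each reply is a single-key dictionary, a common follow-up is to collect the hostnames that answered. The following is a minimal sketch that assumes the reply shape shown above; the helper name responding_workers is hypothetical and not part of Celery.

```python
def responding_workers(replies):
    """Return the sorted hostnames that answered a ping with 'pong'."""
    hosts = set()
    for reply in replies or []:  # tolerate None or an empty list
        for hostname, body in reply.items():
            if isinstance(body, dict) and body.get('ok') == 'pong':
                hosts.add(hostname)
    return sorted(hosts)


replies = [{'celery@node1': {'ok': 'pong'}}, {'celery@node2': {'ok': 'pong'}}]
print(responding_workers(replies))  # ['celery@node1', 'celery@node2']
```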

pool_grow(n=1, destination=None, **kwargs)[source]

Tell all (or specific) workers to grow the pool by n.

See also

Supports the same arguments as broadcast().

pool_restart(modules=None, reload=False, reloader=None, destination=None, **kwargs)[source]

Restart the execution pools of all or specific workers.

Keyword Arguments:

See also

Supports the same arguments as broadcast().

pool_shrink(n=1, destination=None, **kwargs)[source]

Tell all (or specific) workers to shrink the pool by n.

See also

Supports the same arguments as broadcast().

purge(connection=None)[source]

Discard all waiting tasks.

This will ignore all tasks waiting for execution, and they will be deleted from the messaging server.

Parameters:

connection (kombu.Connection) – Optional specific connection instance to use. If not provided a connection will be acquired from the connection pool.

Returns:

the number of tasks discarded.

Return type:

int

rate_limit(task_name, rate_limit, destination=None, **kwargs)[source]

Tell workers to set a new rate limit for task by type.

Parameters:

See also

broadcast() for supported keyword arguments.
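As a rough usage sketch, the wrapper below forwards a task name and a rate to rate_limit(). The helper name throttle_task, the task name, and the '10/m' rate string (read here as ten tasks per minute) are illustrative assumptions; control is assumed to be app.control.

```python
def throttle_task(control, task_name, rate, workers=None):
    """Set a new rate limit for every task of type `task_name`.

    `control` is assumed to be `app.control` of a configured Celery app.
    """
    # `rate` is passed through unchanged, for example '10/m'.
    return control.rate_limit(task_name, rate, destination=workers, reply=True)
```

For example, throttle_task(app.control, 'tasks.send_email', '10/m') would ask every worker to apply the new limit and return their replies.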

revoke(task_id, destination=None, terminate=False, signal='SIGTERM', **kwargs)[source]

Tell all (or specific) workers to revoke a task by id (or list of ids).

If a task is revoked, the workers will ignore the task and not execute it after all.

Parameters:

See also

broadcast() for supported keyword arguments.
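The terminate and signal arguments from the signature can be combined as in the sketch below. The helper name cancel_tasks and its kill_running flag are hypothetical; control is assumed to be app.control.

```python
def cancel_tasks(control, task_ids, kill_running=False):
    """Revoke one task id or a list of ids.

    With kill_running=True the workers are also asked to terminate a
    task that is already executing, using the default SIGTERM signal.
    """
    return control.revoke(task_ids, terminate=kill_running, signal='SIGTERM')
```

With kill_running left at False, a task that has not started yet is simply skipped by the workers, while a task that is already running is left alone.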

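revoke_by_stamped_headers(headers, destination=None, terminate=False, signal='SIGTERM', **kwargs)[source]

Tell all (or specific) workers to revoke a task by headers.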

If a task is revoked, the workers will ignore the task and not execute it after all.

Parameters:

See also

broadcast() for supported keyword arguments.

shutdown(destination=None, **kwargs)[source]

Shutdown worker(s).

See also

Supports the same arguments as broadcast().

terminate(task_id, destination=None, signal='SIGTERM', **kwargs)[source]

Tell all (or specific) workers to terminate a task by id (or list of ids).

See also

This is just a shortcut to revoke() with the terminate argument enabled.

time_limit(task_name, soft=None, hard=None, destination=None, **kwargs)[source]

Tell workers to set time limits for a task by type.

Parameters:

class celery.app.control.Inspect(destination=None, timeout=1.0, callback=None, connection=None, app=None, limit=None, pattern=None, matcher=None)[source]

API for inspecting workers.

This class provides a proxy for accessing the Inspect API of workers. The API is defined in celery.worker.control.

active(safe=None)[source]

Return list of tasks currently executed by workers.

Parameters:

safe (Boolean) – Set to True to disable deserialization.

Returns:

Dictionary {HOSTNAME: [TASK_INFO,...]}.

Return type:

Dict

See also

For TASK_INFO details see query_task() return value.
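The {HOSTNAME: [TASK_INFO,...]} shape lends itself to simple per-worker summaries. The sketch below counts running tasks per worker; the helper name active_task_counts and the sample data are made up, and the reply is assumed to possibly be None when no worker answers within the timeout.

```python
def active_task_counts(active_reply):
    """Map each hostname to the number of tasks it is executing."""
    # Treat a missing reply (no worker answered) as "no workers".
    return {host: len(tasks) for host, tasks in (active_reply or {}).items()}


reply = {'celery@node1': [{'id': 'a1'}, {'id': 'b2'}], 'celery@node2': []}
print(active_task_counts(reply))  # {'celery@node1': 2, 'celery@node2': 0}
```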

active_queues()[source]

Return information about the queues from which each worker consumes tasks.

Returns:

Dictionary {HOSTNAME: [QUEUE_INFO, QUEUE_INFO,...]}.

Return type:

Dict

Here is the list of QUEUE_INFO fields:

See also

See the RabbitMQ/AMQP documentation for more details about queue_info fields.

Note

The queue_info fields are RabbitMQ/AMQP oriented. Not all fields apply to other transports.

app = None

clock()[source]

Get the Clock value on workers.

>>> app.control.inspect().clock()
{'celery@node1': {'clock': 12}}

Returns:

Dictionary {HOSTNAME: CLOCK_VALUE}.

Return type:

Dict

conf(with_defaults=False)[source]

Return configuration of each worker.

Parameters:

with_defaults (bool) – If set to True, the method also returns configuration options that have default values.

Returns:

Dictionary {HOSTNAME: WORKER_CONFIGURATION}.

Return type:

Dict

See also

WORKER_CONFIGURATION is a dictionary containing current configuration options. See Configuration and defaults for possible values.

hello(from_node, revoked=None)[source]

memdump(samples=10)[source]

Dump statistics of previous memsample requests.

Note

Requires the psutil library.

memsample()[source]

Return a sample of the current RSS memory usage.

Note

Requires the psutil library.

objgraph(type='Request', n=200, max_depth=10)[source]

Create graph of uncollected objects (memory-leak debugging).

Parameters:

Returns:

Dictionary {'filename': FILENAME}

Return type:

Dict

Note

Requires the objgraph library.

ping(destination=None)[source]

Ping all (or specific) workers.

>>> app.control.inspect().ping()
{'celery@node1': {'ok': 'pong'}, 'celery@node2': {'ok': 'pong'}}
>>> app.control.inspect().ping(destination=['celery@node1'])
{'celery@node1': {'ok': 'pong'}}

Parameters:

destination (List) – If set, a list of the hosts to send the command to. When empty, the command is broadcast to all workers.

Returns:

Dictionary {HOSTNAME: {'ok': 'pong'}}.

Return type:

Dict

See also

broadcast() for supported keyword arguments.

query_task(*ids)[source]

Return details of tasks currently being executed by workers.

Parameters:

*ids (str) – IDs of tasks to be queried.

Returns:

Dictionary {HOSTNAME: {TASK_ID: [STATE, TASK_INFO]}}.

Return type:

Dict

Here is the list of TASK_INFO fields:

registered(*taskinfoitems)[source]

Return all registered tasks per worker.

>>> app.control.inspect().registered()
{'celery@node1': ['task1', 'task1']}
>>> app.control.inspect().registered('serializer', 'max_retries')
{'celery@node1': ['task_foo [serializer=json max_retries=3]', 'task_bar [serializer=json max_retries=3]']}

Parameters:

taskinfoitems (Sequence[str]) – List of Task attributes to include.

Returns:

Dictionary {HOSTNAME: [TASK1_INFO, ...]}.

Return type:

Dict
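When attributes are requested, each entry carries them in brackets after the task name, so looking up a task by name needs a small amount of parsing. The sketch below assumes the entry format shown in the example above; the helper name workers_with_task is hypothetical.

```python
def workers_with_task(registered_reply, task_name):
    """Return sorted hostnames whose registered-task list has task_name."""
    hosts = []
    for host, entries in (registered_reply or {}).items():
        # An entry may carry attributes after the name, for example
        # 'task_foo [serializer=json max_retries=3]'; keep the name only.
        names = {entry.split(' [', 1)[0] for entry in entries}
        if task_name in names:
            hosts.append(host)
    return sorted(hosts)


reply = {'celery@node1': ['task_foo [serializer=json max_retries=3]']}
print(workers_with_task(reply, 'task_foo'))  # ['celery@node1']
```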

registered_tasks(*taskinfoitems)

Return all registered tasks per worker.

>>> app.control.inspect().registered()
{'celery@node1': ['task1', 'task1']}
>>> app.control.inspect().registered('serializer', 'max_retries')
{'celery@node1': ['task_foo [serializer=json max_retries=3]', 'task_bar [serializer=json max_retries=3]']}

Parameters:

taskinfoitems (Sequence[str]) – List of Task attributes to include.

Returns:

Dictionary {HOSTNAME: [TASK1_INFO, ...]}.

Return type:

Dict

report()[source]

Return a human-readable report for each worker.

Returns:

Dictionary {HOSTNAME: {'ok': REPORT_STRING}}.

Return type:

Dict

reserved(safe=None)[source]

Return list of currently reserved tasks, not including scheduled/active.

Returns:

Dictionary {HOSTNAME: [TASK_INFO,...]}.

Return type:

Dict

See also

For TASK_INFO details see query_task() return value.

revoked()[source]

Return list of revoked tasks.

>>> app.control.inspect().revoked()
{'celery@node1': ['16f527de-1c72-47a6-b477-c472b92fef7a']}

Returns:

Dictionary {HOSTNAME: [TASK_ID, ...]}.

Return type:

Dict

scheduled(safe=None)[source]

Return list of scheduled tasks with details.

Returns:

Dictionary {HOSTNAME: [TASK_SCHEDULED_INFO,...]}.

Return type:

Dict

Here is the list of TASK_SCHEDULED_INFO fields:

See also

For more details about TASK_INFO see query_task() return value.

stats()[source]

Return statistics for each worker.

Returns:

Dictionary {HOSTNAME: STAT_INFO}.

Return type:

Dict

Here is the list of STAT_INFO fields:

celery.app.control.flatten_reply(reply)[source]

Flatten node replies.

Convert from a list of replies in this format:

[{'a@example.com': reply}, {'b@example.com': reply}]

into this format:

{'a@example.com': reply, 'b@example.com': reply}
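The conversion described above amounts to merging the single-node dictionaries into one. The following is a minimal sketch of that idea under a different, hypothetical name; it is not the library's implementation, and its handling of duplicate node names (the later reply wins) is an assumption.

```python
def flatten_node_replies(replies):
    """Merge a list of single-node reply dicts into one dict."""
    merged = {}
    for reply in replies:
        merged.update(reply)  # a later reply for the same node wins
    return merged


replies = [{'a@example.com': 'pong'}, {'b@example.com': 'pong'}]
print(flatten_node_replies(replies))
# {'a@example.com': 'pong', 'b@example.com': 'pong'}
```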